Skip to content

Commit

Permalink
EVG-7106: backup collection job (#2997)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sam Kleinman committed Jan 2, 2020
1 parent 5fda235 commit c350f8e
Show file tree
Hide file tree
Showing 83 changed files with 8,486 additions and 2,895 deletions.
2 changes: 1 addition & 1 deletion command/s3_put.go
Expand Up @@ -399,7 +399,7 @@ func (s3pc *s3put) createPailBucket(httpClient *http.Client) error {
Credentials: pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, ""),
Region: endpoints.UsEast1RegionID,
Name: s3pc.Bucket,
Permission: s3pc.Permissions,
Permissions: pail.S3Permissions(s3pc.Permissions),
ContentType: s3pc.ContentType,
}
bucket, err := pail.NewS3MultiPartBucketWithHTTPClient(httpClient, opts)
Expand Down
1 change: 1 addition & 0 deletions config.go
Expand Up @@ -54,6 +54,7 @@ type Settings struct {
Banner string `bson:"banner" json:"banner" yaml:"banner"`
BannerTheme BannerTheme `bson:"banner_theme" json:"banner_theme" yaml:"banner_theme"`
Bugsnag string `yaml:"bugsnag" bson:"bugsnag" json:"bugsnag"`
Backup BackupConfig `bson:"backup" json:"backup" yaml:"backup"`
ClientBinariesDir string `yaml:"client_binaries_dir" bson:"client_binaries_dir" json:"client_binaries_dir"`
CommitQueue CommitQueueConfig `yaml:"commit_queue" bson:"commit_queue" json:"commit_queue" id:"commit_queue"`
ConfigDir string `yaml:"configdir" bson:"configdir" json:"configdir"`
Expand Down
64 changes: 64 additions & 0 deletions config_backup.go
@@ -0,0 +1,64 @@
package evergreen

import (
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// BackupConfig holds the settings for the disaster-recovery collection
// backup jobs: the destination S3 bucket, its credentials, an optional
// key prefix, and whether the uploaded dumps should be compressed.
type BackupConfig struct {
	// BucketName is the S3 bucket that receives the backups.
	BucketName string `bson:"bucket_name" json:"bucket_name" yaml:"bucket_name"`
	// Key and Secret are the AWS credentials used to write to the bucket.
	Key string `bson:"key" json:"key" yaml:"key"`
	Secret string `bson:"secret" json:"secret" yaml:"secret"`
	// Prefix is prepended to every object key written to the bucket.
	Prefix string `bson:"prefix" json:"prefix" yaml:"prefix"`
	// Compress enables compression of the backup payloads.
	Compress bool `bson:"compress" json:"compress" yaml:"compress"`
}

func (c *BackupConfig) SectionId() string { return "backup" }
func (c *BackupConfig) ValidateAndDefault() error { return nil }

// Populated reports whether the configuration carries the minimum
// fields (bucket name and key prefix) needed to run backups.
func (c *BackupConfig) Populated() bool {
	if c.BucketName == "" {
		return false
	}

	return c.Prefix != ""
}

// Set persists this backup configuration to the admin settings
// collection, upserting the section document so it is created on
// first use.
func (c *BackupConfig) Set() error {
	env := GetEnvironment()
	ctx, cancel := env.Context()
	defer cancel()

	// Build the update document explicitly so each persisted field is
	// visible in one place.
	update := bson.M{
		"$set": bson.M{
			"bucket_name": c.BucketName,
			"key":         c.Key,
			"secret":      c.Secret,
			"prefix":      c.Prefix,
			"compress":    c.Compress,
		},
	}

	_, err := env.DB().Collection(ConfigCollection).UpdateOne(ctx,
		byId(c.SectionId()), update, options.Update().SetUpsert(true))

	return errors.Wrapf(err, "error updating section %s", c.SectionId())
}

// Get loads this section from the admin settings collection. A missing
// section document is not an error: it means backups have never been
// configured, so the receiver is reset to the zero value.
func (c *BackupConfig) Get(env Environment) error {
	ctx, cancel := env.Context()
	defer cancel()
	coll := env.DB().Collection(ConfigCollection)

	res := coll.FindOne(ctx, byId(c.SectionId()))
	if err := res.Err(); err != nil {
		// SingleResult.Err reports mongo.ErrNoDocuments when no
		// document matched, so the no-document case must be handled
		// here; checking it after Decode is unreachable because the
		// Err check above would already have returned.
		if err == mongo.ErrNoDocuments {
			*c = BackupConfig{}
			return nil
		}

		return errors.Wrapf(err, "error retrieving section %s", c.SectionId())
	}

	if err := res.Decode(c); err != nil {
		return errors.Wrap(err, "problem decoding result")
	}

	return nil
}
2 changes: 2 additions & 0 deletions config_db.go
Expand Up @@ -56,6 +56,7 @@ var (
containerPoolsKey = bsonutil.MustHaveTag(Settings{}, "ContainerPools")
commitQueueKey = bsonutil.MustHaveTag(Settings{}, "CommitQueue")
ldapRoleMapKey = bsonutil.MustHaveTag(Settings{}, "LDAPRoleMap")
backupConfig = bsonutil.MustHaveTag(Settings{}, "Backup")

// degraded mode flags
taskDispatchKey = bsonutil.MustHaveTag(ServiceFlags{}, "TaskDispatchDisabled")
Expand All @@ -82,6 +83,7 @@ var (
commitQueueDisabledKey = bsonutil.MustHaveTag(ServiceFlags{}, "CommitQueueDisabled")
plannerDisabledKey = bsonutil.MustHaveTag(ServiceFlags{}, "PlannerDisabled")
hostAllocatorDisabledKey = bsonutil.MustHaveTag(ServiceFlags{}, "HostAllocatorDisabled")
drBackupDisabledKey = bsonutil.MustHaveTag(ServiceFlags{}, "DRBackupDisabled")

// ContainerPoolsConfig keys
poolsKey = bsonutil.MustHaveTag(ContainerPoolsConfig{}, "Pools")
Expand Down
2 changes: 2 additions & 0 deletions config_serviceflags.go
Expand Up @@ -27,6 +27,7 @@ type ServiceFlags struct {
CommitQueueDisabled bool `bson:"commit_queue_disabled" json:"commit_queue_disabled"`
PlannerDisabled bool `bson:"planner_disabled" json:"planner_disabled"`
HostAllocatorDisabled bool `bson:"host_allocator_disabled" json:"host_allocator_disabled"`
DRBackupDisabled bool `bson:"dr_backup_disabled" json:"dr_backup_disabled"`

// Notification Flags
EventProcessingDisabled bool `bson:"event_processing_disabled" json:"event_processing_disabled"`
Expand Down Expand Up @@ -91,6 +92,7 @@ func (c *ServiceFlags) Set() error {
commitQueueDisabledKey: c.CommitQueueDisabled,
plannerDisabledKey: c.PlannerDisabled,
hostAllocatorDisabledKey: c.HostAllocatorDisabled,
drBackupDisabledKey: c.DRBackupDisabled,
},
}, options.Update().SetUpsert(true))

Expand Down
4 changes: 2 additions & 2 deletions db/db_utils.go
Expand Up @@ -309,7 +309,7 @@ func WriteGridFile(fsPrefix, name string, source io.Reader) error {
defer cancel()
bucket, err := pail.NewGridFSBucketWithClient(ctx, env.Client(), pail.GridFSOptions{
Database: env.DB().Name(),
Prefix: fsPrefix,
Name: fsPrefix,
})

if err != nil {
Expand All @@ -325,7 +325,7 @@ func GetGridFile(fsPrefix, name string) (io.ReadCloser, error) {
defer cancel()
bucket, err := pail.NewGridFSBucketWithClient(ctx, env.Client(), pail.GridFSOptions{
Database: env.DB().Name(),
Prefix: fsPrefix,
Name: fsPrefix,
})

if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions model/stats/db.go
Expand Up @@ -80,10 +80,10 @@ import (
)

const (
hourlyTestStatsCollection = "hourly_test_stats"
dailyTestStatsCollection = "daily_test_stats"
HourlyTestStatsCollection = "hourly_test_stats"
DailyTestStatsCollection = "daily_test_stats"
DailyTaskStatsCollection = "daily_task_stats"
dailyStatsStatusCollection = "daily_stats_status"
DailyStatsStatusCollection = "daily_stats_status"
bulkSize = 1000
nsInASecond = time.Second / time.Nanosecond
)
Expand Down Expand Up @@ -186,7 +186,7 @@ func hourlyTestStatsForOldTasksPipeline(projectId string, requester string, star
// And the merge the documents with the existing ones.
mergePipeline := []bson.M{
{"$lookup": bson.M{
"from": hourlyTestStatsCollection,
"from": HourlyTestStatsCollection,
"localField": dbTestStatsIdKey,
"foreignField": dbTestStatsIdKey,
"as": "existing",
Expand Down Expand Up @@ -1117,7 +1117,7 @@ func makeSum(condition bson.M) bson.M {

func GetDailyTestDoc(id DbTestStatsId) (*dbTestStats, error) {
doc := dbTestStats{}
err := db.FindOne(dailyTestStatsCollection, bson.M{"_id": id}, db.NoProjection, db.NoSort, &doc)
err := db.FindOne(DailyTestStatsCollection, bson.M{"_id": id}, db.NoProjection, db.NoSort, &doc)
if adb.ResultsNotFound(err) {
return nil, nil
}
Expand All @@ -1126,7 +1126,7 @@ func GetDailyTestDoc(id DbTestStatsId) (*dbTestStats, error) {

func GetHourlyTestDoc(id DbTestStatsId) (*dbTestStats, error) {
doc := dbTestStats{}
err := db.FindOne(hourlyTestStatsCollection, bson.M{"_id": id}, db.NoProjection, db.NoSort, &doc)
err := db.FindOne(HourlyTestStatsCollection, bson.M{"_id": id}, db.NoProjection, db.NoSort, &doc)
if adb.ResultsNotFound(err) {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion model/stats/query.go
Expand Up @@ -263,7 +263,7 @@ func GetTestStats(filter StatsFilter) ([]TestStats, error) {
}
var stats []TestStats
pipeline := filter.testStatsQueryPipeline()
err = db.Aggregate(dailyTestStatsCollection, pipeline, &stats)
err = db.Aggregate(DailyTestStatsCollection, pipeline, &stats)
if err != nil {
return nil, errors.Wrap(err, "Failed to aggregate test statistics")
}
Expand Down
4 changes: 2 additions & 2 deletions model/stats/query_test.go
Expand Up @@ -28,7 +28,7 @@ func TestStatsQuerySuite(t *testing.T) {
}

func (s *statsQuerySuite) SetupTest() {
s.clearCollection(dailyTestStatsCollection)
s.clearCollection(DailyTestStatsCollection)
s.clearCollection(DailyTaskStatsCollection)

s.baseTestFilter = StatsFilter{
Expand Down Expand Up @@ -948,7 +948,7 @@ func (s *statsQuerySuite) clearCollection(name string) {

func (s *statsQuerySuite) insertDailyTestStats(project string, requester string, testFile string, taskName string, variant string, distro string, date time.Time, numPass int, numFail int, avgDuration float64) {

err := db.Insert(dailyTestStatsCollection, bson.M{
err := db.Insert(DailyTestStatsCollection, bson.M{
"_id": DbTestStatsId{
Project: project,
Requester: requester,
Expand Down
10 changes: 5 additions & 5 deletions model/stats/stats.go
Expand Up @@ -51,7 +51,7 @@ func createDefaultStatsStatus(projectId string) StatsStatus {
func GetStatsStatus(projectId string) (StatsStatus, error) {
status := StatsStatus{}
query := statsStatusQuery(projectId)
err := db.FindOne(dailyStatsStatusCollection, query, db.NoProjection, db.NoSort, &status)
err := db.FindOne(DailyStatsStatusCollection, query, db.NoProjection, db.NoSort, &status)
if adb.ResultsNotFound(err) {
return createDefaultStatsStatus(projectId), nil
}
Expand All @@ -69,7 +69,7 @@ func UpdateStatsStatus(projectId string, lastJobRun time.Time, processedTasksUnt
ProcessedTasksUntil: processedTasksUntil,
Runtime: runtime,
}
_, err := db.Upsert(dailyStatsStatusCollection, bson.M{"_id": projectId}, status)
_, err := db.Upsert(DailyStatsStatusCollection, bson.M{"_id": projectId}, status)
if err != nil {
return errors.Wrap(err, "Failed to update test stats status")
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func GenerateHourlyTestStats(ctx context.Context, opts GenerateOptions) error {
end := start.Add(time.Hour)
// Generate the stats based on tasks.
pipeline := hourlyTestStatsPipeline(opts.ProjectID, opts.Requester, start, end, opts.Tasks, opts.Runtime)
err := aggregateIntoCollection(ctx, task.Collection, pipeline, hourlyTestStatsCollection)
err := aggregateIntoCollection(ctx, task.Collection, pipeline, HourlyTestStatsCollection)
if err != nil {
return errors.Wrap(err, "Failed to generate hourly stats")
}
Expand All @@ -119,7 +119,7 @@ func GenerateHourlyTestStats(ctx context.Context, opts GenerateOptions) error {
})
// Generate/Update the stats for old tasks.
pipeline = hourlyTestStatsForOldTasksPipeline(opts.ProjectID, opts.Requester, start, end, opts.Tasks, opts.Runtime)
err = aggregateIntoCollection(ctx, task.OldCollection, pipeline, hourlyTestStatsCollection)
err = aggregateIntoCollection(ctx, task.OldCollection, pipeline, HourlyTestStatsCollection)
if err != nil {
return errors.Wrap(err, "Failed to generate hourly stats for old tasks")
}
Expand All @@ -141,7 +141,7 @@ func GenerateDailyTestStatsFromHourly(ctx context.Context, opts GenerateOptions)
start := util.GetUTCDay(opts.Window)
end := start.Add(24 * time.Hour)
pipeline := dailyTestStatsFromHourlyPipeline(opts.ProjectID, opts.Requester, start, end, opts.Tasks, opts.Runtime)
err := aggregateIntoCollection(ctx, hourlyTestStatsCollection, pipeline, dailyTestStatsCollection)
err := aggregateIntoCollection(ctx, HourlyTestStatsCollection, pipeline, DailyTestStatsCollection)
if err != nil {
return errors.Wrap(err, "Failed to aggregate hourly stats into daily stats")
}
Expand Down
14 changes: 7 additions & 7 deletions model/stats/stats_test.go
Expand Up @@ -38,9 +38,9 @@ func TestStatsSuite(t *testing.T) {

func (s *statsSuite) SetupTest() {
collectionsToClear := []string{
hourlyTestStatsCollection,
dailyTestStatsCollection,
dailyStatsStatusCollection,
HourlyTestStatsCollection,
DailyTestStatsCollection,
DailyStatsStatusCollection,
DailyTaskStatsCollection,
task.Collection,
task.OldCollection,
Expand Down Expand Up @@ -615,7 +615,7 @@ func (s *statsSuite) initHourly() {

func (s *statsSuite) insertHourlyTestStats(project string, requester string, testFile string, taskName string, variant string, distro string, date time.Time, numPass int, numFail int, avgDuration float64, lastID mgobson.ObjectId) {

err := db.Insert(hourlyTestStatsCollection, bson.M{
err := db.Insert(HourlyTestStatsCollection, bson.M{
"_id": DbTestStatsId{
Project: project,
Requester: requester,
Expand Down Expand Up @@ -866,11 +866,11 @@ func (s *statsSuite) countDocs(collection string) int {
}

func (s *statsSuite) countDailyTestDocs() int {
return s.countDocs(dailyTestStatsCollection)
return s.countDocs(DailyTestStatsCollection)
}

func (s *statsSuite) countHourlyTestDocs() int {
return s.countDocs(hourlyTestStatsCollection)
return s.countDocs(HourlyTestStatsCollection)
}

func (s *statsSuite) countDailyTaskDocs() int {
Expand Down Expand Up @@ -926,7 +926,7 @@ func (s *statsSuite) getLastHourlyTestStat(testStatsID DbTestStatsId) (*dbTestSt
"$lt": end,
},
}
err := db.FindAll(hourlyTestStatsCollection, qry, db.NoProjection, []string{"-last_id"}, db.NoSkip, 1, &testResults)
err := db.FindAll(HourlyTestStatsCollection, qry, db.NoProjection, []string{"-last_id"}, db.NoSkip, 1, &testResults)
if adb.ResultsNotFound(err) {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion operations/agent.go
Expand Up @@ -107,7 +107,7 @@ func Agent() cli.Command {
Credentials: pail.CreateAWSCredentials(os.Getenv("S3_KEY"), os.Getenv("S3_SECRET"), ""),
Region: endpoints.UsEast1RegionID,
Name: os.Getenv("S3_BUCKET"),
Permission: "public-read",
Permissions: pail.S3PermissionsPublicRead,
ContentType: "text/plain",
},
}
Expand Down

0 comments on commit c350f8e

Please sign in to comment.