Skip to content

Commit

Permalink
atc: structure: support pluggable build event processing
Browse files Browse the repository at this point in the history
The motivation for this change is to decouple build events from the
database layer. This can allow us to add different storage options for
build events (e.g. Elasticsearch), which was brought up in #5306.

This commit adds `EventProcessor`. The reason for the generic name (as
opposed to something like `EventStore`) is that I envision this
interface being implemented by things that go beyond just a Store. For
instance, the initial motivation for this was to explore server-side
interpretation of the ANSI control sequences in log events. Another
possible use case for `EventProcessors` is secret redaction - which I'll
look into refactoring in subsequent commits. Multiple `EventProcessors`
can be chained together to form a data transformation pipeline, where
the terminal processor (should) store the events somewhere.

In addition to processing events, the `EventProcessor` has the
`Initialize(Build)` and `Finalize(Build)` methods. For the DB store,
this is creating and deleting a sequence for event ids (respectively).

Since we initialize builds right when they're created, this commit also
introduces a `BuildCreator` interface, the implementation of which
delegates the `Initialize` call to the `EventProcessor`. This is so
higher-level packages (e.g. API) don't need to be aware of
`EventProcessors`, which feels like a lower-level detail.

P.S. a lot of tests are broken after this commit - I'm saving most of
the test fixes for a subsequent commit so it's easier to skim through.

Signed-off-by: Aidan Oldershaw <aoldershaw@pivotal.io>
  • Loading branch information
aoldershaw committed Apr 28, 2020
1 parent 3416c90 commit 1d3172d
Show file tree
Hide file tree
Showing 36 changed files with 1,825 additions and 1,342 deletions.
3 changes: 3 additions & 0 deletions atc/api/api_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
dbBuildFactory *dbfakes.FakeBuildFactory
dbUserFactory *dbfakes.FakeUserFactory
dbCheckFactory *dbfakes.FakeCheckFactory
dbBuildCreator *dbfakes.FakeBuildCreator
dbTeam *dbfakes.FakeTeam
dbWall *dbfakes.FakeWall
fakeSecretManager *credsfakes.FakeSecrets
Expand Down Expand Up @@ -99,6 +100,7 @@ var _ = BeforeEach(func() {
dbBuildFactory = new(dbfakes.FakeBuildFactory)
dbUserFactory = new(dbfakes.FakeUserFactory)
dbCheckFactory = new(dbfakes.FakeCheckFactory)
dbBuildCreator = new(dbfakes.FakeBuildCreator)
dbWall = new(dbfakes.FakeWall)

interceptTimeoutFactory = new(containerserverfakes.FakeInterceptTimeoutFactory)
Expand Down Expand Up @@ -179,6 +181,7 @@ var _ = BeforeEach(func() {
dbCheckFactory,
dbResourceConfigFactory,
dbUserFactory,
dbBuildCreator,

constructedEventHandler.Construct,

Expand Down
13 changes: 8 additions & 5 deletions atc/api/builds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var _ = Describe("Builds API", func() {
})

It("does not trigger a build", func() {
Expect(dbTeam.CreateStartedBuildCallCount()).To(BeZero())
Expect(dbBuildCreator.CreateStartedBuildCallCount()).To(BeZero())
})
})

Expand All @@ -84,7 +84,7 @@ var _ = Describe("Builds API", func() {

Context("when creating a started build fails", func() {
BeforeEach(func() {
dbTeam.CreateStartedBuildReturns(nil, errors.New("oh no!"))
dbBuildCreator.CreateStartedBuildReturns(nil, errors.New("oh no!"))
})

It("returns 500 Internal Server Error", func() {
Expand All @@ -105,7 +105,7 @@ var _ = Describe("Builds API", func() {
fakeBuild.EndTimeReturns(time.Unix(100, 0))
fakeBuild.ReapTimeReturns(time.Unix(200, 0))

dbTeam.CreateStartedBuildReturns(fakeBuild, nil)
dbBuildCreator.CreateStartedBuildReturns(fakeBuild, nil)
})

It("returns 201 Created", func() {
Expand All @@ -120,8 +120,11 @@ var _ = Describe("Builds API", func() {
})

It("creates a started build", func() {
Expect(dbTeam.CreateStartedBuildCallCount()).To(Equal(1))
Expect(dbTeam.CreateStartedBuildArgsForCall(0)).To(Equal(plan))
Expect(dbBuildCreator.CreateStartedBuildCallCount()).To(Equal(1))
teamID, pipelineID, actualPlan := dbBuildCreator.CreateStartedBuildArgsForCall(0)
Expect(actualPlan).To(Equal(plan))
Expect(teamID).To(Equal(dbTeam.ID()))
Expect(pipelineID).To(BeZero())
})

It("returns the created build", func() {
Expand Down
2 changes: 1 addition & 1 deletion atc/api/buildserver/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s *Server) CreateBuild(team db.Team) http.Handler {
return
}

build, err := team.CreateStartedBuild(plan)
build, err := s.buildCreator.CreateStartedBuild(team.ID(), 0, plan)
if err != nil {
hLog.Error("failed-to-create-one-off-build", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
3 changes: 3 additions & 0 deletions atc/api/buildserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Server struct {

teamFactory db.TeamFactory
buildFactory db.BuildFactory
buildCreator db.BuildCreator
eventHandlerFactory EventHandlerFactory
rejector auth.Rejector
}
Expand All @@ -26,6 +27,7 @@ func NewServer(
externalURL string,
teamFactory db.TeamFactory,
buildFactory db.BuildFactory,
buildCreator db.BuildCreator,
eventHandlerFactory EventHandlerFactory,
) *Server {
return &Server{
Expand All @@ -35,6 +37,7 @@ func NewServer(

teamFactory: teamFactory,
buildFactory: buildFactory,
buildCreator: buildCreator,
eventHandlerFactory: eventHandlerFactory,

rejector: auth.UnauthorizedRejector{},
Expand Down
7 changes: 4 additions & 3 deletions atc/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewHandler(
dbCheckFactory db.CheckFactory,
dbResourceConfigFactory db.ResourceConfigFactory,
dbUserFactory db.UserFactory,
dbBuildCreator db.BuildCreator,

eventHandlerFactory buildserver.EventHandlerFactory,

Expand Down Expand Up @@ -87,13 +88,13 @@ func NewHandler(
buildHandlerFactory := buildserver.NewScopedHandlerFactory(logger)
teamHandlerFactory := NewTeamScopedHandlerFactory(logger, dbTeamFactory)

buildServer := buildserver.NewServer(logger, externalURL, dbTeamFactory, dbBuildFactory, eventHandlerFactory)
buildServer := buildserver.NewServer(logger, externalURL, dbTeamFactory, dbBuildFactory, dbBuildCreator, eventHandlerFactory)
checkServer := checkserver.NewServer(logger, dbCheckFactory)
jobServer := jobserver.NewServer(logger, externalURL, secretManager, dbJobFactory, dbCheckFactory)
jobServer := jobserver.NewServer(logger, externalURL, secretManager, dbJobFactory, dbCheckFactory, dbBuildCreator)
resourceServer := resourceserver.NewServer(logger, secretManager, varSourcePool, dbCheckFactory, dbResourceFactory, dbResourceConfigFactory)

versionServer := versionserver.NewServer(logger, externalURL)
pipelineServer := pipelineserver.NewServer(logger, dbTeamFactory, dbPipelineFactory, externalURL, enableArchivePipeline)
pipelineServer := pipelineserver.NewServer(logger, dbTeamFactory, dbPipelineFactory, dbBuildCreator, externalURL, enableArchivePipeline)
configServer := configserver.NewServer(logger, dbTeamFactory, secretManager)
ccServer := ccserver.NewServer(logger, dbTeamFactory, externalURL)
workerServer := workerserver.NewServer(logger, dbTeamFactory, dbWorkerFactory)
Expand Down
13 changes: 7 additions & 6 deletions atc/api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ var _ = Describe("Jobs API", func() {
})

It("does not trigger the build", func() {
Expect(fakeJob.CreateBuildCallCount()).To(Equal(0))
Expect(dbBuildCreator.CreateBuildCallCount()).To(Equal(0))
})
})

Expand All @@ -1482,7 +1482,7 @@ var _ = Describe("Jobs API", func() {

Context("when triggering the build fails", func() {
BeforeEach(func() {
fakeJob.CreateBuildReturns(nil, errors.New("nopers"))
dbBuildCreator.CreateBuildReturns(nil, errors.New("nopers"))
})
It("returns a 500", func() {
Expect(response.StatusCode).To(Equal(http.StatusInternalServerError))
Expand All @@ -1501,11 +1501,12 @@ var _ = Describe("Jobs API", func() {
build.StartTimeReturns(time.Unix(1, 0))
build.EndTimeReturns(time.Unix(100, 0))

fakeJob.CreateBuildReturns(build, nil)
dbBuildCreator.CreateBuildReturns(build, nil)
})

It("triggers the build", func() {
Expect(fakeJob.CreateBuildCallCount()).To(Equal(1))
Expect(dbBuildCreator.CreateBuildCallCount()).To(Equal(1))
Expect(dbBuildCreator.CreateBuildArgsForCall(0)).To(BeIdenticalTo(fakeJob))
})

Context("when finding the pipeline resources fails", func() {
Expand Down Expand Up @@ -2101,7 +2102,7 @@ var _ = Describe("Jobs API", func() {
})
Context("when creating the rerun build fails", func() {
BeforeEach(func() {
fakeJob.RerunBuildReturns(nil, errors.New("nopers"))
dbBuildCreator.RerunBuildReturns(nil, errors.New("nopers"))
})

It("returns a 500", func() {
Expand All @@ -2121,7 +2122,7 @@ var _ = Describe("Jobs API", func() {
build.StartTimeReturns(time.Unix(1, 0))
build.EndTimeReturns(time.Unix(100, 0))

fakeJob.RerunBuildReturns(build, nil)
dbBuildCreator.RerunBuildReturns(build, nil)
})

It("returns 200 OK", func() {
Expand Down
2 changes: 1 addition & 1 deletion atc/api/jobserver/create_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Server) CreateJobBuild(pipeline db.Pipeline) http.Handler {
return
}

build, err := job.CreateBuild()
build, err := s.buildCreator.CreateBuild(job)
if err != nil {
logger.Error("failed-to-create-job-build", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion atc/api/jobserver/rerun_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Server) RerunJobBuild(pipeline db.Pipeline) http.Handler {
return
}

build, err := job.RerunBuild(buildToRerun)
build, err := s.buildCreator.RerunBuild(job, buildToRerun)
if err != nil {
logger.Error("failed-to-retrigger-build", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
3 changes: 3 additions & 0 deletions atc/api/jobserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Server struct {
secretManager creds.Secrets
jobFactory db.JobFactory
checkFactory db.CheckFactory
buildCreator db.BuildCreator
}

func NewServer(
Expand All @@ -23,6 +24,7 @@ func NewServer(
secretManager creds.Secrets,
jobFactory db.JobFactory,
checkFactory db.CheckFactory,
buildCreator db.BuildCreator,
) *Server {
return &Server{
logger: logger,
Expand All @@ -31,5 +33,6 @@ func NewServer(
secretManager: secretManager,
jobFactory: jobFactory,
checkFactory: checkFactory,
buildCreator: buildCreator,
}
}
2 changes: 1 addition & 1 deletion atc/api/pipelineserver/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (s *Server) CreateBuild(pipeline db.Pipeline) http.Handler {
return
}

build, err := pipeline.CreateStartedBuild(plan)
build, err := s.buildCreator.CreateStartedBuild(pipeline.TeamID(), pipeline.ID(), plan)
if err != nil {
logger.Error("failed-to-create-one-off-build", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
3 changes: 3 additions & 0 deletions atc/api/pipelineserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Server struct {
teamFactory db.TeamFactory
rejector auth.Rejector
pipelineFactory db.PipelineFactory
buildCreator db.BuildCreator
externalURL string
enableArchivePipeline bool
}
Expand All @@ -19,6 +20,7 @@ func NewServer(
logger lager.Logger,
teamFactory db.TeamFactory,
pipelineFactory db.PipelineFactory,
buildCreator db.BuildCreator,
externalURL string,
enableArchivePipeline bool,
) *Server {
Expand All @@ -27,6 +29,7 @@ func NewServer(
teamFactory: teamFactory,
rejector: auth.UnauthorizedRejector{},
pipelineFactory: pipelineFactory,
buildCreator: buildCreator,
externalURL: externalURL,
enableArchivePipeline: enableArchivePipeline,
}
Expand Down
21 changes: 18 additions & 3 deletions atc/atccmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ func (cmd *RunCommand) constructAPIMembers(
dbCheckFactory := db.NewCheckFactory(dbConn, lockFactory, secretManager, cmd.varSourcePool, cmd.GlobalResourceCheckTimeout)
dbClock := db.NewClock()
dbWall := db.NewWall(dbConn, &dbClock)
dbBuildCreator := db.NewBuildCreator(dbConn, lockFactory, cmd.constructEventProcessor(dbConn))

customRoles, err := cmd.parseCustomRoles(logger)
if err != nil {
Expand Down Expand Up @@ -686,6 +687,7 @@ func (cmd *RunCommand) constructAPIMembers(
credsManagers,
accessFactory,
dbWall,
dbBuildCreator,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -876,6 +878,7 @@ func (cmd *RunCommand) constructBackendMembers(
return nil, err
}

eventProcessor := cmd.constructEventProcessor(dbConn)
engine := cmd.constructEngine(
pool,
workerClient,
Expand All @@ -887,12 +890,14 @@ func (cmd *RunCommand) constructBackendMembers(
defaultLimits,
buildContainerStrategy,
lockFactory,
eventProcessor,
)

dbBuildFactory := db.NewBuildFactory(dbConn, lockFactory, cmd.GC.OneOffBuildGracePeriod, cmd.GC.FailedGracePeriod)
dbCheckFactory := db.NewCheckFactory(dbConn, lockFactory, secretManager, cmd.varSourcePool, cmd.GlobalResourceCheckTimeout)
dbPipelineFactory := db.NewPipelineFactory(dbConn, lockFactory)
dbJobFactory := db.NewJobFactory(dbConn, lockFactory)
dbBuildCreator := db.NewBuildCreator(dbConn, lockFactory, eventProcessor)

componentFactory := db.NewComponentFactory(dbConn)

Expand Down Expand Up @@ -939,7 +944,10 @@ func (cmd *RunCommand) constructBackendMembers(
factory.NewBuildFactory(
atc.NewPlanFactory(time.Now().Unix()),
),
alg),
alg,
eventProcessor,
),
BuildCreator: dbBuildCreator,
},
cmd.JobSchedulingMaxInFlight,
),
Expand Down Expand Up @@ -1055,6 +1063,10 @@ func (cmd *RunCommand) constructGCMember(
return members, nil
}

func (cmd *RunCommand) constructEventProcessor(dbConn db.Conn) db.EventProcessor {
return db.NewBuildEventStore(dbConn)
}

func (cmd *RunCommand) validateCustomRoles() error {

path := cmd.ConfigRBAC.Path()
Expand Down Expand Up @@ -1544,6 +1556,7 @@ func (cmd *RunCommand) constructEngine(
defaultLimits atc.ContainerLimits,
strategy worker.ContainerPlacementStrategy,
lockFactory lock.LockFactory,
eventProcessor db.EventProcessor,
) engine.Engine {

stepFactory := builder.NewStepFactory(
Expand All @@ -1560,14 +1573,14 @@ func (cmd *RunCommand) constructEngine(

stepBuilder := builder.NewStepBuilder(
stepFactory,
builder.NewDelegateFactory(),
builder.NewDelegateFactory(eventProcessor),
cmd.ExternalURL.String(),
secretManager,
cmd.varSourcePool,
cmd.EnableRedactSecrets,
)

return engine.NewEngine(stepBuilder)
return engine.NewEngine(stepBuilder, eventProcessor)
}

func (cmd *RunCommand) constructHTTPHandler(
Expand Down Expand Up @@ -1726,6 +1739,7 @@ func (cmd *RunCommand) constructAPIHandler(
credsManagers creds.Managers,
accessFactory accessor.AccessFactory,
dbWall db.Wall,
dbBuildCreator db.BuildCreator,
) (http.Handler, error) {

checkPipelineAccessHandlerFactory := auth.NewCheckPipelineAccessHandlerFactory(teamFactory)
Expand Down Expand Up @@ -1776,6 +1790,7 @@ func (cmd *RunCommand) constructAPIHandler(
dbCheckFactory,
resourceConfigFactory,
dbUserFactory,
dbBuildCreator,

buildserver.NewEventHandler,

Expand Down
Loading

0 comments on commit 1d3172d

Please sign in to comment.