Skip to content

Commit

Permalink
web: behaviour: watch endpoints are disabled by default
Browse files Browse the repository at this point in the history
and add some integration tests making use of the go-concourse
implementation we added.

Signed-off-by: Aidan Oldershaw <aoldershaw@pivotal.io>
  • Loading branch information
Aidan Oldershaw committed Jul 2, 2020
1 parent 3239a65 commit 9adfb76
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 22 deletions.
22 changes: 21 additions & 1 deletion atc/api/jobs_test.go
Expand Up @@ -266,7 +266,7 @@ var _ = Describe("Jobs API", func() {
dbJobFactory.VisibleJobsReturns(atc.Dashboard{job}, nil)

eventsChan = make(chan []watch.DashboardJobEvent, 1)
fakeListAllJobsWatcher.WatchListAllJobsReturns(eventsChan)
fakeListAllJobsWatcher.WatchListAllJobsReturns(eventsChan, nil)
})

AfterEach(func() {
Expand Down Expand Up @@ -323,6 +323,26 @@ var _ = Describe("Jobs API", func() {
Data: []byte(`[{"id":1,"eventType":"PUT","job":{"id":1,"name":"new-job","pipeline_name":"some-pipeline","team_name":"some-team","next_build":null,"finished_build":null,"groups":null}},{"id":2,"eventType":"DELETE","job":null}]`),
}))
})

Context("when the watcher returns watch.ErrDisabled", func() {
BeforeEach(func() {
fakeListAllJobsWatcher.WatchListAllJobsReturns(nil, watch.ErrDisabled)
})

It("returns a 406 status code", func() {
Expect(response.StatusCode).To(Equal(http.StatusNotAcceptable))
})
})

Context("when the watcher returns an unexpected error", func() {
BeforeEach(func() {
fakeListAllJobsWatcher.WatchListAllJobsReturns(nil, errors.New("something bad"))
})

It("returns a 500 status code", func() {
Expect(response.StatusCode).To(Equal(http.StatusInternalServerError))
})
})
})

Describe("GET /api/v1/teams/:team_name/pipelines/:pipeline_name/jobs/:job_name", func() {
Expand Down
23 changes: 14 additions & 9 deletions atc/api/jobserver/jobserverfakes/fake_list_all_jobs_watcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions atc/api/jobserver/list_all.go
Expand Up @@ -15,7 +15,7 @@ import (
//go:generate counterfeiter . ListAllJobsWatcher

type ListAllJobsWatcher interface {
WatchListAllJobs(ctx context.Context, access accessor.Access) <-chan []watch.DashboardJobEvent
WatchListAllJobs(ctx context.Context, access accessor.Access) (<-chan []watch.DashboardJobEvent, error)
}

type JobWatchEvent struct {
Expand All @@ -31,12 +31,20 @@ func (s *Server) ListAllJobs(w http.ResponseWriter, r *http.Request) {

watchMode := stream.IsRequested(r)
var watchEventsChan <-chan []watch.DashboardJobEvent
var err error
if watchMode {
watchEventsChan = s.listAllJobsWatcher.WatchListAllJobs(r.Context(), acc)
watchEventsChan, err = s.listAllJobsWatcher.WatchListAllJobs(r.Context(), acc)
if err == watch.ErrDisabled {
http.Error(w, "ListAllJobs watch endpoint is not enabled", http.StatusNotAcceptable)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

var dashboard atc.Dashboard
var err error

if acc.IsAdmin() {
dashboard, err = s.jobFactory.AllActiveJobs()
Expand Down
16 changes: 12 additions & 4 deletions atc/atccmd/command.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/concourse/concourse/atc/api/auth"
"github.com/concourse/concourse/atc/api/buildserver"
"github.com/concourse/concourse/atc/api/containerserver"
"github.com/concourse/concourse/atc/api/jobserver"
"github.com/concourse/concourse/atc/api/pipelineserver"
"github.com/concourse/concourse/atc/api/policychecker"
"github.com/concourse/concourse/atc/auditor"
Expand Down Expand Up @@ -249,6 +250,8 @@ type RunCommand struct {
SystemClaimValues []string `long:"system-claim-value" default:"concourse-worker" description:"Configure which token requests should be considered 'system' requests."`
EnableArchivePipeline bool `long:"enable-archive-pipeline" description:"Enable /api/v1/teams/{team}/pipelines/{pipeline}/archive endpoint."`

EnableWatchEndpoints bool `long:"enable-watch-endpoints" description:"Enable watching API endpoints for changes."`

EnableBuildRerunWhenWorkerDisappears bool `long:"enable-rerun-when-worker-disappears" description:"Enable automatically build rerun when worker disappears"`
}

Expand Down Expand Up @@ -731,9 +734,14 @@ func (cmd *RunCommand) constructAPIMembers(

middleware := token.NewMiddleware(cmd.Auth.AuthFlags.SecureCookies)

listAllJobsWatcher, err := watch.NewListAllJobsWatcher(logger.Session("list-all-jobs-watcher"), dbConn, lockFactory)
if err != nil {
return nil, err
var listAllJobsWatcher jobserver.ListAllJobsWatcher
if cmd.EnableWatchEndpoints {
listAllJobsWatcher, err = watch.NewListAllJobsWatcher(logger.Session("list-all-jobs-watcher"), dbConn, lockFactory)
if err != nil {
return nil, err
}
} else {
listAllJobsWatcher = watch.DisabledListAllJobsWatcher{}
}

apiHandler, err := cmd.constructAPIHandler(
Expand Down Expand Up @@ -1751,7 +1759,7 @@ func (cmd *RunCommand) constructAPIHandler(
tokenVerifier accessor.TokenVerifier,
notifications db.NotificationsBus,
policyChecker *policy.Checker,
listAllJobsWatcher *watch.ListAllJobsWatcher,
listAllJobsWatcher jobserver.ListAllJobsWatcher,
) (http.Handler, error) {

checkPipelineAccessHandlerFactory := auth.NewCheckPipelineAccessHandlerFactory(teamFactory)
Expand Down
14 changes: 14 additions & 0 deletions atc/db/watch/disabled_list_all_jobs_watcher.go
@@ -0,0 +1,14 @@
package watch

import (
"context"

"github.com/concourse/concourse/atc/api/accessor"
)

type DisabledListAllJobsWatcher struct {
}

func (d DisabledListAllJobsWatcher) WatchListAllJobs(ctx context.Context, access accessor.Access) (<-chan []DashboardJobEvent, error) {
return nil, ErrDisabled
}
4 changes: 2 additions & 2 deletions atc/db/watch/list_all_jobs_watcher.go
Expand Up @@ -117,15 +117,15 @@ func (w *ListAllJobsWatcher) setupTriggers() error {
return nil
}

func (w *ListAllJobsWatcher) WatchListAllJobs(ctx context.Context, access accessor.Access) <-chan []DashboardJobEvent {
func (w *ListAllJobsWatcher) WatchListAllJobs(ctx context.Context, access accessor.Access) (<-chan []DashboardJobEvent, error) {
eventsChan := make(chan []DashboardJobEvent)

dirty := make(chan struct{})
var pendingEvents []DashboardJobEvent
var mtx sync.Mutex
go w.watchEvents(ctx, access, &pendingEvents, &mtx, dirty)
go w.sendEvents(ctx, eventsChan, &pendingEvents, &mtx, dirty)
return eventsChan
return eventsChan, nil
}

func (w *ListAllJobsWatcher) watchEvents(
Expand Down
4 changes: 2 additions & 2 deletions atc/db/watch/list_all_jobs_watcher_test.go
Expand Up @@ -141,7 +141,7 @@ var _ = Describe("ListAllJobsWatcher", func() {
})

JustBeforeEach(func() {
eventsChan := watcher.WatchListAllJobs(ctx, access)
eventsChan, _ := watcher.WatchListAllJobs(ctx, access)
go func() {
for events := range eventsChan {
mtx.Lock()
Expand Down Expand Up @@ -279,7 +279,7 @@ var _ = Describe("ListAllJobsWatcher", func() {

It("cancelling the context halts the subscriber", func() {
time.AfterFunc(50*time.Millisecond, cancel)
eventsChan := watcher.WatchListAllJobs(ctx, new(accessorfakes.FakeAccess))
eventsChan, _ := watcher.WatchListAllJobs(ctx, new(accessorfakes.FakeAccess))
Eventually(eventsChan).Should(BeClosed())
})
})
Expand Down
2 changes: 1 addition & 1 deletion atc/db/watch/watchers.go
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/concourse/concourse/atc/db"
)

var ErrDisconnectedFromNotificationBus = fmt.Errorf("temporarily disconnected from notification bus")
var ErrDisabled = fmt.Errorf("watching is disabled")

const eventsChannel = "watch_events"

Expand Down
129 changes: 129 additions & 0 deletions atc/integration/watch_test.go
@@ -0,0 +1,129 @@
package integration_test

import (
"errors"

"github.com/concourse/concourse/atc"
"github.com/concourse/concourse/atc/api/jobserver"
"github.com/concourse/concourse/atc/db/watch"
concourse "github.com/concourse/concourse/go-concourse/concourse"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

type jobsEventsVisitor struct {
OnInitialEvent func([]atc.Job) error
OnPatchEvent func([]jobserver.JobWatchEvent) error
}

func (j jobsEventsVisitor) VisitInitialEvent(jobs []atc.Job) error {
if j.OnInitialEvent == nil {
return errors.New("unexpected initial event")
}
return j.OnInitialEvent(jobs)
}

func (j jobsEventsVisitor) VisitPatchEvent(events []jobserver.JobWatchEvent) error {
if j.OnPatchEvent == nil {
return errors.New("unexpected patch event")
}
return j.OnPatchEvent(events)
}

var _ = Describe("Watch Test", func() {
var (
client concourse.Client
)

BeforeEach(func() {
cmd.EnableWatchEndpoints = true
})

JustBeforeEach(func() {
client = login(atcURL, "test", "test")
})

It("can watch for changes to ListAllJobs", func() {
By("initiating WatchListAllJobs")
givenAPipeline(client, "pipeline")

events := whenIWatchListAllJobs(client)
defer events.Close()

thenIReceiveInitialJobs(events, []atc.Job{getJob(client, "pipeline", "simple")})

By("triggering a job build")
whenITriggerJobBuild(client, "pipeline", "simple")
job := getJob(client, "pipeline", "simple")
thenIReceivePatchEvents(events, []jobserver.JobWatchEvent{putEvent(job)})

By("deleting the pipeline")
whenIDeletePipeline(client, "pipeline")
thenIReceivePatchEvents(events, []jobserver.JobWatchEvent{deleteEvent(job.ID)})
})

Context("when watch endpoints are not enabled", func() {
BeforeEach(func() {
cmd.EnableWatchEndpoints = false
})

It("errors when trying to watch ListAllJobs", func() {
_, err := client.WatchListAllJobs()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("406 Not Acceptable"))
})
})
})

func whenIWatchListAllJobs(client concourse.Client) concourse.JobsEvents {
events, err := client.WatchListAllJobs()
Expect(err).ToNot(HaveOccurred())
return events
}

func thenIReceiveInitialJobs(events concourse.JobsEvents, initialJobs []atc.Job) {
err := events.Accept(jobsEventsVisitor{
OnInitialEvent: func(jobs []atc.Job) error {
defer GinkgoRecover()
Expect(jobs).To(Equal(initialJobs))
return nil
},
})
Expect(err).ToNot(HaveOccurred())
}

func thenIReceivePatchEvents(events concourse.JobsEvents, patchEvents []jobserver.JobWatchEvent) {
err := events.Accept(jobsEventsVisitor{
OnPatchEvent: func(events []jobserver.JobWatchEvent) error {
defer GinkgoRecover()
Expect(events).To(Equal(patchEvents))
return nil
},
})
Expect(err).ToNot(HaveOccurred())
}

func whenITriggerJobBuild(client concourse.Client, pipelineName string, jobName string) {
_, err := client.Team("main").CreateJobBuild(pipelineName, jobName)
Expect(err).ToNot(HaveOccurred())
}

func whenIDeletePipeline(client concourse.Client, pipelineName string) {
_, err := client.Team("main").DeletePipeline(pipelineName)
Expect(err).ToNot(HaveOccurred())
}

func getJob(client concourse.Client, pipelineName string, jobName string) atc.Job {
job, found, err := client.Team("main").Job(pipelineName, jobName)
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
return job
}

func putEvent(job atc.Job) jobserver.JobWatchEvent {
return jobserver.JobWatchEvent{ID: job.ID, Type: watch.Put, Job: &job}
}

func deleteEvent(id int) jobserver.JobWatchEvent {
return jobserver.JobWatchEvent{ID: id, Type: watch.Delete}
}

0 comments on commit 9adfb76

Please sign in to comment.