
GCPPubSub refactor to support new streaming client API
Google have recently made some breaking changes to their Pub/Sub
client API to support new high-performance streaming methods.

https://groups.google.com/forum/#!topic/google-api-go-announce/aaqRDIQ3rvU/discussion
https://groups.google.com/forum/#!topic/google-api-go-announce/8pt6oetAdKc/discussion
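
For context, publishing now returns a result handle rather than message
IDs directly; roughly (data here stands in for the encoded job, topic is
an existing *pubsub.Topic):

    res := topic.Publish(ctx, &pubsub.Message{Data: data})
    msgID, err := res.Get(ctx) // blocks until the server acknowledges the publish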

The main change is that the new methods don't easily support blocking
operations or one-message-at-a-time use cases. This is being
discussed: googleapis/google-cloud-go#566

This change attempts to use the new API, but in a blocking manner, an
approach indirectly discussed in: googleapis/google-cloud-go#569
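
A minimal sketch of that blocking, one-message-at-a-time approach, where
sub is an existing *pubsub.Subscription and process is a hypothetical
job handler:

    // Allow only one outstanding message so a single job is processed at a time.
    sub.ReceiveSettings.MaxOutstandingMessages = 1

    // Receive blocks until ctx is cancelled, invoking the callback per message.
    err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        msg.Ack()         // acknowledge immediately; anything that fails here would fail elsewhere too
        process(msg.Data) // hypothetical job handler
    })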

Since finishing and testing this approach, which was successful, it
appears Google is discussing this use case further internally, so this
may not be the final solution, but it gets us through for the moment.

If we're required to stop using the Pub/Sub client and instead use the
APIv1 client, the issue does contain a gist of how that could work, but
it hasn't been tested in various failure modes, as it's a lower-level
API. I'm confident it just requires more testing and likely no further
changes.

Further, these changes necessitated some refactoring of the internal
APIs; this was mostly opportunistic but made the changes simpler.

The refactoring essentially replaces the Queuer interface with a
channel used to push messages onto the queue. Additionally, whereas new
jobs to be executed were previously sent on a channel to the consumer,
each type of queue now takes a callback and executes that callback with
the job as its only parameter, as sketched below.
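
As a rough sketch of the resulting API, where q is a queue such as the
GCPPubSubQueue or MemoryQueue in this diff (error handling omitted):

    // Producers push jobs onto a plain channel instead of calling a Queuer interface.
    queuePush := make(chan interface{})

    // Each queue's Wait is non-blocking: it forwards jobs from queuePush to the
    // backing queue and invokes f once per job received back from that queue.
    q.Wait(ctx, &wg, queuePush, func(job interface{}) {
        // execute the job
    })

    // For example, the GitHub webhook handler now simply does:
    queuePush <- event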

GCPPubSubQueue was tested to ensure only one message is removed from
the queue at any time, allowing other instances to consume the remaining
messages, and that on shutdown the executing job is allowed to finish in
full before the process exits.
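
Graceful shutdown from the caller's side then amounts to the following
(as exercised by the unit test in this diff):

    cancel()  // stop receiving; the in-flight job continues
    wg.Wait() // returns once the executing job has finished and the routines exit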
bradleyfalzon committed Mar 22, 2017
1 parent a88607a commit b464990
Showing 9 changed files with 199 additions and 162 deletions.
7 changes: 3 additions & 4 deletions internal/github/github.go
@@ -6,14 +6,13 @@ import (
"github.com/bradleyfalzon/ghinstallation"
"github.com/bradleyfalzon/gopherci/internal/analyser"
"github.com/bradleyfalzon/gopherci/internal/db"
"github.com/bradleyfalzon/gopherci/internal/queue"
)

// GitHub is the type gopherci uses to interact with github.com.
type GitHub struct {
db db.DB
analyser analyser.Analyser
queuer queue.Queuer
queuePush chan<- interface{}
webhookSecret []byte // shared webhook secret configured for the integration
integrationID int // id is the integration id
integrationKey []byte // integrationKey is the private key for the installationID
@@ -26,11 +25,11 @@ type GitHub struct {
// integrationID is the GitHub Integration ID (not installation ID).
// integrationKey is the key for the integrationID provided to you by GitHub
// during the integration registration.
func New(analyser analyser.Analyser, db db.DB, queuer queue.Queuer, integrationID int, integrationKey []byte, webhookSecret string) (*GitHub, error) {
func New(analyser analyser.Analyser, db db.DB, queuePush chan<- interface{}, integrationID int, integrationKey []byte, webhookSecret string) (*GitHub, error) {
g := &GitHub{
analyser: analyser,
db: db,
queuer: queuer,
queuePush: queuePush,
webhookSecret: []byte(webhookSecret),
integrationID: integrationID,
integrationKey: integrationKey,
4 changes: 2 additions & 2 deletions internal/github/handlers.go
@@ -39,11 +39,11 @@ func (g *GitHub) WebHookHandler(w http.ResponseWriter, r *http.Request) {
err = g.integrationInstallationEvent(e)
case *github.PushEvent:
log.Printf("github: push event: installation id: %v", *e.Installation.ID)
err = g.queuer.Queue(e)
g.queuePush <- e
case *github.PullRequestEvent:
if validPRAction(*e.Action) {
log.Printf("github: pull request event: %v, installation id: %v", *e.Action, *e.Installation.ID)
err = g.queuer.Queue(e)
g.queuePush <- e
}
default:
log.Printf("github: ignored webhook event: %T", event)
6 changes: 4 additions & 2 deletions internal/github/handlers_test.go
@@ -84,13 +84,15 @@ func setup(t *testing.T) (*GitHub, *mockAnalyser, *db.MockDB) {
wg sync.WaitGroup
c = make(chan interface{})
)
queue := queue.NewMemoryQueue(context.Background(), &wg, c)
queue := queue.NewMemoryQueue()
queue.Wait(context.Background(), &wg, c, func(job interface{}) {})

// New GitHub
g, err := New(mockAnalyser, memDB, queue, 1, integrationKey, webhookSecret)
g, err := New(mockAnalyser, memDB, c, 1, integrationKey, webhookSecret)
if err != nil {
t.Fatal("could not initialise GitHub:", err)
}

return g, mockAnalyser, memDB
}

110 changes: 59 additions & 51 deletions internal/queue/gcp-pubsub.go
@@ -8,10 +8,11 @@ import (
"sync"
"time"

xContext "golang.org/x/net/context"

"github.com/google/go-github/github"
"github.com/pkg/errors"

"google.golang.org/api/iterator"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

@@ -35,20 +36,18 @@ const (

// GCPPubSubQueue is a queue using Google Compute Platform's PubSub product.
type GCPPubSubQueue struct {
ctx context.Context // stop listening when this context is cancelled
c chan<- interface{}
topic *pubsub.Topic
topic *pubsub.Topic
subscription *pubsub.Subscription
}

var _ Queuer = &GCPPubSubQueue{}
var cxnTimeout = 15 * time.Second

// NewGCPPubSubQueue creates a new Queuer and listens on the queue, sending
// new jobs to the channel c, projectID is required but topicName is optional.
// Calls wg.Done() when finished after context has been cancelled and current
// job has finished.
func NewGCPPubSubQueue(ctx context.Context, wg *sync.WaitGroup, c chan<- interface{}, projectID, topicName string) (*GCPPubSubQueue, error) {
q := &GCPPubSubQueue{ctx: ctx, c: c}
func NewGCPPubSubQueue(ctx context.Context, projectID, topicName string) (*GCPPubSubQueue, error) {
q := &GCPPubSubQueue{}

if projectID == "" {
return nil, errors.New("projectID must not be empty")
@@ -78,31 +77,49 @@ func NewGCPPubSubQueue(ctx context.Context, wg *sync.WaitGroup, c chan<- interfa
subName := topicName + "-" + defaultSubName

log.Printf("NewGCPPubSubQueue: creating subscription %q", subName)
subscription, err := client.CreateSubscription(cxnCtx, subName, q.topic, 0, nil)
q.subscription, err = client.CreateSubscription(cxnCtx, subName, q.topic, 0, nil)
if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists {
return nil, errors.Wrap(err, "NewGCPPubSubQueue: could not create subscription")
}

itr, err := subscription.Pull(q.ctx)
if err != nil {
return nil, errors.Wrap(err, "GCPPubSubQueue: could not pull subscription")
}
q.subscription.ReceiveSettings.MaxOutstandingMessages = 1 // this successfully limits concurrency

// Close iterator when context closes
return q, nil
}

// Wait waits for messages on queuePush and adds them to the Pub/Sub queue.
// Upon receiving messages from Pub/Sub, f is invoked with the message. Wait
// is non-blocking, increments wg for each routine started, and when context
// is closed will mark the wg as done as routines are shut down.
func (q GCPPubSubQueue) Wait(ctx context.Context, wg *sync.WaitGroup, queuePush <-chan interface{}, f func(interface{})) {
// Routine to add jobs to the GCP Pub/Sub Queue
wg.Add(1)
go func() {
<-q.ctx.Done()
log.Println("GCPPubSubQueue: closing")
itr.Stop()
client.Close()
for {
select {
case <-ctx.Done():
log.Println("GCPPubSubQueue job waiter exiting")
q.topic.Stop()
wg.Done()
return
case job := <-queuePush:
log.Println("GCPPubSubQueue job waiter got message, queuing...")
q.queue(ctx, job)
}
}
}()

// Routine to listen for jobs and process one at a time
wg.Add(1)
go q.listen(wg, itr)
return q, nil
go func() {
q.receive(ctx, f)
log.Println("GCPPubSubQueue job receiver exiting")
wg.Done()
}()
}

// Queue implements the Queue interface.
func (q *GCPPubSubQueue) Queue(job interface{}) error {
// queue adds a message to the queue.
func (q *GCPPubSubQueue) queue(ctx context.Context, job interface{}) error {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(container{job}); err != nil {
@@ -112,11 +129,12 @@ func (q *GCPPubSubQueue) Queue(job interface{}) error {
var (
msg = &pubsub.Message{Data: buf.Bytes()}
maxAttempts = 3
msgIDs []string
msgID string
err error
)
for i := 1; i <= maxAttempts; i++ {
msgIDs, err = q.topic.Publish(q.ctx, msg)
res := q.topic.Publish(ctx, msg)
msgID, err = res.Get(ctx)
if err == nil {
break
}
@@ -126,7 +144,7 @@ func (q *GCPPubSubQueue) Queue(job interface{}) error {
if err != nil {
return errors.Wrap(err, "GCPPubSubQueue: could not publish job")
}
log.Println("GCPPubSubQueue: published a message with a message ID:", msgIDs[0])
log.Println("GCPPubSubQueue: published a message with a message ID:", msgID)

return nil
}
@@ -135,27 +153,14 @@ type container struct {
Job interface{}
}

// listen listens for messages from queue and runs the jobs, returns when
// iterator is stopped, calls wg.Done when returning.
func (q *GCPPubSubQueue) listen(wg *sync.WaitGroup, itr *pubsub.MessageIterator) {
defer wg.Done()
for {
msg, err := itr.Next()
switch {
case err == iterator.Done:
log.Println("GCPPubSubQueue: stopping listening")
return
case err != nil:
log.Println("GCPPubSubQueue: could not read next message:", err)
time.Sleep(3 * time.Second) // back-off
continue
}
// receive calls sub.Receive, which blocks forever waiting for new jobs.
func (q *GCPPubSubQueue) receive(ctx context.Context, f func(interface{})) {
err := q.subscription.Receive(ctx, func(ctx xContext.Context, msg *pubsub.Message) {
log.Printf("GCPPubSubQueue: processing ID %v, published at %v", msg.ID, msg.PublishTime)

// Acknowledge the job now, anything else that could fail by this instance
// will fail in others.
msg.Done(true)

// will probably fail for others.
msg.Ack()
log.Printf("GCPPubSubQueue: ack'd ID %v", msg.ID)

reader := bytes.NewReader(msg.Data)
@@ -164,28 +169,31 @@ func (q *GCPPubSubQueue) listen(wg *sync.WaitGroup, itr *pubsub.MessageIterator)
var job container
if err := dec.Decode(&job); err != nil {
log.Println("GCPPubSubQueue: could not decode job:", err)
continue
return
}
log.Printf("GCPPubSubQueue: adding ID %v to job channel", msg.ID)
q.c <- job.Job
log.Printf("GCPPubSubQueue: successfully added ID %v to job queue", msg.ID)
log.Printf("GCPPubSubQueue: process ID %v", msg.ID)

f(job.Job)
})
if err != nil && err != context.Canceled {
log.Printf("GCPPubSubQueue: could not receive on subscription: %v", err)
}
}

// delete deletes the topic and subscriptions, used to clean up unit tests
func (q *GCPPubSubQueue) delete() {
itr := q.topic.Subscriptions(q.ctx)
// delete deletes the topic and subscriptions, used to clean up unit tests.
func (q *GCPPubSubQueue) delete(ctx context.Context) {
itr := q.topic.Subscriptions(ctx)
for {
sub, err := itr.Next()
if err != nil {
break
}
err = sub.Delete(q.ctx)
err = sub.Delete(ctx)
if err != nil {
log.Println("GCPPubSubQueue: delete subscription error:", err)
}
}
err := q.topic.Delete(q.ctx)
err := q.topic.Delete(ctx)
if err != nil {
log.Println("GCPPubSubQueue: delete topic error:", err)
}
39 changes: 27 additions & 12 deletions internal/queue/gcp-pubsub_test.go
@@ -28,30 +28,47 @@ func TestGCPPubSubQueue(t *testing.T) {
wg sync.WaitGroup
c = make(chan interface{})
topic = fmt.Sprintf("%s-unit-tests-%v", defaultTopicName, time.Now().Unix())
have interface{}
)
q, err := NewGCPPubSubQueue(ctx, &wg, c, projectID, topic)
q, err := NewGCPPubSubQueue(ctx, projectID, topic)
if err != nil {
t.Fatal("unexpected error:", err)
}

f := func(job interface{}) {
have = job
}

q.Wait(ctx, &wg, c, f)

type S struct{ Job string }
gob.Register(&S{})
job := S{"unit-test-" + topic}
q.Queue(job)
c <- job

for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
if have == nil {
continue
}

have := <-c
q.delete()
concrete, ok := have.(*S)
if !ok {
t.Fatalf("have type: %T is not %T", have, &S{})
}

concrete, ok := have.(*S)
if !ok {
t.Fatalf("have type: %T is not %T", have, &S{})
if !reflect.DeepEqual(*concrete, job) {
t.Errorf("have (concrete): %#v, want: %#v", *concrete, job)
}
}

if !reflect.DeepEqual(*concrete, job) {
t.Errorf("have (concrete): %#v, want: %#v", *concrete, job)
if have == nil {
t.Error("did not receive job from queue")
}

q.delete(ctx)
cancel()
wg.Wait()
}

func TestGCPPubSubQueue_timeout(t *testing.T) {
Expand All @@ -64,11 +81,9 @@ func TestGCPPubSubQueue_timeout(t *testing.T) {

var (
ctx = context.Background()
wg sync.WaitGroup
c = make(chan interface{})
topic = fmt.Sprintf("%s-unit-tests-%v", defaultTopicName, time.Now().Unix())
)
_, err := NewGCPPubSubQueue(ctx, &wg, c, projectID, topic)
_, err := NewGCPPubSubQueue(ctx, projectID, topic)

have := errors.Cause(err)
if want := context.DeadlineExceeded; have != want {
