Skip to content

Commit

Permalink
Support GCP PubSub queue
Browse files Browse the repository at this point in the history
  • Loading branch information
bradleyfalzon committed Dec 16, 2016
1 parent ab5a048 commit 14e5dbd
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 60 deletions.
13 changes: 13 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,16 @@ ANALYSER=docker
#DOCKER_HOST=
#DOCKER_CERT_PATH=
#DOCKER_TLS_VERIFY=

# Queuer provides a queue for sending and receiver ci jobs
# can be either: memory or gcppubsub
QUEUER=gcppubsub

# Name of the GCP Project for GCPPUBSUB
# Required if QUEUER=gcppubsub
QUEUER_GCPPUBSUB_PROJECT_ID=gopherci-dev

# Name of the GCP PubSub topic for GCPPUBSUB, it will be created if it does not
# exist.
# Optional if QUEUER=gcppubsub
QUEUER_GCPPUBSUB_TOPIC=
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sudo: required
services:
- docker
before_install:
- openssl aes-256-cbc -K $encrypted_870f8d7239ea_key -iv $encrypted_870f8d7239ea_iv -in GopherCI-dev-81e88eb7fa6e.json.enc -out /tmp/gcloud-GopherCI-dev.json -d
- docker pull bradleyfalzon/gopherci-env:latest
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
Expand Down
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
- Bug fixes are fine to go straight to PR.
- Tests are encouraged, but help can be given to add them if you require.

# Skipping Tests

Some tests require external integration, such as Google Cloud credentials, you
can skip these tests using `go test -short`, and let the CI process run these
during the Pull Request CI tests.

# Development Environment

You'll need:
Expand Down
Binary file added GopherCI-dev-81e88eb7fa6e.json.enc
Binary file not shown.
2 changes: 1 addition & 1 deletion internal/github/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (g *GitHub) WebHookHandler(w http.ResponseWriter, r *http.Request) {
case *github.IntegrationInstallationEvent:
err = g.integrationInstallationEvent(e)
case *github.PullRequestEvent:
g.queuer.Queue(e)
err = g.queuer.Queue(e)
}
if err != nil {
log.Println(err)
Expand Down
160 changes: 160 additions & 0 deletions internal/queue/gcp-pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package queue

import (
"bytes"
"context"
"encoding/gob"
"log"
"time"

"github.com/google/go-github/github"
"github.com/pkg/errors"

"google.golang.org/api/iterator"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"cloud.google.com/go/pubsub"
)

func init() {
// List of all types that could be added to the queue
gob.Register(&github.PullRequestEvent{})
}

const (
// version should be changed each time the message format changes in an
// incompatible way. This will then cause new subscribers to listen on the
// new topic.
version = "1"
defaultSubName = "gopherci-ci-worker"
defaultTopicName = "gopherci-ci"
)

// GCPPubSubQueue is a queue using Google Compute Platform's PubSub product.
type GCPPubSubQueue struct {
ctx context.Context // stop listening when this context is cancelled
c chan<- interface{}
topic *pubsub.Topic
}

var _ Queuer = &GCPPubSubQueue{}

// NewGCPPubSubQueue creates a new Queuer and listens on the queue, sending
// new jobs to the channel c, projectID is required but topicName is optional.
func NewGCPPubSubQueue(ctx context.Context, c chan<- interface{}, projectID, topicName string) (*GCPPubSubQueue, error) {
q := &GCPPubSubQueue{ctx: ctx, c: c}

if projectID == "" {
return nil, errors.New("projectID must not be empty")
}

client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, errors.Wrap(err, "NewGCPPubSubQueue: could not create client")
}

if topicName == "" {
topicName = defaultTopicName
}

log.Printf("NewGCPPubSubQueue: creating topic %q", topicName)
q.topic, err = client.CreateTopic(ctx, topicName+"-"+version)
if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists {
return nil, errors.Wrap(err, "NewGCPPubSubQueue: could not create topic")
}

log.Printf("NewGCPPubSubQueue: creating subscription %q", defaultSubName)
subscription, err := client.CreateSubscription(ctx, defaultSubName, q.topic, 0, nil)
if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists {
return nil, errors.Wrap(err, "NewGCPPubSubQueue: could not create subscription")
}

itr, err := subscription.Pull(q.ctx)
if err != nil {
return nil, errors.Wrap(err, "GCPPubSubQueue: could not pull subscription")
}

// Close iterator when context closes
go func() {
<-q.ctx.Done()
itr.Stop()
client.Close()
}()

go q.listen(itr)
return q, nil
}

// Queue implements the Queue interface.
func (q *GCPPubSubQueue) Queue(job interface{}) error {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(container{job}); err != nil {
return errors.Wrap(err, "GCPPubSubQueue: could not gob encode job")
}

msgIDs, err := q.topic.Publish(q.ctx, &pubsub.Message{
Data: buf.Bytes(),
})
if err != nil {
return errors.Wrap(err, "GCPPubSubQueue: could not publish job")
}
log.Println("GCPPubSubQueue: published a message with a message ID:", msgIDs[0])

return nil
}

type container struct {
Job interface{}
}

// listen listens for messages from queue and runs the jobs, returns when iterator is stopped
func (q *GCPPubSubQueue) listen(itr *pubsub.MessageIterator) {
for {
msg, err := itr.Next()
switch {
case err == iterator.Done:
log.Println("GCPPubSubQueue: stopping listening")
return
case err != nil:
log.Println("GCPPubSubQueue: could not read next message:", err)
time.Sleep(3 * time.Second) // back-off
continue
}
// Acknowledge the job now, anything else that could fail by this instance
// will fail in others.
msg.Done(true)

log.Printf("GCPPubSubQueue: processing ID %v, published at %v", msg.ID, msg.PublishTime)

reader := bytes.NewReader(msg.Data)
dec := gob.NewDecoder(reader)

var job container
if err := dec.Decode(&job); err != nil {
log.Println("GCPPubSubQueue: could not decode job:", err)
continue
}
q.c <- job.Job
}
}

// delete deletes the topic and subcriptions, used to cleanup unit tests
func (q *GCPPubSubQueue) delete() {
itr := q.topic.Subscriptions(q.ctx)
for {
sub, err := itr.Next()
if err != nil {
break
}
err = sub.Delete(q.ctx)
if err != nil {
log.Println("GCPPubSubQueue: delete subscription error:", err)
}
}
err := q.topic.Delete(q.ctx)
if err != nil {
log.Println("GCPPubSubQueue: delete topic error:", err)
}
}
53 changes: 53 additions & 0 deletions internal/queue/gcp-pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package queue

import (
"context"
"encoding/gob"
"fmt"
"reflect"
"testing"
"time"
)

// TODO read from .env
const projectID = "gopherci-dev"

func TestGCPPubSubQueue(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
// it appears some other routine maybe leaked
// by the http client
//defer leaktest.Check(t)() // ensure all goroutines exit

ctx, cancelFunc := context.WithCancel(context.Background())

topic := fmt.Sprintf("%s-unit-tests-%v", defaultTopicName, time.Now().Unix())
c := make(chan interface{})
q, err := NewGCPPubSubQueue(ctx, c, projectID, topic)
if err != nil {
t.Fatal("unexpected error:", err)
}

type S struct{ Job int64 }
gob.Register(&S{})

job := S{time.Now().Unix()}
q.Queue(job)

have := <-c

// Remove the test queues
q.delete()

concrete, ok := have.(*S)
if !ok {
t.Fatalf("have type: %T is not %T", have, &S{})
}

if !reflect.DeepEqual(*concrete, job) {
t.Errorf("concrete: %#v, want: %#v", concrete, job)
}

cancelFunc()
}
57 changes: 57 additions & 0 deletions internal/queue/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package queue

import (
"context"
"log"
"sync"
"time"
)

// MemoryQueue is an in memory queue of infinite size.
type MemoryQueue struct {
ctx context.Context // stop listening when this context is cancelled
c chan<- interface{}
mu sync.Mutex // protects queue
queue []interface{}
}

var _ Queuer = &MemoryQueue{}

// TODO
func NewMemoryQueue(ctx context.Context, c chan<- interface{}) *MemoryQueue {
q := &MemoryQueue{ctx: ctx, c: c}
go q.listen()
return q
}

// Queue implements the Queue interface.
func (q *MemoryQueue) Queue(job interface{}) error {
q.mu.Lock()
q.queue = append(q.queue, job)
q.mu.Unlock()
return nil
}

// listen polls the queue for new jobs and sends them on the pop channel.
func (q *MemoryQueue) listen() {
ticker := time.NewTicker(time.Second) // poll interval
for {
select {
case <-q.ctx.Done():
log.Println("listen stopping")
ticker.Stop()
return
case <-ticker.C:
if len(q.queue) == 0 {
break
}
// queue the next item
var job interface{}
q.mu.Lock()
job, q.queue = q.queue[len(q.queue)-1], q.queue[:len(q.queue)-1]
q.mu.Unlock()
// this could block for a long time, we're ok with that
q.c <- job
}
}
}
File renamed without changes.
57 changes: 0 additions & 57 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,7 @@
package queue

import (
"context"
"log"
"sync"
"time"
)

// Queuer pushes jobs onto a queue and pushes the next job on the provided
// channel.
type Queuer interface {
Queue(interface{}) error
}

// MemoryQueue is an in memory queue of infinite size.
type MemoryQueue struct {
ctx context.Context // stop listening when this context is cancelled
c chan<- interface{}
mu sync.Mutex // protects queue
queue []interface{}
}

var _ Queuer = &MemoryQueue{}

// NewMemoryQueue sets the channel that receives the next job on the queue. A
// buffered or unbuffered channel maybe used.
func NewMemoryQueue(ctx context.Context, c chan<- interface{}) *MemoryQueue {
q := &MemoryQueue{ctx: ctx, c: c}
go q.listen()
return q
}

// Queue implements the Queue interface.
func (q *MemoryQueue) Queue(job interface{}) error {
q.mu.Lock()
q.queue = append(q.queue, job)
q.mu.Unlock()
return nil
}

// listen polls the queue for new jobs and sends them on the pop channel.
func (q *MemoryQueue) listen() {
ticker := time.NewTicker(time.Second) // poll interval
for {
select {
case <-q.ctx.Done():
log.Println("listen stopping")
ticker.Stop()
return
case <-ticker.C:
if len(q.queue) == 0 {
break
}
// queue the next item
var job interface{}
q.mu.Lock()
job, q.queue = q.queue[len(q.queue)-1], q.queue[:len(q.queue)-1]
q.mu.Unlock()
// this could block for a long time, we're ok with that
q.c <- job
}
}
}

0 comments on commit 14e5dbd

Please sign in to comment.