Skip to content

Commit

Permalink
Adding configs and pubsub implementations for GCP plus additional con…
Browse files Browse the repository at this point in the history
…text endpoints and service types (#61)

* adding support for go-metrics/exp

* go-metrics out, kit/metrics in. New config.Metrics type manage providers

* removing nop Metrics and using kit/metrics/discard

* adding metrics env load, updating examples to use kit/metrics and adding graphite host again as deprecated

* [wip] adding initial GCP pubsub implementations

* fixing gcp pubsub stop

* adding datastore config and server context handlers

* adding ctx to pubsub

* adding gcp envconfig tags

* more context mgmt

* using context by default in pubsub

* fixing pubsub tests, updating docs

* go-metrics out, kit/metrics in. New config.Metrics type manage providers

* adding metrics env load, updating examples to use kit/metrics and adding graphite host again as deprecated

* [wip] adding initial GCP pubsub implementations

* fixing gcp pubsub stop

* adding datastore config and server context handlers

* adding ctx to pubsub

* adding gcp envconfig tags

* more context mgmt

* using context by default in pubsub

* fixing pubsub tests, updating docs

* filling in docs for pubsub/gcp

* adding tests for pubsub/gcp

* adding GCP to config.Config

* context should be recreated fresh on each request

* updating docs

* more doc updates
  • Loading branch information
jprobinson committed Jun 13, 2016
1 parent 53c5ab2 commit 46437ab
Show file tree
Hide file tree
Showing 20 changed files with 661 additions and 43 deletions.
45 changes: 33 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This package contains a handful of structs meant for managing common configurati
* MongoDB
* Oracle
* AWS (SNS, SQS, S3, DynamoDB, ElastiCache)
* GCP
* Kafka
* Gorilla's `securecookie`
* Gizmo Servers
Expand Down Expand Up @@ -80,15 +81,6 @@ type JSONService interface {
JSONMiddleware(JSONEndpoint) JSONEndpoint
}

type ContextService interface {
Service

// route - method - func
ContextEndpoints() map[string]map[string]ContextHandlerFunc
// ContextMiddleware provides a hook for service-wide middleware around ContextHandler
ContextMiddleware(ContextHandler) ContextHandler
}

type MixedService interface {
Service

Expand All @@ -102,13 +94,40 @@ type MixedService interface {
// JSONMiddleware provides a hook for service-wide middleware around JSONEndpoints.
JSONMiddleware(JSONEndpoint) JSONEndpoint
}

type ContextService interface {
Service

// route - method - func
ContextEndpoints() map[string]map[string]ContextHandlerFunc
// ContextMiddleware provides a hook for service-wide middleware around ContextHandler
ContextMiddleware(ContextHandler) ContextHandler
}

type JSONContextService interface {
ContextService

// route - method - func
JSONEndpoints() map[string]map[string]JSONContextEndpoint
JSONContextMiddleware(JSONContextEndpoint) JSONContextEndpoint
}

type MixedContextService interface {
ContextService

// route - method - func
JSONEndpoints() map[string]map[string]JSONContextEndpoint
JSONContextMiddleware(JSONContextEndpoint) JSONContextEndpoint
}
```

Where a `JSONEndpoint`, `ContextHandler` and `ContextHandlerFunc` are defined as:
Where `JSONEndpoint`, `JSONContextEndpoint`, `ContextHandler` and `ContextHandlerFunc` are defined as:

```go
type JSONEndpoint func(*http.Request) (int, interface{}, error)

type JSONContextEndpoint func(context.Context, *http.Request) (int, interface{}, error)

type ContextHandler interface {
ServeHTTPContext(context.Context, http.ResponseWriter, *http.Request)
}
Expand Down Expand Up @@ -145,9 +164,9 @@ This package contains two generic interfaces for publishing data to queues and s
// to emit protobufs.
type Publisher interface {
// Publish will publish a message.
Publish(string, proto.Message) error
Publish(ctx context.Context, key string, msg proto.Message) error
// Publish will publish a []byte message.
PublishRaw(string, []byte) error
PublishRaw(ctx context.Context, key string, msg []byte) error
}

// Subscriber is a generic interface to encapsulate how we want our subscribers
Expand All @@ -170,6 +189,8 @@ There are currently 2 implementations of each type of `pubsub` interfaces:

For pubsub via Amazon's SNS/SQS, you can use the `SNSPublisher` and the `SQSSubscriber`.

For pubsub via Google's Pubsub, you can use the `GCPPublisher` and the `GCPSubscriber`.

For pubsub via Kafka topics, you can use the `KakfaPublisher` and the `KafkaSubscriber`.

## The `pubsub/pubsubtest` package
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type (

Kafka *Kafka

GCP GCP
PubSub PubSub

Oracle *Oracle

MySQL *MySQL
Expand Down Expand Up @@ -81,6 +84,8 @@ func LoadConfigFromEnv() *Config {
app.Cookie = LoadCookieFromEnv()
app.Server = LoadServerFromEnv()
app.Metrics = LoadMetricsFromEnv()
app.PubSub = LoadPubSubFromEnv()
app.GCP = LoadGCPFromEnv()
return &app
}

Expand Down
107 changes: 107 additions & 0 deletions config/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package config

import (
"io/ioutil"
"log"

"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/cloud"
)

type (

// GCP holds common Google Cloud Platform credentials.
GCP struct {
ProjectID string `envconfig:"GCP_PROJECT_ID" json:"GCP_PROJECT_ID"`

// JSONAuthPath points to a file containing a JWT JSON config.
// This is meant to be a fall back for development environments.
JSONAuthPath string `envconfig:"GCP_JSON_AUTH_PATH" json:"GCP_JSON_AUTH_PATH"`

// Token is a JWT JSON config and may be needed for container
// environments.
Token string `envconfig:"GCP_AUTH_TOKEN" json:"GCP_AUTH_TOKEN"`
}

// PubSub holds common credentials and config values for
// working with GCP PubSub.
PubSub struct {
GCP

// For publishing
Topic string `envconfig:"GCP_PUBSUB_TOPIC" json:"GCP_PUBSUB_TOPIC"`
// For subscribing
Subscription string `envconfig:"GCP_PUBSUB_SUBSCRIPTION" json:"GCP_PUBSUB_SUBSCRIPTION"`
}
)

// LoadGCPFromEnv will attempt to load a GCP config
// from environment variables.
func LoadGCPFromEnv() GCP {
var gcp GCP
LoadEnvConfig(&gcp)
return gcp
}

// LoadPubSubFromEnv will attempt to load a PubSub config
// from environment variables.
func LoadPubSubFromEnv() PubSub {
var ps PubSub
LoadEnvConfig(&ps)
return ps
}

// NewContext will check attempt to create a new context from
// a the Token or JSONAuthPath fields if provided, otherwise
// google.DefaultClient will be used.
func (g GCP) NewContext(scopes ...string) (context.Context, error) {
if len(g.Token) > 0 {
return g.contextFromToken(scopes...)
}

if len(g.JSONAuthPath) > 0 {
return g.contextFromJSON(scopes...)
}

if len(scopes) == 0 {
scopes = append(scopes, compute.ComputeScope)
}

client, err := google.DefaultClient(oauth2.NoContext, scopes...)
if err != nil {
return nil, err
}
return cloud.NewContext(g.ProjectID, client), nil
}

func (g GCP) contextFromToken(scopes ...string) (context.Context, error) {
conf, err := google.JWTConfigFromJSON(
[]byte(g.Token),
scopes...,
)
if err != nil {
log.Print("probs with token:", g.Token)
return nil, err
}

return cloud.NewContext(g.ProjectID, conf.Client(oauth2.NoContext)), nil
}

func (g GCP) contextFromJSON(scopes ...string) (context.Context, error) {
jsonKey, err := ioutil.ReadFile(g.JSONAuthPath)
if err != nil {
return nil, err
}
conf, err := google.JWTConfigFromJSON(
jsonKey,
scopes...,
)
if err != nil {
return nil, err
}

return cloud.NewContext(g.ProjectID, conf.Client(oauth2.NoContext)), nil
}
1 change: 1 addition & 0 deletions config/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
// Metrics config can be used to configure and instantiate a new
// go-kit/kit/metrics/provider.Provider.
type Metrics struct {
// if empty, will server default to "expvar"
Type MetricsType `envconfig:"METRICS_TYPE"`

// Prefix will be prefixed onto
Expand Down
39 changes: 36 additions & 3 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,43 @@ The 3 service types that are accepted and hostable on the `SimpleServer`:
JSONMiddleware(JSONEndpoint) JSONEndpoint
}
Where a `JSONEndpoint` is defined as:
type ContextService interface {
Service
// route - method - func
ContextEndpoints() map[string]map[string]ContextHandlerFunc
// ContextMiddleware provides a hook for service-wide middleware around ContextHandler
ContextMiddleware(ContextHandler) ContextHandler
}
type JSONContextService interface {
ContextService
// route - method - func
JSONEndpoints() map[string]map[string]JSONContextEndpoint
JSONContextMiddleware(JSONContextEndpoint) JSONContextEndpoint
}
type MixedContextService interface {
ContextService
// route - method - func
JSONEndpoints() map[string]map[string]JSONContextEndpoint
JSONContextMiddleware(JSONContextEndpoint) JSONContextEndpoint
}
Where `JSONEndpoint`, `JSONContextEndpoint`, `ContextHandler` and `ContextHandlerFunc` are defined as:
type JSONEndpoint func(*http.Request) (int, interface{}, error)
type JSONContextEndpoint func(context.Context, *http.Request) (int, interface{}, error)
type ContextHandler interface {
ServeHTTPContext(context.Context, http.ResponseWriter, *http.Request)
}
type ContextHandlerFunc func(context.Context, http.ResponseWriter, *http.Request)
Also, the one service type that works with an `RPCServer`:
type RPCService interface {
Expand All @@ -115,9 +148,9 @@ This package contains two generic interfaces for publishing data to queues and s
// to emit protobufs.
type Publisher interface {
// Publish will publish a message.
Publish(string, proto.Message) error
Publish(ctx context.Context, key string, msg proto.Message) error
// Publish will publish a []byte message.
PublishRaw(string, []byte) error
PublishRaw(ctx context.Context, key string, msg []byte) error
}
// Subscriber is a generic interface to encapsulate how we want our subscribers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *StreamService) Stream(w http.ResponseWriter, r *http.Request) {
server.LogWithFields(r).WithField("error", err).Error("unable to read payload")
return
}
err = pub.PublishRaw(cfg.Topic, payload)
err = pub.PublishRaw(nil, cfg.Topic, payload)
if err != nil {
server.LogWithFields(r).WithField("error", err).Error("unable to publish payload")
return
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub/api-sns-pub/service/cats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (s *JSONPubService) PublishCats(r *http.Request) (int, interface{}, error)
return http.StatusInternalServerError, nil, err
}

err = s.pub.Publish(catArticle.Url, &catArticle)
err = s.pub.Publish(nil, catArticle.Url, &catArticle)
if err != nil {
return http.StatusInternalServerError, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub/cli-sns-pub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
Url: "http://www.nytimes.com/2015/11/25/its-a-cat-world",
}

err = pub.Publish(catArticle.Url, catArticle)
err = pub.Publish(nil, catArticle.Url, catArticle)
if err != nil {
pubsub.Log.WithFields(logrus.Fields{
"error": err,
Expand Down
8 changes: 5 additions & 3 deletions pubsub/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -57,18 +59,18 @@ func NewSNSPublisher(cfg *config.SNS) (*SNSPublisher, error) {

// Publish will marshal the proto message and emit it to the SNS topic.
// The key will be used as the SNS message subject.
func (p *SNSPublisher) Publish(key string, m proto.Message) error {
func (p *SNSPublisher) Publish(ctx context.Context, key string, m proto.Message) error {
mb, err := proto.Marshal(m)
if err != nil {
return err
}

return p.PublishRaw(key, mb)
return p.PublishRaw(ctx, key, mb)
}

// PublishRaw will emit the byte array to the SNS topic.
// The key will be used as the SNS message subject.
func (p *SNSPublisher) PublishRaw(key string, m []byte) error {
func (p *SNSPublisher) PublishRaw(_ context.Context, key string, m []byte) error {
msg := &sns.PublishInput{
TopicArn: &p.topic,
Subject: &key,
Expand Down
3 changes: 2 additions & 1 deletion pubsub/awspub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
)

func TestSNSPublisher(t *testing.T) {
Expand All @@ -17,7 +18,7 @@ func TestSNSPublisher(t *testing.T) {

test1Key := "yo!"
test1 := &TestProto{"hi there!"}
err := pub.Publish(test1Key, test1)
err := pub.Publish(context.Background(), test1Key, test1)
if err != nil {
t.Error("Publish returned an unexpected error: ", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pubsub/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ Package pubsub contains two generic interfaces for publishing data to queues and
// to emit protobufs.
type Publisher interface {
// Publish will publish a message.
Publish(string, proto.Message) error
Publish(ctx context.Context, key string, msg proto.Message) error
// Publish will publish a []byte message.
PublishRaw(string, []byte) error
PublishRaw(ctx context.Context, key string, msg []byte) error
}
// Subscriber is a generic interface to encapsulate how we want our subscribers
Expand All @@ -30,6 +30,8 @@ There are currently 2 implementations of each type of `pubsub` interfaces:
For pubsub via Amazon's SNS/SQS, you can use the `SNSPublisher` and the `SQSSubscriber`.
For pubsub via GCP's Pubsub, you can use the `GCPPublisher` and the `GCPSubscriber`.
For pubsub via Kafka topics, you can use the `KakfaPublisher` and the `KafkaSubscriber`.
*/
package pubsub
Loading

0 comments on commit 46437ab

Please sign in to comment.