Skip to content

Commit

Permalink
more context mgmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jprobinson committed May 27, 2016
1 parent 688c6c5 commit cec21d2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
29 changes: 29 additions & 0 deletions config/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"io/ioutil"
"log"

"golang.org/x/net/context"
"golang.org/x/oauth2"
Expand All @@ -17,6 +18,8 @@ type (
ProjectID string `envconfig:"GCP_PROJECT_ID"`

JSONAuthPath string `envconfig:"GCP_JSON_AUTH_PATH"`

Token string `envconfig:"GCP_AUTH_TOKEN"`
}

PubSub struct {
Expand All @@ -38,6 +41,14 @@ func LoadPubSubFromEnv() PubSub {
return ps
}

// LoadDatastoreFromEnv will attempt to load a Metrics object
// from environment variables.
func LoadDatastoreFromEnv() Datastore {
var ds Datastore
LoadEnvConfig(&ds)
return ds
}

func (d Datastore) NewContext() (context.Context, error) {
return d.GCP.NewContext(datastore.ScopeDatastore)
}
Expand All @@ -51,10 +62,15 @@ func (p PubSub) NewContext() (context.Context, error) {
}

func (g GCP) NewContext(scopes ...string) (context.Context, error) {
log.Printf("gcp conf? %#v", g)
if len(g.JSONAuthPath) > 0 {
return g.contextFromJSON(scopes...)
}

if len(g.Token) > 0 {
return g.contextFromToken(scopes...)
}

if len(scopes) == 0 {
scopes = append(scopes, compute.ComputeScope)
}
Expand All @@ -66,6 +82,19 @@ func (g GCP) NewContext(scopes ...string) (context.Context, error) {
return cloud.NewContext(g.ProjectID, client), nil
}

func (g GCP) contextFromToken(scopes ...string) (context.Context, error) {
conf, err := google.JWTConfigFromJSON(
[]byte(g.Token),
scopes...,
)
if err != nil {
log.Print("probs with token:", g.Token)
return nil, err
}

return cloud.NewContext(g.ProjectID, conf.Client(oauth2.NoContext)), nil
}

func (g GCP) contextFromJSON(scopes ...string) (context.Context, error) {
jsonKey, err := ioutil.ReadFile(g.JSONAuthPath)
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions pubsub/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
)

type GCPSubscriber struct {
handle *pubsub.SubscriptionHandle
ctx context.Context
name string
sub *pubsub.Subscription
ctx context.Context
name string

stop chan chan error
err error
Expand All @@ -24,9 +24,9 @@ func NewGCPSubscriber(ctx context.Context, cfg config.PubSub) (Subscriber, error
return nil, err
}
return &GCPSubscriber{
handle: client.Subscription(cfg.Subscription),
ctx: ctx,
name: cfg.Subscription,
sub: client.Subscription(cfg.Subscription),
ctx: ctx,
name: cfg.Subscription,
}, nil
}

Expand All @@ -45,7 +45,7 @@ func (s *GCPSubscriber) Start() <-chan SubscriberMessage {
err error
)

iter, err = s.handle.Pull(s.ctx, defaultGCPMaxMessages, defaultGCPMaxExtension)
iter, err = s.sub.Pull(s.ctx, defaultGCPMaxMessages, defaultGCPMaxExtension)
if err != nil {
s.err = err
return
Expand Down Expand Up @@ -116,7 +116,6 @@ type GCPPublisher struct {
func NewGCPPublisher(ctx context.Context, cfg config.PubSub) (Publisher, error) {

return &GCPPublisher{
// handle: client.Topic(cfg.Topic),
topic: cfg.Topic,
ctx: ctx,
}, nil
Expand Down

0 comments on commit cec21d2

Please sign in to comment.