Skip to content

Commit

Permalink
allowing publisher to read from one project and publish to another
Browse files Browse the repository at this point in the history
  • Loading branch information
jprobinson committed Mar 29, 2018
1 parent 610de4b commit 2ee886c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
4 changes: 4 additions & 0 deletions pubsub/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

"github.com/NYTimes/gizmo/config"
"github.com/Shopify/sarama"
)

// Config holds the basic information for working with Kafka.
Expand All @@ -18,6 +19,9 @@ type Config struct {
Topic string `envconfig:"KAFKA_TOPIC"`

MaxRetry int `envconfig:"KAFKA_MAX_RETRY"`

// Config is a sarama config struct for more control over the underlying Kafka client.
Config *sarama.Config
}

// LoadConfigFromEnv will attempt to load an Kafka object
Expand Down
16 changes: 12 additions & 4 deletions pubsub/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ func NewPublisher(cfg *Config) (pubsub.Publisher, error) {
}
p.topic = cfg.Topic

sconfig := sarama.NewConfig()
sconfig.Producer.Retry.Max = cfg.MaxRetry
sconfig.Producer.RequiredAcks = RequiredAcks
sconfig := cfg.Config
if sconfig == nil {
sconfig = sarama.NewConfig()
sconfig.Producer.Retry.Max = cfg.MaxRetry
sconfig.Producer.RequiredAcks = RequiredAcks
}
// we always want successes to return
sconfig.Producer.Return.Successes = true
p.producer, err = sarama.NewSyncProducer(cfg.BrokerHosts, sconfig)
return p, err
Expand Down Expand Up @@ -130,7 +134,11 @@ func NewSubscriber(cfg *Config, offsetProvider func() int64, offsetBroadcast fun
}
s.topic = cfg.Topic

sconfig := sarama.NewConfig()
sconfig := cfg.Config
if sconfig == nil {
sconfig = sarama.NewConfig()
}
// we always want to see errors, no matter what
sconfig.Consumer.Return.Errors = true
s.cnsmr, err = sarama.NewConsumer(cfg.BrokerHosts, sconfig)
return s, err
Expand Down

0 comments on commit 2ee886c

Please sign in to comment.