Skip to content

Commit

Permalink
Fix kafka issue (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored and buger committed Jan 25, 2017
1 parent d66ba03 commit e726e45
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion input_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {

var con sarama.Consumer

if config.consumer.(*mocks.Consumer) != nil {
if mock, ok := config.consumer.(*mocks.Consumer); ok && mock != nil {
con = config.consumer
} else {
var err error
Expand Down
2 changes: 1 addition & 1 deletion output_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewKafkaOutput(address string, config *KafkaConfig) io.Writer {

var producer sarama.AsyncProducer

if config.producer.(*mocks.AsyncProducer) != nil {
if mock, ok := config.producer.(*mocks.AsyncProducer); ok && mock != nil {
producer = config.producer
} else {
c.Producer.RequiredAcks = sarama.WaitForLocal
Expand Down

0 comments on commit e726e45

Please sign in to comment.