Skip to content

Commit

Permalink
release review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bpathak-ons committed Oct 20, 2021
1 parent 9d7583e commit d6dcb4e
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 21 deletions.
8 changes: 4 additions & 4 deletions cmd/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ func main() {
ctx := context.Background()

// Get Config
config, err := config.Get()
cfg, err := config.Get()
if err != nil {
log.Fatal(ctx, "error getting config", err)
os.Exit(1)
}

// Create Kafka Producer
pChannels := kafka.CreateProducerChannels()
kafkaProducer, err := kafka.NewProducer(ctx, config.KafkaAddr, config.ContentPublishedTopic, pChannels, &kafka.ProducerConfig{
KafkaVersion: &config.KafkaVersion,
kafkaProducer, err := kafka.NewProducer(ctx, cfg.KafkaAddr, cfg.ContentPublishedTopic, pChannels, &kafka.ProducerConfig{
KafkaVersion: &cfg.KafkaVersion,
})
if err != nil {
log.Fatal(ctx, "fatal error trying to create kafka producer", err, log.Data{"topic": config.ContentPublishedTopic})
log.Fatal(ctx, "fatal error trying to create kafka producer", err, log.Data{"topic": cfg.ContentPublishedTopic})
os.Exit(1)
}

Expand Down
11 changes: 1 addition & 10 deletions event/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"sync"
"testing"

"github.com/ONSdigital/dp-search-data-extractor/config"

kafka "github.com/ONSdigital/dp-kafka/v2"
"github.com/ONSdigital/dp-search-data-extractor/config"
"github.com/ONSdigital/dp-kafka/v2/kafkatest"
"github.com/ONSdigital/dp-search-data-extractor/event"
"github.com/ONSdigital/dp-search-data-extractor/event/mock"
Expand All @@ -27,14 +26,6 @@ var testEvent = models.ContentPublished{
CollectionID: "Col123",
}

// kafkaStubConsumer mock which exposes Channels function returning empty channels
// to be used on tests that are not supposed to receive any kafka message
var kafkaStubConsumer = &kafkatest.IConsumerGroupMock{
ChannelsFunc: func() *kafka.ConsumerGroupChannels {
return &kafka.ConsumerGroupChannels{}
},
}

func TestConsume(t *testing.T) {

Convey("Given kafka consumer and event handler mocks", t, func() {
Expand Down
2 changes: 0 additions & 2 deletions event/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ONSdigital/dp-search-data-extractor/event/mock"
"github.com/ONSdigital/dp-search-data-extractor/models"
"github.com/ONSdigital/dp-search-data-extractor/schema"

. "github.com/smartystreets/goconvey/convey"
)

Expand All @@ -35,7 +34,6 @@ var (
)

func TestProducer_SearchDataImport(t *testing.T) {

Convey("Given SearchDataImportProducer has been configured correctly", t, func() {

pChannels := &kafka.ProducerChannels{
Expand Down
3 changes: 1 addition & 2 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"encoding/json"

"github.com/ONSdigital/dp-net/request"

"github.com/ONSdigital/dp-search-data-extractor/clients"
"github.com/ONSdigital/dp-search-data-extractor/config"
"github.com/ONSdigital/dp-search-data-extractor/event"
"github.com/ONSdigital/dp-search-data-extractor/models"
"github.com/ONSdigital/log.go/v2/log"
)

// ContentPublishedHandler
// ContentPublishedHandler struct to hold handle for zebedee client and the producer
type ContentPublishedHandler struct {
ZebedeeCli clients.ZebedeeClient
Producer event.SearchDataImportProducer
Expand Down
2 changes: 0 additions & 2 deletions handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func TestHandlerForZebedeeReturningMandatoryFields(t *testing.T) {
case avroBytes = <-pChannels.Output:
t.Log("avro byte sent to producer output")
case <-time.After(testTimeout):
t.Fatalf("failing test due to timing out after %v seconds", testTimeout)
t.FailNow()
}

Expand Down Expand Up @@ -188,7 +187,6 @@ func TestHandlerForZebedeeReturningAllFields(t *testing.T) {
case avroBytes = <-pChannels.Output:
t.Log("avro byte sent to producer output")
case <-time.After(testTimeout):
t.Fatalf("failing test due to timing out after %v seconds", testTimeout)
t.FailNow()
}

Expand Down
2 changes: 1 addition & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (svc *Service) Close(ctx context.Context) error {
hasShutdownError = true
}

// If kafka producder exists, close it.
// If kafka producer exists, close it.
if svc.serviceList.KafkaProducer {
log.Info(ctx, "closing kafka producer")
if err := svc.producer.Close(ctx); err != nil {
Expand Down

0 comments on commit d6dcb4e

Please sign in to comment.