From d6dcb4eab449f73fbcf4dd269744232c460d4b9c Mon Sep 17 00:00:00 2001 From: bpathak-ons Date: Wed, 20 Oct 2021 15:20:23 +0100 Subject: [PATCH] release review updates --- cmd/producer/main.go | 8 ++++---- event/consumer_test.go | 11 +---------- event/producer_test.go | 2 -- handler/handler.go | 3 +-- handler/handler_test.go | 2 -- service/service.go | 2 +- 6 files changed, 7 insertions(+), 21 deletions(-) diff --git a/cmd/producer/main.go b/cmd/producer/main.go index 71c69aa..0dcb726 100644 --- a/cmd/producer/main.go +++ b/cmd/producer/main.go @@ -21,7 +21,7 @@ func main() { ctx := context.Background() // Get Config - config, err := config.Get() + cfg, err := config.Get() if err != nil { log.Fatal(ctx, "error getting config", err) os.Exit(1) @@ -29,11 +29,11 @@ func main() { // Create Kafka Producer pChannels := kafka.CreateProducerChannels() - kafkaProducer, err := kafka.NewProducer(ctx, config.KafkaAddr, config.ContentPublishedTopic, pChannels, &kafka.ProducerConfig{ - KafkaVersion: &config.KafkaVersion, + kafkaProducer, err := kafka.NewProducer(ctx, cfg.KafkaAddr, cfg.ContentPublishedTopic, pChannels, &kafka.ProducerConfig{ + KafkaVersion: &cfg.KafkaVersion, }) if err != nil { - log.Fatal(ctx, "fatal error trying to create kafka producer", err, log.Data{"topic": config.ContentPublishedTopic}) + log.Fatal(ctx, "fatal error trying to create kafka producer", err, log.Data{"topic": cfg.ContentPublishedTopic}) os.Exit(1) } diff --git a/event/consumer_test.go b/event/consumer_test.go index 7ad381f..072ad02 100644 --- a/event/consumer_test.go +++ b/event/consumer_test.go @@ -6,9 +6,8 @@ import ( "sync" "testing" - "github.com/ONSdigital/dp-search-data-extractor/config" - kafka "github.com/ONSdigital/dp-kafka/v2" + "github.com/ONSdigital/dp-search-data-extractor/config" "github.com/ONSdigital/dp-kafka/v2/kafkatest" "github.com/ONSdigital/dp-search-data-extractor/event" "github.com/ONSdigital/dp-search-data-extractor/event/mock" @@ -27,14 +26,6 @@ var testEvent = models.ContentPublished{ CollectionID: "Col123", } -// kafkaStubConsumer mock which exposes Channels function returning empty channels -// to be used on tests that are not supposed to receive any kafka message -var kafkaStubConsumer = &kafkatest.IConsumerGroupMock{ - ChannelsFunc: func() *kafka.ConsumerGroupChannels { - return &kafka.ConsumerGroupChannels{} - }, -} - func TestConsume(t *testing.T) { Convey("Given kafka consumer and event handler mocks", t, func() { diff --git a/event/producer_test.go b/event/producer_test.go index abb9251..5420f0d 100644 --- a/event/producer_test.go +++ b/event/producer_test.go @@ -13,7 +13,6 @@ import ( "github.com/ONSdigital/dp-search-data-extractor/event/mock" "github.com/ONSdigital/dp-search-data-extractor/models" "github.com/ONSdigital/dp-search-data-extractor/schema" - . "github.com/smartystreets/goconvey/convey" ) @@ -35,7 +34,6 @@ var ( ) func TestProducer_SearchDataImport(t *testing.T) { - Convey("Given SearchDataImportProducer has been configured correctly", t, func() { pChannels := &kafka.ProducerChannels{ diff --git a/handler/handler.go b/handler/handler.go index 25c19dd..e07ab47 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/ONSdigital/dp-net/request" - "github.com/ONSdigital/dp-search-data-extractor/clients" "github.com/ONSdigital/dp-search-data-extractor/config" "github.com/ONSdigital/dp-search-data-extractor/event" @@ -13,7 +12,7 @@ import ( "github.com/ONSdigital/log.go/v2/log" ) -// ContentPublishedHandler +// ContentPublishedHandler struct to hold handle for zebedee client and the producer type ContentPublishedHandler struct { ZebedeeCli clients.ZebedeeClient Producer event.SearchDataImportProducer diff --git a/handler/handler_test.go b/handler/handler_test.go index 1606f60..10fd910 100644 --- a/handler/handler_test.go +++ b/handler/handler_test.go @@ -90,7 +90,6 @@ func TestHandlerForZebedeeReturningMandatoryFields(t *testing.T) { case avroBytes = <-pChannels.Output: t.Log("avro byte sent to producer output") case <-time.After(testTimeout): - t.Fatalf("failing test due to timing out after %v seconds", testTimeout) t.FailNow() } @@ -188,7 +187,6 @@ func TestHandlerForZebedeeReturningAllFields(t *testing.T) { case avroBytes = <-pChannels.Output: t.Log("avro byte sent to producer output") case <-time.After(testTimeout): - t.Fatalf("failing test due to timing out after %v seconds", testTimeout) t.FailNow() } diff --git a/service/service.go b/service/service.go index b5527e7..557f27f 100644 --- a/service/service.go +++ b/service/service.go @@ -132,7 +132,7 @@ func (svc *Service) Close(ctx context.Context) error { hasShutdownError = true } - // If kafka producder exists, close it. + // If kafka producer exists, close it. if svc.serviceList.KafkaProducer { log.Info(ctx, "closing kafka producer") if err := svc.producer.Close(ctx); err != nil {