From cadc791a110475a2d603575df5cb2dcf7876eeb3 Mon Sep 17 00:00:00 2001
From: Carlos Roman
Date: Mon, 30 Jan 2017 17:14:09 +0000
Subject: [PATCH] Added direct kafka consumer.

---
 main.go      | 78 ++++++++++++++++++++++++++++++++++++++++++----------
 main_test.go |  2 +-
 parser.go    | 61 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 127 insertions(+), 14 deletions(-)
 create mode 100644 parser.go

diff --git a/main.go b/main.go
index 2f9b8f9..4594ef5 100644
--- a/main.go
+++ b/main.go
@@ -1,27 +1,28 @@
 package main
 
 import (
-	standardLog "log"
-	"net/http"
-	"os"
-	"strings"
-	queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
-	"io"
-	"io/ioutil"
-	"os/signal"
-	"sync"
-	"syscall"
-	"time"
-	"net"
 	"fmt"
 	"github.com/Financial-Times/go-fthealth/v1a"
 	"github.com/Financial-Times/http-handlers-go/httphandlers"
+	queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
 	status "github.com/Financial-Times/service-status-go/httphandlers"
+	"github.com/Shopify/sarama"
 	log "github.com/Sirupsen/logrus"
 	graphite "github.com/cyberdelia/go-metrics-graphite"
 	"github.com/gorilla/mux"
 	"github.com/jawher/mow.cli"
 	"github.com/rcrowley/go-metrics"
+	"io"
+	"io/ioutil"
+	standardLog "log"
+	"net"
+	"net/http"
+	"os"
+	"os/signal"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
 )
 
 var httpClient = http.Client{
@@ -111,6 +112,11 @@ func main() {
 		Value:  1000,
 		Desc:   "Throttle",
 		EnvVar: "THROTTLE"})
+	kafkaAddrs := app.String(cli.StringOpt{
+		Name:   "kafka_addrs",
+		Value:  "localhost:9092",
+		Desc:   "Comma-separated Kafka broker addresses",
+		EnvVar: "KAFKA_ADDRS"})
 
 	app.Action = func() {
 		consumerConfig := queueConsumer.QueueConfig{
@@ -123,6 +129,30 @@ func main() {
 			ConcurrentProcessing: true,
 		}
 
+		saramaConsumer, err := sarama.NewConsumer(strings.Split(*kafkaAddrs, ","), nil)
+		if err != nil {
+			panic(err)
+		}
+		defer func() {
+			if err := saramaConsumer.Close(); err != nil {
+				log.Fatalln(err)
+			}
+		}()
+		partitionConsumer, err := saramaConsumer.ConsumePartition("TestBridge", 0, sarama.OffsetNewest)
+		if err != nil {
+			panic(err)
+		}
+
+		defer func() {
+			if err := partitionConsumer.Close(); err != nil {
+				log.Fatalln(err)
+			}
+		}()
+
+		// Trap SIGINT to trigger a shutdown.
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Interrupt)
+
 		writerMappings := createWriterMappings(*services, *vulcanAddr)
 
 		var elasticsearchWriterBasicMapping string
@@ -141,6 +171,27 @@ func main() {
 			ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
 		}
 
+		go func() {
+			log.Infof("Starting to consume kafka=%v and topic=%v", *kafkaAddrs, "TestBridge")
+		ConsumerLoop:
+			for {
+				select {
+				case msg := <-partitionConsumer.Messages():
+					log.Infof("Consumed message offset %d", msg.Offset)
+					ftMsg, err := parseMessage(msg.Value)
+					if err != nil {
+						log.Errorf("Error parsing message : %v", err.Error())
+						continue
+					}
+					if err = ing.processMessage(ftMsg); err != nil {
+						log.Errorf("Error processing message : %v", err.Error())
+					}
+				case <-signals:
+					break ConsumerLoop
+				}
+			}
+			log.Infof("Stopped consuming from kafka=%v and topic=%v", *kafkaAddrs, "TestBridge")
+		}()
 		outputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)
 
 		consumer := queueConsumer.NewConsumer(consumerConfig, ing.readMessage, httpClient)
@@ -287,10 +338,11 @@ func extractMessageTypeAndId(headers map[string]string) (string, string) {
 }
 
 func sendToWriter(ingestionType string, msgBody string, uuid string, elasticWriter string) error {
-
+	log.Infof("sendToWriter: concept=[%s] uuid=[%s]", ingestionType, uuid)
 	request, reqURL, err := createWriteRequest(ingestionType, strings.NewReader(msgBody), uuid, elasticWriter)
 	if err != nil {
 		log.Errorf("Cannot create write request: [%v]", err)
+		return fmt.Errorf("reqURL=[%s] concept=[%s] uuid=[%s] error=[%v]", reqURL, ingestionType, uuid, err)
 	}
 	request.ContentLength = -1
 	resp, reqErr := httpClient.Do(request)
diff --git a/main_test.go b/main_test.go
index b6f1b5f..ce11788 100644
--- a/main_test.go
+++ b/main_test.go
@@ -127,7 +127,7 @@ func TestMessageProcessingUnhappyPathIncrementsFailureMeterWithElasticsearch(t *
 	successMeterInitialCount, failureMeterInitialCount := getCounts()
 	failureMeterInitialCountForElasticsearch := getElasticsearchCount()
 
-	ing := ingesterService{baseURLMappings: mockedWriterMappings, elasticWriterURL: server.URL+"/bulk", client: http.Client{}}
+	ing := ingesterService{baseURLMappings: mockedWriterMappings, elasticWriterURL: server.URL + "/bulk", client: http.Client{}}
 
 	err := ing.processMessage(createMessage(uuid, validMessageTypeOrganisations))
 
diff --git a/parser.go b/parser.go
new file mode 100644
index 0000000..d216ebb
--- /dev/null
+++ b/parser.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+	queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
+	"log"
+	"regexp"
+	"strings"
+)
+
+// FT async msg format:
+//
+// message-version CRLF
+// *(message-header CRLF)
+// CRLF
+// message-body
+func parseMessage(raw []byte) (m queueConsumer.Message, err error) {
+	decoded := string(raw)
+	doubleNewLineStartIndex := getHeaderSectionEndingIndex(decoded)
+	if m.Headers, err = parseHeaders(decoded[:doubleNewLineStartIndex]); err != nil {
+		return
+	}
+	m.Body = strings.TrimSpace(decoded[doubleNewLineStartIndex:])
+	return
+}
+
+func getHeaderSectionEndingIndex(msg string) int {
+	// FT msg format uses CRLF for line endings
+	i := strings.Index(msg, "\r\n\r\n")
+	if i != -1 {
+		return i
+	}
+	// fallback to UNIX line endings
+	i = strings.Index(msg, "\n\n")
+	if i != -1 {
+		return i
+	}
+	log.Printf("WARN - message with no message body: [%s]", msg)
+	return len(msg)
+}
+
+var re = regexp.MustCompile("[\\w-]*:[\\w\\-:/. ]*")
]*") + +var kre = regexp.MustCompile("[\\w-]*:") +var vre = regexp.MustCompile(":[\\w-:/. ]*") + +func parseHeaders(msg string) (map[string]string, error) { + headerLines := re.FindAllString(msg, -1) + + headers := make(map[string]string) + for _, line := range headerLines { + key, value := parseHeader(line) + headers[key] = value + } + return headers, nil +} + +func parseHeader(header string) (string, string) { + key := kre.FindString(header) + value := vre.FindString(header) + return key[:len(key)-1], strings.TrimSpace(value[1:]) +}