This repository has been archived by the owner on Jun 14, 2018. It is now read-only.

Added direct kafka consumer.
carlosroman committed Feb 13, 2017
1 parent 94e329c commit cadc791
Showing 3 changed files with 128 additions and 15 deletions.
80 changes: 66 additions & 14 deletions main.go
@@ -1,27 +1,28 @@
package main

import (
standardLog "log"
"net/http"
"os"
"strings"
queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
"io"
"io/ioutil"
"os/signal"
"sync"
"syscall"
"time"
"net"
"fmt"
"github.com/Financial-Times/go-fthealth/v1a"
"github.com/Financial-Times/http-handlers-go/httphandlers"
queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
status "github.com/Financial-Times/service-status-go/httphandlers"
"github.com/Shopify/sarama"
log "github.com/Sirupsen/logrus"
graphite "github.com/cyberdelia/go-metrics-graphite"
"github.com/gorilla/mux"
"github.com/jawher/mow.cli"
"github.com/rcrowley/go-metrics"
"io"
"io/ioutil"
standardLog "log"
"net"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)

var httpClient = http.Client{
@@ -111,6 +112,11 @@ func main() {
Value: 1000,
Desc: "Throttle",
EnvVar: "THROTTLE"})
kafkaAddrs := app.String(cli.StringOpt{
Name: "kafka_addrs",
Value: "localhost:9092",
Desc: "Kafka broker addresses",
EnvVar: "KAFKA_ADDRS"})

app.Action = func() {
consumerConfig := queueConsumer.QueueConfig{
@@ -123,6 +129,30 @@
ConcurrentProcessing: true,
}

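// Assumption (not stated in the flag description): KAFKA_ADDRS may hold a
// comma-separated broker list, so the value is split before handing it to Sarama.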
saramaConsumer, err := sarama.NewConsumer(strings.Split(*kafkaAddrs, ","), nil)
if err != nil {
panic(err)
}
defer func() {
if err := saramaConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()
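// Note: sarama.OffsetNewest starts reading at the tail of the partition,
// so messages published while the service is down are not replayed.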
partitionConsumer, err := saramaConsumer.ConsumePartition("TestBridge", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}

defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

writerMappings := createWriterMappings(*services, *vulcanAddr)

var elasticsearchWriterBasicMapping string
@@ -141,6 +171,27 @@
ticker: time.NewTicker(time.Second / time.Duration(*throttle)),
}
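// The ticker above paces message processing: one tick every
// time.Second / *throttle, i.e. at most roughly *throttle messages per second.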

go func() {
log.Infof("Starting to consume kafka=%v and topic=%v", *kafkaAddrs, "TestBridge")
ConsumerLoop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Info("Consumed message offset %d\n", msg.Offset)
ftMsg, err := parseMessage(msg.Value)
if err != nil {
log.Errorf("Error parsing message : %v", err.Error())
}
if err = ing.processMessage(ftMsg); err != nil {
log.Errorf("Error processing message : %v", err.Error())
}

case <-signals:
break ConsumerLoop
}
}
log.Infof("Stopped consuming from kafka=%v and topic=%v", *kafkaAddrs, "TestBridge")
}()
outputMetricsIfRequired(*graphiteTCPAddress, *graphitePrefix, *logMetrics)

consumer := queueConsumer.NewConsumer(consumerConfig, ing.readMessage, httpClient)
@@ -287,10 +338,11 @@ func extractMessageTypeAndId(headers map[string]string) (string, string) {
}

func sendToWriter(ingestionType string, msgBody string, uuid string, elasticWriter string) error {

log.Infof("sendToWriter: concept=[%s] uuid=[%s]", ingestionType, uuid)
request, reqURL, err := createWriteRequest(ingestionType, strings.NewReader(msgBody), uuid, elasticWriter)
if err != nil {
log.Errorf("Cannot create write request: [%v]", err)
log.Errorf("Cannot read error body: [%v]", err)
return fmt.Errorf("reqURL=[%s] concept=[%s] uuid=[%s] error=[%v]", reqURL, ingestionType, uuid, err)
}
request.ContentLength = -1
resp, reqErr := httpClient.Do(request)
2 changes: 1 addition & 1 deletion main_test.go
@@ -127,7 +127,7 @@ func TestMessageProcessingUnhappyPathIncrementsFailureMeterWithElasticsearch(t *testing.T) {
successMeterInitialCount, failureMeterInitialCount := getCounts()
failureMeterInitialCountForElasticsearch := getElasticsearchCount()

- ing := ingesterService{baseURLMappings: mockedWriterMappings, elasticWriterURL: server.URL+"/bulk", client: http.Client{}}
+ ing := ingesterService{baseURLMappings: mockedWriterMappings, elasticWriterURL: server.URL + "/bulk", client: http.Client{}}

err := ing.processMessage(createMessage(uuid, validMessageTypeOrganisations))

61 changes: 61 additions & 0 deletions parser.go
@@ -0,0 +1,61 @@
package main

import (
queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer"
"log"
"regexp"
"strings"
)

// FT async msg format:
//
// message-version CRLF
// *(message-header CRLF)
// CRLF
// message-body
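//
// Example (all values hypothetical):
//
//	FTMSG/1.0
//	Message-Id: 81b8e4fe-7e27-4d16-9b5c-b7b4ee2f1c38
//	Message-Type: cms-content-published
//
//	{"uuid":"81b8e4fe-7e27-4d16-9b5c-b7b4ee2f1c38"}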
func parseMessage(raw []byte) (m queueConsumer.Message, err error) {
decoded := string(raw)
doubleNewLineStartIndex := getHeaderSectionEndingIndex(decoded)
if m.Headers, err = parseHeaders(decoded[:doubleNewLineStartIndex]); err != nil {
return
}
m.Body = strings.TrimSpace(decoded[doubleNewLineStartIndex:])
return
}

func getHeaderSectionEndingIndex(msg string) int {
// FT msg format uses CRLF for line endings.
i := strings.Index(msg, "\r\n\r\n")
if i != -1 {
return i
}
// Fall back to UNIX line endings.
i = strings.Index(msg, "\n\n")
if i != -1 {
return i
}
log.Printf("WARN - message with no message body: [%s]", msg)
return len(msg)
}

var re = regexp.MustCompile(`[\w-]*:[\w\-:/. ]*`)

var kre = regexp.MustCompile(`[\w-]*:`)
var vre = regexp.MustCompile(`:[\w-:/. ]*`)
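// re matches whole "Key: value" header lines; kre extracts the key (up to
// the first colon) and vre the value (from the first colon onwards).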

func parseHeaders(msg string) (map[string]string, error) {
headerLines := re.FindAllString(msg, -1)

headers := make(map[string]string)
for _, line := range headerLines {
key, value := parseHeader(line)
headers[key] = value
}
return headers, nil
}

func parseHeader(header string) (string, string) {
key := kre.FindString(header)
value := vre.FindString(header)
return key[:len(key)-1], strings.TrimSpace(value[1:])
}
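A minimal sketch (not part of this commit) of how parseMessage behaves on a message in the format above, written as a testable example that could sit in a hypothetical parser_test.go in the same package; the header values are made up:

package main

import (
	"fmt"
	"log"
)

func ExampleParseMessage() {
	// A well-formed FT async message: version line, headers, CRLF CRLF, body.
	raw := []byte("FTMSG/1.0\r\n" +
		"Message-Id: 81b8e4fe-7e27-4d16-9b5c-b7b4ee2f1c38\r\n" +
		"Message-Type: cms-content-published\r\n" +
		"\r\n" +
		`{"uuid":"81b8e4fe-7e27-4d16-9b5c-b7b4ee2f1c38"}`)

	msg, err := parseMessage(raw)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(msg.Headers["Message-Type"])
	fmt.Println(msg.Body)
	// Output:
	// cms-content-published
	// {"uuid":"81b8e4fe-7e27-4d16-9b5c-b7b4ee2f1c38"}
}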
