Skip to content

Commit

Permalink
set up consumergroup per instance
Browse files Browse the repository at this point in the history
  • Loading branch information
BlakeHolifield committed Jun 26, 2023
1 parent 12a5dee commit 09ed3fd
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 79 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
name: Go package

on:
# These will only run from a known contributor
push:
branches:
- main
# This can be a security risk, but our vars are container defaults with no data
pull_request_target:

jobs:
Expand Down Expand Up @@ -63,7 +65,7 @@ jobs:

- name: 'Create env file'
run: |
echo "${{ secrets.ENV_FILE }}" > .env
echo "${{ env.ENV_FILE }}" > .env
- name: Test
run: make test
4 changes: 3 additions & 1 deletion cmd/kafka/testMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func main() {
Topic: cfg.KafkaConfig.KafkaTopics[0],
Balancer: &kafka.LeastBytes{},
})

defer kafkaWriter.Close()

body := `{
Expand All @@ -45,6 +44,9 @@ func main() {
Value: []byte(body),
}

fmt.Println("Targeting Topic: ", kafkaWriter.Topic)
fmt.Println("Sending message", body)

err := kafkaWriter.WriteMessages(context.TODO(), msg)
if err != nil {
log.Fatalf("Could not write message due to error : %v", err)
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func init() {
options.DbSSLMode = "disable"
options.DbSSLRootCert = ""
options.KafkaConfig = KafkaCfg{
KafkaTopics: []string{"platform-chrome"},
KafkaTopics: []string{"platform.chrome"},
KafkaBrokers: []string{"localhost:9092"},
}

Expand Down
2 changes: 1 addition & 1 deletion deploy/clowdapp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ objects:
kafkaTopics:
# one replica for now; most of this will change
- replicas: 1
partitions: 64
partitions: 8
topicName: platform.chrome
deployments:
- name: api
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.13.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
subrouter.Route("/emit-message", routes.BroadcastMessage)
})

// We might want to setup some event listeners at some point, but the pod will
// We might want to set up some event listeners at some point, but the pod will
// have to restart for these to take effect. We can't enable and disable websockets on the fly
if featureflags.IsEnabled("chrome-service.websockets.enabled") {
// start the connection hub
Expand Down
20 changes: 18 additions & 2 deletions rest/cloudevents/cloudEvents.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package cloudevents

import (
"fmt"
"time"
"net/url"
"time"

"github.com/RedHatInsights/chrome-service-backend/rest/connectionhub"
)
Expand Down Expand Up @@ -44,7 +44,7 @@ func (uri URI) IsValid() error {
if err != nil {
return fmt.Errorf("URI is not valid. Expected a valid URI, but got %v.", uri)
}
return nil;
return nil
}

// TODO: Specify accepted data payload
Expand Down Expand Up @@ -75,3 +75,19 @@ func WrapPayload[P any](payload P, source URI, id string, messageType string) En
type KafkaEnvelope struct {
Envelope[connectionhub.WsMessage]
}

func ValidatePayload(p KafkaEnvelope) error {
payloadErr := p.DataContentType.IsValid()
if payloadErr != nil {
return fmt.Errorf("Kafka message payload needs to be JSON formatted, %v", payloadErr)
}
specErr := p.SpecVersion.IsValid()
if specErr != nil {
return fmt.Errorf("%v", specErr)
}
sourceErr := p.Source.IsValid()
if sourceErr != nil {
return fmt.Errorf("%v", sourceErr)
}
return nil
}
5 changes: 4 additions & 1 deletion rest/connectionhub/connection.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package connectionhub

import "github.com/gorilla/websocket"
import (
"github.com/gorilla/websocket"
)

type Connection struct {
Ws *websocket.Conn
ID string
Send chan []byte
}
38 changes: 21 additions & 17 deletions rest/connectionhub/connectionHub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ import (
"github.com/sirupsen/logrus"
)

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 512
)

type clients = map[string]*Client

type Client struct {
Expand Down Expand Up @@ -179,24 +193,11 @@ func (h *connectionHub) Run() {
}
}

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 512
)

func (c Client) ReadPump() {
conn := c.Conn
// close connection after client is was removed
// close connection after client is removed
defer func() {
logrus.Info(c)
ConnectionHub.Unregister <- c
conn.Ws.Close()
}()
Expand All @@ -217,7 +218,7 @@ func (c Client) ReadPump() {
break
}
var messagePayload WsMessage
json.Unmarshal(msg, &messagePayload)
err = json.Unmarshal(msg, &messagePayload)
if err != nil {
logrus.Errorln("Unable to unmarshall incoming WS message: ", err)
break
Expand All @@ -241,7 +242,10 @@ func (c Client) ReadPump() {

// write writes a message with the given message type and payload.
func (c *Connection) write(mt int, payload []byte) error {
c.Ws.SetWriteDeadline(time.Now().Add(writeWait))
err := c.Ws.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
logrus.Errorf("Cannot write message %v", err)
}
return c.Ws.WriteMessage(mt, payload)
}

Expand Down
101 changes: 47 additions & 54 deletions rest/kafka/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,61 @@ import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/RedHatInsights/chrome-service-backend/config"
"github.com/RedHatInsights/chrome-service-backend/rest/cloudevents"
"github.com/RedHatInsights/chrome-service-backend/rest/connectionhub"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"log"
"os"
)

type kafkaConsumer struct {
Topics []string
Readers map[string]*kafka.Reader
}

var KafkaConsumer = kafkaConsumer{}
var Consumer = kafkaConsumer{}

const TenMb = 10e7

func createReader(topic string) *kafka.Reader {
cfg := config.Get()
hostname, err := os.Hostname()
if err != nil {
logrus.Errorln("Couldn't get hostname, using UUID")
hostname = uuid.NewString()
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.KafkaConfig.KafkaBrokers,
// The consumer group will match the pod name via hostname
// ex platform.chrome.chrome-service-api.<deployHash>.<podHash>
GroupID: fmt.Sprintf("platform.chrome.%s", hostname),
StartOffset: kafka.LastOffset,
Topic: topic,
Logger: kafka.LoggerFunc(logrus.Debugf),
ErrorLogger: kafka.LoggerFunc(logrus.Errorf),
MaxBytes: TenMb,
})
logrus.Infoln("Creating new kafka reader for topic:", topic)
return r
}

func InitializeConsumers() {
cfg := config.Get()
topics := cfg.KafkaConfig.KafkaTopics
readers := make(map[string]*kafka.Reader)
for _, topic := range topics {
readers[topic] = createReader(topic)
}

Consumer.Readers = readers

for _, r := range readers {
go startKafkaReader(r)
}
}

func startKafkaReader(r *kafka.Reader) {
defer r.Close()
Expand All @@ -34,17 +74,17 @@ func startKafkaReader(r *kafka.Reader) {
var p cloudevents.KafkaEnvelope
err = json.Unmarshal(m.Value, &p)
if err != nil {
logrus.Errorln(fmt.Sprintf("Unable Unmarshal message %s\n", string(m.Value)))
logrus.Errorln(fmt.Sprintf("Unable to unmarshal message %s\n", string(m.Value)))
} else if p.Data.Payload == nil {
logrus.Errorln(fmt.Sprintf("No message will be emitted doe to missing payload %s! Message might not follow cloud events spec.\n", string(m.Value)))
logrus.Errorln(fmt.Sprintf("No message will be emitted do to missing payload %s! Message might not follow cloud events spec.\n", string(m.Value)))
} else {
event := cloudevents.WrapPayload(p.Data.Payload, p.Source, p.Id, p.Type)
event.Time = p.Time
data, err := json.Marshal(event)
if err != nil {
log.Println("Unable marshal payload data", p, err)
log.Println("Unable to marshal payload data", p, err)
} else {
validateErr := validatePayload(p)
validateErr := cloudevents.ValidatePayload(p)
if validateErr == nil {
newMessage := connectionhub.Message{
Destinations: connectionhub.MessageDestinations{
Expand Down Expand Up @@ -73,50 +113,3 @@ func startKafkaReader(r *kafka.Reader) {
log.Fatal("failed to close reader:", err)
}
}

func validatePayload(p cloudevents.KafkaEnvelope) error {
payloadErr := p.DataContentType.IsValid()
if payloadErr != nil {
return fmt.Errorf("Kafka message payload needs to be JSON formatted, %v", payloadErr)
}
specErr := p.SpecVersion.IsValid()
if specErr != nil {
return fmt.Errorf("%v", specErr)
}
sourceErr := p.Source.IsValid()
if sourceErr != nil {
return fmt.Errorf("%v", sourceErr)
}
return nil
}

func createReader(topic string) *kafka.Reader {
config := config.Get()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.KafkaConfig.KafkaBrokers,
GroupID: "platform.chrome",
StartOffset: kafka.LastOffset,
Topic: topic,
Logger: kafka.LoggerFunc(logrus.Debugf),
ErrorLogger: kafka.LoggerFunc(logrus.Errorf),
MinBytes: 1, // 1B
MaxBytes: 10e7, // 10MB
})
logrus.Infoln("Creating new kafka reader for topic:", topic)
return r
}

func InitializeConsumers() {
config := config.Get()
topics := config.KafkaConfig.KafkaTopics
readers := make(map[string]*kafka.Reader)
for _, topic := range topics {
readers[topic] = createReader(topic)
}

KafkaConsumer.Readers = readers

for _, r := range readers {
go startKafkaReader(r)
}
}

0 comments on commit 09ed3fd

Please sign in to comment.