Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixup kafka-bench and re-enable building it. #150

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ KAR_EXAMPLE_JS_TESTS=$(DOCKER_IMAGE_PREFIX)kar-examples-js-unit-tests:$(DOCKER_I
KAR_EXAMPLE_JAVA_DP=$(DOCKER_IMAGE_PREFIX)kar-examples-java-actors-dp:$(DOCKER_IMAGE_TAG)
KAR_EXAMPLE_JAVA_HELLO=$(DOCKER_IMAGE_PREFIX)kar-examples-java-service-hello:$(DOCKER_IMAGE_TAG)
KAR_BENCH_JS_IMAGE=$(DOCKER_IMAGE_PREFIX)kar-bench-js-image:$(DOCKER_IMAGE_TAG)
KAFKA_BENCH_CONSUMER=$(DOCKER_IMAGE_PREFIX)kar-kafka-bench-consumer:$(DOCKER_IMAGE_TAG)
KAFKA_BENCH_PRODUCER=$(DOCKER_IMAGE_PREFIX)kar-kafka-bench-producer:$(DOCKER_IMAGE_TAG)
KAFKA_BENCH=$(DOCKER_IMAGE_PREFIX)kar-kafka-bench:$(DOCKER_IMAGE_TAG)
KAR_HTTP_BENCH_JS_IMAGE=$(DOCKER_IMAGE_PREFIX)kar-http-bench-js-image:$(DOCKER_IMAGE_TAG)

install: cli
Expand All @@ -62,9 +61,7 @@ dockerBuildExamples:

dockerBuildBenchmarks:
cd benchmark/kar-bench && docker build --build-arg JS_RUNTIME=$(KAR_JS_SDK) -t $(KAR_BENCH_JS_IMAGE) .
# DISABLED DUE TO https://github.com/IBM/kar/issues/118
# cd benchmark/kafka-bench/consumer && docker build -t $(KAFKA_BENCH_CONSUMER) .
# cd benchmark/kafka-bench/producer && docker build -t $(KAFKA_BENCH_PRODUCER) .
cd benchmark/kafka-bench && docker build -t $(KAFKA_BENCH) .
cd benchmark/http-bench && docker build --build-arg JS_RUNTIME=$(KAR_JS_SDK) -t $(KAR_HTTP_BENCH_JS_IMAGE) .

dockerPushCore:
Expand All @@ -85,9 +82,7 @@ dockerPushExamples:

dockerPushBenchmarks:
docker push $(KAR_BENCH_JS_IMAGE)
# DISABLED DUE TO https://github.com/IBM/kar/issues/118
# docker push $(KAFKA_BENCH_CONSUMER)
# docker push $(KAFKA_BENCH_PRODUCER)
docker push $(KAFKA_BENCH)
docker push $(KAR_HTTP_BENCH_JS_IMAGE)

docker:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
#

# Resolve dependencies
FROM golang:1.15.11-alpine AS build
RUN apk add git
RUN go get -v "github.com/Shopify/sarama"
RUN go get -v "github.com/prometheus/common/log"
FROM golang:1.16 AS builder

# Build executable
FROM golang:1.15.11-alpine AS builder
COPY --from=build /go /go
ENV CGO_ENABLED=0
WORKDIR /src
COPY main.go .
RUN go build -o consumer .
CMD ["/src/consumer"]
WORKDIR /kar/benchmark/kafka-bench
COPY go.mod go.mod
COPY go.sum go.sum
RUN go mod download

COPY producer producer
COPY consumer consumer

# Build executables
RUN CGO_ENABLED=0 go install ./...

FROM alpine:3.11
COPY --from=builder /go/bin/consumer /kar/bin/consumer
COPY --from=builder /go/bin/producer /kar/bin/producer
12 changes: 4 additions & 8 deletions benchmark/kafka-bench/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@ DOCKER_NAMESPACE ?= kar
DOCKER_IMAGE_PREFIX ?= $(DOCKER_REGISTRY)/$(DOCKER_NAMESPACE)/
DOCKER_IMAGE_TAG ?= latest

KAFKA_BENCH_CONSUMER=$(DOCKER_IMAGE_PREFIX)kar-kafka-bench-consumer:$(DOCKER_IMAGE_TAG)
KAFKA_BENCH_PRODUCER=$(DOCKER_IMAGE_PREFIX)kar-kafka-bench-producer:$(DOCKER_IMAGE_TAG)
KAFKA_BENCH=$(DOCKER_IMAGE_PREFIX)kar-kafka-bench:$(DOCKER_IMAGE_TAG)

dockerBuild:
cd consumer && docker build -t $(KAFKA_BENCH_CONSUMER) .
cd producer && docker build -t $(KAFKA_BENCH_PRODUCER) .
docker build -t $(KAFKA_BENCH) .

dockerPush:
docker push $(KAFKA_BENCH_CONSUMER)
docker push $(KAFKA_BENCH_PRODUCER)
docker push $(KAFKA_BENCH)

docker:
make dockerBuild
make dockerPush

build:
cd producer; go build
cd consumer; go build
go install ./...
16 changes: 12 additions & 4 deletions benchmark/kafka-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ make build

To run the benchmark, in one window run:
```
./consumer/consumer
$GOPATH/bin/consumer
```

in another window run:
```
./producer/producer
$GOPATH/bin/producer
```

## On Kubernetes
Expand Down Expand Up @@ -44,7 +44,11 @@ kubectl logs kafka-bench-producer
```
This pod should show something like:
```
time="2021-04-09T00:34:46Z" level=info msg="Average Kafka end-to-end time: 3.2248899082568805 ms" source="main.go:147"
2021/08/16 20:48:06.108391 [INFO] Inside Setup!
2021/08/16 20:48:06.108709 [INFO] Sarama return consumer up and running!...
2021/08/16 20:57:02.638099 [INFO] Average Kafka end-to-end time: 5.018634300000017 ms
Message is stored in topic(simple-topic)/partition(0)/offset(16095)
2021/08/16 20:57:03.479145 [INFO] Kafka: end-to-end: samples = 10000; mean = 5.018634300000017; stddev = 2.1366015896192483
```

Checking the other pod:
Expand All @@ -53,7 +57,11 @@ kubectl logs kafka-bench-consumer
```
This pod should show:
```
time="2021-04-09T00:34:46Z" level=info msg="Average Kafka request time: 1.5344495412844037 ms" source="main.go:159"
2021/08/16 20:47:59.327039 [INFO] Starting consumer...
2021/08/16 20:47:59.327160 [INFO] Kafka brokers is [localhost:31093]
2021/08/16 20:47:59.355816 [INFO] Inside Setup!
2021/08/16 20:47:59.355880 [INFO] Sarama consumer up and running!...
2021/08/16 20:57:03.430480 [INFO] Average Kafka request time: 2.747966399999998 ms
```

Clean-up:
Expand Down
35 changes: 18 additions & 17 deletions benchmark/kafka-bench/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"syscall"
"time"

"github.com/IBM/kar/core/pkg/logger"
"github.com/Shopify/sarama"
"github.com/prometheus/common/log"
)

var (
Expand Down Expand Up @@ -123,15 +123,15 @@ func newHandler(conf *sarama.Config, topic string, producer sarama.SyncProducer)

// Setup consumer group session
func (h *handler) Setup(session sarama.ConsumerGroupSession) error {
log.Info("Inside Setup!")
logger.Info("Inside Setup!")
close(h.ready)
// h.ready = make(chan struct{})
return nil
}

// Cleanup consumer group session
func (h *handler) Cleanup(session sarama.ConsumerGroupSession) error {
log.Info("Inside Cleanup!")
logger.Info("Inside Cleanup!")
return nil
}

Expand Down Expand Up @@ -160,15 +160,15 @@ func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama
// Process request time.
startTime, err := time.Parse(time.RFC3339Nano, string(message.Value))
if err != nil {
log.Info("Time parse error!")
logger.Info("Time parse error!")
}
// Post-process.
count++
duration := currentTime.Sub(startTime).Microseconds()
if count >= warmUpReps {
accDuration += float64(duration) / 1000.0
if count == warmUpReps+timedReps {
log.Infof("Average Kafka request time: %v ms", (accDuration / float64(timedReps)))
logger.Info("Average Kafka request time: %v ms", (accDuration / float64(timedReps)))
count = 0
accDuration = 0
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func subscribe(ctx context.Context, topic, group string, producer sarama.SyncPro
handler := newHandler(conf, topic, producer)
handler.client, err = sarama.NewClient(kafkaBrokers, conf)
if err != nil {
log.Error("failed to instantiate Kafka client: %v", err)
logger.Error("failed to instantiate Kafka client: %v", err)
return nil, http.StatusInternalServerError, err
}

Expand All @@ -208,7 +208,7 @@ func subscribe(ctx context.Context, topic, group string, producer sarama.SyncPro

consumer, err := sarama.NewConsumerGroupFromClient(group, handler.client)
if err != nil {
log.Error("failed to instantiate Kafka consumer for topic %s, group %s: %v", topic, group, err)
logger.Error("failed to instantiate Kafka consumer for topic %s, group %s: %v", topic, group, err)
handler.client.Close()
return nil, http.StatusInternalServerError, err
}
Expand All @@ -222,7 +222,7 @@ func subscribe(ctx context.Context, topic, group string, producer sarama.SyncPro
defer wg.Done()
for {
if err := consumer.Consume(ctx, []string{topic}, handler); err != nil { // abnormal termination
log.Error("failed Kafka consumer for topic %s, group %s: %T, %#v", topic, group, err, err)
logger.Error("failed Kafka consumer for topic %s, group %s: %T, %#v", topic, group, err, err)
break
}
if ctx.Err() != nil { // normal termination
Expand All @@ -232,21 +232,21 @@ func subscribe(ctx context.Context, topic, group string, producer sarama.SyncPro
}()

<-handler.ready // Await till the consumer has been set up
log.Info("Sarama consumer up and running!...")
logger.Info("Sarama consumer up and running!...")

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Info("terminating: context cancelled")
logger.Info("terminating: context cancelled")
case <-sigterm:
log.Info("terminating: via signal")
logger.Info("terminating: via signal")
}
cancel()
wg.Wait()
log.Info("terminating: closing handler")
logger.Info("terminating: closing handler")
if err = handler.client.Close(); err != nil {
log.Info("Error closing handler: %v", err)
logger.Info("Error closing handler: %v", err)
}

select {
Expand All @@ -260,7 +260,7 @@ func subscribe(ctx context.Context, topic, group string, producer sarama.SyncPro
func createProducer() sarama.SyncProducer {
config, err := newConfig()
if err != nil {
log.Errorf("Error during configuration: %v", err)
logger.Error("Error during configuration: %v", err)
}

config.Producer.RequiredAcks = sarama.WaitForAll
Expand All @@ -277,15 +277,16 @@ func createProducer() sarama.SyncProducer {
}

func main() {
log.Info("Starting consumer...")
log.Infof("Kafka brokers is %v", kafkaBrokers)
logger.SetVerbosity("Info")
logger.Info("Starting consumer...")
logger.Info("Kafka brokers is %v", kafkaBrokers)
// Create the event producer.
// Set producer to nil if return message is not desired.
producer := createProducer()

// Create and subscribe consumer group.
_, _, err := subscribe(ctx, topic, group, producer)
if err != nil {
log.Error("subscribe failed.")
logger.Error("subscribe failed.")
}
}
5 changes: 3 additions & 2 deletions benchmark/kafka-bench/deploy/client-quay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ spec:
imagePullSecrets:
- name: kar.ibm.com.image-pull
containers:
- name: consumer
image: quay.io/ibm/kar-kafka-bench-producer
- name: producer
image: quay.io/ibm/kar-kafka-bench
command: ["/kar/bin/producer"]
env:
- name: KAFKA_BROKERS
valueFrom:
Expand Down
3 changes: 2 additions & 1 deletion benchmark/kafka-bench/deploy/client.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ spec:
- name: kar.ibm.com.image-pull
containers:
- name: producer
image: localhost:5000/kar/kar-kafka-bench-producer
image: localhost:5000/kar/kar-kafka-bench
command: ["/kar/bin/producer"]
env:
- name: KAFKA_BROKERS
valueFrom:
Expand Down
3 changes: 2 additions & 1 deletion benchmark/kafka-bench/deploy/server-quay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ spec:
- name: kar.ibm.com.image-pull
containers:
- name: consumer
image: quay.io/ibm/kar-kafka-bench-consumer
image: quay.io/ibm/kar-kafka-bench
command: ["/kar/bin/consumer"]
env:
- name: KAFKA_BROKERS
valueFrom:
Expand Down
3 changes: 2 additions & 1 deletion benchmark/kafka-bench/deploy/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ spec:
- name: kar.ibm.com.image-pull
containers:
- name: consumer
image: localhost:5000/kar/kar-kafka-bench-consumer
image: localhost:5000/kar/kar-kafka-bench
command: ["/kar/bin/consumer"]
env:
- name: KAFKA_BROKERS
valueFrom:
Expand Down
4 changes: 2 additions & 2 deletions benchmark/kafka-bench/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/IBM/kar.git/benchmark/kafka-bench
module github.com/IBM/kar/benchmark/kafka-bench

go 1.15

require (
github.com/IBM/kar/core v1.0.8
github.com/Shopify/sarama v1.28.0
github.com/prometheus/common v0.20.0
)
Loading