Skip to content

Commit

Permalink
Example for using go clients with AWS lambda. (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
jliunyu committed Jul 12, 2022
1 parent 5811a4f commit 08cc2d3
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 0 deletions.
21 changes: 21 additions & 0 deletions examples/docker_aws_lambda_example/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM fedora:latest

# Install pre-reqs
RUN dnf install wget -y
RUN dnf install gcc -y

RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key

# Install Go v1.14
RUN wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz && tar -xvzf go1.14.linux-amd64.tar.gz && rm go1.14.linux-amd64.tar.gz
RUN mv go /usr/local
ENV GOROOT=/usr/local/go
ENV PATH="${GOROOT}/bin:${PATH}"

# Build the producer_example
WORKDIR /kafka
COPY examples/docker_aws_lambda_example/go.mod .
COPY examples/docker_aws_lambda_example/producer_example.go .
RUN go build -o producer_example .

ENTRYPOINT ["./producer_example"]
15 changes: 15 additions & 0 deletions examples/docker_aws_lambda_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Build Docker image
$ docker build -f examples/docker_aws_lambda_example/Dockerfile -t goclients .

# Push the docker image to AWS Elastic Container Registry
Create Amazon Elastic Container Registry first
Push the docker image to AWS ECR according to the [AWS ECR user guide](https://docs.aws.amazon.com/AmazonECR/latest/userguide/docker-push-ecr-image.html), or using all the commands under the `View push commands` of the ECR repository.

# Create AWS lambda function using image from AWS ECR
Choose the `Container Image` when create the lambda function, add the docker image URL from `Container image URL`.

# Config Environment Variables
Add the environment variables under the `Configuration`, we can pass the parameters like `BOOTSTRAP_SERVERS`, `CCLOUDAPIKEY`, `CCLOUDAPISECRET`, `TOPIC` as environment variables.

# Run the test
Click the `Test` button under `Test`.
8 changes: 8 additions & 0 deletions examples/docker_aws_lambda_example/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module docker_example

require (
github.com/aws/aws-lambda-go v1.27.0
github.com/confluentinc/confluent-kafka-go v1.9.0 // indirect
)

go 1.14
132 changes: 132 additions & 0 deletions examples/docker_aws_lambda_example/producer_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package main

/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"context"
"fmt"
"os/signal"
"syscall"
"time"

"os"

"github.com/aws/aws-lambda-go/lambda"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

var p *kafka.Producer

func main() {
// Reference: https://github.com/aws/aws-lambda-go/issues/318#issuecomment-1019318919
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
go func() {
fmt.Printf("waiting for the SIGTERM\n")
s := <-sigs
fmt.Printf("received signal %s\n", s)
if p != nil {
p.Close()
}
fmt.Printf("done\n")
}()

lambda.Start(HandleRequest)
}

// HandleRequest handles creating producer and
// producing messages.
func HandleRequest() error {
broker := os.Getenv("BOOTSTRAP_SERVERS")
topic := os.Getenv("TOPIC")
ccloudAPIKey := os.Getenv("CCLOUDAPIKEY")
ccloudAPISecret := os.Getenv("CCLOUDAPISECRET")

var err error

if p == nil {
p, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": ccloudAPIKey,
"sasl.password": ccloudAPISecret,
})

if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Producer %v\n", p)
}

a, err := kafka.NewAdminClientFromProducer(p)
defer a.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

maxDur, err := time.ParseDuration("60s")
if err != nil {
panic("ParseDuration(60s)")
}

// Create topics if it doesn't exist
results, err := a.CreateTopics(
ctx,
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: 1}},
kafka.SetAdminOperationTimeout(maxDur))
if err != nil {
fmt.Printf("CreateTopics request failed: %v\n", err)
return err
}

for _, result := range results {
fmt.Printf("%s\n", result)
}

deliveryChan := make(chan kafka.Event)

value := "Hello Go!"
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)

if err != nil {
fmt.Printf("Produce failed: %s\n", err)
return err
}

e := <-deliveryChan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

close(deliveryChan)

return nil
}

0 comments on commit 08cc2d3

Please sign in to comment.