97 changes: 94 additions & 3 deletions dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -130,9 +130,9 @@ For this, we need two parts running:
> </details>

The Kafka server must be accessible to *external* applications.
For this we need an
[external static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
for the Kafka server to live on, not an internal IP address.

> ℹ️ If you already have a Kafka server running you can skip this section.
> Just make sure to store its IP address into an environment variable.
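
For reference, reserving an external static IP address and storing it could look
like this (a minimal sketch; the address name `kafka-address` is a hypothetical
choice, and `$REGION` is assumed to hold your Compute Engine region):

```sh
# Reserve a static external IP address in the chosen region.
gcloud compute addresses create kafka-address --region "$REGION"

# Capture the reserved address into the environment variable used later on.
export KAFKA_ADDRESS=$(gcloud compute addresses describe kafka-address \
    --region "$REGION" --format='value(address)')
```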
@@ -183,9 +183,21 @@ To learn more about pricing, see the
```sh
export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"

# Note: If the project ID contains a `:` (which indicates a domain-scoped
# project, e.g. `example.com:project-id`), replace the `:` with `/` so that
# the Kafka image path resolves correctly.
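# For example, a hypothetical domain-scoped project ID would become:
#   export KAFKA_IMAGE="gcr.io/example.com/project-id/samples/dataflow/kafka:latest"
# or, to do the substitution automatically:
#   export KAFKA_IMAGE="gcr.io/$(echo "$PROJECT" | tr ':' '/')/samples/dataflow/kafka:latest"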

# Build the Kafka server image into Container Registry.
gcloud builds submit --tag $KAFKA_IMAGE kafka/

# To use a different topic, address, Kafka port, or ZooKeeper port, set the
# following environment variables before starting the server. Otherwise, the
# default values from the Dockerfile are used:
export KAFKA_TOPIC=<topic-name>
export KAFKA_ADDRESS=<kafka-address>
export KAFKA_PORT=<kafka-port>
export ZOOKEEPER_PORT=<zookeeper-port>
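
# For reference, the Dockerfile defaults are KAFKA_TOPIC=messages,
# KAFKA_ADDRESS=localhost, and ZOOKEEPER_PORT=2181; KAFKA_PORT is assumed to
# default to 9092, the port used throughout this guide.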

# Create and start a new instance.
# The --address flag binds the VM's address to the static address we created.
# The --container-env KAFKA_ADDRESS is an environment variable passed to the
@@ -200,6 +212,70 @@ gcloud compute instances create-with-container kafka-vm \
--tags "kafka-server"
```

Note: The Kafka server should be running at this point, but no messages are
being sent to any topic yet, which will cause the KafkaToBigQuery template to
fail if it is started now.
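
To sanity-check that the broker is reachable before moving on, you can probe
the Kafka port from your workstation (a minimal sketch; assumes `KAFKA_ADDRESS`
holds the static external IP and the default port 9092):

```sh
# Verify that the Kafka broker port is reachable on the external address.
nc -zv "$KAFKA_ADDRESS" 9092
```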


### Sending messages to the Kafka server

SSH into the `kafka-vm` instance created earlier and run the commands below as
needed. Messages sent before the Dataflow template is started will be available
when the template starts; messages sent after the template is running are
processed as they arrive.

Prerequisite: SSH into the Kafka VM


```sh
gcloud compute ssh kafka-vm --zone "$ZONE"
```

1. Create a Topic

```sh
docker run --rm --network host bitnami/kafka:3.4.0 \
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic <topic-name> --partitions 1 --replication-factor 1
```

2. Send Messages to the Topic

Run the console producer to send messages. After running the command, type a
message and press Enter. You can send multiple messages. Press Ctrl+C to stop
the producer.
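(A non-interactive variant that pipes messages into the producer is sketched
after this list.)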

Note: You can run this step either before starting the Dataflow template
(messages will be ready) or while it's running (messages will be processed as
they arrive).

```sh
docker run -i --rm --network host bitnami/kafka:3.4.0 \
/opt/bitnami/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic <topic-name>
```

3. (Optional) Verify the Messages

You can check that your messages were sent correctly by starting a consumer.
This will print all messages from the beginning of the topic. Press Ctrl+C to
exit.

```sh
docker run -it --rm --network host bitnami/kafka:3.4.0 \
/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic <topic-name> --from-beginning
```

4. (Optional) Delete a Topic

```sh
docker run --rm --network host bitnami/kafka:3.4.0 \
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic <topic-name>
```
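
As mentioned in step 2, a non-interactive alternative is to pipe messages into
the console producer. A minimal sketch (the JSON field names follow the example
schema shown later in this README; adjust them to match your own schema):

```sh
# Send a couple of JSON messages to the topic without an interactive session.
printf '%s\n' \
  '{"url": "https://beam.apache.org/", "rating": "5"}' \
  '{"url": "https://cloud.google.com/dataflow", "rating": "4"}' |
docker run -i --rm --network host bitnami/kafka:3.4.0 \
  /opt/bitnami/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic <topic-name>
```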


### Creating and running a Flex Template

> <details><summary>
@@ -257,6 +333,21 @@ gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
--region "$REGION"
```

Note: If one of the parameters is a deeply nested JSON object or dictionary, use
the gcloud `--flags-file` flag to pass a YAML file that lists all of the
parameters, including the nested dictionary. Passing the dictionary directly on
the command line produces a gcloud error. The parameters file can look like
this:

```yaml
--parameters:
  inputTopic: messages
  outputTable: $PROJECT:$DATASET.$TABLE
  bootstrapServer: $KAFKA_ADDRESS:9092
  schema:
    '{type: object, properties: {processing_time: {type: TIMESTAMP}, url: {type: STRING}, rating: {type: STRING}}}'
```
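
With a file like this (for example `parameters.yaml`, a hypothetical name), the
job can be launched with `--flags-file` instead of `--parameters`. A minimal
sketch, assuming `$TEMPLATE_PATH` points to the template spec used in the run
command above:

```sh
# Launch the template, reading all parameters (including the nested schema)
# from the YAML flags file rather than the command line.
gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --flags-file "parameters.yaml" \
    --region "$REGION"
```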

Run the following query to check the results in BigQuery.

4 changes: 2 additions & 2 deletions dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
@@ -24,7 +24,7 @@ ARG KAFKA_VERSION="2.4.0"
ARG SCALA_VERSION="2.12"

# Set variables with default values used by the `start-kafka.sh` script.
# Override them with the `--env` or `-e` flag.
# https://docs.docker.com/engine/reference/builder/#env
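# For example (hypothetical values, when running the image manually with Docker):
#   docker run -e KAFKA_TOPIC=my-topic -e KAFKA_ADDRESS=203.0.113.10 <kafka-image>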
ENV KAFKA_TOPIC="${KAFKA_TOPIC:-messages}"
ENV KAFKA_ADDRESS="${KAFKA_ADDRESS:-localhost}"
@@ -33,7 +33,7 @@ ENV ZOOKEEPER_PORT="${ZOOKEEPER_PORT:-2181}"

# Download and install Apache Kafka.
RUN apk add --no-cache bash \
    && wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
-O /tmp/kafka.tgz \
&& tar xzf /tmp/kafka.tgz -C /opt && rm /tmp/kafka.tgz \
&& ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka