From b14971f732251fc6fa8e93457a15a0505bbf03d6 Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 15 Sep 2025 14:21:12 +0000
Subject: [PATCH 1/6] update kafka instructions for dealing with messages etc

---
 .../kafka_to_bigquery/README.md               | 49 +++++++++++++++++--
 .../kafka_to_bigquery/kafka/Dockerfile        |  2 +-
 2 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 22d72092179..309070d7fcc 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -130,9 +130,9 @@ For this, we need two parts running:
 
 > The Kafka server must be accessible to *external* applications.
 
-For this we need a
-[static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
-for the Kafka server to live.
+For this we need an
+[external static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
+for the Kafka server to live, not an internal IP address.
 
 > ℹ️ If you already have a Kafka server running you can skip this section.
 > Just make sure to store its IP address into an environment variable.
@@ -186,6 +186,14 @@ export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"
 # Build the Kafka server image into Container Registry.
 gcloud builds submit --tag $KAFKA_IMAGE kafka/
 
+# If a different topic, address, Kafka port, or ZooKeeper port is desired,
+# update the following environment variables before starting the server.
+# Otherwise, the default values will be used in the Dockerfile:
+export KAFKA_TOPIC=
+export KAFKA_ADDRESS=
+export KAFKA_PORT=
+export ZOOKEEPER_PORT=
+
 # Create and start a new instance.
 # The --address flag binds the VM's address to the static address we created.
 # The --container-env KAFKA_ADDRESS is an environment variable passed to the
@@ -200,6 +208,41 @@ gcloud compute instances create-with-container kafka-vm \
   --tags "kafka-server"
 ```
 
+### Sending messages to Kafka server
+
+The Kafka server should be running at this point, but in its current state no
+messages are being sent to a topic which will cause the KafkaToBigQuery
+template to fail. So ssh into the `kafka-vm` that was created earlier and issue
+the below commands that are required based on your timing. Messages sent before
+the template is started will be present when the template is started. If the
+desire is to send messages after the template has started, then the messages
+will be processed as they are sent.
+
+```sh
+# 1. If the existing topic is not sufficient, please create a new one:
+docker run --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
+--create --topic --partitions 1 --replication-factor 1
+
+# 2. If the existing topic needs deleting, please delete it:
+docker run --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
+--delete --topic
+
+# 3. If messages need to be sent, send them to the Kafka topic via the following
+# command and then hit enter after each message. End via ctrl+c:
+docker run -i --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-console-producer.sh \
+--bootstrap-server localhost:9092 --topic messages
+
+# 4. If the messages need to be verified that they exist, issue this command
+# and end via ctrl+c:
+docker run -it --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
+--bootstrap-server localhost:9092 --topic messages --from-beginning
+```
+
+
 ### Creating and running a Flex Template
 
 >
diff --git a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
index ae44da65264..e450352be2b 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
+++ b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
@@ -33,7 +33,7 @@ ENV ZOOKEEPER_PORT="${ZOOKEEPER_PORT:-2181}"
 
 # Download and install Apache Kafka.
 RUN apk add --no-cache bash \
-    && wget http://apache.mirrors.spacedump.net/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
+    && wget http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
         -O /tmp/kafka.tgz \
     && tar xzf /tmp/kafka.tgz -C /opt && rm /tmp/kafka.tgz \
     && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka

From 6637d6d19118ca3ffb9313df32d8935346aa5b9c Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 15 Sep 2025 17:40:59 +0000
Subject: [PATCH 2/6] add flags-file example notes

---
 .../flex-templates/kafka_to_bigquery/README.md | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 309070d7fcc..39d13c9146a 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -300,6 +300,21 @@ gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
   --region "$REGION"
 ```
 
+Note: If one of the parameters is deeply nested JSON or a dictionary, use the
+gcloud `--flags-file` parameter to pass in a YAML file containing all of the
+parameters, including the nested dictionary. Passing the dictionary straight
+in on the command line will give a gcloud error.
+The parameters file can look
+like this:
+
+```yaml
+--parameters:
+  inputTopic: messages
+  outputTable: $PROJECT:$DATASET.$TABLE
+  bootstrapServer: $KAFKA_ADDRESS:9092
+  schema:
+    '{type: object, properties: {processing_time: {type: TIMESTAMP}, url: {type: STRING}, rating: {type: STRING}}}'
+```
+
 Run the following query to check the results in BigQuery.
 
 ```sh

From d331a5d1c42b129b78e51b95b08ee7051aefe629 Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 15 Sep 2025 17:50:46 +0000
Subject: [PATCH 3/6] fix some commands

---
 dataflow/flex-templates/kafka_to_bigquery/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 39d13c9146a..82779ad8851 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -233,13 +233,13 @@ docker run --rm --network host bitnami/kafka:3.4.0 \
 # command and then hit enter after each message. End via ctrl+c:
 docker run -i --rm --network host bitnami/kafka:3.4.0 \
 /opt/bitnami/kafka/bin/kafka-console-producer.sh \
---bootstrap-server localhost:9092 --topic messages
+--bootstrap-server localhost:9092 --topic 
 
 # 4.
 If the messages need to be verified that they exist, issue this command
 # and end via ctrl+c:
 docker run -it --rm --network host bitnami/kafka:3.4.0 \
 /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
---bootstrap-server localhost:9092 --topic messages --from-beginning
+--bootstrap-server localhost:9092 --topic --from-beginning
 ```

From 59790b4b34bf8ca2ddc8d928ea119028357f7a52 Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 15 Sep 2025 19:46:05 +0000
Subject: [PATCH 4/6] more changes

---
 dataflow/flex-templates/kafka_to_bigquery/README.md        | 4 ++++
 dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 82779ad8851..4c45b67ac06 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -183,6 +183,10 @@ To learn more about pricing, see the
 ```sh
 export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"
 
+# Note: If the project name has `:` in it that signifies a project within an
+# organization (e.g. `example.com:project-id`), replace those with `/` so that
+# the Kafa image can be found appropriately.
+
 # Build the Kafka server image into Container Registry.
 gcloud builds submit --tag $KAFKA_IMAGE kafka/
 
diff --git a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
index e450352be2b..9f4f2412c19 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
+++ b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
@@ -24,7 +24,7 @@ ARG KAFKA_VERSION="2.4.0"
 ARG SCALA_VERSION="2.12"
 
 # Set variables with default values used by the `start-kafka.sh` script.
-# Override them wiht the `--env` or `-e` flag.
+# Override them with the `--env` or `-e` flag.
 # https://docs.docker.com/engine/reference/builder/#env
 ENV KAFKA_TOPIC="${KAFKA_TOPIC:-messages}"
 ENV KAFKA_ADDRESS="${KAFKA_ADDRESS:-localhost}"

From c213f398690d66225db592f948686217c28e4482 Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 22 Sep 2025 19:58:25 +0000
Subject: [PATCH 5/6] fix comments

---
 .../kafka_to_bigquery/README.md               | 54 ++++++++++++++-----
 .../kafka_to_bigquery/kafka/Dockerfile        |  2 +-
 2 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 4c45b67ac06..425a9d6ec90 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -185,7 +185,7 @@ export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"
 
 # Note: If the project name has `:` in it that signifies a project within an
 # organization (e.g. `example.com:project-id`), replace those with `/` so that
-# the Kafa image can be found appropriately.
+# the Kafka image can be found appropriately.
 
 # Build the Kafka server image into Container Registry.
 gcloud builds submit --tag $KAFKA_IMAGE kafka/
@@ -212,40 +212,68 @@ gcloud compute instances create-with-container kafka-vm \
   --tags "kafka-server"
 ```
 
+Note: The Kafka server should be running at this point, but in its current state
+no messages are being sent to a topic, which will cause the KafkaToBigQuery
+template to fail.
+
+
 ### Sending messages to Kafka server
 
-The Kafka server should be running at this point, but in its current state no
-messages are being sent to a topic which will cause the KafkaToBigQuery
-template to fail. So ssh into the `kafka-vm` that was created earlier and issue
+SSH into the `kafka-vm` that was created earlier and issue
 the below commands that are required based on your timing. Messages sent before
 the template is started will be present when the template is started.
 If the desire is to send messages after the template has started, then the messages
 will be processed as they are sent.
 
+Prerequisite: SSH into the Kafka VM
+```sh
+gcloud compute ssh kafka-vm --zone "$ZONE"
+```
+
+1. Create a Topic
+
 ```sh
-# 1. If the existing topic is not sufficient, please create a new one:
 docker run --rm --network host bitnami/kafka:3.4.0 \
 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
 --create --topic --partitions 1 --replication-factor 1
+```
 
-# 2. If the existing topic needs deleting, please delete it:
-docker run --rm --network host bitnami/kafka:3.4.0 \
-/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
---delete --topic
+2. Send Messages to the Topic
+
+Run the console producer to send messages. After running the command, type a
+message and press Enter. You can send multiple messages. Press Ctrl+C to stop
+the producer.
+
+Note: You can run this step either before starting the Dataflow template
+(messages will be ready) or while it's running (messages will be processed as
+they arrive).
 
-# 3. If messages need to be sent, send them to the Kafka topic via the following
-# command and then hit enter after each message. End via ctrl+c:
+```sh
 docker run -i --rm --network host bitnami/kafka:3.4.0 \
 /opt/bitnami/kafka/bin/kafka-console-producer.sh \
 --bootstrap-server localhost:9092 --topic
+```
+
+3. (Optional) Verify the Messages
 
-# 4. If the messages need to be verified that they exist, issue this command
-# and end via ctrl+c:
+You can check that your messages were sent correctly by starting a consumer.
+This will print all messages from the beginning of the topic. Press Ctrl+C to
+exit.
+
+```sh
 docker run -it --rm --network host bitnami/kafka:3.4.0 \
 /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
 --bootstrap-server localhost:9092 --topic --from-beginning
 ```
 
+4. (Optional) Delete a Topic
+
+```sh
+docker run --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
+--delete --topic
+```
+
 
 ### Creating and running a Flex Template
 
diff --git a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
index 9f4f2412c19..8216783ef69 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
+++ b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
@@ -33,7 +33,7 @@ ENV ZOOKEEPER_PORT="${ZOOKEEPER_PORT:-2181}"
 
 # Download and install Apache Kafka.
 RUN apk add --no-cache bash \
-    && wget http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
+    && wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
         -O /tmp/kafka.tgz \
     && tar xzf /tmp/kafka.tgz -C /opt && rm /tmp/kafka.tgz \
     && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka

From 73b278c391649be28d554df2e2284070989fb402 Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 22 Sep 2025 20:00:09 +0000
Subject: [PATCH 6/6] fix spacing

---
 dataflow/flex-templates/kafka_to_bigquery/README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 425a9d6ec90..4c8121fc95c 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -226,6 +226,7 @@ desire is to send messages after the template has started, then the messages
 will be processed as they are sent.
 
 Prerequisite: SSH into the Kafka VM
+
 ```sh
 gcloud compute ssh kafka-vm --zone "$ZONE"
 ```
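
The `--flags-file` note added in PATCH 2/6 can be scripted: gcloud reads the flags file as plain YAML and does not expand shell variables inside it, so the file should be generated with the variables already expanded. A minimal sketch (the file name `parameters.yaml` and the placeholder defaults are illustrative, not part of the sample):

```shell
#!/usr/bin/env bash
# Sketch: write the --flags-file YAML for the template run.
# Placeholder defaults are used when the variables are not already exported.
PROJECT="${PROJECT:-my-project}"
DATASET="${DATASET:-my_dataset}"
TABLE="${TABLE:-my_table}"
KAFKA_ADDRESS="${KAFKA_ADDRESS:-203.0.113.1}"

# The unquoted heredoc expands the variables now, while the file is written,
# because gcloud will later read the file literally.
cat > parameters.yaml <<EOF
--parameters:
  inputTopic: messages
  outputTable: $PROJECT:$DATASET.$TABLE
  bootstrapServer: $KAFKA_ADDRESS:9092
  schema:
    '{type: object, properties: {processing_time: {type: TIMESTAMP}, url: {type: STRING}, rating: {type: STRING}}}'
EOF
```

The run command from the README then shrinks to `gcloud dataflow flex-template run ... --flags-file parameters.yaml`, with the nested parameters kept out of the command line.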
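
The note introduced in PATCH 4/6 (and corrected in PATCH 5/6) about domain-scoped project IDs can also be automated instead of edited by hand. A sketch with a hypothetical helper name, not part of the sample:

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the KAFKA_IMAGE path, converting the ":" of a
# domain-scoped project ID (e.g. "example.com:project-id") to "/". Only the
# project portion goes through tr, so the ":latest" tag keeps its colon.
kafka_image_path() {
  local project="$1"
  echo "gcr.io/$(printf '%s' "$project" | tr ':' '/')/samples/dataflow/kafka:latest"
}

kafka_image_path "my-project"             # gcr.io/my-project/samples/dataflow/kafka:latest
kafka_image_path "example.com:project-id" # gcr.io/example.com/project-id/samples/dataflow/kafka:latest
```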
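
The Dockerfile's `ENV KAFKA_TOPIC="${KAFKA_TOPIC:-messages}"` lines rely on standard `${VAR:-default}` parameter expansion, which is why the exports in PATCH 1/6 can be left empty to accept the defaults. A quick illustration (the topic name `clicks` is just an example value):

```shell
#!/usr/bin/env bash
# ${VAR:-default} expands to the default when VAR is unset or empty,
# and to VAR's own value otherwise; this is how the Dockerfile falls
# back to "messages", "localhost", and the default ports.
unset KAFKA_TOPIC
echo "${KAFKA_TOPIC:-messages}"   # prints: messages

KAFKA_TOPIC="clicks"
echo "${KAFKA_TOPIC:-messages}"   # prints: clicks
```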