diff --git a/dataflow/flex-templates/kafka_to_bigquery/README.md b/dataflow/flex-templates/kafka_to_bigquery/README.md
index 22d72092179..4c8121fc95c 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/README.md
+++ b/dataflow/flex-templates/kafka_to_bigquery/README.md
@@ -130,9 +130,9 @@ For this, we need two parts running:
 
 > The Kafka server must be accessible to *external* applications.
 
-For this we need a
-[static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
-for the Kafka server to live.
+For this we need an
+[external static IP address](https://cloud.google.com/compute/docs/ip-addresses/reserve-static-external-ip-address)
+for the Kafka server to live at; an internal IP address will not work.
 
 > ℹ️ If you already have a Kafka server running you can skip this section.
 > Just make sure to store its IP address into an environment variable.
@@ -183,9 +183,21 @@ To learn more about pricing, see the
 ```sh
 export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"
 
+# Note: If the project ID contains a `:`, which indicates a project within an
+# organization (e.g. `example.com:project-id`), replace the `:` with `/` so
+# that the Kafka image path is valid.
+
 # Build the Kafka server image into Container Registry.
 gcloud builds submit --tag $KAFKA_IMAGE kafka/
 
+# If you want a different topic, address, Kafka port, or ZooKeeper port,
+# set the following environment variables before starting the server.
+# Otherwise, the default values from the Dockerfile are used:
+export KAFKA_TOPIC=
+export KAFKA_ADDRESS=
+export KAFKA_PORT=
+export ZOOKEEPER_PORT=
+
 # Create and start a new instance.
 # The --address flag binds the VM's address to the static address we created.
 # The --container-env KAFKA_ADDRESS is an environment variable passed to the
@@ -200,6 +212,70 @@ gcloud compute instances create-with-container kafka-vm \
   --tags "kafka-server"
 ```
 
+Note: The Kafka server should be running at this point, but no messages are
+being sent to any topic yet, so the KafkaToBigQuery template would fail if
+started now.
+
+### Sending messages to the Kafka server
+
+Run the commands below from inside the `kafka-vm` instance created earlier.
+Messages sent before the template starts are read once the template starts;
+messages sent while the template is running are processed as they arrive.
+
+Prerequisite: SSH into the Kafka VM.
+
+```sh
+gcloud compute ssh kafka-vm --zone "$ZONE"
+```
+
+1. Create a Topic
+
+```sh
+docker run --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
+--create --topic messages --partitions 1 --replication-factor 1
+```
+
+2. Send Messages to the Topic
+
+Run the console producer to send messages. After running the command, type a
+message and press Enter. You can send multiple messages. Press Ctrl+C to stop
+the producer.
+
+Note: You can run this step either before starting the Dataflow template
+(messages will be ready) or while it's running (messages will be processed as
+they arrive).
+
+```sh
+docker run -i --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-console-producer.sh \
+--bootstrap-server localhost:9092 --topic messages
+```
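+
+For example, assuming the topic is named `messages` and the pipeline parses
+JSON records with the `url` and `rating` fields from the sample schema shown
+later in this README, you can pipe a few messages into the console producer
+instead of typing them interactively:
+
+```sh
+# Example only: assumes topic "messages" and JSON records with url/rating fields.
+printf '%s\n' \
+  '{"url": "https://beam.apache.org/", "rating": "positive"}' \
+  '{"url": "https://cloud.google.com/dataflow", "rating": "positive"}' |
+docker run -i --rm --network host bitnami/kafka:3.4.0 \
+  /opt/bitnami/kafka/bin/kafka-console-producer.sh \
+  --bootstrap-server localhost:9092 --topic messages
+```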
+
+3. (Optional) Verify the Messages
+
+You can check that your messages were sent correctly by starting a consumer.
+This will print all messages from the beginning of the topic. Press Ctrl+C to
+exit.
+
+```sh
+docker run -it --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
+--bootstrap-server localhost:9092 --topic messages --from-beginning
+```
+
+4. (Optional) Delete a Topic
+
+```sh
+docker run --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
+--delete --topic messages
+```
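+
+If a step above does not behave as expected, you can list the topics on the
+broker as a quick sanity check. This assumes the broker is reachable on
+`localhost:9092` from the VM, as in the commands above:
+
+```sh
+# List all topics on the local broker.
+docker run --rm --network host bitnami/kafka:3.4.0 \
+/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
+--list
+```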
+
+
 ### Creating and running a Flex Template
 
 >
@@ -257,6 +333,21 @@ gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
     --region "$REGION"
 ```
 
+Note: If one of the parameters is a deeply nested JSON object or dictionary,
+use the gcloud `--flags-file` flag to pass a YAML file that lists all of the
+parameters, including the nested dictionary. Passing the dictionary directly
+on the command line results in a gcloud error. The parameters file can look
+like this:
+
+```yaml
+--parameters:
+  inputTopic: messages
+  outputTable: $PROJECT:$DATASET.$TABLE
+  bootstrapServer: $KAFKA_ADDRESS:9092
+  schema:
+    '{type: object, properties: {processing_time: {type: TIMESTAMP}, url: {type: STRING}, rating: {type: STRING}}}'
+```
+
 Run the following query to check the results in BigQuery.
 
 ```sh
diff --git a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
index ae44da65264..8216783ef69 100644
--- a/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
+++ b/dataflow/flex-templates/kafka_to_bigquery/kafka/Dockerfile
@@ -24,7 +24,7 @@ ARG KAFKA_VERSION="2.4.0"
 ARG SCALA_VERSION="2.12"
 
 # Set variables with default values used by the `start-kafka.sh` script.
-# Override them wiht the `--env` or `-e` flag.
+# Override them with the `--env` or `-e` flag.
 # https://docs.docker.com/engine/reference/builder/#env
 ENV KAFKA_TOPIC="${KAFKA_TOPIC:-messages}"
 ENV KAFKA_ADDRESS="${KAFKA_ADDRESS:-localhost}"
@@ -33,7 +33,7 @@ ENV ZOOKEEPER_PORT="${ZOOKEEPER_PORT:-2181}"
 
 # Download and install Apache Kafka.
 RUN apk add --no-cache bash \
-  && wget http://apache.mirrors.spacedump.net/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
+  && wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
     -O /tmp/kafka.tgz \
  && tar xzf /tmp/kafka.tgz -C /opt && rm /tmp/kafka.tgz \
  && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka
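
The Dockerfile's ENV defaults above are meant to be overridden with `--env`/`-e`
at run time. As a rough local smoke test, assuming the image has been built as
`$KAFKA_IMAGE` per the README and is available to the local Docker daemon, the
defaults could be overridden like this:

```sh
# Run the Kafka image locally, overriding the Dockerfile's ENV defaults.
# $KAFKA_IMAGE is the tag used in the README's build step; adjust as needed.
docker run --rm --network host \
  -e KAFKA_TOPIC="messages" \
  -e KAFKA_ADDRESS="localhost" \
  -e KAFKA_PORT="9092" \
  -e ZOOKEEPER_PORT="2181" \
  "$KAFKA_IMAGE"
```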