### To Do / License





- Currently a lot of this is working
- Need to have people walk toward a truck (in a thread so we can run the rest of the notebook)
- Need to have continuous queries run

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

Author: Adam Paternostro

### Pip installs

In [None]:
# To read/write to/from Kafka
import sys

# Open source Kafka client (imported later as "from kafka import KafkaProducer")
# sys.executable is the interpreter running this kernel, so the package lands in the same environment
# https://kafka-python.readthedocs.io/en/master/index.html
!{sys.executable} -m pip install kafka-python

# Confluent Kafka client (imported later as "import confluent_kafka")
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
!{sys.executable} -m pip install confluent-kafka

### Initialize

In [None]:
import json
import random
import time
import datetime
import base64

import google.auth
import google.auth.transport.urllib3
import urllib3

In [None]:
# Set these (run this cell to verify the output)
# These module-level names are read by the helper functions defined later in the notebook.

# WARNING: Hardcoded for now
# For testing you need (These will be automated with Terraform)
# network (vpc-main), kafka-subnet, dataflow-subnet
# service principals: dataflow-service-account [Optional: kafka-service-principal for using a service principal]
# buckets: data-analytics-preview (for AVRO schema), dataflow-staging-us-central1-756740881369 (for dataflow temp files w/o soft delete on)
# BigQuery dataset: chocolate_ai [You need to create this]
# BigQuery table: customer_geo_location [This is created for you]

bigquery_location = "us"
region = "us-central1"
kafka_cluster_name = "chocolate-ai-kafka-cluster-02"
kafka_topic_name = "customer-location-topic-02"
dataflow_bucket = "dataflow-staging-us-central1-756740881369" # should not have logical delete on
dataflow_service_account = "dataflow-service-account@data-analytics-preview.iam.gserviceaccount.com" # Needs Role: roles/managedkafka.client
bigquery_dataset_name = "chocolate_ai"
bigquery_streaming_destination_table = "customer_geo_location"
kafka_subnet = "kafka-subnet"
dataflow_subnet = "dataflow-subnet"

# kafka_service_principal_name = "kafka-service-principal" # No longer need since using logged in user
# kafka_service_principal_email = "kafka-service-principal-email" # No longer need since using logged in user

# Get some values using gcloud
# The IPython "!" capture returns a list of output lines, hence the len() checks and [0] below
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

# Fail fast when gcloud printed nothing for the project
if len(project_id) == 0:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

# Exactly one active account line is required (zero or several both raise)
if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### Helper Methods

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: "dict | None" = None) -> dict:
  """Calls a Google Cloud REST API using the current user's credentials.

  Args:
    url: Fully qualified REST endpoint.
    http_verb: One of GET, POST, PUT, PATCH or DELETE (upper case).
    request_body: JSON-serializable payload sent with POST, PUT and PATCH.
      It is ignored for GET and DELETE.

  Returns:
    The response body parsed from JSON.

  Raises:
    RuntimeError: If http_verb is not supported, or if the response status
      code is anything other than 200.
  """

  # Reject a bad verb before spending a round trip on refreshing credentials
  verbs_with_body = ("POST", "PUT", "PATCH")
  verbs_without_body = ("GET", "DELETE")
  if http_verb not in verbs_with_body + verbs_without_body:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  import json
  import requests
  import google.auth
  # "import google.auth" alone does not load the requests transport submodule,
  # so import it explicitly instead of relying on another cell having done so.
  import google.auth.transport.requests

  # Get an access token based upon the current user
  creds, _ = google.auth.default()
  creds.refresh(google.auth.transport.requests.Request())

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + creds.token
  }

  if http_verb in verbs_with_body:
    response = requests.request(http_verb, url, json=request_body, headers=headers)
  else:
    response = requests.request(http_verb, url, headers=headers)

  if response.status_code != 200:
    raise RuntimeError(
      f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    )

  return json.loads(response.content)

#### createApacheKafkaForBigQueryCluster
Creates the cluster if it does not exist.  Waits for it to be created.

In [None]:
def createApacheKafkaForBigQueryCluster():
  """Creates the Apache Kafka for BigQuery cluster unless it already exists.

  Lists the clusters in project_id / region and, when kafka_cluster_name is
  not among them, submits a create request sized at 3 vCPUs and 3 GiB of
  memory on kafka_subnet. The create call only starts the work; it does not
  wait for the cluster to become usable.

  Returns:
    The full cluster resource name
    (projects/<project>/locations/<region>/clusters/<cluster>), whether the
    cluster already existed or was just requested.
  """

  cluster_path = f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}"

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list
  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters"

  # Gather existing clusters
  json_result = restAPIHelper(url, "GET", None)
  print(f"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

  # Test to see if cluster exists, if so return
  if "clusters" in json_result:
    for item in json_result["clusters"]:
      print(f"Apache Kafka for BigQuery: {item['name']}")
      # "projects/data-analytics-preview/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == cluster_path:
        print("Apache Kafka for BigQuery already exists")
        return cluster_path

  # Create Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create
  print("Creating Apache Kafka For BigQuery Cluster")

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters?clusterId={kafka_cluster_name}"

  # NOTE(review): the REST reference documents networkConfigs as a list;
  # confirm the single-object form below keeps being accepted.
  request_body = {
      "capacityConfig": {
        "vcpuCount": "3",
        "memoryBytes": "3221225472"
      },
      "gcpConfig": {
          "accessConfig": {
              "networkConfigs": {
                  "subnet": f"projects/{project_id}/regions/{region}/subnetworks/{kafka_subnet}"
                  }
            }
        }
    }

  json_result = restAPIHelper(url, "POST", request_body)

  # Only "name" is read from the response. The previous unused lookup of
  # json_result["done"] could raise KeyError if the field is omitted.
  print("Apache Kafka for BigQuery created: ", json_result["name"])
  return cluster_path

#### waitForApacheKafkaForBigQueryCluster
Loops until cluster is created

In [None]:
def waitForApacheKafkaForBigQueryCluster(operation):
  """Polls a Managed Kafka resource every 30 seconds until it is ready.

  Args:
    operation: Resource path appended to https://managedkafka.googleapis.com/v1/.
      Either a cluster path, which is ready once its "state" is ACTIVE:
        projects/data-analytics-preview/locations/us-central1/clusters/kafka-cluster
      or a long-running operation path, which is ready once "done" is true:
        projects/data-analytics-preview/locations/us-central1/operations/operation-1723064212031-61f1e264889a9-9e3a863b-90613855

  Returns:
    None once the resource is ready.

  Raises:
    RuntimeError: If a long-running operation finishes with an error, or if
      the resource is still not ready after max_retries polls.
  """

  url = f"https://managedkafka.googleapis.com/v1/{operation}"
  max_retries = 100
  poll_interval_seconds = 30
  attempt = 0

  while True:
    json_result = restAPIHelper(url, "GET", None)
    print(f"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

    # Cluster resource: ready when the state turns ACTIVE
    if json_result.get("state") == "ACTIVE":
      print("Apache Kafka for BigQuery Cluster created")
      return None

    # Long-running operation: finished when "done" is true (possibly with an error)
    if json_result.get("done"):
      if "error" in json_result:
        raise RuntimeError(f"Apache Kafka for BigQuery Cluster not created: {json_result['error']}")
      print("Apache Kafka for BigQuery Cluster created")
      return None

    # Give up after max_retries polls, otherwise wait before the next poll
    attempt += 1
    if attempt > max_retries:
      raise RuntimeError("Apache Kafka for BigQuery Cluster not created")
    time.sleep(poll_interval_seconds)

#### deleteApacheKafkaForBigQueryCluster
Delete the cluster if exists

In [None]:
def deleteApacheKafkaForBigQueryCluster():
  """Deletes the Apache Kafka for BigQuery cluster if it exists.

  Lists the clusters in project_id / region and issues a DELETE for
  kafka_cluster_name when it is among them. The function does not wait for
  the deletion to finish.

  Returns:
    None in every case.
  """

  cluster_path = f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}"

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list
  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters"

  # Gather existing clusters
  json_result = restAPIHelper(url, "GET", None)
  print(f"deleteApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")
  found = False

  # Test to see if the cluster exists, if so then delete
  if "clusters" in json_result:
    for item in json_result["clusters"]:
      print(f"Apache Kafka for BigQuery: {item['name']}")
      # "projects/data-analytics-preview/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == cluster_path:
        print("Apache Kafka for BigQuery  exists")
        found = True
        break

  if not found:
    print("Apache Kafka for BigQuery does not exist")
    return None

  # Delete Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/delete
  print("Deleting Apache Kafka For BigQuery Cluster")

  url = f"https://managedkafka.googleapis.com/v1/{cluster_path}"

  # The response is not needed; a non-200 status raises inside restAPIHelper
  restAPIHelper(url, "DELETE", request_body={})

  print("Apache Kafka for BigQuery deleted")

#### createApacheKafkaForBigQueryTopic
Create the topic if not exists

In [None]:
def createApacheKafkaForBigQueryTopic():
  """Creates the Apache Kafka for BigQuery topic unless it already exists.

  Lists the topics of kafka_cluster_name and, when kafka_topic_name is not
  among them, creates it with 2 partitions and a replication factor of 3.

  Returns:
    None in every case.
  """

  topics_path = f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics"

  # First find the topic if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/list
  url = f"https://managedkafka.googleapis.com/v1/{topics_path}"

  # Gather existing topics
  json_result = restAPIHelper(url, "GET", None)
  print(f"createApacheKafkaForBigQueryTopic (GET) json_result: {json_result}")

  # Test to see if topic exists, if so return
  if "topics" in json_result:
    for item in json_result["topics"]:
      print(f"Apache Kafka for BigQuery Topic: {item['name']}")
      # "projects/data-analytics-preview/locations/us-central1/clusters/kafka-cluster/topics/topic-name"
      if item["name"] == f"{topics_path}/{kafka_topic_name}":
        print("Apache Kafka for BigQuery Topic already exists")
        return None

  # Create Apache Kafka For BigQuery Topic
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/create
  print("Creating Apache Kafka For BigQuery Topic")

  url = f"https://managedkafka.googleapis.com/v1/{topics_path}?topicId={kafka_topic_name}"

  request_body = {
      "partition_count"    : 2,
      "replication_factor" : 3
    }

  json_result = restAPIHelper(url, "POST", request_body)

  print("Apache Kafka for BigQuery Topic created: ", json_result["name"])
  return None

### Create Apache Kafka for BigQuery Cluster

Create the cluster and the topic.

In [None]:
# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList

# createApacheKafkaForBigQueryCluster returns the cluster resource path, which is
# handed to the wait helper so it can poll that resource.
opertion = createApacheKafkaForBigQueryCluster()

if opertion is not None:
  waitForApacheKafkaForBigQueryCluster(opertion)

# Create the topic only after the cluster wait has returned
createApacheKafkaForBigQueryTopic()

### Create BigQuery table

In [None]:
%%bigquery

-- Destination table for the streamed customer location events.
-- Column names match the fields of the Avro schema and of the JSON messages built by the producers.
-- NOTE(review): the project id is hardcoded in the table name while the Python cells use project_id from gcloud; confirm they match.
--DROP TABLE `data-analytics-preview.chocolate_ai.customer_geo_location`;

CREATE TABLE IF NOT EXISTS `data-analytics-preview.chocolate_ai.customer_geo_location`
(
    customer_geo_location_id      STRING,
    customer_id               INT64,
    event_timestamp_millis    INT64,
    prior_latitude            FLOAT64,
    prior_longitude           FLOAT64,
    current_latitude          FLOAT64,
    current_longitude         FLOAT64,

    -- debug_* columns carry the simulation inputs (destination, speed) and a map link
    debug_destination_latitude  FLOAT64,
    debug_destination_longitude FLOAT64,
    debug_walking_speed_mps     FLOAT64,
    debug_map_url               STRING
)
CLUSTER BY customer_id, event_timestamp_millis;

### Create Avro Schema for Kafka Messages


Used to parse the data in BigQuery into separate fields.  The schema must match your BigQuery table and will be used by the DataFlow job to separate the fields into columns within the table.

In [None]:
# Avro record describing one customer location event.
# The (name, type) pairs are listed in the same order as the BigQuery table columns.
avro_schema = {
    'namespace': 'com.databeans.customer_geo_location',
    'type': 'record',
    'name': 'customer_geo_location',
    'fields': [
        {'name': field_name, 'type': field_type}
        for field_name, field_type in (
            ('customer_geo_location_id', 'string'),
            ('customer_id', 'int'),
            ('event_timestamp_millis', 'long'),
            ('prior_latitude', 'double'),
            ('prior_longitude', 'double'),
            ('current_latitude', 'double'),
            ('current_longitude', 'double'),
            ('debug_destination_latitude', 'double'),
            ('debug_destination_longitude', 'double'),
            ('debug_walking_speed_mps', 'double'),
            ('debug_map_url', 'string'),
        )
    ],
}

# Persist the schema to a local .avsc file
with open('customer_geo_location.avsc', 'w') as schema_file:
    json.dump(avro_schema, schema_file, indent=4)

In [None]:
# Save to storage so dataflow job can see it
# NOTE(review): the destination bucket is hardcoded; confirm it exists in your project
!gsutil cp customer_geo_location.avsc gs://data-analytics-preview/customer_geo_location.avsc

### DataFlow - Stream Data from Apache Kafka for BigQuery to a BigQuery Table (streaming ingestion)
- <font color='red'>**WARNING:**</font> This will create a new job every time this is run.  The notebook will only stop the latest job, so please check the DataFlow UI to cancel any additional jobs.

#### createDataflowJobApacheKafkaToBigQuery
Creates the DataFlow Job (verifies the job name, must be a new name for each job)

In [None]:
def createDataflowJobApacheKafkaToBigQuery(jobName):
  """Launches the Kafka_to_BigQuery flex template as a new Dataflow job.

  The job reads JSON messages from kafka_topic_name on kafka_cluster_name and
  writes them to the BigQuery table
  project_id:bigquery_dataset_name.bigquery_streaming_destination_table.

  Args:
    jobName: Name for the new job. Nothing is launched when a job with this
      name is already listed, whatever its state.

  Returns:
    The "job" object from the launch response, or None when a job named
    jobName already exists.
  """

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}"

  # Gather existing jobs
  json_result = restAPIHelper(url, "GET", None)
  print(f"createDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}")

  # Test to see if job exists, if so return
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']}")
      if item["name"] == jobName:
        print(f"DataFlow job already exists with state of {item['currentState']}.  Try a new name.")
        return None

  # Create DataFlow Job
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch
  print("Creating DataFlow Job from Flex Template")

  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/flexTemplates:launch"

  # Continuous Queries needs useStorageWriteApiAtLeastOnce = True
  # https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery#optional-parameters
  #numStorageWriteApiStreams : Specifies the number of write streams, this parameter must be set. Default is 0.
  #storageWriteApiTriggeringFrequencySec : Specifies the triggering frequency in seconds, this parameter must be set. Default is 5 seconds.
  #useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.

  request_body = {
    "launch_parameter": {
        "jobName": jobName,
        "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex",
        "parameters": {
            "readBootstrapServerAndTopic": f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics/{kafka_topic_name}",
            "persistKafkaKey": "false",
            "writeMode": "SINGLE_TABLE_NAME",
            "numStorageWriteApiStreams": "2",
            "useStorageWriteApiAtLeastOnce": "true",
            "storageWriteApiTriggeringFrequencySec": "5",
            "enableCommitOffsets": "false",
            "kafkaReadOffset": "latest",
            "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
            "messageFormat": "JSON",
            "useBigQueryDLQ": "false",
            "stagingLocation": f"gs://{dataflow_bucket}/staging",
            "autoscalingAlgorithm": "NONE",
            "serviceAccount": dataflow_service_account,
            "labels": "{\"goog-dataflow-provided-template-type\":\"flex\",\"goog-dataflow-provided-template-name\":\"kafka_to_bigquery_flex\",\"goog-dataflow-provided-template-version\":\"2024-07-16-00_rc00\"}",
            "outputTableSpec": f"{project_id}:{bigquery_dataset_name}.{bigquery_streaming_destination_table}"
        },
        "environment": {
            "numWorkers": 2,
            "tempLocation": f"gs://{dataflow_bucket}/tmp",
            "subnetwork": f"regions/{region}/subnetworks/{dataflow_subnet}",
            "enableStreamingEngine": True,
            "additionalExperiments": [
                "enable_streaming_engine"
            ],
            "additionalUserLabels": {}
        }
    }
  }

  json_result = restAPIHelper(url, "POST", request_body)

  job = json_result["job"]
  print("DataFlow Job created: ", job)
  return job

#### stopDataflowJobApacheKafkaToBigQuery
Stops a DataFlow job.  Looks up the ID based upon the name.

In [None]:
def stopDataflowJobApacheKafkaToBigQuery(jobName):
  """Cancels the running Dataflow job called jobName.

  The job id is looked up from the job list; only a job that has the given
  name and is currently in JOB_STATE_RUNNING is cancelled. When no such job
  is listed a message is printed and nothing else happens.
  """

  # Look up the job id by name
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
  list_url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}"
  listing = restAPIHelper(list_url, "GET", None)
  print(f"stopDataflowJobApacheKafkaToBigQuery (GET) json_result: {listing}")

  running_job_id = None
  jobs = listing["jobs"] if "jobs" in listing else []
  for job in jobs:
    print(f"DataFlow Job Name: {job['name']} - {job['currentState']}")
    if job["name"] == jobName and job["currentState"] == "JOB_STATE_RUNNING":
      running_job_id = job["id"]
      break

  if running_job_id is None:
    print("DataFlow not found or is not running.")
    return

  # Ask Dataflow to move the job to the cancelled state
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update
  print("Stopping DataFlow Job ")

  url=f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/jobs/{running_job_id}"
  print(url)

  restAPIHelper(url, "PUT", { "requestedState" : "JOB_STATE_CANCELLED" })

  print("DataFlow Job Stopped")
  return

#### waitForDataFlowJobToStart
Waits for a job to start

In [None]:
def waitForDataFlowJobToStart(jobName):
  """Polls a Dataflow job every 30 seconds until it reaches JOB_STATE_RUNNING.

  Args:
    jobName: Name of the job. The id is looked up from the job list; when no
      job has this name a message is printed and the function returns.

  Returns:
    None once the job is running (or when the job cannot be found).

  Raises:
    RuntimeError: If the job reaches a terminal state before it runs, or if
      it is still not running after max_retries polls.
  """

  # States a job can no longer leave; waiting on them would only end in a timeout
  terminal_states = (
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
  )

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}"

  # Gather existing jobs
  json_result = restAPIHelper(url, "GET", None)
  print(f"waitForDataFlowJobToStart (GET) json_result: {json_result}")
  found = False

  # Look up the job id by name
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']} - {item['currentState']}")
      if item["name"] == jobName:
        jobId = item["id"]
        found = True
        break

  if not found:
    print("DataFlow not found or is not running.")
    return

  # Gets the job status
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get
  print("Getting DataFlow Job ")
  url=f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/jobs/{jobId}"
  print(url)

  max_retries = 100
  poll_interval_seconds = 30
  attempt = 0

  while True:
    # Get Job
    json_result = restAPIHelper(url, "GET", None)

    if "currentState" in json_result:
      current_state = json_result["currentState"]
      print(f"waitForDataFlowJobToStart (GET) currentState: {current_state}")
      if current_state == "JOB_STATE_RUNNING":
        print("DataFlow Job is now running")
        return None
      if current_state in terminal_states:
        raise RuntimeError(f"DataFlow Job ended in state {current_state} before it started running")
    else:
      print(f"waitForDataFlowJobToStart (GET) json_result: {json_result}")

    # Give up after max_retries polls, otherwise wait before the next poll
    attempt += 1
    if attempt > max_retries:
      raise RuntimeError("DataFlow Job not created")
    time.sleep(poll_interval_seconds)

#### Run the DataFlow Job

In [None]:
# The job can take a few minutes to start.  Click the link to see the progress:
# https://console.cloud.google.com/dataflow/jobs

# The timestamp suffix gives each run a new name; the create helper does not launch
# anything when a job with the same name is already listed.
jobName= f"kafka-stream-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
createDataflowJobApacheKafkaToBigQuery(jobName)

In [None]:
# Print a link to the Dataflow jobs page for this project
print(f"https://console.cloud.google.com/dataflow/jobs?project={project_id}")

In [None]:
# Block until the job named jobName reports JOB_STATE_RUNNING
waitForDataFlowJobToStart(jobName)

### Token Provider / Confluent Provider
Use the logged-in user's credentials (versus a service account)

*   https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer
*   https://github.com/googleapis/managedkafka
* https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration




In [None]:
# This class is used to get the current Application Default Credential (the user in the case of a notebook)
# This is then called by the Kafka Config by the Python Kafka Library

class TokenProvider(object):
  """Builds Kafka OAUTHBEARER tokens from Application Default Credentials.

  The token is three base64url segments joined by dots: a fixed header, a
  JSON claim set (exp, iss, iat, scope, sub) and the Google access token.
  token() is the entry point for the kafka-python token provider setting;
  confluent_token() additionally returns the expiry time.
  """

  def __init__(self, **config):
    # **config is accepted but not used
    self.credentials, _project = google.auth.default()
    self.http_client = urllib3.PoolManager()
    self.HEADER = json.dumps(dict(typ='JWT', alg='GOOG_OAUTH2_TOKEN'))

  @staticmethod
  def _utc_expiry(creds):
    """Returns creds.expiry tagged as UTC (the same convention confluent_token always used)."""
    return creds.expiry.replace(tzinfo=datetime.timezone.utc)

  def valid_credentials(self):
    """Returns the credentials, refreshing them first when they are not valid."""
    if not self.credentials.valid:
      self.credentials.refresh(google.auth.transport.urllib3.Request(self.http_client))
    return self.credentials

  def get_jwt(self, creds):
    """Returns the JSON claim set for creds.

    NOTE(review): sub reads creds.service_account_email; confirm the
    credential type in use exposes that attribute.
    """
    return json.dumps(
        dict(
            # Interpret the expiry as UTC; calling timestamp() on the naive
            # value would read it in the machine's local time zone.
            exp=self._utc_expiry(creds).timestamp(),
            iss='Google',
            iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
            scope='kafka',
            sub=creds.service_account_email,
        )
    )

  def b64_encode(self, source):
    """Returns source as URL-safe base64 text with the '=' padding removed."""
    return (
        base64.urlsafe_b64encode(source.encode('utf-8'))
        .decode('utf-8')
        .rstrip('=')
    )

  def get_kafka_access_token(self, creds):
    """Returns '<header>.<claims>.<access token>', each segment base64url encoded."""
    return '.'.join([
      self.b64_encode(self.HEADER),
      self.b64_encode(self.get_jwt(creds)),
      self.b64_encode(creds.token)
    ])

  def token(self):
    """Returns a Kafka access token for the current credentials."""
    try:
      return self.get_kafka_access_token(self.valid_credentials())
    except Exception as e:
      # Print for visibility in the notebook, then re-raise unchanged so the
      # original exception type and traceback are preserved
      print(e)
      raise

  def confluent_token(self):
    """Returns (token, expiry) where expiry is in seconds since the epoch."""
    try:
      creds = self.valid_credentials()

      # Seconds left until the credentials expire
      expiry_seconds = (self._utc_expiry(creds) - datetime.datetime.now(datetime.timezone.utc)).total_seconds()

      return self.get_kafka_access_token(creds), time.time() + expiry_seconds
    except Exception as e:
      print(e)
      raise

In [None]:
# Confluent does not use a TokenProvider, it calls a method
def ConfluentTokenProvider(args, config):
  """Returns the result of confluent_token() from a freshly built TokenProvider.

  Neither args nor config is used.
  """
  return TokenProvider().confluent_token()


# Print any Confluent errors (for debugging)
def ConfluentErrorProvider(e):
  """Prints the error e, then raises it wrapped in a plain Exception."""
  wrapped = Exception(e)
  print(e)
  raise wrapped

In [None]:
# For Debugging - WARNING: Never save these in your output of the notebook!
# t = TokenProvider()
# t.token()

# ConfluentTokenProvider(None, None)

#### Helper Methods (Fake Data and Callback)

In [None]:
# This can be used to do a callback when you publish a message
# Since we might publish a lot of messages, this is commented out in the Producer code

def delivery_callback(err, msg):
    """Prints the outcome of one publish: the error when err is truthy, otherwise the topic and value."""
    if not err:
        topic = msg.topic()
        payload = msg.value().decode('utf-8')
        # The value is padded to a minimum width of 12 characters
        print(f"Produced event to topic {topic}: value = {payload:12}")
        return
    print(f"ERROR: Message failed delivery: {err}")

### Create customers who are walking around toward the various locations

#### Latitude / Longitude Helper Methods

##### haversine_distance

In [None]:
import math

def haversine_distance(lat1, lon1, lat2, lon2):
  """
  Calculates the haversine (great-circle) distance between two points on a sphere.

  Args:
    lat1: Latitude of the first point in decimal degrees.
    lon1: Longitude of the first point in decimal degrees.
    lat2: Latitude of the second point in decimal degrees.
    lon2: Longitude of the second point in decimal degrees.

  Returns:
    The distance between the two points in kilometers, using a sphere of
    radius 6371 km.
  """

  # Earth's radius in kilometers
  R = 6371

  # Convert degrees to radians
  lat1 = math.radians(lat1)
  lon1 = math.radians(lon1)
  lat2 = math.radians(lat2)
  lon2 = math.radians(lon2)

  # Haversine formula
  dlat = lat2 - lat1
  dlon = lon2 - lon1
  a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
  # Rounding can push a marginally outside [0, 1] for (near-)antipodal points,
  # which would make sqrt(1 - a) fail
  a = min(1.0, max(0.0, a))
  c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

  return R * c

In [None]:
# Create 10 people who are in london
# Have them walk towards the location location_1_lat_min, london_lat_max = 51.5174328, -0.1219854
# When they are within 1 km send them an alert
# Only send them the alert once

# have people walk at different speeds
# The average walking speed for most adults is around 4.8 kilometers per hour

##### bounding_box

In [None]:
import math

def bounding_box(latitude, longitude, distance_km):
  """
  Returns the latitude/longitude box that extends distance_km around a point.

  Args:
    latitude: Latitude of the center point in decimal degrees.
    longitude: Longitude of the center point in decimal degrees.
    distance_km: Distance in kilometers from the center to each edge.

  Returns:
    The tuple (min_lat, max_lat, min_lon, max_lon) in decimal degrees. When
    the box reaches a pole the longitude range is the full (-180, 180).
  """

  # Distance expressed as an angle on a sphere of radius 6371 km
  center_lat = math.radians(latitude)
  center_lon = math.radians(longitude)
  reach = distance_km / 6371

  south = math.degrees(center_lat - reach)
  north = math.degrees(center_lat + reach)

  # Close to a pole every longitude is within reach
  if abs(center_lat) > math.pi / 2 - reach:
    return south, north, -180, 180

  # A degree of longitude shrinks with cos(latitude), so widen the span accordingly
  lon_reach = reach / math.cos(center_lat)
  west = math.degrees(center_lon - lon_reach)
  east = math.degrees(center_lon + lon_reach)
  return south, north, west, east

#### Kafka Producers

##### simulate_walk_open_source_kafka_producer (Open Source Kafka Producer)

In [None]:
import time
from geopy.distance import geodesic
import urllib.parse
import uuid

def simulate_walk_open_source_kafka_producer(customer_id, customer_name, starting_latitude, starting_longitude, \
                  ending_latitude, ending_longitude, speed_meters_per_second=1.4, just_one_record = False):
    """
    Simulates a walk from a starting point to an ending point and publishes
    one location message per second to kafka_topic_name with kafka-python.

    Positions are linearly interpolated between the start and the end. Each
    message is JSON encoded and keyed by customer_id.

    Args:
        customer_id: Customer identifier; sent in the message and used as the Kafka key.
        customer_name (str): Name of the person walking (used in progress output only).
        starting_latitude (float): Starting latitude in degrees.
        starting_longitude (float): Starting longitude in degrees.
        ending_latitude (float): Ending latitude in degrees.
        ending_longitude (float): Ending longitude in degrees.
        speed_meters_per_second (float, optional): Walking speed in meters per second. Defaults to 1.4.
        just_one_record (bool, optional): When True, return right after the first
            message has been sent and flushed (for testing). Defaults to False.

    Raises:
        ValueError: If speed_meters_per_second is not greater than zero.

    Prints information about the walk every 100 messages.
    """
    from kafka import KafkaProducer

    if speed_meters_per_second <= 0:
        raise ValueError(f"speed_meters_per_second must be greater than zero: {speed_meters_per_second}")

    # Calculate total distance and time
    # geodesic expects (latitude, longitude) points
    start = (starting_latitude, starting_longitude)
    end = (ending_latitude, ending_longitude)
    total_distance = geodesic(start, end).meters
    total_time = total_distance / speed_meters_per_second

    # Generate data points
    generate_data_time = 1  # seconds
    num_points = int(total_time / generate_data_time) + 1  # Include start and end points

    # Kafka Producer configuration with OAUTHBEARER authentication
    config = {
        'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'OAUTHBEARER',
        'sasl_oauth_token_provider': TokenProvider()
    }
    producer = KafkaProducer(**config)

    try:
        prior_latitude = starting_latitude
        prior_longitude = starting_longitude

        # Define the key based on your needs (e.g., customer_id)
        key = str(customer_id).encode('utf-8')

        for i in range(num_points):
            # With a single point (walk shorter than one second) stay at the start
            # instead of dividing by zero
            fraction = i / (num_points - 1) if num_points > 1 else 0.0
            lat = starting_latitude + fraction * (ending_latitude - starting_latitude)
            lon = starting_longitude + fraction * (ending_longitude - starting_longitude)
            distance_to_destination = haversine_distance(lat, lon, ending_latitude, ending_longitude)
            map_url = f"https://www.google.com/maps/place/{lat},{lon}/@{lat},{lon},17z"

            message_data = {
                "customer_geo_location_id" : f"{uuid.uuid4()}",
                "customer_id": customer_id,
                "event_timestamp_millis": int(time.time() * 1000),
                "prior_latitude": prior_latitude,
                "prior_longitude": prior_longitude,
                "current_latitude": lat,
                "current_longitude": lon,
                "debug_destination_latitude": ending_latitude,
                "debug_destination_longitude": ending_longitude,
                "debug_walking_speed_mps": speed_meters_per_second,
                "debug_map_url" : f"{map_url}"
            }

            # Save for next iteration
            prior_latitude = lat
            prior_longitude = lon

            # Serialize data to bytes
            serialized_data = json.dumps(message_data).encode('utf-8')

            # Produce the message with key
            producer.send(kafka_topic_name, key=key, value=serialized_data) # callback=delivery_callback

            if i % 100 == 0:
                print(f"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}")
                print(f"message_data: {message_data}")
                producer.flush()
                if just_one_record:
                    return # for testing

            time.sleep(generate_data_time)

        producer.flush()
    finally:
        # Release the producer's connections on every exit path
        producer.close()



##### simulate_walk_confluent_kafka_producer (Confluent Kafka Producer)

In [None]:
import time
from geopy.distance import geodesic
import urllib.parse
import uuid

def simulate_walk_confluent_kafka_producer(customer_id, customer_name, starting_latitude, starting_longitude, \
                  ending_latitude, ending_longitude, speed_meters_per_second=1.4, just_one_record = False):
    """
    Simulates a straight-line walk and publishes each position to Kafka
    using the Confluent Kafka producer.

    One JSON message is produced per simulated second to `kafka_topic_name`,
    keyed by the customer id. Every 100th point (including the first) the
    progress is printed and the producer is flushed.

    Args:
        customer_id: Identifier sent in each message and used as the Kafka message key.
        customer_name (str): Name of the person walking (only used in printed output).
        starting_latitude (float): Starting latitude in degrees.
        starting_longitude (float): Starting longitude in degrees.
        ending_latitude (float): Ending latitude in degrees.
        ending_longitude (float): Ending longitude in degrees.
        speed_meters_per_second (float, optional): Walking speed in meters per second. Defaults to 1.4.
        just_one_record (bool, optional): When True, return right after the first
            message has been produced and flushed (for testing). Defaults to False.

    Raises:
        ValueError: If speed_meters_per_second is not greater than zero.
    """
    import confluent_kafka
    import functools

    # A zero or negative speed would divide by zero / yield a negative number of points
    if speed_meters_per_second <= 0:
        raise ValueError("speed_meters_per_second must be greater than zero")

    # Kafka Producer configuration with SASL_PLAIN authentication
    # This requires a service principal key (json file) which must be base64 encoded
    # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket
    # secret = !(cat sa.key.json | base64 -w 0)
    # secret = secret[0]
    #config = {
    #    'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
    #    'sasl.username':     f'kafka-sp@{project_id}.iam.gserviceaccount.com',
    #    'sasl.password':     secret,
    #    'security.protocol': 'SASL_SSL',
    #    'sasl.mechanisms':   'PLAIN',
    #    'acks':              'all'
    #}


    # Kafka Producer configuration with OAUTHBEARER authentication
    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
    # https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py
    config = {
        'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': functools.partial(ConfluentTokenProvider, None),
        #'oauthbearer_token_refresh_cb': functools.partial(ConfluentTokenProvider, None),
        'error_cb' : functools.partial(ConfluentErrorProvider),
        'acks': 'all'
    }

    producer = confluent_kafka.Producer(config)

    # Calculate total distance and time.
    # geopy expects (latitude, longitude) tuples; passing (longitude, latitude)
    # measures a different pair of points and gives the wrong walk length.
    start = (starting_latitude, starting_longitude)
    end = (ending_latitude, ending_longitude)
    total_distance = geodesic(start, end).meters
    total_time = total_distance / speed_meters_per_second

    # Generate data points
    generate_data_time = 1  # seconds
    # Always emit at least the start and the end point; this also keeps the
    # (num_points - 1) divisor below from being zero for very short walks.
    num_points = max(int(total_time / generate_data_time) + 1, 2)

    prior_latitude = starting_latitude
    prior_longitude = starting_longitude

    # Every message for this walk uses the same key (the customer id)
    key = str(customer_id).encode('utf-8')

    for i in range(num_points):
        fraction = i / (num_points - 1)  # 0.0 at the start, 1.0 at the destination
        lat = starting_latitude + fraction * (ending_latitude - starting_latitude)
        lon = starting_longitude + fraction * (ending_longitude - starting_longitude)
        distance_to_destination = haversine_distance(lat, lon, ending_latitude, ending_longitude)
        map_url = f"https://www.google.com/maps/place/{lat},{lon}/@{lat},{lon},17z"

        # Payload sent to Kafka (the debug_* fields are extra context for troubleshooting)
        message_data = {
            "customer_geo_location_id" : f"{uuid.uuid4()}",
            "customer_id": customer_id,
            "event_timestamp_millis": int(time.time() * 1000),
            "prior_latitude": prior_latitude,
            "prior_longitude": prior_longitude,
            "current_latitude": lat,
            "current_longitude": lon,
            "debug_destination_latitude": ending_latitude,
            "debug_destination_longitude": ending_longitude,
            "debug_walking_speed_mps": speed_meters_per_second,
            "debug_map_url" : f"{map_url}"
        }

        # Save for next iteration
        prior_latitude = lat
        prior_longitude = lon

        # Serialize data to bytes
        serialized_data = json.dumps(message_data).encode('utf-8')

        # Produce the message with key
        producer.produce(kafka_topic_name, key=key, value=serialized_data) # callback=delivery_callback

        # Serve queued callbacks (error_cb, delivery reports) without blocking;
        # the Confluent client only invokes them from poll() / flush().
        producer.poll(0)

        if i % 100 == 0:
            print(f"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}")
            print(f"message_data: {message_data}")
            producer.flush()
            if just_one_record:
                return # for testing

        time.sleep(generate_data_time)

    # Make sure everything still queued is delivered before returning
    producer.flush()

#### Create Simulation

##### create_people

In [None]:
import threading

def create_people(starting_customer_id, number_of_people, producer_method_name = simulate_walk_open_source_kafka_producer) -> None:
  """
  Generates random walkers and simulates all of them concurrently.

  Each person gets a random starting point inside a bounding box, one of two
  fixed destinations (alternating by position) and a random walking speed
  between 1 and 5 m/s (rounded to 2 decimals). One thread per person runs
  producer_method_name; this function blocks until every thread has finished.

  Args:
    starting_customer_id: Customer id assigned to the first person; ids increase by one per person.
    number_of_people: The number of people to generate.
    producer_method_name: The method to use to simulate the walk. It is called with
      (customer_id, name, start_lat, start_lon, dest_lat, dest_lon, speed).

  Returns:
    None
  """
  # (latitude, longitude) of the two destinations.
  # NOTE(review): these were labelled "The British Museum" / "Tower of London", but the
  # single-record test cell uses the same coordinates as Buckingham Palace / London Bridge
  # -- confirm which landmarks are intended.
  destinations = (
      (51.501959717656455, -0.1411590270077308),
      (51.5081747307638,   -0.07596870182999621),
  )

  # Random starting locations are drawn from a box around the first destination
  # (the 10 is presumably a distance in km -- verify against bounding_box).
  first_lat, first_lon = destinations[0]
  min_lat, max_lat, min_lon, max_lon = bounding_box(first_lat, first_lon, 10)

  walkers = []
  for offset in range(number_of_people):
    person_id = offset + starting_customer_id
    print(f"Generating Person: {person_id}")

    # Even positions walk to the first destination, odd positions to the second
    dest_lat, dest_lon = destinations[offset % 2]
    start_lat = random.uniform(min_lat, max_lat)
    start_lon = random.uniform(min_lon, max_lon)
    # The average walking speed of a person is approximately 1.4 meters per second.
    speed = round(random.uniform(1, 5),2)

    print(haversine_distance(start_lat, start_lon, dest_lat, dest_lon))
    walkers.append((person_id, f"person {person_id}", start_lat, start_lon, dest_lat, dest_lon, speed))

  # One thread per walker; start them all, then wait for all of them
  workers = [threading.Thread(target=producer_method_name, args=walker) for walker in walkers]

  for worker in workers:
    worker.start()

  for worker in workers:
    worker.join()

##### Clear the table so we can see our results (easily)

In [None]:
#%%bigquery

#TRUNCATE TABLE `chocolate_ai.customer_geo_location`;

##### Test a single record from Open Source and Confluent Producers

In [None]:
# Open Source Kafka Producer
# Fixed start / end coordinates used for the single-record smoke test
Buckingham_Palace_latitude = 51.501959717656455
Buckingham_Palace_longitude = -0.1411590270077308
London_Bridge_latitude = 51.5081747307638
London_Bridge_longitude = -0.07596870182999621

# Send one record with the Open Source (kafka-python) producer
print("Open Source Kafka Producer")
simulate_walk_open_source_kafka_producer(
    9999, "Person Open Source",
    Buckingham_Palace_latitude, Buckingham_Palace_longitude,
    London_Bridge_latitude, London_Bridge_longitude,
    speed_meters_per_second=100, just_one_record=True)
print()

# Send one record with the Confluent producer
print("Confluent Kafka Producer")
simulate_walk_confluent_kafka_producer(
    8888, "Person Confluent",
    Buckingham_Palace_latitude, Buckingham_Palace_longitude,
    London_Bridge_latitude, London_Bridge_longitude,
    speed_meters_per_second=200, just_one_record=True)
print()

In [None]:
%%bigquery

-- It might take a second to see the data
-- Shows up to 100 rows, highest customer_id first (the test records use 9999 and 8888),
-- then oldest event first within each customer.
select * from `chocolate_ai.customer_geo_location` order by customer_id DESC, event_timestamp_millis LIMIT 100;

In [None]:
# The string below appears to be sample output kept for reference; it is not used by the code.
# NOTE: its keys (e.g. 'starting_latitude', 'distance_to_dest_km') differ from the
# message_data fields the producers in this notebook send.
"""
Open Source Kafka Producer
Person Open Source distance to go (4.56 km): https://www.google.com/maps/place/51.501959717656455,-0.1411590270077308/@51.501959717656455,-0.1411590270077308,17z
message_data: {'customer_id': 9999, 'event_timestamp_millis': 1726065315461, 'current_latitude': 51.501959717656455, 'current_longitude': -0.1411590270077308, 'starting_latitude': 51.501959717656455, 'starting_longitude': -0.1411590270077308, 'destination_latitude': 51.509391, 'destination_longitude': -0.0764338, 'walking_speed_mps': 100, 'distance_to_dest_km': 4.555325295137807}

"""
# Same coordinates as the single-record test cell
Buckingham_Palace_latitude, Buckingham_Palace_longitude = 51.501959717656455, -0.1411590270077308
London_Bridge_latitude, London_Bridge_longitude = 51.5081747307638, -0.07596870182999621

# Print the haversine distance between the two points to sanity check the helper
print(haversine_distance(London_Bridge_latitude, London_Bridge_longitude, Buckingham_Palace_latitude, Buckingham_Palace_longitude))

##### Start a thread and create many people walking **(this will generate lots of data)**
- NOTE:
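  - Exactly one of the `create_people` lines below should be **uncommented** when you run this cell. Keep them commented out otherwise, so that accidentally running the cell does not kick off lots of data.
  - Do not run both: their customer id ranges overlap (1-500 and 101-600), so the same customer would be walking two routes at once.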

In [None]:
# Create some people and send using Open Source Producer
# 500 people (customer ids 101-600); this call blocks until every walk has finished
create_people(101, 500, simulate_walk_open_source_kafka_producer)

# Create some people and send using Confluent Kafka Producer
# create_people(1, 500, simulate_walk_confluent_kafka_producer)

# NOTES:
# 1. If you are running this for a long time, you should right click this cell and clear the output.
# 2. The people are walking straight towards the destination, they will walk over water and through buildings.
#    Google Maps routing could be called for a realistic walking route.

In [None]:
%%bigquery

-- How many records are in the customer_geo_location table
-- NOTE: the project id (data-analytics-preview) is hard-coded here; change it when running in another project.
SELECT COUNT(*) FROM `data-analytics-preview.chocolate_ai.customer_geo_location`;

In [None]:
%%bigquery

-- Events per second (add day if you are running across days)
-- Returns the 10 highest (hour, minute, second) buckets with their row counts.
SELECT EXTRACT(HOUR FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Hour,
       EXTRACT(MINUTE FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Minute,
       EXTRACT(SECOND FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Second,
       COUNT(*) AS Cnt
  FROM `data-analytics-preview.chocolate_ai.customer_geo_location`
GROUP BY ALL
ORDER BY 1 DESC, 2 DESC, 3 DESC
LIMIT 10;

In [None]:
%%bigquery

-- See which customers have broken the geofence
-- (the geofence is a 1000 meter radius around each row's destination)
WITH raw_data AS (
  SELECT *
  FROM `data-analytics-preview.chocolate_ai.customer_geo_location`
)
-- Add the distance covered since the prior point and the remaining distance to the destination
, geo_data AS (
  SELECT *,
         ST_DISTANCE(
          ST_GEOGPOINT(prior_longitude, prior_latitude),
          ST_GEOGPOINT(current_longitude, current_latitude)
         ) AS meters_travel_since_prior_distance,
         ST_DISTANCE(
          ST_GEOGPOINT(current_longitude, current_latitude),
          ST_GEOGPOINT(debug_destination_longitude, debug_destination_latitude)
         ) AS meters_to_dest_distance,
         ST_DISTANCE(
          ST_GEOGPOINT(current_longitude, current_latitude),
          ST_GEOGPOINT(debug_destination_longitude, debug_destination_latitude)
         ) / 1000 AS km_to_dest_distance,
  FROM raw_data
)
-- Flag rows that are still more than 1000 m away, but where subtracting the last step
-- length from the remaining distance drops below 1000 m.
-- NOTE(review): this fires on the reading just BEFORE the boundary is crossed (assuming the
-- next step is as long as the last one), not on the first reading inside it -- confirm intended.
, results AS (
  SELECT *,
         CASE WHEN meters_to_dest_distance > 1000
               AND meters_to_dest_distance - meters_travel_since_prior_distance < 1000
              THEN TRUE
              ELSE FALSE
          END AS entered_geofence
  FROM geo_data
)
-- One JSON "message" per flagged row plus an _ATTRIBUTES JSON holding the event timestamp
-- (presumably the shape expected by a continuous query export -- verify against the target).
SELECT TO_JSON_STRING(STRUCT(customer_geo_location_id,
                             customer_id,
                             current_latitude,
                             current_longitude,
                             km_to_dest_distance,
                             debug_map_url)) AS message,
      TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES
  FROM results
  where entered_geofence = true;

In [None]:
%%bigquery

-- User Input: Change the below job_id (you get this from the continuous query | job information tab)
-- Shows the job details and the average number of slots used by the continuous query
SELECT job.creation_time,
      job.project_id,
      job.project_number,
      job.user_email,
      job.job_id,
      job.job_type,
      job.statement_type,
      job.priority,
      job.start_time,
      job.end_time,
      job.query,
      job.state,
      job.reservation_id,
      job.total_bytes_processed,
      job.total_slot_ms,
      job.error_result.reason     AS error_result_reason,
      job.error_result.location   AS error_result_location,
      job.error_result.debug_info AS error_result_debug_info,
      job.error_result.message    AS error_result_message,

      -- Average slot utilization per job is calculated by dividing
      -- total_slot_ms by the millisecond duration of the job
      -- (a job that is still running has no end_time, so the current time is used instead)
      CAST(SAFE_DIVIDE(job.total_slot_ms,(TIMESTAMP_DIFF(IFNULL(job.end_time,CURRENT_TIMESTAMP()), job.start_time, MILLISECOND))) AS FLOAT64) AS job_avg_slots

  -- NOTE(review): no column from the two UNNEST joins is selected; GROUP BY ALL collapses the
  -- duplicated rows again. Confirm whether the joins are still needed.
  FROM `data-analytics-preview`.`region-us`.INFORMATION_SCHEMA.JOBS AS job
        CROSS JOIN UNNEST(job.job_stages) as unnest_job_stages
        CROSS JOIN UNNEST(job.timeline) AS unnest_timeline
  WHERE job.job_id = 'bquxjob_544ac149_191ec5b33ed'
GROUP BY ALL;

### Consume data using Kafka Consumers

#### Open Source Kafka Consumer

In [None]:
def openSourceKafkaConsumer():
  """
  Reads up to 50 messages from `kafka_topic_name` with the kafka-python
  consumer (OAUTHBEARER authentication) and prints each one.

  Every record is printed raw (key and value) and then as parsed JSON; a
  record that cannot be parsed is reported and still counts towards the
  limit. The function keeps polling until the limit is reached or the
  user interrupts it, and always closes the consumer before returning.
  """
  from kafka import KafkaConsumer

  # Kafka Consumer configuration with SASL_PLAIN authentication
  # This requires a service principal key (json file) which must be base64 encoded
  # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket
  # secret = !(cat sa.key.json | base64 -w 0)
  # secret = secret[0]
  #config = {
  #    'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
  #    'security_protocol': 'SASL_SSL',  # Use SASL_PLAINTEXT for username/password
  #    'sasl_mechanism': 'PLAIN',
  #    'sasl_plain_username': f'kafka-sp@{project_id}.iam.gserviceaccount.com',
  #    'sasl_plain_password': secret,
  #    'group_id':          'kafka-group-id',
  #    'auto_offset_reset': 'earliest'
  #}

  # Kafka Consumer configuration with OAUTHBEARER authentication
  config = {
      'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
      'security_protocol': 'SASL_SSL',
      'sasl_mechanism': 'OAUTHBEARER',
      'sasl_oauth_token_provider': TokenProvider(),
      'group_id': 'kafka-group-id',
      'auto_offset_reset': 'earliest'
  }

  # Create Consumer instance
  consumer = KafkaConsumer(**config)  # Use keyword unpacking for clear configuration

  # Subscribe to topic
  consumer.subscribe([kafka_topic_name])

  consumed = 0
  max_items = 50

  # Poll for new messages from Kafka and print them.
  try:
      while consumed < max_items:
          # kafka-python's poll() timeout is in MILLISECONDS (wait up to 1 second here).
          # max_records keeps us from fetching records we would never process.
          batches = consumer.poll(timeout_ms=1000, max_records=max_items - consumed)
          for records in batches.values():
              for record in records:
                  consumed += 1
                  try:
                      print(f"Consumed record with key {record.key} and value {record.value}")
                      # Process the message here (e.g., parse JSON, store data)
                      message_data = json.loads(record.value)
                      print(message_data)
                  except (ValueError, TypeError) as e:
                      # Invalid JSON / undecodable bytes / empty value: report and move on
                      print(f"Error processing message: {e}")
      print(f"Reached max items ({max_items})")

  except KeyboardInterrupt:
      pass
  finally:
      # Leave group and commit final offsets
      consumer.close()


In [None]:
# Read and print a sample of messages from the topic using the kafka-python consumer
openSourceKafkaConsumer()

#### Confluent Source Kafka Consumer

In [None]:
def confluentKafkaConsumer():
  """
  Reads 50 messages from `kafka_topic_name` with the Confluent Kafka
  consumer (OAUTHBEARER authentication) and prints each value as UTF-8 text.

  Polls in one-second steps, printing "Waiting..." when nothing arrived and
  the error when a message carries one. Only successfully received messages
  count towards the limit. The function keeps polling until the limit is
  reached or the user interrupts it, and always closes the consumer.
  """
  from confluent_kafka import Consumer
  import functools

  # Kafka Consumer configuration with SASL_PLAIN authentication
  # This requires a service principal key (json file) which must be base64 encoded
  # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket
  # secret = !(cat sa.key.json | base64 -w 0)
  # secret = secret[0]
  #config = {
  #    # User-specific properties that you must set
  #    'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
  #    'sasl.username':     f'kafka-sp@{project_id}.iam.gserviceaccount.com',
  #    'sasl.password':     secret,#
  #    'security.protocol': 'SASL_SSL',
  #    'sasl.mechanisms':   'PLAIN',
  #    'group.id':          'kafka-group-id',
  #    'auto.offset.reset': 'earliest'
  #}

  # Kafka Consumer configuration with OAUTHBEARER authentication
  config = {
      'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'OAUTHBEARER',
      'oauth_cb': functools.partial(ConfluentTokenProvider, None),
      'error_cb' : functools.partial(ConfluentErrorProvider),
      'group.id': 'kafka-group-id',
      'auto.offset.reset': 'earliest'
  }

  # Create Consumer instance
  consumer = Consumer(config)

  # Subscribe to topic
  consumer.subscribe([kafka_topic_name])

  consumed = 0
  max_items = 50

  # Poll for new messages from Kafka and print them.
  try:
      while consumed < max_items:
          # The Confluent client's poll() timeout is in seconds
          msg = consumer.poll(1.0)
          if msg is None:
              # Initial message consumption may take up to
              # `session.timeout.ms` for the consumer group to
              # rebalance and start consuming
              print("Waiting...")
          elif msg.error():
              # f-string so the actual error is shown ("%s".format(...) printed a literal %s)
              print(f"ERROR: {msg.error()}")
          else:
              # Print the message value (the key is not shown)
              consumed += 1
              print(msg.value().decode('utf-8'))
      print(f"Reached max items ({max_items})")
  except KeyboardInterrupt:
      pass
  finally:
      # Leave group and commit final offsets
      consumer.close()

In [None]:
# Read and print a sample of messages from the topic using the Confluent consumer
confluentKafkaConsumer()

### Clean Up

In [None]:
# Note if you do not know your job id, or overwrote the value, click here to open and manually Cancel the job
# https://console.cloud.google.com/dataflow/jobs

# Ask for confirmation before stopping the Dataflow job.
# Only an exact uppercase "Y" triggers the stop; any other answer (including just pressing Enter) skips it.
user_input = input(f"Do you want to delete your DataFlow Job {jobName} (Y/n)?")
if user_input == "Y":
  stopDataflowJobApacheKafkaToBigQuery(jobName)

In [None]:
# Ask for confirmation before deleting the Kafka cluster.
# Only an exact uppercase "Y" triggers the delete; any other answer (including just pressing Enter) skips it.
user_input = input("Do you want to delete your Apache Kafka for BigQuery (Y/n)?")
if user_input == "Y":
  deleteApacheKafkaForBigQueryCluster()

### Reference Links

- [Python Kafka Library](https://kafka-python.readthedocs.io/en/master/index.html)
- [Confluent Python Library](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html)
- [Google Apache Kafka for BigQuery - Authentication](https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer)
- [Google Apache Kafka for BigQuery - OAuth Sample Code](https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer)
* [Google Apache Kafka for BigQuery - Sample Code](https://github.com/googleapis/managedkafka)
- [Confluent - Sample OAuth Code](https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py)
* [Confluent - Kafka Config for Python](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration)

In [None]:
# Sleep in ten 30-second steps (about 5 minutes in total).
# NOTE(review): the purpose is not stated -- presumably keeps the notebook session busy; confirm.
for i in range(10):
  time.sleep(30)