# Let it flow ! with Google Managed Kafka & Kafka Connect

## Pre Requisite

### Runtime

Make sure you selected the proper runtime (the one in the right VPC)

### Wrapper to authenthicate with oauth
upload tokenprovider.py from https://github.com/c-damien/bigquery-academy-2025/gmk/

In [1]:
from google.colab import files
uploaded = files.upload()

Saving tokenprovider.py to tokenprovider (3).py


## Update the project parameters

In [None]:
!pip install confluent-kafka google-auth urllib3 packaging



In [5]:
#@title Variables { run: "auto", display-mode: "form" }
KAFKA_PROJECT_ID = "" #@param {type:"string"}
KAFKA_CLUSTER_NAME = "lab-kafka-cluster" #@param {type:"string"}
REGION = "us-central1" #@param {type:"string"}
TOPIC_NAME = "kafka-lab-topic" #@param {type:"string"}

# Send a simple message to your Managed Kafka Cluster

In [2]:
import confluent_kafka
from tokenprovider import TokenProvider
import json
from confluent_kafka.admin import AdminClient
from confluent_kafka import KafkaException

token_provider = TokenProvider()

conf = {
    'bootstrap.servers': 'bootstrap.'+KAFKA_CLUSTER_NAME+'.'+REGION+'.managedkafka.'+KAFKA_PROJECT_ID+'.cloud.goog:9092',
    'client.id': 'my-topic-writer',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': token_provider.get_token,
}

print(' Connecting to bootstrap.'+KAFKA_CLUSTER_NAME+'.'+REGION+'.managedkafka.'+KAFKA_PROJECT_ID+'.cloud.goog:9092')

# --- main Script: simple producer ---
try:

    producer = confluent_kafka.Producer(conf)

    #callback function after publishing a message
    def callback(error, message):
      if error is not None:
        print(error)
        return
      print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))

    #main part to publish
    for i in range(1):
      payload = "my first kafka message" # our message

      producer.produce(TOPIC_NAME, payload.encode('utf-8'), callback=callback)
    producer.flush()

except KafkaException as e:
    print(f"Failed to fetch topics: {e}")

except Exception as e:
    print(f"An unexpected error occurred: {e}")

 Connecting to bootstrap.game-kafka.us-central1.managedkafka.data-cloud-interactive-demo.cloud.goog:9092
Delivered a message to game_signals[1]


### Send a JSON message to Kafka

In [None]:
import confluent_kafka
from tokenprovider import TokenProvider
import json
from confluent_kafka.admin import AdminClient
from confluent_kafka import KafkaException



token_provider = TokenProvider()
# --- Configuration ---
conf = {
    'bootstrap.servers': 'bootstrap.'+KAFKA_CLUSTER_NAME+'.'+REGION+'.managedkafka.'+KAFKA_PROJECT_ID+'.cloud.goog:9092',
    'client.id': 'my-topic-lister-app',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': token_provider.get_token,
}

# --- Script ---
try:
    # 1. Create a producer
    producer = confluent_kafka.Producer(conf)
    def callback(error, message):
      if error is not None:
        print(error)
        return
      print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))

    for i in range(1):

      ###########our message
      schema = {
        "type": "struct",
        "fields": [
          {"type": "string", "optional": False, "field": "user_id"},
          {"type": "string", "optional": False, "field": "action"},
          {"type": "string", "optional": True, "field": "timestamp"},
          {"type": "string", "optional": True, "field": "page"}
        ],
       "optional": False,
       "name": "user_activity_schema"
      }
      payload = {
      'user_id': '1a2b3c',
      'action': 'click',
      'timestamp': '2025-09-28T15:52:00Z',
      'page': '/products/kafka-for-beginners'
      }
      message_with_schema = {
        "schema": schema,
        "payload": payload
      }
      ###########our message

      json_data = json.dumps(message_with_schema)
      producer.produce(TOPIC_NAME, json_data.encode('utf-8'),  key=payload['user_id'].encode('utf-8'), callback=callback)
    producer.flush()

except KafkaException as e:
    print(f"Failed to fetch topics: {e}")

except Exception as e:
    print(f"An unexpected error occurred: {e}")

### Consume messages from your topic

In [7]:
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException


conf = {
    'bootstrap.servers': 'bootstrap.'+KAFKA_CLUSTER_NAME+'.'+REGION+'.managedkafka.'+KAFKA_PROJECT_ID+'.cloud.goog:9092',
    'client.id': 'my-topic-lister-app',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': token_provider.get_token,
    'group.id': "my_group",
    # This setting ensures the consumer starts reading from the
    # beginning of the topic if no offset is stored for its group.
    'auto.offset.reset': 'earliest'
}



consumer = Consumer(conf)

try:
    # 1. Subscribe to the topic
    consumer.subscribe([TOPIC_NAME])
    print(f"Subscribed to topic "+TOPIC_NAME+". Waiting for messages...")

    # 2. Start the infinite polling loop
    while True:
        # Poll for new messages with a 1-second timeout
        msg = consumer.poll(timeout=1.0)

        # 3. Handle the poll result
        if msg is None:
            # No message received within the timeout period
            continue

        if msg.error():
            # Handle errors
            if msg.error().code() == KafkaError._PARTITION_EOF:
                sys.stderr.write(f'%% Reached end of partition {msg.topic()} [{msg.partition()}] at offset {msg.offset()}\n')
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Proper message received
            key = msg.key().decode('utf-8') if msg.key() else 'NoKey'
            value = msg.value().decode('utf-8')
            print(f"Received message: key={key} value='{value}'")

except KafkaException as e:
    print(f"Failed to fetch topics: {e}")

finally:
    # 4. Close the consumer connection
    consumer.close()
    print("Consumer connection closed.")

Subscribed to topic game_signals. Waiting for messages...
Consumer connection closed.


KeyboardInterrupt: 