<img src = "https://github.com/VeryFatBoy/notebooks/blob/main/common/images/img_github_singlestore-jupyter_featured_2.png?raw=true">

<div id="singlestore-header" style="display: flex; background-color: rgba(235, 249, 245, 0.25); padding: 5px;">
    <div id="icon-image" style="width: 90px; height: 90px;">
        <img width="100%" height="100%" src="https://raw.githubusercontent.com/singlestore-labs/spaces-notebooks/master/common/images/header-icons/browser.png" />
    </div>
    <div id="text" style="padding: 5px; margin-left: 10px;">
        <div id="badge" style="display: inline-block; background-color: rgba(0, 0, 0, 0.15); border-radius: 4px; padding: 4px 8px; align-items: center; margin-top: 6px; margin-bottom: -2px; font-size: 80%">SingleStore Notebooks</div>
        <h1 style="font-weight: 500; margin: 8px 0 0 4px;">Streaming data from MongoDB Atlas to SingleStore Kai using Kafka and CDC</h1>
    </div>
</div>

In [6]:
!pip cache purge --quiet

[0m

In [7]:
!pip install pymongo kafka-python --quiet

In [8]:
from kafka import KafkaConsumer
from pymongo import MongoClient

In [9]:
# Connect to MongoDB Atlas and bind `db` to the "adtech" database.
# Replace the <username>, <password> and <host> placeholders before running.
try:
    client = MongoClient(
        "mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority"
    )
    db = client["adtech"]
    # Destructive: drops any existing "adtech" database on the cluster.
    client.drop_database(db)
except Exception as exc:
    print(f"Could not connect to MongoDB: '{exc}'")
else:
    # Only reached when the drop above completed without raising.
    print("Connected to MongoDB successfully")

Connected to MongoDB successfully


In [10]:
# Create a consumer subscribed to the "ad_events" topic on the public broker.
try:
    consumer = KafkaConsumer(
        "ad_events",
        bootstrap_servers=["public-kafka.memcompute.com:9092"],
    )
except Exception as exc:
    print(f"Could not connect to Kafka: '{exc}'")
else:
    print("Connected to Kafka consumer successfully")

Connected to Kafka consumer successfully


In [11]:
# Ingestion settings: number of Kafka messages to read, and number of parsed
# records to accumulate before each bulk insert into MongoDB.
MAX_ITERATIONS = 100
BATCH_SIZE = 10

# Column names, in order, of the tab-separated payload of each message.
EVENT_FIELDS = (
    "user_id", "event_name", "advertiser", "campaign", "gender",
    "income", "page_url", "region", "country",
)


def parse_event(raw):
    """Convert one raw message payload into an event document.

    Args:
        raw: UTF-8 encoded bytes holding tab-separated fields.

    Returns:
        A dict keyed by EVENT_FIELDS with `user_id` and `campaign` converted
        to int, or None when the record does not have exactly
        len(EVENT_FIELDS) fields (such records are skipped silently).

    Raises:
        UnicodeDecodeError: the payload is not valid UTF-8.
        ValueError, IndexError: `user_id` or the first whitespace-separated
            token of `campaign` is missing or not an integer.
    """
    fields = [field.strip() for field in raw.decode("utf-8").split("\t")]
    if len(fields) != len(EVENT_FIELDS):
        return None

    event = dict(zip(EVENT_FIELDS, fields))
    event["user_id"] = int(event["user_id"])
    # Only the leading token of the campaign column is the numeric id.
    event["campaign"] = int(event["campaign"].split()[0])
    return event


def flush_events(collection, batch, context):
    """Bulk-insert `batch` into `collection`, then empty `batch`.

    The batch is cleared even when the insert fails: insert_many adds an
    `_id` to each document in place, so re-sending a partially written batch
    would fail again on duplicate keys while the list kept growing.

    Args:
        collection: target collection exposing `insert_many`.
        batch: list of event documents; emptied in place.
        context: short label used as the prefix of the error message.
    """
    if not batch:
        return
    try:
        collection.insert_many(batch)
    except Exception as e:
        print(f"{context}: Could not insert batch of {len(batch)} records - {str(e)}")
    finally:
        batch.clear()


buffer = []

# `range` comes first on purpose: zip stops as soon as the range is exhausted,
# so the consumer is never asked for a message beyond MAX_ITERATIONS (which
# could otherwise block waiting for a message that is never processed).
for iteration, message in zip(range(1, MAX_ITERATIONS + 1), consumer):
    try:
        event = parse_event(message.value)
    except Exception as e:
        print(f"Iteration {iteration}: Could not process data - {str(e)}")
        continue

    if event is None:
        continue

    buffer.append(event)
    if len(buffer) >= BATCH_SIZE:
        flush_events(db.events, buffer, f"Iteration {iteration}")

# Write whatever is left over from the last, partially filled batch.
flush_events(db.events, buffer, "Final flush")