# GlassFlow Demo

## Resources
- Use local Kafka
- Use local ClickHouse

Run `docker compose up` from the `demos/` folder to start GlassFlow, Kafka and ClickHouse.

## Index

1. [Deduplication Problem](#problem-duplicates-in-clickhouse)
   1. [Setup ClickHouse](#setup-clickhouse)
   2. [Generate Events](#generate-events)
2. [Solution](#solution-glassflow)
   1. [Setup ClickHouse and Kafka](#setup-clickhouse-and-kafka)
   2. [Create GlassFlow Pipeline](#create-glassflow-pipeline)
   3. [Generate Events](#generate-events-1)
3. [Enrich data with a join (Extra)](#extra-enrich-data-with-joins)
   1. [Setup ClickHouse and Kafka](#setup-clickhouse-and-kafka-1)
   2. [Create GlassFlow Pipeline](#create-glassflow-pipeline-1)
   3. [Generate Events](#generate-events-2)

## Problem: Duplicates in ClickHouse

### Setup ClickHouse

```python
from clickhouse_connect import get_client
from dotenv import load_dotenv
import os

# Read the CLICKHOUSE_* connection settings from a local .env file
load_dotenv()

client = get_client(
    host=os.getenv('CLICKHOUSE_HOST'),
    port=os.getenv('CLICKHOUSE_PORT'),
    user=os.getenv('CLICKHOUSE_USERNAME'),
    password=os.getenv('CLICKHOUSE_PASSWORD'),
    database=os.getenv('CLICKHOUSE_DATABASE'),
    secure=False  # local ClickHouse, no TLS
)

# ReplacingMergeTree collapses rows with the same ORDER BY key (order_id),
# but only when ClickHouse merges data parts in the background
client.command("DROP TABLE IF EXISTS orders")
client.command(
    """CREATE TABLE IF NOT EXISTS orders
    (
        order_id UUID,
        user_id UUID,
        product_id UUID,
        quantity Int32,
        price Float64,
        created_at DateTime
    )
    ENGINE = ReplacingMergeTree
    ORDER BY order_id"""
)
```


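`ReplacingMergeTree` removes rows that share the sorting key (`order_id`) only when ClickHouse merges data parts in the background, at a time it chooses; until then, `count(*)` sees every stored row. A toy in-memory model of that behavior, assuming each insert lands in its own part and no version column is set (in which case the last inserted row wins). The `count_rows` and `merge_parts` helpers are hypothetical illustrations, not ClickHouse internals:

```python
from __future__ import annotations

# Toy model only: each insert becomes a "part" (a list of row dicts), and parts
# are listed from oldest to newest. This is not how ClickHouse stores data.


def count_rows(parts: list[list[dict]]) -> int:
    """Rows seen by `SELECT count(*)` before any merge: every stored row."""
    return sum(len(part) for part in parts)


def merge_parts(parts: list[list[dict]], key: str = "order_id") -> list[dict]:
    """Collapse rows that share `key`, keeping the most recently inserted one."""
    latest: dict[str, dict] = {}
    for part in parts:
        for row in part:
            latest[row[key]] = row  # a newer row replaces the older one
    return list(latest.values())


# Two inserts that both contain order "a"
parts = [
    [{"order_id": "a", "price": 1.0}, {"order_id": "b", "price": 2.0}],
    [{"order_id": "a", "price": 1.5}],
]
```

Here `count_rows(parts)` is 3 while `merge_parts(parts)` keeps two rows, with order `a` at the newer price 1.5. That gap is what the duplicate count measures for rows that have not been merged yet.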

### Generate Events

```python
import glassgen

orders_schema = {
    "order_id": "$uuid",
    "user_id": "$uuid",
    "product_id": "$uuid",
    "quantity": "$intrange(1, 10)",
    "price": "$price(1, 100)",
    "created_at": '$datetime(%Y-%m-%d %H:%M:%S)',
}

gen = glassgen.generate(
    config={
        "sink": {
            "type": "yield"
        },
        "generator": {
            "num_records": 100000,
            "rps": 10000,
            "event_options": {
                "duplication": {
                    "enabled": True,
                    "ratio": 0.5,
                    "key_field": "user_id",
                    "time_window": "1h"
                }
            },
        },
        "schema": orders_schema
    }
)
```

```python
import datetime as dt

# Column order used for the ClickHouse insert
columns = [
    "order_id",
    "user_id",
    "product_id",
    "quantity",
    "price",
    "created_at"
]
batch_size = 10000
batch = []
for event in gen:
    event_copy = event.copy()
    # Strip stray double quotes from the timestamp, then parse it into a datetime
    event_copy["created_at"] = event_copy["created_at"].replace('"', "")
    event_copy["created_at"] = dt.datetime.strptime(event_copy["created_at"], "%Y-%m-%d %H:%M:%S")
    event_list = [event_copy[c] for c in columns]
    batch.append(event_list)
    # Insert into ClickHouse every batch_size rows
    if len(batch) >= batch_size:
        client.insert(
            "orders",
            batch,
            column_names=columns
        )
        batch = []
# Insert the remaining rows
if batch:
    client.insert(
        "orders",
        batch,
        column_names=columns
    )
```
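The loop above mixes two concerns: turning a generated event into a row, and grouping rows into batches. A minimal sketch that splits them into pure functions that can be tested without ClickHouse; `event_to_row`, `batched` and `COLUMNS` are hypothetical names, and the quote stripping and timestamp format are copied from the loop:

```python
from __future__ import annotations

import datetime as dt
from itertools import islice
from typing import Iterable, Iterator

COLUMNS = ["order_id", "user_id", "product_id", "quantity", "price", "created_at"]


def event_to_row(event: dict, columns: list[str] = COLUMNS) -> list:
    """Turn one generated event into a row ordered like `columns`."""
    row = dict(event)  # copy so the generator's dict is left untouched
    # Strip stray double quotes, then parse the timestamp
    raw = str(row["created_at"]).replace('"', "")
    row["created_at"] = dt.datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
    return [row[c] for c in columns]


def batched(rows: Iterable, size: int) -> Iterator[list]:
    """Yield lists of at most `size` items; the last one may be shorter."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch
```

With these helpers, the loop reduces to iterating over `batched((event_to_row(e) for e in gen), batch_size)` and calling `client.insert("orders", batch, column_names=COLUMNS)` for each batch.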

```python
total = client.command("SELECT count(*) FROM orders")
unique = client.command("SELECT count(DISTINCT order_id) FROM orders")

print(f"Total: {total}")
print(f"Unique: {unique}")
print(f"Percentage of duplicates: {100 * (total - unique) / total:.2f}%")
```

```
Total: 67514
Unique: 66667
Percentage of duplicates: 1.25%
```
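The remaining duplicates are rows that `ReplacingMergeTree` has not merged yet. Adding the `FINAL` modifier (`SELECT count(*) FROM orders FINAL`) makes ClickHouse apply the replacement at query time, at extra read cost. A small hypothetical refactor of the percentage calculation that also guards against an empty table, which would otherwise raise `ZeroDivisionError`:

```python
def duplicate_percentage(total: int, unique: int) -> float:
    """Percentage of rows that repeat an existing order_id (0.0 for an empty table)."""
    if total <= 0:
        return 0.0  # avoid dividing by zero when the table is empty
    return 100 * (total - unique) / total


# Queries used above, plus a FINAL variant that deduplicates at read time
TOTAL_QUERY = "SELECT count(*) FROM orders"
UNIQUE_QUERY = "SELECT count(DISTINCT order_id) FROM orders"
TOTAL_FINAL_QUERY = "SELECT count(*) FROM orders FINAL"
```

For example, `f"{duplicate_percentage(67514, 66667):.2f}%"` evaluates to `"1.25%"`, matching the output above.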


## Solution: GlassFlow

We will show how to deduplicate an event stream with GlassFlow and how to enrich it by joining it with another stream.


### Setup ClickHouse and Kafka
Now we create a ClickHouse table for the deduplicated orders and a Kafka topic for the order events.

```python
# Create ClickHouse table
client = get_client(
    host=os.getenv('CLICKHOUSE_HOST'),
    port=os.getenv('CLICKHOUSE_PORT'),
    user=os.getenv('CLICKHOUSE_USERNAME'),
    password=os.getenv('CLICKHOUSE_PASSWORD'),
    database=os.getenv('CLICKHOUSE_DATABASE'),
    secure=False
)

client.command("DROP TABLE IF EXISTS orders_glassflow")
client.command(
    """CREATE TABLE IF NOT EXISTS orders_glassflow
    (
        order_id UUID,
        user_id UUID,
        product_id UUID,
        price Float64,
        quantity Int32,
        created_at DateTime
    )
    ENGINE = ReplacingMergeTree
    ORDER BY order_id"""
)
```


```bash
# Create the orders topic
docker exec -t kafka kafka-topics --create --topic orders --bootstrap-server localhost:9093 --partitions 1 --replication-factor 1 --command-config /etc/kafka/client.properties
```

```
Created topic orders.
```


Publish one sample order event to the `orders` topic:

```bash
# Run from the demos/ folder (adjust the path to your checkout)
cd ~/Documents/code/clickhouse-etl/demos/
echo '{"order_id": "90190f44-e417-4066-b48d-babb4707e2f8","user_id": "90190f44-e417-4066-b48d-babb4707e2f8","product_id": "90190f44-e417-4066-b48d-babb4707e2f8","price": 5.99,"quantity": 1,"created_at": "2025-05-12 15:29:12"}' | \
docker compose exec -T kafka kafka-console-producer --topic orders --bootstrap-server localhost:9093 --producer.config /etc/kafka/client.properties
```
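The sample event is hand-written JSON, so a misspelled field is easy to miss. A quick plain-Python check that the payload parses, that `created_at` matches the `%Y-%m-%d %H:%M:%S` format used throughout, and that its fields match the `orders_glassflow` columns. The `check_event` helper is hypothetical and not part of GlassFlow or Kafka:

```python
import datetime as dt
import json

# Columns of the orders_glassflow table created above
ORDERS_COLUMNS = {"order_id", "user_id", "product_id", "price", "quantity", "created_at"}

SAMPLE_ORDER = (
    '{"order_id": "90190f44-e417-4066-b48d-babb4707e2f8",'
    '"user_id": "90190f44-e417-4066-b48d-babb4707e2f8",'
    '"product_id": "90190f44-e417-4066-b48d-babb4707e2f8",'
    '"price": 5.99,"quantity": 1,"created_at": "2025-05-12 15:29:12"}'
)


def check_event(raw: str, columns: set) -> tuple:
    """Return (missing, extra) field names; raises if the JSON or timestamp is malformed."""
    event = json.loads(raw)
    dt.datetime.strptime(event["created_at"], "%Y-%m-%d %H:%M:%S")
    fields = set(event)
    return columns - fields, fields - columns
```

Calling `check_event(SAMPLE_ORDER, ORDERS_COLUMNS)` should return two empty sets before the event is piped into `kafka-console-producer`.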

### Create GlassFlow Pipeline
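Create the pipeline in the GlassFlow UI at http://localhost:8080: read from the `orders` topic, deduplicate on `order_id`, and write to the `orders_glassflow` table.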
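Conceptually, deduplicating on a key within a time window means that the first event for each `order_id` passes through and later events with the same key are dropped until the window expires. A minimal in-memory sketch of that idea, not GlassFlow's implementation, assuming each event arrives with a timestamp in seconds and a 1-hour window to mirror the generator's `time_window`; `dedup_stream` is a hypothetical name:

```python
from __future__ import annotations

from typing import Iterable, Iterator


def dedup_stream(
    events: Iterable[tuple[float, dict]],
    key: str = "order_id",
    window_s: float = 3600.0,
) -> Iterator[dict]:
    """Yield the first event per key; drop repeats seen within `window_s` seconds."""
    # A real system would also evict expired keys to bound memory; this sketch does not.
    first_seen: dict[str, float] = {}
    for ts, event in events:
        k = event[key]
        seen_at = first_seen.get(k)
        if seen_at is not None and ts - seen_at < window_s:
            continue  # duplicate inside the window: drop it
        first_seen[k] = ts  # new key, or the previous window has expired
        yield event
```

For the stream `a@0s, b@10s, a@20s, a@4000s`, the sketch emits `a, b, a`: the repeat at 20s falls inside the window, the one at 4000s does not.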

### Generate Events

Generate 100,000 order events, with duplicates on `order_id`, and publish them to the `orders` topic.

```python
num_records = 100000
rps = 10000

# Publish order events to the orders topic, with duplicates on order_id.
# orders_schema is the dict defined in the problem section; Kafka settings come from .env
orders_config = {
    "schema": orders_schema,
    "sink": {
        "type": "kafka",
        "params": {
            "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS"),
            "topic": "orders",
            "security.protocol": os.getenv("KAFKA_SECURITY_PROTOCOL"),
            "sasl.mechanism": os.getenv("KAFKA_SASL_MECHANISM"),
            "sasl.username": os.getenv("KAFKA_SASL_USERNAME"),
            "sasl.password": os.getenv("KAFKA_SASL_PASSWORD"),
        },
    },
    "generator": {
        "num_records": num_records,
        "rps": rps,
        "event_options": {
            "duplication": {
                "enabled": True,
                "ratio": 0.5,
                "key_field": "order_id",
                "time_window": "1h"
            }
        },
    },
}

orders_generator_resp = glassgen.generate(
    config=orders_config
)
```
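This config and `get_glassgen_config` in the join section both spell out the same Kafka connection settings, read from environment variables; the keys are librdkafka configuration properties. A hypothetical helper (`kafka_sink_params`, not part of glassgen) that builds the sink `params` in one place and leaves out settings whose variable is unset, for example on a local broker without SASL. Whether a reduced set works depends on your broker setup:

```python
from __future__ import annotations

import os
from typing import Mapping

# librdkafka property -> environment variable it is read from in this notebook
KAFKA_ENV_KEYS = {
    "bootstrap.servers": "KAFKA_BOOTSTRAP_SERVERS",
    "security.protocol": "KAFKA_SECURITY_PROTOCOL",
    "sasl.mechanism": "KAFKA_SASL_MECHANISM",
    "sasl.username": "KAFKA_SASL_USERNAME",
    "sasl.password": "KAFKA_SASL_PASSWORD",
}


def kafka_sink_params(topic: str, env: Mapping[str, str] = os.environ) -> dict:
    """Build the sink `params` dict, omitting settings whose env var is unset or empty."""
    params = {"topic": topic}
    for conf_key, env_var in KAFKA_ENV_KEYS.items():
        value = env.get(env_var)
        if value:  # skip missing values instead of passing None
            params[conf_key] = value
    return params
```

The sink block above would then read `"params": kafka_sink_params("orders")`.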

```python
total = client.command("SELECT count(*) FROM orders_glassflow")
unique = client.command("SELECT count(DISTINCT order_id) FROM orders_glassflow")

print(f"Total: {total}")
print(f"Unique: {unique}")
print(f"Percentage of duplicates: {100 * (total - unique) / total:.2f}%")
```

```
Total: 132913
Unique: 132913
Percentage of duplicates: 0.00%
```


## (Extra) Enrich Data with Joins

### Setup ClickHouse and Kafka

```python
client.command("DROP TABLE IF EXISTS orders_enriched")
client.command(
    """CREATE TABLE IF NOT EXISTS orders_enriched
    (
        order_id UUID,
        user_id UUID,
        product_id UUID,
        price Float64,
        quantity Int32,
        user_name String,
        user_email String,
        user_phone_number String,
        user_address String,
        user_city String,
        user_zipcode String,
        user_country String,
        created_at DateTime
    )
    ENGINE = ReplacingMergeTree
    ORDER BY order_id"""
)
```


```bash
# Create the users topic
docker exec -t kafka kafka-topics --create --topic users --bootstrap-server localhost:9093 --partitions 1 --replication-factor 1 --command-config /etc/kafka/client.properties
```

```
Created topic users.
```


Publish one sample user event to the `users` topic:

```bash
# Run from the demos/ folder (adjust the path to your checkout)
cd ~/Documents/code/clickhouse-etl/demos/

echo '{"user_id": "90190f44-e417-4066-b48d-babb4707e2f8","name": "Gerald Lee","email": "ghoward@example.net","phone_number": "962-849-9130","address": "PSC 0823, Box 2668 APO AE 84257","city": "Clarkborough","zipcode": "80412","country": "Oman","created_at": "2025-05-12 15:29:12"}' | \
docker compose exec -T kafka kafka-console-producer --topic users --bootstrap-server localhost:9093 --producer.config /etc/kafka/client.properties
```

### Create GlassFlow Pipeline
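Create the join pipeline in the GlassFlow UI at http://localhost:8080: join the `orders` and `users` topics on `user_id` and write the result to the `orders_enriched` table.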
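Conceptually, the join matches each order with the user record that shares its `user_id`, and the user fields land in the `user_*` columns of `orders_enriched`. A minimal in-memory sketch of that mapping, not GlassFlow's implementation (which may differ, for example in how long it waits for a match), assuming the latest user record per `user_id` wins and unmatched orders are dropped, as in an inner join; `enrich_orders` and `USER_FIELDS` are hypothetical names:

```python
from __future__ import annotations

from typing import Iterable, Iterator

# User fields copied into orders_enriched, each stored under a "user_" prefix
USER_FIELDS = ["name", "email", "phone_number", "address", "city", "zipcode", "country"]


def enrich_orders(orders: Iterable[dict], users: Iterable[dict]) -> Iterator[dict]:
    """Inner-join orders to users on user_id, prefixing user fields with 'user_'."""
    users_by_id = {u["user_id"]: u for u in users}  # later records overwrite earlier ones
    for order in orders:
        user = users_by_id.get(order["user_id"])
        if user is None:
            continue  # no matching user: dropped in this sketch
        # Keep the order's own fields, including its created_at
        enriched = dict(order)
        enriched.update({f"user_{f}": user[f] for f in USER_FIELDS})
        yield enriched
```

Each output dict carries exactly the 13 columns of `orders_enriched`, with `created_at` taken from the order rather than the user record.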

### Generate Events

```python
import itertools
import uuid


class JoinEventSchema(glassgen.ConfigSchema):
    """glassgen schema that overwrites `join_key` with values cycled from `list_of_keys`,
    so two generated streams share the same set of join keys."""

    def __init__(self, schema_dict: dict, join_key: str, list_of_keys: list, **kwargs):
        fields = self._schema_dict_to_fields(schema_dict)
        super().__init__(fields=fields, **kwargs)
        self.validate()
        self._join_key = join_key
        self._join_key_select = itertools.cycle(list_of_keys)

    @property
    def join_key_select(self) -> itertools.cycle:
        return self._join_key_select

    @property
    def join_key(self) -> str:
        return self._join_key

    def _generate_record(self):
        """Generate a single record based on the schema, then set its join key"""
        record = super()._generate_record()
        record[self._join_key] = next(self.join_key_select)
        return record


def get_glassgen_config(topic_name, num_records, duplication_key, rps=1000):
    """glassgen config with a Kafka sink and duplicates on `duplication_key`"""
    return {
        "sink": {
            "type": "kafka",
            "params": {
                "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS"),
                "topic": topic_name,
                "security.protocol": os.getenv("KAFKA_SECURITY_PROTOCOL"),
                "sasl.mechanism": os.getenv("KAFKA_SASL_MECHANISM"),
                "sasl.username": os.getenv("KAFKA_SASL_USERNAME"),
                "sasl.password": os.getenv("KAFKA_SASL_PASSWORD"),
            },
        },
        "generator": {
            "num_records": num_records,
            "rps": rps,
            "event_options": {
                "duplication": {
                    "enabled": True,
                    "ratio": 0.5,
                    "key_field": duplication_key,
                    "time_window": "1h"
                }
            },
        },
    }


# Orders are the left side of the join, users the right side
right_num_records = 5000
right_rps = 10000
left_num_records = 100000
left_rps = 10000

# Shared pool of user IDs: orders and users both draw their user_id from it
join_keys = [str(uuid.uuid4()) for _ in range(right_num_records)]

# Generate orders events (left stream).
# A new name keeps the orders_schema dict intact if this cell is re-run.
orders_join_schema = JoinEventSchema(
    schema_dict=orders_schema,
    join_key="user_id",
    list_of_keys=join_keys
)
orders_generator_resp = glassgen.generate(
    config=get_glassgen_config(
        num_records=left_num_records,
        rps=left_rps,
        topic_name="orders",
        duplication_key="order_id",
    ),
    schema=orders_join_schema,
)

# Generate users events (right stream)
users_schema = JoinEventSchema(
    schema_dict={
        "user_id": "$uuid",
        "name": "$name",
        "email": "$email",
        "phone_number": "$phone_number",
        "address": "$address",
        "city": "$city",
        "zipcode": "$zipcode",
        "country": "$country",
        "created_at": '$datetime(%Y-%m-%d %H:%M:%S)',
    },
    join_key="user_id",
    list_of_keys=join_keys,
)
users_generator_resp = glassgen.generate(
    config=get_glassgen_config(
        num_records=right_num_records,
        rps=right_rps,
        topic_name="users",
        duplication_key="user_id",
    ),
    schema=users_schema,
)
```
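`JoinEventSchema` replaces the join key of every record it generates with the next value of `itertools.cycle(join_keys)`, so orders are spread round-robin over the shared pool of user IDs, and user records take the same IDs in the same order. A scaled-down illustration in plain Python, without glassgen; the 3 keys, 10 orders and 3 users are hypothetical stand-ins for `join_keys`, `left_num_records` and `right_num_records`:

```python
import itertools
from collections import Counter

# Scaled-down stand-ins for join_keys, left_num_records and right_num_records
keys = ["u1", "u2", "u3"]
left_records, right_records = 10, 3

# Same round-robin source that JoinEventSchema uses for its join key
order_keys = itertools.cycle(keys)
orders_per_user = Counter(next(order_keys) for _ in range(left_records))

# Each schema instance gets its own cycle, so users start again from the first key
user_keys = itertools.cycle(keys)
user_ids = [next(user_keys) for _ in range(right_records)]
```

Here `orders_per_user` is `{"u1": 4, "u2": 3, "u3": 3}` and `user_ids` equals `keys`: each key is used either 3 or 4 times, so orders drawn through `_generate_record` are spread almost evenly across users.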