In [68]:
import json
import os
from datetime import datetime
from pathlib import Path

import orjson
from clickhouse_driver import Client
from kafka import KafkaProducer, KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
from pydantic import BaseModel, Field

# Producer used to publish the fake view events to the `film` topic.
# NOTE(review): the producer bootstraps from 127.0.0.1 while the consumer below
# uses 192.168.5.35 — confirm both addresses reach the same Kafka cluster.
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
)

# Consumer feeding the ClickHouse ETL.
# Auto-commit is disabled on purpose: `etl_process` commits offsets itself right
# after a batch has been inserted into ClickHouse. With auto-commit enabled the
# client can commit offsets for messages that were fetched but never stored,
# so a crash between fetch and insert would silently drop them.
consumer = KafkaConsumer(
    'film',
    bootstrap_servers=['192.168.5.35:9092'],
    security_protocol='PLAINTEXT',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='$Default',
    # Message values arrive as bytes; decode them to text for json.loads.
    value_deserializer=lambda x: x.decode('utf-8'),
)

# ClickHouse native-protocol client on the local server.
client = Client(host='127.0.0.1')

In [82]:
# Sanity check: list the databases on the ClickHouse server
# (`movies` should be present once create_tables has run).
databases_query = 'SHOW DATABASES'
client.execute(databases_query)


[('_temporary_and_external_tables',), ('default',), ('movies',), ('system',)]

In [78]:
def _clickhouse_literal(value) -> str:
    """Render one Python value as a ClickHouse SQL literal for a VALUES clause.

    Strings (and datetimes, rendered as 'YYYY-MM-DD HH:MM:SS[.ffffff]') are
    single-quoted with backslashes and single quotes backslash-escaped, so a
    value containing ``'`` or ``\\`` cannot terminate the literal early.

    Raises:
        TypeError: for types that have no literal rendering here.
    """
    if value is None:
        return 'NULL'
    # bool is a subclass of int, so it must be handled first.
    if isinstance(value, bool):
        return '1' if value else '0'
    if isinstance(value, int):
        return str(value)
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, datetime):
        value = value.isoformat(sep=' ')
    if isinstance(value, str):
        escaped = value.replace('\\', '\\\\').replace("'", "\\'")
        return f"'{escaped}'"
    raise TypeError(
        f"unsupported value type for ClickHouse insert: {type(value).__name__}"
    )


def insert_clickhouse(client: Client, data: list) -> None:
    """Insert rows into ``movies.film``.

    Args:
        client: ClickHouse client whose ``execute`` runs the INSERT statement.
        data: rows as sequences ``(user_uuid, movie_id, event, created_at)``
            in that column order. An empty list is a no-op.

    Raises:
        TypeError: if a value has a type ``_clickhouse_literal`` cannot render.
    """
    if not data:
        # "INSERT ... VALUES" with nothing after it is a syntax error.
        return
    rows_sql = ', '.join(
        '(' + ', '.join(_clickhouse_literal(value) for value in row) + ')'
        for row in data
    )
    client.execute(
        'INSERT INTO movies.film (user_uuid, movie_id, event, created_at) VALUES '
        + rows_sql
    )

def etl_process(topic: str, consumer: KafkaConsumer, clickhouse_client: Client, chunk: int) -> None:
    """Stream JSON events from Kafka into ClickHouse in batches.

    Each message value is parsed as a JSON object with ``user_uuid``,
    ``movie_id``, ``event`` and ``created`` keys (the fields of
    ``KafkaEventMovieViewTime``). Rows are buffered and written with
    ``insert_clickhouse`` every ``chunk`` messages. After each successful insert,
    the next offset for every partition seen in that batch is committed.

    Iteration runs until the consumer stops yielding messages. A partial batch
    left at that point is flushed as well. Rows still buffered when an exception
    or interrupt escapes the loop are neither inserted nor committed.

    Args:
        topic: topic name used to build the ``TopicPartition`` commit keys.
        consumer: subscribed Kafka consumer to iterate.
        clickhouse_client: client passed to ``insert_clickhouse``.
        chunk: number of rows per insert; must be >= 1.

    Raises:
        ValueError: if ``chunk`` is smaller than 1.
        KeyError: if a message lacks one of the expected keys.
    """
    if chunk < 1:
        raise ValueError(f"chunk must be a positive integer, got {chunk!r}")

    batch: list = []
    # Highest offset seen per partition in the current (not yet inserted) batch.
    last_offsets: dict = {}

    def flush() -> None:
        # Insert first, then commit, so a failed insert leaves offsets uncommitted.
        if not batch:
            return
        insert_clickhouse(clickhouse_client, batch)
        consumer.commit({
            TopicPartition(topic, partition): OffsetAndMetadata(offset + 1, None)
            for partition, offset in last_offsets.items()
        })
        batch.clear()
        last_offsets.clear()

    for msg in consumer:
        payload = json.loads(msg.value)
        # Explicit keys instead of dict order; `created` maps to column `created_at`.
        batch.append((
            payload['user_uuid'],
            payload['movie_id'],
            payload['event'],
            payload['created'],
        ))
        last_offsets[msg.partition] = msg.offset
        if len(batch) >= chunk:
            flush()

    # Reached only if the consumer stops iterating; don't drop the partial batch.
    flush()

def create_tables(client: Client, cluster: str = 'company_cluster') -> None:
    """Create the ``movies`` database and the ``movies.film`` table if missing.

    Args:
        client: ClickHouse client whose ``execute`` runs the DDL.
        cluster: cluster name for the ``ON CLUSTER`` clause (defaults to
            ``company_cluster``, as before). Pass a falsy value (``''`` or
            ``None``) to run the DDL on the connected server only, e.g. a
            standalone instance without a cluster definition.
    """
    on_cluster = f' ON CLUSTER {cluster}' if cluster else ''
    client.execute(f'CREATE DATABASE IF NOT EXISTS movies{on_cluster};')
    # NOTE(review): DateTime64 without an explicit precision defaults to
    # precision 3 (milliseconds) per ClickHouse docs, while the producer emits
    # microsecond timestamps — confirm the truncation is intended.
    client.execute(
        f"""CREATE TABLE IF NOT EXISTS movies.film{on_cluster} (
            user_uuid String,
            movie_id String,
            event Int64,
            created_at DateTime64
            ) Engine=MergeTree()
            ORDER BY created_at;
     """)

In [72]:
def orjson_dumps(v, *, default):
    """Serialise ``v`` with orjson and return the JSON as text.

    Wired up as ``JsonConfig.Config.json_dumps``; orjson produces ``bytes``,
    so the result is decoded to ``str`` before returning.
    """
    encoded = orjson.dumps(v, default=default)
    return encoded.decode()

class JsonConfig(BaseModel):
    """Base pydantic model that serialises with orjson.

    ``Config`` plugs orjson into pydantic's (v1-style) ``json_loads`` /
    ``json_dumps`` hooks. ``toJSON`` returns raw orjson ``bytes``, which the
    producer cell sends as the Kafka message value.
    """

    def toJSON(self):
        """Return this model serialised to JSON ``bytes`` via orjson.

        Objects orjson cannot encode natively (including the model itself) are
        encoded through their ``__dict__``.
        """
        def _as_dict(obj):
            return obj.__dict__

        return orjson.dumps(self, default=_as_dict)

    class Config:
        json_loads = orjson.loads
        json_dumps = orjson_dumps

class KafkaEventMovieViewTime(JsonConfig):
    """Movie-view event published to the ``film`` Kafka topic.

    Serialised with ``toJSON`` (inherited from ``JsonConfig``); the ETL reads the
    ``created`` field into the ``created_at`` ClickHouse column.
    """

    user_uuid: str
    movie_id: str
    # Spelled as plain `int`: the expression `int or str` evaluates to `int`,
    # so that is what the model enforced all along. It also matches the
    # `event Int64` ClickHouse column.
    event: int
    # Naive UTC timestamp taken when the model instance is created.
    created: datetime = Field(default_factory=datetime.utcnow)

def toJSON(self):
    """Serialise ``self`` to JSON ``bytes`` with orjson, falling back to ``__dict__``.

    NOTE(review): module-level duplicate of ``JsonConfig.toJSON``. Nothing in
    this notebook calls it (the producer cell uses the method), so it looks like
    a mis-indented leftover — consider deleting it.
    """
    def _fallback(obj):
        return obj.__dict__

    return orjson.dumps(self, default=_fallback)

In [73]:
# Fake events: one JSON object per line (despite the .csv extension).
# Set FAKE_EVENTS_PATH to point elsewhere instead of editing the author's path.
EVENTS_PATH = Path(os.environ.get(
    'FAKE_EVENTS_PATH',
    'd:\\python\\ya\\ugc_assignment\\tests\\stress_load\\fake_data_little.csv',
))
# Explicit encoding: the platform default (e.g. cp1252 on Windows) is not portable.
with EVENTS_PATH.open('r', encoding='utf-8') as file:
    events = file.read().splitlines()

In [80]:
# How many fake events to publish; set to None to publish the whole file.
N_EVENTS = 3

for raw_event in events[:N_EVENTS]:
    event = json.loads(raw_event)
    viewed_progress = KafkaEventMovieViewTime(
        user_uuid=event['user_uuid'],
        movie_id=event['movie_id'],
        event=event['event'],
    )
    value = viewed_progress.toJSON()
    # Messages with the same key go to the same partition under the default
    # partitioner, so one user's events for a movie stay ordered.
    key = f'{viewed_progress.user_uuid}:{viewed_progress.movie_id}'.encode()
    producer.send(
        topic='film',
        value=value,
        key=key,
    )

# send() only enqueues; flush() blocks until buffered messages are delivered.
producer.flush()

b'{"user_uuid":"0031feab-8f53-412a-8f53-47098a60ac73","movie_id":"00af52ec-9345-4d66-adbe-50eb917f463a","event":309,"created":"2022-04-15T19:36:20.097067"}'
b'{"user_uuid":"0031feab-8f53-412a-8f53-47098a60ac73","movie_id":"00af52ec-9345-4d66-adbe-50eb917f463a","event":821,"created":"2022-04-15T19:36:20.097067"}'
b'{"user_uuid":"0031feab-8f53-412a-8f53-47098a60ac73","movie_id":"00af52ec-9345-4d66-adbe-50eb917f463a","event":987,"created":"2022-04-15T19:36:20.097067"}'


In [81]:
# Ensure the ClickHouse schema exists, then stream the `film` topic into it in
# batches of `chunk` rows.
# NOTE(review): the consumer is created without an iteration timeout, so this
# call presumably never returns on its own (the last run ended in KeyboardInterrupt).
topic, chunk = 'film', 20000
create_tables(client)
etl_process(topic=topic, consumer=consumer, clickhouse_client=client, chunk=chunk)


--16--ConsumerRecord(topic='film', partition=0, offset=20, timestamp=1650051380097, timestamp_type=0, key=b'0031feab-8f53-412a-8f53-47098a60ac73:00af52ec-9345-4d66-adbe-50eb917f463a', value='{"user_uuid":"0031feab-8f53-412a-8f53-47098a60ac73","movie_id":"00af52ec-9345-4d66-adbe-50eb917f463a","event":309,"created":"2022-04-15T19:36:20.097067"}', headers=[], checksum=None, serialized_key_size=73, serialized_value_size=153, serialized_header_size=-1)
--16--{"user_uuid":"0031feab-8f53-412a-8f53-47098a60ac73","movie_id":"00af52ec-9345-4d66-adbe-50eb917f463a","event":309,"created":"2022-04-15T19:36:20.097067"}
--16--dict_values(['0031feab-8f53-412a-8f53-47098a60ac73', '00af52ec-9345-4d66-adbe-50eb917f463a', 309, '2022-04-15T19:36:20.097067'])
--16--ConsumerRecord(topic='film', partition=0, offset=21, timestamp=1650051380097, timestamp_type=0, key=b'0031feab-8f53-412a-8f53-47098a60ac73:00af52ec-9345-4d66-adbe-50eb917f463a', value='{"user_uuid":"0031feab-8f53-412a-8f53-47098a60ac73","movie_id"

KeyboardInterrupt: 