In [1]:
!docker compose -f clickhouse_cluster.yml down --remove-orphans -v && docker compose -f clickhouse_cluster.yml up -d

[1A[1B[0G[?25l[+] Running 1/0
 [32m✔[0m Network storage_research_default  [32mCreated[0m                               [34m0.1s [0m
 ⠋ Container zookeeper               [39mCreating[0m                              [34m0.0s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 5/2
 [32m✔[0m Network storage_research_default  [32mCreated[0m                               [34m0.1s [0m
 ⠿ Container zookeeper               [39mStarting[0m                              [34m0.1s [0m
 [32m✔[0m Container clickhouse-node2        [32mCreat...[0m                              [34m0.0s [0m
 [32m✔[0m Container clickhouse-node3        [32mCreat...[0m                              [34m0.0s [0m
 [32m✔[0m Container clickhouse-node4        [32mCreat...[0m                              [34m0.0s [0m
 [32m✔[0m Container clickhouse-node1        [32mCreat...[0m                              [34m0.0s [0m
[?25h[1A[1A[1A[1A[1A[1A[1A[0G[?25l[+] Running 5/6
 [32m✔[0m Netw

In [2]:
from asynch import connect

conn1 = await connect(
    host='127.0.0.1',
    port=9091,
    database='default',
    user='default',
    password='',
)
conn3 = await connect(
    host='127.0.0.1',
    port=9093,
    database='default',
    user='default',
    password='',
)

In [3]:
from asynch.connection import Connection
from asynch.cursors import DictCursor
from more_itertools import chunked
from typing import Iterable


async def create_table(connection: Connection, shard: int, replica: int):
    sql = [
        'CREATE DATABASE shard;',
        'CREATE DATABASE replica;',
        f"""
        CREATE TABLE shard.view_history
        (
            created_at DateTime,
            user_id UUID,
            film_id UUID,
            timestamp UInt16
        )
        Engine=ReplicatedMergeTree('/clickhouse/tables/shard{shard}/view_history', 'replica_1')
        PARTITION BY toYYYYMMDD(created_at)
        ORDER BY created_at;
        """,
        f"""
        CREATE TABLE replica.view_history
        (
            created_at DateTime,
            user_id UUID,
            film_id UUID,
            timestamp UInt16
        )
        Engine=ReplicatedMergeTree('/clickhouse/tables/shard{replica}/view_history', 'replica_2')
        PARTITION BY toYYYYMMDD(created_at)
        ORDER BY created_at;
        """,
        """
        CREATE TABLE default.view_history
        (
            created_at DateTime,
            user_id UUID,
            film_id UUID,
            timestamp UInt16
        )
        ENGINE = Distributed('company_cluster', '', view_history, rand());
        """,
    ]
    async with connection.cursor() as cursor:
        for item in sql:
            await cursor.execute(item)


async def insert_data(connection: Connection, data: Iterable[dict], chunk: int = 1000):
    sql = """
    INSERT INTO default.view_history (created_at, user_id, film_id, timestamp)
    VALUES
    """
    async with connection.cursor(cursor=DictCursor) as cursor:
        for ch in chunked(data, chunk):
            await cursor.execute(sql, ch)


async def select_data(connection: Connection):
    sql = """
    SELECT user_id, avg(timestamp) FROM view_history
    GROUP BY user_id
    """
    async with connection.cursor() as cursor:
        await cursor.execute(sql)


async def clear_table(connection: Connection):
    sql = """
    TRUNCATE TABLE view_history
    """
    async with connection.cursor() as cursor:
        await cursor.execute(sql)


In [4]:
await create_table(conn1, 1, 2)
await create_table(conn3, 2, 1)

In [5]:
from data import ViewHistoryCollection

total = 10_000_000
views = ViewHistoryCollection(total, int(total / 2), int(total / 4)).to_dict()

In [6]:
import time
from collections import defaultdict


async def bench(data: list[dict], chunk: int, n: int):
    elapsed = defaultdict(list)
    for _ in range(n):
        await clear_table(conn1)
        start_time = time.time()
        await insert_data(conn1, data, chunk)
        end_time = time.time()
        elapsed['write'].append(end_time - start_time)

        start_time = time.time()
        await select_data(conn3)
        end_time = time.time()
        elapsed['read'].append(end_time - start_time)

    return elapsed


In [7]:
totals = [1_000, 10_000, 100_000, 1_000_000, 10_000_000]
elapsed = dict()
for total in totals:
    elapsed[total] = await bench(views[: total + 1], 500, 3)


In [8]:
elapsed

{1000: defaultdict(list,
             {'write': [0.0783071517944336,
               0.0319979190826416,
               0.02812957763671875],
              'read': [0.007004976272583008,
               0.06018686294555664,
               0.0067403316497802734]}),
 10000: defaultdict(list,
             {'write': [0.29659414291381836,
               0.2880115509033203,
               0.29579997062683105],
              'read': [0.0207366943359375,
               0.015334129333496094,
               0.017328739166259766]}),
 100000: defaultdict(list,
             {'write': [3.2213289737701416,
               2.9725162982940674,
               2.968675136566162],
              'read': [0.11861944198608398,
               0.12534499168395996,
               0.13104557991027832]}),
 1000000: defaultdict(list,
             {'write': [30.350422382354736,
               30.693453073501587,
               30.72012972831726],
              'read': [0.8556039333343506,
               1.028514385223

In [9]:
await conn1.close()
await conn3.close()

In [10]:
import json

with open('result/clickhouse_cluster.json', 'w') as f:
    json.dump(elapsed, f)

In [11]:
!docker compose -f clickhouse_cluster.yml down

[1A[1B[0G[?25l[+] Running 0/0
 ⠋ Container clickhouse-node3  [39mStopping[0m                                    [34m0.0s [0m
 ⠋ Container clickhouse-node1  [39mStopping[0m                                    [34m0.0s [0m
 ⠋ Container clickhouse-node2  [39mStopping[0m                                    [34m0.0s [0m
 ⠋ Container clickhouse-node4  [39mStopping[0m                                    [34m0.0s [0m
[?25h[1A[1A[1A[1A[1A[0G[?25l[+] Running 0/4
 ⠙ Container clickhouse-node3  [39mStopping[0m                                    [34m0.1s [0m
 ⠙ Container clickhouse-node1  [39mStopping[0m                                    [34m0.1s [0m
 ⠙ Container clickhouse-node2  [39mStopping[0m                                    [34m0.1s [0m
 ⠙ Container clickhouse-node4  [39mStopping[0m                                    [34m0.1s [0m
[?25h[1A[1A[1A[1A[1A[0G[?25l[+] Running 0/4
 ⠹ Container clickhouse-node3  [39mStopping[0m                      