In [1]:
import random
import uuid
from collections.abc import Iterator
from datetime import datetime
from typing import Any

def batch_generator(n_batches: int = 1_000, batch_size: int = 1000) -> Iterator[list[dict[str, Any]]]:
    """Lazily produce batches of synthetic film-view rows for the write benchmarks.

    Every row is a dict with the keys ``user_id`` and ``film_id`` (fresh
    ``uuid.uuid4()`` values), ``progress_sec`` (``random.randint(0, 1000)``)
    and ``timestamp`` (``datetime.now()`` taken when the row is built).

    Args:
        n_batches: How many batches to yield. Defaults to 1 000.
        batch_size: How many rows each batch contains. Defaults to 1 000.

    Yields:
        A list of ``batch_size`` row dicts. A progress line is printed just
        before each batch is built.
    """
    # Space-separated thousands so a default run still prints "/ 1 000".
    total_label = f"{n_batches:,}".replace(",", " ")
    for i in range(n_batches):
        print(f"Batch {i} / {total_label}")
        yield [
            {
                "user_id": uuid.uuid4(),
                "film_id": uuid.uuid4(),
                "progress_sec": random.randint(0, 1000),
                "timestamp": datetime.now(),
            }
            for _ in range(batch_size)
        ]

# CLICKHOUSE WRITE TEST

In [2]:
from clickhouse_driver import Client

In [3]:
# Connect to the ClickHouse server on localhost; `client` is reused by the
# DDL, insert and select cells that follow.
client = Client(host="localhost")

In [4]:
# Create the benchmark database on the `company_cluster` cluster; IF NOT EXISTS
# makes re-running this cell harmless.
client.execute('CREATE DATABASE IF NOT EXISTS ugc_analytics ON CLUSTER company_cluster;')

[('clickhouse-node3', 9000, 0, '', 3, 1),
 ('clickhouse-node2', 9000, 0, '', 2, 1),
 ('clickhouse-node4', 9000, 0, '', 1, 1),
 ('clickhouse-node1', 9000, 0, '', 0, 0)]

In [5]:
# Create the table that receives the benchmark rows. Its columns match the keys
# of the dicts yielded by batch_generator(): user_id, film_id, progress_sec, timestamp.
# NOTE(review): a plain MergeTree created ON CLUSTER is presumably a separate
# local table on each node, so inserts through `client` would land only on the
# node it is connected to — confirm this is the intended setup for the benchmark.
client.execute(
"""CREATE TABLE IF NOT EXISTS ugc_analytics.ugc_film_views  ON CLUSTER company_cluster (
    user_id UUID,
    film_id UUID,
    progress_sec UInt32,
    timestamp TIMESTAMP
)
Engine=MergeTree()
ORDER BY (user_id, film_id, timestamp);""")

[('clickhouse-node1', 9000, 0, '', 3, 0),
 ('clickhouse-node3', 9000, 0, '', 2, 0),
 ('clickhouse-node2', 9000, 0, '', 1, 0),
 ('clickhouse-node4', 9000, 0, '', 0, 0)]

In [6]:
import time

# ClickHouse write benchmark: insert every generated batch and time each INSERT.
# perf_counter() is a monotonic, high-resolution clock, so the measurements
# cannot be skewed by system clock adjustments the way time.time() can.
start = time.perf_counter()
timers = []
for data in batch_generator():
    start_timer = time.perf_counter()
    client.execute("INSERT INTO ugc_analytics.ugc_film_views (user_id, film_id, progress_sec, timestamp) VALUES", data)
    timers.append(time.perf_counter() - start_timer)

# `timers` holds only the INSERT durations; the elapsed total additionally
# covers building the batches and printing the progress lines.
print(f"Average insert time: {sum(timers) / len(timers)} sec")
print(f"Elapsed: {time.perf_counter() - start} sec")

Batch 0 / 1 000
Batch 1 / 1 000
Batch 2 / 1 000
Batch 3 / 1 000
Batch 4 / 1 000
Batch 5 / 1 000
Batch 6 / 1 000
Batch 7 / 1 000
Batch 8 / 1 000
Batch 9 / 1 000
Batch 10 / 1 000
Batch 11 / 1 000
Batch 12 / 1 000
Batch 13 / 1 000
Batch 14 / 1 000
Batch 15 / 1 000
Batch 16 / 1 000
Batch 17 / 1 000
Batch 18 / 1 000
Batch 19 / 1 000
Batch 20 / 1 000
Batch 21 / 1 000
Batch 22 / 1 000
Batch 23 / 1 000
Batch 24 / 1 000
Batch 25 / 1 000
Batch 26 / 1 000
Batch 27 / 1 000
Batch 28 / 1 000
Batch 29 / 1 000
Batch 30 / 1 000
Batch 31 / 1 000
Batch 32 / 1 000
Batch 33 / 1 000
Batch 34 / 1 000
Batch 35 / 1 000
Batch 36 / 1 000
Batch 37 / 1 000
Batch 38 / 1 000
Batch 39 / 1 000
Batch 40 / 1 000
Batch 41 / 1 000
Batch 42 / 1 000
Batch 43 / 1 000
Batch 44 / 1 000
Batch 45 / 1 000
Batch 46 / 1 000
Batch 47 / 1 000
Batch 48 / 1 000
Batch 49 / 1 000
Batch 50 / 1 000
Batch 51 / 1 000
Batch 52 / 1 000
Batch 53 / 1 000
Batch 54 / 1 000
Batch 55 / 1 000
Batch 56 / 1 000
Batch 57 / 1 000
Batch 58 / 1 000
Batch 5

In [9]:
import timeit

def read_test():
    """Run one 1000-row SELECT on the ClickHouse views table via the shared `client`.

    The rows returned by ``client.execute`` are discarded; the function exists
    only to be timed.
    """
    query = "SELECT user_id, film_id, progress_sec FROM ugc_analytics.ugc_film_views LIMIT 1000"
    client.execute(query)

# timeit returns the total time for 1000 calls of read_test(); dividing by 1000
# gives the mean time per SELECT.
print(f"Average select time: {timeit.timeit(read_test, number=1000) / 1000} sec")


Average select time: 0.00768980404199101 sec


In [None]:
# ClickHouse: simple MergeTree table, 4-node cluster

# Insert Test:
# 1 000 batches of 1000 rows
# Average insert time: 0.01610767674446106 sec
# Elapsed: 19.779046058654785 sec

# Select Test:
# 1000 rows 1000 times
# Average select time: 0.00768980404199101 sec

# VERTICA WRITE TEST

In [10]:
import vertica_python

# Keyword arguments for vertica_python.connect(); unpacked with ** by every
# Vertica cell below.
connection_info = dict(
    host='localhost',
    port=5433,
    user='dbadmin',
    password='',
    database='docker',
    autocommit=True,
)

In [11]:
# Create the Vertica target table; its columns match the keys of the rows
# yielded by batch_generator().
with vertica_python.connect(**connection_info) as connection:
    # Scope the cursor with `with` so it is closed as soon as the DDL has run
    # instead of being left open until the connection goes away.
    with connection.cursor() as cursor:
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS views (
        user_id UUID,
        film_id UUID,
        progress_sec INTEGER,
        timestamp TIMESTAMP
    );
    """)

In [12]:
import time

# Vertica write benchmark: insert every generated batch over one connection and
# time each executemany() call. perf_counter() is a monotonic, high-resolution
# clock, so the measurements cannot be skewed by system clock adjustments the
# way time.time() can.
start = time.perf_counter()
timers = []
with vertica_python.connect(**connection_info) as connection:
    for data in batch_generator():
        start_timer = time.perf_counter()
        with connection.cursor() as cursor:
            # Each row dict supplies the values for the named :key placeholders.
            # NOTE(review): presumably use_prepared_statements=False (client-side
            # binding) is what permits named placeholders here — confirm against
            # the vertica-python documentation.
            cursor.executemany(
                "INSERT INTO views (user_id, film_id, progress_sec, timestamp) VALUES (:user_id, :film_id, :progress_sec, :timestamp);",
                data,
                use_prepared_statements=False,
            )
        timers.append(time.perf_counter() - start_timer)

# `timers` holds only the per-batch insert durations; the elapsed total also
# covers connecting, building the batches and printing the progress lines.
print(f"Average insert time: {sum(timers) / len(timers)} sec")
print(f"Elapsed: {time.perf_counter() - start} sec")

Batch 0 / 1 000
Batch 1 / 1 000
Batch 2 / 1 000
Batch 3 / 1 000
Batch 4 / 1 000
Batch 5 / 1 000
Batch 6 / 1 000
Batch 7 / 1 000
Batch 8 / 1 000
Batch 9 / 1 000
Batch 10 / 1 000
Batch 11 / 1 000
Batch 12 / 1 000
Batch 13 / 1 000
Batch 14 / 1 000
Batch 15 / 1 000
Batch 16 / 1 000
Batch 17 / 1 000
Batch 18 / 1 000
Batch 19 / 1 000
Batch 20 / 1 000
Batch 21 / 1 000
Batch 22 / 1 000
Batch 23 / 1 000
Batch 24 / 1 000
Batch 25 / 1 000
Batch 26 / 1 000
Batch 27 / 1 000
Batch 28 / 1 000
Batch 29 / 1 000
Batch 30 / 1 000
Batch 31 / 1 000
Batch 32 / 1 000
Batch 33 / 1 000
Batch 34 / 1 000
Batch 35 / 1 000
Batch 36 / 1 000
Batch 37 / 1 000
Batch 38 / 1 000
Batch 39 / 1 000
Batch 40 / 1 000
Batch 41 / 1 000
Batch 42 / 1 000
Batch 43 / 1 000
Batch 44 / 1 000
Batch 45 / 1 000
Batch 46 / 1 000
Batch 47 / 1 000
Batch 48 / 1 000
Batch 49 / 1 000
Batch 50 / 1 000
Batch 51 / 1 000
Batch 52 / 1 000
Batch 53 / 1 000
Batch 54 / 1 000
Batch 55 / 1 000
Batch 56 / 1 000
Batch 57 / 1 000
Batch 58 / 1 000
Batch 5

In [14]:
import timeit

def vertica_read_test(connection=None):
    """Run one 1000-row SELECT on the Vertica ``views`` table and fetch all rows.

    Args:
        connection: An already open vertica_python connection to reuse. When
            omitted, a new connection is opened and closed for this single
            call, so calling the function without arguments still works.
    """
    if connection is None:
        with vertica_python.connect(**connection_info) as own_connection:
            vertica_read_test(own_connection)
        return
    # Close the cursor as soon as the rows have been fetched.
    with connection.cursor() as cursor:
        cursor.execute("SELECT user_id, film_id, progress_sec FROM views LIMIT 1000")
        cursor.fetchall()


# Reuse one connection for all 1000 runs so the average measures the query and
# fetch only, not 1000 connection handshakes. The ClickHouse read test likewise
# reuses its long-lived `client`, which keeps the two numbers comparable.
with vertica_python.connect(**connection_info) as connection:
    print(f"Average select time: {timeit.timeit(lambda: vertica_read_test(connection), number=1000) / 1000} sec")



Failed to connect to localhost:9000
Traceback (most recent call last):
  File "/Users/nikitazigman/Library/Caches/pypoetry/virtualenvs/practicum-3sVUemQi-py3.11/lib/python3.11/site-packages/clickhouse_driver/connection.py", line 395, in connect
    return self._init_connection(host, port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nikitazigman/Library/Caches/pypoetry/virtualenvs/practicum-3sVUemQi-py3.11/lib/python3.11/site-packages/clickhouse_driver/connection.py", line 325, in _init_connection
    self.socket = self._create_socket(host, port)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nikitazigman/Library/Caches/pypoetry/virtualenvs/practicum-3sVUemQi-py3.11/lib/python3.11/site-packages/clickhouse_driver/connection.py", line 297, in _create_socket
    raise err
  File "/Users/nikitazigman/Library/Caches/pypoetry/virtualenvs/practicum-3sVUemQi-py3.11/lib/python3.11/site-packages/clickhouse_driver/connection.py", line 288, in _create_socket
    so

NetworkError: Code: 210. Connection refused (localhost:9000)

In [None]:
# Vertica jbfavre/vertica:latest

# Insert Test:
# Average insert time: 0.05144338369369507 sec
# Elapsed: 56.20524001121521 sec

# Select Test:
# 1000 rows 1000 times
