In [63]:
import json
import time
from typing import Optional, Text

import pandas as pd
from clickhouse_driver import Client
from pydantic import BaseModel, ValidationError

In [64]:
# Connect to the ClickHouse server running on this machine.
# 0.0.0.0 is a wildcard *bind* address rather than a destination; connecting
# to it only works on some platforms, so the loopback address is used instead.
client = Client(host="127.0.0.1")

In [65]:
# Make sure the scratch database exists; IF NOT EXISTS keeps the cell re-runnable.
create_database_sql = "CREATE DATABASE IF NOT EXISTS test_db"
client.execute(create_database_sql)

[]

In [66]:
# Create the MergeTree table that will hold the salary rows, sorted and keyed by Id.
# The statement is assembled from adjacent literals; the embedded padding is kept
# so the text sent to the server is exactly the same as before.
create_table_sql = (
    "CREATE TABLE IF NOT EXISTS test_db.test_table (Id UInt64, EmployeeName TEXT, JobTitle TEXT, "
    "                BasePay Float64, OvertimePay Float64, OtherPay Float64, Benefits String, TotalPay Float64, "
    "                TotalPayBenefits Float64, Year UInt32, Notes String, Agency String, Status String) Engine=MergeTree() ORDER BY Id PRIMARY KEY Id"
)
client.execute(create_table_sql)

[]

In [67]:
# Load the salaries CSV into a DataFrame. low_memory=False makes pandas read the
# file in one pass instead of chunk-wise, avoiding mixed-dtype columns.
csv_path = "./data/sf_salaries.csv"
df = pd.read_csv(csv_path, sep=",", encoding="utf-8", low_memory=False)

In [68]:
class Data(BaseModel):
    """One salary record, validated before it is inserted into ClickHouse.

    Field names mirror the columns of ``test_db.test_table``. Only ``Id`` is
    required; every other field accepts ``None`` and defaults to ``None``.

    The defaults are spelled out explicitly because pydantic v1 infers a
    ``None`` default for ``Optional[...]`` fields while pydantic v2 treats
    them as required; with ``= None`` the model behaves the same on both.
    """

    Id: int
    EmployeeName: Optional[str] = None
    JobTitle: Optional[str] = None
    BasePay: Optional[float] = None
    OvertimePay: Optional[float] = None
    OtherPay: Optional[float] = None
    # Typed as text to match the String column in the table definition.
    Benefits: Optional[str] = None
    TotalPay: Optional[float] = None
    TotalPayBenefits: Optional[float] = None
    Year: Optional[int] = None
    Notes: Optional[str] = None
    Agency: Optional[str] = None
    Status: Optional[str] = None

In [69]:
# Turn the DataFrame into a list of plain dicts, one per row. Going through
# JSON serialises missing values (NaN) as null, so they come back as None.
records_json = df.to_json(orient="records")
payload = json.loads(records_json)

In [70]:
# Columns written by the INSERT, in table order. Notes is included so that the
# statement covers every column of test_db.test_table and every field of Data.
insert_columns = [
    "Id", "EmployeeName", "JobTitle", "BasePay", "OvertimePay", "OtherPay",
    "Benefits", "TotalPay", "TotalPayBenefits", "Year", "Notes", "Agency", "Status",
]

# Build the parameterised statement once rather than on every iteration.
# Placeholders use the %(name)s form and are filled from a dict per row.
insert_sql = "INSERT INTO test_db.test_table ({}) VALUES ({})".format(
    ", ".join(insert_columns),
    ", ".join("%({})s".format(name) for name in insert_columns),
)

inserted_count = 0
skipped_rows = []  # (position in payload, ValidationError) for each rejected row

# NOTE(review): this issues one INSERT per row, which is slow for large files.
# clickhouse_driver can also take a list of rows in a single INSERT; switching
# would require confirming how None is handled for the non-Nullable columns.
for position, row in enumerate(payload):
    try:
        record = Data(**row)
    except ValidationError as exc:
        # Keep going, but remember what was rejected instead of hiding it.
        skipped_rows.append((position, exc))
        continue
    client.execute(
        insert_sql,
        {name: getattr(record, name) for name in insert_columns},
    )
    inserted_count += 1

print("Inserted:", inserted_count, "rows; skipped:", len(skipped_rows), "invalid rows.")

In [80]:
# Time a single-row fetch. perf_counter() is monotonic and high-resolution,
# so the interval cannot be distorted by system clock adjustments.
start = time.perf_counter()
client.execute("SELECT t.* FROM test_db.test_table t LIMIT 1")
end = time.perf_counter()
print("Elapsed:", (end - start) * 1000, "ms.")

Elapsed: 14.184236526489258 ms.


In [81]:
# Time a filtered scan on JobTitle and keep the matching rows in result_1.
# perf_counter() is monotonic and high-resolution, so the interval cannot be
# distorted by system clock adjustments.
start = time.perf_counter()
result_1 = client.execute("SELECT t.* FROM test_db.test_table t WHERE JobTitle='NURSE MANAGER'")
end = time.perf_counter()
print("Elapsed:", (end - start) * 1000, "ms.")

Elapsed: 25.981426239013672 ms.
