## Benchmark: ClickHouse Vs. InfluxDB Vs. Postgresql Vs. Parquet 

-----

#### How to use:
* Rename the file "properties-model.ini" to "properties.ini"
* Fill with your own credentials
----

The purpose of this work is to compare read/write speed for a medium-sized dataset (9 columns and 50,000 rows) across several storage backends:
* ClickHouse
* InfluxDB
* Postgresql
* Parquet (in a S3 Minio Storage)
* DuckDB with Polars
* MongoDB
* Kdb+

 - [ ] Clickhouse read
 
Deve-se relevar:
é uma "cold-storage" ou "freeze-storage"
influxdb: alta leitura e tem a vantagem da indexação para visualização de dados em gráficos

notas: 
* comparar tamanho do csv com parquet

### Imports 

In [74]:
import configparser
import io
from datetime import datetime

import influxdb_client
import pandas as pd
from clickhouse_driver import Client
from dotenv import load_dotenv
from minio import Minio
from minio.error import S3Error
from pymongo import MongoClient
from pytz import timezone
from sqlalchemy import create_engine

# Load environment variables from a local .env file, if present (python-dotenv).
load_dotenv()


# import io
# import time
# import numpy as np
# import clickhouse_connect
# pip install python-dotenv
# import psycopg2
# import os
# import pyarrow as pa
# import pyarrow.parquet as pq
# import s3fs
# from friendly.jupyter import Friendly
# from minio.error import S3Error
# from pyarrow import Table
# import os
# from influxdb_client import InfluxDBClient, Point, WritePrecision
# from influxdb_client.client.write_api import SYNCHRONOUS
# Friendly.dark()

False

In [None]:
# teset

In [None]:
# Variables
# Despite its name, `dbname` is the *table* name that the ClickHouse cells
# substitute into their CREATE TABLE / INSERT / SELECT statements.
dbname = "EURUSDtest"

In [None]:
# Load the connection settings of every backend from properties.ini
# (created by renaming properties-model.ini and filling in the credentials).
arq = configparser.RawConfigParser()
# read() silently skips files it cannot open and returns the list of files it
# actually parsed, so fail early with a clear message instead of letting the
# first arq.get() below raise a confusing NoSectionError.
if not arq.read("properties.ini"):
    raise FileNotFoundError(
        'Could not read "properties.ini": rename "properties-model.ini" to '
        '"properties.ini" and fill it with your own credentials.'
    )

# ClickHouse: host URL plus user/password.
ClickHouseUser = arq.get("CLICKHOUSE", "user")
ClickHouseKey = arq.get("CLICKHOUSE", "key")
ClickHouseUrl = arq.get("CLICKHOUSE", "url")

# InfluxDB: "user" is passed as the organization and "key" as the API token
# when the InfluxDB client is created.
InfluxDBUser = arq.get("INFLUXDB", "user")
InfluxDBKey = arq.get("INFLUXDB", "key")
InfluxDBUrl = arq.get("INFLUXDB", "url")
InfluxDBBucket = arq.get("INFLUXDB", "bucket")

# PostgreSQL: user/password, host and database name for the SQLAlchemy URL.
PostgresqlUser = arq.get("POSTGRESQL", "user")
PostgresqlKey = arq.get("POSTGRESQL", "key")
PostgresqlUrl = arq.get("POSTGRESQL", "url")
PostgresqlDB = arq.get("POSTGRESQL", "database")

# MinIO (S3-compatible storage): credentials, endpoint and region.
S3MinioUser = arq.get("S3MINIO", "user")
S3MinioKey = arq.get("S3MINIO", "key")
S3MinioUrl = arq.get("S3MINIO", "url")
S3MinioRegion = arq.get("S3MINIO", "region")

# MongoDB: only the URL is used by the visible MongoDB cells.
MongoUser = arq.get("MONGODB", "user")
MongoKey = arq.get("MONGODB", "key")
MongoUrl = arq.get("MONGODB", "url")

In [None]:
%%time
# Load the benchmark dataset; the first CSV column becomes the row index.
df = pd.read_csv("out.csv", index_col=0)

In [None]:
# df.head()

In [None]:
# Transform datetime columns: "from" and "to" are interpreted as Unix
# timestamps expressed in seconds.
# Optional: only needed when the columns have not been transformed yet.
df["from"] = pd.to_datetime(df["from"], unit="s")
df["to"] = pd.to_datetime(df["to"], unit="s")

#### Funções

-> Class

In [None]:
def timestamp2dataHora(x, timezone_="America/Sao_Paulo"):
    """Convert a POSIX timestamp into a timezone-aware ``datetime``.

    Args:
        x: Timestamp accepted by ``datetime.fromtimestamp`` (seconds since
            the Unix epoch).
        timezone_: Time-zone name handed to ``pytz.timezone``; defaults to
            ``"America/Sao_Paulo"``.

    Returns:
        The ``datetime`` for ``x`` expressed in the requested time zone.
    """
    target_tz = timezone(timezone_)
    return datetime.fromtimestamp(x, tz=target_tz)

### ClickHouse

In [None]:
# NOTE: the official client uses an HTTP driver; this example uses the
# recommended third-party library clickhouse_driver instead, which talks TCP.
# "use_numpy" is enabled in the client settings; the DataFrame insert/query
# helpers are used on this client further down.
client = Client(
    host=ClickHouseUrl,
    user=ClickHouseUser,
    password=ClickHouseKey,
    settings={"use_numpy": True},
)

In [None]:
# Create the benchmark table in ClickHouse unless it already exists.
# TODO: revise the column types.
# Engine note: a 'Memory' table disappears when the server is restarted;
# MergeTree, ordered by the "to" column, is used here.
client.execute(
    "CREATE TABLE IF NOT EXISTS {} (id UInt32,"
    "from DateTime, at UInt64, to DateTime, open Float64,"
    "close Float64, min Float64, max  Float64, volume UInt32)"
    "ENGINE MergeTree ORDER BY to".format(dbname)
)

In [None]:
%%time
# Write benchmark: insert the whole DataFrame into the ClickHouse table.
client.insert_dataframe("INSERT INTO {} VALUES".format(dbname), df)

In [None]:
%%time
# Read benchmark: fetch the full table as a DataFrame (displayed, not stored).
client.query_dataframe("SELECT * FROM default.{}".format(dbname))  # LIMIT 10000

In [None]:
%%time
# Read benchmark, keeping the result: rebinds `df` to the data read back from
# ClickHouse, replacing the frame loaded from the CSV.
df = pd.DataFrame(client.query_dataframe("SELECT * FROM default.{}".format(dbname)))

### InfluxDB


In [None]:
# Connect to InfluxDB. The "user" value from properties.ini is passed as the
# organization and the "key" value as the API token.
# NOTE: this rebinds `client`, so the ClickHouse connection is no longer
# reachable under that name.
client = influxdb_client.InfluxDBClient(
    url=InfluxDBUrl, token=InfluxDBKey, org=InfluxDBUser
)

In [None]:
# Re-load the CSV for InfluxDB: no CSV column is used as the index
# (index_col=False), "from" is parsed as dates and then promoted to the
# DataFrame index.
# TODO: test other columns as the index.
df = pd.read_csv(
    "out.csv", sep=",", index_col=False, parse_dates=["from"]
).set_index("from")

In [None]:
# Preview the first rows of the re-loaded, "from"-indexed frame.
df.head()

In [None]:
%%time
# Writing... it took a while, but it worked.
# The whole DataFrame is written to the configured bucket under the
# measurement name "id", with "volume" stored as a tag column; the write API
# is used as a context manager so it is released when the block ends.
with client.write_api() as writer:
    writer.write(
        bucket=InfluxDBBucket,
        record=df,
        data_frame_measurement_name="id",
        data_frame_tag_columns=["volume"],
    )

In [None]:
# data
#   |> pivot(
#     rowKey:["_time"],
#     columnKey: ["_field"],
#     valueColumn: "_value"
#   )

In [None]:
# Read

### Postgresql

In [None]:
# Connect: build the SQLAlchemy engine for PostgreSQL (psycopg2 driver,
# port 5432) from the credentials read from properties.ini.
engine = create_engine(
    f"postgresql+psycopg2://{PostgresqlUser}:{PostgresqlKey}"
    f"@{PostgresqlUrl}:5432/{PostgresqlDB}"
)

In [None]:
# Drop the old table and create a new, empty one: head(0) carries only the
# column layout, and if_exists="replace" discards any previous "comparedbs".
# NOTE(review): if the cells ran in order, `df` is indexed by "from" here and
# index=False leaves that column out of the table — confirm this is intended.
df.head(0).to_sql("comparedbs", engine, if_exists="replace", index=False)

In [None]:
%%time
# Write benchmark: bulk-load the DataFrame with PostgreSQL COPY, streaming it
# from an in-memory, tab-separated buffer (no header row, no index column).
conn = engine.raw_connection()
try:
    cur = conn.cursor()
    try:
        output = io.StringIO()
        df.to_csv(output, sep="\t", header=False, index=False)
        output.seek(0)  # rewind so COPY reads the buffer from the beginning

        # Optional: pass null="" to copy_from so that empty fields are loaded
        # as NULL instead of as empty strings.
        cur.copy_from(output, "comparedbs")
        conn.commit()
    finally:
        cur.close()
finally:
    # Always release the connection, even when the load fails.
    conn.close()

In [None]:
# Read

### S3 Parquet

In [72]:
# TODO: try this without the function wrapper to see whether it gets faster.
# Also check whether the files in the git folder are stored on the SSD.
def main():
    """Upload the local Parquet file to the ``data`` bucket on MinIO.

    Connects to the MinIO endpoint configured in properties.ini with TLS
    disabled (``secure=False``), creates the bucket ``data`` when it does not
    exist yet, and uploads ``data/data.parquet`` as the object
    ``data.parquet``.

    Errors raised by the MinIO client are not caught here; the caller is
    expected to handle them.
    """
    client = Minio(
        S3MinioUrl,
        secure=False,
        region=S3MinioRegion,
        # Credentials come from properties.ini, like every other backend,
        # rather than being embedded in the notebook.
        access_key=S3MinioUser,
        secret_key=S3MinioKey,
    )

    # Make bucket if not exist.
    found = client.bucket_exists("data")
    if not found:
        client.make_bucket("data")
    else:
        print("Bucket 'data' already exists")

    # Upload: (bucket name, object name, local file path).
    client.fput_object(
        "data",
        "data.parquet",
        "data/data.parquet",
    )
    # print(
    #     "'data/data.parquet' is successfully uploaded as "
    #     "object 'data.parquet' to bucket 'data'."
    # )

In [73]:
%%time
# Write the DataFrame to a local Parquet file, then upload it to MinIO;
# both steps are covered by this cell's timing.
# NOTE(review): assumes the local "data/" directory already exists — confirm.
df.to_parquet("data/data.parquet")
if __name__ == "__main__":
    try:
        main()
    except S3Error as exc:  # minio.error.S3Error
        # Report the failure without aborting the notebook run.
        print("error occurred.", exc)

Bucket 'data' already exists
CPU times: user 610 ms, sys: 133 ms, total: 743 ms
Wall time: 4.05 s


In [71]:
# Read the local Parquet file back with the pyarrow engine and preview it.
# NOTE: `pq` holds a DataFrame here; the same name is the alias of the
# commented-out `import pyarrow.parquet as pq` at the top of the notebook.
pq = pd.read_parquet("data/data.parquet", engine="pyarrow")
pq.head()

Unnamed: 0_level_0,Unnamed: 0,id,at,to,open,close,min,max,volume
from,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2023-01-02 15:58:45,0,7730801,1672675140000000000,2023-01-02 15:59:00,1.065995,1.066035,1.06593,1.06607,57
2023-01-02 15:59:00,1,7730802,1672675155000000000,2023-01-02 15:59:15,1.066055,1.066085,1.066005,1.066115,52
2023-01-02 15:59:15,2,7730803,1672675170000000000,2023-01-02 15:59:30,1.06608,1.066025,1.066025,1.06611,57
2023-01-02 15:59:30,3,7730804,1672675185000000000,2023-01-02 15:59:45,1.06598,1.065985,1.065885,1.066045,64
2023-01-02 15:59:45,4,7730805,1672675200000000000,2023-01-02 16:00:00,1.065975,1.066055,1.06583,1.066055,50


### MongoDB

In [None]:
# Connect to MongoDB with the URL from properties.ini; rebinds `client` again.
client = MongoClient(MongoUrl);

In [None]:
# NOTE(review): indexing a MongoClient by name selects a *database*, so this
# is a database called "collection_name", not a collection — confirm intent.
DB = client["collection_name"]

In [None]:
# Handle to the "test" database on the connected MongoDB server.
db = client["test"]

### DuckDB

### Kdb+