In [1]:
# https://clickhouse.com/docs/en/integrations/python
# https://clickhouse.com/docs/en/integrations/java
# https://clickhouse.com/docs/en/sql-reference/data-types
# https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
import clickhouse_connect
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import yaml
spark = SparkSession.builder\
    .appName("CSV to PostgreSQL") \
    .config("spark.jars", "clickhouse-jdbc-0.6.0.jar")\
    .getOrCreate()
print(spark.sparkContext._jsc.sc().listJars())

24/02/12 16:48:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Vector(spark://macplgah.cg.shawcable.net:63367/jars/clickhouse-jdbc-0.6.0.jar)


In [2]:
# Read the CSV file
data_file = "TimeSeries/cu/cu.data.1.Allitems"
data_file = "TimeSeries/cu/cu.data.0.Current"
df = spark.read.option("delimiter", "\t").csv(data_file, header=True, inferSchema=True)

# remove trailing spaces in column names
for column in df.columns:
    df = df.withColumnRenamed(column, column.strip())

                                                                                

In [3]:
df.printSchema()
df.show()

root
 |-- series_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- period: string (nullable = true)
 |-- value: double (nullable = true)
 |-- footnote_codes: string (nullable = true)

+-----------------+----+------+-----+--------------+
|        series_id|year|period|value|footnote_codes|
+-----------------+----+------+-----+--------------+
|CUSR0000SA0      |1997|   M01|159.4|          NULL|
|CUSR0000SA0      |1997|   M02|159.7|          NULL|
|CUSR0000SA0      |1997|   M03|159.8|          NULL|
|CUSR0000SA0      |1997|   M04|159.9|          NULL|
|CUSR0000SA0      |1997|   M05|159.9|          NULL|
|CUSR0000SA0      |1997|   M06|160.2|          NULL|
|CUSR0000SA0      |1997|   M07|160.4|          NULL|
|CUSR0000SA0      |1997|   M08|160.8|          NULL|
|CUSR0000SA0      |1997|   M09|161.2|          NULL|
|CUSR0000SA0      |1997|   M10|161.5|          NULL|
|CUSR0000SA0      |1997|   M11|161.7|          NULL|
|CUSR0000SA0      |1997|   M12|161.8|          NULL|


In [132]:
# generate SQL to create table using clickhouse connect 
mapping = {
    'string': 'String',
    'binary': 'String',  # ClickHouse stores binary as String
    'boolean': 'UInt8',  # Boolean in ClickHouse is commonly represented as UInt8
    'date': 'Date',
    'timestamp': 'DateTime',
    'double': 'Float64',
    'float': 'Float32',
    'byte': 'Int8',
    'short': 'Int16',
    'int': 'Int32',
    'long': 'Int64',
    'decimal': 'Decimal',  # May need precision and scale: Decimal(P, S)
}
# generate table column types
table_column_types = []
for field in df.schema.fields:
    null_count = df.filter(col(field.name).isNull()).count()
    name = field.name
    type = mapping[field.dataType.simpleString()]
    if null_count:
        type = f" Nullable({type})"
    column_definition = f"{name} {type}"
    table_column_types.append(column_definition)

table_name = 'bls.tbl_cu_1'
sql = f"CREATE TABLE IF NOT EXISTS {table_name}({','.join(table_column_types)}) ENGINE = MergeTree() PRIMARY KEY (series_id, year, period)"
print(sql)


CREATE TABLE IF NOT EXISTS bls.tbl_cu_1(series_id String,year Int32,period String,value Float64,footnote_codes  Nullable(String)) ENGINE = MergeTree() PRIMARY KEY (series_id, year, period)


In [138]:
client = clickhouse_connect.get_client(host='localhost', port=8123, username='default', password='', database='bls')
client.command(sql)

<clickhouse_connect.driver.summary.QuerySummary at 0x7f8f38cd7760>

In [None]:
# ETL using Spark
url = f"jdbc:clickhouse://localhost:8123/bls"
properties = {
    "user": 'default',
    "password": '',
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
}
# Write the DataFrame to PostgreSQL
df.write.jdbc(url=url, table=table_name, mode='append', properties=properties)
