# Integrating ClickHouse with SageMaker

This notebook demonstrates how to run a SageMaker job that pulls data directly from a ClickHouse database, without an intermediate step such as exporting to S3. This avoids the overhead of exporting data to S3 and then ingesting it into SageMaker.

This example uses Apache Spark. I followed the steps at https://clickhouse.com/docs/en/integrations/apache-spark/spark-native-connector to complete this. No custom container was set up; the job uses SageMaker's built-in Spark functionality together with ClickHouse's JAR files.

My ClickHouse database is set up using the NYC Taxi dataset with 20 million records. It is hosted on an EC2 instance in my VPC.


To demonstrate basic connectivity in Python, we can connect with the `clickhouse-connect` library as a test.


---

Note: Pulling data from ClickHouse into SageMaker is not limited to Spark. Any library that can query a database will work, provided it has network connectivity to ClickHouse, because the connection path is the same. SageMaker acts much like a managed container orchestration platform here: it spins up the instances, pulls the containers, runs the job and, if output is to be written to S3, uploads it from local storage.
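As an illustration of the non-Spark route, here is a minimal sketch that pulls a small sample into a pandas DataFrame with the `clickhouse-connect` library introduced above. The helper name `fetch_trip_sample` and its parameters are hypothetical; the column names are taken from the `trips` table queried later in this notebook, and `query_df` requires pandas to be installed.

```python
def fetch_trip_sample(client, table="trips", limit=1000):
    """Return a pandas DataFrame with a small sample of trip rows.

    `client` is assumed to be a clickhouse_connect client (or any object
    exposing a compatible `query_df` method).
    """
    # int() guards the LIMIT value; the table name is treated as trusted input.
    sql = (
        "SELECT fare_amount, trip_distance, passenger_count "
        f"FROM {table} LIMIT {int(limit)}"
    )
    return client.query_df(sql)


# Example usage (needs network access to the database, so it is left commented out):
# import clickhouse_connect
# client = clickhouse_connect.get_client(host='db.clickhouse.local', username='default', password='')
# df = fetch_trip_sample(client, limit=5)
```

The same function could run inside any SageMaker job whose container has `clickhouse-connect` installed and whose network configuration can reach the database.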

In [None]:
!pip install clickhouse-connect

In [None]:
import clickhouse_connect

client = clickhouse_connect.get_client(host='db.clickhouse.local', username='default', password='')

In [None]:
client.command("SELECT AVG(fare_amount) FROM trips")

With connectivity confirmed, we now write the PySpark code to read from ClickHouse. This sample script runs a SELECT statement in Spark SQL to get the data, prints the first 5 rows of the DataFrame and then writes the full DataFrame to S3 as Parquet.

I could add a transform or other processing here if required, but I have not done so.

The code below doesn't use any of SageMaker Processing's features such as S3 inputs. Spark manages the input and output itself. This reduces end-to-end time because SageMaker does not have to download or upload data around the job; the reads and writes happen inside my job instead.
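Because Spark handles the I/O itself, the host and output location in the script are hardcoded. One possible variation, sketched here under assumptions, reads them from command-line arguments instead. The flag names (`--clickhouse-host`, `--clickhouse-port`, `--output-uri`) are hypothetical and not part of the original script.

```python
import argparse


def parse_args(argv=None):
    """Parse job settings; flag names here are illustrative only."""
    parser = argparse.ArgumentParser(
        description="Read from ClickHouse and write Parquet to S3"
    )
    parser.add_argument("--clickhouse-host", default="db.clickhouse.local")
    parser.add_argument("--clickhouse-port", default="8123")
    parser.add_argument(
        "--output-uri",
        required=True,
        help="S3 URI that the Parquet output is written to",
    )
    return parser.parse_args(argv)


# Inside the script, the parsed values would replace the literals, e.g.:
# args = parse_args()
# spark.conf.set("spark.sql.catalog.clickhouse.host", args.clickhouse_host)
# df.write.format("parquet").mode("overwrite").save(args.output_uri)
```

The values could then be supplied through the `arguments` parameter of `PySparkProcessor.run`, which keeps the same script reusable across environments.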

In [None]:
%%writefile code/process.py
from pyspark.sql import SparkSession

# create spark session
spark = SparkSession.builder.appName('ClickHouses').getOrCreate()

# Register the ClickHouse catalog. Adjust host, user, password and database for your environment.
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "db.clickhouse.local")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
spark.conf.set("spark.sql.catalog.clickhouse.password", "")
spark.conf.set("spark.sql.catalog.clickhouse.database", "default")
spark.conf.set("spark.clickhouse.write.format", "json")

df = spark.sql("SHOW DATABASES")
df.show()

df = spark.sql("select fare_amount, pickup_date, pickup_datetime, dropoff_date, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_latitude, dropoff_longitude, passenger_count, trip_distance from clickhouse.default.trips")
df.show(5)

df.write.format('parquet').mode('Overwrite').save('s3://andjsmi-data-testing/clickout/')

With the PySpark code written, I can now create the job configuration. The code below sets up a SageMaker Spark Processing Job with the script. The job is configured to run in a VPC so that it can access a resource in that VPC without internet access. If using ClickHouse Cloud, this would not be required.
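When a VPC-attached job cannot reach the database, the cause is often a subnet or security group setting. A small TCP check like the sketch below can help narrow this down, assuming it is run from somewhere with the same network placement as the job (for example, at the start of the job script). The helper name `can_connect` is hypothetical.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False


# Example usage with the ClickHouse HTTP port used in this notebook:
# print(can_connect("db.clickhouse.local", 8123))
```

This only confirms that the port is reachable; it does not validate credentials or the ClickHouse configuration.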

In [None]:
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.network import NetworkConfig

network_config = NetworkConfig(
    subnets=['subnet-0761871837d46649a'],
    security_group_ids=['sg-00f312b2360612d64']
)

spark_processor = PySparkProcessor(
    base_job_name='clickspark',
    framework_version='3.3',
    role=sagemaker.get_execution_role(),
    instance_count=4,
    instance_type='ml.c5.4xlarge',
    network_config=network_config
)

Connecting to ClickHouse requires that the ClickHouse Spark runtime and JDBC driver JARs are included with the job. The JAR files referenced below were downloaded from the Maven repository and work with this container's Spark dependencies.
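For reference, here is a hedged sketch of how the two JARs could be fetched into the `code/` directory. It builds URLs using the standard Maven Central repository layout. The group IDs (`com.clickhouse.spark` and `com.clickhouse`) are my assumption and should be checked against the ClickHouse connector documentation before use; the artifact names and versions match the files passed to the job.

```python
import os
import urllib.request

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"

# (group_id, artifact_id, version, classifier); group IDs are assumptions to verify.
JARS = [
    ("com.clickhouse.spark", "clickhouse-spark-runtime-3.3_2.12", "0.8.0", None),
    ("com.clickhouse", "clickhouse-jdbc", "0.6.3", "all"),
]


def maven_jar_url(group_id, artifact_id, version, classifier=None):
    """Build a Maven Central URL following the standard repository layout."""
    group_path = group_id.replace(".", "/")
    suffix = f"-{classifier}" if classifier else ""
    filename = f"{artifact_id}-{version}{suffix}.jar"
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{filename}"


def download_jars(dest_dir="code"):
    """Download each JAR into dest_dir and return the local paths."""
    os.makedirs(dest_dir, exist_ok=True)
    paths = []
    for group_id, artifact_id, version, classifier in JARS:
        url = maven_jar_url(group_id, artifact_id, version, classifier)
        local_path = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
        urllib.request.urlretrieve(url, local_path)
        paths.append(local_path)
    return paths
```

Calling `download_jars()` from a notebook with internet access would produce the two local paths that are then passed to `submit_jars`.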

When you run the cells below, the output shows logs for the job and the nodes that it runs on.

This process will start up the specified instances to create a Spark cluster.

In [None]:
spark_processor.run(
    submit_app="code/process.py",
    submit_jars=['code/clickhouse-spark-runtime-3.3_2.12-0.8.0.jar', 'code/clickhouse-jdbc-0.6.3-all.jar']
)