In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder \
    .appName("StreamToTable") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.4,org.apache.kafka:kafka-clients:3.2.0,org.postgresql:postgresql:42.7.4,com.datastax.spark:spark-cassandra-connector_2.13:3.5.1,ru.yandex.clickhouse:clickhouse-jdbc:0.3.2") \
    .getOrCreate()
    #.config("spark.cassandra.connection.host", "scylla") \
    #.config("spark.cassandra.connection.port", "9042") \
    

In [3]:
print("Spark version:", spark.version)
print("Spark JARs:", spark.sparkContext._jsc.sc().jars())

Spark version: 3.5.4
Spark JARs: ArraySeq(file:///home/jovyan/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.13-3.5.4.jar, file:///home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.7.4.jar, file:///home/jovyan/.ivy2/jars/com.datastax.spark_spark-cassandra-connector_2.13-3.5.1.jar, file:///home/jovyan/.ivy2/jars/ru.yandex.clickhouse_clickhouse-jdbc-0.3.2.jar, file:///home/jovyan/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.13-3.5.4.jar, file:///home/jovyan/.ivy2/jars/org.scala-lang.modules_scala-parallel-collections_2.13-1.0.4.jar, file:///home/jovyan/.ivy2/jars/org.apache.kafka_kafka-clients-3.4.1.jar, file:///home/jovyan/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.0.jar, file:///home/jovyan/.ivy2/jars/org.apache.commons_commons-pool2-2.11.1.jar, file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-client-runtime-3.3.4.jar, file:///home/jovyan/.ivy2/jars/org.lz4_lz4-java-1.8.0.jar, file:///home/jovyan/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.10.5.jar, f

In [4]:
scala_version = spark.sparkContext._gateway.jvm.scala.util.Properties.versionString()
print("Scala Version:", scala_version)

Scala Version: version 2.13.8


In [5]:
# Define a function to write each batch of data to PostgreSQL
def write_to_postgres(batch_df, batch_id):
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
        .option("dbtable", "t1") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()


In [6]:
def write_to_scylladb(batch_df, batch_id):
    try:
        print(f"Writing batch {batch_id} to ScyllaDB")
        batch_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(keyspace="keyspace_dev", table="fin_trans_table") \
            .mode("append") \
            .save()
        print(f"Batch {batch_id} written successfully")
    except Exception as e:
        print(f"Error writing batch {batch_id} to ScyllaDB: {e}")
        raise e  # Fail the stream if the batch fails

In [7]:
# Function to write micro-batches to ClickHouse
def write_to_clickhouse(batch_df, batch_id):
    try:
        print(f"Writing batch {batch_id} to ClickHouse")
        batch_df.write \
            .format("jdbc") \
            .option("url", "jdbc:clickhouse://clickhouse:8123/testdb1") \
            .option("dbtable", "testdb1.fin_trans_table") \
            .option("user", "click") \
            .option("password", "click") \
            .option("driver", "ru.yandex.clickhouse.ClickHouseDriver") \
            .mode("append") \
            .save()
        print(f"Batch {batch_id} written successfully")
    except Exception as e:
        print(f"Error writing batch {batch_id} to ClickHouse: {e}")
        raise e

In [8]:

df = spark\
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafka:9092") \
      .option("subscribe", "fin_trans_topic") \
      .option("startingOffsets", "earliest") \
      .option("failOnDataLoss", "false") \
      .load()
      
'''
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("checkpointLocation", "/home/jovyan/work/book_data/") \
    .start()
'''

# Select and cast key and value columns to string
query_df = df.selectExpr("CAST(key AS STRING) AS kafka_key", "CAST(value AS STRING) AS kafka_value")

query_df.printSchema()


# Write the streaming data to PostgreSQL using foreachBatch
query = query_df.writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_clickhouse) \
    .option("checkpointLocation", "/home/jovyan/work/_spark_metadata/") \
    .start()

query.awaitTermination()



root
 |-- kafka_key: string (nullable = true)
 |-- kafka_value: string (nullable = true)

Writing batch 1065 to ClickHouse
Batch 1065 written successfully
Writing batch 1066 to ClickHouse
Batch 1066 written successfully
Writing batch 1067 to ClickHouse
Batch 1067 written successfully
Writing batch 1068 to ClickHouse
Batch 1068 written successfully
Writing batch 1069 to ClickHouse
Batch 1069 written successfully


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Writing batch 1070 to ClickHouse
Batch 1070 written successfully
Writing batch 1071 to ClickHouse
Batch 1071 written successfully
Writing batch 1072 to ClickHouse
Batch 1072 written successfully
Writing batch 1073 to ClickHouse
Batch 1073 written successfully
Writing batch 1074 to ClickHouse
Batch 1074 written successfully
Writing batch 1075 to ClickHouse
Batch 1075 written successfully
