In [1]:
import os
import warnings

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
warnings.filterwarnings('ignore')

In [2]:
# MinIO login credentials.
# Read from the environment so real secrets do not have to be stored in the
# notebook; the fallbacks are the values that were previously hard-coded here.
accessKeyId = os.environ.get("MINIO_ACCESS_KEY", "cdc-user")
secretAccessKey = os.environ.get("MINIO_SECRET_KEY", "cdc-password")

In [3]:
# Build (or reuse) the SparkSession connected to the master at
# spark://spark-master:7077. The Kafka connector is requested through
# spark.jars.packages so it is resolved when the session starts.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Spark-MinIO")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /opt/bitnami/spark/.ivy2/cache
The jars for the packages stored in: /opt/bitnami/spark/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c3ec0583-153a-4353-8ac1-2f5d8a7d03ca;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.4 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apa

In [4]:
def minio_config(spark_context: SparkContext):
    """Configure the S3A filesystem of ``spark_context`` for the MinIO endpoint.

    Sets the access key, secret key, path-style access flag, filesystem
    implementation class and endpoint URL on the context's Hadoop
    configuration. The credentials are taken from the notebook-level
    ``accessKeyId`` and ``secretAccessKey`` variables.

    Args:
        spark_context: The SparkContext whose Hadoop configuration is updated.
    """
    hadoop_conf = spark_context._jsc.hadoopConfiguration()
    s3a_settings = (
        ('fs.s3a.access.key', accessKeyId),
        ('fs.s3a.secret.key', secretAccessKey),
        ('fs.s3a.path.style.access', 'true'),
        ('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
        ('fs.s3a.endpoint', 'http://minio:9000'),
    )
    for setting_name, setting_value in s3a_settings:
        hadoop_conf.set(setting_name, setting_value)

In [5]:
# Apply the MinIO S3A settings to the Hadoop configuration of the session's SparkContext.
minio_config(spark.sparkContext)

In [6]:
# Open a streaming read on the dbserver1.public.customers Kafka topic,
# starting from the latest offsets.
# - "multiline" was removed: it is a file-source (JSON/CSV) reader option and
#   the Kafka source does not use it.
# - "failOnDataLoss" is a Kafka *source* option, so it is set on the reader,
#   where it takes effect: the query keeps running instead of failing when
#   offsets are no longer available.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.public.customers")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

In [7]:
# Schema of the outer message envelope: the two top-level keys `schema` and
# `payload` are both read as nullable strings.
df_schema = (
    StructType()
    .add('schema', StringType(), True)
    .add('payload', StringType(), True)
)

In [8]:
# Cast the Kafka `value` column to a string, parse it with the envelope schema
# and keep only the `payload` field.
df2 = (
    df
    .selectExpr("cast (value as string) as json")
    .select(F.from_json("json", df_schema).alias("data"))
    .select("data.payload")
)

In [9]:
# Show the schema after envelope parsing; only the `payload` column is selected.
df2.printSchema()

root
 |-- payload: string (nullable = true)



In [10]:
# Schema used to parse the `payload` string: `before` and `after` are
# string-to-string maps, `op` and `ts_ms` are plain strings; everything is nullable.
message_schema = StructType([
    StructField('before', MapType(StringType(), StringType(), valueContainsNull=True), nullable=True),
    StructField('after', MapType(StringType(), StringType(), valueContainsNull=True), nullable=True),
    StructField('op', StringType(), nullable=True),
    StructField('ts_ms', StringType(), nullable=True),
])

In [11]:
# Keys extracted from the `after` map, in output column order.
after_fields = [
    "customerId",
    "customerFName",
    "customerLName",
    "customerEmail",
    "customerPassword",
    "customerStreet",
    "customerCity",
    "customerState",
    "customerZipcode",
]

In [12]:
# Keys extracted from the `before` map, in output column order.
before_fields = [
    # identity
    "customerId", "customerFName", "customerLName",
    # account
    "customerEmail", "customerPassword",
    # address
    "customerStreet", "customerCity", "customerState", "customerZipcode",
]

In [13]:
# Parse the `payload` string with message_schema, then flatten it: one column
# per requested key of the `before` and `after` maps (aliased with a
# 'payload.before.' / 'payload.after.' prefix), followed by `op` and `ts_ms`.
df_final = (
    df2
    .withColumn("payload", F.from_json(F.col("payload"), message_schema))
    .select(
        *[F.col("payload.before").getItem(key).alias('payload.before.' + key) for key in before_fields],
        *[F.col("payload.after").getItem(key).alias('payload.after.' + key) for key in after_fields],
        'payload.op',
        'payload.ts_ms',
    )
)

In [14]:
# Echo the DataFrame repr to list the flattened column names and their types.
df_final

DataFrame[payload.before.customerId: string, payload.before.customerFName: string, payload.before.customerLName: string, payload.before.customerEmail: string, payload.before.customerPassword: string, payload.before.customerStreet: string, payload.before.customerCity: string, payload.before.customerState: string, payload.before.customerZipcode: string, payload.after.customerId: string, payload.after.customerFName: string, payload.after.customerLName: string, payload.after.customerEmail: string, payload.after.customerPassword: string, payload.after.customerStreet: string, payload.after.customerCity: string, payload.after.customerState: string, payload.after.customerZipcode: string, op: string, ts_ms: string]

In [15]:
# Print the schema of the flattened streaming DataFrame as a tree.
df_final.printSchema()

root
 |-- payload.before.customerId: string (nullable = true)
 |-- payload.before.customerFName: string (nullable = true)
 |-- payload.before.customerLName: string (nullable = true)
 |-- payload.before.customerEmail: string (nullable = true)
 |-- payload.before.customerPassword: string (nullable = true)
 |-- payload.before.customerStreet: string (nullable = true)
 |-- payload.before.customerCity: string (nullable = true)
 |-- payload.before.customerState: string (nullable = true)
 |-- payload.before.customerZipcode: string (nullable = true)
 |-- payload.after.customerId: string (nullable = true)
 |-- payload.after.customerFName: string (nullable = true)
 |-- payload.after.customerLName: string (nullable = true)
 |-- payload.after.customerEmail: string (nullable = true)
 |-- payload.after.customerPassword: string (nullable = true)
 |-- payload.after.customerStreet: string (nullable = true)
 |-- payload.after.customerCity: string (nullable = true)
 |-- payload.after.customerState: string

In [16]:
## Write to MinIO with Spark Structured Streaming

In [None]:
def write_to_multiple_sinks(df, batchId):
    """Append one micro-batch to the MinIO bucket as CSV files.

    Passed to ``foreachBatch`` on the streaming query started below.

    Args:
        df: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch identifier supplied by Spark; not used here.
    """
    # write to file
    (df.write
       .format("csv")
       .mode("append")
       .save("s3a://cdc-bucket/output-data"))

# Sink
checkpointDir = "s3a://cdc-bucket/checkpoint/foreachBatch"
# start streaming
# NOTE: the former "failOnDataLoss" and "maxRetries" writer options were
# removed. "failOnDataLoss" is a Kafka source option (it belongs on the
# readStream), and "maxRetries" is not a documented Structured Streaming writer
# option; neither is consumed by a foreachBatch writer.
streamingQuery = (df_final
                  .writeStream
                  .foreachBatch(write_to_multiple_sinks)
                  .option("checkpointLocation", checkpointDir)
                  .start())

# Block this cell while the query runs. When the kernel is interrupted, stop
# the query explicitly so it does not keep running in the background.
try:
    streamingQuery.awaitTermination()
except KeyboardInterrupt:
    streamingQuery.stop()

25/01/25 08:09:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/01/25 08:09:43 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/01/25 08:09:46 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [None]:
# Shut down the SparkSession. The preceding cell blocks on awaitTermination(),
# so this runs only after that call returns or the cell is interrupted.
spark.stop()