In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, LongType

In [2]:
# Maven coordinates handed to `spark.jars.packages` when the SparkSession is built.
# (The misspelled variable name is kept because the session cell refers to it.)
# NOTE(review): hadoop-aws generally has to match the Hadoop version bundled with the
# Spark distribution; confirm 3.3.1 matches the installed Spark build.
# NOTE(review): spark-streaming-kafka-0-10 is the DStream integration; the Structured
# Streaming code in this notebook only uses spark-sql-kafka-0-10 — presumably removable.
_HADOOP_VERSION = "3.3.1"
_SPARK_VERSION = "3.3.1"
_SCALA_BINARY_VERSION = "2.12"

spark_depenedencies_jars = [
    *(
        f"org.apache.hadoop:{artifact}:{_HADOOP_VERSION}"
        for artifact in ("hadoop-common", "hadoop-client", "hadoop-aws")
    ),
    *(
        f"org.apache.spark:{artifact}_{_SCALA_BINARY_VERSION}:{_SPARK_VERSION}"
        for artifact in ("spark-sql-kafka-0-10", "spark-streaming-kafka-0-10")
    ),
    f"io.delta:delta-core_{_SCALA_BINARY_VERSION}:2.2.0",
]
spark_depenedencies_jars_str = ",".join(spark_depenedencies_jars)

In [3]:
# Spark master URLs: the standalone cluster, and an in-process master.
# Note: the Spark master URL "local" runs with a single worker thread ("local[*]" would use all cores).
SPARK_MASTER, SPARK_MASTER_LOCAL = "spark://spark-master:7077", "local"

# Hive metastore Thrift endpoints: from the host machine vs. from inside the container network.
_HIVE_METASTORE_PORT = 9083
HIVE_METASTORE_URI_LOCAL = f"thrift://localhost:{_HIVE_METASTORE_PORT}"
HIVE_METASTORE_URI = f"thrift://hive-metastore:{_HIVE_METASTORE_PORT}"

In [4]:
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"

# Streaming checkpoints and Delta tables both live in the MinIO "warehouse" bucket.
_WAREHOUSE_BUCKET = "s3a://warehouse/"
BASE_CHECKPOINT_LOCATION = _WAREHOUSE_BUCKET + "checkpoint/"
BASE_DELTA_DIR_S3 = _WAREHOUSE_BUCKET + "delta/inventory/"

# Local-filesystem alternative to BASE_DELTA_DIR_S3; not referenced by the visible cells.
BASE_DELTA_DIR = "/home/jovyan/work/delta"

In [5]:
# MinIO (S3-compatible) credentials and endpoint used by the S3A filesystem.
# Read from the environment so real secrets never need to be committed with the
# notebook; the fallbacks are the values previously hardcoded here (local dev
# credentials — override them via the environment for any shared deployment).
aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID", "minio")
aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "minio123")
aws_endpoint = os.environ.get("AWS_ENDPOINT_URL", "http://minio:9000")

In [6]:
def _build_spark_session():
    """Create (or reuse) the SparkSession wired to MinIO via S3A, Delta Lake and the Hive metastore.

    Settings are applied in a fixed order, then Hive support is enabled.
    NOTE(review): `spark.jars.packages` is only honored when this call starts a new
    JVM; if a session already exists in the kernel, restart it to pick up changes.
    """
    settings = {
        # MinIO / S3A object storage
        "spark.hadoop.fs.s3a.access.key": aws_access_key,
        "spark.hadoop.fs.s3a.secret.key": aws_secret_key,
        "spark.hadoop.fs.s3a.endpoint": aws_endpoint,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        # Delta Lake SQL extension and catalog
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        # disables Delta's safety check on short VACUUM retention periods
        "spark.databricks.delta.retentionDurationCheck.enabled": "false",
        # Hive metastore catalog
        "spark.hadoop.hive.metastore.uris": HIVE_METASTORE_URI,
        "spark.sql.catalogImplementation": "hive",
        # Maven packages resolved when the JVM starts
        "spark.jars.packages": spark_depenedencies_jars_str,
    }
    builder = SparkSession.builder.appName("Kafka Streaming Example").master(SPARK_MASTER_LOCAL)
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.enableHiveSupport().getOrCreate()


spark = _build_spark_session()

In [7]:
def read_kafka_stream(kafka_topic, schema):
    """Subscribe to a Debezium CDC topic and flatten each change event into a row.

    Args:
        kafka_topic: Kafka topic to subscribe to (e.g. "dbserver1.inventory.customers").
        schema: StructType of the JSON message value; must contain
            `payload.before`, `payload.after` and `payload.op`.

    Returns:
        A streaming DataFrame with the row-image columns of the table, the `op`
        column, and a `curr_timestamp` processing-time column. For delete events
        (where `payload.after` is null) the row image is taken from
        `payload.before`, so the deleted row's key is preserved.
    """
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", kafka_topic)
        .option("failOnDataLoss", "false")
        .option("startingOffsets", "earliest")
        .load()
    )
    data_stream = (
        kafka_stream
        # Debezium emits a tombstone (null value) after each delete unless
        # tombstones.on.delete=false; it carries no change data and would
        # otherwise become an all-null row (including a null `op`).
        .where("value IS NOT NULL")
        .selectExpr("cast (value as string) as json")
        .select(from_json("json", schema).alias("cdc"))
        # Deletes only carry a before-image; fall back to it so the row's key survives.
        .selectExpr(
            "coalesce(cdc.payload.after, cdc.payload.before) AS record",
            "cdc.payload.op AS op",
        )
        .select("record.*", "op")
        .withColumn("curr_timestamp", current_timestamp())
    )
    return data_stream

In [8]:
def console_print_stream(data_stream):
    """Start a debugging console sink for `data_stream`.

    Every 5 seconds the new rows are printed, untruncated, to the driver's stdout
    using the "update" output mode.

    Returns:
        The started streaming query.
    """
    writer = data_stream.writeStream
    writer = writer.trigger(processingTime="5 seconds")
    writer = writer.outputMode("update")
    writer = writer.option("truncate", "false")
    writer = writer.format("console")
    query = writer.start()
    return query

In [9]:
def write_delta_table(data_stream, checkpoint_location, delta_dir):
    """Start an append-only streaming write of `data_stream` into a Delta table.

    Args:
        data_stream: Streaming DataFrame to persist.
        checkpoint_location: Path where the query stores its progress/offsets.
        delta_dir: Path of the target Delta table.

    Returns:
        The started StreamingQuery. Returning it, as `console_print_stream`
        does, lets callers monitor or stop the query.
    """
    return (
        data_stream.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .start(delta_dir)
    )

In [10]:
# Row images of the three source tables (the `before` / `after` parts of a change event).
customers_table_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
])

orders_table_schema = StructType([
    StructField("order_number", IntegerType(), True),
    # NOTE(review): presumably Debezium's int "days since epoch" encoding of a DATE column — confirm
    # and convert downstream if a real date type is needed.
    StructField("order_date", IntegerType(), True),
    StructField("purchaser", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
])

products_table_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("description", StringType(), True),
    StructField("weight", DoubleType(), True),
])

# Connector metadata block (`payload.source`) shared by every table's events.
source_schema = StructType([
    StructField("version", StringType(), False),
    StructField("connector", StringType(), False),
    StructField("name", StringType(), False),
    StructField("ts_ms", LongType(), False),
    StructField("snapshot", StringType(), True),
    StructField("db", StringType(), False),
    StructField("sequence", StringType(), True),
    StructField("table", StringType(), True),
    StructField("server_id", LongType(), False),
    StructField("gtid", StringType(), True),
    StructField("file", StringType(), False),
    StructField("pos", LongType(), False),
    StructField("row", IntegerType(), False),
    StructField("thread", LongType(), True),
    StructField("query", StringType(), True),
])

# Transaction metadata block (`payload.transaction`).
transaction_schema = StructType([
    StructField("id", StringType(), False),
    StructField("total_order", LongType(), False),
    StructField("data_collection_order", LongType(), False),
])


def _debezium_payload_schema(table_schema):
    """Change-event payload whose `before`/`after` row images use `table_schema`."""
    return StructType([
        StructField("before", table_schema, True),
        StructField("after", table_schema, True),
        StructField("source", source_schema, True),
        StructField("op", StringType(), False),
        StructField("ts_ms", LongType(), True),
        StructField("transaction", transaction_schema, True),
    ])


def _debezium_envelope_schema(payload_schema):
    """Top-level message value: a `schema` description plus the `payload`.

    The `schema` field is declared as a string; its nested JSON is not parsed
    (presumably unused downstream — confirm).
    """
    return StructType([
        StructField("schema", StringType(), True),
        StructField("payload", payload_schema, True),
    ])


customers_payload_schema = _debezium_payload_schema(customers_table_schema)
orders_payload_schema = _debezium_payload_schema(orders_table_schema)
products_payload_schema = _debezium_payload_schema(products_table_schema)

customers_schema = _debezium_envelope_schema(customers_payload_schema)
orders_schema = _debezium_envelope_schema(orders_payload_schema)
products_schema = _debezium_envelope_schema(products_payload_schema)

In [11]:
# All CDC topics share the "dbserver1.inventory." prefix (presumably connector name + database).
_CDC_TOPIC_PREFIX = "dbserver1.inventory."
kafka_customers_topic = f"{_CDC_TOPIC_PREFIX}customers"
kafka_orders_topic = f"{_CDC_TOPIC_PREFIX}orders"
kafka_products_topic = f"{_CDC_TOPIC_PREFIX}products"

In [12]:
# One checkpoint directory per table; each streaming query needs its own.
customers_checkpoint_location = f"{BASE_CHECKPOINT_LOCATION}customers"
orders_checkpoint_location = f"{BASE_CHECKPOINT_LOCATION}orders"
products_checkpoint_location = f"{BASE_CHECKPOINT_LOCATION}products"

In [13]:
# Target Delta table location for each source table.
customers_delta_dir = f"{BASE_DELTA_DIR_S3}customers"
orders_delta_dir = f"{BASE_DELTA_DIR_S3}orders"
products_delta_dir = f"{BASE_DELTA_DIR_S3}products"

In [14]:
# One flattened CDC stream per table; nothing is consumed until a sink is started.
customers_stream, orders_stream, products_stream = (
    read_kafka_stream(topic, envelope_schema)
    for topic, envelope_schema in (
        (kafka_customers_topic, customers_schema),
        (kafka_orders_topic, orders_schema),
        (kafka_products_topic, products_schema),
    )
)

In [15]:
# Debug sinks printing each stream to the driver's stdout.
# NOTE(review): each console query consumes its Kafka topic separately from the
# Delta query below; drop these cells outside of debugging.
customers_console_stream, orders_console_stream, products_console_stream = map(
    console_print_stream,
    (customers_stream, orders_stream, products_stream),
)

In [16]:
# Start the append-only Delta sink for every table, each with its own checkpoint.
for _stream, _checkpoint, _delta_dir in (
    (customers_stream, customers_checkpoint_location, customers_delta_dir),
    (orders_stream, orders_checkpoint_location, orders_delta_dir),
    (products_stream, products_checkpoint_location, products_delta_dir),
):
    write_delta_table(_stream, _checkpoint, _delta_dir)

In [None]:
# Block the kernel until any running streaming query stops; if a query failed,
# its exception is re-raised here. StreamingQueryManager (spark.streams) exposes
# awaitAnyTermination() — it has no awaitTermination() method, so the original
# call raised AttributeError instead of waiting.
spark.streams.awaitAnyTermination()