In [None]:
import os
import random
import shutil
import threading

import pyspark
from py4j.java_collections import MapConverter
from pyspark.sql import Column, DataFrame, SparkSession, functions
from pyspark.sql.functions import *
# Connection settings are taken from the environment so that endpoints and
# credentials do not have to be committed with the notebook. Each fallback is
# the value that used to be hard-coded here, so an unconfigured environment
# behaves exactly as before.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark:7077")
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio_access_key")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minio_secret_key")

conf = pyspark.SparkConf()
conf.setMaster(SPARK_MASTER_URL)

# S3A settings point the Hadoop file system at the MinIO endpoint; the two
# spark.sql.* settings register the Delta Lake session extension and catalog.
(
    conf.set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .set("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.fast.upload", True)
    .set("spark.hadoop.fs.s3a.path.style.access", True)
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
sc = pyspark.SparkContext(conf=conf)

In [None]:
from pyspark.sql import SparkSession
# Create the `spark` session that the helper functions below read as a global.
# SparkSession(sc) wraps the SparkContext built in the configuration cell.
# NOTE(review): presumably getOrCreate() then returns that same session with
# the "streaming" app name applied - confirm against the installed PySpark.
spark = SparkSession(sc).builder.appName("streaming").getOrCreate()
from delta.tables import *

In [None]:
from pyspark.sql.utils import AnalysisException

## Start a Delta write from a streaming DataFrame
def get_from_raw_to_delta_stream(streamingRawDF, checkpointLocation, data_uri, partitioned_by):
    """Start an append-mode, run-once Delta write of ``streamingRawDF``.

    Args:
        streamingRawDF: streaming DataFrame whose ``writeStream`` is used.
        checkpointLocation: value for the ``checkpointLocation`` option.
        data_uri: destination passed to ``start()``.
        partitioned_by: column name(s) handed to ``partitionBy``.

    Returns:
        The object returned by ``start()``; callers in this notebook call
        ``awaitTermination()`` on it.
    """
    writer = streamingRawDF.writeStream.format("delta").outputMode("append")
    writer = writer.partitionBy(partitioned_by)
    writer = writer.option("overwriteSchema", "true")
    writer = writer.trigger(once=True)
    writer = writer.options(ignoreDeletes=True)
    writer = writer.option("checkpointLocation", checkpointLocation)
    return writer.start(data_uri)

## Read stream
def get_streaming_df(OBJECTURL, format='json'):
    """Open ``OBJECTURL`` as a streaming DataFrame.

    For the file formats 'json' and 'csv' the schema is first inferred with a
    batch read of the same location *using the same format*, then passed to the
    streaming reader. Any other format (e.g. 'delta') is loaded without an
    explicit schema.

    Args:
        OBJECTURL: location to read from.
        format: source format name; defaults to 'json'.

    Returns:
        The streaming DataFrame produced by ``spark.readStream...load()``.
    """
    if format in ('json', 'csv'):
        # Infer with the requested format; inferring CSV data through the JSON
        # reader would yield a schema that does not describe the files.
        inferred_schema = spark.read.format(format).load(OBJECTURL).schema
        return spark.readStream.format(format).schema(inferred_schema).load(OBJECTURL)
    return spark.readStream.format(format).load(OBJECTURL)
## Resolve the Delta table stored at a path
def getDeltaTableFromPath(path):
    """Return the ``DeltaTable`` at ``path``, or ``None`` if there is none yet.

    An ``AnalysisException`` whose message contains "is not a Delta table" is
    treated as "table not created yet": a notice is printed and ``None`` is
    returned. Any other ``AnalysisException`` is re-raised.
    """
    try:
        return DeltaTable.forPath(spark, path)
    except AnalysisException as exc:
        if "is not a Delta table" not in str(exc):
            raise
        print('1st time we call, not yet created')
    return None
## Upsert: insert rows with a new id, update rows whose id already exists
def mergetoDF(microDF, batchId):
    """Merge one micro-batch into the Delta table bound to the global ``deltaDf``.

    Has the ``(batch_df, batch_id)`` shape that ``foreachBatch`` is given in
    this notebook. The batch is de-duplicated on ``id`` and then merged on
    ``s.id = t.id``: matches update every column, the rest are inserted.
    """
    print(f"inside foreachBatch for batchId{batchId}. rows passed={microDF.count()}")
    unique_rows = microDF.dropDuplicates(["id"])
    merge_builder = deltaDf.alias("t").merge(unique_rows.alias("s"), "s.id = t.id")
    merge_builder = merge_builder.whenMatchedUpdateAll()
    merge_builder = merge_builder.whenNotMatchedInsertAll()
    merge_builder.execute()
# Build the storage path of a table.
# table_category is one of raw/bronze/silver/gold.
def get_uri(table_name, table_category='raw', is_checkpoint=False, base_uri='s3a://minio-sink-bucket/'):
    """Return the URI of ``table_name`` inside ``base_uri``.

    Raw tables live under ``topics/`` (``is_checkpoint`` is not consulted for
    them). Every other category lives under ``<category>/checkpoint/`` when
    ``is_checkpoint`` is true and under ``<category>/data/`` otherwise.
    """
    if table_category == 'raw':
        return base_uri + 'topics/' + table_name
    leaf = 'checkpoint/' if is_checkpoint else 'data/'
    return base_uri + table_category + '/' + leaf + table_name
# Get the URIs relevant to one topic
def get_tables_uri(table_name):
    """Return the five URIs used for ``table_name``.

    The result is the tuple ``(raw, bronze data, bronze checkpoint,
    silver data, silver checkpoint)``, each produced by ``get_uri``.
    """
    uris = [get_uri(table_name)]
    for layer in ('bronze', 'silver'):
        uris.append(get_uri(table_name, table_category=layer))
        uris.append(get_uri(table_name, table_category=layer, is_checkpoint=True))
    return tuple(uris)

# In order to use the table with Hive, a manifest has to be generated
def generateManifest(table_url='s3a://minio-sink-bucket/silver/data/customers'):
    """Generate a ``symlink_format_manifest`` for the Delta table at ``table_url``.

    Args:
        table_url: location of the Delta table; defaults to the customers
            silver table.

    Raises:
        ValueError: if ``table_url`` does not hold a Delta table yet
            (``getDeltaTableFromPath`` returned ``None``), e.g. because the
            write that creates it has not finished.
    """
    deltaTable = getDeltaTableFromPath(table_url)
    if deltaTable is None:
        # Fail with the offending path instead of an AttributeError on None.
        raise ValueError(f"cannot generate manifest: no Delta table at {table_url}")
    deltaTable.generate("symlink_format_manifest")

In [None]:
# Resolve every location used for the customers topic.
(
    raw_table,
    bronze_table,
    bronze_table_checkpoint,
    silver_table,
    silver_table_checkpoint,
) = get_tables_uri(table_name='customers')

# Keep only the "after" fields of each raw record and stamp the load time.
streamingCustRawDF = get_streaming_df(raw_table)
streamingCustRawDF = streamingCustRawDF.select(
    col("after.id"),
    col("after.first_name"),
    col("after.last_name"),
    col("after.email"),
).withColumn('inserted', current_timestamp())

# Write the selection to the bronze table and block until the query ends.
partitioned_by = ['last_name', 'first_name']
stream = get_from_raw_to_delta_stream(
    streamingCustRawDF,
    checkpointLocation=bronze_table_checkpoint,
    data_uri=bronze_table,
    partitioned_by=partitioned_by,
)
stream.awaitTermination()

In [None]:
# Load the Delta table at bronze_table (the customers bronze location at this
# point of the notebook) and print its rows.
# NOTE(review): getDeltaTableFromPath returns None when the path is not a Delta
# table yet, in which case toDF() below fails - confirm the bronze write ran.
deltaTable=getDeltaTableFromPath(bronze_table)
print("#############  Original Delta Table ###############")
deltaTable.toDF().show()

In [None]:
# Open the bronze table as a streaming source; with format 'delta'
# get_streaming_df loads it without passing an explicit schema.
customer_stream_df=get_streaming_df(bronze_table, 'delta')

In [None]:
# mergetoDF reads this global; None means the silver table does not exist yet.
deltaDf = getDeltaTableFromPath(silver_table)
data_stream_writer = (
    customer_stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", silver_table_checkpoint)
)
if deltaDf is None:
    # First run: a plain append creates the silver table.
    print('first time, creating table')
    silver_query = data_stream_writer.start(silver_table)
else:
    # Later runs: upsert each micro-batch into the existing table.
    print('not first time, merging data')
    silver_query = data_stream_writer.foreachBatch(mergetoDF).start()

# start() only launches the query. Wait for the run-once trigger to finish so
# the manifest is generated from a table that exists and is fully written.
silver_query.awaitTermination()
generateManifest(table_url=silver_table)

## Orders table

In [None]:
# Resolve every location used for the orders topic. From here on the shared
# *_table / *_checkpoint globals point at orders instead of customers.
table_name = 'orders'
(
    raw_table,
    bronze_table,
    bronze_table_checkpoint,
    silver_table,
    silver_table_checkpoint,
) = get_tables_uri(table_name=table_name)

# Keep only the "after" fields of each raw order record. The raw stream is
# chained directly so the customers variable streamingCustRawDF is not reused
# for orders data.
streamingOrderRawDF = get_streaming_df(raw_table).select(
    col("after.order_number"),
    col("after.order_date"),
    col("after.purchaser"),
    col("after.product_id"),
)
partitioned_by = ['purchaser']
stream = get_from_raw_to_delta_stream(
    streamingOrderRawDF,
    checkpointLocation=bronze_table_checkpoint,
    data_uri=bronze_table,
    partitioned_by=partitioned_by,
)
stream.awaitTermination()

# Show the bronze orders table. It gets its own name because `deltaDf` is the
# merge target that mergetoDF reads as a global; rebinding it here would point
# the customers upsert at the orders bronze table.
orderBronzeTable = getDeltaTableFromPath(bronze_table)
orderBronzeTable.toDF().show()

# Placeholder for the global read by orderMergetoDF; bound to the silver
# orders table before the merge stream is started.
orderDeltaDf = None
order_stream_df = get_streaming_df(bronze_table, 'delta')
def orderMergetoDF(microDF, batchId):
    """Merge one micro-batch into the Delta table bound to the global ``orderDeltaDf``.

    Has the ``(batch_df, batch_id)`` shape that ``foreachBatch`` is given in
    this notebook. The batch is de-duplicated on ``order_number`` and merged on
    ``s.order_number = t.order_number``: matches update every column, the rest
    are inserted.
    """
    print(f"inside foreachBatch for batchId{batchId}. rows passed={microDF.count()}")
    unique_orders = microDF.dropDuplicates(["order_number"])
    merge_builder = orderDeltaDf.alias("t").merge(
        unique_orders.alias("s"), "s.order_number = t.order_number"
    )
    merge_builder = merge_builder.whenMatchedUpdateAll()
    merge_builder = merge_builder.whenNotMatchedInsertAll()
    merge_builder.execute()
# orderMergetoDF reads this global; None means the silver table does not exist yet.
orderDeltaDf = getDeltaTableFromPath(silver_table)
data_stream_writer = (
    order_stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", silver_table_checkpoint)
)
if orderDeltaDf is None:
    # First run: a plain append creates the silver table.
    print('first time, creating table')
    order_silver_query = data_stream_writer.start(silver_table)
else:
    # Later runs: upsert each micro-batch into the existing table.
    print('not first time, merging data')
    order_silver_query = data_stream_writer.foreachBatch(orderMergetoDF).start()

# start() only launches the query. Wait for the run-once trigger to finish so
# that anything run afterwards sees a fully written silver table.
order_silver_query.awaitTermination()

In [None]:
# Generate the symlink-format manifest for the table at silver_table (the
# orders silver location at this point of the notebook).
# NOTE(review): this needs the table to exist already, i.e. the streaming write
# into silver_table must have completed - confirm before running this cell.
generateManifest(table_url=silver_table)