In [1]:
import os
import random
import shutil
import threading

import pyspark
from py4j.java_collections import MapConverter
from pyspark.sql import Column, DataFrame, SparkSession, functions
from pyspark.sql.functions import *
# Connection settings. Each value can be overridden through the environment so
# that credentials and endpoints do not have to be edited (or committed) in the
# notebook; the defaults are the values this notebook was originally written with.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "spark://spark:7077")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://s3:9000")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "minio")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "minio123")

conf = pyspark.SparkConf()
conf.setMaster(SPARK_MASTER)

# S3A filesystem settings plus the Delta Lake SQL extension and catalog.
conf.set("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
    .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
    .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
    .set("spark.hadoop.fs.s3a.fast.upload", True) \
    .set("spark.hadoop.fs.s3a.path.style.access", True) \
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
sc = pyspark.SparkContext(conf=conf)



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
com.databricks#dbutils-api_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dab7227a-c24b-49cb-9105-dc4d668ef1ea;1.0
	confs: [default]
	found com.databricks#dbutils-api_2.12;0.0.5 in central
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.

In [2]:
from pyspark.sql import SparkSession
# `builder` is a class-level attribute, so there is no need to construct a
# SparkSession instance just to reach it. getOrCreate() is expected to attach to
# the SparkContext `sc` created in the first cell rather than start a new one.
spark = SparkSession.builder.appName("streaming").getOrCreate()
from delta.tables import *

In [3]:
from pyspark.sql.utils import AnalysisException
def get_from_raw_to_delta_stream(streamingRawDF, checkpointLocation, data_uri, partitioned_by):
    """Start an append-mode Delta streaming write configured with a once trigger.

    Args:
        streamingRawDF: streaming DataFrame whose ``writeStream`` is used.
        checkpointLocation: value passed as the ``checkpointLocation`` option.
        data_uri: destination passed to ``start()``.
        partitioned_by: column name(s) handed to ``partitionBy``.

    Returns:
        Whatever ``start()`` returns (the notebook calls ``awaitTermination()``
        on it).
    """
    writer = (
        streamingRawDF.writeStream
        .format("delta")
        .outputMode("append")
        .partitionBy(partitioned_by)
        .option("overwriteSchema", "true")
        .trigger(once=True)
        .options(ignoreDeletes=True)
        .option("checkpointLocation", checkpointLocation)
    )
    return writer.start(data_uri)


def get_streaming_df(OBJECTURL, format='json'):
    """Open ``OBJECTURL`` as a streaming DataFrame using the global ``spark`` session.

    For the ``json`` and ``csv`` formats the schema is first inferred with a
    batch read of the same location *in the same format* and then supplied
    explicitly to the streaming reader. Any other format (the notebook uses
    ``delta``) is loaded without an explicit schema.

    Args:
        OBJECTURL: location to read from.
        format: source format name; defaults to ``'json'``.

    Returns:
        The DataFrame returned by ``spark.readStream...load(OBJECTURL)``.
    """
    reader = spark.readStream.format(format)
    if format in ('json', 'csv'):
        # Infer with the requested format (not always 'json') so that CSV
        # input is not parsed by the JSON reader.
        inferred_schema = spark.read.format(format).load(OBJECTURL).schema
        reader = reader.schema(inferred_schema)
    return reader.load(OBJECTURL)

def getDeltaTableFromPath(path):
    """Return the DeltaTable at ``path``, or ``None`` if it is not a Delta table.

    An ``AnalysisException`` whose text contains "is not a Delta table" is
    reported with a printed notice and turned into ``None``; any other
    ``AnalysisException`` is re-raised to the caller.
    """
    try:
        return DeltaTable.forPath(spark, path)
    except AnalysisException as exc:
        if "is not a Delta table" not in str(exc):
            raise
        print('1st time we call, not yet created')
    return None

def mergetoDF(microDF, batchId):
    """Upsert one micro-batch into the Delta table held in the global ``deltaDf``.

    Intended as a ``foreachBatch`` callback. The batch is reduced to one row
    per ``id`` (which duplicate survives is not controlled here), then merged
    on ``id``: matching rows are updated, unmatched rows are inserted.

    Args:
        microDF: DataFrame for the current micro-batch.
        batchId: identifier of the micro-batch; only used in the log line.
    """
    print(f"inside foreachBatch for batchId{batchId}. rows passed={microDF.count()}")
    unique_rows = microDF.dropDuplicates(["id"])
    target = deltaDf.alias("t")
    source = unique_rows.alias("s")
    upsert = target.merge(source, "s.id = t.id")
    upsert = upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll()
    upsert.execute()
# table_category is one of: raw / bronze / silver / gold
def get_uri(table_name, table_category='raw', is_checkpoint=False, base_uri='s3a://minio-sink-bucket/'):
    """Build the storage URI for a table.

    ``raw`` tables live at ``<base_uri>topics/<table_name>``; ``is_checkpoint``
    is ignored for them. Every other category lives at
    ``<base_uri><category>/data/<table_name>``, or under ``checkpoint/``
    instead of ``data/`` when ``is_checkpoint`` is true.
    """
    if table_category == 'raw':
        return base_uri + 'topics/' + table_name
    leaf = 'checkpoint/' if is_checkpoint else 'data/'
    return base_uri + table_category + '/' + leaf + table_name

def get_tables_uri(table_name):
    """Return all storage URIs used for ``table_name`` as a 5-tuple.

    The order is: raw, bronze data, bronze checkpoint, silver data,
    silver checkpoint. Each entry is produced by ``get_uri``.
    """
    raw_uri = get_uri(table_name)
    layered_uris = tuple(
        get_uri(table_name, table_category=layer, is_checkpoint=checkpoint)
        for layer in ('bronze', 'silver')
        for checkpoint in (False, True)
    )
    return (raw_uri,) + layered_uris

In [4]:
# Resolve the storage locations for the customers table.
(raw_table,
 bronze_table,
 bronze_table_checkpoint,
 silver_table,
 silver_table_checkpoint) = get_tables_uri(table_name='customers')

# Read the raw customer records as a stream, keep the nested `after.*` fields
# of interest and add an `inserted` column holding current_timestamp().
streamingCustRawDF = get_streaming_df(raw_table)
streamingCustRawDF = streamingCustRawDF.select(
    col("after.id"),
    col("after.first_name"),
    col("after.last_name"),
    col("after.email"),
).withColumn('inserted', current_timestamp())

# Write the stream to the bronze location and block until the query ends.
partitioned_by = ['last_name', 'first_name']
stream = get_from_raw_to_delta_stream(
    streamingCustRawDF,
    checkpointLocation=bronze_table_checkpoint,
    data_uri=bronze_table,
    partitioned_by=partitioned_by,
)
stream.awaitTermination()

21/11/01 08:41:27 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [5]:
# Look up the Delta table at the bronze customers location and print its rows.
# NOTE(review): getDeltaTableFromPath returns None when the path is not a Delta
# table yet, in which case .toDF() below would fail -- presumably the stream in
# the previous cell has always created it by now; confirm.
deltaTable=getDeltaTableFromPath(bronze_table)
print("#############  Original Delta Table ###############")
deltaTable.toDF().show()

#############  Original Delta Table ###############


                                                                                

+----+----------+---------+--------------------+--------------------+
|  id|first_name|last_name|               email|            inserted|
+----+----------+---------+--------------------+--------------------+
|1001|      Jane|  changed|sally.thomas@acme...|2021-11-01 08:41:...|
|1001|     Sally|   Thomas|sally.thomas@acme...|2021-11-01 08:41:...|
|1001|      Jane| changed1|sally.thomas@acme...|2021-11-01 08:41:...|
|1005|      John|      Doe|john.doe@example.com|2021-11-01 08:41:...|
|1002|    George|   Bailey|  gbailey@foobar.com|2021-11-01 08:41:...|
|1004|      Anne|Kretchmar|  annek@noanswer.org|2021-11-01 08:41:...|
|1003|    Edward|   Walker|       ed@walker.com|2021-11-01 08:41:...|
+----+----------+---------+--------------------+--------------------+



In [6]:
# Open the bronze customers location as a streaming source in 'delta' format;
# the next cell writes this stream to the silver table.
customer_stream_df=get_streaming_df(bronze_table, 'delta')

In [7]:
# `deltaDf` is the module-level handle that mergetoDF() merges into. It is None
# when the silver location is not a Delta table yet (i.e. on the first run).
deltaDf = getDeltaTableFromPath(silver_table)
data_stream_writer = customer_stream_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(once=True) \
    .option("checkpointLocation", silver_table_checkpoint)
if deltaDf is None:
    # NOTE(review): this first-run path appends the bronze rows as they are,
    # without the per-id de-duplication that mergetoDF applies -- confirm that
    # duplicate ids in the silver table are acceptable here.
    print('first time, creating table')
    stream = data_stream_writer.start(silver_table)
else:
    print('not first time, merging data')
    stream = data_stream_writer.foreachBatch(mergetoDF).start()
# start() returns immediately; wait for the query to end so that later cells do
# not read the silver table (or rebind `deltaDf`) while it is still running.
stream.awaitTermination()

1st time we call, not yet created
first time, creating table


21/11/01 08:41:51 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://minio-sink-bucket/bronze/data/customers] is ignored when Trigger.Once() is used.
                                                                                

In [8]:
# Print the current rows of the Delta table at `silver_table` (customers at this point).
getDeltaTableFromPath(silver_table).toDF().show()

+----+----------+---------+--------------------+--------------------+
|  id|first_name|last_name|               email|            inserted|
+----+----------+---------+--------------------+--------------------+
|1001|      Jane|  changed|sally.thomas@acme...|2021-11-01 08:41:...|
|1001|     Sally|   Thomas|sally.thomas@acme...|2021-11-01 08:41:...|
|1001|      Jane| changed1|sally.thomas@acme...|2021-11-01 08:41:...|
|1005|      John|      Doe|john.doe@example.com|2021-11-01 08:41:...|
|1002|    George|   Bailey|  gbailey@foobar.com|2021-11-01 08:41:...|
|1004|      Anne|Kretchmar|  annek@noanswer.org|2021-11-01 08:41:...|
|1003|    Edward|   Walker|       ed@walker.com|2021-11-01 08:41:...|
+----+----------+---------+--------------------+--------------------+



## Orders table

The same raw → bronze → silver pipeline as above, applied to the `orders` topic.

In [9]:
table_name = 'orders'
# From here on the *_table / *_checkpoint names point at the orders locations.
raw_table, bronze_table, bronze_table_checkpoint, silver_table, silver_table_checkpoint = get_tables_uri(table_name=table_name)

# Read the raw order records as a stream and keep the nested `after.*` fields.
streamingOrderRawDF = get_streaming_df(raw_table).select(
    col("after.order_number"),
    col("after.order_date"),
    col("after.purchaser"),
    col("after.product_id"),
)
partitioned_by = ['purchaser']
stream = get_from_raw_to_delta_stream(
    streamingOrderRawDF,
    checkpointLocation=bronze_table_checkpoint,
    data_uri=bronze_table,
    partitioned_by=partitioned_by,
)
stream.awaitTermination()

# Use a dedicated name for the bronze handle: `deltaDf` is the global that
# mergetoDF() merges customer batches into, so rebinding it here would point
# that callback at the orders bronze table.
orderBronzeDeltaTable = getDeltaTableFromPath(bronze_table)
orderBronzeDeltaTable.toDF().show()
# Placeholder for the silver orders table handle read by orderMergetoDF();
# it is assigned for real further down in this cell.
orderDeltaDf = None
order_stream_df = get_streaming_df(bronze_table, 'delta')
def orderMergetoDF(microDF, batchId):
    """Upsert one micro-batch into the Delta table held in the global ``orderDeltaDf``.

    Intended as a ``foreachBatch`` callback. The batch is reduced to one row
    per ``order_number`` (which duplicate survives is not controlled here),
    then merged on ``order_number``: matching rows are updated, unmatched rows
    are inserted.

    Args:
        microDF: DataFrame for the current micro-batch.
        batchId: identifier of the micro-batch; only used in the log line.
    """
    print(f"inside foreachBatch for batchId{batchId}. rows passed={microDF.count()}")
    unique_orders = microDF.dropDuplicates(["order_number"])
    target = orderDeltaDf.alias("t")
    source = unique_orders.alias("s")
    upsert = target.merge(source, "s.order_number = t.order_number")
    upsert = upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll()
    upsert.execute()
# `orderDeltaDf` is the module-level handle that orderMergetoDF() merges into.
# It is None when the silver location is not a Delta table yet (first run).
orderDeltaDf = getDeltaTableFromPath(silver_table)
data_stream_writer = order_stream_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(once=True) \
    .option("checkpointLocation", silver_table_checkpoint)
if orderDeltaDf is None:
    print('first time, creating table')
    stream = data_stream_writer.start(silver_table)
else:
    print('not first time, merging data')
    stream = data_stream_writer.foreachBatch(orderMergetoDF).start()
# start() returns immediately; wait for the query to end so that the next cell
# does not read the silver table while it is still being written.
stream.awaitTermination()

                                                                                

+------------+----------+---------+----------+
|order_number|order_date|purchaser|product_id|
+------------+----------+---------+----------+
|       10001|     16816|     1001|       102|
|       10002|     16817|     1002|       105|
|       10003|     16850|     1002|       106|
|       10004|     16852|     1003|       107|
+------------+----------+---------+----------+

1st time we call, not yet created
first time, creating table


21/11/01 08:42:43 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://minio-sink-bucket/bronze/data/orders] is ignored when Trigger.Once() is used.
                                                                                

In [10]:
# Print the current rows of the Delta table at `silver_table` (orders at this point).
getDeltaTableFromPath(silver_table).toDF().show()

+------------+----------+---------+----------+
|order_number|order_date|purchaser|product_id|
+------------+----------+---------+----------+
|       10003|     16850|     1002|       106|
|       10004|     16852|     1003|       107|
|       10001|     16816|     1001|       102|
|       10002|     16817|     1002|       105|
+------------+----------+---------+----------+

