In [1]:
%%sql
DROP TABLE IF EXISTS ExchangeRates;
DROP TABLE IF EXISTS StocksProcessed;
DROP TABLE IF EXISTS Stocks;

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 4, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

# Preparing the data:
1. Upload Parquet file to unmanaged section
2. Read into dataframe and write into table



In [2]:
df = spark.read.format("parquet").load("Files/Landing/PARQUET/Stocks/").dropDuplicates(['symbol','seq'])
df.write.format('delta').mode('overwrite').saveAsTable('Stocks')

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 6, Finished, Available)

**Explore the data**

In [3]:
%%sql
select * from Stocks where symbol='NSFT' order by symbol,time,seq limit 100

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 7, Finished, Available)

<Spark SQL result set with 100 rows and 19 fields>

**Create and populate lookup table**

In [4]:
%%sql
DROP TABLE IF EXISTS ExchangeRates;

CREATE TABLE ExchangeRates
(Currency STRING,Rate FLOAT)
USING DELTA;

INSERT INTO ExchangeRates VALUES('USD',1.35)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 10, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

Reading lookup table

In [5]:
dfr=spark.table('ExchangeRates')
display(dfr)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, e57256e7-5acb-428e-947f-35a861c73895)

**Applying sample transformations and writing historical data into destination table**

In [6]:
spark.read.format("delta")\
    .table("Stocks")\
    .join(dfr,'Currency')\
    .selectExpr('symbol','cast(bidPrice as float)','cast(time as timestamp)','seq','Currency','Round(bidPrice*Rate,2) as bidPrice_CAD')\
    .write\
    .format('delta')\
    .mode('overwrite')\
    .saveAsTable('StocksProcessed')
display(spark.table('StocksProcessed').orderBy('symbol','time','seq'))    

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, abe91d52-1ab0-412b-a203-0ea264ad32d0)

# Applying Change Data Feed
**Check out this video to learn more about Change Data Feed and Time Travel:** https://youtu.be/XGVvEYor14g

In [7]:
%%sql
ALTER TABLE Stocks SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 13, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

**Get the min table version that CDF is valid from**

In [8]:
%%sql
DESCRIBE HISTORY Stocks

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 14, Finished, Available)

<Spark SQL result set with 2 rows and 15 fields>

In [9]:
cdfStVersion=1

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 15, Finished, Available)

**Reviewing CDF in batch format**

In [10]:
import pyspark.sql.functions as F

dfc=spark.read.format("delta")\
    .option("readChangeFeed", "true")\
    .option("startingVersion", cdfStVersion) \
    .table("Stocks")\
    .orderBy(F.col("_commit_version").desc())
display(dfc)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, e7ba6cb9-5c27-4086-b13d-331b6d1fc3ff)

Read from Delta table and display

In [11]:
tableName='StocksProcessed'
deltaTablePath='Tables/'+tableName

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 17, Finished, Available)

**Optional: run this command to clean checkpoint folder only when repeatedly running the stream** 

In [12]:
from notebookutils import mssparkutils
try:
    mssparkutils.fs.rm (f'Files/Checkpoints/{tableName}',True)
except:
    print ("Path doesn't exist, no further action required")

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 18, Finished, Available)

**De-duplicate transaction versions- keep most recent transaction**

In [13]:
from pyspark.sql.window import Window
dfs=spark.read.format("delta")\
    .option("readChangeFeed", "true")\
    .option("startingVersion", cdfStVersion) \
    .table("Stocks")\
    .filter("_change_type !='update_preimage'")\
    .join(dfr,'Currency')\
    .withColumn('rowNum',F.row_number()\
    .over(Window.partitionBy('symbol','seq')\
    .orderBy(F.col("_commit_version").desc())))\
    .selectExpr('symbol','cast(bidPrice as float)','cast(time as timestamp)','seq','Currency',\
    'Round(bidPrice*Rate,2) as bidPrice_CAD','_change_type','_commit_version')\
    .filter('rowNum=1')\
    .alias('source')
display(dfs)    

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, 762617bd-1fd3-447d-a055-02351ec80460)

## Streaming CDF by Spark Structured Streaming
**Check out these tutorials to learn more about Spark Stuctured Streaming:** 
https://youtu.be/kg_UvdXgH80, 
https://youtu.be/Dp3FhnMVhiY

**ForEachBatch processing function**

In [14]:
from delta.tables import *
from pyspark.sql.window import Window

dlttarget=DeltaTable.forPath(spark,'Tables/stocksprocessed').alias('target')
def mergeBatch(df,batchID):
    df=df.withColumn('rowNum',F.row_number()\
        .over(Window.partitionBy('symbol','seq').orderBy(F.col("_commit_version").desc())))\
        .filter('rowNum=1')
    dlttarget.merge(df,'source.symbol=target.symbol and source.seq=target.seq')\
        .whenMatchedDelete(condition = "source.`_change_type`='delete'")\
        .whenMatchedUpdateAll(condition = "source.`_change_type`='update_postimage'")\
        .whenNotMatchedInsertAll()\
        .execute()
    pass

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 20, Finished, Available)

**Streaming changes with forEachBatch destination**

In [15]:
import pyspark.sql.functions as F
strm=spark.readStream\
    .format("delta")\
    .option("readChangeFeed", "true")\
    .option("startingVersion", cdfStVersion) \
    .table("Stocks")\
    .filter("_change_type !='update_preimage'")\
    .join(dfr,'Currency')\
    .selectExpr('symbol','cast(bidPrice as float)','cast(time as timestamp)','seq','Currency',\
    'Round(bidPrice*Rate,2) as bidPrice_CAD','_change_type','_commit_version')\
    .alias('source')\
    .writeStream\
        .queryName(tableName)\
        .format("delta")\
        .foreachBatch(mergeBatch)\
        .option("checkpointLocation", f"Files/Checkpoints/{tableName}")
strmHandle=strm.start()

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 21, Finished, Available)

In [16]:
strmHandle.status

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 22, Finished, Available)

{'message': 'Getting offsets from DeltaSource[abfss://46df290c-a4bf-4f21-ae5b-b2a4f313e3d8@onelake.dfs.fabric.microsoft.com/07a9d45a-7cef-4ec8-aa73-60477164a94a/Tables/stocks]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [17]:
import time,random
def transactionsPipeline(frequency):
    stTime=time.localtime
    loopCond=True
    rng=range(1,10,1)
    while(loopCond):
        multiplier=random.choice(rng)/10.0
        spark.sql(f"UPDATE Stocks Set bidPrice=bidPrice*{multiplier},time=CURRENT_TIMESTAMP()")
        time.sleep(frequency)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 23, Finished, Available)

In [18]:
transactionsPipeline(1)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 24, Cancelled, Waiting)

**Stop the stream**

In [19]:
strmHandle.stop()

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 25, Finished, Available)

# Part 2- Streaming to Event streams destination

In [1]:
!pip install azure-eventhub

StatementMeta(, 9e9de051-172f-4bf0-8563-74bcf43a2142, 3, Finished, Available)

Collecting azure-eventhub
  Downloading azure_eventhub-5.11.7-py3-none-any.whl (320 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m320.6/320.6 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
Collecting typing-extensions>=4.0.1 (from azure-eventhub)
  Downloading typing_extensions-4.11.0-py3-none-any.whl (34 kB)
Installing collected packages: typing-extensions, azure-eventhub
  Attempting uninstall: typing-extensions
    Found existing installation: typing_extensions 4.5.0
    Uninstalling typing_extensions-4.5.0:
      Successfully uninstalled typing_extensions-4.5.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sentence-transformers 2.0.0 requires sentencepiece, which is not installed.
sentence-transformers 2.0.0 requires torchvision, which is not installed.
dash 2.14.0 requires Flask<2.3.0,>=1.0.4, but you have flask 3.0.0 which is in

In [20]:
from notebookutils import mssparkutils
import pyspark.sql.functions as F
from pyspark.sql.types import * 

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 26, Finished, Available)

In [22]:
%%sql
DESCRIBE HISTORY Stocks

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 28, Finished, Available)

<Spark SQL result set with 32 rows and 15 fields>

In [7]:
cdfStVersion=31

StatementMeta(, f1c55da6-d1f4-4bdb-b4c6-8521b71aa22a, 9, Finished, Available)

**Streaming pipeline**

In [27]:
def createStreamingtoEH(ehConf,streamName,cdfStVersion):
    dfs=spark.readStream\
        .format("delta")\
        .option("readChangeFeed", "true")\
        .option("startingVersion", cdfStVersion) \
        .table("Stocks")\
        .withColumn('time',F.col('time').cast('timestamp'))\
        .withColumn('bidPrice',F.col('bidPrice').cast('float'))\
        .filter("_change_type ='update_postimage' or _change_type ='inserted'")\
        .withWatermark("time", "10 seconds")\
        .groupBy('symbol',F.window('time','60 seconds','60 seconds'))\
        .agg(F.avg('bidPrice').alias('avgBidPrice'))

    strmEh=dfs.withColumn('body', F.to_json(F.struct(*dfs.columns),options={"ignoreNullFields": False}))\
        .writeStream\
            .queryName(streamName)\
            .format("eventhubs")\
            .options(**ehConf)\
            .option("checkpointLocation", f"Files/Checkpoints/{streamName}")
    strmEHHandle=strmEh.start()
    return strmEHHandle

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 33, Finished, Available)

**Utility function to clean up streaming checkpoint folder**

In [30]:
from notebookutils import mssparkutils
def cleanCheckpoint(streamName):
    try:
        mssparkutils.fs.rm (f'Files/Checkpoints/{streamName}',True)
    except:
        print ("Path doesn't exist, no further action required")

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 36, Finished, Available)

In [31]:
# Create Event stream and copy Event Hubs endpoint connection string here
connectionString = ""
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
streamName="StreamToES"

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 37, Finished, Available)

Clean streaming folder if this notebook needs to be run repeatedly

In [32]:
cleanCheckpoint(streamName)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 38, Finished, Available)

In [33]:
strmESHandle=createStreamingtoEH(ehConf,streamName,cdfStVersion)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 39, Finished, Available)

In [34]:
strmESHandle.status

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 40, Finished, Available)

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [35]:
transactionsPipeline(1)

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 41, Cancelled, Waiting)

In [36]:
strmESHandle.stop()

StatementMeta(, 90ae5b2f-b271-487c-b8c4-bbb695814cf1, 42, Finished, Available)