
## Introduction

In this notebook we will see how to work with Delta Lake to manage real-time streaming data.

Some of the things we will look at are:
* Creating a new Delta table
* Consuming data from Event Hubs and storing it in a Delta table
* Analyzing real-time flight data with Spark SQL


In addition to Delta tables, we will also see some tips and tricks for working in the Fabric environment.


### Environment Setup

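We use a notebook workflow to set up the environment for this exercise: a setup notebook is run from this one and its result is passed back. The pattern is the same as the [Databricks Notebooks workflow](https://docs.databricks.com/notebooks/notebook-workflows.html).

In Fabric, `mssparkutils.notebook.run()` runs another notebook and returns its exit value so it can be used here; it is the counterpart of `dbutils.notebook.run()` in Databricks.

Like `dbutils`, `mssparkutils` has other useful helpers, such as interacting with the file system or reading secrets (compare [Databricks Secrets](https://docs.databricks.com/dev-tools/databricks-utils.html#dbutils-secrets)).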


## Set medallion paths

In [1]:
# Run the setup notebook; its exit value is a whitespace-separated list of paths and names
setup_responses = mssparkutils.notebook.run("Get-Metadata").split()

# Set medallion paths
bronzePath = setup_responses[0]
bronzeLakehouse = setup_responses[1]
silverLakehouse = setup_responses[2]
goldLakehouse = setup_responses[3]

print(f"bronze data path is {bronzePath}")
print(f"bronze lakehouse is {bronzeLakehouse}")
print(f"silver lakehouse is {silverLakehouse}")
print(f"gold lakehouse is {goldLakehouse}")


bronze data path is abfss://classroomB2@msit-onelake.dfs.fabric.microsoft.com/liad_bronze.Lakehouse/Files
bronze lakehouse is liad_bronze
silver lakehouse is liad_silver
gold lakehouse is liad_gold
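The setup cell relies on `Get-Metadata` returning a single whitespace-separated string, and indexes into it blindly. A minimal sketch of a stricter parser, assuming that exit-value format (the function name `parse_setup_response` is hypothetical), fails with a clear message if the child notebook returns fewer values than expected instead of a bare `IndexError`:

```python
def parse_setup_response(response: str) -> dict:
    """Split the setup notebook's exit value into named medallion settings.

    Assumes the exit value looks like:
    <bronzePath> <bronzeLakehouse> <silverLakehouse> <goldLakehouse>
    """
    keys = ["bronzePath", "bronzeLakehouse", "silverLakehouse", "goldLakehouse"]
    values = response.split()
    if len(values) < len(keys):
        raise ValueError(
            f"expected {len(keys)} values from Get-Metadata, got {len(values)}: {values!r}"
        )
    return dict(zip(keys, values))


# Example using the values printed above
settings = parse_setup_response(
    "abfss://classroomB2@msit-onelake.dfs.fabric.microsoft.com/liad_bronze.Lakehouse/Files "
    "liad_bronze liad_silver liad_gold"
)
print(settings["silverLakehouse"])  # liad_silver
```

In the notebook this would replace the four index lookups with `settings = parse_setup_response(mssparkutils.notebook.run("Get-Metadata"))`.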


## Consume the data from Event Hubs

***Load flight data into a Spark DataFrame***

In [19]:
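from datetime import date

# Placeholder: never hard-code the real SharedAccessKey in a notebook; load it from a secret store
connection_string = "Endpoint=sb://eventsforfabric.servicebus.windows.net/;SharedAccessKeyName=manage;SharedAccessKey=<SharedAccessKey>;EntityPath=opensky"

# The azure-event-hubs-spark connector expects the connection string to be encrypted
ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
}
# Read the event stream and cast the binary 'body' column to a string
df_streaming = spark.readStream.format("eventhubs").options(**ehConf).load()
df = df_streaming.withColumn("body", df_streaming["body"].cast("string"))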

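Rather than pasting the full connection string (including its key) into a cell, you can assemble it from its parts and supply the key at runtime. A minimal sketch, assuming the key is provided through an environment variable named `EVENTHUB_SHARED_ACCESS_KEY` (a hypothetical name; in Fabric a secret store such as Azure Key Vault is the more likely source). The helper `build_eventhub_connection_string` is also hypothetical:

```python
import os


def build_eventhub_connection_string(namespace: str, key_name: str, key: str, entity_path: str) -> str:
    """Assemble an Event Hubs connection string from its components."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={entity_path}"
    )


# Hypothetical environment variable holding the secret; falls back to a placeholder
key = os.environ.get("EVENTHUB_SHARED_ACCESS_KEY", "<SharedAccessKey>")
connection_string = build_eventhub_connection_string("eventsforfabric", "manage", key, "opensky")
```

The resulting string is what gets passed to `EventHubsUtils.encrypt` before building `ehConf`.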


## Delta Tables

Let's load the streaming flight data into a Delta table. We want to keep history rather than overwrite the data every time the process runs, so the stream writes in `append` output mode.


### Create Delta Table

***Write the streaming Spark DataFrame to a Delta table***

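Let's start by defining where our Delta table will live and where the checkpoint files and other streaming metadata will be stored. We organize the data by day: the table path ends with today's date (`yyyyMMdd`), so each day's data lands in its own folder and you can pick the subset you need. Note that the date is evaluated once, when the stream starts, so a stream that keeps running past midnight continues writing to the previous day's folder until it is restarted.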

In [20]:
# One Delta folder per day, e.g. .../DeltaLake/open_sky/20230901
DeltaLakePath = f"{bronzePath}/DeltaLake/open_sky/{date.today().strftime('%Y%m%d')}"

# Append each micro-batch to the Delta table; keep the query handle so the stream can be monitored or stopped
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{bronzePath}/checkpoint/")
    .start(DeltaLakePath))
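The daily layout boils down to appending a `yyyyMMdd` folder to a base path. A small helper (hypothetical name `daily_delta_path`) that takes the date as a parameter, instead of calling `date.today()` inline, makes the layout easy to test and lets you rebuild the path for any past day:

```python
from datetime import date


def daily_delta_path(bronze_path: str, day: date) -> str:
    """Return the Delta folder for one day, e.g. <bronze_path>/DeltaLake/open_sky/20230901."""
    return f"{bronze_path}/DeltaLake/open_sky/{day.strftime('%Y%m%d')}"


base = "abfss://classroomB2@msit-onelake.dfs.fabric.microsoft.com/liad_bronze.Lakehouse/Files"
path_20230901 = daily_delta_path(base, date(2023, 9, 1))
print(path_20230901)
```

In the notebook, `daily_delta_path(bronzePath, date.today())` produces the same value as the inline expression.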

***Next, define a Spark table on top of the Delta folder for analysis and reporting***

A separate Spark table is created for each day of data.

In [24]:
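DeltaLakePath = f"{bronzePath}/DeltaLake/open_sky/{date.today().strftime('%Y%m%d')}"

# Register the day's Delta folder as a table, e.g. liad_bronze.openSky_20230901
# IF NOT EXISTS makes the cell safe to re-run
defTable = f"CREATE TABLE IF NOT EXISTS {bronzeLakehouse}.openSky_{date.today().strftime('%Y%m%d')} USING DELTA LOCATION '{DeltaLakePath}'"
spark.sql(defTable)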


DataFrame[]
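The same idea applies to the table definition. A sketch with a hypothetical `create_daily_table_sql` helper that builds the statement for a given day; the `IF NOT EXISTS` clause keeps re-runs from failing once the day's table already exists:

```python
from datetime import date


def create_daily_table_sql(lakehouse: str, delta_path: str, day: date) -> str:
    """Build the CREATE TABLE statement that registers one day's Delta folder."""
    table_name = f"{lakehouse}.openSky_{day:%Y%m%d}"
    return f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{delta_path}'"


path = "abfss://classroomB2@msit-onelake.dfs.fabric.microsoft.com/liad_bronze.Lakehouse/Files/DeltaLake/open_sky/20230901"
sql = create_daily_table_sql("liad_bronze", path, date(2023, 9, 1))
print(sql)
```

In the notebook you would run it as `spark.sql(create_daily_table_sql(bronzeLakehouse, DeltaLakePath, date.today()))`.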

***Let's check the Delta Lake streaming data***

You can read the Delta files directly from their storage path, without going through the table, and watch the row count grow as new events arrive.

In [163]:
# Read through the Delta transaction log rather than globbing *.parquet,
# so files that Delta has logically removed (e.g. after OPTIMIZE) are not counted
df = spark.read.format("delta").load(f"{bronzePath}/DeltaLake/open_sky/20230901")
print(df.count())


40494
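Reading the folder with `spark.read.parquet` and a `*.parquet` glob bypasses the Delta transaction log, while `spark.read.format("delta")` goes through it. The difference matters because a Delta folder can hold Parquet files that are no longer part of the current table version: for example, `OPTIMIZE` rewrites small files into larger ones, and the replaced files stay on disk until `VACUUM` deletes them. The sketch below is a deliberately simplified model of the `_delta_log` (only `add` and `remove` actions, one list of JSON lines per commit, made-up file names; real actions carry many more fields). Replaying the log yields the live file set, while a glob would also pick up the removed files:

```python
import json


def live_files(commits: list[list[str]]) -> set[str]:
    """Replay simplified Delta log commits and return the files in the latest version."""
    live: set[str] = set()
    for commit in commits:          # commits in version order: 0, 1, 2, ...
        for line in commit:         # each line is one JSON action
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


# Two streaming appends, then a compaction that rewrites both files into one
commits = [
    ['{"add": {"path": "part-0001.parquet"}}'],
    ['{"add": {"path": "part-0002.parquet"}}'],
    ['{"add": {"path": "part-0003.parquet"}}',
     '{"remove": {"path": "part-0001.parquet"}}',
     '{"remove": {"path": "part-0002.parquet"}}'],
]
files_on_disk = {"part-0001.parquet", "part-0002.parquet", "part-0003.parquet"}

print(sorted(live_files(commits)))  # ['part-0003.parquet']
print(len(files_on_disk))           # 3: a *.parquet glob would read all of these
```

If the compacted file contains the same rows as the two originals, the glob-based count would be double the real one.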


### Analyze Streaming Delta data

You can also query the streaming data through the Spark table and watch new rows arrive.

In [183]:
# Latest 100 events, newest first (enqueuedTime is set by Event Hubs when the event is received)
df_100 = spark.sql("SELECT * FROM liad_bronze.opensky_20230901 ORDER BY enqueuedTime DESC LIMIT 100")
display(df_100)


Each streaming micro-batch is committed as a new version of the Delta table, so the streaming updates show up in the table history.

In [197]:
%%sql

DESCRIBE HISTORY opensky_20230901


<Spark SQL result set with 1000 rows and 15 fields>

***Let's clean up the data in the body field***

Now we have to deal with the body field: each event arrives as a single comma-delimited string wrapped in square brackets, so we split it into separate columns and clean up the values.

In [2]:
from pyspark.sql import functions as F

df = spark.sql("SELECT body FROM liad_bronze.opensky_20230901")

# Remove the surrounding square brackets, then split the string into an array
df = df.withColumn("body", F.regexp_replace("body", r"(^\[)|(\]$)", ""))
df = df.withColumn("body", F.split("body", ", "))

new_columns = ['icao24', 'callsign', 'origin_country', 'time_position', 'last_contact',
               'longitude', 'latitude', 'baro_altitude', 'on_ground', 'velocity',
               'true_track', 'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
               'spi', 'position_source']

# Map each array element to its own column, stripping the double quotes around string values.
# Note: every column stays a string, and a literal null in the body becomes the string "null"
for i, new_column in enumerate(new_columns):
    df = df.withColumn(new_column, F.regexp_replace(df["body"][i], '"', ''))

# Remove leading and trailing spaces from 'callsign'
df = df.withColumn("callsign", F.trim(df["callsign"]))

# Convert 'time_position' and 'last_contact' from Unix timestamps to datetime strings
df = df.withColumn("time_position", F.from_unixtime("time_position"))
df = df.withColumn("last_contact", F.from_unixtime("last_contact"))

# Drop the original 'body' column as it is now redundant
df = df.drop("body")

# Show the transformed DataFrame
df.show()

# Register a temporary view so the data can be queried with SQL
df.createOrReplaceTempView("opensky_data")




+------+--------+--------------+-------------------+-------------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+
|icao24|callsign|origin_country|      time_position|       last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|
+------+--------+--------------+-------------------+-------------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+
|a55220| AWI6220| United States|2023-09-01 21:31:48|2023-09-01 21:31:49| -87.9313| 42.0893|      2567.94|    false|   153.1|     86.34|        -3.25|   null|     2720.34|  2516|false|              0|
|a9c080|  N7274Q| United States|2023-09-01 21:31:49|2023-09-01 21:31:49| -74.9074| 39.3872|       2057.4|    false|   70.91|     56.54|            0|   null|     2156.46|  5634|false|              0|
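To see what the transformation does to a single record, here is a plain-Python sketch of the same steps applied to one `body` string. The sample string is hypothetical, modeled on the first row shown above and assuming the body is a JSON-style array; `parse_body` is an illustrative name. Timestamps are converted in UTC here, whereas Spark's `from_unixtime` uses the session time zone:

```python
import re
from datetime import datetime, timezone

COLUMNS = ['icao24', 'callsign', 'origin_country', 'time_position', 'last_contact',
           'longitude', 'latitude', 'baro_altitude', 'on_ground', 'velocity',
           'true_track', 'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
           'spi', 'position_source']


def parse_body(body: str) -> dict:
    """Mirror the Spark cleanup: strip brackets, split on ', ', drop quotes, trim, convert times."""
    body = re.sub(r"(^\[)|(\]$)", "", body)
    values = [v.replace('"', '') for v in body.split(", ")]
    record = dict(zip(COLUMNS, values))
    record["callsign"] = record["callsign"].strip()
    for ts in ("time_position", "last_contact"):
        record[ts] = datetime.fromtimestamp(int(record[ts]), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return record


sample = ('["a55220", "AWI6220 ", "United States", 1693603908, 1693603909, -87.9313, 42.0893, '
          '2567.94, false, 153.1, 86.34, -3.25, null, 2720.34, "2516", false, 0]')
row = parse_body(sample)
print(row["callsign"], row["time_position"])  # AWI6220 2023-09-01 21:31:48
print(repr(row["sensors"]))                   # 'null' (a string, not None)
```

The last line shows why numeric and null-valued fields may need an explicit cast before they are used in calculations.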


***We have real-time flight data now***


In [3]:
%%sql

select *
from opensky_data
order by last_contact desc


<Spark SQL result set with 1000 rows and 17 fields>

In [194]:
%%sql

SELECT callsign, time_position, longitude, latitude
FROM opensky_data
WHERE callsign = 'JBU178'
ORDER BY time_position DESC



<Spark SQL result set with 9 rows and 4 fields>


### Load snapshot of the data in silver lakehouse

***Load hourly snapshot data to silver lakehouse***

For each plane (callsign), get its most recent position within the last hour.

In [15]:
snapshot = spark.sql("""
select d.callsign, d.time_position, d.longitude, d.latitude, DATE(d.time_position) as date_position
from opensky_data d
INNER JOIN
(select callsign, max(time_position) as time_position
from opensky_data
where time_position >= from_unixtime(unix_timestamp(current_timestamp) - 3600)
group by callsign) l
    on l.callsign = d.callsign
    and l.time_position = d.time_position
""")

snapshot.show(10)


+--------+-------------------+---------+--------+-------------+
|callsign|      time_position|longitude|latitude|date_position|
+--------+-------------------+---------+--------+-------------+
|  KOW733|2023-09-01 21:39:45| -70.0765|  41.244|   2023-09-01|
|  N2451A|2023-09-01 21:39:07| -95.2941|  32.512|   2023-09-01|
|  N4894D|2023-09-01 21:42:10| -77.6827|  43.111|   2023-09-01|
|  N5150D|2023-09-01 21:40:54|-117.1379| 32.8116|   2023-09-01|
| AAL1708|2023-09-01 21:38:30| -74.1806| 40.6817|   2023-09-01|
| FFT1699|2023-09-01 21:41:40| -84.4441| 33.6495|   2023-09-01|
|   TKK78|2023-09-01 21:38:44|  -71.691| 41.1887|   2023-09-01|
|  N263RS|2023-09-01 21:39:51| -79.7929| 32.9348|   2023-09-01|
|  N239SF|2023-09-01 21:42:09|-122.1168| 37.4622|   2023-09-01|
|   N80FG|2023-09-01 21:39:22| -78.5344| 36.3558|   2023-09-01|
+--------+-------------------+---------+--------+-------------+
only showing top 10 rows
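The query above joins each row to its callsign's maximum `time_position` in the last hour. One property of that join: if the same position is received more than once (same callsign and `time_position`), every copy matches and the snapshot contains duplicates. A plain-Python sketch of the intended logic that keeps exactly one row per callsign (the helper name `latest_positions`, the rows, and the `now` value are made up for illustration):

```python
from datetime import datetime, timedelta


def latest_positions(rows: list[dict], now: datetime, window: timedelta = timedelta(hours=1)) -> dict:
    """Return the most recent row per callsign with time_position at or after now - window."""
    cutoff = now - window
    latest: dict[str, dict] = {}
    for row in rows:
        if row["time_position"] < cutoff:
            continue  # outside the look-back window
        current = latest.get(row["callsign"])
        if current is None or row["time_position"] > current["time_position"]:
            latest[row["callsign"]] = row  # strict '>' keeps a single row when timestamps tie
    return latest


rows = [
    {"callsign": "KOW733", "time_position": datetime(2023, 9, 1, 21, 30, 0), "longitude": -70.1, "latitude": 41.2},
    {"callsign": "KOW733", "time_position": datetime(2023, 9, 1, 21, 39, 45), "longitude": -70.0765, "latitude": 41.244},
    # The same position received twice
    {"callsign": "KOW733", "time_position": datetime(2023, 9, 1, 21, 39, 45), "longitude": -70.0765, "latitude": 41.244},
    # Older than one hour, so it is filtered out
    {"callsign": "N80FG", "time_position": datetime(2023, 9, 1, 20, 10, 0), "longitude": -78.5, "latitude": 36.3},
]
result = latest_positions(rows, now=datetime(2023, 9, 1, 21, 45, 0))
print(list(result))  # ['KOW733']
```

In Spark SQL the usual equivalent is a `row_number()` window partitioned by `callsign` and ordered by `time_position DESC`, keeping only rows where the row number is 1.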



Write the snapshot to the silver lakehouse as a Delta table partitioned by `date_position`

In [17]:
(snapshot.write
    .partitionBy("date_position")
    .mode("append")
    .format("delta")
    .saveAsTable(f"{silverLakehouse}.flight_traffic_hourly"))

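`partitionBy("date_position")` stores each date's rows in its own Hive-style subfolder of the table (for example `date_position=2023-09-01/`), which lets queries that filter on `date_position` skip the other days. A plain-Python sketch of that grouping, with a hypothetical helper name and made-up rows:

```python
from collections import defaultdict
from datetime import date


def partition_dirs(rows: list[dict], column: str) -> dict[str, list[dict]]:
    """Group rows by partition value, keyed by the Hive-style folder name column=value."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        groups[f"{column}={row[column].isoformat()}"].append(row)
    return dict(groups)


rows = [
    {"callsign": "KOW733", "date_position": date(2023, 9, 1)},
    {"callsign": "N80FG", "date_position": date(2023, 9, 1)},
    {"callsign": "AAL1708", "date_position": date(2023, 9, 2)},
]
dirs = partition_dirs(rows, "date_position")
print(sorted(dirs))  # ['date_position=2023-09-01', 'date_position=2023-09-02']
```

Because the snapshot is written with `mode("append")`, each hourly run adds new files to the current day's folder rather than replacing it.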