In [None]:
# Import the Spark SQL types needed to explicitly specify the schema of the input stream
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [None]:
# Create the DBFS directory that the streaming examples in this notebook read from.
dbutils.fs.mkdirs("dbfs:/FileStore/shared_uploads/streaming")

Out[14]: True

In [None]:
# List the contents of the streaming input directory.
dbutils.fs.ls("dbfs:/FileStore/shared_uploads/streaming")

Out[52]: [FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_01.csv', name='laptops_01.csv', size=760, modificationTime=1692626899000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_02.csv', name='laptops_02.csv', size=833, modificationTime=1692626899000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_03.csv', name='laptops_03.csv', size=834, modificationTime=1692639302000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_04.csv', name='laptops_04.csv', size=910, modificationTime=1692639302000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_05.csv', name='laptops_05.csv', size=828, modificationTime=1692641517000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_06.csv', name='laptops_06.csv', size=910, modificationTime=1692641517000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streaming/laptops_07.csv', name='laptops_07.csv', size=841, modificationTime=1692703242000),
 FileInfo(p

In [None]:
# Explicitly declare the schema of the input CSV files.
# The last argument of each add() is `nullable`; False marks the field as not nullable.
schema = (
    StructType()
    .add("Id", IntegerType(), False)
    .add("Company", StringType(), False)
    .add("Product", StringType(), False)
    .add("TypeName", StringType(), False)
    .add("Price_euros", FloatType(), False)
)

In [None]:
# Batch-read a single CSV file that has already been uploaded to DBFS,
# treating the first line as a header and applying the explicit schema.
labtob_data = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .format("csv")
    .load("dbfs:/FileStore/shared_uploads/streaming/laptops_01.csv")
)
labtob_data.display()

Id,Company,Product,TypeName,Price_euros
1,Apple,MacBook Pro,Ultrabook,1339.69
2,Apple,Macbook Air,Ultrabook,898.94
3,HP,250 G6,Notebook,575.0
4,Apple,MacBook Pro,Ultrabook,2537.45
5,Apple,MacBook Pro,Ultrabook,1803.6
6,Acer,Aspire 3,Notebook,400.0
7,Apple,MacBook Pro,Ultrabook,2139.97
8,Apple,Macbook Air,Ultrabook,1158.7
9,Asus,ZenBook UX430UN,Ultrabook,1495.0
10,Acer,Swift 3,Ultrabook,770.0


In [None]:
# isStreaming is False for a batch DataFrame and True for a streaming (real-time) one.
# The message previously misspelled "laptops".
print("Is laptops data streamed? {}".format(labtob_data.isStreaming))

Is laptobs data streamed? False


In [None]:
"""
readStream:
- monitors the source directory for new files
- when another file is uploaded to that directory, the stream picks it up and appends
its rows as new records to the streaming DataFrame
"""

In [None]:
# Read the CSV data as a stream rather than as a batch.
# load() receives the directory to read from, not a single file.
labtob_data_stream = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .format("csv")
    .load("dbfs:/FileStore/shared_uploads/streaming")
)
display(labtob_data_stream)

Id,Company,Product,TypeName,Price_euros
21,Asus,Vivobook E200HA,Netbook,191.9
22,Lenovo,Legion Y520-15IKBN,Gaming,999.0
23,HP,255 G6,Notebook,258.0
24,Dell,Inspiron 5379,2 in 1 Convertible,819.0
25,HP,15-BS101nv (i7-8550U/8GB/256GB/FHD/W10),Ultrabook,659.0
26,Dell,Inspiron 3567,Notebook,418.64
27,Apple,MacBook Air,Ultrabook,1099.0
28,Dell,Inspiron 5570,Notebook,800.0
29,Dell,Latitude 5590,Ultrabook,1298.0
30,HP,ProBook 470,Notebook,896.0


In [None]:
"""
A streaming job keeps running and keeps monitoring its input for new records to process.
"""

In [None]:
# isStreaming is True here because the DataFrame was created with readStream.
# The message previously misspelled "laptops".
print("Is laptops data streamed? {}".format(labtob_data_stream.isStreaming))

Is laptobs data streamed? True


In [None]:
from pyspark.sql.functions import round

In [None]:
# Add a derived USD price column to the stream.
# withColumn(colName, col) returns a new DataFrame with the column added
# (or replaced, if a column with that name already exists):
#   colName - str, name of the new column
#   col     - Column expression for the new column
# The euro-to-dollar factor is named so it is defined (and can be tuned) in one place.
EUR_TO_USD_RATE = 1.5
lab_data = labtob_data_stream.withColumn(
    "price_usd",
    # `round` here is the Spark SQL function imported from pyspark.sql.functions.
    round(labtob_data_stream.Price_euros * EUR_TO_USD_RATE, 2),
)
lab_data.display()

Id,Company,Product,TypeName,Price_euros,price_usd
60,Asus,X541UA-DM1897 (i3-6006U/4GB/256GB/FHD/Linux),Notebook,415.0,622.5
61,Dell,Inspiron 5770,Notebook,1299.0,1948.5
62,Dell,Vostro 5471,Ultrabook,879.0,1318.5
63,Lenovo,IdeaPad 520S-14IKB,Notebook,599.0,898.5
64,Asus,UX410UA-GV350T (i5-8250U/8GB/256GB/FHD/W10),Notebook,941.0,1411.5
66,HP,250 G6,Notebook,690.0,1035.0
67,Asus,ZenBook Pro,Ultrabook,1983.0,2974.5
68,HP,250 G6,Notebook,438.69,658.04
69,HP,Stream 14-AX040wm,Notebook,229.0,343.5
70,Lenovo,V310-15ISK (i5-7200U/4GB/1TB/FHD/W10),Notebook,549.0,823.5


In [None]:
# Keep only the laptop type and the USD price (a column projection; no rows are removed).
lab_data_filtered = lab_data.select(["TypeName", "price_usd"])
display(lab_data_filtered)

TypeName,price_usd
Notebook,622.5
Notebook,1948.5
Ultrabook,1318.5
Notebook,898.5
Notebook,1411.5
Notebook,1035.0
Ultrabook,2974.5
Notebook,658.04
Notebook,343.5
Notebook,823.5


Databricks visualization. Run in Databricks to view.

In [None]:
# Narrow the stream to premium laptops: keep three columns and only rows priced above 2000 USD.
# `lab_data` is rebound here; the writeStream cells further down use this filtered version.
lab_data = (
    lab_data
    .select("Id", "Company", "price_usd")
    .filter("price_usd > 2000")
)
display(lab_data)

Id,Company,price_usd
67,Asus,2974.5
107,Microsoft,2010.0
112,Lenovo,2220.0
114,Dell,2068.5
115,Lenovo,2098.5
117,Dell,2443.5
118,HP,2098.48
42,Dell,2248.5
46,Apple,2128.5
59,MSI,3673.5


In [None]:
# Fixed-interval micro-batch trigger.
#   Every 20 seconds the engine checks for new stream data and appends it to the
#   in-memory table named by queryName ("premium_labtops_20").
#   - If no new data is available, nothing is processed.
#   - If the previous micro-batch completes within the interval,
#     the engine waits until the interval is over.
#   - If the previous micro-batch takes longer than the interval,
#     the next micro-batch starts as soon as the previous one completes.
# The StreamingQuery handle is kept so the query can later be inspected
# (.status, .lastProgress) or stopped (.stop()) instead of being left running anonymously.
premium_labtops_20_query = (
    lab_data.writeStream
    .format("memory")
    .queryName("premium_labtops_20")
    .trigger(processingTime="20 seconds")
    .start()
)
premium_labtops_20_query

Out[41]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fdb0178c0a0>

In [None]:
# This is a batch query over the in-memory table:
# it has to be re-run to see records that arrived in the stream afterwards.
premium_count_by_company = spark.sql(
    "select Company, count(*) from premium_labtops_20 group by Company"
)
display(premium_count_by_company)

Company,count(1)
Dell,7
Asus,3
Lenovo,2
Microsoft,1
HP,1
Apple,9
MSI,2


In [None]:
# Average premium price per company, rounded to 2 decimals (batch query over the in-memory table).
premium_avg_by_company = spark.sql(
    "select Company, round(avg(price_usd), 2) from premium_labtops_20 group by Company"
)
display(premium_avg_by_company)

Company,"round(avg(price_usd), 2)"
Dell,2307.86
Asus,2463.5
Lenovo,2159.25
Microsoft,2010.0
HP,2098.48
Apple,2927.7
MSI,3517.88


In [None]:
# One-time micro-batch trigger:
# - executes a single micro-batch that processes all data available at start
# - once that batch is processed, the query stops by itself
# - useful when a cluster is spun up periodically to process the data
#   accumulated since the last run
# - may save significant cost, because the job runs only when needed
#   instead of continuously
# NOTE(review): newer Spark releases offer trigger(availableNow=True) as the recommended
# replacement for once=True - confirm the runtime version before switching.
premium_labtops_once_query = (
    lab_data.writeStream
    .format("memory")
    .queryName("premium_labtops_once")
    .trigger(once=True)
    .start()
)
# start() returns immediately; block until the single micro-batch has finished so that
# the cells querying "premium_labtops_once" do not race against an empty table.
premium_labtops_once_query.awaitTermination()
premium_labtops_once_query

Out[49]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fdb01eccfd0>

In [None]:
# This is a batch query, so it must be re-run to see newly added stream records.
# With the one-time trigger, first re-run the writeStream cell: only then is new data
# written to the in-memory table that this query reads.
once_avg_by_company = spark.sql(
    "select Company, round(avg(price_usd), 2) from premium_labtops_once group by Company"
)
display(once_avg_by_company)

Company,"round(avg(price_usd), 2)"
HP,2359.12
Dell,2417.25
Asus,2463.5
Lenovo,2832.0
Microsoft,2010.0
MSI,3301.88
Huawei,2023.5
Apple,2927.7
Razer,9148.5


In [None]:
# Create the DBFS directory used as the source for the Auto Loader examples.
dbutils.fs.mkdirs("dbfs:/FileStore/shared_uploads/auto_loader_streaming")

Out[53]: True

In [None]:
# List the contents of the Auto Loader source directory.
dbutils.fs.ls("dbfs:/FileStore/shared_uploads/auto_loader_streaming")

Out[54]: [FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_01.csv', name='laptops_01.csv', size=760, modificationTime=1692953530000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_02.csv', name='laptops_02.csv', size=833, modificationTime=1692953530000)]

In [None]:
"""
What is Auto Loader?
- Auto Loader is a Databricks feature that provides incremental ingestion out of the box: it loads new data files as soon as they land in cloud storage.

- Under the hood (in Azure Databricks), when Auto Loader runs in file notification mode it automatically sets up Azure Event Grid and Queue Storage services. Through these services, Auto Loader uses the queue from Azure Storage to find new files cheaply, passes them to Spark, and so loads the data with low latency and at low cost within your streaming or batch jobs. Auto Loader records which files have been processed, which guarantees exactly-once processing of the incoming data.
"""

In [None]:
# Auto Loader
# - format("cloudFiles"): Auto Loader exposes a Structured Streaming source called
#   "cloudFiles"; when it is used, new files are processed automatically as they arrive.
# - cloudFiles.format: the format of the incoming files (CSV here).
# - cloudFiles.schemaLocation: the location used to track the schema of the files.
# - load: the directory the data is read from.
# NOTE(review): schemaLocation points at the same directory as the source data -
# confirm this is intended rather than a separate directory.
laptob_stream_data = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "dbfs:/FileStore/shared_uploads/auto_loader_streaming")
    .load("dbfs:/FileStore/shared_uploads/auto_loader_streaming")
)

In [None]:
# Render the Auto Loader stream; no schema was passed to readStream for this DataFrame.
display(laptob_stream_data)

Id,Company,Product,TypeName,Price_euros,_rescued_data
21,Asus,Vivobook E200HA,Netbook,191.9,
22,Lenovo,Legion Y520-15IKBN,Gaming,999.0,
23,HP,255 G6,Notebook,258.0,
24,Dell,Inspiron 5379,2 in 1 Convertible,819.0,
25,HP,15-BS101nv (i7-8550U/8GB/256GB/FHD/W10),Ultrabook,659.0,
26,Dell,Inspiron 3567,Notebook,418.64,
27,Apple,MacBook Air,Ultrabook,1099.0,
28,Dell,Inspiron 5570,Notebook,800.0,
29,Dell,Latitude 5590,Ultrabook,1298.0,
30,HP,ProBook 470,Notebook,896.0,


In [None]:
"""
                            What is the rescued data column?

When Auto Loader infers the schema, a rescued data column is automatically added to your schema as _rescued_data.

The rescued data column ensures that columns that don’t match with the schema are rescued instead of being dropped. The rescued data column contains any data that isn’t parsed for the following reasons:

- The column is missing from the schema.
- Type mismatches.
- Case mismatches.
- Data files that contain extra columns.

The rescued data column contains a JSON: containing the rescued columns and the source file path of the record.
"""

In [None]:
# Show the _rescued_data column after a file with an extra column has been added.
display(laptob_stream_data)

Id,Company,Product,TypeName,Price_euros,_rescued_data
41,Asus,X540UA-DM186 (i3-6006U/4GB/1TB/FHD/Linux),Notebook,389.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
42,Dell,Inspiron 7577,Gaming,1499.0,"{""TYPENAME"":""Gaming"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
43,Asus,X542UQ-GO005 (i5-7200U/8GB/1TB/GeForce,Notebook,522.99,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
44,Acer,Aspire A515-51G,Notebook,682.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
45,Dell,Inspiron 7773,2 in 1 Convertible,999.0,"{""TYPENAME"":""2 in 1 Convertible"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
46,Apple,MacBook Pro,Ultrabook,1419.0,"{""TYPENAME"":""Ultrabook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
47,Lenovo,IdeaPad 320-15ISK,Notebook,369.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
48,Asus,Rog Strix,Gaming,1299.0,"{""TYPENAME"":""Gaming"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
49,Dell,Inspiron 3567,Notebook,639.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
50,Asus,X751NV-TY001T (N4200/4GB/1TB/GeForce,Notebook,466.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"


In [None]:
# List the Auto Loader source directory again to see what it contains after the uploads.
dbutils.fs.ls("dbfs:/FileStore/shared_uploads/auto_loader_streaming")

Out[63]: [FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/_schemas/', name='_schemas/', size=0, modificationTime=1692954295000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_01.csv', name='laptops_01.csv', size=760, modificationTime=1692953530000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_02.csv', name='laptops_02.csv', size=833, modificationTime=1692953530000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv', name='laptops_03_extracol.csv', size=1029, modificationTime=1692956416000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_04_rearrangedcols.csv', name='laptops_04_rearrangedcols.csv', size=910, modificationTime=1692956590000)]

In [None]:
# Show the stream (including _rescued_data) again; by now laptops_04_rearrangedcols.csv
# has also been uploaded to the source directory.
display(laptob_stream_data)

Id,Company,Product,TypeName,Price_euros,_rescued_data
41,Asus,X540UA-DM186 (i3-6006U/4GB/1TB/FHD/Linux),Notebook,389.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
42,Dell,Inspiron 7577,Gaming,1499.0,"{""TYPENAME"":""Gaming"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
43,Asus,X542UQ-GO005 (i5-7200U/8GB/1TB/GeForce,Notebook,522.99,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
44,Acer,Aspire A515-51G,Notebook,682.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
45,Dell,Inspiron 7773,2 in 1 Convertible,999.0,"{""TYPENAME"":""2 in 1 Convertible"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
46,Apple,MacBook Pro,Ultrabook,1419.0,"{""TYPENAME"":""Ultrabook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
47,Lenovo,IdeaPad 320-15ISK,Notebook,369.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
48,Asus,Rog Strix,Gaming,1299.0,"{""TYPENAME"":""Gaming"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
49,Dell,Inspiron 3567,Notebook,639.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"
50,Asus,X751NV-TY001T (N4200/4GB/1TB/GeForce,Notebook,466.0,"{""TYPENAME"":""Notebook"",""_file_path"":""dbfs:/FileStore/shared_uploads/auto_loader_streaming/laptops_03_extracol.csv""}"


In [None]:
"""
                            Writing Stream to file sink:

1. Filter the laptop data
2. Make a sub-directory called "dest_location"
3. Write the filtered data into "dest_location"

Conclusion: when new files are added to the source path, the filtered data is updated and
the new stream data is automatically added to "dest_location"
"""

Out[64]: '\n        Writing Stream to file sink:\n1. filter laptob data \n2. make a sub directory called "dest_location"\n3. Writed filtered data into "dest_location"\n\nconclusion: when add new files into the path has fitered data, the new stream data will\nautomatically added to "dest_location"\n'

In [None]:
# 1. Keep only the rows whose Company is "Dell", projected onto three columns.
is_dell = laptob_stream_data["Company"] == "Dell"
laptob_stream_Dell = (
    laptob_stream_data
    .where(is_dell)
    .select("Company", "Product", "Price_euros")
)

In [None]:
# Upload a new file and watch the number of records in this display change.
laptob_stream_Dell.display()

Company,Product,Price_euros
Dell,Inspiron 7577,1499.0
Dell,Inspiron 7773,999.0
Dell,Inspiron 3567,639.0
Dell,Inspiron 3576,767.8
Dell,Inspiron 5770,1299.0
Dell,Vostro 5471,879.0
Dell,Inspiron 5370,955.0
Dell,Inspiron 5570,870.0
Dell,Inspiron 5570,855.0
Dell,Inspiron 5379,819.0


In [None]:
# 2. Make a sub-directory called "dest_location" (the target of the file sink).
dbutils.fs.mkdirs("dbfs:/FileStore/shared_uploads/streams_dest/dest_location/")

Out[67]: True

In [None]:
# List the destination directory before the stream writes anything into it.
dbutils.fs.ls("dbfs:/FileStore/shared_uploads/streams_dest/dest_location/")

Out[68]: []

In [None]:
# 3. Write the filtered stream into "dest_location" as CSV files.
# checkpointLocation: where the query persists its progress, so that after a failure or
# restart it can resume from where it stopped instead of starting over.
# NOTE(review): the checkpoint directory lives inside the output directory, so it shows up
# next to the part files - consider a sibling directory instead.
# NOTE(review): "mergeSchema" is kept as in the original; confirm that the CSV sink honours it.
# The StreamingQuery handle is kept so the stream can be monitored or stopped (.stop()) later.
dell_csv_query = (
    laptob_stream_Dell.writeStream
    .format("csv")
    .option("mergeSchema", "True")
    .option("checkpointLocation", "dbfs:/FileStore/shared_uploads/streams_dest/dest_location/checkpoint_1")
    .start("dbfs:/FileStore/shared_uploads/streams_dest/dest_location/")
)
dell_csv_query

Out[69]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fdb015d3520>

In [None]:
# Conclusion: when new files are added to the source path, the filtered data is updated and
# the new stream data is automatically added to "dest_location".
dest_location = "dbfs:/FileStore/shared_uploads/streams_dest/dest_location/"
dbutils.fs.ls(dest_location)

Out[80]: [FileInfo(path='dbfs:/FileStore/shared_uploads/streams_dest/dest_location/_spark_metadata/', name='_spark_metadata/', size=0, modificationTime=1693031065000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streams_dest/dest_location/checkpoint_1/', name='checkpoint_1/', size=0, modificationTime=1693031065000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streams_dest/dest_location/part-00000-23921989-5ea2-4bc6-9a71-13f5a7f6c4ed-c000.csv', name='part-00000-23921989-5ea2-4bc6-9a71-13f5a7f6c4ed-c000.csv', size=73, modificationTime=1693031731000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streams_dest/dest_location/part-00000-b393c330-37b2-4169-acf9-0e14c32eca93-c000.csv', name='part-00000-b393c330-37b2-4169-acf9-0e14c32eca93-c000.csv', size=209, modificationTime=1693031068000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/streams_dest/dest_location/part-00001-0a02731a-2309-42d6-ba9b-5ed98fe03d5b-c000.csv', name='part-00001-0a02731a-2309-42d6-ba9b-5ed98fe03d5b-c000.csv', 