### Product Data POC on Structured Streaming

This notebook is a proof of concept (POC) for Spark Structured Streaming:
* Reading multiple CSV files with Structured Streaming
* Streaming load of weather data into a Delta table


### Details
| Details | Information |
| --- | --- |
| Notebook Created By | Gvnreddy |
| Object Name | product data analysis |
| File Type | delimited file |
| Target Location | Databricks Delta Table |

### History
| Date | Developed By | Comments |
| --- | --- | --- |
| 01/12/2023 | Gvnreddy | Initial Version |

#### Running the common `config_streaming` notebook for metadata and other variables

In [0]:
%run ./config_streaming

path,name,size,modificationTime
dbfs:/FileStore/products/,products/,0,0
dbfs:/FileStore/shared_uploads/,shared_uploads/,0,0
dbfs:/FileStore/tables/,tables/,0,0


weather streamings files copied to : dbfs:/FileStore/streaming


Out[4]: True

path,name,size,modificationTime
dbfs:/FileStore/products/,products/,0,0
dbfs:/FileStore/shared_uploads/,shared_uploads/,0,0
dbfs:/FileStore/streaming/,streaming/,0,0
dbfs:/FileStore/tables/,tables/,0,0


In [0]:
%fs head dbfs:/FileStore/streaming/ws01.txt

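##### Creating a PySpark schema for reading the CSV files

`streamSchema` and `streaming_path`, used in the next code cell, are not defined in this notebook; they are expected to come from `config_streaming`.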
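If you want to define the schema here rather than in `config_streaming`, a minimal sketch could look like the following. It assumes the tab-separated files contain the thirteen columns that appear in this notebook (the eleven source columns in the query output further down plus the two dropped record-year columns), with the record-year columns sitting between `record_max_temp` and `actual_precipitation`. The column order and the types (`date` as a string because it is parsed later with `to_date`, temperatures as integers, precipitation as doubles) are assumptions to check against the `%fs head` output. It builds a DDL string, which `DataStreamReader.schema()` accepts as an alternative to a `StructType`.

```python
# ASSUMPTION: column order and types are inferred, not taken from the source files.
WEATHER_COLUMNS = [
    ("date", "STRING"),                  # text such as "01-10-2014"; parsed later with to_date
    ("actual_mean_temp", "INT"),
    ("actual_min_temp", "INT"),
    ("actual_max_temp", "INT"),
    ("average_min_temp", "INT"),
    ("average_max_temp", "INT"),
    ("record_min_temp", "INT"),
    ("record_max_temp", "INT"),
    ("record_min_temp_year", "INT"),     # dropped in the transformation step
    ("record_max_temp_year", "INT"),     # dropped in the transformation step
    ("actual_precipitation", "DOUBLE"),
    ("average_precipitation", "DOUBLE"),
    ("record_precipitation", "DOUBLE"),
]

# DDL-formatted schema string, usable as spark.readStream.schema(streamSchema)
streamSchema = ", ".join(f"{name} {dtype}" for name, dtype in WEATHER_COLUMNS)
print(streamSchema)
```

A `StructType` built from `pyspark.sql.types` would work equally well; the DDL string simply keeps the column list readable in one place.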

#### Creating a streaming DataFrame with the `maxFilesPerTrigger` option set to 1

In [0]:
from pyspark.sql.functions import *

# Same as a batch spark.read, but using readStream instead of read;
# maxFilesPerTrigger=1 makes each micro-batch process at most one new file
streamingInputDF = (spark.readStream.schema(streamSchema)
                    .option("maxFilesPerTrigger", 1)
                    .csv(streaming_path, header=True, sep="\t"))
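As a rough mental model (plain Python, not Spark's actual implementation), the file source with `maxFilesPerTrigger` set to N takes the files it has not processed yet, oldest modification time first by default, and hands at most N of them to each micro-batch. The helper `plan_micro_batches`, the file names other than `ws01.txt`, and the timestamps are hypothetical.

```python
def plan_micro_batches(files, max_files_per_trigger=1):
    """Split unprocessed (path, modification_time) pairs into micro-batches.

    Conceptual model only: oldest files first, at most max_files_per_trigger per batch.
    """
    ordered = sorted(files, key=lambda f: f[1])
    return [
        [path for path, _ in ordered[i:i + max_files_per_trigger]]
        for i in range(0, len(ordered), max_files_per_trigger)
    ]


# Hypothetical files and modification times
new_files = [("ws03.txt", 30), ("ws01.txt", 10), ("ws02.txt", 20)]
print(plan_micro_batches(new_files, 1))  # [['ws01.txt'], ['ws02.txt'], ['ws03.txt']]
print(plan_micro_batches(new_files, 2))  # [['ws01.txt', 'ws02.txt'], ['ws03.txt']]
```

With a value of 1, every file becomes its own micro-batch, which is consistent with the many small Parquet files listed at the end of this notebook.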

##### Checking whether the DataFrame created above is a streaming DataFrame

In [0]:
streamingInputDF.isStreaming

Out[6]: True

In [0]:
if streamingInputDF.isStreaming:
    print(' streamingInputDF is Streaming Dataframe')

 streamingInputDF is Streaming Dataframe


##### Perform the following transformations
* a. Drop the columns `record_min_temp_year` and `record_max_temp_year`.
* b. Filter on `actual_precipitation = 0` (the code below keeps only the rows where `actual_precipitation` is 0).
* c. Create a new column `deviation_from_avg = abs(actual_precipitation - average_precipitation)`.
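The row-level logic of steps a to c can be sketched in plain Python (an illustration, not Spark code). Step b follows the code cell below, which keeps only the rows where `actual_precipitation` is 0. The first row's precipitation figures come from the 2014-10-01 row of the query output further down; its raw date string assumes the `dd-MM-yyyy` source format, the record-year values are placeholders (`None`), and the second row is entirely hypothetical. The helper `transform` is hypothetical too.

```python
import math

DROP_COLUMNS = ("record_min_temp_year", "record_max_temp_year")


def transform(rows):
    """Apply steps a to c to a list of dict rows."""
    out = []
    for row in rows:
        if row["actual_precipitation"] != 0:  # b. keep only rows with zero precipitation
            continue
        new_row = {k: v for k, v in row.items() if k not in DROP_COLUMNS}  # a. drop columns
        # c. math.fabs instead of abs, because the notebook's pyspark import shadows the built-in abs
        new_row["deviation_from_avg"] = math.fabs(
            row["actual_precipitation"] - row["average_precipitation"])
        out.append(new_row)
    return out


rows = [
    # precipitation values from the 2014-10-01 output row; record years are placeholders
    {"date": "01-10-2014", "actual_precipitation": 0.0, "average_precipitation": 0.21,
     "record_min_temp_year": None, "record_max_temp_year": None},
    # hypothetical rainy day, removed by step b
    {"date": "03-10-2014", "actual_precipitation": 0.5, "average_precipitation": 0.2,
     "record_min_temp_year": None, "record_max_temp_year": None},
]
result = transform(rows)
print(result)
```

Because only zero-precipitation rows survive step b, `deviation_from_avg` always equals `average_precipitation` in the output, as the query result below shows.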


In [0]:
# extract the year from a date column: date_format(column, 'yyyy')
# extract the month from a date column: date_format(column, 'MM')

In [0]:
%sql
select date_format(current_date(), 'yyyy') as year,
       date_format(current_date(), 'MM') as month,
       date_format(current_date(), 'dd') as day

year,month,day
2024,2,1
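The same date handling can be mirrored in plain Python to see what the patterns do: Spark's `dd-MM-yyyy` corresponds to Python's `%d-%m-%Y`, and `date_format(..., 'yyyy')` and `date_format(..., 'MM')` return zero-padded strings such as `'2014'` and `'10'`. This is only an illustration; the helper `year_month` and the sample inputs are hypothetical, and they assume the source files store dates as `dd-MM-yyyy`, as the `to_date` call in the next cell implies.

```python
from datetime import datetime


def year_month(date_text, pattern="%d-%m-%Y"):
    """Return (year, month) strings, like date_format(to_date(col, 'dd-MM-yyyy'), 'yyyy' / 'MM')."""
    d = datetime.strptime(date_text, pattern).date()
    return d.strftime("%Y"), d.strftime("%m")


print(year_month("01-10-2014"))  # ('2014', '10')
print(year_month("05-07-2015"))  # ('2015', '07')
```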


In [0]:
from pyspark.sql.functions import abs, col, date_format, to_date
# a. drop the record-year columns, b. keep rows where actual_precipitation == 0, c. add deviation_from_avg
finalStreamingDF = streamingInputDF.drop("record_min_temp_year", "record_max_temp_year").filter(col("actual_precipitation") == 0).withColumn("deviation_from_avg", abs(col("actual_precipitation") - col("average_precipitation")))
# parse the dd-MM-yyyy date string, then derive YEAR and MONTH (as strings) for partitioning
finalStreamingDF = finalStreamingDF.withColumn("date", to_date("date", "dd-MM-yyyy")).withColumn("YEAR", date_format(col("date"), "yyyy")).withColumn("MONTH", date_format(col("date"), "MM"))

In [0]:
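# write the stream to a Delta table at /tmp/streaming, partitioned by YEAR and MONTH;
# the checkpoint location records progress so a restarted query resumes where it stopped
writedf = (finalStreamingDF.writeStream.format("delta").partitionBy("YEAR", "MONTH").outputMode("append")
           .option("checkpointLocation", "/tmp/_checkpoints/streaming").start("/tmp/streaming"))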
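`partitionBy("YEAR", "MONTH")` writes each row under a Hive-style `column=value` subdirectory of the table path, which is why the listing below shows `YEAR=2014/` and, one level down, `MONTH=10/`. A plain-Python sketch of that grouping follows; it ignores the escaping Spark applies to special characters in partition values, the helper `group_by_partition` is hypothetical, and so is the 2015 row.

```python
from collections import defaultdict


def group_by_partition(rows, table_path, partition_cols=("YEAR", "MONTH")):
    """Map each partition directory to the dates of the rows written under it."""
    groups = defaultdict(list)
    for row in rows:
        subdir = "/".join(f"{c}={row[c]}" for c in partition_cols)
        groups[f"{table_path.rstrip('/')}/{subdir}/"].append(row["date"])
    return dict(groups)


rows = [
    {"date": "2014-10-01", "YEAR": "2014", "MONTH": "10"},
    {"date": "2014-10-02", "YEAR": "2014", "MONTH": "10"},
    {"date": "2015-01-03", "YEAR": "2015", "MONTH": "01"},  # hypothetical row
]
for path, dates in group_by_partition(rows, "/tmp/streaming").items():
    print(path, dates)
```

Partitioning on `YEAR` and `MONTH` lets a query such as `where year=2014 and month=10` read only the matching directory.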

##### Stopping the stream with the `stop()` method

In [0]:
import time
time.sleep(60)   # let the query run for about 1 minute so several micro-batches get processed
writedf.stop()   # stop the streaming query
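`time.sleep` always waits the full minute, even if the query has already failed. An alternative sketch uses `StreamingQuery.awaitTermination(timeout)`, which returns `True` if the query terminated within `timeout` seconds and `False` otherwise, and re-raises the query's exception if it failed. The helper name `run_for` is hypothetical.

```python
def run_for(query, seconds):
    """Let a StreamingQuery run for up to `seconds`, then stop it if it is still active.

    Returns True if the query had already terminated on its own within the timeout.
    """
    terminated = query.awaitTermination(seconds)  # raises if the query failed
    if not terminated:
        query.stop()
    return terminated


# usage in this notebook, instead of time.sleep(60) followed by writedf.stop():
# run_for(writedf, 60)
```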

In [0]:
%fs ls /tmp/streaming/

path,name,size,modificationTime
dbfs:/tmp/streaming/YEAR=2014/,YEAR=2014/,0,0
dbfs:/tmp/streaming/YEAR=2015/,YEAR=2015/,0,0
dbfs:/tmp/streaming/_delta_log/,_delta_log/,0,0


In [0]:
%sql
select * from delta.`/tmp/streaming` where year = 2014 and month = 10

date,actual_mean_temp,actual_min_temp,actual_max_temp,average_min_temp,average_max_temp,record_min_temp,record_max_temp,actual_precipitation,average_precipitation,record_precipitation,deviation_from_avg,YEAR,MONTH
2014-10-01,76,70,82,66,84,48,94,0.0,0.21,5.14,0.21,2014,10
2014-10-02,78,70,85,65,84,43,93,0.0,0.2,3.57,0.2,2014,10
2014-10-05,60,47,73,64,83,46,94,0.0,0.18,4.51,0.18,2014,10
2014-10-06,64,50,78,64,83,49,95,0.0,0.17,3.48,0.17,2014,10
2014-10-07,72,60,84,64,82,47,95,0.0,0.18,6.14,0.18,2014,10
2014-10-08,76,63,89,63,82,44,95,0.0,0.17,3.91,0.17,2014,10
2014-10-09,76,64,88,63,82,46,94,0.0,0.17,1.8,0.17,2014,10
2014-10-10,76,65,87,63,82,46,93,0.0,0.16,3.56,0.16,2014,10
2014-10-11,75,62,88,62,82,45,91,0.0,0.16,4.3,0.16,2014,10
2014-10-12,74,62,86,62,81,46,92,0.0,0.15,2.31,0.15,2014,10
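The transformation rules can be checked against this result. A plain-Python sketch that parses the first three rows shown above (copied verbatim) and confirms that `actual_precipitation` is 0, that `deviation_from_avg` equals `|actual_precipitation - average_precipitation|`, that the record-year columns are gone, and that `YEAR` and `MONTH` match the date. The helper `check_rows` is hypothetical.

```python
import csv
import io
import math

# First three rows of the query output above, copied verbatim
SAMPLE = """date,actual_mean_temp,actual_min_temp,actual_max_temp,average_min_temp,average_max_temp,record_min_temp,record_max_temp,actual_precipitation,average_precipitation,record_precipitation,deviation_from_avg,YEAR,MONTH
2014-10-01,76,70,82,66,84,48,94,0.0,0.21,5.14,0.21,2014,10
2014-10-02,78,70,85,65,84,43,93,0.0,0.2,3.57,0.2,2014,10
2014-10-05,60,47,73,64,83,46,94,0.0,0.18,4.51,0.18,2014,10
"""


def check_rows(text):
    """Return the dates of rows that violate the expected transformations."""
    bad = []
    for row in csv.DictReader(io.StringIO(text)):
        actual = float(row["actual_precipitation"])
        # math.fabs, because the notebook's pyspark import shadows the built-in abs
        expected_dev = math.fabs(actual - float(row["average_precipitation"]))
        if (actual != 0
                or math.fabs(float(row["deviation_from_avg"]) - expected_dev) > 1e-9
                or "record_min_temp_year" in row
                or row["date"][:4] != row["YEAR"]
                or row["date"][5:7] != row["MONTH"]):
            bad.append(row["date"])
    return bad


print(check_rows(SAMPLE))  # []
```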


In [0]:
# batch (non-streaming) read of the Delta table written by the stream
df = spark.read.format("delta").load("/tmp/streaming")

In [0]:
%fs ls /tmp/streaming/YEAR=2014/MONTH=10/

path,name,size,modificationTime
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-0dd7cab8-903f-42c1-904d-031e1892aa83.c000.snappy.parquet,part-00000-0dd7cab8-903f-42c1-904d-031e1892aa83.c000.snappy.parquet,4553,1706785352000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-11389382-43da-4227-955e-e98ba385beed.c000.snappy.parquet,part-00000-11389382-43da-4227-955e-e98ba385beed.c000.snappy.parquet,4339,1706785390000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-21d7ae93-861a-4b71-a4ff-e0f998c9eff5.c000.snappy.parquet,part-00000-21d7ae93-861a-4b71-a4ff-e0f998c9eff5.c000.snappy.parquet,4641,1706785384000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-25a15e9a-430f-4136-bbed-4ac02c4e3df0.c000.snappy.parquet,part-00000-25a15e9a-430f-4136-bbed-4ac02c4e3df0.c000.snappy.parquet,4624,1706785346000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-5b5bf96a-e325-46cf-aeb7-b6243551f410.c000.snappy.parquet,part-00000-5b5bf96a-e325-46cf-aeb7-b6243551f410.c000.snappy.parquet,4523,1706785368000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-682f92fe-e008-43df-9dcb-ee8c18d23215.c000.snappy.parquet,part-00000-682f92fe-e008-43df-9dcb-ee8c18d23215.c000.snappy.parquet,4339,1706785395000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-77b01ac5-ce48-4072-9044-8729f9343cfe.c000.snappy.parquet,part-00000-77b01ac5-ce48-4072-9044-8729f9343cfe.c000.snappy.parquet,4849,1706785363000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-9b0421df-21b2-461a-b8f6-15fc00de8808.c000.snappy.parquet,part-00000-9b0421df-21b2-461a-b8f6-15fc00de8808.c000.snappy.parquet,4442,1706785358000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-c770cf50-4f4a-4aae-9aeb-ab0c1b2a59b9.c000.snappy.parquet,part-00000-c770cf50-4f4a-4aae-9aeb-ab0c1b2a59b9.c000.snappy.parquet,4567,1706785374000
dbfs:/tmp/streaming/YEAR=2014/MONTH=10/part-00000-d36d5594-1cd4-40ac-87a4-c275ddb0e65f.c000.snappy.parquet,part-00000-d36d5594-1cd4-40ac-87a4-c275ddb0e65f.c000.snappy.parquet,4658,1706785339000
