### Stream customer data from cloud files to Delta Lake
1. Read files from cloud storage using the DataStreamReader API.
2. Transform the DataFrame to add the following columns:
    - file_path: path of the source file in cloud storage
    - ingestion_date: current timestamp
3. Write the transformed data stream to a Delta Lake table.

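Unlike a batch read, a streaming read from files does not infer the schema by default. Spark requires the schema up front so that every micro-batch, including those run after a failure or restart, uses the same schema. So we define the schema before reading. (For ad hoc use, inference can be re-enabled by setting `spark.sql.streaming.schemaInference` to `true`.)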


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType

# Explicit schema for the customer JSON files (streaming file sources need one)
customers_schema = StructType(fields=[StructField("customer_id", IntegerType()),
                                      StructField("customer_name", StringType()),
                                      StructField("date_of_birth", DateType()),
                                      StructField("telephone", StringType()),
                                      StructField("email", StringType()),
                                      StructField("member_since", TimestampType()),
                                      StructField("created_timestamp", TimestampType())])

In [0]:
# Incrementally read new JSON files as they land in the volume
customers_df = (spark.readStream.format("json")
                .schema(customers_schema)
                .load("/Volumes/gizmobox/landing/operational-data/customers_stream"))

In [0]:
display(customers_df)

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Add the source file path (from the hidden _metadata column) and the ingestion time
customers_transformed_df = (customers_df.withColumn("file_path", col("_metadata.file_path"))
                                        .withColumn("ingestion_date", current_timestamp()))

In [0]:
display(customers_transformed_df)

In [0]:
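# Write the stream to a Delta table; the checkpoint records which files were processed,
# so a restarted query resumes where it stopped
streaming_query = (customers_transformed_df.writeStream.format("delta")
                   .option("checkpointLocation", "/Volumes/gizmobox/landing/operational-data/customers_stream/_checkpoint_stream")
                   .toTable("gizmobox.bronze.customers_stream"))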
                        

In [0]:
# Stop the query; otherwise it keeps running and polling for new files
streaming_query.stop()

In [0]:
%sql
select * from gizmobox.bronze.customers_stream