#Stream Customer Data from Cloud Files to Delta Lake
1. Read files from cloud storage using the DataStreamReader API
2. Transform the DataFrame to add the following columns:
    i. file path: cloud file path
    ii. ingestion date: current timestamp
3. Write the transformed data stream to a Delta Lake table

###1. Read files using the DataStreamReader API

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType

customer_schema = StructType(fields = [StructField("customer_id", IntegerType()),
                                       StructField("customer_name", StringType()),
                                       StructField("date_of_birth",DateType()),
                                       StructField("telephone",StringType()),
                                       StructField("email", StringType()),
                                       StructField("member_since", DateType()),
                                       StructField("created_timestamp", TimestampType())
                                       ]
                             )
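
`DataStreamReader.schema` also accepts a DDL-formatted string, which can be easier to read for a flat schema like this one. The following is a minimal sketch rather than part of the original notebook: `to_ddl` and `customer_columns` are hypothetical names, and only the first three columns are listed for brevity.

```python
# Hypothetical helper: build a DDL schema string from (column, type) pairs.
def to_ddl(columns):
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)

# First three columns of the customer schema, using Spark SQL type names
customer_columns = [
    ("customer_id", "INT"),
    ("customer_name", "STRING"),
    ("date_of_birth", "DATE"),
]

customer_schema_ddl = to_ddl(customer_columns)
print(customer_schema_ddl)
```

The resulting string could then be passed as `.schema(customer_schema_ddl)` in place of the `StructType` object.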

In [0]:
# Streaming file sources need the schema up front, so customer_schema is passed explicitly
customers_df = (
                spark.readStream
                     .format("json")
                     .schema(customer_schema)
                     .load("/Volumes/shoppix/landing/operational_data/customer_stream/")
)
customers_df.printSchema()

###2. Transform the DataFrame to add the following columns
- file path: cloud file path
- ingestion date: current timestamp

In [0]:
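from pyspark.sql.functions import current_timestamp, col

# _metadata.file_path is the hidden file-source metadata column holding each row's source file path
customers_transformed_df = (
                                customers_df.withColumn("file_path", col("_metadata.file_path"))
                                            .withColumn("ingestiondate", current_timestamp())
)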

###3. Write the transformed data to a Delta Lake table

In [0]:
# The checkpoint records which files were already processed, so reruns only pick up new files
streaming_Query = (
                    customers_transformed_df.writeStream
                                            .format("delta")
                                            .option("checkpointLocation","/Volumes/shoppix/landing/operational_data/customer_stream/_checkpoint_stream")
                                            # Process everything available now, then stop
                                            .trigger(availableNow=True)
                                            .toTable("shoppix.bronze.customer_stream")
)
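
Because `trigger(availableNow=True)` processes the files that are currently available and then stops, it can help to wait for the query to finish before querying the target table. The following is a minimal sketch and not part of the original notebook: `wait_and_summarize` is a hypothetical helper that relies only on `awaitTermination()` and `recentProgress` of the `StreamingQuery` returned by `toTable`. Since `recentProgress` keeps only the most recent progress updates, the row count is indicative rather than exact for long runs.

```python
def wait_and_summarize(query, timeout_seconds=None):
    """Wait for a streaming query to stop and summarize its recent progress.

    `query` is expected to behave like a pyspark StreamingQuery.
    Hypothetical helper, for illustration only.
    """
    if timeout_seconds is None:
        query.awaitTermination()  # blocks until the query stops
        finished = True
    else:
        # With a timeout, awaitTermination reports whether the query stopped in time
        finished = bool(query.awaitTermination(timeout_seconds))

    # Each progress entry is dict-like and reports the rows read in that micro-batch
    progress = query.recentProgress or []
    rows_read = sum(p["numInputRows"] for p in progress)
    return {"finished": finished, "batches": len(progress), "rows_read": rows_read}
```

In this notebook it could be called as `wait_and_summarize(streaming_Query)` right after the write cell, before running the `SELECT` below.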

In [0]:
%sql
SELECT * FROM shoppix.bronze.customer_stream
