#### Stream customers data from cloud files to Delta Lake

##### 1. Read files using the DataStreamReader API (`spark.readStream`)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType

# Explicit schema for the incoming customer JSON files; it is handed to the
# streaming reader via .schema(). Every field is left nullable, which is the
# default for both StructField and StructType.add.
customers_schema = (
    StructType()
    .add("customer_id", IntegerType())
    .add("customer_name", StringType())
    .add("date_of_birth", DateType())
    .add("telephone", StringType())
    .add("email", StringType())
    .add("member_since", DateType())
    .add("created_timestamp", TimestampType())
)

In [0]:
# Define a streaming DataFrame over the JSON files in the customers landing
# directory, parsed with the explicit customers_schema.
customers_df = (
    spark.readStream
    .schema(customers_schema)
    .format("json")
    .load('/Volumes/gizmobox/landing/operational_data/customers_stream/')
)

##### 2. Transform stream data to add columns

In [0]:
from pyspark.sql.functions import col, current_timestamp

In [0]:
# Keep every source column and append two audit columns:
#   file_path     - taken from the reader's `_metadata.file_path` field
#   ingetion_date - current_timestamp() evaluated by Spark
# NOTE(review): 'ingetion_date' looks like a typo for 'ingestion_date'. It is
# left unchanged because the name becomes part of the sink table's schema;
# confirm with downstream consumers before renaming.
customers_transformed_df = customers_df.select(
    "*",
    col("_metadata.file_path").alias('file_path'),
    current_timestamp().alias('ingetion_date'),
)

##### 3. Write transformed data stream to Delta table

In [0]:
# Start the streaming write into the bronze Delta table. toTable() launches
# the query and hands back the handle kept in streaming_query. No output mode
# or trigger is set here, so the writer's defaults apply.
# NOTE(review): the checkpoint directory sits underneath the same directory the
# stream reads from; presumably the leading underscore keeps the file source
# from picking it up as input - confirm, or move it to a dedicated location
# (changing it on a live stream discards the existing progress state).
streaming_query = (
    customers_transformed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/gizmobox/landing/operational_data/customers_stream/_checkpoint_stream")
    .toTable("gizmobox.bronze.customers_stream")
)

In [0]:
%sql
-- Inspect the contents of the bronze table that the streaming query writes to.
SELECT *
FROM gizmobox.bronze.customers_stream;

### Autoloader