# Stream customers data from cloud files to Delta Lake with Auto Loader
1. Read files from cloud storage with Auto Loader (`cloudFiles`) using the DataStreamReader API
2. Transform the DataFrame to add the following columns:
  * file_path: path of the source file in cloud storage
  * ingestion_date: current timestamp
3. Write the transformed data stream to a Delta Lake table

In [0]:
# %sql
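# -- drop the previously created table to start again (to reprocess already-ingested files, also delete the _checkpoint_stream and _schema directories)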
# DROP TABLE IF EXISTS gizmobox.bronze.customers_autoloader;

In [0]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

# no explicit schema is needed: Auto Loader infers it and persists it under cloudFiles.schemaLocation

# customers_schema = T.StructType([
#     T.StructField('customer_id', T.IntegerType()),
#     T.StructField('customer_name', T.StringType()),
#     T.StructField('date_of_birth', T.DateType()),
#     T.StructField('telephone', T.StringType()),
#     T.StructField('email', T.StringType()),
#     T.StructField('member_since', T.DateType()),
#     T.StructField('created_timestamp', T.TimestampType())
# ])



In [0]:
# Auto Loader ('cloudFiles' source): incrementally discover and read new JSON
# files from the landing volume as a stream.
#   - cloudFiles.schemaLocation: where the inferred schema is persisted
#   - cloudFiles.inferColumnTypes: infer real column types (rather than strings)
#   - cloudFiles.schemaHints: pin the date/timestamp columns to explicit types
#   - pathGlobFilter: only ingest files whose name matches customers_2024_*.json
# NOTE(review): the _schema directory lives inside the source path; the glob
# filter keeps it from being ingested, but a location outside the source
# directory would be more robust.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        'cloudFiles.format': 'json',
        'cloudFiles.schemaLocation': '/Volumes/gizmobox/landing/operational_data/customers_autoloader/_schema',
        'cloudFiles.inferColumnTypes': 'true',
        'cloudFiles.schemaHints': 'date_of_birth DATE, member_since DATE, created_timestamp TIMESTAMP',
        'pathGlobFilter': 'customers_2024_*.json',
    })
    .load('/Volumes/gizmobox/landing/operational_data/customers_autoloader/')
)
                    

In [0]:
# Enrich every streamed row with two audit columns:
#   file_path      - source file of the row, read from the hidden _metadata column
#   ingestion_date - current timestamp when the micro-batch is processed
df = (
    df
    .withColumn('file_path', F.col('_metadata.file_path'))
    .withColumn('ingestion_date', F.current_timestamp())
)

In [0]:
# Start the streaming write into the bronze Delta table. The checkpoint directory
# records the stream's progress so a restarted query resumes where it left off.
# No trigger is configured, so the query keeps running in the background until
# streaming_query.stop() is called.
streaming_query = (
    df.writeStream
    .format('delta')
    .option('checkpointLocation', '/Volumes/gizmobox/landing/operational_data/customers_autoloader/_checkpoint_stream')
    .toTable('gizmobox.bronze.customers_autoloader')
)

In [0]:
%sql
-- Inspect the rows written so far by the streaming query started above.
-- The stream runs in the background, so this may return no (or partial) rows
-- until its first micro-batch has been committed to the table.
SELECT * FROM gizmobox.bronze.customers_autoloader

In [0]:
# Stop the streaming query. No trigger was set on the writer, so the stream keeps
# running (and the cluster keeps incurring cost) until it is stopped here or the
# cluster is terminated.
streaming_query.stop()