## Stream Customer Data from Cloud Files to Delta Lake Using Auto Loader
1. Read files from cloud storage using Auto Loader
1. Transform the DataFrame to add the following columns
    -   `file_path`: cloud file path
    -   `ingestion_date`: current timestamp
1. Write the transformed data stream to a Delta Lake table

### 1. Read files using Auto Loader

In [0]:
%python
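customers_df = (
    spark.readStream
    # "cloudFiles" selects Auto Loader as the streaming source
    .format("cloudFiles")
    # The landed source files are JSON
    .option("cloudFiles.format", "json")
    # Where Auto Loader stores the inferred schema between runs
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox/landing/operational_data/customers_autoloader/_schema")
    # Infer column types instead of reading every JSON field as a string
    .option("cloudFiles.inferColumnTypes", "true")
    # Override the inferred types for the date and timestamp columns
    .option("cloudFiles.schemaHints", "date_of_birth DATE, member_since DATE, created_timestamp TIMESTAMP")
    .load("/Volumes/gizmobox/landing/operational_data/customers_autoloader/")
)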
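
The reader options above can also be assembled from plain Python values, which keeps the schema hints readable as the number of hinted columns grows. The following is a minimal sketch, not part of the notebook: `build_autoloader_options` and `read_customers` are hypothetical helper names, and `spark` is assumed to be the notebook's active session.

```python
def build_autoloader_options(file_format, schema_location, schema_hints):
    """Return a dict of Auto Loader reader options (hypothetical helper)."""
    options = {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
        "cloudFiles.inferColumnTypes": "true",
    }
    # Render {"column": "TYPE"} pairs as the comma-separated string
    # that cloudFiles.schemaHints expects; skip the option if there are none.
    hints = ", ".join(f"{name} {dtype}" for name, dtype in schema_hints.items())
    if hints:
        options["cloudFiles.schemaHints"] = hints
    return options


def read_customers(spark, source_path, options):
    """Build the Auto Loader streaming DataFrame from an options dict."""
    return spark.readStream.format("cloudFiles").options(**options).load(source_path)


SOURCE_PATH = "/Volumes/gizmobox/landing/operational_data/customers_autoloader/"

customer_options = build_autoloader_options(
    file_format="json",
    schema_location=SOURCE_PATH + "_schema",
    schema_hints={
        "date_of_birth": "DATE",
        "member_since": "DATE",
        "created_timestamp": "TIMESTAMP",
    },
)
```

In the notebook, `read_customers(spark, SOURCE_PATH, customer_options)` would pass the same options as the cell above.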

### 2. Transform the DataFrame to add the following columns
- `file_path`: cloud file path
- `ingestion_date`: current timestamp

In [0]:
%python
from pyspark.sql.functions import current_timestamp, col

customers_transformed_df = (
    customers_df
    # _metadata is a hidden column exposed by file-based sources
    .withColumn("file_path", col("_metadata.file_path"))
    # Record when the row was ingested
    .withColumn("ingestion_date", current_timestamp())
)
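
The same two columns can also be added with SQL expressions through `selectExpr`, which avoids importing column functions. This is a sketch of an equivalent formulation rather than the notebook's own approach; `add_audit_columns` is a hypothetical helper name, and the input is assumed to come from a file-based source, since the hidden `_metadata` column is only available there.

```python
def add_audit_columns(df):
    """Append file_path and ingestion_date columns (hypothetical helper).

    Assumes `df` was read from a file-based source such as Auto Loader,
    so that the hidden `_metadata` column can be referenced.
    """
    return df.selectExpr(
        "*",                                      # keep every source column
        "_metadata.file_path AS file_path",       # path of the originating file
        "current_timestamp() AS ingestion_date",  # time the row was ingested
    )
```

Called as `add_audit_columns(customers_df)`, this should yield the same column order as the `withColumn` chain, because both append the new columns after the source columns.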

### 3. Write the transformed data stream to a Delta Lake table

In [0]:
%python
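streaming_query = (
    customers_transformed_df.writeStream
    .format("delta")
    # The checkpoint records progress so a restarted stream resumes where it left off
    .option("checkpointLocation", "/Volumes/gizmobox/landing/operational_data/customers_autoloader/_checkpoint_stream")
    # toTable() starts the stream and returns a StreamingQuery handle
    .toTable("gizmobox.bronze.customers_autoloader")
)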
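
With no trigger set, the query above keeps running and waits for new files until it is stopped. For scheduled, batch-style ingestion, one option is the `availableNow` trigger, which processes the files currently available and then stops on its own. A minimal sketch, assuming a runtime that supports `trigger(availableNow=True)` (Spark 3.3 or later); `write_available_now` is a hypothetical helper name.

```python
def write_available_now(df, table_name, checkpoint_location):
    """Ingest everything currently available, then stop (hypothetical helper)."""
    query = (
        df.writeStream
        .format("delta")
        # The checkpoint lets the next run skip files already ingested
        .option("checkpointLocation", checkpoint_location)
        # Process the existing backlog and terminate instead of running forever
        .trigger(availableNow=True)
        .toTable(table_name)
    )
    # Block until the backlog has been written
    query.awaitTermination()
    return query
```

Reusing the same checkpoint location across runs is what lets each run pick up only the files that arrived since the previous one.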

In [0]:
select * from gizmobox.bronze.customers_autoloader

In [0]:
%python
streaming_query.stop()
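
Calling `stop()` ends the continuously running query started in step 3. When a notebook has started several streams, it can help to stop whatever is still active in one pass. A sketch using `spark.streams.active`; `stop_active_streams` is a hypothetical helper name, and `spark` is assumed to be the notebook's session.

```python
def stop_active_streams(spark):
    """Stop every active streaming query and return their ids (hypothetical helper)."""
    stopped_ids = []
    # spark.streams.active lists the queries currently running on this session
    for query in spark.streams.active:
        query.stop()
        stopped_ids.append(query.id)
    return stopped_ids
```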