In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType

customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("date_of_birth", DateType(), True),
    StructField("email", StringType(), True),
    StructField("member_since", DateType(), True),
    StructField("created_timestamp", TimestampType(), True),
    StructField("telephone", StringType(), True)
])

In [0]:
#### Read files using the DataStreamReader API

# File-based streaming sources need an explicit schema; it is not inferred by default
customer_df = spark.readStream.format("json").schema(customer_schema).load("/Volumes/gizmobox/landing/operational_data/customers_stream/")

In [0]:
### Add two columns: the source file path (from the hidden _metadata column) and the ingestion timestamp
from pyspark.sql.functions import col, current_timestamp

customer_transformed_df = (
    customer_df
    .withColumn("filepath", col("_metadata.file_path"))
    .withColumn("ingestion_date", current_timestamp())
)

In [0]:
### Write the transformed data to a Delta table
streaming_query = (
    customer_transformed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/gizmobox/landing/operational_data/customers_stream/_checkpoint_stream")
    # .option("mergeSchema", "true")  # merges newly added columns into the table schema instead of failing the write
    .trigger(processingTime="2 minutes")
    .outputMode("append")
    .toTable("gizmobox.bronze.customers_stream")
)

In [0]:
%sql
select * from gizmobox.bronze.customers_stream;

In [0]:
## to stop the streaming query
streaming_query.stop()

#### Processing modes in Spark Streaming
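1. Micro-batch (default) --> Spark collects the data that arrived during a short interval and processes it as a small batch, then does the same for the next interval. Gives exactly-once guarantees.
2. Continuous processing --> Processes the data record by record as it arrives, for lower latency (around a millisecond), but only guarantees at-least-once delivery (weaker fault tolerance). Experimental.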

##### Trigger:
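- Default (no trigger specified): micro-batch mode; each micro-batch starts as soon as the previous one finishes (Databricks uses a 500 millisecond interval by default)
- Fixed interval: .trigger(processingTime="2 minutes"); micro-batches start at the user-specified interval
- Trigger once: .trigger(once=True); processes all available data as a single micro-batch, then stops (deprecated in favor of availableNow)
- Available now: .trigger(availableNow=True); processes all available data, possibly split into several micro-batches that respect rate limits such as maxFilesPerTrigger, then stops
- Continuous: .trigger(continuous="1 second"); runs in continuous processing mode, where the interval is how often progress is checkpointed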
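A rough way to see the difference between once and availableNow is how the backlog of waiting files gets grouped into micro-batches. This is a plain-Python model, not Spark code: plan_batches and max_files_per_batch are hypothetical names, the latter standing in for a rate limit like maxFilesPerTrigger, which availableNow respects while once puts everything into a single batch.

```python
from typing import List


def plan_batches(pending_files: List[str], trigger: str, max_files_per_batch: int = 2) -> List[List[str]]:
    """Group the files already waiting into micro-batches (toy model, not Spark).

    once         -> all pending files in one micro-batch, then stop
    availableNow -> all pending files, split into micro-batches of at most
                    max_files_per_batch files, then stop
    """
    if not pending_files:
        return []  # nothing to process: the query stops immediately
    if trigger == "once":
        return [list(pending_files)]
    if trigger == "availableNow":
        return [
            pending_files[i:i + max_files_per_batch]
            for i in range(0, len(pending_files), max_files_per_batch)
        ]
    raise ValueError(f"trigger not modelled in this sketch: {trigger}")


files = ["f1.json", "f2.json", "f3.json", "f4.json", "f5.json"]
print(plan_batches(files, "once"))          # one batch with all five files
print(plan_batches(files, "availableNow"))  # three batches: 2 + 2 + 1 files
```

Splitting the backlog keeps each micro-batch bounded, which is why availableNow scales better than once for a large initial load.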

##### outputMode:

- append (default): writes only the rows added to the result table since the last micro-batch
- complete: rewrites the entire result table to the sink on every micro-batch (only for aggregation queries)
- update: writes only the rows of the result table that changed since the last micro-batch
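To make the three modes concrete, here is a toy plain-Python model (not Spark) of what each micro-batch writes to the sink. run_stream is a hypothetical helper; complete and update are shown on a running count per key, while append is shown on a plain pass-through query, because in Spark append mode on an aggregation also needs a watermark.

```python
from collections import Counter
from typing import List


def run_stream(batches: List[List[str]], mode: str) -> list:
    """Return what each micro-batch writes to the sink under a given output mode (toy model).

    append   -> pass-through query: only the rows that arrived in this batch
    complete -> running count per key: the whole result table every time
    update   -> running count per key: only the keys whose count changed
    """
    counts: Counter = Counter()  # result table of the aggregation, kept across micro-batches
    written = []
    for batch in batches:
        if mode == "append":
            written.append(list(batch))
            continue
        counts.update(batch)  # fold this batch's rows into the running counts
        if mode == "complete":
            written.append(dict(counts))
        elif mode == "update":
            written.append({key: counts[key] for key in set(batch)})
        else:
            raise ValueError(f"unknown output mode: {mode}")
    return written


batches = [["a", "b"], ["a"], ["c"]]
for mode in ("append", "complete", "update"):
    print(mode, run_stream(batches, mode))
```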


#### Checkpointing:

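It's a fault-tolerance mechanism that allows a query to recover from failures and resume processing from where it left off, without data loss or duplication.

It stores metadata about the streaming query (such as its id) in the checkpoint location, along with the query's progress and any state.

It tracks the source offsets of each micro-batch in the offsets log (written when the batch starts) and the completed micro-batches in the commits log (written when it ends).

The write-ahead log (the offsets log) plus checkpointing provide fault tolerance: on restart, a micro-batch that has an offsets entry but no commit entry is re-run over the same offsets.

An idempotent sink, one that recognizes a re-run micro-batch and does not write it twice, turns this into an exactly-once guarantee (no duplicated data); the Delta sink is idempotent. With a non-idempotent sink you have to handle duplicates yourself.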
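The recovery flow above can be sketched as a toy model in plain Python. Checkpoint, IdempotentSink and run_query are hypothetical names; the dictionary and set mimic the offsets/ and commits/ folders of a real checkpoint directory, and a sink that skips batch ids it has already seen is a simplification of how Delta's sink records the query's batch ids in its transaction log.

```python
from typing import Dict, List, Set


class Checkpoint:
    """Toy checkpoint directory with an offsets log and a commits log."""

    def __init__(self) -> None:
        self.offsets: Dict[int, int] = {}  # batch_id -> end offset, logged BEFORE the batch runs
        self.commits: Set[int] = set()     # batch_ids logged AFTER the batch's output is written


class IdempotentSink:
    """Skips a batch id it has already written, so replaying a batch adds no duplicates."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.written_batches: Set[int] = set()

    def write(self, batch_id: int, rows: List[str]) -> None:
        if batch_id in self.written_batches:
            return  # replayed batch: it was already written before the crash
        self.rows.extend(rows)
        self.written_batches.add(batch_id)


def run_query(source: List[str], ckpt: Checkpoint, sink: IdempotentSink,
              batch_size: int = 2, crash_at_batch: int = -1) -> None:
    # Resume from the checkpoint: the next batch is the first one without a commit.
    batch_id = len(ckpt.commits)
    start = ckpt.offsets.get(batch_id - 1, 0)
    while start < len(source):
        # Write-ahead: reuse the logged end offset if this batch was planned before a crash,
        # otherwise plan it now and log it before processing.
        end = ckpt.offsets.setdefault(batch_id, min(start + batch_size, len(source)))
        sink.write(batch_id, source[start:end])
        if batch_id == crash_at_batch:
            raise RuntimeError(f"crashed after writing batch {batch_id}, before committing it")
        ckpt.commits.add(batch_id)
        batch_id, start = batch_id + 1, end


source = ["r1", "r2", "r3", "r4", "r5"]
ckpt, sink = Checkpoint(), IdempotentSink()
try:
    run_query(source, ckpt, sink, crash_at_batch=1)
except RuntimeError as err:
    print(err)
run_query(source, ckpt, sink)  # restart with the same checkpoint
print(sink.rows)               # each row appears exactly once
```

Batch 1 is replayed after the restart with the same offsets, and only the sink's batch-id check keeps r3 and r4 from being written twice.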


### AutoLoader

It's a Structured Streaming source (cloudFiles) designed for efficient, large-scale data ingestion. It incrementally processes new data files as they arrive in cloud storage.

Traditional File Stream Source: has inefficient file listing, scalability issues and schema evolution problems
- It re-lists the input directory on every micro-batch to find new files. This is fine for a few files, but with millions of files listing becomes slow and keeping track of the files already seen can cause memory problems.
- Deduplication is time consuming: every listed file has to be checked against the growing set of files already processed.
- The user has to provide the schema and it can't be inferred; fields that don't match the schema are dropped or become null, leading to data loss.

Auto Loader: detects new files efficiently, optionally through the cloud provider's notification services (a cloud queue) instead of repeated listing; tracks discovered files in RocksDB instead of in memory, for scalability; and supports schema inference, schema evolution and resilience (rescued data, etc.).
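A toy model of why notification-based discovery scales better than re-listing: both approaches find the same new files, but listing has to examine every path in the directory on each run. This is plain Python, not the Auto Loader API; FileTracker and the discover_* functions are hypothetical, FileTracker stands in for the RocksDB store of ingested files, and a queue.Queue stands in for the cloud queue.

```python
import queue
from typing import Iterable, List, Set, Tuple


class FileTracker:
    """Remembers every file already ingested (Auto Loader keeps this in RocksDB)."""

    def __init__(self) -> None:
        self.seen: Set[str] = set()

    def admit(self, candidates: Iterable[str]) -> List[str]:
        # Keep only paths never ingested before, then remember them.
        fresh = [p for p in candidates if p not in self.seen]
        self.seen.update(fresh)
        return fresh


def discover_by_listing(directory: List[str], tracker: FileTracker) -> Tuple[List[str], int]:
    # Listing: scan the whole directory every run; work grows with the total file count.
    return tracker.admit(directory), len(directory)


def discover_by_notifications(events: "queue.Queue[str]", tracker: FileTracker) -> Tuple[List[str], int]:
    # Notifications: read only the paths the cloud queue delivered since the last run.
    arrived: List[str] = []
    while not events.empty():
        arrived.append(events.get_nowait())
    return tracker.admit(arrived), len(arrived)


directory = [f"part-{i:05d}.json" for i in range(10_000)]  # backlog that landed earlier
listing_tracker, notify_tracker = FileTracker(), FileTracker()
listing_tracker.admit(directory)  # assume the backlog was already ingested by both
notify_tracker.admit(directory)

events: "queue.Queue[str]" = queue.Queue()
for name in ("part-10000.json", "part-10001.json"):  # two new files land
    directory.append(name)
    events.put(name)

listing_result = discover_by_listing(directory, listing_tracker)
notify_result = discover_by_notifications(events, notify_tracker)
print(listing_result)  # same 2 new files, 10,002 paths examined
print(notify_result)   # same 2 new files, 2 paths examined
```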

```
customer_df = (
    spark.readStream
    .format("cloudFiles")  # enables Auto Loader
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")  # discover new files via the cloud provider's notification service
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox/landing/operational_data/customers_stream/_schema")  # where the inferred schema is stored
    .option("cloudFiles.inferColumnTypes", "true")  # infer real types instead of reading every JSON column as string
    .option("cloudFiles.schemaHints", "date_of_birth DATE, member_since DATE")  # override the inferred type of specific columns
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # new columns stop the stream; on restart they are added to the schema
    .load("/Volumes/gizmobox/landing/operational_data/customers_autoLoader/")
)
```

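_rescued_data: string --> Special column added by Auto Loader that keeps any data it couldn't parse into the schema (e.g. type mismatches, fields missing from the schema, case mismatches) as a JSON blob, together with the source file path, instead of dropping it.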
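A plain-Python imitation of how a rescued data column behaves, assuming a hypothetical three-column schema and a hypothetical file name; parse_with_rescue is not part of any API, and the real column is filled by Auto Loader itself.

```python
import json
from typing import Any, Dict

# Hypothetical, trimmed-down schema for illustration only.
SCHEMA: Dict[str, type] = {"customer_id": int, "customer_name": str, "email": str}


def parse_with_rescue(line: str, schema: Dict[str, type], file_path: str) -> Dict[str, Any]:
    """Parse one JSON record; values that don't fit the schema go to _rescued_data."""
    record = json.loads(line)
    row: Dict[str, Any] = {name: None for name in schema}
    rescued: Dict[str, Any] = {}
    for key, value in record.items():
        expected = schema.get(key)
        if expected is not None and (value is None or type(value) is expected):
            row[key] = value
        else:
            rescued[key] = value  # unknown column or wrong type: keep it instead of dropping it
    if rescued:
        rescued["_file_path"] = file_path  # record where the rescued values came from
    row["_rescued_data"] = json.dumps(rescued) if rescued else None
    return row


bad_row = parse_with_rescue(
    '{"customer_id": "C-17", "customer_name": "Ann", "loyalty_tier": "gold"}',
    SCHEMA, "customers_1.json",
)
good_row = parse_with_rescue('{"customer_id": 17, "customer_name": "Bob"}', SCHEMA, "customers_1.json")
print(bad_row)   # customer_id is None; the string id and loyalty_tier are in _rescued_data
print(good_row)  # _rescued_data is None
```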