## Pre-requisites

### Create Delta table

In [0]:
%sql
-- Drop and recreate the target Delta table so every run starts from an empty table.
DROP TABLE IF EXISTS quickstart_catalog.quickstart_schema.users_int;

CREATE TABLE IF NOT EXISTS quickstart_catalog.quickstart_schema.users_int (
  id             INT,
  name           STRING,
  dob            DATE,
  email          STRING,
  gender         STRING,
  country        STRING,
  region         STRING,
  city           STRING,
  asset          INT,
  marital_status STRING
)
USING DELTA;

-- Inspect the table definition and metadata.
DESCRIBE EXTENDED quickstart_catalog.quickstart_schema.users_int;

-- Inspect the table contents right after (re)creation.
SELECT * FROM quickstart_catalog.quickstart_schema.users_int;

In [0]:
# Recursively remove the schema directory and the streaming checkpoint directory
# in the sandbox volume before (re)running the ingestion cells.
for stale_path in (
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/schema/",
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/cloudfiles_checkpoint",
):
    dbutils.fs.rm(stale_path, recurse=True)

## Data Ingestion using Auto Loader

### Scenario 1

In [0]:
from pyspark.sql.types import StructType, StructField, StringType
 
# Reader options for the `cloudFiles` streaming source, applied to CSV input.
cloud_files_properties = {
    # cloudFiles.* settings: input format, schema location, column type inference.
    "cloudFiles.format": "csv",
    "cloudFiles.schemaLocation": "/Volumes/quickstart_catalog/quickstart_schema/sandbox/schema/",
    "cloudFiles.inferColumnTypes": "true",
    # CSV parsing settings.
    "header": "true",
    # NOTE(review): `timestampFormat` applies to timestamp parsing; date columns
    # are governed by `dateFormat` — confirm this is the intended option.
    "timestampFormat": "yyyy-MM-dd",
}

# Build the streaming reader first, then point it at the staging directory.
stream_reader = spark.readStream.format("cloudFiles").options(**cloud_files_properties)
df = stream_reader.load(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/users_staging_cloudfiles/"
)

In [0]:
# Write the streaming DataFrame `df` into the target Delta table.
query = (
    df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/quickstart_catalog/quickstart_schema/sandbox/cloudfiles_checkpoint")
        # Allow columns that are not yet in the table to be merged into its schema.
        .option("mergeSchema", "true")
        # Process everything currently available in the source, then stop.
        .trigger(availableNow=True)
        .toTable("quickstart_catalog.quickstart_schema.users_int")
)

# toTable() starts the query asynchronously and returns immediately. Block until
# the availableNow run has finished so that cells executed afterwards (e.g. on
# "Run All") read the table only once the ingested data has been committed.
query.awaitTermination()
 

In [0]:
# Render the contents of the target table.
display(spark.table("quickstart_catalog.quickstart_schema.users_int"))