In [0]:
%sql
-- Bootstrap the catalog objects used by the rest of this notebook.
-- Every statement uses IF NOT EXISTS, so re-running this cell is safe.

-- Create a catalog for production
CREATE CATALOG IF NOT EXISTS sales;

-- Create one schema per medallion layer (bronze, silver, gold)
CREATE SCHEMA IF NOT EXISTS sales.bronze;
CREATE SCHEMA IF NOT EXISTS sales.silver;
CREATE SCHEMA IF NOT EXISTS sales.gold;

-- Create a managed volume (no LOCATION clause) for raw landing data;
-- it is addressed below as /Volumes/sales/bronze/raw_sales_volume/
CREATE VOLUME IF NOT EXISTS sales.bronze.raw_sales_volume;


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Landing folder that Auto Loader scans for raw files
source_path = "/Volumes/sales/bronze/raw_sales_volume/"

# Schema-tracking and checkpoint folders, kept inside the same volume.
# NOTE(review): both folders sit underneath source_path; confirm that Auto Loader
# skips these underscore-prefixed directories during file discovery.
schema_path = "/Volumes/sales/bronze/raw_sales_volume/_schemas/netflix/"
checkpoint_path = "/Volumes/sales/bronze/raw_sales_volume/_checkpoints/netflix/"

# Read the landing folder as a stream with Auto Loader. The inferred schema is
# persisted at schema_path and is allowed to grow when new columns appear.
# NOTE(review): no "header" option is set, so the CSV reader default applies;
# confirm the source files really have no header row.
df_netflix = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load(source_path)
)

# Write to the bronze Delta table. availableNow=True replaces the deprecated
# once=True trigger: it processes everything currently available and then stops.
netflix_query = (
    df_netflix.writeStream.format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable("sales.bronze.netflix_data")
)

# Block until the ingest has finished so that the cells below (and any
# "Run All" execution) query the table only after it has been written.
netflix_query.awaitTermination()


In [0]:
%sql
-- Preview the contents of the bronze table written by the ingest cell
SELECT *
FROM sales.bronze.netflix_data;


In [0]:
%sql
-- List the operation history recorded for the bronze Delta table
DESCRIBE HISTORY sales.bronze.netflix_data;
