### Raw file ingestion into Spark DataFrames (Auto Loader) and bronze Delta tables

In [0]:
from pyspark.sql.types import *

# Schema hints for the POS feed: every listed column is pinned to STRING so the
# bronze layer performs no type casting. These are hints, not an enforced schema:
# columns that are not listed here are still inferred by Auto Loader.
schema_pos = """
    store_id STRING,
    sku_id STRING,
    date STRING,
    units_sold STRING,
    price STRING,
    promo_flag STRING
"""

# Auto Loader (cloudFiles) stream over the raw POS CSV files.
# - schemaLocation: directory where Auto Loader tracks the inferred/evolved schema.
# - schemaEvolutionMode=addNewColumns: columns that appear later are added to that schema.
#   NOTE(review): per the Auto Loader docs the stream stops when it first detects a new
#   column and picks it up on restart -- confirm this matches the intended run procedure.
# - schemaHints: the STRING hints defined above.
df_pos_raw = (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation",
                "abfss://basab@basabstore.dfs.core.windows.net/pos_raw/schema_pos/")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.schemaHints", schema_pos)
        .load("abfss://basab@basabstore.dfs.core.windows.net/pos_raw/raw_data/*.csv")
)


In [0]:
# Interactive preview of the raw POS stream.
df_pos_raw.display()

In [0]:
# Print the column names and types resolved for the raw POS stream.
df_pos_raw.printSchema()

In [0]:
# Schema hints for the product master feed: every listed column kept as STRING in bronze.
schema_products = """
    sku_id STRING,
    category STRING,
    subcategory STRING,
    brand STRING
"""

# Auto Loader stream over the raw product CSV files. The inferred schema is tracked
# under products_schema/ and newly appearing columns are added (addNewColumns).
df_products_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "abfss://basab@basabstore.dfs.core.windows.net/products_raw/products_schema/")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaHints", schema_products)
    .load("abfss://basab@basabstore.dfs.core.windows.net/products_raw/raw_data/*.csv")
)


In [0]:
# Interactive preview of the raw products stream.
df_products_raw.display()

In [0]:
# Schema hints for the inventory snapshot feed: every listed column kept as STRING in bronze.
schema_inventory = """
    store_id STRING,
    sku_id STRING,
    stock_level STRING
"""

# Auto Loader stream over everything in inventory_raw/raw_data/. The inferred schema is
# tracked under inventory_schema/ and newly appearing columns are added (addNewColumns).
df_inventory_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "abfss://basab@basabstore.dfs.core.windows.net/inventory_raw/inventory_schema/")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaHints", schema_inventory)
    .load("abfss://basab@basabstore.dfs.core.windows.net/inventory_raw/raw_data/")
)

In [0]:
# Interactive preview of the raw inventory stream.
df_inventory_raw.display()

In [0]:
# Schema hints for the store master feed: every listed column kept as STRING in bronze
# (size_sqft and opening_date are cast later, not here).
schema_store = """
    store_id STRING,
    region STRING,
    format STRING,
    size_sqft STRING,
    opening_date STRING
"""

# Auto Loader stream over everything in stores_raw/raw_data/. The inferred schema is
# tracked under stores_schema/ and newly appearing columns are added (addNewColumns).
df_stores_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "abfss://basab@basabstore.dfs.core.windows.net/stores_raw/stores_schema/")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaHints", schema_store)
    .load("abfss://basab@basabstore.dfs.core.windows.net/stores_raw/raw_data/")
)

In [0]:
# Interactive preview of the raw stores stream.
df_stores_raw.display()

In [0]:
# Schema hints for the weather feed: every listed column kept as STRING in bronze.
weather_schema = """
    date STRING,
    region STRING,
    temperature_c STRING,
    rainfall_mm STRING
"""

# Auto Loader stream over everything in weather_raw/raw_data/. The inferred schema is
# tracked under weather_schema/ and newly appearing columns are added (addNewColumns).
df_weather_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "abfss://basab@basabstore.dfs.core.windows.net/weather_raw/weather_schema/")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaHints", weather_schema)
    .load("abfss://basab@basabstore.dfs.core.windows.net/weather_raw/raw_data/")
)

# Interactive preview of the raw weather stream.
df_weather_raw.display()

In [0]:
%sql
-- Target schema for every bronze table written later in this notebook (no-op if it exists).
CREATE SCHEMA IF NOT EXISTS basab_catalog_retail.bronze_tables

In [0]:
# Bronze writes: each raw stream lands in its own Delta table and keeps its own checkpoint directory.


In [0]:
# Land the raw POS stream in the bronze Delta table.
# - checkpointLocation: remembers which files were already ingested, so re-runs only
#   pick up new files.
# - mergeSchema: lets the Delta sink accept columns added by Auto Loader schema evolution.
# - trigger(availableNow=True): process everything currently available, then stop, so the
#   query finishes before the SELECT in the next cell runs instead of racing a
#   never-ending stream.
# - awaitTermination(): blocks this cell until the run completes; a failed run raises
#   here rather than failing silently in the background.
pos_bronze_query = (
    df_pos_raw.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://basab@basabstore.dfs.core.windows.net/pos_raw/checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("basab_catalog_retail.bronze_tables.pos_bronze")
)
pos_bronze_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows landed in the POS bronze table.
SELECT * FROM basab_catalog_retail.bronze_tables.pos_bronze

In [0]:
# Second run of the POS bronze stream against the SAME checkpoint, so it only ingests
# files the previous run has not processed.
# NOTE(review): presumably kept to restart ingestion after Auto Loader stopped on a newly
# detected column (addNewColumns); if that is not the intent, this cell is redundant -- confirm.
# trigger(availableNow=True) + awaitTermination(): process what is available, then stop,
# and surface any failure in this cell before the SELECT below runs.
pos_bronze_rerun_query = (
    df_pos_raw.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://basab@basabstore.dfs.core.windows.net/pos_raw/checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("basab_catalog_retail.bronze_tables.pos_bronze")
)
pos_bronze_rerun_query.awaitTermination()

In [0]:
%sql
-- Re-inspect the POS bronze table after the second ingestion run.
SELECT * FROM basab_catalog_retail.bronze_tables.pos_bronze


In [0]:
# Land the raw products stream in the bronze Delta table.
# - checkpointLocation: remembers already-ingested files so re-runs only pick up new ones.
# - mergeSchema: lets the Delta sink accept columns added by Auto Loader schema evolution.
# - trigger(availableNow=True) + awaitTermination(): process everything available, then
#   stop and block, so the SELECT in the next cell sees the finished load and any failure
#   is raised in this cell.
products_bronze_query = (
    df_products_raw.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://basab@basabstore.dfs.core.windows.net/products_raw/checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("basab_catalog_retail.bronze_tables.products_bronze")
)
products_bronze_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows landed in the products bronze table.
SELECT * FROM basab_catalog_retail.bronze_tables.products_bronze

In [0]:
# Land the raw stores stream in the bronze Delta table.
# - checkpointLocation: remembers already-ingested files so re-runs only pick up new ones.
# - mergeSchema: lets the Delta sink accept columns added by Auto Loader schema evolution.
# - trigger(availableNow=True) + awaitTermination(): process everything available, then
#   stop and block, so the SELECT in the next cell sees the finished load and any failure
#   is raised in this cell.
stores_bronze_query = (
    df_stores_raw.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://basab@basabstore.dfs.core.windows.net/stores_raw/checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("basab_catalog_retail.bronze_tables.stores_bronze")
)
stores_bronze_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows landed in the stores bronze table.
SELECT * FROM basab_catalog_retail.bronze_tables.stores_bronze

In [0]:
# Land the raw inventory stream in the bronze Delta table.
# - checkpointLocation: remembers already-ingested files so re-runs only pick up new ones.
# - mergeSchema: lets the Delta sink accept columns added by Auto Loader schema evolution.
# - trigger(availableNow=True) + awaitTermination(): process everything available, then
#   stop and block, so the SELECT in the next cell sees the finished load and any failure
#   is raised in this cell.
inventory_bronze_query = (
    df_inventory_raw.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://basab@basabstore.dfs.core.windows.net/inventory_raw/checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("basab_catalog_retail.bronze_tables.inventory_bronze")
)
inventory_bronze_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows landed in the inventory bronze table.
SELECT * FROM basab_catalog_retail.bronze_tables.inventory_bronze

In [0]:
# Land the raw weather stream in the bronze Delta table.
# - checkpointLocation: remembers already-ingested files so re-runs only pick up new ones.
# - mergeSchema: lets the Delta sink accept columns added by Auto Loader schema evolution.
# - trigger(availableNow=True) + awaitTermination(): process everything available, then
#   stop and block, so the SELECT in the next cell sees the finished load and any failure
#   is raised in this cell.
weather_bronze_query = (
    df_weather_raw.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://basab@basabstore.dfs.core.windows.net/weather_raw/checkpoints/")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("basab_catalog_retail.bronze_tables.weather_bronze")
)
weather_bronze_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows landed in the weather bronze table.
SELECT * FROM basab_catalog_retail.bronze_tables.weather_bronze

In [0]:
# The holiday calendar is a single CSV file, read once as a batch DataFrame (not via
# Auto Loader). The header row supplies column names; column types are inferred.
# NOTE(review): unlike the streamed feeds (all STRING hints in bronze), this infers
# types -- confirm that is intended for the bronze layer.
df_holiday_raw = spark.read.csv(
    "abfss://basab@basabstore.dfs.core.windows.net/holiday_data/holidays.csv",
    header=True,
    inferSchema=True,
)


In [0]:
# Preview the holiday calendar as read from CSV.
df_holiday_raw.display()

In [0]:
# Persist the holiday calendar as a bronze Delta table.
# mode("overwrite") makes this cell re-runnable: the default save mode ("errorifexists")
# fails on every run after the first because the table already exists. Overwriting also
# picks up edits to holidays.csv.
# NOTE(review): if the inferred column types ever change, Delta rejects the overwrite
# unless overwriteSchema is enabled -- confirm whether that should be allowed.
(
    df_holiday_raw.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("basab_catalog_retail.bronze_tables.holiday_bronze")
)

In [0]:
%sql
-- Inspect the rows stored in the holiday bronze table.
SELECT * FROM basab_catalog_retail.bronze_tables.holiday_bronze