# Ingestion Explorations

In [0]:
# Batch-read the ap_invoices CSV files from the source container, treating the
# first row as the header and letting Spark infer column types, so the column
# names and inferred types can be inspected before configuring ingestion.

df = spark.read.load(
    "abfss://source@sd0212.dfs.core.windows.net/ap_invoices",
    format="csv",
    header="true",
    inferSchema="true",
)

# Preview the rows, then show the inferred schema as the cell result.
display(df)

df.schema

In [0]:
# Batch-read the suppliers CSV files from the source container, treating the
# first row as the header and letting Spark infer column types, so the column
# names and inferred types can be inspected before configuring ingestion.

df = spark.read.load(
    "abfss://source@sd0212.dfs.core.windows.net/suppliers",
    format="csv",
    header="true",
    inferSchema="true",
)

# Preview the rows, then show the inferred schema as the cell result.
display(df)

df.schema

In [0]:
# Batch-read the gl_control_totals JSON files from the source container to
# inspect the field names and types.
# No "inferSchema" option is passed: it is a CSV reader option; the JSON source
# infers the schema whenever no explicit schema is supplied.

df = (
    spark
    .read
    .format("json")
    .load("abfss://source@sd0212.dfs.core.windows.net/gl_control_totals")
)

# Preview the rows, then show the inferred schema as the cell result.
display(df)

df.schema

In [0]:
# Batch-read the ap_invoices CSV files again, this time WITHOUT the "header"
# option (only type inference is enabled), and inspect the resulting schema.
# NOTE(review): the first ap_invoices cell sets header=true; confirm that
# omitting it here is an intentional comparison and not a copy-paste slip.

df = spark.read.load(
    "abfss://source@sd0212.dfs.core.windows.net/ap_invoices",
    format="csv",
    inferSchema="true",
)

# Preview the rows, then show the inferred schema as the cell result.
display(df)

df.schema

## Auto Loader Ingestion (Source to Bronze) - Python Code

In [0]:
"""
Data Reading Module
===================================
"""
# ============================================================================
# DEPENDENCIES
# ============================================================================
import logging
from pyspark.sql.streaming import StreamingQuery

# ============================================================================
# CONFIGURATION
# ============================================================================
# Azure storage layout: account name plus the containers used for reading
# (source) and for checkpoints/output (bronze).
STORAGE_ACCOUNT = "sd0212"
SOURCE_CONTAINER = "source"
CHECKPOINT_CONTAINER = "bronze"

# Tables to ingest, in processing order.
TABLES = [
    "ap_invoices",
    "suppliers",
    "gl_control_totals",
]

# Source file format of each table.
FORMATS = {
    "ap_invoices": "csv",
    "suppliers": "csv",
    "gl_control_totals": "json",
}

# Column-type hints handed to Auto Loader, one DDL-style string per table.
# The ap_invoices string is split across adjacent literals purely for
# readability; Python joins them into a single string.
SCHEMA_HINTS = {
    "ap_invoices": (
        "InvoiceID STRING, InvoiceDate STRING, DueDate STRING, PaidDate STRING, "
        "SupplierID INT, Category STRING, CostCenter STRING, InvoiceAmount DOUBLE, "
        "Currency STRING, POID STRING, Quantity INT, "
        "UnitPrice_PO DOUBLE, UnitPrice_Invoice DOUBLE"
    ),
    "suppliers": "SupplierID INT, Supplier STRING",
    "gl_control_totals": "GL_Approved_Spend DOUBLE, Month STRING",
}

# Root logger setup: INFO and above, with timestamp, level and logger name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)


# ============================================================================
# FUNCTION
# ============================================================================
def auto_load_data(storage_account, checkpoint_container, source_container, table, file_format, schema_hints, reader_options=None):
    """
    Auto-load data from source container to bronze container using Structured Streaming.

    Reads the table's source folder with Auto Loader ("cloudFiles"), appends the
    rows to a Delta location in the checkpoint container, and blocks until the
    availableNow run has finished.

    Args:
        storage_account: Azure storage account name
        checkpoint_container: Container for checkpoints and output data
        source_container: Container with source data
        table: Table name to process
        file_format: Source file format of this table (e.g. "csv" or "json"),
            or a dictionary mapping table names to their format
        schema_hints: Dictionary with schema hints for each table
        reader_options: Optional dictionary of extra Auto Loader reader options;
            its entries override the defaults built by this function

    Returns:
        StreamingQuery: The streaming query object (returned after
        awaitTermination() has returned)

    Raises:
        KeyError: If `table` is missing from `schema_hints` or from a
            `file_format` dictionary.
        Exception: Any other error raised while reading or writing; every
            error is logged and then re-raised.
    """
    try:
        # Accept either a single format name or the per-table mapping, so a
        # caller that hands over the whole mapping still gets the right format
        # instead of the dictionary being sent to "cloudFiles.format".
        resolved_format = file_format if isinstance(file_format, str) else file_format[table]

        checkpoint_path = f"abfss://{checkpoint_container}@{storage_account}.dfs.core.windows.net/checkpoint_{table}"
        source_path = f"abfss://{source_container}@{storage_account}.dfs.core.windows.net/{table}"
        output_path = f"abfss://{checkpoint_container}@{storage_account}.dfs.core.windows.net/{table}"

        options = {
            "cloudFiles.format": resolved_format,
            # The inferred schema is tracked in the same folder as the checkpoint.
            "cloudFiles.schemaLocation": checkpoint_path,
            "cloudFiles.schemaHints": schema_hints[table],
            "cloudFiles.inferColumnTypes": "true",
        }
        if resolved_format.lower() == "csv":
            # The schema hints address columns by their header names and the
            # CSV sources are explored with header=true in this notebook.
            # Auto Loader's CSV "header" option defaults to false, which would
            # ingest the header line as a data row.
            options["header"] = "true"
        if reader_options:
            options.update(reader_options)

        # Read stream with Auto Loader
        reader = spark.readStream.format("cloudFiles")
        for key, value in options.items():
            reader = reader.option(key, value)
        df = reader.load(source_path)

        # Write stream
        query = (
            df
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            .option("path", output_path)
            .trigger(availableNow=True)
            .start()
        )

        # Wait for the stream to complete (since using availableNow trigger)
        query.awaitTermination()

        logging.info("Data loading for table %s completed successfully.", table)
        return query

    except Exception as e:
        logging.error("Error processing table %s: %s", table, e)
        raise

# ============================================================================
# EXECUTION
# ============================================================================
# Load every configured table. A failure on one table is logged and the loop
# moves on, so the remaining tables are still attempted.
failed_tables = []
for table in TABLES:
    try:
        # "cloudFiles.format" needs a single format name, so look up this
        # table's entry instead of passing the whole FORMATS mapping.
        auto_load_data(STORAGE_ACCOUNT, CHECKPOINT_CONTAINER, SOURCE_CONTAINER, table, FORMATS[table], SCHEMA_HINTS)
    except Exception as e:
        logging.error(f"Failed to process table {table}: {str(e)}")
        failed_tables.append(table)

# Summarise at the end so failures are not lost among per-table log lines.
if failed_tables:
    logging.error(f"Ingestion finished with failures for tables: {', '.join(failed_tables)}")
    
