# Foundry to Databricks Bronze Layer Pipeline

This notebook demonstrates a data ingestion pipeline that extracts data from Palantir Foundry via JDBC and loads it into a Databricks bronze layer following the medallion architecture pattern.

## Architecture Overview
- **Bronze Layer**: Raw data ingestion with minimal transformation
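- **String Casting**: Every column is first cast to a string, giving a uniform intermediate representation before types are reapplied
- **Schema Enforcement**: Columns are then cast back to the types of a target schema (here, the schema captured from the source read)
- **Append Loading**: Each run appends a full extract stamped with a `bronze_load_ts` audit column; `mergeSchema` lets new source columns extend the target table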
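The cast-to-string-and-back pattern can be modelled in plain Python to show what it does to individual values. This is a sketch, not Spark code. The converters in the hypothetical `TARGET_SCHEMA` stand in for Spark casts. Unparseable strings become `None` to mirror Spark's `cast` with ANSI mode disabled; with ANSI mode enabled, Spark raises an error instead.

```python
from datetime import date

# Hypothetical target schema: column name -> converter from string.
# Plain-Python model of Spark's string-to-type cast; not Spark code.
TARGET_SCHEMA = {
    "id": int,
    "amount": float,
    "order_date": date.fromisoformat,
}


def to_strings(row):
    """Stage 1: represent every value as a string (None stays None, like SQL NULL)."""
    return {k: None if v is None else str(v) for k, v in row.items()}


def cast_back(row, schema):
    """Stage 2: cast each string back to its target type.

    Unparseable values become None instead of raising (non-ANSI behaviour).
    Columns missing from either the row or the schema are dropped.
    """
    out = {}
    for name, convert in schema.items():
        if name not in row:
            continue
        value = row[name]
        try:
            out[name] = None if value is None else convert(value)
        except ValueError:
            out[name] = None
    return out


raw = {"id": 7, "amount": 19.5, "order_date": date(2024, 1, 31), "extra": "x"}
print(cast_back(to_strings(raw), TARGET_SCHEMA))
```

When the target schema equals the source schema, as in this notebook, well-formed values survive the round trip unchanged; the pattern only changes values when the target types differ from the source types.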

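```python
# Standard library: regex for sanitizing table keys
import re

# PySpark SQL functions and types used for data transformations
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
```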

## JDBC Connection Helper Function

This helper reads a Palantir Foundry dataset over JDBC and returns a Spark DataFrame. It authenticates with a Foundry token stored in a Databricks secret scope.

```python
def read_foundry_jdbc(table, schema=None):
    """
    Read a Palantir Foundry dataset over JDBC.

    Args:
        table (str): Fully qualified table name or quoted dataset resource identifier (RID)
        schema (StructType or str, optional): Column types to apply, either as a
            StructType or as a DDL string such as "id BIGINT, name STRING"

    Returns:
        DataFrame: PySpark DataFrame containing the requested data
    """
    # JDBC connection string for the Foundry SQL interface
    jdbc_url = "jdbc:foundrysql://your-instance.palantirfoundry.com"

    # Connection properties; the Foundry token is read from Databricks secrets
    properties = {
        "password": dbutils.secrets.get(scope="your_scope", key="your_token"),
        "driver": "com.palantir.foundry.sql.jdbc.FoundryJdbcDriver",
    }

    # Configure the JDBC reader with connection details
    reader = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("password", properties["password"])
        .option("driver", properties["driver"])
        # numPartitions only caps parallelism: without partitionColumn,
        # lowerBound and upperBound the read still runs as one partition
        .option("numPartitions", 8)
    )

    # reader.schema() cannot change column types for a JDBC source (Spark rejects
    # a schema that differs from the database's); customSchema overrides the
    # types of the named columns instead
    if schema is not None:
        if isinstance(schema, StructType):
            schema = ", ".join(
                f"`{f.name}` {f.dataType.simpleString()}" for f in schema.fields
            )
        reader = reader.option("customSchema", schema)

    return reader.load()
```
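Setting `numPartitions` alone does not split a JDBC read. Spark divides the read only when `partitionColumn`, `lowerBound`, and `upperBound` are also set, issuing one range predicate per partition. The sketch below is a simplified plain-Python model of that splitting, assuming a hypothetical numeric column `id`; Spark's own implementation differs in details such as stride rounding. The bounds only set the stride: rows outside them still land in the first or last partition.

```python
def jdbc_range_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE-clause predicate per partition (simplified model).

    The first partition is open below and also picks up NULLs; the last is
    open above, so rows outside [lower_bound, upper_bound) are still read.
    """
    if num_partitions <= 1:
        return [None]  # None means: no predicate, a single partition
    if upper_bound - lower_bound < num_partitions:
        raise ValueError("sketch assumes upper_bound - lower_bound >= num_partitions")
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower is None:
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for predicate in jdbc_range_predicates("id", 0, 100, 4):
    print(predicate)
```

In `read_foundry_jdbc` this would correspond to adding `.option("partitionColumn", "id")`, `.option("lowerBound", 0)` and `.option("upperBound", 100)` next to `numPartitions`. These are standard Spark JDBC options, but whether the Foundry SQL interface evaluates such range predicates efficiently is an assumption worth verifying.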

## Main Pipeline Function: Create Bronze Staging Layer

This function orchestrates the ETL process:
1. Reads a configuration table that lists all tables to export
2. Iterates through each table
3. Casts every column to a string and back to its captured type
4. Writes to bronze layer with audit timestamp
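Steps 1 and 2 turn the configuration rows into a lookup keyed by `SCHEMA`. A plain Python `dict` silently keeps only the last row for a repeated key, and the key also becomes part of the bronze table name, so collisions are worth detecting up front. The sketch below models this on a list of dicts standing in for collected rows; `build_export_plan`, its sanitizing rule, and the example RIDs are hypothetical.

```python
import re


def build_export_plan(rows, catalog="your_catalog", schema="bronze"):
    """Turn config rows into (key, source_table, target_table) triples.

    rows: iterable of dicts with "SCHEMA" and "TABLE_NAME" keys, mirroring
    the columns of the configuration dataset.
    """
    plan = []
    seen = set()
    for row in rows:
        key = row["SCHEMA"]
        # Replace runs of non-word characters so the key is valid in view/table names
        safe_key = re.sub(r"\W+", "_", key).lower()
        if safe_key in seen:
            raise ValueError(f"SCHEMA value {key!r} collides with an earlier row as {safe_key!r}")
        seen.add(safe_key)
        source = f'"{row["TABLE_NAME"]}"'  # quoted, as the notebook does for RIDs
        plan.append((key, source, f"{catalog}.{schema}.bronze_{safe_key}"))
    return plan


# Hypothetical configuration rows with placeholder RIDs
config = [
    {"SCHEMA": "Sales-EU", "TABLE_NAME": "ri.foundry.main.dataset.EXAMPLE-1"},
    {"SCHEMA": "finance", "TABLE_NAME": "ri.foundry.main.dataset.EXAMPLE-2"},
]
for entry in build_export_plan(config):
    print(entry)
```

Checking the sanitized key rather than the raw `SCHEMA` value also catches rows such as `a-b` and `a_b` that would otherwise write to the same bronze table.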

```python
def create_staging_layer():
    """
    Main ETL function to ingest data from Foundry into the Databricks bronze layer.

    Process:
    1. Read the configuration dataset listing all source tables
    2. For each table:
       - Extract data from Foundry
       - Cast every column to a string
       - Register temporary views for schema management
       - Cast columns back to their captured data types
       - Append to the bronze layer with a load timestamp
    """

    # Step 1: Read the export tables configuration dataset.
    # It contains SCHEMA and TABLE_NAME columns that define what to export.
    export_tables = read_foundry_jdbc(
        '"ri.foundry.main.dataset.YOUR-DATASET-ID-HERE"'
    )

    # Step 2: Map each SCHEMA value to its quoted TABLE_NAME.
    # Format: {schema_name: "table_name"}
    # SCHEMA values must be unique: a repeated key keeps only the last row, and the
    # key also names the bronze table. collect() avoids the RDD API, which is not
    # available on some Databricks compute (e.g. shared access mode clusters).
    export_tables_dict = {
        row["SCHEMA"]: f'"{row["TABLE_NAME"]}"'
        for row in export_tables.select("SCHEMA", "TABLE_NAME").collect()
    }

    # Step 3: Process each table
    for key, source_table in export_tables_dict.items():
        # Keep only letters, digits and underscores so the key is valid in view/table names
        safe_key = re.sub(r"\W+", "_", key).lower()
        schema_view = f"tmp_schema_{safe_key}"
        raw_view = f"tmp_raw_{safe_key}"
        df = None

        try:
            # Read the source data from Foundry (inside the try block so that
            # one unreadable table does not stop the loop)
            df = read_foundry_jdbc(source_table)

            # Capture the original schema before casting
            schema = df.schema

            # Step 4: Cast all columns to strings for a uniform intermediate representation
            df_str = df.select([df[c].cast("string").alias(c) for c in df.columns])

            # Step 5: Register temporary views for schema management.
            # An empty DataFrame carries the original schema as a reference.
            spark.createDataFrame([], schema).createOrReplaceTempView(schema_view)
            df_str.createOrReplaceTempView(raw_view)

            # Step 6: Read the target schema back from the schema view
            # (identical to the source schema captured above)
            target_schema = spark.table(schema_view).schema

            # Step 7: Cast the string data back to the target types
            raw_df = spark.table(raw_view)
            casted_cols = [
                F.col(f.name).cast(f.dataType).alias(f.name)
                for f in target_schema
                if f.name in raw_df.columns  # Only cast columns present in the raw data
            ]

            # Add an audit timestamp recording when the data was loaded
            casted_df = raw_df.select(casted_cols).withColumn(
                "bronze_load_ts", F.current_timestamp()
            )

            # Step 8: Write the processed data to the bronze layer
            casted_table = f"your_catalog.bronze.bronze_{safe_key}"

            # Option 1: Full refresh, replacing existing data and schema (disabled)
            # casted_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(casted_table)

            # Option 2 (active): Append each run's full extract; mergeSchema adds new source columns
            casted_df.write.mode("append").option("mergeSchema", "true").saveAsTable(casted_table)

            print(f"Successfully wrote {casted_table} for {key}")

        except Exception as e:
            # Log the error and move on so one failing table does not stop the pipeline
            print(f"Error writing table for {key}: {e}")
            if df is not None:
                df.printSchema()  # Print the source schema for debugging

        finally:
            # Clean up temp views; dropTempView returns False if a view does not exist
            spark.catalog.dropTempView(schema_view)
            spark.catalog.dropTempView(raw_view)
```
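The loop above isolates failures by catching exceptions per table, but it only prints them, so a scheduled run still finishes as successful when some tables fail. Below is a minimal plain-Python sketch of the same isolation pattern that also collects the failures. `run_all` and `fake_load` are hypothetical names; `fake_load` stands in for the per-table read, cast and write.

```python
def run_all(items, process):
    """Run process(item) for every item; one failure does not stop the rest.

    Returns (succeeded, failed), where failed maps item -> error message.
    """
    succeeded, failed = [], {}
    for item in items:
        try:
            process(item)
        except Exception as exc:  # broad on purpose: isolate per-table failures
            failed[item] = f"{type(exc).__name__}: {exc}"
        else:
            succeeded.append(item)
    return succeeded, failed


def fake_load(name):
    # Hypothetical stand-in for the per-table body of create_staging_layer
    if name == "broken":
        raise RuntimeError("table not found")


ok, errors = run_all(["orders", "broken", "customers"], fake_load)
print(ok)
print(errors)
```

Once every table has been attempted, raising an exception when `errors` is non-empty (for example `raise RuntimeError(errors)`) would make the notebook task fail visibly in a Databricks job; whether that beats silent partial success is a design choice.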

## Execute the Pipeline

Run the staging layer creation process for all configured tables.

```python
create_staging_layer()
```
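Because every run appends a complete extract, each bronze table accumulates one snapshot per load. `F.current_timestamp()` returns a single value per query, so all rows from one write share the same `bronze_load_ts`, and the latest snapshot can be selected by that value. The plain-Python sketch below models the filter on hypothetical rows; in Spark SQL the equivalent would be roughly `SELECT * FROM t WHERE bronze_load_ts = (SELECT max(bronze_load_ts) FROM t)`.

```python
from datetime import datetime


def latest_load(rows, ts_col="bronze_load_ts"):
    """Keep only the rows written by the most recent load of an append-only table."""
    if not rows:
        return []
    latest = max(row[ts_col] for row in rows)
    return [row for row in rows if row[ts_col] == latest]


# Hypothetical bronze rows from two loads
bronze_rows = [
    {"id": 1, "bronze_load_ts": datetime(2024, 5, 1, 6, 0)},
    {"id": 2, "bronze_load_ts": datetime(2024, 5, 1, 6, 0)},
    {"id": 1, "bronze_load_ts": datetime(2024, 5, 2, 6, 0)},
]
print(latest_load(bronze_rows))
```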