In [0]:
%sql
-- List the external locations registered in the metastore.
SHOW EXTERNAL LOCATIONS;

In [0]:
# List the storage credentials and print them without truncating wide columns.
storage_credentials_df = spark.sql("SHOW STORAGE CREDENTIALS")
storage_credentials_df.show(truncate=False)


### Creating Volume Storage from the S3 Bucket

In [0]:
# Show the details of the external location that the volume below is built on.
# The name is back-quoted because it contains hyphens.
external_location_df = spark.sql("""
DESCRIBE EXTERNAL LOCATION `db_s3_external_databricks-s3-ingest-5cdc9`
""")
external_location_df.show(truncate=False)


In [0]:
%sql
-- Register the CRM source folder in S3 as an external volume.
-- IF NOT EXISTS makes the statement safe to re-run.
CREATE EXTERNAL VOLUME IF NOT EXISTS iandatabricks.default.crm_raw_files
  LOCATION 's3://databricks-ian/sourceCRM/'
  COMMENT 'Clean shortcut to my CRM files';

### Ingesting to Bronze Layer

### ERP folder

In [0]:
from pyspark.sql.functions import current_timestamp
import time

# --- 1. Configuration ---
# Each entry pairs one ERP source file with the bronze table it is loaded into.
# NOTE(review): the LOC_A101 / PX_CAT_G1V2 file names do not obviously match the
# "products" / "transactions" table names -- confirm this mapping is intended.
ingestion_map = [
    {"file": "CUST_AZ12.csv", "table": "erp_bronze_customers"},
    {"file": "LOC_A101.csv", "table": "erp_bronze_products"},
    {"file": "PX_CAT_G1V2.csv", "table": "erp_bronze_transactions"},
]
# Keeps its trailing slash: the checkpoint path below is built by plain concatenation.
source_base = "/Volumes/iandatabricks/default/erp_raw_files/"

# --- 2. Execution Loop ---
# One Auto Loader stream per file; each stream has its own schema and checkpoint
# directory so that already-ingested files are skipped on later runs.
for item in ingestion_map:
    file_filter = item["file"]
    target = "iandatabricks.bronze." + item["table"]
    checkpoint = source_base + "_checkpoints/" + item["table"]

    # NOTE(review): Auto Loader documents `cloudFiles.inferColumnTypes` as the
    # switch for typed CSV inference -- confirm whether `inferSchema` has any
    # effect here before relying on the column types of the bronze tables.
    reader_options = {
        "cloudFiles.format": "csv",
        "pathGlobFilter": file_filter,
        "cloudFiles.schemaLocation": checkpoint + "/schema",
        "header": "true",
        "inferSchema": "true",
    }
    source_stream = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load(source_base)
    )

    # Keep every source column and add lineage: originating file path and load time.
    bronze_stream = (
        source_stream
        .select("*", "_metadata.file_path")
        .withColumn("load_timestamp", current_timestamp())
    )

    # availableNow processes whatever is pending and then stops the stream.
    query = (
        bronze_stream.writeStream
        .option("checkpointLocation", checkpoint + "/data")
        .trigger(availableNow=True)
        .toTable(target)
    )

    # Block until the run has finished so the progress reports are complete.
    query.awaitTermination()

    # --- 3. The Duplicate Check Logic ---
    # No progress reports at all: nothing to summarise for this table.
    recent_progress = query.recentProgress
    if not recent_progress:
        print(f"ℹ️  No changes detected for {target}.")
        continue

    # Total rows read across every reported micro-batch (a missing count counts as 0).
    rows_added = sum((progress["numInputRows"] or 0) for progress in recent_progress)
    if rows_added > 0:
        print(f"✅ SUCCESS: Added {rows_added} new rows to {target} from {file_filter}.")
    else:
        print(f"⚠️  SKIP: No data added to {target}. File {file_filter} is a duplicate or already processed.")

print("\n--- All ingestion tasks complete ---")

In [0]:
# Check the table content and the new file_path column.
# Reads the table the ERP ingestion loop actually writes (erp_bronze_customers);
# the previously referenced bronze_erp_customer is dropped further down in this
# notebook, so displaying it fails on any re-run.
display(spark.table("iandatabricks.bronze.erp_bronze_customers"))

In [0]:
%sql
-- Row count of the ERP customers bronze table, as a sanity check after ingestion.
SELECT
  count(*)
FROM iandatabricks.bronze.erp_bronze_customers

In [0]:
%sql
-- Remove the table named bronze_erp_customer (not the erp_bronze_customers table
-- that the ERP ingestion loop writes; presumably a leftover from an earlier
-- naming scheme -- confirm before running).
-- IF EXISTS keeps this cell from failing when the table has already been dropped.
DROP TABLE IF EXISTS iandatabricks.bronze.bronze_erp_customer

### CRM folder

In [0]:
# Run this to wipe the Auto Loader memory for the CRM ingestion and start fresh.
# `checkpoint_path` was never defined in this notebook, so it is set explicitly
# to the checkpoint root that the CRM ingestion loop builds its per-table
# checkpoint and schema directories under.
# WARNING: after this runs, the next CRM ingestion treats every source file as
# new and loads it again; drop or truncate the CRM bronze tables first if
# duplicate rows are not wanted.
checkpoint_path = "/Volumes/iandatabricks/default/crm_raw_files/_checkpoints"
dbutils.fs.rm(checkpoint_path, recurse=True)

In [0]:


# Imported here as well so this cell runs on its own, without first executing
# the ERP ingestion cell that otherwise brings the name into scope.
from pyspark.sql.functions import current_timestamp

# --- 1. Configuration ---
# Each entry pairs one CRM source file with the bronze table it is loaded into.
ingestion_map = [
    {"file": "cust_info.csv", "table": "crm_custinfo"},
    {"file": "prd_info.csv", "table": "crm_prdinfo"},
    {"file": "sales_details.csv", "table": "crm_salesdeets"}
]
# No trailing slash here: the checkpoint path below adds the separator itself.
source_base = "/Volumes/iandatabricks/default/crm_raw_files"

# --- 2. Execution Loop ---
# One Auto Loader stream per file; each stream has its own schema and checkpoint
# directory so that already-ingested files are skipped on later runs.
for item in ingestion_map:
    file_filter = item["file"]
    target = f"iandatabricks.bronze.{item['table']}"
    checkpoint = f"{source_base}/_checkpoints/{item['table']}"

    # NOTE(review): Auto Loader documents `cloudFiles.inferColumnTypes` as the
    # switch for typed CSV inference -- confirm whether `inferSchema` has any
    # effect here before relying on the column types of the bronze tables.
    query = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("pathGlobFilter", file_filter)  # only pick up this entry's file
        .option("cloudFiles.schemaLocation", f"{checkpoint}/schema")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(source_base)
        # Keep every source column and add lineage: originating file path and load time.
        .select("*", "_metadata.file_path")
        .withColumn("load_timestamp", current_timestamp())
        .writeStream
        .option("checkpointLocation", f"{checkpoint}/data")
        # availableNow processes whatever is pending and then stops the stream.
        .trigger(availableNow=True)
        .toTable(target))

    # Wait for the run to finish so the progress reports are complete.
    query.awaitTermination()

    # --- 3. The Duplicate Check Logic ---
    # Summarise the run from the progress reports the stream recorded.
    recent_progress = query.recentProgress
    if recent_progress:
        # Total rows read across every reported micro-batch (a missing count counts as 0).
        rows_added = sum(p['numInputRows'] or 0 for p in recent_progress)

        if rows_added > 0:
            print(f"✅ SUCCESS: Added {rows_added} new rows to {target} from {file_filter}.")
        else:
            print(f"⚠️  SKIP: No data added to {target}. File {file_filter} is a duplicate or already processed.")
    else:
        # No progress reports at all: nothing to summarise for this table.
        print(f"ℹ️  No changes detected for {target}.")

print("\n--- All ingestion tasks complete ---")