# Extract Customers Data to Bronze Layer

This notebook extracts customer data from the `Sales.Customers` table of the WorldWideImporters SQL Server database and loads it into the `customers` table of the Unity Catalog bronze layer.

## Parameters
- `catalog_name`: Unity Catalog name
- `schema_name`: Schema name (bronze)
- `sql_server_host`: SQL Server hostname
- `sql_database_name`: SQL Database name
- `sql_username`: SQL Server username
- `sql_password`: SQL Server password

In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Extract Customers Data to Bronze Layer

# COMMAND ----------

# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime

# COMMAND ----------

# MAGIC %md
# MAGIC ## Parameters

# COMMAND ----------

# Declare the notebook widgets as (name, default value, label); the values
# come from the widgets or from job parameters.
# NOTE(review): sql_password is collected through a plain text widget;
# consider reading it from a secret scope instead — confirm with the owners.
widget_definitions = (
    ("catalog_name", "don_datalab_catalog", "Catalog Name"),
    ("schema_name", "bronze", "Schema Name"),
    ("sql_server_host", "", "SQL Server Host"),
    ("sql_database_name", "WorldWideImporters", "SQL Database Name"),
    ("sql_username", "", "SQL Username"),
    ("sql_password", "", "SQL Password"),
)
for widget_name, widget_default, widget_label in widget_definitions:
    dbutils.widgets.text(widget_name, widget_default, widget_label)

# Read the effective value of every widget back into a notebook variable.
read_widget = dbutils.widgets.get
catalog_name = read_widget("catalog_name")
schema_name = read_widget("schema_name")
sql_server_host = read_widget("sql_server_host")
sql_database_name = read_widget("sql_database_name")
sql_username = read_widget("sql_username")
sql_password = read_widget("sql_password")

# Echo the connection target only; the username and password are not printed.
for setting_label, setting_value in (
    ("Catalog", catalog_name),
    ("Schema", schema_name),
    ("SQL Server", sql_server_host),
    ("Database", sql_database_name),
):
    print(f"{setting_label}: {setting_value}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Setup Unity Catalog

# COMMAND ----------

# Make sure the target catalog and schema exist, then select them as the
# session defaults. The statements run in the order listed.
# NOTE(review): catalog_name and schema_name are interpolated into the SQL
# text unquoted, so they must already be valid identifiers.
setup_statements = (
    f"CREATE CATALOG IF NOT EXISTS {catalog_name}",
    f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}",
    f"USE CATALOG {catalog_name}",
    f"USE SCHEMA {schema_name}",
)
for setup_statement in setup_statements:
    spark.sql(setup_statement)

print(f"Using catalog: {catalog_name}, schema: {schema_name}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Extract Customer Data

# COMMAND ----------

# JDBC endpoint for the source database (port 1433).
# NOTE(review): trustServerCertificate=true presumably disables validation of
# the server certificate — confirm this is intended outside of development.
jdbc_url = (
    f"jdbc:sqlserver://{sql_server_host}:1433;"
    f"database={sql_database_name};"
    "encrypt=true;trustServerCertificate=true"
)

# Credentials and driver class handed to the Spark JDBC reader.
connection_properties = dict(
    user=sql_username,
    password=sql_password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# Columns pulled from Sales.Customers, in the order they are selected.
customer_columns = (
    "CustomerID",
    "CustomerName",
    "BillToCustomerID",
    "CustomerCategoryID",
    "PrimaryContactPersonID",
    "DeliveryMethodID",
    "DeliveryCityID",
    "PostalCityID",
    "AccountOpenedDate",
    "StandardDiscountPercentage",
    "IsStatementSent",
    "IsOnCreditHold",
    "PaymentDays",
    "PhoneNumber",
    "FaxNumber",
    "WebsiteURL",
    "DeliveryAddressLine1",
    "DeliveryPostalCode",
    "PostalAddressLine1",
    "PostalPostalCode",
    "LastEditedBy",
    "ValidFrom",
    "ValidTo",
)

# Assemble the extraction query: one indented column per line.
select_list = ",\n".join(f"    {column_name}" for column_name in customer_columns)
customer_query = f"\nSELECT \n{select_list}\nFROM Sales.Customers\n"

# Read from SQL Server. The query is passed as an aliased subquery in place of
# a table name. Any failure is reported and then re-raised so the run fails.
try:
    customers_df = spark.read.jdbc(
        url=jdbc_url,
        table=f"({customer_query}) as customers",
        properties=connection_properties,
    )

    extracted_count = customers_df.count()
    print(f"Successfully extracted {extracted_count} customer records")

except Exception as exc:
    print(f"Error extracting customer data: {exc}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC ## Add Metadata and Load to Bronze

# COMMAND ----------

# Lineage/metadata columns appended to every extracted row, in this order.
metadata_columns = (
    ("extraction_timestamp", current_timestamp()),
    ("source_system", lit("worldwideimporters_sql")),
    ("source_table", lit("Sales.Customers")),
    ("bronze_layer_version", lit("1.0")),
)

customers_bronze = customers_df
for metadata_name, metadata_value in metadata_columns:
    customers_bronze = customers_bronze.withColumn(metadata_name, metadata_value)

# Preview a few rows untruncated, followed by the resulting schema.
print("Sample customer data:")
customers_bronze.show(5, truncate=False)

print("\nSchema:")
customers_bronze.printSchema()

# COMMAND ----------

# Fully qualified target table: <catalog>.<schema>.customers
table_name = ".".join((catalog_name, schema_name, "customers"))

# Replace the table contents with this extraction, then read the row count
# back. Any failure is reported and then re-raised so the run fails.
try:
    bronze_writer = customers_bronze.write.mode("overwrite").option(
        "mergeSchema", "true"
    )
    bronze_writer.saveAsTable(table_name)

    print(f"Successfully loaded customer data to {table_name}")

    # Verify the load by counting the rows now stored in the table.
    record_count = spark.table(table_name).count()
    print(f"Verified: {record_count} records in {table_name}")

except Exception as exc:
    print(f"Error loading customer data to Unity Catalog: {exc}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality Checks

# COMMAND ----------

# Basic data quality metrics computed on the freshly loaded bronze table.
bronze_customers = spark.table(table_name)

total_records = bronze_customers.count()

# Rows that have no customer name.
missing_name_rows = bronze_customers.filter(col("CustomerName").isNull())
null_customer_names = missing_name_rows.count()

# Number of distinct CustomerID values that occur more than once.
rows_per_customer_id = bronze_customers.groupBy("CustomerID").count()
duplicate_customer_ids = rows_per_customer_id.filter(col("count") > 1).count()

# The checks are informational only: results are printed, nothing is raised.
report_lines = (
    f"Data Quality Report for {table_name}:",
    f"- Total records: {total_records}",
    f"- Records with null CustomerName: {null_customer_names}",
    f"- Duplicate CustomerIDs: {duplicate_customer_ids}",
)
for report_line in report_lines:
    print(report_line)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Summary

# COMMAND ----------

# Final run summary: what was loaded, where, and when.
# The completion time is taken as a timezone-aware UTC timestamp so the logged
# value carries an explicit offset instead of depending on the driver's local
# timezone setting.
completed_at = datetime.datetime.now(datetime.timezone.utc)

print("✅ Customer data extraction completed successfully!")
print(f"📊 Loaded {total_records} customer records to {table_name}")
print(f"⏰ Extraction completed at: {completed_at}")