# Connection with local DataBase

In [59]:
from sqlalchemy import create_engine
import urllib
import pandas as pd

# Your existing pyodbc connection details
server = r"MELKON\SQLEXPRESS"
database = "DataWarehouse"

# Construct ODBC connection string
odbc_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Trusted_Connection=yes;TrustServerCertificate=yes;"

# Encode for SQLAlchemy
connection_url = f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(odbc_str)}"

# Create SQLAlchemy engine
engine = create_engine(connection_url)

**Script Overview:**

======================================================================================================================
This SQL script updates the `silver.crm_cst_info` table by adding an `is_future` column and inserting the latest 
cleaned customer records from `bronze.crm_cst_info`. It ensures data consistency by:  

- Selecting the most recent record per customer (`cst_id`) using `ROW_NUMBER()`.  
- Cleaning names and standardizing marital status and gender values.  
- Flagging future-dated records in the `is_future` column.  

This script is part of an ETL process, ensuring accurate and up-to-date customer data for analytics and reporting. 🚀

=======================================================================================================================

In [58]:
# SQL Query
query = """
SELECT
    cst_id,
    cst_key,
    TRIM(cst_firstname) AS cst_firstname,
    TRIM(cst_lastname) AS cst_lastname,
    CASE 
        WHEN UPPER(TRIM(cst_material_status)) = 'M' THEN 'Married'
        WHEN UPPER(TRIM(cst_material_status)) = 'S' THEN 'Single'
        ELSE 'n/a'
    END AS cst_material_status,
    CASE 
        WHEN UPPER(TRIM(cst_gender)) = 'M' THEN 'Male'
        WHEN UPPER(TRIM(cst_gender)) = 'F' THEN 'Female'
        ELSE 'n/a'
    END AS cst_gender,
    cst_create_date,
    CASE 
        WHEN cst_create_date IS NULL THEN 0
        WHEN cst_create_date > GETDATE() THEN 1
        ELSE 0
    END AS cst_is_future
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER(PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
    FROM bronze.crm_cst_info
    WHERE cst_id IS NOT NULL
) latest_record
WHERE flag_last = 1;
"""

# Execute query using SQLAlchemy engine
df_results = pd.read_sql(query, engine)

# Display DataFrame
df_results

Unnamed: 0,cst_id,cst_key,cst_firstname,cst_lastname,cst_material_status,cst_gender,cst_create_date,cst_is_future
0,11000,AW00011000,Jon,Yang,Married,Male,2025-10-06,1
1,11001,AW00011001,Eugene,Huang,Single,Male,2025-10-06,1
2,11002,AW00011002,Ruben,Torres,Married,Male,2025-10-06,1
3,11003,AW00011003,Christy,Zhu,Single,Female,2025-10-06,1
4,11004,AW00011004,Elizabeth,Johnson,Single,Female,2025-10-06,1
...,...,...,...,...,...,...,...,...
18479,29479,AW00029479,Tommy,Tang,Married,,2026-01-25,1
18480,29480,AW00029480,Nina,Raji,Single,,2026-01-25,1
18481,29481,AW00029481,Ivan,Suri,Single,,2026-01-25,1
18482,29482,AW00029482,Clayton,Zhang,Married,,2026-01-25,1


In [60]:
df = df_results
print(df.head())

   cst_id     cst_key cst_firstname cst_lastname cst_material_status  \
0   11000  AW00011000           Jon         Yang             Married   
1   11001  AW00011001        Eugene        Huang              Single   
2   11002  AW00011002         Ruben       Torres             Married   
3   11003  AW00011003       Christy          Zhu              Single   
4   11004  AW00011004     Elizabeth      Johnson              Single   

  cst_gender cst_create_date  cst_is_future  
0       Male      2025-10-06              1  
1       Male      2025-10-06              1  
2       Male      2025-10-06              1  
3     Female      2025-10-06              1  
4     Female      2025-10-06              1  


In [61]:
import pandas as pd
from sqlalchemy import create_engine
import urllib

# Database Connection
server = r"MELKON\SQLEXPRESS"
database = "DataWarehouse"

# Construct ODBC connection string
odbc_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Trusted_Connection=yes;TrustServerCertificate=yes;"
connection_url = f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(odbc_str)}"

# Create SQLAlchemy engine
engine = create_engine(connection_url)

# Step 1: Get all tables in 'bronze' schema
query_get_tables = """
SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'bronze'
"""
df_tables = pd.read_sql(query_get_tables, engine)
table_names = df_tables['TABLE_NAME'].tolist()

# Step 2: Iterate through each table
for table_name in table_names:
    print(f"\n🔹 Processing Table: {table_name}\n")

    # Step 3: Get all column names for the current table
    query_get_columns = f"""
    SELECT COLUMN_NAME, DATA_TYPE 
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = 'bronze' AND TABLE_NAME = '{table_name}'
    """
    
    df_columns = pd.read_sql(query_get_columns, engine)
    column_names = df_columns['COLUMN_NAME'].tolist()
    column_data_types = dict(zip(df_columns['COLUMN_NAME'], df_columns['DATA_TYPE']))

    # Step 4: Generate dynamic SELECT statement with cleansing
    select_clauses = []
    
    for col in column_names:
        col_lower = col.lower()
        data_type = column_data_types[col]

        # 1️⃣ Trim Strings
        if data_type in ['varchar', 'nvarchar']:
            select_clauses.append(f"TRIM({col}) AS {col}")

        # 2️⃣ Standardize Material Status
        elif "material_status" in col_lower:
            select_clauses.append(f"""
                CASE 
                    WHEN UPPER(TRIM({col})) = 'M' THEN 'Married'
                    WHEN UPPER(TRIM({col})) = 'S' THEN 'Single'
                    ELSE 'n/a'
                END AS {col}
            """)

        # 3️⃣ Standardize Gender
        elif "gender" in col_lower:
            select_clauses.append(f"""
                CASE 
                    WHEN UPPER(TRIM({col})) = 'M' THEN 'Male'
                    WHEN UPPER(TRIM({col})) = 'F' THEN 'Female'
                    ELSE 'n/a'
                END AS {col}
            """)

        # 4️⃣ Handle Future Dates
        elif "create_date" in col_lower and data_type in ['datetime', 'date']:
            select_clauses.append(f"""
                CASE 
                    WHEN {col} IS NULL THEN 0
                    WHEN {col} > GETDATE() THEN 1
                    ELSE 0
                END AS cst_is_future
            """)

        # 5️⃣ Keep Everything Else as is
        else:
            select_clauses.append(f"{col}")

    # Convert list into SQL SELECT statement
    select_query = ",\n    ".join(select_clauses)

    # Step 5: Execute the dynamically generated query
    query_clean = f"""
    SELECT
        {select_query}
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
        FROM bronze.{table_name}
        WHERE cst_id IS NOT NULL
    ) latest_record
    WHERE flag_last = 1;
    """

    # Execute query and load into DataFrame
    df_cleaned = pd.read_sql(query_clean, engine)

    # Print the cleaned table for each iteration
    print(df_cleaned)

print("\n✅ Cleaning process completed for all tables in 'bronze' schema!")



🔹 Processing Table: crm_cst_info

       cst_id     cst_key cst_firstname cst_lastname cst_material_status  \
0       11000  AW00011000           Jon         Yang                   M   
1       11001  AW00011001        Eugene        Huang                   S   
2       11002  AW00011002         Ruben       Torres                   M   
3       11003  AW00011003       Christy          Zhu                   S   
4       11004  AW00011004     Elizabeth      Johnson                   S   
...       ...         ...           ...          ...                 ...   
18479   29479  AW00029479         Tommy         Tang                   M   
18480   29480  AW00029480          Nina         Raji                   S   
18481   29481  AW00029481          Ivan         Suri                   S   
18482   29482  AW00029482       Clayton        Zhang                   M   
18483   29483  AW00029483          Marc      Navarro                   M   

      cst_gender  cst_is_future  
0              M  

ProgrammingError: (pyodbc.ProgrammingError) ('42S22', "[42S22] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column name 'cst_id'. (207) (SQLExecDirectW); [42S22] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column name 'cst_id'. (207); [42S22] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column name 'cst_create_date'. (207)")
[SQL: 
    SELECT
        prd_id,
    TRIM(prd_key) AS prd_key,
    TRIM(prd_nm) AS prd_nm,
    prd_cost,
    TRIM(prd_line) AS prd_line,
    prd_start_dt,
    prd_end_dt
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
        FROM bronze.crm_prd_info
        WHERE cst_id IS NOT NULL
    ) latest_record
    WHERE flag_last = 1;
    ]
(Background on this error at: https://sqlalche.me/e/20/f405)