In [None]:
import os
import sys
import logging
import numpy as np
import pandas as pd

if "__file__" in globals():
    current_dir = os.path.dirname(os.path.abspath(__file__))
else:
    current_dir = os.getcwd()

silver_folder = os.path.dirname(current_dir)
python_folder = os.path.dirname(silver_folder)

if python_folder not in sys.path:
    sys.path.append(python_folder)             # .../python

if python_folder not in sys.path:
    sys.path.append(python_folder)


from utils.db_connection import get_engine
from utils.paths import get_raw_data_path

In [20]:


def extract_from_bronze(table_name: str) -> pd.DataFrame:
    engine = get_engine("bronze")
    try:
        return pd.read_sql(f"SELECT * FROM {table_name}", engine)
    except Exception as e:
        raise RuntimeError(f"Failed to extract from bronze table {table_name}") from e

In [26]:
schema ={
    "cid"                 : "string",
    "country_name"        : "string",
        }

#! high level schema enforcement function 
def enforce_schema(df: pd.DataFrame, schema: dict) -> pd.DataFrame:
    for column, dtype in schema.items():

        if column not in df.columns:
            #! log warning and skip missing columns
            logging.warning(f"[SCHEMA WARNING] Column missing: {column}")
            continue
        if dtype in ("Int64", "int64", "float64"):
            df[column] = pd.to_numeric(df[column], errors="coerce")
            if dtype == "Int64":
                df[column] = df[column].astype("Int64")
        elif dtype.startswith("datetime"):
            df[column] = pd.to_datetime(df[column], errors="coerce")
        elif dtype == "boolean":
            df[column] = df[column].astype("boolean")
        elif dtype == "string":
            df[column] = df[column].astype("string")
        else:
            # fallback (rare cases)
            df[column] = df[column].astype(dtype)

    return df

In [27]:

df= extract_from_bronze("erp_location_a101")
df = enforce_schema(df, schema)
df.info()

Loading config from: d:\data_engineering_project\configs\db_config.json
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18484 entries, 0 to 18483
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   ingest_id     0 non-null      object        
 1   raw_row       18484 non-null  object        
 2   cid           18484 non-null  string        
 3   country_name  18152 non-null  string        
 4   loaded_at     18484 non-null  datetime64[ns]
dtypes: datetime64[ns](1), object(2), string(2)
memory usage: 722.2+ KB


In [28]:
df.drop(columns=["raw_row"], inplace=True, errors="ignore")


In [33]:
df["country_name"].unique()	

<StringArray>
[     'Australia',             'US',         'Canada',             'DE',
 'United Kingdom',         'France',            'USA',        'Germany',
             <NA>,               '',  'United States']
Length: 11, dtype: string

In [83]:
def standardize_customer_id(df: pd.DataFrame) -> pd.DataFrame:
    if "cid" in df.columns:
        df = df[df["cid"].str.len() >= 10]
        df["cid"] = df["cid"].str[-10:]
    else:
        logging.warning("[CID WARNING] 'cid' column not found for standardization.")
        
    if "gender_raw" in df.columns:
        df["gender_raw"] = (
        df["gender_raw"]
        .str.strip().replace({
	    "M": "Male",
	    "F": "Female",
        "" : pd.NA
        })
        )
        df["gender_raw"] = df["gender_raw"].fillna(pd.NA)
    else:
        logging.warning("[GENDER WARNING] 'gender_raw' column not found for standardization.")
    df.drop(columns=["raw_row"], inplace=True, errors="ignore")
    return df

df = standardize_customer_id(df)
df

Unnamed: 0,ingest_id,cid,birth_date_raw,gender_raw,loaded_at
0,1,AW00011000,1971-10-06,Male,2026-02-23 13:03:10
1,2,AW00011001,1976-05-10,Male,2026-02-23 13:03:10
2,3,AW00011002,1971-02-09,Male,2026-02-23 13:03:10
3,4,AW00011003,1973-08-14,Female,2026-02-23 13:03:10
4,5,AW00011004,1979-08-05,Female,2026-02-23 13:03:10
...,...,...,...,...,...
18479,18480,AW00029479,1969-06-30,,2026-02-23 13:03:10
18480,18481,AW00029480,1977-05-06,,2026-02-23 13:03:10
18481,18482,AW00029481,1965-07-04,,2026-02-23 13:03:10
18482,18483,AW00029482,1964-09-01,,2026-02-23 13:03:10


In [74]:
df["gender_raw"].unique()

<StringArray>
['Male', 'Female', <NA>]
Length: 3, dtype: string

In [75]:
df.tail()

Unnamed: 0,ingest_id,raw_row,cid,birth_date_raw,gender_raw,loaded_at
18479,18480,"""{\""cid\"": \""AW00029479\"", \""bdate\"": \""1969-0...",AW00029479,1969-06-30,,2026-02-23 13:03:10
18480,18481,"""{\""cid\"": \""AW00029480\"", \""bdate\"": \""1977-0...",AW00029480,1977-05-06,,2026-02-23 13:03:10
18481,18482,"""{\""cid\"": \""AW00029481\"", \""bdate\"": \""1965-0...",AW00029481,1965-07-04,,2026-02-23 13:03:10
18482,18483,"""{\""cid\"": \""AW00029482\"", \""bdate\"": \""1964-0...",AW00029482,1964-09-01,,2026-02-23 13:03:10
18483,18484,"""{\""cid\"": \""AW00029483\"", \""bdate\"": \""1965-0...",AW00029483,1965-06-06,,2026-02-23 13:03:10


In [76]:
df.drop(columns=["raw_row"], inplace=True)

In [77]:
#df["cid"] = df["cid"].str[3:]

In [88]:
df[
    (df["birth_date_raw"] > "2025-12-31")
    ]

Unnamed: 0,ingest_id,cid,birth_date_raw,gender_raw,loaded_at
257,258,AW00011257,2050-07-06,Female,2026-02-23 13:03:10
410,411,AW00011410,2042-02-22,Male,2026-02-23 13:03:10
551,552,AW00011551,2050-05-21,Male,2026-02-23 13:03:10
562,563,AW00011562,2038-10-17,Male,2026-02-23 13:03:10
581,582,AW00011581,2045-03-03,Female,2026-02-23 13:03:10
775,776,AW00011775,2050-11-22,Female,2026-02-23 13:03:10
912,913,AW00011912,2066-06-16,Female,2026-02-23 13:03:10
1123,1124,AW00012123,2065-12-12,Male,2026-02-23 13:03:10
2417,2418,AW00013417,2050-09-07,Male,2026-02-23 13:03:10
9062,9063,AW00020062,2080-03-15,Male,2026-02-23 13:03:10


In [34]:
for v in df["country_name"].unique():
    print(repr(v))

'Australia'
'US'
'Canada'
'DE'
'United Kingdom'
'France'
'USA'
'Germany'
<NA>
''
'United States'


In [40]:
df["cid"] = df["cid"].str.replace("-", "", regex=False)

In [45]:
df["country_name"] = df["country_name"].str.strip().str.lower().str.title()

In [47]:
df["country_name"].unique()	

<StringArray>
[     'Australia',             'Us',         'Canada',             'De',
 'United Kingdom',         'France',            'Usa',        'Germany',
             <NA>,               '',  'United States']
Length: 11, dtype: string

In [80]:
df = df[df["cid"].str.len() >= 10]
df["cid"] = df["cid"].str[-10:]

In [81]:
df.isnull().sum()	

ingest_id            0
cid                  0
birth_date_raw       5
gender_raw        1476
loaded_at            0
dtype: int64

In [82]:
print(df["cid"].str.len().value_counts())

cid
10    18484
Name: count, dtype: Int64
