In [4]:
import pandas as pd
import numpy as np
import os
from datetime import datetime
import re
import warnings

src_dir = r'D:\Ravi_Data_Engineer\02_Target\01_raw'
tgt_dir = r'D:\Ravi_Data_Engineer\02_Target\02_cleansed'
os.makedirs(tgt_dir, exist_ok=True)

def try_parse_date(value, formats):
    """Parse ``value`` using the first ``strptime`` pattern that accepts it.

    The patterns in ``formats`` are tried in order against the
    whitespace-stripped value. Any failure for a given pattern (no match,
    or a value that cannot be stripped/parsed at all) simply moves on to
    the next pattern.

    Returns:
        The parsed ``datetime``, or ``pd.NaT`` when no pattern matches.
    """
    for pattern in formats:
        try:
            parsed = datetime.strptime(value.strip(), pattern)
        except Exception:
            # This pattern did not fit; fall through to the next one.
            pass
        else:
            return parsed
    return pd.NaT

def clean_datefields(df, output_format='%Y-%m-%d'):
    """Rewrite every column whose name contains "date" in one string format.

    Each value of such a column is converted to text, parsed with the first
    matching pattern from a fixed list, and re-emitted with
    ``output_format``. Values that match no pattern end up missing.

    Args:
        df: DataFrame to clean; matching columns are overwritten in place.
        output_format: ``strftime`` pattern used for the resulting strings.

    Returns:
        The same DataFrame object, for call chaining.
    """
    # Order matters because the first pattern that parses wins. The
    # conventional year-first "%Y/%m/%d" must be tried before the rare
    # "%Y/%d/%m", otherwise "2023/05/12" is silently read as 5 December.
    common_formats = [
        "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%d-%m-%Y", "%m-%d-%Y",
        "%Y/%m/%d", "%Y/%d/%m", "%d.%m.%Y", "%d %b %Y"
    ]

    for col in df.columns:
        # str() keeps non-string column labels (e.g. integers) from raising.
        if "date" in str(col).lower():
            parsed = df[col].astype(str).apply(lambda x: try_parse_date(x, common_formats))
            # apply() yields an object-dtype Series for an empty column (and
            # for dates outside the nanosecond range), on which `.dt` raises.
            # Coercing guarantees a datetime64 Series before formatting.
            parsed = pd.to_datetime(parsed, errors='coerce')
            df[col] = parsed.dt.strftime(output_format)

    return df

def clean_products(df, group_col='category'):
    """Fill missing numeric values with their group's mean, then round.

    For every numeric column, cells that are missing are replaced by the
    mean of that column within the row's ``group_col`` group, and the column
    is rounded to 2 decimal places. Cells stay missing when the group has no
    values to average or when the row's group key is itself missing.

    Args:
        df: DataFrame to clean; numeric columns are overwritten in place.
        group_col: Column whose values define the groups.

    Returns:
        The same DataFrame object. If ``group_col`` is absent a notice is
        printed and the frame is returned untouched.
    """
    if group_col not in df.columns:
        print(f"'{group_col}' column not found. Skipping product-specific cleaning.")
        return df
    numeric_cols = df.select_dtypes(include='number').columns
    for col in numeric_cols:
        # transform() yields NaN for rows whose group key is missing.
        # Using it only as the fill source (rather than assigning it back
        # wholesale) keeps the existing values on those rows intact.
        group_means = df.groupby(group_col)[col].transform('mean')
        df[col] = df[col].fillna(group_means).round(2)
    return df

def enforce_numeric_cols(df):
    """Pass every numeric-dtype column through ``pd.to_numeric``.

    Only columns that ``select_dtypes(include='number')`` already reports
    as numeric are touched; each is re-assigned in place with
    ``errors='coerce'``.

    Returns:
        The same DataFrame object, for call chaining.
    """
    numeric_names = list(df.select_dtypes(include='number').columns)
    for name in numeric_names:
        coerced = pd.to_numeric(df[name], errors='coerce')
        df[name] = coerced
    return df

def clean_transactions(df):
    """Zero-fill and round the numeric columns of a transactions frame.

    Missing values in every numeric column are replaced with 0 and the
    columns are rounded to 2 decimal places. Non-numeric columns are left
    as they are.

    Returns:
        The same DataFrame object, modified in place.
    """
    numeric_names = df.select_dtypes(include='number').columns
    zero_filled = df[numeric_names].fillna(0)
    df[numeric_names] = zero_filled.round(2)
    return df

def normalize_phone(phone, placeholder='1234567890'):
    """Reduce a phone value to its 10 digits, or return a placeholder.

    All non-digit characters are removed from the text form of ``phone``.
    The digits are returned only when exactly 10 remain; anything else
    (too short, too long, missing) yields ``placeholder``.

    Args:
        phone: Raw phone value; may be a string, a number, or missing.
        placeholder: Value returned when ``phone`` is not a 10-digit number.

    Returns:
        A string: the 10 digits or ``placeholder``.
    """
    # A numeric phone column containing any blank is loaded as float, so a
    # valid number arrives as 9876543210.0. Its text form carries an extra
    # "0" after the decimal point, which would make 11 digits and wrongly
    # reject it. NaN and inf are not whole numbers and are left as they are.
    if isinstance(phone, float) and phone.is_integer():
        phone = int(phone)
    digits = re.sub(r'\D', '', str(phone))
    return digits if len(digits) == 10 else placeholder

# Cleansing driver: asks for a keyword, removes previously cleansed outputs
# whose names contain it, cleans every matching raw CSV in src_dir and writes
# the results to tgt_dir as "cleansed_<name>".
keyword = input("Enter the keyword to search in file names:").lower()

# Delete existing target file if it matches keyword.
# Only regular files are removed: os.remove() raises on a directory, which
# would abort the whole run before anything is cleaned.
for f in os.listdir(tgt_dir):
    stale_path = os.path.join(tgt_dir, f)
    if keyword in f.lower() and os.path.isfile(stale_path):
        os.remove(stale_path)
        print(f"Deleted existing file from target: {f}")

# Identify source files
files_list_src = [f for f in os.listdir(src_dir) if keyword in f.lower()]
if not files_list_src:
    print(f"No files found with keyword '{keyword}' in {src_dir}.")

# Anything that is neither a word character nor whitespace.
_punctuation_re = re.compile(r'[^\w\s]')


def _strip_punctuation(value):
    """Trim a string and drop its punctuation; pass other values through."""
    if isinstance(value, str):
        return _punctuation_re.sub('', value.strip())
    return value


cleaned_dfs = []

for file in files_list_src:
    file_path = os.path.join(src_dir, file)
    print(f"\nReading: {file_path}")

    # Reading and cleaning are guarded separately so the message says which
    # stage actually failed.
    try:
        df = pd.read_csv(file_path)  # Read CSV file
    except Exception as e:
        print(f" Failed to read {file}: {e}")
        continue

    try:
        # Normalize column names; the trailing ".<n>" that pandas appends to
        # repeated headers is removed so the repeats can be dropped next.
        df.columns = df.columns.str.strip().str.lower().str.replace(r'\.\d+$', '', regex=True)

        df = df.loc[:, ~df.columns.duplicated()]  # Remove duplicate columns

        df = df.dropna(how='all')  # Drop rows where all elements are nulls

        df = clean_datefields(df)  # Standardize date fields

        df = df.map(lambda x: x.strip() if isinstance(x, str) else x)  # Strip whitespace from strings

        string_cols = df.select_dtypes(include='object').columns
        df[string_cols] = df[string_cols].fillna('Unknown')  # Null strings filled with 'Unknown'

        df = df.drop_duplicates()  # Drop duplicate rows

        if 'phone' in df.columns:
            df['phone'] = df['phone'].apply(normalize_phone)  # Normalize phone numbers

        # Pick the dataset-specific step from the file name. Files are
        # selected by substring, so comparing the keyword for equality would
        # skip this step for a keyword such as "retail".
        file_lower = file.lower()
        if 'products' in file_lower:
            df = clean_products(df)  # Clean product data
        elif 'transactions' in file_lower:
            df = clean_transactions(df)  # Clean transaction data

        numeric_cols = df.select_dtypes(include='number').columns
        df[numeric_cols] = df[numeric_cols].round(2)  # Numeric columns rounded to 2 decimal places

        df = enforce_numeric_cols(df)  # Enforce numeric datatype for columns

        string_cols = df.select_dtypes(include='object').columns
        non_date_string_cols = [col for col in string_cols if 'date' not in col.lower()]
        if non_date_string_cols:
            # Element-wise with a type check: the `.str` accessor turns
            # non-string values in an object column into NaN (or raises when
            # the column holds no strings), which would lose data.
            df[non_date_string_cols] = df[non_date_string_cols].map(_strip_punctuation)
    except Exception as e:
        print(f" Failed to clean {file}: {e}")
        continue

    cleaned_dfs.append((file, df))
    print(f" Cleaned: {file} - {df.shape[0]} rows, {df.shape[1]} columns")

# Save cleaned files
for file_name, df in cleaned_dfs:
    orig_name, ext = os.path.splitext(file_name)
    new_name = f"cleansed_{orig_name.replace('raw_', '')}{ext}"
    tgt_file = os.path.join(tgt_dir, new_name)
    try:
        df.to_csv(tgt_file, index=False)
        print(f" Saved cleaned data to: {tgt_file}")
    except Exception as e:
        print(f" Failed to save {file_name}: {e}")


Deleted existing file from target: cleansed_retail_transactions.csv

Reading: D:\Ravi_Data_Engineer\02_Target\01_raw\raw_retail_transactions.csv
 Cleaned: raw_retail_transactions.csv - 500000 rows, 8 columns
 Saved cleaned data to: D:\Ravi_Data_Engineer\02_Target\02_cleansed\cleansed_retail_transactions.csv
