In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import pandas as pd
from datetime import datetime

# Read CSV as raw, untrusted data
header_raw = pd.read_csv(
    "/kaggle/input/noisy-raw-data/store_sales_header_noisy.csv",
    dtype=str,          # VERY IMPORTANT: everything as string
    keep_default_na=False
)

items_raw = pd.read_csv(
    "/kaggle/input/noisy-raw-data/store_sales_line_items_noisy.csv",
    dtype=str,
    keep_default_na=False
)


In [3]:
def standardize_columns(df):
    df.columns = (
        df.columns
          .str.strip()
          .str.lower()
          .str.replace(" ", "_")
    )
    return df

header_raw = standardize_columns(header_raw)
items_raw = standardize_columns(items_raw)


In [4]:
def normalize_strings(df):
    for col in df.columns:
        df[col] = df[col].apply(
            lambda x: x.strip() if isinstance(x, str) else x
        )
    return df

header_raw = normalize_strings(header_raw)
items_raw = normalize_strings(items_raw)


In [5]:
INGESTION_TIME = datetime.now()
BATCH_ID = INGESTION_TIME.strftime("%Y%m%d%H%M%S")

header_raw["ingestion_timestamp"] = INGESTION_TIME
header_raw["source_file"] = "store_sales_header_noisy.csv"
header_raw["batch_id"] = BATCH_ID

items_raw["ingestion_timestamp"] = INGESTION_TIME
items_raw["source_file"] = "store_sales_line_items_noisy.csv"
items_raw["batch_id"] = BATCH_ID


In [6]:
header_raw.to_csv("raw_store_sales_header.csv", index=False)
items_raw.to_csv("raw_store_sales_line_items.csv", index=False)


In [8]:
import pandas as pd
import re

# =====================================================
# 1. LOAD RAW DATA (UNTRUSTED)
# =====================================================

header_df = pd.read_csv("/kaggle/working/raw_store_sales_header.csv", dtype=str)
items_df = pd.read_csv("/kaggle/working/raw_store_sales_line_items.csv", dtype=str)

# =====================================================
# 2. LOAD REFERENCE DATA (TRUSTED)
# =====================================================

stores = pd.read_csv("/kaggle/input/store-and-product/stores (1).csv")
products = pd.read_csv("/kaggle/input/store-and-product/products (1).csv")

valid_stores = set(stores["store_id"])
valid_products = set(products["product_id"])

# =====================================================
# 3. HELPER FUNCTION
# =====================================================

def parse_amount(raw_value):
    """
    Parses values like:
    - 15012.04
    - INR 15,012.04
    - Rs. 5,000
    """
    if raw_value is None or raw_value == "":
        raise ValueError("Empty amount")

    cleaned = re.sub(r"[^\d.,-]", "", raw_value)
    cleaned = cleaned.replace(",", "")
    return float(cleaned)

# =====================================================
# 4. HEADER VALIDATION (PDF-ALIGNED)
# =====================================================

def validate_header(row):
    errors = []

    # Store validation (HARD)
    if row["store_id"] not in valid_stores:
        errors.append("Invalid store_id")

    # Amount validation
    try:
        amt = parse_amount(row["total_amount"])
        if amt <= 0:
            errors.append("Non-positive total_amount")
    except:
        errors.append("Unparseable total_amount")

    # Date validation
    try:
        txn_date = pd.to_datetime(row["transaction_date"])
        if txn_date.year < 2020 or txn_date > pd.Timestamp.now():
            errors.append("transaction_date out of range")
    except:
        errors.append("Invalid transaction_date")

    return errors

header_df["validation_errors"] = header_df.apply(validate_header, axis=1)
header_df["is_valid"] = header_df["validation_errors"].apply(lambda x: len(x) == 0)
header_df["validation_errors"] = header_df["validation_errors"].apply(
    lambda x: "; ".join(x)
)

# =====================================================
# 5. LINE ITEM VALIDATION (PDF-ALIGNED)
# =====================================================

def validate_line_item(row):
    errors = []

    # Product validation (HARD)
    if row["product_id"] not in valid_products:
        errors.append("Invalid product_id")

    # Quantity validation
    try:
        qty = int(row["quantity"])
        if qty <= 0:
            errors.append("Non-positive quantity")
    except:
        errors.append("Invalid quantity format")

    # Amount validation
    try:
        amt = parse_amount(row["line_item_amount"])
        if amt < 0:
            errors.append("Negative line_item_amount")
    except:
        errors.append("Unparseable line_item_amount")

    return errors

items_df["validation_errors"] = items_df.apply(validate_line_item, axis=1)
items_df["is_valid"] = items_df["validation_errors"].apply(lambda x: len(x) == 0)
items_df["validation_errors"] = items_df["validation_errors"].apply(
    lambda x: "; ".join(x)
)

# =====================================================
# 6. SAVE STAGING OUTPUTS
# =====================================================

header_df.to_csv("staging_store_sales_header.csv", index=False)
items_df.to_csv("staging_store_sales_line_items.csv", index=False)

print(" Stage 2 complete: Validation done ")


 Stage 2 complete: Validation done 


  txn_date = pd.to_datetime(row["transaction_date"])
  txn_date = pd.to_datetime(row["transaction_date"])


In [9]:
header_df

Unnamed: 0,transaction_id,store_id,customer_id,transaction_date,total_amount,ingestion_timestamp,source_file,batch_id,validation_errors,is_valid
0,T000001,S001,c0019,13-12-2025 11:30:46,"INR 9,116.76",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True
1,T000002,S003,C0036,13-12-2025 18:01:28,21674.84,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True
2,T000003,S002,C0051,2025/12/13 21:05,727.62,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True
3,T000004,S002,C0043,2025/12/13 15:03,5636.62,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True
4,T000005,S002,C0053,13/12/2025 12:15:50 PM,"â‚¹17,547.67",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True
...,...,...,...,...,...,...,...,...,...,...
114,T000115,Store-S001,C0047,2025/12/19 20:30,"INR 27,216.98",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,Invalid store_id; transaction_date out of range,False
115,T000116,005,c0055,2025/12/19 13:54,12340.43,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,Invalid store_id; transaction_date out of range,False
116,T000117,s005,c0054,2025-12-19 12:40:57,"â‚¹11,443.87",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,Invalid store_id; transaction_date out of range,False
117,T000118,S003,C0027,2025-12-19T20:49:15,11722.64,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,transaction_date out of range,False


In [11]:
items_df

Unnamed: 0,line_item_id,transaction_id,product_id,promotion_id,quantity,line_item_amount,ingestion_timestamp,source_file,batch_id,validation_errors,is_valid
0,1,T000001,P0004,0,4,"â‚¹9,106.76",2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
1,2,T000002,P0011,0,1,"INR 3,753.01",2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
2,3,T000002,P0011,,03,11259.03,2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
3,4,T000002,P0012,,2,INR 221.38,2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
4,5,T000002,P0002,,2,1048.42,2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
...,...,...,...,...,...,...,...,...,...,...,...
365,366,T000119,P0003,0,4,"INR 1,455.24",2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
366,367,T000119,P0006,PR002,3,"INR 6,097.56",2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
367,368,T000119,P0020,,3,1513.35,2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True
368,369,T000119,P0023,,02,827.18,2025-12-19 10:44:40.801434,store_sales_line_items_noisy.csv,20251219104440,,True


In [12]:
import pandas as pd
import re

# =====================================================
# 1. LOAD STAGING DATA
# =====================================================

header_df = pd.read_csv("/kaggle/working/staging_store_sales_header.csv", dtype=str)
items_df = pd.read_csv("/kaggle/working/staging_store_sales_line_items.csv", dtype=str)

# =====================================================
# 2. HELPER: AMOUNT PARSER (SAME AS STAGE 2)
# =====================================================

def parse_amount(raw_value):
    if raw_value is None or raw_value == "":
        raise ValueError("Empty amount")

    cleaned = re.sub(r"[^\d.,-]", "", raw_value)
    cleaned = cleaned.replace(",", "")
    return float(cleaned)

# =====================================================
# 3. PREPARE LINE ITEM TOTALS (ONLY VALID LINE ITEMS)
# =====================================================

# Convert amounts safely for valid line items
items_df["parsed_line_amount"] = items_df.apply(
    lambda r: parse_amount(r["line_item_amount"])
    if r["is_valid"] == True or r["is_valid"] == "True"
    else None,
    axis=1
)

# Sum line items per transaction
line_totals = (
    items_df
    .dropna(subset=["parsed_line_amount"])
    .groupby("transaction_id")["parsed_line_amount"]
    .sum()
    .reset_index()
    .rename(columns={"parsed_line_amount": "calculated_total"})
)

# =====================================================
# 4. MERGE WITH HEADER DATA
# =====================================================

header_df = header_df.merge(
    line_totals,
    on="transaction_id",
    how="left"
)

# =====================================================
# 5. RECONCILIATION CHECK
# =====================================================

TOLERANCE = 0.01  # currency rounding tolerance

def reconcile(row):
    errors = []

    # Carry forward existing errors
    if isinstance(row["validation_errors"], str) and row["validation_errors"]:
        errors.extend(row["validation_errors"].split("; "))

    try:
        header_total = parse_amount(row["total_amount"])
        line_total = row["calculated_total"]

        if pd.isna(line_total):
            errors.append("No valid line items found")
        elif abs(header_total - line_total) > TOLERANCE:
            errors.append("Header-Line total mismatch")

    except:
        errors.append("Reconciliation failed")

    return list(set(errors))  # remove duplicates

header_df["recon_errors"] = header_df.apply(reconcile, axis=1)

header_df["is_valid"] = header_df["recon_errors"].apply(lambda x: len(x) == 0)
header_df["validation_errors"] = header_df["recon_errors"].apply(
    lambda x: "; ".join(x)
)

header_df.drop(columns=["recon_errors"], inplace=True)

# =====================================================
# 6. SAVE RECONCILED STAGING OUTPUT
# =====================================================

header_df.to_csv("reconciled_store_sales_header.csv", index=False)
items_df.to_csv("reconciled_store_sales_line_items.csv", index=False)

print("âœ… Stage 3 complete: Headerâ€“Line reconciliation applied.")


âœ… Stage 3 complete: Headerâ€“Line reconciliation applied.


In [13]:
header_df

Unnamed: 0,transaction_id,store_id,customer_id,transaction_date,total_amount,ingestion_timestamp,source_file,batch_id,validation_errors,is_valid,calculated_total
0,T000001,S001,c0019,13-12-2025 11:30:46,"INR 9,116.76",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,Header-Line total mismatch,False,9106.76
1,T000002,S003,C0036,13-12-2025 18:01:28,21674.84,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True,21674.84
2,T000003,S002,C0051,2025/12/13 21:05,727.62,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True,727.62
3,T000004,S002,C0043,2025/12/13 15:03,5636.62,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,,True,5636.62
4,T000005,S002,C0053,13/12/2025 12:15:50 PM,"â‚¹17,547.67",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,Header-Line total mismatch,False,16750.71
...,...,...,...,...,...,...,...,...,...,...,...
114,T000115,Store-S001,C0047,2025/12/19 20:30,"INR 27,216.98",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,transaction_date out of range; Header-Line tot...,False,8775.34
115,T000116,005,c0055,2025/12/19 13:54,12340.43,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,transaction_date out of range; Invalid store_id,False,12340.43
116,T000117,s005,c0054,2025-12-19 12:40:57,"â‚¹11,443.87",2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,transaction_date out of range; Header-Line tot...,False,6991.63
117,T000118,S003,C0027,2025-12-19T20:49:15,11722.64,2025-12-19 10:44:40.801434,store_sales_header_noisy.csv,20251219104440,transaction_date out of range,False,11722.64


In [14]:
import pandas as pd
from datetime import datetime

# =====================================================
# 1. LOAD RECONCILED DATA
# =====================================================

header_df = pd.read_csv("/kaggle/working/reconciled_store_sales_header.csv", dtype=str)
items_df = pd.read_csv("/kaggle/working/reconciled_store_sales_line_items.csv", dtype=str)

# Normalize is_valid to boolean
header_df["is_valid"] = header_df["is_valid"].astype(str).str.lower() == "true"
items_df["is_valid"] = items_df["is_valid"].astype(str).str.lower() == "true"

# =====================================================
# 2. SPLIT HEADER DATA
# =====================================================

clean_headers = header_df[header_df["is_valid"] == True]
bad_headers = header_df[header_df["is_valid"] == False]

# =====================================================
# 3. SPLIT LINE ITEMS
# =====================================================

clean_items = items_df[items_df["is_valid"] == True]
bad_items = items_df[items_df["is_valid"] == False]

# =====================================================
# 4. ADD QUARANTINE METADATA
# =====================================================

QUARANTINE_TIME = datetime.now()

for df in [bad_headers, bad_items]:
    df["quarantine_timestamp"] = QUARANTINE_TIME

# =====================================================
# 5. SAVE OUTPUT FILES
# =====================================================

clean_headers.to_csv("clean_store_sales_header.csv", index=False)
clean_items.to_csv("clean_store_sales_line_items.csv", index=False)

bad_headers.to_csv("quarantine_store_sales_header.csv", index=False)
bad_items.to_csv("quarantine_store_sales_line_items.csv", index=False)

print("âœ… Stage 4 complete: Clean and Quarantine datasets created.")


âœ… Stage 4 complete: Clean and Quarantine datasets created.


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["quarantine_timestamp"] = QUARANTINE_TIME
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["quarantine_timestamp"] = QUARANTINE_TIME


In [16]:
import pandas as pd
import re

# =====================================================
# 1. LOAD CLEAN DATA ONLY
# =====================================================

header_df = pd.read_csv("clean_store_sales_header.csv", dtype=str)
items_df = pd.read_csv("clean_store_sales_line_items.csv", dtype=str)

# =====================================================
# 2. HELPER: AMOUNT NORMALIZATION
# =====================================================

def normalize_amount(raw_value):
    """
    Converts:
    - 'INR 9,116.76'
    - 'Rs. 5,000'
    - '15012.04'
    â†’ float
    """
    if raw_value is None or raw_value == "":
        return None

    cleaned = re.sub(r"[^\d.,-]", "", raw_value)
    cleaned = cleaned.replace(",", "")
    return float(cleaned)

# =====================================================
# 3. HEADER DATA TYPE CONVERSION
# =====================================================

# Preserve raw amount for audit
header_df["total_amount_raw"] = header_df["total_amount"]

# Normalize amount
header_df["total_amount"] = header_df["total_amount"].apply(normalize_amount)

# Convert transaction_date (FIXED: mixed formats)
header_df["transaction_date"] = pd.to_datetime(
    header_df["transaction_date"],
    format="mixed",      # ðŸ‘ˆ KEY FIX
    dayfirst=True,       # handles dd-mm-yyyy safely
    errors="raise"       # fail fast if unexpected
)

# Convert flags
header_df["is_valid"] = header_df["is_valid"].astype(bool)

# =====================================================
# 4. LINE ITEM DATA TYPE CONVERSION
# =====================================================

# Preserve raw amount
items_df["line_item_amount_raw"] = items_df["line_item_amount"]

# Normalize amount
items_df["line_item_amount"] = items_df["line_item_amount"].apply(normalize_amount)

# Convert quantity
items_df["quantity"] = items_df["quantity"].astype(int)

# Convert flags
items_df["is_valid"] = items_df["is_valid"].astype(bool)

# =====================================================
# 5. SELECT FINAL PRODUCTION COLUMNS
# =====================================================

prod_header_cols = [
    "transaction_id",
    "store_id",
    "transaction_date",
    "total_amount",
    "total_amount_raw"
]

prod_item_cols = [
    "line_item_id",
    "transaction_id",
    "product_id",
    "quantity",
    "line_item_amount",
    "line_item_amount_raw"
]

prod_header_df = header_df[prod_header_cols]
prod_items_df = items_df[prod_item_cols]

# =====================================================
# 6. SAVE PRODUCTION DATA
# =====================================================

prod_header_df.to_csv("prod_store_sales_header.csv", index=False)
prod_items_df.to_csv("prod_store_sales_line_items.csv", index=False)

print("âœ… Stage 5 complete: Data normalized and production-ready.")


âœ… Stage 5 complete: Data normalized and production-ready.


In [17]:
prod_header_df

Unnamed: 0,transaction_id,store_id,transaction_date,total_amount,total_amount_raw
0,T000002,S003,2025-12-13 18:01:28,21674.84,21674.84
1,T000003,S002,2025-12-13 21:05:00,727.62,727.62
2,T000004,S002,2025-12-13 15:03:00,5636.62,5636.62
3,T000009,S005,2025-12-13 13:07:00,11259.03,11259.03 INR
4,T000011,S003,2025-12-13 11:52:57,13715.78,13715.78
5,T000012,S002,2025-12-13 13:26:10,11488.93,"INR 11,488.93"
6,T000013,S002,2025-12-13 13:56:12,6127.41,6127.41
7,T000015,S002,2025-12-13 19:33:00,1555.4,1555.40 INR
8,T000016,S003,2025-12-13 11:41:00,14147.87,14147.87 INR
9,T000018,S005,2025-12-13 19:50:00,2364.59,2364.59


In [18]:
prod_items_df

Unnamed: 0,line_item_id,transaction_id,product_id,quantity,line_item_amount,line_item_amount_raw
0,1,T000001,P0004,4,9106.76,"â‚¹9,106.76"
1,2,T000002,P0011,1,3753.01,"INR 3,753.01"
2,3,T000002,P0011,3,11259.03,11259.03
3,4,T000002,P0012,2,221.38,INR 221.38
4,5,T000002,P0002,2,1048.42,1048.42
...,...,...,...,...,...,...
294,366,T000119,P0003,4,1455.24,"INR 1,455.24"
295,367,T000119,P0006,3,6097.56,"INR 6,097.56"
296,368,T000119,P0020,3,1513.35,1513.35
297,369,T000119,P0023,2,827.18,827.18
