In [1]:
import pandas as pd

In [2]:
# Read the raw transaction export; every later cleaning cell works on `df`.
df = pd.read_csv(filepath_or_buffer="transactions_uncleaned.csv")

## Timestamp Column Cleaning 

In [3]:
# Keep only the text before the first '.' of each timestamp string, then parse
# it. Values that cannot be parsed become NaT because of errors='coerce'.
df['timestamp'] = pd.to_datetime(
    df['timestamp'].str.split('.').str[0],
    errors='coerce',
)

## ID Column Cleaning

In [4]:
# Rows without a transaction_id are discarded, and for every repeated id only
# its first occurrence is kept. Dropping the missing ids before de-duplicating
# leaves the same rows as doing it afterwards, since missing ids never survive.
df = (
    df.dropna(subset=['transaction_id'])
      .drop_duplicates(subset=['transaction_id'], keep='first')
)
# Nullable integer dtype, so the ids are stored as integers rather than floats.
df['transaction_id'] = df['transaction_id'].astype('Int64')

## Product Column Cleaning

In [5]:
# Product ids greater than 10 are blanked out (set to missing); the column is
# then stored as nullable integers.
# NOTE(review): presumably 10 is the highest valid product id — confirm.
df['product_id'] = (
    df['product_id']
    .where(lambda product_ids: product_ids <= 10)
    .astype('Int64')
)

## Quantity Column Cleaning

In [6]:
# Negative quantities are flipped to their absolute value, then the column is
# stored as nullable integers so missing quantities stay missing.
df['quantity'] = df['quantity'].abs().astype('Int64')

## Price Column Cleaning

In [7]:
# Keep only prices in the range [1, 999999); every other value becomes missing.
# NOTE(review): abs() runs after the ">= 1" filter, so it never changes a
# value — negative prices have already been blanked out by then. If negative
# prices were meant to be repaired (sign flipped) rather than discarded, abs()
# would have to come first. Confirm the intended order before changing it.
df['price'] = (
    df['price']
    .where(lambda price: price >= 1)
    .abs()
    .where(lambda price: price < 999999)
)

## Customer ID Column Cleaning

In [8]:
# Store customer ids as nullable integers so missing ids stay missing instead
# of forcing the whole column to float.
df['customer_id'] = df['customer_id'].astype(dtype='Int64')

## Sales Channel Column Cleaning

In [9]:
# Blank out the 'invalid_channel' placeholder so it is treated as missing.
df['sales_channel'] = df['sales_channel'].where(
    lambda channel: channel != 'invalid_channel'
)

## Discount Code Column Cleaning

In [10]:
# Blank out the 'NOTVALID' placeholder so it is treated as a missing code.
df['discount_code'] = df['discount_code'].where(
    lambda code: code != 'NOTVALID'
)

## Region Column Cleaning

In [11]:
# Blank out the 'invalid_region' placeholder so it is treated as missing.
df['region'] = df['region'].where(
    lambda region: region != 'invalid_region'
)

## Payment Method Column Cleaning

In [12]:
# Blank out the 'invalid_method' placeholder so it is treated as missing.
df['payment_method'] = df['payment_method'].where(
    lambda method: method != 'invalid_method'
)

## Transaction Status Column Cleaning

In [13]:
# Blank out the 'unknown' placeholder so it is treated as a missing status.
df['transaction_status'] = df['transaction_status'].where(
    lambda status: status != 'unknown'
)

In [18]:
import os

from sqlalchemy import create_engine, text
import numpy as np


# Convert every missing cell to a real None so it is sent as SQL NULL.
# Replacing np.nan alone is not reliable for the datetime column (NaT) or the
# nullable-integer columns (pd.NA): those dtypes cannot store None, so their
# missing markers can survive and then fail as bind parameters. Casting to
# object first makes every column able to hold None.
df = df.astype(object).where(df.notna(), None)

# The URL can be overridden through the environment so real credentials do not
# have to live in the notebook; the default is the original local URL.
conn_string = os.environ.get(
    'RETAIL_STORE_DB_URL',
    'postgresql://postgres:postgres@localhost/retail_store',
)

# A single engine (one connection pool) serves the whole notebook. `db` is kept
# as an alias of `engine` so both names remain available to other cells.
engine = create_engine(conn_string)
db = engine


# engine.begin() hands out a connection with a transaction already open,
# commits when the block succeeds and rolls back if it raises. Executing a
# statement and only then calling conn.begin() on the same connection fails on
# SQLAlchemy 2.x, because the first execute has already started a transaction.
with engine.begin() as conn:

    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id      INTEGER PRIMARY KEY,
            timestamp           TIMESTAMP,
            product_id          INTEGER,
            quantity            INTEGER,
            price               NUMERIC(10, 2),
            customer_id         INTEGER ,
            sales_channel       VARCHAR(50),
            discount_code       VARCHAR(50),
            region              VARCHAR(50),
            payment_method      VARCHAR(50),
            transaction_status  VARCHAR(50),
            valid_transaction_id BOOLEAN,
            valid_product_customer_ids BOOLEAN,
            valid_quantity_price BOOLEAN,
            standardized_sales_channel VARCHAR(50),
            valid_discount_code_flag BOOLEAN,
            valid_payment_method_flag BOOLEAN,
            revenue NUMERIC(12, 2),
            standardized_transaction_status VARCHAR(50)
        );
    """))

    sql = text("""
        INSERT INTO transactions (
            transaction_id, timestamp, product_id, quantity, price,
            customer_id, sales_channel, discount_code, region, payment_method, transaction_status)
        VALUES (
            :transaction_id, :timestamp, :product_id, :quantity, :price,
            :customer_id, :sales_channel, :discount_code, :region, :payment_method, :transaction_status)
        ON CONFLICT (transaction_id) DO NOTHING  -- Skip duplicates
    """)

    # Passing the full list of row dicts lets SQLAlchemy run one executemany
    # instead of one round trip per row. An empty list is skipped because it
    # would be treated as "no parameters" and the bind names would be missing.
    records = df.to_dict(orient='records')
    if records:
        conn.execute(sql, records)


In [19]:
# Runs in one transaction (committed on success): rows that lack a
# transaction_id or a timestamp are removed from the table.
with engine.begin() as conn:
    conn.execute(text("""
            -- Remove rows with missing transaction_id or timestamp
            DELETE FROM transactions
            WHERE transaction_id IS NULL OR timestamp IS NULL;
        """))


In [20]:
# Sets valid_transaction_id on every row: TRUE when transaction_id is present,
# FALSE otherwise. Committed as a single transaction.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET valid_transaction_id = CASE
                WHEN transaction_id IS NOT NULL THEN TRUE
                ELSE FALSE
            END;
        """))

In [21]:
# Sets valid_product_customer_ids on every row: TRUE only when both product_id
# and customer_id are present. Committed as a single transaction.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET valid_product_customer_ids = CASE
                WHEN product_id IS NOT NULL AND customer_id IS NOT NULL THEN TRUE
                ELSE FALSE
            END;
        """))

In [22]:
# Sets valid_quantity_price on every row: TRUE only when quantity and price are
# both present and strictly positive. Committed as a single transaction.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET valid_quantity_price = CASE
                WHEN quantity IS NOT NULL AND quantity > 0
                    AND price IS NOT NULL AND price > 0 THEN TRUE
                ELSE FALSE
            END;
        """))

In [23]:
# Fills standardized_sales_channel: the original channel when it is one of
# 'store', 'online' or 'reseller', otherwise the literal 'unknown'.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET standardized_sales_channel = CASE
                WHEN sales_channel IN ('store', 'online', 'reseller') THEN sales_channel
                ELSE 'unknown'
            END;
        """))

In [24]:
# Sets valid_discount_code_flag on every row: TRUE when discount_code is one of
# the three listed codes, FALSE for anything else (including no code).
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET valid_discount_code_flag = CASE
                WHEN discount_code IN ('WELCOME10', 'HALFOFF', 'STUDENT20') THEN TRUE
                ELSE FALSE
            END;
        """))

In [25]:
# Sets valid_payment_method_flag on every row: TRUE when payment_method is one
# of the four listed methods, FALSE otherwise.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET valid_payment_method_flag = CASE
                WHEN payment_method IN ('credit_card', 'debit_card', 'paypal', 'cash') THEN TRUE
                ELSE FALSE
            END;
        """))

In [26]:
# Fills revenue with quantity * price for rows where both values are present;
# rows missing either value are left untouched.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET revenue = quantity * price
            WHERE quantity IS NOT NULL AND price IS NOT NULL;
        """))

In [27]:
# Fills standardized_transaction_status using a case-insensitive substring
# match on transaction_status: 'Completed', 'Pending' or 'Failed', checked in
# that order, with 'Unknown' for anything that matches none of them.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET standardized_transaction_status = CASE
                WHEN LOWER(transaction_status) LIKE '%completed%' THEN 'Completed'
                WHEN LOWER(transaction_status) LIKE '%pending%' THEN 'Pending'
                WHEN LOWER(transaction_status) LIKE '%failed%' THEN 'Failed'
                ELSE 'Unknown'
            END;
        """))

In [28]:
# (Re)creates the vw_analytics_transactions view. It exposes the standardized
# channel and status under the plain column names, carries the data-quality
# flags along, and only returns rows whose id, product/customer, quantity/price
# and payment-method flags are all TRUE (the discount-code flag is not filtered).
with engine.begin() as conn:
    conn.execute(text("""
            CREATE OR REPLACE VIEW vw_analytics_transactions AS
            SELECT
                transaction_id,
                timestamp,
                product_id,
                customer_id,
                quantity,
                price,
                revenue,
                
                -- Standardized and cleaned fields
                standardized_sales_channel AS sales_channel,
                region,
                payment_method,
                standardized_transaction_status AS transaction_status,
                discount_code,

                -- Flags for data quality
                valid_transaction_id,
                valid_product_customer_ids,
                valid_quantity_price,
                valid_discount_code_flag,
                valid_payment_method_flag

            FROM transactions
            WHERE
                -- Include only fully valid transactions
                valid_transaction_id = TRUE
                AND valid_product_customer_ids = TRUE
                AND valid_quantity_price = TRUE
                AND valid_payment_method_flag = TRUE;

        """))

In [29]:
# Sets revenue = quantity * price wherever both values are present.
# NOTE(review): this statement is a verbatim repeat of the revenue UPDATE that
# already ran earlier in the notebook. Re-running it is harmless, but the cell
# looks redundant — confirm whether it can be removed.
with engine.begin() as conn:
    conn.execute(text("""
            UPDATE transactions
            SET revenue = quantity * price
            WHERE quantity IS NOT NULL AND price IS NOT NULL;
        """))

In [30]:
# Final purge: removes every row in which any of the listed columns is NULL.
# discount_code is not in the list, so rows without a discount code are kept.
with engine.begin() as conn:
    conn.execute(text("""
            DELETE FROM transactions
            WHERE 
                transaction_id IS NULL OR
                timestamp IS NULL OR
                product_id IS NULL OR
                quantity IS NULL OR
                price IS NULL OR
                customer_id IS NULL OR
                sales_channel IS NULL OR
                region IS NULL OR
                payment_method IS NULL OR
                transaction_status IS NULL;
        """))