# In [None]:

import pandas as pd
import sqlite3
from datetime import datetime

# Open (or create) the SQLite database file and obtain a cursor for the DDL.
conn = sqlite3.connect("bank_core.db")
cursor = conn.cursor()

# DDL for the three core tables. Every statement uses IF NOT EXISTS, so
# running this against an already-initialised database changes nothing.
_SCHEMA_STATEMENTS = (
    # accounts: one row per account with its opening balance
    """
CREATE TABLE IF NOT EXISTS accounts (
    account_id TEXT PRIMARY KEY,
    opening_balance REAL
)
""",
    # transactions: accepted rows, keyed by transaction_id
    """
CREATE TABLE IF NOT EXISTS transactions (
    transaction_id TEXT PRIMARY KEY,
    account_id TEXT,
    transaction_date TEXT,
    amount REAL,
    transaction_type TEXT,
    channel TEXT,
    load_timestamp TEXT
)
""",
    # rejected_transactions: rows that failed validation, with the reason
    """
CREATE TABLE IF NOT EXISTS rejected_transactions (
    transaction_id TEXT,
    reason TEXT,
    rejected_at TEXT
)
""",
)

for _ddl in _SCHEMA_STATEMENTS:
    cursor.execute(_ddl)

# Persist the schema before any data is loaded.
conn.commit()

# Load raw transactions CSV.
# The identifier columns are read as strings rather than left to dtype
# inference: inferred numeric IDs lose leading zeros, and a single missing
# transaction_id would turn the whole column into floats (1001 -> 1001.0)
# before the values are stored in the TEXT columns of the database.
# Missing identifiers still come through as NaN, so null checks keep working.
raw_df = pd.read_csv(
    "data/raw/transactions_2026-01-01.csv",
    dtype={"transaction_id": str, "account_id": str},
)

# Validation function
def validate_transactions(df):
    """Split raw transactions into accepted and rejected rows.

    Rows are examined in order and the first failing rule decides the
    rejection reason:

    1. ``transaction_id`` is null            -> "Missing transaction_id"
    2. ``transaction_id`` already accepted   -> "Duplicate transaction_id"
    3. ``amount`` is null                    -> "Missing amount"
    4. ``amount`` equals 0                   -> "Zero amount"
    5. DEBIT with a positive amount          -> "Debit amount positive"
    6. CREDIT with a negative amount         -> "Credit amount negative"

    Only accepted rows register their id for the duplicate check, so a
    rejected row does not block a later row that reuses its id.

    Args:
        df: DataFrame with at least the columns ``transaction_id``,
            ``amount`` and ``transaction_type``.

    Returns:
        A ``(valid_df, rejected_df)`` tuple. ``valid_df`` has the columns of
        ``df``; ``rejected_df`` has ``transaction_id``, ``reason`` and
        ``rejected_at`` (ISO timestamp). Both keep their columns even when
        they contain no rows.
    """
    valid_rows = []
    rejected_rows = []
    seen_ids = set()

    for _, row in df.iterrows():
        txn_id = row["transaction_id"]
        reason = None
        if pd.isnull(txn_id):
            reason = "Missing transaction_id"
        elif txn_id in seen_ids:
            reason = "Duplicate transaction_id"
        elif pd.isnull(row["amount"]):
            # NaN compares False against everything below, so without this
            # check a row with no amount would be accepted as valid.
            reason = "Missing amount"
        elif row["amount"] == 0:
            reason = "Zero amount"
        elif row["transaction_type"] == "DEBIT" and row["amount"] > 0:
            reason = "Debit amount positive"
        elif row["transaction_type"] == "CREDIT" and row["amount"] < 0:
            reason = "Credit amount negative"

        if reason is not None:
            rejected_rows.append({
                "transaction_id": txn_id,
                "reason": reason,
                "rejected_at": datetime.now().isoformat(),
            })
        else:
            seen_ids.add(txn_id)
            valid_rows.append(row)

    # Explicit columns keep the schema stable when a list is empty; a bare
    # pd.DataFrame([]) would have no columns at all.
    valid_df = pd.DataFrame(valid_rows, columns=df.columns)
    rejected_df = pd.DataFrame(
        rejected_rows, columns=["transaction_id", "reason", "rejected_at"]
    )
    return valid_df, rejected_df

# Validate, load and report inside try/finally so the SQLite connection is
# closed even when validation, an insert or a query raises.
try:
    # Validate transactions
    valid_df, rejected_df = validate_transactions(raw_df)

    # Add load timestamp (a single value shared by every row of this batch)
    valid_df["load_timestamp"] = datetime.now().isoformat()

    # Load into SQLite; frames without rows are skipped so no insert is attempted
    if not valid_df.empty:
        valid_df.to_sql("transactions", conn, if_exists="append", index=False)
    if not rejected_df.empty:
        rejected_df.to_sql("rejected_transactions", conn, if_exists="append", index=False)

    conn.commit()

    # SQL Reconciliation Queries
    # These read the full tables (no WHERE clause), so the figures include
    # rows loaded by earlier runs as well as this batch.

    # Total transactions per day
    daily_summary = pd.read_sql("""
SELECT
    transaction_date,
    COUNT(*) AS txn_count,
    SUM(amount) AS total_amount
FROM transactions
GROUP BY transaction_date
""", conn)
    print(daily_summary)

    # Transactions by channel
    channel_summary = pd.read_sql("""
SELECT
    channel,
    COUNT(*) AS txn_count,
    SUM(amount) AS total_amount
FROM transactions
GROUP BY channel
""", conn)
    print(channel_summary)

    # View rejected transactions
    rejected_summary = pd.read_sql("SELECT * FROM rejected_transactions", conn)
    print(rejected_summary)
finally:
    # Close connection
    conn.close()
