In [1]:
import logging
from datetime import datetime
from typing import Dict, Tuple

import polars as pl
import pymysql
from polars import LazyFrame

# --- 1. Configuration and Setup ---

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bank_etl_pipeline_polars.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)



In [2]:
DATABASE_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "Barik@1010arpita", 
    "database": "bank_monitoring_db",
    "port": 3306,
}

In [3]:
# --- 2. Schema Definition (Polars Style) ---

def define_accounts_schema() -> Dict[str, pl.DataType]:
    """Defines the schema for the accounts data using Polars dtypes."""
    return {
        "cust_id": pl.String,
        "account_number": pl.String,
        "name": pl.String,
        "acc_type": pl.String,
        "opened_date": pl.String
    }

def define_transactions_schema() -> Dict[str, pl.DataType]:
    """Defines the schema for the transactions data using Polars dtypes."""
    return {
        "txn_id": pl.String,
        "source_acc_number": pl.String,
        "target_acc_number": pl.String,
        "source_account_debit": pl.Decimal(precision=15, scale=2),
        "target_account_credit": pl.Decimal(precision=15, scale=2),
        "txn_amount": pl.Decimal(precision=15, scale=2),
        "transaction_date": pl.String,
        "source_acc_type": pl.String
    }

In [4]:
# --- 3. Extract Phase ---

def read_accounts_data(file_path: str) -> LazyFrame:
    """Lazily read accounts CSV data from the specified file path using Polars."""
    try:
        # MODIFIED: Used schema_overrides instead of the deprecated dtypes
        lf = pl.scan_csv(file_path, schema_overrides=define_accounts_schema())

        # Filter out completely empty rows
        lf = lf.filter(
            ~((pl.col("cust_id").is_null()) &
              (pl.col("account_number").is_null()) &
              (pl.col("name").is_null()))
        )
        logger.info(f"Successfully planned reading of accounts from {file_path}")
        return lf
    except Exception as e:
        logger.error(f"Failed to read accounts CSV data: {str(e)}")
        raise

def read_transactions_data(file_path: str) -> LazyFrame:
    """Lazily read transactions CSV data from the specified file path using Polars."""
    try:
        # MODIFIED: Used schema_overrides instead of the deprecated dtypes
        lf = pl.scan_csv(file_path, schema_overrides=define_transactions_schema())

        # Filter out completely empty rows
        lf = lf.filter(
            ~((pl.col("txn_id").is_null()) &
              (pl.col("source_acc_number").is_null()) &
              (pl.col("target_acc_number").is_null()))
        )
        logger.info(f"Successfully planned reading of transactions from {file_path}")
        return lf
    except Exception as e:
        logger.error(f"Failed to read transactions CSV data: {str(e)}")
        raise

In [5]:
# --- 4. Transform (Validation) Phase ---

def validate_accounts(accounts_lf: LazyFrame) -> Tuple[LazyFrame, LazyFrame]:
    """Validate accounts data and separate valid from invalid records using Polars."""
    try:
        # Find duplicate account numbers
        duplicate_accounts_df = accounts_lf.group_by("account_number").agg(
            # MODIFIED: Used pl.len() instead of the deprecated pl.count()
            pl.len().alias("count")
        ).filter(pl.col("count") > 1).collect()
        duplicate_account_numbers = duplicate_accounts_df["account_number"]

        # Add validation reason column
        accounts_with_reason = accounts_lf.with_columns(
            invalid_reason=pl.when(
                (pl.col("cust_id").is_null() | (pl.col("cust_id") == "")) |
                (pl.col("account_number").is_null() | (pl.col("account_number") == "")) |
                (pl.col("name").is_null() | (pl.col("name") == "")) |
                (pl.col("acc_type").is_null() | (pl.col("acc_type") == "")) |
                (pl.col("opened_date").is_null() | (pl.col("opened_date") == ""))
            ).then(pl.lit("Null values found in required fields"))
            .when(pl.col("account_number").is_in(duplicate_account_numbers))
            .then(pl.lit("Duplicate account number"))
            .otherwise(pl.lit(None))
        )

        # Separate valid and invalid records
        valid_accounts = accounts_with_reason.filter(pl.col("invalid_reason").is_null())
        invalid_accounts = accounts_with_reason.filter(pl.col("invalid_reason").is_not_null())

        logger.info("Account validation plan created.")
        return valid_accounts, invalid_accounts
    except Exception as e:
        logger.error(f"Failed to validate accounts: {str(e)}")
        raise

def validate_transactions(transactions_lf: LazyFrame, valid_accounts_lf: LazyFrame) -> Tuple[LazyFrame, LazyFrame]:
    """Validate transactions data and separate valid from invalid records using Polars."""
    try:
        # Get list of valid account numbers
        valid_account_numbers = valid_accounts_lf.select("account_number").collect().to_series()

        # Check for duplicate transactions
        duplicate_txn_df = transactions_lf.group_by("txn_id").agg(
            # MODIFIED: Used pl.len() instead of the deprecated pl.count()
            pl.len().alias("count")
        ).filter(pl.col("count") > 1).collect()
        duplicate_txn_ids = duplicate_txn_df["txn_id"]

        # Add validation reason column
        transactions_with_reason = transactions_lf.with_columns(
            invalid_reason=pl.when(pl.col("txn_amount") > 50000)
            .then(pl.lit("High-value transaction (>50,000)"))
            .when(pl.col("source_acc_type") == "loan")
            .then(pl.lit("Invalid source account type (loan)"))
            .when(
                (pl.col("txn_id").is_null() | (pl.col("txn_id") == "")) |
                (pl.col("source_acc_number").is_null() | (pl.col("source_acc_number") == "")) |
                (pl.col("target_acc_number").is_null() | (pl.col("target_acc_number") == "")) |
                pl.col("source_account_debit").is_null() |
                pl.col("target_account_credit").is_null() |
                pl.col("txn_amount").is_null() |
                (pl.col("transaction_date").is_null() | (pl.col("transaction_date") == "")) |
                (pl.col("source_acc_type").is_null() | (pl.col("source_acc_type") == ""))
            ).then(pl.lit("Null values found in required fields"))
            .when(
                (pl.col("txn_amount") != pl.col("source_account_debit")) |
                (pl.col("txn_amount") != pl.col("target_account_credit")) |
                (pl.col("source_account_debit") != pl.col("target_account_credit"))
            ).then(pl.lit("Mismatched transaction amounts"))
            .when(
                ~pl.col("source_acc_number").is_in(valid_account_numbers) |
                ~pl.col("target_acc_number").is_in(valid_account_numbers)
            ).then(pl.lit("Orphan accounts (not found in accounts file)"))
            .when(pl.col("txn_id").is_in(duplicate_txn_ids))
            .then(pl.lit("Duplicate transaction ID"))
            .otherwise(pl.lit(None))
        )

        # Separate valid and invalid records
        valid_transactions = transactions_with_reason.filter(pl.col("invalid_reason").is_null())
        invalid_transactions = transactions_with_reason.filter(pl.col("invalid_reason").is_not_null())

        logger.info("Transaction validation plan created.")
        return valid_transactions, invalid_transactions
    except Exception as e:
        logger.error(f"Failed to validate transactions: {str(e)}")
        raise

In [6]:
# --- 5. Load Phase ---

def create_database_tables():
    """Create the required MySQL tables."""
    # This function would contain the pymysql code to create tables if they don't exist.
    # For this example, we assume tables are already created.
    logger.info("Database table creation check complete.")
    pass

from urllib.parse import quote_plus # Add this import at the top of your script

def load_to_database(df: pl.DataFrame, table_name: str) -> None:
    """Load a Polars DataFrame to a MySQL database."""
    try:
        # --- SOLUTION: URL-encode the password to handle special characters ---
        safe_password = quote_plus(DATABASE_CONFIG['password'])

        # Use the new safe_password to build the connection URI
        db_uri = (
            f"mysql+pymysql://{DATABASE_CONFIG['user']}:{safe_password}"
            f"@{DATABASE_CONFIG['host']}:{DATABASE_CONFIG['port']}/{DATABASE_CONFIG['database']}"
        )

        df_with_timestamp = df.with_columns(created_at=datetime.now())
        
        df_with_timestamp.write_database(
            table_name=table_name,
            connection=db_uri,
            if_table_exists="append"
        )
        
        logger.info(f"Successfully loaded {len(df)} records to {table_name}")
    except Exception as e:
        logger.error(f"Failed to load data to database table {table_name}: {e}")
        raise

In [7]:
# --- 6. Main ETL Pipeline Execution ---

def main():
    """Main ETL pipeline execution function."""
    try:
        logger.info("Starting Bank Transaction Monitoring ETL Pipeline with Polars")

        # Configuration
        accounts_file_path = "c:/Users/ARPITA/Downloads/accounts.csv"
        transactions_file_path = "c:/Users/ARPITA/Downloads/transactions.csv"

        # Step 1: Extract (lazily)
        logger.info("Step 1: Planning to read CSV files")
        accounts_lf = read_accounts_data(accounts_file_path)
        transactions_lf = read_transactions_data(transactions_file_path)

        # Step 2: Transform (Validate)
        logger.info("Step 2a: Planning accounts data validation")
        valid_accounts, invalid_accounts = validate_accounts(accounts_lf)

        logger.info("Step 2b: Planning transactions data validation")
        valid_transactions, invalid_transactions = validate_transactions(transactions_lf, valid_accounts)

        # --- EXECUTION TRIGGER (.collect()) ---
        logger.info("Executing transformation plans...")
        
        # Collect final dataframes
        invalid_accounts_final = invalid_accounts.collect()
        invalid_transactions_final = invalid_transactions.collect()
        valid_transactions_final = valid_transactions.collect()
        valid_accounts_final = valid_accounts.collect()
        
        # Step 3: Load
        logger.info("Step 3: Loading data to database")
        create_database_tables() # Ensure tables exist

        if not invalid_accounts_final.is_empty():
            load_to_database(invalid_accounts_final.select(pl.exclude("invalid_reason")), "invalid_account")

        if not invalid_transactions_final.is_empty():
            load_to_database(invalid_transactions_final.select(pl.exclude("invalid_reason")), "invalid_transactions")

        if not valid_transactions_final.is_empty():
            load_to_database(valid_transactions_final.select(pl.exclude("invalid_reason")), "valid_transactions")

        logger.info("ETL Pipeline completed successfully")
        
        # --- Summary ---
        total_accounts = accounts_lf.collect().height
        total_transactions = transactions_lf.collect().height
        valid_accounts_count = valid_accounts_final.height
        valid_transactions_count = valid_transactions_final.height
        invalid_accounts_count = invalid_accounts_final.height
        invalid_transactions_count = invalid_transactions_final.height

        print(f"\n=== Bank Transaction Monitoring ETL Pipeline Summary ===")
        print(f"Total accounts processed: {total_accounts}")
        print(f"Valid accounts: {valid_accounts_count}")
        print(f"Invalid accounts: {invalid_accounts_count}")
        print(f"")
        print(f"Total transactions processed: {total_transactions}")
        print(f"Valid transactions: {valid_transactions_count}")
        print(f"Invalid transactions: {invalid_transactions_count}")
        print(f"")
        if total_accounts > 0:
            print(f"Account success rate: {(valid_accounts_count/total_accounts)*100:.2f}%")
        if total_transactions > 0:
            print(f"Transaction success rate: {(valid_transactions_count/total_transactions)*100:.2f}%")

        # Show sample data
        print(f"\n=== Sample Invalid Accounts ===")
        print(invalid_accounts_final.select("account_number", "name", "invalid_reason").head(5))

        print(f"\n=== Sample Invalid Transactions ===")
        print(invalid_transactions_final.select("txn_id", "source_acc_number", "target_acc_number", "invalid_reason").head(5))

        print(f"\n=== Sample Valid Transactions ===")
        print(valid_transactions_final.head(5))

    except Exception as e:
        logger.error(f"ETL Pipeline failed: {str(e)}")
        raise

if __name__ == "__main__":
    # To prevent issues with long tracebacks on display, configure Polars
    pl.Config.set_tbl_rows(10)
    pl.Config.set_tbl_cols(10)
    main()

2025-10-18 12:49:52,553 - __main__ - INFO - Starting Bank Transaction Monitoring ETL Pipeline with Polars
2025-10-18 12:49:52,554 - __main__ - INFO - Step 1: Planning to read CSV files
2025-10-18 12:49:52,556 - __main__ - INFO - Successfully planned reading of accounts from c:/Users/ARPITA/Downloads/accounts.csv
2025-10-18 12:49:52,559 - __main__ - INFO - Successfully planned reading of transactions from c:/Users/ARPITA/Downloads/transactions.csv
2025-10-18 12:49:52,560 - __main__ - INFO - Step 2a: Planning accounts data validation
2025-10-18 12:49:52,599 - __main__ - INFO - Account validation plan created.
2025-10-18 12:49:52,600 - __main__ - INFO - Step 2b: Planning transactions data validation
Please use `implode` to return to previous behavior.

See https://github.com/pola-rs/polars/issues/22149 for more information.
  valid_account_numbers = valid_accounts_lf.select("account_number").collect().to_series()
2025-10-18 12:49:52,629 - __main__ - INFO - Transaction validation plan crea


=== Bank Transaction Monitoring ETL Pipeline Summary ===
Total accounts processed: 404
Valid accounts: 385
Invalid accounts: 19

Total transactions processed: 1300
Valid transactions: 893
Invalid transactions: 407

Account success rate: 95.30%
Transaction success rate: 68.69%

=== Sample Invalid Accounts ===
shape: (5, 3)
┌────────────────┬───────────────┬──────────────────────────┐
│ account_number ┆ name          ┆ invalid_reason           │
│ ---            ┆ ---           ┆ ---                      │
│ str            ┆ str           ┆ str                      │
╞════════════════╪═══════════════╪══════════════════════════╡
│ ACC100001      ┆ Lisa Shaw     ┆ Duplicate account number │
│ ACC100001      ┆ Ben Shaw      ┆ Duplicate account number │
│ ACC100010      ┆ Ben Duckett   ┆ Duplicate account number │
│ ACC100010      ┆ Cody Shepherd ┆ Duplicate account number │
│ ACC100018      ┆ Howard Wilson ┆ Duplicate account number │
└────────────────┴───────────────┴─────────────────────