<a href="https://colab.research.google.com/github/caetano-dev/PixFraudDetection/blob/main/TCC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pandas
!pip install pyarrow



In [20]:
import os
import re
import duckdb
import pandas as pd
from google.colab import drive

# Mount Google Drive at /content/drive; all inputs and outputs live under DRIVE_DIR.
drive.mount('/content/drive')
DRIVE_DIR = '/content/drive/MyDrive/AML'
PROCESSED_DIR = os.path.join(DRIVE_DIR, 'processed')
os.makedirs(PROCESSED_DIR, exist_ok=True)

# Input paths
TX_CSV = os.path.join(DRIVE_DIR, 'HI-Large_Trans.csv')
PATTERNS_TXT = os.path.join(DRIVE_DIR, 'HI-large_Patterns.txt')
ACCOUNTS_CSV = os.path.join(DRIVE_DIR, 'HI-Large_accounts.csv')

# Output parquet files will be saved back to your Drive.
# These are defined unconditionally so that later cells never fail with a
# NameError that hides the real problem (a missing input file).
OUT_STEP1 = os.path.join(PROCESSED_DIR, '1_filtered_normal_transactions.parquet')
OUT_STEP2 = os.path.join(PROCESSED_DIR, '2_filtered_laundering_transactions.parquet')
OUT_STEP3 = os.path.join(PROCESSED_DIR, '3_filtered_accounts.parquet')

# The pipeline reads all three inputs, so verify them together and stop here
# with a clear error instead of failing halfway through a long-running step.
missing_inputs = [
    path for path in (TX_CSV, PATTERNS_TXT, ACCOUNTS_CSV)
    if not os.path.exists(path)
]
if missing_inputs:
    for path in missing_inputs:
        print(f"ERROR: Required input file '{os.path.basename(path)}' was not found.")
    print(f"Please make sure the files listed above are in the '{DRIVE_DIR}' folder in your Google Drive.")
    raise FileNotFoundError(f"Missing input file(s): {', '.join(missing_inputs)}")

print(f"Successfully located data folder in Google Drive: {DRIVE_DIR}")
print("-" * 50)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Successfully located data folder in Google Drive: /content/drive/MyDrive/AML
--------------------------------------------------


In [21]:
# Positional column names for the raw transactions file, which is read as headerless.
standard_columns = [
    'timestamp',
    'from_bank',
    'from_account',
    'to_bank',
    'to_account',
    'amount_received',
    'currency_received',
    'amount_sent',
    'currency_sent',
    'payment_type',
    'is_laundering',
]

# Column -> type mapping handed to read_csv_auto: every column is read as VARCHAR,
# in the same order as standard_columns.
column_types = dict.fromkeys(standard_columns, 'VARCHAR')

# ---------------------------------------------------------------------------------
# Pattern file parser (kept in Python because the patterns file is typically small)
# ---------------------------------------------------------------------------------
def parse_patterns_file(file_path, columns=None):
    """Parse a laundering-patterns text file into one flat DataFrame.

    The file is a sequence of sections delimited by marker lines:

        BEGIN LAUNDERING ATTEMPT - <attempt type>
        <comma-separated transaction line>
        ...
        END LAUNDERING ATTEMPT ...

    Each transaction line inside a section becomes one row, tagged with the
    1-based ordinal of its section (``attempt_id``) and the text following the
    dash on the BEGIN line (``attempt_type``, or 'UNKNOWN' when absent).

    Args:
        file_path: Path to the patterns text file.
        columns: Optional list of column names for the comma-separated fields.
            Defaults to the module-level ``standard_columns``.

    Returns:
        pandas.DataFrame with ``columns + ['attempt_id', 'attempt_type']``.
        All field values are kept as stripped strings; ``attempt_id`` is an int.
        An input without any usable transaction yields an empty frame that
        still has the full column set.

    Notes:
        * Blank lines, lines outside a section, and lines with fewer fields
          than ``columns`` are skipped; extra trailing fields are ignored.
        * A section that is not closed by an END marker (end of file, or a new
          BEGIN marker) keeps its transactions instead of silently losing them.
    """
    cols = list(standard_columns) if columns is None else list(columns)
    n_cols = len(cols)

    rows = []
    attempt_id = 0
    attempt_type = None
    in_attempt = False

    # Explicit encoding so parsing does not depend on the platform default.
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue

            if line.startswith('BEGIN LAUNDERING ATTEMPT'):
                # A new BEGIN implicitly closes any unterminated section; rows
                # already collected for it stay in `rows`.
                attempt_id += 1
                m = re.search(r'BEGIN LAUNDERING ATTEMPT\s*-\s*(.+)$', line)
                attempt_type = m.group(1).strip() if m else 'UNKNOWN'
                in_attempt = True
            elif line.startswith('END LAUNDERING ATTEMPT'):
                in_attempt = False
            elif in_attempt:
                parts = [p.strip() for p in line.split(',')]
                if len(parts) >= n_cols:
                    tx = dict(zip(cols, parts[:n_cols]))
                    tx['attempt_id'] = attempt_id
                    tx['attempt_type'] = attempt_type
                    rows.append(tx)

    return pd.DataFrame(rows, columns=cols + ['attempt_id', 'attempt_type'])


# ---------------------------------------------------------------------------------
# DuckDB setup
# ---------------------------------------------------------------------------------
# In-memory database: nothing is persisted except the Parquet files written by COPY.
con = duckdb.connect(database=':memory:')
# Fix the number of worker threads explicitly.
# NOTE(review): 8 may exceed the vCPUs of a standard Colab runtime — confirm this is intended.
con.execute("PRAGMA threads=8")
# Optional: cap memory to avoid OOMs (adjust to your Colab RAM)
# con.execute("PRAGMA memory_limit='8GB'")

# Shared SQL pieces (plain strings that are interpolated into the step queries below)
# 1) Read the headerless CSV with all columns as VARCHAR (preserve leading zeros).
#    The Python dict `column_types` is interpolated via its repr, which is used
#    here as the `columns` struct literal.
read_tx_csv_sql = f"""
  SELECT * FROM read_csv_auto(
    '{TX_CSV}',
    delim=',',
    header=false,
    columns={column_types},
    all_varchar=true
  )
"""

# 2) Timestamp parser: strict formats YYYY/MM/DD HH:MM or YYYY/MM/DD HH:MM:SS.
#    try_strptime returns NULL instead of raising, so a malformed value of the
#    right length is dropped by the `timestamp IS NOT NULL` filters rather than
#    aborting the whole COPY (consistent with the try_cast calls below).
ts_parse_sql = """
CASE
  WHEN length(trim(timestamp)) = 16 THEN try_strptime(trim(timestamp), '%Y/%m/%d %H:%M')
  WHEN length(trim(timestamp)) = 19 THEN try_strptime(trim(timestamp), '%Y/%m/%d %H:%M:%S')
  ELSE NULL
END
"""

# 3) Typed/normalized view over the raw CSV scan:
#    - text columns are trimmed,
#    - amounts become DOUBLE (empty or unparseable -> NULL),
#    - is_laundering becomes INTEGER (empty or unparseable -> 0).
typed_tx_sql = f"""
WITH raw AS ({read_tx_csv_sql})
SELECT
  {ts_parse_sql}::TIMESTAMP AS timestamp,
  trim(from_bank) AS from_bank,
  trim(from_account) AS from_account,
  trim(to_bank) AS to_bank,
  trim(to_account) AS to_account,
  try_cast(nullif(trim(amount_received), '') AS DOUBLE) AS amount_received,
  trim(currency_received) AS currency_received,
  try_cast(nullif(trim(amount_sent), '') AS DOUBLE) AS amount_sent,
  trim(currency_sent) AS currency_sent,
  trim(payment_type) AS payment_type,
  coalesce(try_cast(nullif(trim(is_laundering), '') AS INTEGER), 0) AS is_laundering
FROM raw
"""

# Canonical filter (USD + ACH): case-insensitive match on both currencies and the payment type.
usd_ach_filter = """
upper(trim(currency_sent))='US DOLLAR' AND
upper(trim(currency_received))='US DOLLAR' AND
upper(trim(payment_type))='ACH'
"""

# ---------------------------------------------------------------------------------
# Step 1: Filter normal transactions (USD+ACH, is_laundering=0) -> Parquet
# ---------------------------------------------------------------------------------
# Scan the raw CSV through the typed view, keep rows with a parseable timestamp
# that pass the USD/ACH filter and are labelled is_laundering = 0, and write the
# result straight to OUT_STEP1 as ZSTD-compressed Parquet (no pandas round-trip).
con.execute(f"""
  COPY (
    WITH typed AS ({typed_tx_sql})
    SELECT
      timestamp, from_bank, from_account, to_bank, to_account,
      amount_received, currency_received, amount_sent, currency_sent,
      payment_type, is_laundering
    FROM typed
    WHERE timestamp IS NOT NULL
      AND {usd_ach_filter}
      AND is_laundering = 0
  ) TO '{OUT_STEP1}' (FORMAT PARQUET, COMPRESSION ZSTD)
""")

# Re-read the written file to report how many rows were saved.
step1_rows = con.execute(f"SELECT COUNT(*) FROM read_parquet('{OUT_STEP1}')").fetchone()[0]
print(f"Step 1: Saved strictly USD/ACH normal transactions to '{OUT_STEP1}' (rows={step1_rows:,})")

# ---------------------------------------------------------------------------------
# Step 2: Parse patterns, filter USD+ACH positives, then add any missing positives
#         from the main CSV (anti-join on robust keys) -> Parquet
# ---------------------------------------------------------------------------------
# Load the labelled laundering attempts from the patterns text file.
patterns_df = parse_patterns_file(PATTERNS_TXT)
if patterns_df.empty:
    # Create an empty patterns table with matching columns so SQL doesn't break
    patterns_df = pd.DataFrame(columns=standard_columns + ['attempt_id', 'attempt_type'])

# Register patterns into DuckDB
con.register('patterns_df', patterns_df)

# Build the unioned laundering set and write to Parquet.
# CTE overview:
#   pat_raw   - the registered patterns DataFrame, with attempt_id cast to VARCHAR
#   pat_typed - same trimming/typing rules as the typed CSV view
#   pat_filt  - pattern rows with a valid timestamp, USD/ACH, is_laundering = 1,
#               plus amounts rounded to integer cents (*_c) for joining
#   raw_pos   - the same filter and cent keys applied to the main CSV
#   missing   - raw_pos rows with no pat_filt match on timestamp, banks, accounts
#               and both cent amounts (LEFT JOIN ... IS NULL anti-join)
#   unioned   - pat_filt rows, plus the missing rows tagged attempt_type 'UNLISTED'
#               with a NULL attempt_id
# NOTE(review): the join uses `=`, so a raw_pos row whose amount failed to parse
# (NULL cent key) can never match and always lands in `missing` — confirm that
# is acceptable.
con.execute(f"""
  COPY (
    WITH
      pat_raw AS (
        SELECT
          timestamp, from_bank, from_account, to_bank, to_account,
          amount_received, currency_received, amount_sent, currency_sent,
          payment_type, is_laundering,
          CAST(attempt_id AS VARCHAR) AS attempt_id, -- Explicitly cast to VARCHAR
          attempt_type
        FROM patterns_df -- Read directly from the registered DataFrame
      ),
      pat_typed AS (
        SELECT
          {ts_parse_sql}::TIMESTAMP AS timestamp,
          trim(from_bank) AS from_bank,
          trim(from_account) AS from_account,
          trim(to_bank) AS to_bank,
          trim(to_account) AS to_account,
          try_cast(nullif(trim(amount_received), '') AS DOUBLE) AS amount_received,
          trim(currency_received) AS currency_received,
          try_cast(nullif(trim(amount_sent), '') AS DOUBLE) AS amount_sent,
          trim(currency_sent) AS currency_sent,
          trim(payment_type) AS payment_type,
          coalesce(try_cast(nullif(trim(is_laundering), '') AS INTEGER), 0) AS is_laundering,
          try_cast(nullif(trim(attempt_id), '') AS INTEGER) AS attempt_id,
          trim(attempt_type) AS attempt_type
        FROM pat_raw
      ),
      pat_filt AS (
        SELECT
          timestamp, from_bank, from_account, to_bank, to_account,
          amount_received, currency_received, amount_sent, currency_sent,
          payment_type, is_laundering, attempt_id, attempt_type,
          CAST(round(amount_sent * 100) AS BIGINT) AS amount_sent_c,
          CAST(round(amount_received * 100) AS BIGINT) AS amount_received_c
        FROM pat_typed
        WHERE timestamp IS NOT NULL
          AND {usd_ach_filter}
          AND is_laundering = 1
      ),
      raw_pos AS (
        WITH typed AS ({typed_tx_sql})
        SELECT
          timestamp, from_bank, from_account, to_bank, to_account,
          amount_received, currency_received, amount_sent, currency_sent,
          payment_type, is_laundering,
          CAST(round(amount_sent * 100) AS BIGINT) AS amount_sent_c,
          CAST(round(amount_received * 100) AS BIGINT) AS amount_received_c
        FROM typed
        WHERE timestamp IS NOT NULL
          AND {usd_ach_filter}
          AND is_laundering = 1
      ),
      missing AS (
        SELECT raw_pos.*
        FROM raw_pos
        LEFT JOIN pat_filt
          ON raw_pos.timestamp = pat_filt.timestamp
          AND raw_pos.from_bank = pat_filt.from_bank
          AND raw_pos.from_account = pat_filt.from_account
          AND raw_pos.to_bank = pat_filt.to_bank
          AND raw_pos.to_account = pat_filt.to_account
          AND raw_pos.amount_received_c = pat_filt.amount_received_c
          AND raw_pos.amount_sent_c = pat_filt.amount_sent_c
        WHERE pat_filt.timestamp IS NULL -- Check if there was no match in pat_filt
      ),
      unioned AS (
        SELECT
          timestamp, from_bank, from_account, to_bank, to_account,
          amount_received, currency_received, amount_sent, currency_sent,
          payment_type, is_laundering,
          attempt_id, attempt_type
        FROM pat_filt
        UNION ALL
        SELECT
          timestamp, from_bank, from_account, to_bank, to_account,
          amount_received, currency_received, amount_sent, currency_sent,
          payment_type, is_laundering,
          NULL::INTEGER AS attempt_id, 'UNLISTED' AS attempt_type
        FROM missing
      )
    SELECT * FROM unioned
  ) TO '{OUT_STEP2}' (FORMAT PARQUET, COMPRESSION ZSTD)
""")

# Summary counts read back from the written file:
#   base_count  - rows that came from the patterns file (attempt_type <> 'UNLISTED')
#   added_count - rows added from the main CSV (attempt_type = 'UNLISTED')
#   total_count - all rows in the file
base_count = con.execute("""
  WITH x as (SELECT attempt_type FROM read_parquet(?) WHERE attempt_type <> 'UNLISTED')
  SELECT COUNT(*) FROM x
""", [OUT_STEP2]).fetchone()[0]
added_count = con.execute("""
  WITH x as (SELECT attempt_type FROM read_parquet(?) WHERE attempt_type = 'UNLISTED')
  SELECT COUNT(*) FROM x
""", [OUT_STEP2]).fetchone()[0]
total_count = con.execute(f"SELECT COUNT(*) FROM read_parquet('{OUT_STEP2}')").fetchone()[0]
print(f"Step 2: Saved USD/ACH laundering transactions to '{OUT_STEP2}' "
      f"(patterns={base_count:,}, added_from_csv={added_count:,}, total={total_count:,})")

# ---------------------------------------------------------------------------------
# Step 3: Combine Step 1 + Step 2, collect involved accounts, filter account file
# ---------------------------------------------------------------------------------
# CTE overview:
#   all_tx   - Step 1 and Step 2 Parquet outputs stacked with a common column set
#   involved - distinct non-NULL account ids seen as sender or receiver
#   accounts - the accounts CSV read as headerless, all VARCHAR, with the five
#              column names given below
# The final SELECT keeps only account rows whose trimmed account_id_hex matches
# an involved account, and writes them to OUT_STEP3.
# Doubled braces in `columns={{...}}` are f-string escapes for a literal `{...}`.
con.execute(f"""
  COPY (
    WITH all_tx AS (
      SELECT
        timestamp, from_bank, from_account, to_bank, to_account,
        amount_received, currency_received, amount_sent, currency_sent,
        payment_type, is_laundering,
        NULL::INTEGER AS attempt_id, NULL::VARCHAR AS attempt_type -- Add these columns with NULLs
      FROM read_parquet('{OUT_STEP1}')
      UNION ALL
      SELECT
        timestamp, from_bank, from_account, to_bank, to_account,
        amount_received, currency_received, amount_sent, currency_sent,
        payment_type, is_laundering,
        attempt_id, attempt_type
      FROM read_parquet('{OUT_STEP2}')
    ),
    involved AS (
      SELECT DISTINCT from_account AS account FROM all_tx WHERE from_account IS NOT NULL
      UNION
      SELECT DISTINCT to_account AS account FROM all_tx WHERE to_account IS NOT NULL
    ),
    accounts AS (
      SELECT * FROM read_csv_auto(
        '{ACCOUNTS_CSV}',
        delim=',',
        header=false,
        columns={{'bank_name': 'VARCHAR', 'bank_id': 'VARCHAR', 'account_id_hex': 'VARCHAR', 'entity_id': 'VARCHAR', 'entity_name': 'VARCHAR'}},
        all_varchar=true
      )
    )
    SELECT a.*
    FROM accounts a
    INNER JOIN involved i
      ON trim(a.account_id_hex) = trim(i.account)
  ) TO '{OUT_STEP3}' (FORMAT PARQUET, COMPRESSION ZSTD)
""")

# Re-read the written file to report how many account rows were kept.
step3_rows = con.execute(f"SELECT COUNT(*) FROM read_parquet('{OUT_STEP3}')").fetchone()[0]
print(f"Step 3: Saved filtered account details to '{OUT_STEP3}' (rows={step3_rows:,})")

# Release the in-memory DuckDB connection; the Parquet outputs stay on disk.
con.close()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Step 1: Saved strictly USD/ACH normal transactions to '/content/drive/MyDrive/AML/processed/1_filtered_normal_transactions.parquet' (rows=7,156,712)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Step 2: Saved USD/ACH laundering transactions to '/content/drive/MyDrive/AML/processed/2_filtered_laundering_transactions.parquet' (patterns=9,143, added_from_csv=72,103, total=81,246)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Step 3: Saved filtered account details to '/content/drive/MyDrive/AML/processed/3_filtered_accounts.parquet' (rows=559,855)


In [22]:
# Check the data: load the Step 1 output (normal USD/ACH transactions) and show its first rows.
df = pd.read_parquet(OUT_STEP1)
df.head()

Unnamed: 0,timestamp,from_bank,from_account,to_bank,to_account,amount_received,currency_received,amount_sent,currency_sent,payment_type,is_laundering
0,2022-08-01 00:05:00,2954,800AD2E90,1818,800AB6A60,1313043.0,US Dollar,1313043.0,US Dollar,ACH,0
1,2022-08-01 00:19:00,21418,800AB4DE0,2310,800ACF520,2567249.0,US Dollar,2567249.0,US Dollar,ACH,0
2,2022-08-01 00:00:00,32248,80056BBD0,11,800BAFE20,12373.74,US Dollar,12373.74,US Dollar,ACH,0
3,2022-08-01 00:16:00,11,800BAFE20,20,800CCE3D0,626872.0,US Dollar,626872.0,US Dollar,ACH,0
4,2022-08-01 00:05:00,0,801480F70,1922,80146E870,191.0,US Dollar,191.0,US Dollar,ACH,0


In [18]:
# Load the Step 2 output (laundering transactions) and show its first rows.
# Note: this rebinds `df`, replacing the frame loaded in the previous cell.
df = pd.read_parquet(OUT_STEP2)
df.head()

Unnamed: 0,timestamp,from_bank,from_account,to_bank,to_account,amount_received,currency_received,amount_sent,currency_sent,payment_type,is_laundering,attempt_id,attempt_type
1658,2022-09-10 08:10:00,12381,802BE4C90,16788,802BE4F50,3826.53,US Dollar,3826.53,US Dollar,ACH,1,,UNLISTED
1659,2022-09-10 08:11:00,121453,80D33CDA0,228321,80D340ED0,19714.13,US Dollar,19714.13,US Dollar,ACH,1,,UNLISTED
1660,2022-09-10 08:24:00,1292,81001DD60,110836,81002B4D0,912.65,US Dollar,912.65,US Dollar,ACH,1,,UNLISTED
1661,2022-09-10 08:08:00,19329,8103CC7C0,9482,8103CC810,4349.89,US Dollar,4349.89,US Dollar,ACH,1,,UNLISTED
1662,2022-09-02 08:25:00,238473,814453030,53160,814455B00,183.08,US Dollar,183.08,US Dollar,ACH,1,,UNLISTED


In [19]:
# Load the Step 3 output (filtered accounts) and show its last rows.
# Note: this rebinds `df`, replacing the frame loaded in the previous cell.
df = pd.read_parquet(OUT_STEP3)
df.tail()

Unnamed: 0,bank_name,bank_id,account_id_hex,entity_id,entity_name
93097,First Bank of the West,130449,80FEF28D0,800554710,Partnership #19814
93098,First Bank of Seattle,225156,80DDE2260,800E1DAA0,Corporation #15025
93099,National Bank of the East,15625,80DF764E0,800F6D250,Corporation #3222
93100,Savings Bank of New Orleans,11107,8033451E0,8001128C0,Corporation #5395
93101,Hilltop Trust Bank,26024,807D33470,800F98C50,Partnership #21499
