<a href="https://colab.research.google.com/github/ItzDeejey/FinCrimePredictor/blob/main/FinCrimeAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import polars as pl

# Read only the first 100 rows from the large CSV
df_preview = pl.read_csv("/content/drive/MyDrive/Colab Notebooks/financial_fraud_detection_dataset.csv", n_rows=100)

# Write those rows to a new CSV file
df_preview.write_csv("preview.csv")

from google.colab import files
files.download("preview.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [3]:
import polars as pl
from IPython.display import FileLink

# Create a lazy frame from the CSV (this does not load the whole file into memory)
lf = pl.scan_csv("/content/drive/MyDrive/Colab Notebooks/financial_fraud_detection_dataset.csv")

# Filter for rows where 'time_since_last_transaction' is not null
# and limit the result to a preview of 100 rows
lf_filtered = lf.filter(pl.col("time_since_last_transaction").is_not_null()).limit(100)

# Collect the filtered preview into a DataFrame
df_preview_non_null = lf_filtered.collect()

# Write the preview to a new CSV file
df_preview_non_null.write_csv("preview_non_null.csv")

# Create a clickable download link (only works in a Jupyter Notebook)
files.download("preview_non_null.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Preprocessing

In [4]:
import polars as pl

# Use scan_csv for a lazy/streaming read
lf = pl.scan_csv("/content/drive/MyDrive/Colab Notebooks/financial_fraud_detection_dataset.csv")

# Apply your preprocessing transformations:
def parse_timestamp(col):
    return pl.when(pl.col(col).str.contains(r"\.\d+$")) \
             .then(pl.col(col).str.strptime(pl.Datetime, "%Y-%m-%dT%H:%M:%S%.f", strict=False)) \
             .otherwise(pl.col(col).str.strptime(pl.Datetime, "%Y-%m-%dT%H:%M:%S", strict=False))

num_cols = ['spending_deviation_score', 'velocity_score', 'geo_anomaly_score', 'amount', 'time_since_last_transaction']
cat_cols = ['transaction_type', 'merchant_category', 'location', 'device_used', 'payment_channel'] #, 'fraud_type'
text_cols = ['sender_account', 'receiver_account', 'ip_address', 'device_hash']

lf = (
    lf
    #.drop('time_since_last_transaction')
    .unique()
    .with_columns([
         pl.when((pl.col("is_fraud") == False) & (pl.col("fraud_type").is_null() | (pl.col("fraud_type") == "")))
           .then(pl.lit("No Fraud"))
           .otherwise(pl.col("fraud_type"))
           .alias("fraud_type")
    ])  # <== Properly close the first with_columns call here.
    .with_columns([
         *[pl.col(col).cast(pl.Utf8) for col in text_cols],
         parse_timestamp("timestamp").alias("timestamp"),
         *[pl.col(col).cast(pl.Float64) for col in num_cols],
         *[pl.col(col).cast(pl.Categorical) for col in cat_cols]
    ])
)


In [5]:
lf.schema

  lf.schema


Schema([('transaction_id', String),
        ('timestamp', Datetime(time_unit='us', time_zone=None)),
        ('sender_account', String),
        ('receiver_account', String),
        ('amount', Float64),
        ('transaction_type', Categorical(ordering='physical')),
        ('merchant_category', Categorical(ordering='physical')),
        ('location', Categorical(ordering='physical')),
        ('device_used', Categorical(ordering='physical')),
        ('is_fraud', Boolean),
        ('fraud_type', String),
        ('time_since_last_transaction', Float64),
        ('spending_deviation_score', Float64),
        ('velocity_score', Float64),
        ('geo_anomaly_score', Float64),
        ('payment_channel', Categorical(ordering='physical')),
        ('ip_address', String),
        ('device_hash', String)])

In [6]:
# Convert categorical columns to integer codes to use as features later
for col in cat_cols:
    lf = lf.with_columns([pl.col(col).to_physical().cast(pl.Int64).alias(col + "_code") for col in cat_cols])

# Define final feature columns:
final_cat_cols = [col + "_code" for col in cat_cols]
final_features = num_cols + final_cat_cols


In [7]:
# Collect the entire preprocessed dataset into a DataFrame first.
df_polars_full = lf.collect()


In [8]:
filtered = df_polars_full.filter(pl.col("time_since_last_transaction").is_not_null())
not_null_count = len(filtered)

print("Non-null count for 'time_since_last_transaction':", not_null_count)


Non-null count for 'time_since_last_transaction': 4103487


In [9]:
final_features

['spending_deviation_score',
 'velocity_score',
 'geo_anomaly_score',
 'amount',
 'time_since_last_transaction',
 'transaction_type_code',
 'merchant_category_code',
 'location_code',
 'device_used_code',
 'payment_channel_code']

In [12]:
import pyarrow as pa
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import classification_report
import polars as pl # Ensure polars is imported if it wasn't already in this cell

# Split the lazy frame into training and test sets AFTER preprocessing
test_sample_fraction = 0.4 # Define the fraction of data for testing

# Instead of sampling the LazyFrame, we will collect the full preprocessed data
# and then split the collected DataFrame.
# However, since the goal is streaming training, we will collect the full
# dataset first, shuffle it, and then split.

# Collect the entire preprocessed dataset into a DataFrame first.
# For potentially large datasets, consider collecting the full dataset in batches
# if memory is a concern, and then shuffling within batches or using a different
# splitting strategy if a full shuffle is not feasible.
# Given the use of SGDClassifier for incremental training, collecting the full
# dataset first to split might be okay, as you're still processing it in batches
# during training. If the dataset is too large to fit in memory even after
# collecting, a different splitting strategy might be needed (e.g., hashing
# based split on a key).
# For this fix, we assume the collected preprocessed data can fit into memory for splitting.
df_polars_full = lf.collect()

# Shuffle the collected DataFrame for a random split
df_polars_full = df_polars_full.sample(fraction=1.0, with_replacement=False, seed=42) # Shuffle

# Calculate the split point
num_rows_full = df_polars_full.shape[0]
split_point = int(num_rows_full * (1 - test_sample_fraction))

# Split the shuffled DataFrame into training and test sets
df_polars_train = df_polars_full.head(split_point)
df_polars_test = df_polars_full.tail(num_rows_full - split_point)

# Drop rows with any NaN values in the final_features columns from the training data
df_polars_train_cleaned = df_polars_train.drop_nulls(final_features)
# Drop rows with any NaN values in the final_features columns from the test data
df_polars_test_cleaned = df_polars_test.drop_nulls(final_features)


# Convert the cleaned training DataFrame to an Arrow Table for batch processing
arrow_table = df_polars_train_cleaned.to_arrow()

# Set your chunk (batch) size:
batch_size = 5000

# Calculate the number of batches for the training data
num_rows_train_cleaned = arrow_table.num_rows
num_batches = (num_rows_train_cleaned + batch_size - 1) // batch_size # Ceiling division

# Initialize your incremental model.
model = SGDClassifier(loss="log_loss", random_state=42)
classes = np.array([0, 1])
first_batch = True

# Train on each batch by slicing the Arrow table.
# Iterate through the calculated number of batches
for i in range(num_batches):
    # Define the start and end row index for the current batch
    start_row = i * batch_size
    end_row = min((i + 1) * batch_size, num_rows_train_cleaned) # Use num_rows_train_cleaned here

    # Slice the arrow_table to get the current batch (as a Table)
    batch_arrow_table = arrow_table.slice(offset=start_row, length=end_row - start_row)

    # Convert the Arrow Table batch to an Arrow RecordBatch
    # We expect a single RecordBatch from the slice in this case
    # Check if there are any batches before accessing index 0
    batches = batch_arrow_table.to_batches()
    if batches:
        batch = batches[0]
    else:
        continue # Skip if batch is empty

    # Convert each Arrow RecordBatch to a Polars DataFrame:
    batch_df = pl.from_arrow(batch)

    # Extract feature matrix (X) and target (y):
    # final_features contains your numeric columns and the encoded categorical columns.
    X_chunk = batch_df.select(final_features).to_numpy()
    y_chunk = batch_df.select("is_fraud").to_numpy().ravel()

    if first_batch:
        model.partial_fit(X_chunk, y_chunk, classes=classes)
        first_batch = False
    else:
        model.partial_fit(X_chunk, y_chunk)

print("Incremental training complete!")

# Collect test data (already collected as df_polars_test)
# df_test_polars = test_lf.collect(streaming=True) # Remove this line
X_test = df_polars_test_cleaned.select(final_features).to_numpy()
y_test = df_polars_test_cleaned.select("is_fraud").to_numpy().ravel()

# Predict and evaluate
y_pred = model.predict(X_test)
print("Test Evaluation:")
print(classification_report(y_test, y_pred))

Incremental training complete!
Test Evaluation:
              precision    recall  f1-score   support

       False       0.96      0.95      0.95   1569843
        True       0.04      0.05      0.05     71567

    accuracy                           0.91   1641410
   macro avg       0.50      0.50      0.50   1641410
weighted avg       0.92      0.91      0.91   1641410



In [21]:
# Perform value_counts directly on the collected DataFrame
df_counts = df_polars_full["is_fraud"].value_counts()

# The result of value_counts on a Series is a DataFrame with columns 'is_fraud' and 'counts'
# Now we can directly calculate the ratio
total = df_counts["count"].sum()

# Calculate the ratio and add it as a new column
df = df_counts.with_columns(
    (pl.col("count") / total).alias("ratio")
)

display(df)

is_fraud,count,ratio
bool,u32,f64
True,179553,0.0359106
False,4820447,0.9640894


In [22]:
X_chunk

array([[ 1.1 ,  6.  ,  0.91, ...,  3.  ,  0.  ,  2.  ],
       [-0.4 , 10.  ,  0.89, ...,  4.  ,  3.  ,  1.  ],
       [ 0.02,  1.  ,  0.  , ...,  7.  ,  2.  ,  3.  ],
       ...,
       [ 1.4 ,  5.  ,  0.78, ...,  3.  ,  3.  ,  1.  ],
       [ 0.13, 19.  ,  0.25, ...,  4.  ,  1.  ,  0.  ],
       [ 2.61, 19.  ,  0.42, ...,  5.  ,  1.  ,  1.  ]])

In [25]:
# Collect test data
# df_test_polars = test_lf.collect(streaming=True) # This line caused the error, remove it
# Use the already created df_polars_test_cleaned DataFrame
X_test = df_polars_test_cleaned.select(final_features).to_numpy()
y_test = df_polars_test_cleaned.select("is_fraud").to_numpy().ravel()

# Predict and evaluate
y_pred = model.predict(X_test)
print("Test Evaluation:")
print(classification_report(y_test, y_pred))

Test Evaluation:
              precision    recall  f1-score   support

       False       0.96      0.95      0.95   1569843
        True       0.04      0.05      0.05     71567

    accuracy                           0.91   1641410
   macro avg       0.50      0.50      0.50   1641410
weighted avg       0.92      0.91      0.91   1641410

