In [1]:
from warnings import filterwarnings

# Suppress every warning category for the rest of this kernel session.
# NOTE(review): this also hides pandas SettingWithCopyWarning / FutureWarning,
# which can point at real correctness problems — consider narrowing the filter
# to specific categories/modules instead of a blanket ignore.
filterwarnings(action='ignore')

In [2]:
# Data manipulation and processing
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask import delayed, compute
from dask.distributed import Client

# Statistical methods and time series
from statsmodels.tsa.arima.model import ARIMA
from scipy.stats import skew, kurtosis, zscore, chi2
from itertools import product
from joblib import Parallel, delayed

# Outlier detection
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor, NearestNeighbors
from sklearn.preprocessing import RobustScaler

# Clustering for anomaly detection
from sklearn.cluster import DBSCAN, KMeans
from sklearn.metrics import silhouette_score
from sklearn.model_selection import GridSearchCV

# Supervised methods for outlier detection
from sklearn.svm import OneClassSVM

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# TensorFlow and Keras (deep learning)
import tensorflow as tf
from tensorflow.keras import layers, models, Model
from tensorflow.keras.layers import Input, Dense, Dropout, BatchNormalization, Lambda
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam
from keras import backend as K
from keras.losses import binary_crossentropy

# Additional utilities
import os
from scipy.signal import find_peaks
from scipy import stats
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, mean_squared_error


2025-02-12 01:20:36.307977: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-02-12 01:20:36.761527: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-02-12 01:20:36.900775: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-02-12 01:20:37.893432: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# Define paths. Each one can be overridden with an environment variable so the
# notebook can run outside the original cluster account without code edits;
# the defaults are the original hard-coded locations.
data_path = os.environ.get('TRANSACTIONS_DATA_PATH', 'Data_hasking')  # Path to transaction parquet files
customer_data_path = os.environ.get(
    'CUSTOMER_DATA_PATH',
    '/home/malek/projects/def-miktiger/malek/TB_CUSTOMERS.parquet',
)  # Path to customer data

# Load transaction data with only the columns used downstream.
# NOTE(review): any failure (not only a missing file) is re-raised as
# FileNotFoundError; `from e` keeps the real error attached as the direct cause.
# Dask reads lazily, so some data problems may only surface at a later .compute().
try:
    ddf = dd.read_parquet(data_path, columns=['CUST_CUSTNO', 'AMOUNT', 'VALUEDATE'])
except Exception as e:
    raise FileNotFoundError(f"Error loading transaction data: {e}") from e

# Load customer data into a Dask DataFrame (all columns).
try:
    tb_customers_ddf = dd.read_parquet(customer_data_path)
except Exception as e:
    raise FileNotFoundError(f"Error loading customer data: {e}") from e

# Restrict the customer table to individuals: "Cá nhân" is the literal
# CUSTOMER_TYPE label being matched (Vietnamese for "individual").
individual_customers_ddf = tb_customers_ddf[tb_customers_ddf["CUSTOMER_TYPE"] == "Cá nhân"]

# Materialise both sets of distinct customer IDs with a single dask.compute call:
# the individual customers and every customer that appears in the transactions.
individual_customer_ids, transaction_customer_ids = compute(
    individual_customers_ddf["CUST_CUSTNO"].drop_duplicates(),
    ddf["CUST_CUSTNO"].drop_duplicates(),
)

# Individual customers that actually have transactions (pandas Series).
matching_customer_ids = individual_customer_ids[
    individual_customer_ids.isin(transaction_customer_ids)
]

# Keep only those customers' transactions, then parse VALUEDATE from YYYYMMDD.
df_individual_ddf = ddf[ddf["CUST_CUSTNO"].isin(matching_customer_ids)]
df_individual_ddf = df_individual_ddf.assign(
    VALUEDATE=dd.to_datetime(df_individual_ddf["VALUEDATE"], format='%Y%m%d')
)

# Pull the filtered transactions into memory as a pandas DataFrame.
df_individual = df_individual_ddf.compute()

print("Data for all individual customers successfully loaded and processed.")


Data for all individual customers successfully loaded and processed.


In [4]:
# Separate deposits (AMOUNT > 0) from withdrawals (AMOUNT < 0).
# df_individual is already an in-memory pandas DataFrame, so one vectorised mask
# per side replaces the former 100k-row chunk loop. That loop gave no memory
# benefit and crashed on an empty frame, because pd.concat([]) raises ValueError.
# Missing AMOUNTs are treated as 0, as before, so such rows land in neither bucket.
amount_filled = df_individual['AMOUNT'].fillna(0)
df_deposits = df_individual[amount_filled > 0].reset_index(drop=True)
df_withdrawals = df_individual[amount_filled < 0].reset_index(drop=True)

# Aggregate by customer and date.
daily_deposits = (
    df_deposits.groupby(['CUST_CUSTNO', 'VALUEDATE'])['AMOUNT']
    .sum()
    .reset_index(name='TOTAL_RECEIVED')
)
daily_withdrawals = (
    df_withdrawals.groupby(['CUST_CUSTNO', 'VALUEDATE'])['AMOUNT']
    .sum()
    .reset_index(name='TOTAL_SENT')
)

# Outer merge keeps customer-days that have only deposits or only withdrawals.
transactions = pd.merge(daily_deposits, daily_withdrawals, on=['CUST_CUSTNO', 'VALUEDATE'], how='outer')

# The missing side is NaN after the outer merge; treat it as 0.
# Reassign instead of `transactions[col].fillna(0, inplace=True)`: that chained
# in-place call does not update `transactions` under pandas Copy-on-Write
# (the default from pandas 3.0), which would leave NaN in TOTAL_ABSOLUTE.
transactions = transactions.fillna({'TOTAL_RECEIVED': 0, 'TOTAL_SENT': 0})

# Gross daily volume: inflow plus the magnitude of the (non-positive) outflow.
transactions['TOTAL_ABSOLUTE'] = transactions['TOTAL_RECEIVED'].abs() + transactions['TOTAL_SENT'].abs()

transactions


Unnamed: 0,CUST_CUSTNO,VALUEDATE,TOTAL_RECEIVED,TOTAL_SENT,TOTAL_ABSOLUTE
0,00001fd6f1d8852373810b5402eb3ae6adbd610d9d3608...,2023-08-05,1500.00,-300.00,1800.00
1,00001fd6f1d8852373810b5402eb3ae6adbd610d9d3608...,2023-08-06,0.00,-1000.00,1000.00
2,00001fd6f1d8852373810b5402eb3ae6adbd610d9d3608...,2023-08-10,12717.60,-2063.00,14780.60
3,00001fd6f1d8852373810b5402eb3ae6adbd610d9d3608...,2023-08-11,0.00,-3439.00,3439.00
4,00001fd6f1d8852373810b5402eb3ae6adbd610d9d3608...,2023-08-13,0.00,-2500.00,2500.00
...,...,...,...,...,...
30481448,fffff9df57f43b816e9ae9dd31699ccbeba7668c731de1...,2023-12-03,0.00,-10000.00,10000.00
30481449,fffff9df57f43b816e9ae9dd31699ccbeba7668c731de1...,2023-12-31,0.46,0.00,0.46
30481450,fffff9df57f43b816e9ae9dd31699ccbeba7668c731de1...,2024-01-06,142350.14,-130000.00,272350.14
30481451,fffff9df57f43b816e9ae9dd31699ccbeba7668c731de1...,2024-01-10,142429.56,-242351.56,384781.12


In [5]:
# Persist the per-customer daily table so later steps can reload it without
# recomputing; the frame's default integer index carries no information, so it
# is not written.
transactions.to_parquet(path='indiv_transactions.parquet', index=False)
