In [None]:
!pip install "dask[complete]"
!pip install pyarrow pandas

Collecting dask[complete]
  Downloading dask-2024.7.1-py3-none-any.whl.metadata (3.8 kB)
Collecting partd>=1.4.0 (from dask[complete])
  Downloading partd-1.4.2-py3-none-any.whl.metadata (4.6 kB)
Collecting importlib-metadata>=4.13.0 (from dask[complete])
  Downloading importlib_metadata-8.2.0-py3-none-any.whl.metadata (4.7 kB)
Collecting pyarrow-hotfix (from dask[complete])
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl.metadata (3.6 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.7 kB)
Collecting locket (from partd>=1.4.0->dask[complete])
  Downloading locket-1.0.0-py2.py3-none-any.whl.metadata (2.8 kB)
Collecting dask-expr<1.2,>=1.1 (from dask[complete])
  Downloading dask_expr-1.1.9-py3-none-any.whl.metadata (2.5 kB)
Collecting bokeh>=2.4.2 (from dask[complete])
  Downloading bokeh-3.5.1-py3-none-any.whl.metadata (12 kB)
Collecting distributed==2024.7.1 (from dask[complete])
  Download

In [None]:
# Mount Google Drive at /content/drive. Later cells read their parquet inputs from
# paths under /content/drive/MyDrive, so this cell has to run before them.
# The `google.colab` package is imported here, so this cell depends on that
# package being importable in the current runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
import re
import dask.dataframe as dd
import glob
from pathlib import Path
import os
import glob
import datetime
from datetime import timedelta
import warnings
# Silence every warning for the rest of the session.
# NOTE(review): this is a blanket filter, so it also hides pandas deprecation and
# chained-assignment warnings raised by later cells - consider narrowing it to
# specific categories once the notebook is stable.
warnings.filterwarnings('ignore')

In [None]:
# Show full cell contents (no truncation of long strings) when frames are displayed.
pd.options.display.max_colwidth = None

In [None]:
# Folder on the mounted drive that holds the transaction parquet files
data_dir = '/content/drive/MyDrive/Data_hasking'

# Collect every parquet file in that folder, order the paths lexicographically,
# and keep the first 10 of them (change the slice to widen or narrow the selection)
parquet_paths = glob.glob(f"{data_dir}/*.parquet")
parquet_paths.sort()
file_list = parquet_paths[:10]
file_list

['/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230801.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230802.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230803.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230804.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230805.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230806.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230807.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230808.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230809.parquet',
 '/content/drive/MyDrive/Data_hasking/DATA_TB_TRANSACTIONS_20230810.parquet']

In [None]:
# Load all parquet files into a Dask DataFrame
# (`file_list` is the list of paths selected in the earlier cell).
ddf = dd.read_parquet(file_list)
# Compute the DataFrame to get a Pandas DataFrame
# NOTE(review): no `columns=` is passed, and .compute() materialises the result in
# memory as one pandas frame. The cells visible here only use CUST_CUSTNO, VALUEDATE
# and AMOUNT - if nothing else is needed, restricting the columns at read time would
# reduce memory use; confirm against the rest of the notebook.
df = ddf.compute()

In [None]:
# Open the customer table as a Dask DataFrame.
# Note the file sits directly under MyDrive, not inside `data_dir`.
tb_customers = dd.read_parquet('/content/drive/MyDrive/TB_CUSTOMERS.parquet')

In [None]:
# Materialise the customer table as a pandas DataFrame and display it.
# The last line is a bare expression so the notebook renders the frame as cell output.
tb_customers_df =  tb_customers.compute()
tb_customers_df

Unnamed: 0,CUST_CUSTNO,CUSTOMER_TYPE,BUSINESS_TYPE,INDUSTRY_TYPE
0,e2eaa7a94d5e3bcd15579df95cfadcc33a9ffef9832e5073426dc36a3499c069,Cá nhân,CA THE,DICH VU LUU TRU VA AN UONG
1,30882b5b40eb85071295cc78276c3b74eb0f73eaebad8f009bc0aa04796a565c,Cá nhân,CA THE,SX &PP DIEN/KHI DOT/NUOC NONG/HOI NUOC/DIEU HOA KK
2,9e70464b7a79b94c07a6dc6cf3eea62c1e4cde0872304fd1e03542c898c0a6e9,Cá nhân,CA THE,HOAT DONG DICH VU KHAC
3,ce932f938de9bbabd34650dc0d8e07e1814d17580d97440d48905c5a7d0f0d09,Cá nhân,CA THE,HOAT DONG DICH VU KHAC
4,9771233eb92c014fd6cc1c0312a93278b5b933fc01a64d0a3f24dac62cd92812,Cá nhân,CA THE,HOAT DONG DICH VU KHAC
...,...,...,...,...
3179025,cc1748a291074998ab6fd12d995f4773618f75aac4f6cf33789d8ab3361b658c,Cá nhân,CA THE,HOAT DONG DICH VU KHAC
3179026,40062bd373af28817793d4b15ca498238a103ca39213d7857a039181a9328a54,Cá nhân,CA THE,HOAT DONG DICH VU KHAC
3179027,5c3d7ca15098f3d43a0b47f193f4a1a47f11b35ca8717277c64b165e2fa3c745,Cá nhân,CA THE,HOAT DONG DICH VU KHAC
3179028,b806e3779fe5fcd0efe15d027a91069a07304cddf213a708c8f4318a2305e347,Cá nhân,CA THE,HOAT DONG DICH VU KHAC


In [None]:
# Keep only the rows whose CUSTOMER_TYPE is exactly "Cá nhân"
is_individual = tb_customers_df["CUSTOMER_TYPE"].eq("Cá nhân")
tb_customers_individual_df = tb_customers_df[is_individual]

In [None]:
# Distinct customer numbers of the individual customers, as a plain Python list
individual_ids = tb_customers_individual_df["CUST_CUSTNO"].unique()
individual_customers = individual_ids.tolist()

In [None]:
# Filter transactions for individual customers.
# Later cells add and overwrite columns on `df_individual`, so take an explicit copy:
# writing into the result of a boolean filter is assignment on a slice, which pandas
# flags with SettingWithCopyWarning (hidden in this notebook by the global warnings filter).
df_individual = df[df['CUST_CUSTNO'].isin(individual_customers)].copy()

In [None]:
# Parse VALUEDATE into datetimes; values that cannot be parsed become NaT
# (errors='coerce') instead of raising.
parsed_valuedate = pd.to_datetime(df_individual['VALUEDATE'], errors='coerce')
df_individual['VALUEDATE'] = parsed_valuedate

In [None]:
# Derive the calendar date of each transaction from the parsed VALUEDATE
valuedate_parts = df_individual['VALUEDATE'].dt
df_individual['DATE_ONLY'] = valuedate_parts.date


In [None]:

# Split by the sign of AMOUNT: positive rows are deposits (receipts), negative rows
# are withdrawals. Rows with AMOUNT equal to 0 (or missing) land in neither frame.
is_deposit = df_individual['AMOUNT'].gt(0)
is_withdrawal = df_individual['AMOUNT'].lt(0)
df_deposits = df_individual[is_deposit]
df_withdrawals = df_individual[is_withdrawal]


In [None]:
# Aggregate amounts by customer and date
def _daily_total(frame, total_name):
    """Sum AMOUNT per (CUST_CUSTNO, DATE_ONLY) pair.

    Returns a flat DataFrame with the two key columns plus one column,
    named `total_name`, holding the summed amount.
    """
    per_customer_day = frame.groupby(['CUST_CUSTNO', 'DATE_ONLY'])['AMOUNT']
    return per_customer_day.sum().reset_index(name=total_name)


daily_deposits = _daily_total(df_deposits, 'total_deposits')
daily_withdrawals = _daily_total(df_withdrawals, 'total_withdrawals')


In [None]:
# Combine the two daily totals on customer and date. The outer join keeps
# customer-days that appear on only one side (the other total is then missing).
merge_keys = ['CUST_CUSTNO', 'DATE_ONLY']
transactions = daily_deposits.merge(daily_withdrawals, how='outer', on=merge_keys)


In [None]:
# Fill missing values with 0.
# A customer-day present on only one side of the outer merge has no total on the
# other side; treat that as 0.
# The fill is done at frame level and the result is re-bound, rather than calling
# `transactions[col].fillna(..., inplace=True)`: that form is chained in-place
# assignment, which is deprecated in recent pandas and does not update the frame
# once Copy-on-Write is enabled.
transactions = transactions.fillna({'total_deposits': 0, 'total_withdrawals': 0})

In [None]:
# Withdrawals (as a magnitude) expressed as a percentage of deposits, per customer-day.
# Note: a customer-day with total_deposits == 0 yields inf (or NaN when both are 0).
withdrawn_magnitude = transactions['total_withdrawals'].abs()
transactions['withdrawal_to_deposit_ratio'] = withdrawn_magnitude.div(transactions['total_deposits']).mul(100)



In [None]:

# Apply the rule conditions.
# A customer-day is flagged when BOTH hold:
#   1. total deposits for the day reach MIN_DAILY_DEPOSITS, and
#   2. the magnitude of the day's withdrawals is at least MIN_WITHDRAWAL_SHARE
#      of those deposits.
# The thresholds are named so they can be tuned in one place.
MIN_DAILY_DEPOSITS = 6_000_000   # same currency unit as AMOUNT
MIN_WITHDRAWAL_SHARE = 0.95      # fraction of the day's deposits

deposit_total = transactions['total_deposits']
withdrawal_magnitude = transactions['total_withdrawals'].abs()

filtered_transactions = transactions[
    (deposit_total >= MIN_DAILY_DEPOSITS) &
    (withdrawal_magnitude >= MIN_WITHDRAWAL_SHARE * deposit_total)
]

# Display the final DataFrame containing all anomalous transactions
filtered_transactions

Unnamed: 0,CUST_CUSTNO,DATE_ONLY,total_deposits,total_withdrawals,withdrawal_to_deposit_ratio
978,003e0235aab29e1788ce064113f24f8703921030065318bb4ca5367061a3afc6,2023-08-07,11236003.08,-12497903.80,111.230868
1353,00527a90ea8ac7e0cb02edb41f08ec69cacd447c1e06a39cf8be9434cf4b315c,2023-08-03,7162326.10,-7162163.01,99.997723
1355,00527a90ea8ac7e0cb02edb41f08ec69cacd447c1e06a39cf8be9434cf4b315c,2023-08-07,6172231.88,-6342641.09,102.760901
2030,008305ae484962df097f666beadd0aaf025cfbc520151ca16561ff8d49c63913,2023-08-04,8561497.60,-8561400.01,99.998860
2248,008ebd410e08ed2eaa6c7d5d525c3761630b696af313f8355039df690112cc1d,2023-08-08,15536751.83,-15296750.89,98.455270
...,...,...,...,...,...
998320,ff8d7492d6aa39bc6177141f150c866919549060c062e2adfe2bda8b7fb0661d,2023-08-04,15698000.00,-15683291.00,99.906300
998323,ff8d7492d6aa39bc6177141f150c866919549060c062e2adfe2bda8b7fb0661d,2023-08-08,8450516.00,-8359730.00,98.925675
998324,ff8d7492d6aa39bc6177141f150c866919549060c062e2adfe2bda8b7fb0661d,2023-08-09,12835000.00,-12719050.00,99.096611
998578,ff9f2b5ccdfe4e76cb65ea25e5a9819004960b989c40d90207f52d36a35ba087,2023-08-03,8211287.25,-8230508.91,100.234088


In [None]:

# Distinct customer IDs that appear in at least one flagged customer-day
flagged_customer_ids = filtered_transactions['CUST_CUSTNO']
unique_customers = flagged_customer_ids.unique()

# Show the resulting array as the cell output
unique_customers


<ArrowStringArray>
['003e0235aab29e1788ce064113f24f8703921030065318bb4ca5367061a3afc6',
 '00527a90ea8ac7e0cb02edb41f08ec69cacd447c1e06a39cf8be9434cf4b315c',
 '008305ae484962df097f666beadd0aaf025cfbc520151ca16561ff8d49c63913',
 '008ebd410e08ed2eaa6c7d5d525c3761630b696af313f8355039df690112cc1d',
 '009d5f0d4c0a92e45134ab0053b1ea2dd21c852a31ba30c56fb2b2a61f22bc41',
 '00a09b42e8ebcfb8d3c27acc11f83742703f169309244331a378284ae464b5f9',
 '00bf417632cccf888712a5180b704ae453fa25bac921493f37f864393348734e',
 '00cdf11d7da083f6033239ba9c1303950daf57f0be4ddbe913eb541765efb6c2',
 '00de13e062bda3496a8bcadde2ce55d219a6435049b17f7ef785c7e13c8333f2',
 '010dad20c626e8df9910cfdc1f29bf104a89600b097f12f84cab24ba3593e6a3',
 ...
 'fe71b2b12f2ad986656cd969fbb6c28ed59cea748086475965509c1df7af0d4d',
 'fe9636c5354d7026ffa6a01821e85983bd2d5b5034a61b8a93716d481ad9859f',
 'fea59b24c0572cdde013e87ae9d83e1ad86d723c95b79576a4fdb2eb369ddaa4',
 'feb749ba6931cfcb9499e3ae74fee3b24e80b4e95d367dd121690432f4e80472',
 'ff2b01e3