In [16]:
import polars as pl
import s3fs
from rich import print

In [13]:
# Non-anonymous S3 filesystem handle; the parquet reads in this notebook go through it.
fs = s3fs.S3FileSystem(anon=False)

In [14]:
# Read the raw training parquet from S3 into a polars DataFrame.
with fs.open("s3://fraud-detection-system/raw/training.parquet") as f:
    df = pl.read_parquet(source=f)

In [17]:
# Show the first record as a list of dicts (via rich's print) to inspect the fields.
print(df.head(n=1).to_dicts())

In [8]:
# Distinct values of the transaction category column.
df.get_column('category').unique()

category
str
"""shopping_pos"""
"""travel"""
"""grocery_pos"""
"""food_dining"""
"""misc_net"""
…
"""grocery_net"""
"""gas_transport"""
"""home"""
"""misc_pos"""


In [5]:
# Per-column count of null values in the raw training frame.
df.null_count()

Unnamed: 0_level_0,trans_date_trans_time,cc_num,merchant,category,amt,first,last,gender,street,city,state,zip,lat,long,city_pop,job,dob,trans_num,unix_time,merch_lat,merch_long,is_fraud
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


### Rename columns

In [37]:
# Work on a separate DataFrame object so `df` stays bound to the raw load.
# `clone()` is the public polars API for this; avoid calling the `__copy__` dunder directly.
dff = df.clone()

In [35]:
def rename_columns(df: pl.DataFrame, mapping: dict[str, str]) -> pl.DataFrame:
    """Return ``df`` with columns renamed according to ``mapping``.

    Args:
        df: Frame whose columns should be renamed.
        mapping: Old column name -> new column name.

    Returns:
        The frame produced by ``df.rename``; ``df`` itself is not reassigned.
    """
    renamed = df.rename(mapping)
    return renamed

In [40]:
# Raw column name -> descriptive column name, intended for `rename_columns`.
mapping = {
    "trans_date_trans_time": "transaction_time",
    "cc_num": "credit_card_number",
    "merchant": "merchant_name",
    "amt": "amount_usd",
}

### Convert to date and datetime

In [23]:
def convert_to_datetime(
    df: pl.DataFrame,
    column: str,
    fmt: str = "%Y-%m-%d %H:%M:%S",
) -> pl.DataFrame:
    """Return ``df`` with the string column ``column`` parsed as a datetime.

    Args:
        df: Frame containing ``column`` as strings.
        column: Name of the column to parse; the parsed column keeps this name.
        fmt: strftime-style pattern of the stored strings. Defaults to
            ``"%Y-%m-%d %H:%M:%S"``, the format this helper originally hard-coded.

    Returns:
        A new frame from ``with_columns`` with ``column`` replaced by its
        parsed version.

    Note:
        Parsing uses polars' default settings for ``str.to_datetime``; see the
        polars documentation for how non-matching values are handled.
    """
    return df.with_columns(pl.col(column).str.to_datetime(format=fmt))

In [26]:
def convert_to_date(
    df: pl.DataFrame,
    column: str,
    fmt: str = "%Y-%m-%d",
) -> pl.DataFrame:
    """Return ``df`` with the string column ``column`` parsed as a date.

    Args:
        df: Frame containing ``column`` as strings.
        column: Name of the column to parse; the parsed column keeps this name.
        fmt: strftime-style pattern of the stored strings. Defaults to
            ``"%Y-%m-%d"``, the format this helper originally hard-coded.

    Returns:
        A new frame from ``with_columns`` with ``column`` replaced by its
        parsed version.

    Note:
        Parsing uses polars' default settings for ``str.to_date``; see the
        polars documentation for how non-matching values are handled.
    """
    return df.with_columns(pl.col(column).str.to_date(format=fmt))

### Drop columns

In [6]:
def drop_columns(df: pl.DataFrame, columns: list[str]) -> pl.DataFrame:
    """Return ``df`` without the columns named in ``columns``.

    Args:
        df: Frame to remove columns from.
        columns: Names of the columns to drop.

    Returns:
        The frame produced by ``df.drop``.
    """
    remaining = df.drop(columns)
    return remaining

### Create customer id

In [48]:
# Distinct (card number, first name, last name, date of birth) combinations.
dff.select(['cc_num', 'first', 'last', 'dob']).unique()

cc_num,first,last,dob
i64,str,str,str
6517217825320610,"""James""","""Reese""","""1958-06-11"""
3534330126107879,"""Jeffrey""","""Smith""","""1978-01-15"""
3544606805704278,"""Carolyn""","""Thomas""","""1952-05-07"""
180040027502291,"""Mary""","""Schmidt""","""1957-12-29"""
376262134119629,"""Christopher""","""Carr""","""1961-09-28"""
…,…,…,…
4254074738931278,"""Gary""","""Hall""","""1956-05-02"""
4855488158131690372,"""Jeremy""","""Roberson""","""1993-09-29"""
2703186189652095,"""Jennifer""","""Banks""","""1988-03-09"""
4373370572092720777,"""Hailey""","""Rhodes""","""1972-07-29"""


In [56]:
from hashlib import sha256

def generate_customer_id(row: dict) -> str:
    """Build a deterministic customer id from a row's identifying fields.

    The id is ``"CUST_"`` followed by the first 16 hex characters of the
    SHA-256 digest of the concatenated fields.

    Args:
        row: Mapping with the keys ``'first'``, ``'last'``, ``'cc_num'`` and
            ``'dob'``. Each value is converted with ``str()``.

    Returns:
        A 21-character string of the form ``CUST_<16 hex chars>``.

    Raises:
        KeyError: If any of the four keys is missing from ``row``.

    Note:
        The fields are joined without a separator, so different field
        boundaries can yield the same input string (e.g. first="ab", last="c"
        vs. first="a", last="bc"). NOTE(review): adding a separator would
        change every id already produced - confirm before changing.
    """
    parts = [
        str(row['first']).lower().strip(),   # names: case-folded and trimmed
        str(row['last']).lower().strip(),
        str(row['cc_num']).strip(),          # card number: trimmed only
        str(row['dob']),                     # date of birth: used verbatim
    ]
    digest = sha256("".join(parts).encode()).hexdigest()
    return "CUST_" + digest[:16]

In [51]:
# Identifying columns that are hashed together to form the customer id.
sensitive_columns = [
    "cc_num",
    "first",
    "last",
    "dob",
]

In [None]:
# Pack the identifying columns into one struct per row and hash each struct
# into a `customer_id` string column.
dff = dff.with_columns(
    pl.struct(sensitive_columns)
    .map_elements(generate_customer_id, return_dtype=pl.String)
    .alias('customer_id')
)

In [59]:
# Distinct generated customer ids.
dff.get_column('customer_id').unique()

customer_id
str
"""CUST_7947df7a2b2ca53b"""
"""CUST_8c53475cbd711789"""
"""CUST_ed680b5f37ed21fd"""
"""CUST_1b245841e67a2a5f"""
"""CUST_99ece2d9315bbed5"""
…
"""CUST_8d339046343e7d1a"""
"""CUST_a3d8212e5f1190e2"""
"""CUST_a72036e187a90bb6"""
"""CUST_d4a332fcbe53bdb8"""


### Apply customer id to both datasets

In [61]:
# Read the raw training parquet from S3.
with fs.open("s3://fraud-detection-system/raw/training.parquet") as f:
    train_df = pl.read_parquet(source=f)

In [60]:
# Read the production parquet from S3; it is used as the test split below.
with fs.open("s3://fraud-detection-system/production/prod.parquet") as f:
    test_df = pl.read_parquet(source=f)

In [67]:
# Stack train and test into one frame, tagging every row with the split it
# came from so the frame can be separated again later.
full_data = pl.concat([
    frame.with_columns(pl.lit(split_name).alias('original_split'))
    for split_name, frame in (('train', train_df), ('test', test_df))
])

In [68]:
# Add the hashed `customer_id` column to the combined train + test frame.
full_data2 = full_data.with_columns(
    pl.struct(sensitive_columns)
    .map_elements(generate_customer_id, return_dtype=pl.String)
    .alias('customer_id')
)

In [72]:
# Separate the combined frame back into its original partitions and drop the
# helper marker column.
processed_train, processed_test = (
    full_data2.filter(pl.col('original_split') == split_name).drop('original_split')
    for split_name in ('train', 'test')
)

# Sanity check: row counts must survive the concat -> split round trip.
# This compares sizes only; it does not check for customers shared across splits.
assert processed_train.height == train_df.height, "Training data size mismatch"
assert processed_test.height == test_df.height, "Test data size mismatch"