__Feature engineering for credit card transaction data__


In [1]:
#########################
# Load the raw transactions as a PySpark DataFrame and inspect it
#########################

# NOTE(review): `catalog` is not defined in this notebook; presumably it is
# injected by the Kedro Jupyter session -- confirm.
raw_dataset_name = "raw_daily_data"
df = catalog.load(raw_dataset_name)

# Print the column names/types, then leave the row count as the cell's output.
df.printSchema()
df.count()

root
 |-- TRANSACTION_ID: integer (nullable = true)
 |-- TX_DATETIME: timestamp (nullable = true)
 |-- CUSTOMER_ID: integer (nullable = true)
 |-- TERMINAL_ID: integer (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: integer (nullable = true)
 |-- TX_TIME_DAYS: integer (nullable = true)
 |-- TX_FRAUD: integer (nullable = true)
 |-- TX_FRAUD_SCENARIO: integer (nullable = true)
 |-- TX_DURING_WEEKEND: integer (nullable = true)
 |-- TX_DURING_NIGHT: integer (nullable = true)
 |-- CUSTOMER_ID_NB_TX_1DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_NB_TX_7DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_NB_TX_30DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW: double (nullable = true)
 |-- TERMINAL_ID_NB_TX_1DAY_WINDOW: double (nullable = true)
 |-- TERMINAL_ID_RISK_1DAY_WINDOW: double (null

881360

In [2]:
#########################
# Function to handle missing data by filling with mean
#########################

import pyspark.sql.functions as F

def handle_missing_data(df):
    """Fill nulls in every numeric column with that column's mean.

    All means are computed in a single aggregation and applied with a single
    ``fillna`` call, instead of launching one Spark job per column.

    Columns whose mean cannot be computed (for example a column that contains
    only nulls, for which the mean is ``None``) are left untouched rather than
    passing ``None`` to ``fillna``, which would raise.

    Args:
        df: Input Spark DataFrame.

    Returns:
        A DataFrame with the same columns, where nulls in numeric columns have
        been replaced by the column mean. If there is nothing to fill, the
        input DataFrame is returned unchanged.

    Notes:
        * Every numeric column is treated the same way, including identifier
          or label columns if present; callers that need to exclude some
          columns should drop/re-attach them around this call.
        * NOTE(review): for integer-typed columns Spark is expected to cast
          the (floating point) mean to the column type -- confirm that this
          truncation is acceptable.
    """
    # Spark SQL simpleString names of the numeric types we can safely average
    # and fill with a Python float.
    numeric_types = {"tinyint", "smallint", "int", "bigint", "float", "double"}
    numeric_cols = [name for name, dtype in df.dtypes if dtype in numeric_types]
    if not numeric_cols:
        return df

    # One pass over the data for all columns.
    mean_exprs = [F.mean(F.col(name)).alias(name) for name in numeric_cols]
    means_row = df.agg(*mean_exprs).first()

    # Skip columns with no computable mean (all values null).
    fill_values = {
        name: means_row[name]
        for name in numeric_cols
        if means_row[name] is not None
    }
    if not fill_values:
        return df

    return df.fillna(fill_values)

In [3]:
#########################
# Function for one-hot encoding
#########################

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

def one_hot_encoding(df, input_col, output_col):
    """One-hot encode ``input_col`` into the vector column ``output_col``.

    The column is first indexed with a ``StringIndexer`` (written to
    ``"<input_col>_index"``), then that index is encoded with a
    ``OneHotEncoder``. Both stages are fitted on ``df`` itself.

    Args:
        df: Input Spark DataFrame.
        input_col: Name of the categorical column to encode.
        output_col: Name of the encoded output column.

    Returns:
        ``df`` with the intermediate index column and ``output_col`` added.
    """
    index_col = f"{input_col}_index"
    stages = [
        StringIndexer(inputCol=input_col, outputCol=index_col),
        OneHotEncoder(inputCol=index_col, outputCol=output_col),
    ]
    fitted_pipeline = Pipeline(stages=stages).fit(df)
    return fitted_pipeline.transform(df)


In [4]:
#########################
# Function for binning or discretization
#########################

from pyspark.ml.feature import Bucketizer

def binning(df, input_col, output_col, splits):
    """Discretize ``input_col`` into buckets delimited by ``splits``.

    Args:
        df: Input Spark DataFrame.
        input_col: Name of the continuous column to bucket.
        output_col: Name of the column receiving the bucket index.
        splits: Bucket boundaries handed to ``Bucketizer`` as-is.

    Returns:
        ``df`` with ``output_col`` added.
    """
    return Bucketizer(
        splits=splits,
        inputCol=input_col,
        outputCol=output_col,
    ).transform(df)


In [5]:
#########################
# Convert Column to Vector
#########################

from pyspark.ml.feature import VectorAssembler

def vectorize_column(df, input_col, output_col):
    """Wrap the single column ``input_col`` into a vector column.

    Args:
        df: Input Spark DataFrame.
        input_col: Name of the column to wrap.
        output_col: Name of the resulting vector column.

    Returns:
        ``df`` with ``output_col`` added by a ``VectorAssembler``.
    """
    single_col_assembler = VectorAssembler(
        inputCols=[input_col],
        outputCol=output_col,
    )
    return single_col_assembler.transform(df)


In [6]:
#########################
# Function for feature scaling
#########################

from pyspark.ml.feature import StandardScaler

def feature_scaling(df, input_col, output_col):
    """Scale ``input_col`` with a ``StandardScaler`` fitted on ``df``.

    The column is first wrapped into a vector column named
    ``"<input_col>_vec"`` (which stays in the returned frame), because the
    scaler operates on vector columns.

    Args:
        df: Input Spark DataFrame.
        input_col: Name of the numeric column to scale.
        output_col: Name of the scaled (vector) output column.

    Returns:
        ``df`` with the intermediate vector column and ``output_col`` added.

    NOTE(review): the scaler is built with its default settings; Spark's
    defaults are believed to scale to unit standard deviation without
    centering on the mean -- confirm this is the intended behaviour.
    """
    vec_col = f"{input_col}_vec"
    vectorized = vectorize_column(df, input_col, vec_col)

    fitted_scaler = StandardScaler(inputCol=vec_col, outputCol=output_col).fit(vectorized)
    return fitted_scaler.transform(vectorized)



In [7]:
#########################
# Function for feature creation
#########################

import pyspark.sql.functions as F

def feature_creation(df, input_col, output_col, threshold):
    """Add a 0/1 flag marking rows where ``input_col`` exceeds ``threshold``.

    Args:
        df: Input Spark DataFrame.
        input_col: Name of the column to compare.
        output_col: Name of the flag column to create.
        threshold: Value that ``input_col`` must be strictly greater than.

    Returns:
        ``df`` with ``output_col`` set to 1 where the comparison holds and 0
        for every other row.
    """
    exceeds_threshold = F.col(input_col) > threshold
    flag = F.when(exceeds_threshold, 1).otherwise(0)
    return df.withColumn(output_col, flag)



In [8]:
# Feature-engineering run: every step rebinds `df` to the transformed frame.
# NOTE(review): because `df` is overwritten, this cell is not idempotent;
# re-running it on the already-transformed frame will likely fail once the
# output columns exist -- reload `df` before re-running.

# Step 1: fill missing values in numeric columns
df = handle_missing_data(df)
print("Handle missing data done!")

# Step 2: one-hot encode the fraud scenario
# NOTE(review): TX_FRAUD_SCENARIO sits next to the TX_FRAUD label in the
# schema; confirm it is safe to use as a model feature.
df = one_hot_encoding(df, "TX_FRAUD_SCENARIO", "TX_FRAUD_SCENARIO_VEC")
print("One-hot encoding done!")

# Step 3: bucket the transaction amount (boundaries: 10, 50, 100)
splits = [float("-inf"), 10, 50, 100, float("inf")]
df = binning(df, "TX_AMOUNT", "TX_AMOUNT_BINNED", splits)
print("Binning done!")

# Step 4: scale the transaction amount
df = feature_scaling(df, "TX_AMOUNT", "TX_AMOUNT_SCALED")
print("Feature scaling done!")

# Step 5: flag transactions whose amount is above 50
df = feature_creation(df, "TX_AMOUNT", "HIGH_VALUE_TX", 50)
print("Feature creation done!")

# Preview the engineered frame
df.show()


Handle missing data done!
One-hot encoding done!
Binning done!
Feature scaling done!
Feature creation done!
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-----------------+---------------+-----------------------------+----------------------------------+-----------------------------+----------------------------------+------------------------------+-----------------------------------+-----------------------------+----------------------------+-----------------------------+----------------------------+------------------------------+-----------------------------+-----------------------+---------------------+----------------+-------------+--------------------+-------------+
|TRANSACTION_ID|        TX_DATETIME|CUSTOMER_ID|TERMINAL_ID|TX_AMOUNT|TX_TIME_SECONDS|TX_TIME_DAYS|TX_FRAUD|TX_FRAUD_SCENARIO|TX_DURING_WEEKEND|TX_DURING_NIGHT|CUSTOMER_ID_NB_TX_1DAY_WINDOW|CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW|CUSTOMER_ID_NB_TX_

In [12]:
# Display the `pipelines` object. It is not defined in this notebook;
# presumably it is injected by the Kedro Jupyter session -- confirm.
pipelines

{'__default__': Pipeline([]), 'data_engineering': Pipeline([]), 'data_science': Pipeline([]), 'model_evaluation': Pipeline([])}