<a href="https://colab.research.google.com/github/git-shashank-hp/Structured-ML-Credit-Card-Fraud-Detection-Project/blob/main/code_in_pyspark_ML_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import feature
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder \
    .appName("Spark ML Example") \
    .getOrCreate()


In [2]:
from google.colab import drive
drive.mount('/content/gdrive')


Mounted at /content/gdrive


In [44]:
file_path = '/content/gdrive/MyDrive/Colab Notebooks/CCDP/fraudTrain.csv'

In [47]:
# Reading CSV data into a Spark DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show first few rows of data
df.show(5)


+---+---------------------+----------------+--------------------+-------------+------+---------+-------+------+--------------------+--------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+------------------+-----------+--------+
|_c0|trans_date_trans_time|          cc_num|            merchant|     category|   amt|    first|   last|gender|              street|          city|state|  zip|    lat|     long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat| merch_long|is_fraud|
+---+---------------------+----------------+--------------------+-------------+------+---------+-------+------+--------------------+--------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+------------------+-----------+--------+
|  0|  2019-01-01 00:00:18|2703186189652095|fraud_Rippin, Kub...|     misc_net|  4.97| Jennifer|  Banks|     F|      561 Perry 

In [48]:
# Print schema to see column names and types
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [49]:
row_count = df.count()
print(f"Number of rows: {row_count}")


Number of rows: 1296675


In [50]:
len(df.columns)

23

In [51]:
import math
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Initialize Spark session
spark = SparkSession.builder.appName("HaversineExample").getOrCreate()

# Haversine formula function to calculate distance between two latitudes and longitudes (in kilometers)
def haversine(lat1, lon1, lat2, lon2):
    # Radius of the Earth in kilometers
    R = 6371.0

    # Convert degrees to radians
    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)

    # Difference in coordinates
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad

    # Haversine formula
    a = math.sin(dlat / 2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # Distance in kilometers
    distance = R * c
    return distance

# Register the Haversine function as a UDF (User Defined Function)
haversine_udf = udf(haversine, DoubleType())

# Assuming you have already loaded the dataset `df`
# If you have more columns, let's make sure that we keep them all

# Example to load your dataset into DataFrame (replace 'file_path' with the actual path)
# Reading CSV data into a Spark DataFrame
df1 = spark.read.csv(file_path, header=True, inferSchema=True)

# Apply the Haversine UDF to calculate the distance and keep all original columns
df_with_distance = df1.withColumn("distance_from_merchant",
                                 haversine_udf("lat", "long", "merch_lat", "merch_long"))

# Show the resulting DataFrame with the calculated distance, along with all original columns
df_with_distance.show(truncate=False)


+---+---------------------+-------------------+----------------------------------------+-------------+------+-----------+---------+------+------------------------------+------------------------+-----+-----+-------+------------------+--------+---------------------------------------------+----------+--------------------------------+----------+------------------+------------------+--------+----------------------+
|_c0|trans_date_trans_time|cc_num             |merchant                                |category     |amt   |first      |last     |gender|street                        |city                    |state|zip  |lat    |long              |city_pop|job                                          |dob       |trans_num                       |unix_time |merch_lat         |merch_long        |is_fraud|distance_from_merchant|
+---+---------------------+-------------------+----------------------------------------+-------------+------+-----------+---------+------+------------------------------+---

In [52]:
# Print schema to see column names and types
df_with_distance.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- distance_from_merchant: double (nullable = true)



In [53]:
# Drop unwanted columns from df_with_distance
df_cleaned = df_with_distance.drop('_c0', 'merchant', 'cc_num', 'first',
                                   'state', 'last', 'trans_num', 'unix_time',
                                   'street', 'city', 'lat', 'long', 'merch_lat',
                                   'merch_long', 'zip')

# Show the cleaned DataFrame with the remaining columns
df_cleaned.show(truncate=False)


+---------------------+-------------+------+------+--------+---------------------------------------------+----------+--------+----------------------+
|trans_date_trans_time|category     |amt   |gender|city_pop|job                                          |dob       |is_fraud|distance_from_merchant|
+---------------------+-------------+------+------+--------+---------------------------------------------+----------+--------+----------------------+
|2019-01-01 00:00:18  |misc_net     |4.97  |F     |3495    |Psychologist, counselling                    |1988-03-09|0       |78.59756848823062     |
|2019-01-01 00:00:44  |grocery_pos  |107.23|F     |149     |Special educational needs teacher            |1978-06-21|0       |30.212175719210443    |
|2019-01-01 00:00:51  |entertainment|220.11|M     |4154    |Nature conservation officer                  |1962-01-19|0       |108.20608258720067    |
|2019-01-01 00:01:16  |gas_transport|45.0  |M     |1939    |Patent attorney                         

In [54]:
from pyspark.sql.functions import to_timestamp

# Convert 'trans_date_trans_time' to timestamp
df_cleaned = df_cleaned.withColumn('trans_date_trans_time', to_timestamp('trans_date_trans_time'))

# Convert 'dob' to timestamp
df_cleaned = df_cleaned.withColumn('dob', to_timestamp('dob'))

# Show the updated DataFrame schema to verify the changes
df_cleaned.printSchema()

# Show the DataFrame to verify the conversions
df_cleaned.show(truncate=False)


root
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: timestamp (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- distance_from_merchant: double (nullable = true)

+---------------------+-------------+------+------+--------+---------------------------------------------+-------------------+--------+----------------------+
|trans_date_trans_time|category     |amt   |gender|city_pop|job                                          |dob                |is_fraud|distance_from_merchant|
+---------------------+-------------+------+------+--------+---------------------------------------------+-------------------+--------+----------------------+
|2019-01-01 00:00:18  |misc_net     |4.97  |F     |3495    |Psychologist, counselling                    |1988-03-09 00:00:00|0       |78.5

In [55]:
from pyspark.sql import functions as F
from pyspark.sql.functions import year, month, dayofmonth, current_date

# Extract individual date-time components from 'trans_date_trans_time'
df_cleaned = df_cleaned.withColumn('year', year('trans_date_trans_time'))
df_cleaned = df_cleaned.withColumn('Trans_month', month('trans_date_trans_time'))

# Extract individual date-time components from 'dob'
df_cleaned = df_cleaned.withColumn('birth_year', year('dob'))

# Calculate age by subtracting birth year from the current year
current_year = current_date().substr(1, 4).cast('int')  # Extract current year as integer
df_cleaned = df_cleaned.withColumn('age', current_year - df_cleaned['birth_year'])

# Show the updated DataFrame with the new columns
df_cleaned.show(truncate=False)


+---------------------+-------------+------+------+--------+---------------------------------------------+-------------------+--------+----------------------+----+-----------+----------+---+
|trans_date_trans_time|category     |amt   |gender|city_pop|job                                          |dob                |is_fraud|distance_from_merchant|year|Trans_month|birth_year|age|
+---------------------+-------------+------+------+--------+---------------------------------------------+-------------------+--------+----------------------+----+-----------+----------+---+
|2019-01-01 00:00:18  |misc_net     |4.97  |F     |3495    |Psychologist, counselling                    |1988-03-09 00:00:00|0       |78.59756848823062     |2019|1          |1988      |36 |
|2019-01-01 00:00:44  |grocery_pos  |107.23|F     |149     |Special educational needs teacher            |1978-06-21 00:00:00|0       |30.212175719210443    |2019|1          |1978      |46 |
|2019-01-01 00:00:51  |entertainment|220.11|M

In [56]:
from pyspark.sql.functions import col

# Convert 'year' and 'Trans_month' columns to string (PySpark equivalent of 'object' in Pandas)
df_cleaned = df_cleaned.withColumn('year', col('year').cast('string'))
df_cleaned = df_cleaned.withColumn('Trans_month', col('Trans_month').cast('string'))

# Show the updated DataFrame with the new column types
df_cleaned.printSchema()
df_cleaned.show(truncate=False)


root
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: timestamp (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- distance_from_merchant: double (nullable = true)
 |-- year: string (nullable = true)
 |-- Trans_month: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- age: integer (nullable = true)

+---------------------+-------------+------+------+--------+---------------------------------------------+-------------------+--------+----------------------+----+-----------+----------+---+
|trans_date_trans_time|category     |amt   |gender|city_pop|job                                          |dob                |is_fraud|distance_from_merchant|year|Trans_month|birth_year|age|
+---------------------+-------------+------+------+--------+----------------

In [57]:
# Drop specified columns from df_cleaned
df_cleaned = df_cleaned.drop('trans_date_trans_time', 'birth_year', 'dob', 'job')

# Show the cleaned DataFrame to verify the columns have been removed
df_cleaned.show(truncate=False)


+-------------+------+------+--------+--------+----------------------+----+-----------+---+
|category     |amt   |gender|city_pop|is_fraud|distance_from_merchant|year|Trans_month|age|
+-------------+------+------+--------+--------+----------------------+----+-----------+---+
|misc_net     |4.97  |F     |3495    |0       |78.59756848823062     |2019|1          |36 |
|grocery_pos  |107.23|F     |149     |0       |30.212175719210443    |2019|1          |46 |
|entertainment|220.11|M     |4154    |0       |108.20608258720067    |2019|1          |62 |
|gas_transport|45.0  |M     |1939    |0       |95.67323113819748     |2019|1          |57 |
|misc_pos     |41.96 |M     |99      |0       |77.5567436258178      |2019|1          |38 |
|gas_transport|94.63 |F     |2158    |0       |85.92264266264023     |2019|1          |63 |
|grocery_net  |44.54 |F     |2691    |0       |118.11977555909641    |2019|1          |31 |
|gas_transport|71.65 |M     |6018    |0       |12.766922541959126    |2019|1    

In [58]:
from pyspark.sql.functions import max

# Find the maximum value of 'city_pop' column
max_city_pop = df_cleaned.agg(max('city_pop')).collect()[0][0]

# Print the maximum value
print("Maximum value of 'city_pop':", max_city_pop)



Maximum value of 'city_pop': 2906700


In [59]:
from pyspark.sql import functions as F

# Define the population thresholds
rural_threshold = 1000000  # Population < 1 million
metropolitan_threshold = 5000000  # Population > 5 million

# Create a new 'city_type' column based on population
df_cleaned = df_cleaned.withColumn(
    'city_type',
    F.when(df_cleaned['city_pop'] < rural_threshold, 'rural')  # Cities with < 1M population
     .when((df_cleaned['city_pop'] >= rural_threshold) & (df_cleaned['city_pop'] < metropolitan_threshold), 'subarban')  # Cities between 1M and 5M population
     .otherwise('3 - metropolitan')  # Cities with > 5M population
)

'''
1 - rural
2 - subarban
3 - metropolitan
'''

# Show the updated DataFrame with the new 'city_type' column
df_cleaned.select('city_pop', 'city_type').show(truncate=False)


+--------+---------+
|city_pop|city_type|
+--------+---------+
|3495    |rural    |
|149     |rural    |
|4154    |rural    |
|1939    |rural    |
|99      |rural    |
|2158    |rural    |
|2691    |rural    |
|6018    |rural    |
|1472    |rural    |
|151785  |rural    |
|7297    |rural    |
|1925    |rural    |
|341043  |rural    |
|589     |rural    |
|899     |rural    |
|4664    |rural    |
|1078    |rural    |
|4081    |rural    |
|2518    |rural    |
|124967  |rural    |
+--------+---------+
only showing top 20 rows



In [60]:
# Drop specified columns from df_cleaned
df_cleaned = df_cleaned.drop('city_pop')

# Show the cleaned DataFrame to verify the columns have been removed
df_cleaned.show(truncate=False)

+-------------+------+------+--------+----------------------+----+-----------+---+---------+
|category     |amt   |gender|is_fraud|distance_from_merchant|year|Trans_month|age|city_type|
+-------------+------+------+--------+----------------------+----+-----------+---+---------+
|misc_net     |4.97  |F     |0       |78.59756848823062     |2019|1          |36 |rural    |
|grocery_pos  |107.23|F     |0       |30.212175719210443    |2019|1          |46 |rural    |
|entertainment|220.11|M     |0       |108.20608258720067    |2019|1          |62 |rural    |
|gas_transport|45.0  |M     |0       |95.67323113819748     |2019|1          |57 |rural    |
|misc_pos     |41.96 |M     |0       |77.5567436258178      |2019|1          |38 |rural    |
|gas_transport|94.63 |F     |0       |85.92264266264023     |2019|1          |63 |rural    |
|grocery_net  |44.54 |F     |0       |118.11977555909641    |2019|1          |31 |rural    |
|gas_transport|71.65 |M     |0       |12.766922541959126    |2019|1   

In [61]:
from pyspark.sql.functions import col

# Convert 'amt' and 'distance_from_merchant' to integer (whole numbers)
df_cleaned = df_cleaned.withColumn('amt', col('amt').cast('int'))
df_cleaned = df_cleaned.withColumn('distance_from_merchant', col('distance_from_merchant').cast('int'))

# Show the updated DataFrame with the converted columns
df_cleaned.select('amt', 'distance_from_merchant').show(truncate=False)


+---+----------------------+
|amt|distance_from_merchant|
+---+----------------------+
|4  |78                    |
|107|30                    |
|220|108                   |
|45 |95                    |
|41 |77                    |
|94 |85                    |
|44 |118                   |
|71 |12                    |
|4  |25                    |
|198|74                    |
|24 |97                    |
|7  |106                   |
|71 |44                    |
|96 |25                    |
|7  |66                    |
|3  |97                    |
|327|87                    |
|341|87                    |
|63 |90                    |
|44 |84                    |
+---+----------------------+
only showing top 20 rows



In [62]:
# Show the cleaned DataFrame to verify the columns have been removed
df_cleaned.show(truncate=False)

+-------------+---+------+--------+----------------------+----+-----------+---+---------+
|category     |amt|gender|is_fraud|distance_from_merchant|year|Trans_month|age|city_type|
+-------------+---+------+--------+----------------------+----+-----------+---+---------+
|misc_net     |4  |F     |0       |78                    |2019|1          |36 |rural    |
|grocery_pos  |107|F     |0       |30                    |2019|1          |46 |rural    |
|entertainment|220|M     |0       |108                   |2019|1          |62 |rural    |
|gas_transport|45 |M     |0       |95                    |2019|1          |57 |rural    |
|misc_pos     |41 |M     |0       |77                    |2019|1          |38 |rural    |
|gas_transport|94 |F     |0       |85                    |2019|1          |63 |rural    |
|grocery_net  |44 |F     |0       |118                   |2019|1          |31 |rural    |
|gas_transport|71 |M     |0       |12                    |2019|1          |77 |rural    |
|misc_pos 

In [63]:
from pyspark.sql import functions as F

# Map 'M' to 1 and 'F' to 0 in the 'gender' column
df_cleaned = df_cleaned.withColumn(
    'gender',
    F.when(df_cleaned['gender'] == 'M', 1)
     .when(df_cleaned['gender'] == 'F', 0)
     .otherwise(df_cleaned['gender'])  # Keeps other values if there are any (optional)
)

# Show the updated DataFrame with the mapped 'gender' column
df_cleaned.select('gender').show(truncate=False)


+------+
|gender|
+------+
|0     |
|0     |
|1     |
|1     |
|1     |
|0     |
|0     |
|1     |
|0     |
|0     |
|1     |
|0     |
|1     |
|1     |
|1     |
|1     |
|0     |
|1     |
|1     |
|1     |
+------+
only showing top 20 rows



In [64]:
# Show data types of all columns
for col_name, dtype in df_cleaned.dtypes:
    print(f"Column: {col_name}, Type: {dtype}")


Column: category, Type: string
Column: amt, Type: int
Column: gender, Type: string
Column: is_fraud, Type: int
Column: distance_from_merchant, Type: int
Column: year, Type: string
Column: Trans_month, Type: string
Column: age, Type: int
Column: city_type, Type: string


In [66]:
from pyspark.sql.types import StringType

# Extract categorical columns (columns with string type)
categorical_cols = [field.name for field in df_cleaned.schema.fields if isinstance(field.dataType, StringType)]

# Show the categorical columns
print(categorical_cols)


['category', 'gender', 'year', 'Trans_month', 'city_type']


In [72]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Define the columns to be label encoded
cols = ['year', 'Trans_month']

# Create a list to hold the StringIndexer stages
indexers = []

for col_name in cols:
    # Create a StringIndexer for each column
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index")
    indexers.append(indexer)

# Create a pipeline to apply the transformations
pipeline = Pipeline(stages=indexers)

# Fit and transform the data to create the new indexed columns
df_cleaned = pipeline.fit(df_cleaned).transform(df_cleaned)

# Show the updated dataframe with indexed columns
df_cleaned.select('year', 'Trans_month', 'year_index', 'Trans_month_index').show(5)


+----+-----------+----------+-----------------+
|year|Trans_month|year_index|Trans_month_index|
+----+-----------+----------+-----------------+
|2019|          1|       0.0|              5.0|
|2019|          1|       0.0|              5.0|
|2019|          1|       0.0|              5.0|
|2019|          1|       0.0|              5.0|
|2019|          1|       0.0|              5.0|
+----+-----------+----------+-----------------+
only showing top 5 rows



In [73]:
# Show the resulting DataFrame with encoded columns
df_cleaned.show()

+-------------+---+------+--------+----------------------+----+-----------+---+---------+----------+-----------------+
|     category|amt|gender|is_fraud|distance_from_merchant|year|Trans_month|age|city_type|year_index|Trans_month_index|
+-------------+---+------+--------+----------------------+----+-----------+---+---------+----------+-----------------+
|     misc_net|  4|     0|       0|                    78|2019|          1| 36|    rural|       0.0|              5.0|
|  grocery_pos|107|     0|       0|                    30|2019|          1| 46|    rural|       0.0|              5.0|
|entertainment|220|     1|       0|                   108|2019|          1| 62|    rural|       0.0|              5.0|
|gas_transport| 45|     1|       0|                    95|2019|          1| 57|    rural|       0.0|              5.0|
|     misc_pos| 41|     1|       0|                    77|2019|          1| 38|    rural|       0.0|              5.0|
|gas_transport| 94|     0|       0|             

In [74]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Define the non-ordinal categorical columns
non_ordinal_cols = ['category', 'gender', 'city_type']

# Step 1: Create StringIndexer and OneHotEncoder for each non-ordinal column
indexers = []
encoders = []

for col_name in non_ordinal_cols:
    # StringIndexer: Convert each categorical column to an index
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index")

    # OneHotEncoder: Convert the indexed column to one-hot encoding
    encoder = OneHotEncoder(inputCol=f"{col_name}_index", outputCol=f"{col_name}_onehot")

    indexers.append(indexer)
    encoders.append(encoder)

# Combine both indexers and encoders into a pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Fit and transform the data, apply the transformation directly to df_cleaned
df_cleaned = pipeline.fit(df_cleaned).transform(df_cleaned)

# Step 2: Convert boolean columns to integers (example for 'is_fraud')
df_cleaned = df_cleaned.withColumn("is_fraud", F.col("is_fraud").cast(IntegerType()))

# Show the updated dataframe with the added one-hot encoded columns
df_cleaned.select('category', 'gender', 'city_type', 'is_fraud').show(5)

# Step 3: Inspect the schema to verify that one-hot encoded columns have been added
df_cleaned.printSchema()


+-------------+------+---------+--------+
|     category|gender|city_type|is_fraud|
+-------------+------+---------+--------+
|     misc_net|     0|    rural|       0|
|  grocery_pos|     0|    rural|       0|
|entertainment|     1|    rural|       0|
|gas_transport|     1|    rural|       0|
|     misc_pos|     1|    rural|       0|
+-------------+------+---------+--------+
only showing top 5 rows

root
 |-- category: string (nullable = true)
 |-- amt: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- distance_from_merchant: integer (nullable = true)
 |-- year: string (nullable = false)
 |-- Trans_month: string (nullable = false)
 |-- age: integer (nullable = true)
 |-- city_type: string (nullable = false)
 |-- year_index: double (nullable = false)
 |-- Trans_month_index: double (nullable = false)
 |-- category_index: double (nullable = false)
 |-- gender_index: double (nullable = false)
 |-- city_type_index: double (nullable

## Modeling


In [75]:
from pyspark.sql import functions as F

# Filter the fraudulent and non-fraudulent data
fraud_data = df_cleaned.filter(F.col('is_fraud') == 1)
non_fraud_data = df_cleaned.filter(F.col('is_fraud') == 0)

# Sample 25% of both fraud and non-fraud data
sample_fraud_data = fraud_data.sample(fraction=0.50, seed=42)
sample_non_fraud_data = non_fraud_data.sample(fraction=0.50, seed=42)

# Combine the sampled data into one DataFrame
sampled_data = sample_fraud_data.union(sample_non_fraud_data)

# Show the result
sampled_data.show(5)


+-------------+----+------+--------+----------------------+----+-----------+---+---------+----------+-----------------+--------------+------------+---------------+---------------+-------------+----------------+
|     category| amt|gender|is_fraud|distance_from_merchant|year|Trans_month|age|city_type|year_index|Trans_month_index|category_index|gender_index|city_type_index|category_onehot|gender_onehot|city_type_onehot|
+-------------+----+------+--------+----------------------+----+-----------+---+---------+----------+-----------------+--------------+------------+---------------+---------------+-------------+----------------+
|gas_transport|   7|     1|       1|                    34|2019|          1| 36|    rural|       0.0|              5.0|           0.0|         1.0|            0.0| (13,[0],[1.0])|    (1,[],[])|   (1,[0],[1.0])|
|gas_transport|  10|     0|       1|                    91|2019|          1| 64| subarban|       0.0|              5.0|           0.0|         0.0|         

In [78]:
# Number of rows (count of records)
num_rows = sampled_data.count()

# Number of columns
num_columns = len(sampled_data.columns)

# Shape of the DataFrame
print(f"Shape of the sampled data: ({num_rows}, {num_columns})")


Shape of the sampled data: (648804, 17)


In [83]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

# Step 1: Convert year and Trans_month to numeric values (IntegerType or DoubleType)
sampled_data = sampled_data.withColumn("year", sampled_data["year"].cast("integer"))
sampled_data = sampled_data.withColumn("Trans_month", sampled_data["Trans_month"].cast("integer"))

# Step 2: Apply StringIndexer to categorical columns
categorical_cols = ['category', 'gender', 'city_type']

# Apply StringIndexer to convert categorical columns to numeric indices
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in categorical_cols]

# Step 3: Assemble the features into a single vector (including the indexed columns and numeric columns)
feature_columns = [f"{col}_index" for col in categorical_cols] + ['year', 'Trans_month', 'distance_from_merchant', 'age']

# Apply VectorAssembler to combine all features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Step 4: Apply StandardScaler to scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# Step 5: Create a pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler])

# Step 6: Fit and transform the data to add scaled features
df_transformed = pipeline.fit(sampled_data).transform(sampled_data)

# Step 7: Show the transformed data with scaled features
df_transformed.select('scaled_features', 'is_fraud').show(5)

# Step 8: Split the data into training and test sets
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)

# Step 9: Show the training and test sets' shapes (row counts)
print(f"Training set size: {train_data.count()}")
print(f"Test set size: {test_data.count()}")

# Step 10: Select features and target for model training
train_data = train_data.select('scaled_features', 'is_fraud')
test_data = test_data.select('scaled_features', 'is_fraud')

# Show the final training data
train_data.show(5)


IllegalArgumentException: requirement failed: Output column category_index already exists.

In [82]:
# Check the schema to see data types
sampled_data.printSchema()

# Check a sample of the data to identify potential issues with any column
sampled_data.show(5)


root
 |-- category: string (nullable = true)
 |-- amt: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- distance_from_merchant: integer (nullable = true)
 |-- year: string (nullable = false)
 |-- Trans_month: string (nullable = false)
 |-- age: integer (nullable = true)
 |-- city_type: string (nullable = false)
 |-- year_index: double (nullable = false)
 |-- Trans_month_index: double (nullable = false)
 |-- category_index: double (nullable = false)
 |-- gender_index: double (nullable = false)
 |-- city_type_index: double (nullable = false)
 |-- category_onehot: vector (nullable = true)
 |-- gender_onehot: vector (nullable = true)
 |-- city_type_onehot: vector (nullable = true)

+-------------+----+------+--------+----------------------+----+-----------+---+---------+----------+-----------------+--------------+------------+---------------+---------------+-------------+----------------+
|     category| amt|gender|is_fraud|dista