In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("Credit Card Fraud Detection")
    .getOrCreate()
)

# Read the transactions CSV from HDFS: the first row supplies the column
# names and the column types are inferred from the data
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("hdfs:///user/talentum/NFeatures/credit_card_transactions.csv")
)

# Print the inferred schema
df.printSchema()


root
 |-- Unnamed: 0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: timestamp (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- merch_zipcode: integer (nullable = true)



In [2]:
# Report the DataFrame's dimensions as (row count, column count)
rows, cols = df.count(), len(df.columns)
print(f"Shape: ({rows}, {cols})")


Shape: (1296675, 24)


In [3]:
# Spark's sum is imported under an alias so that Python's builtin sum() is not
# rebound for the rest of the notebook session.
from pyspark.sql.functions import col, sum as spark_sum, when

# One output column per input column, holding that column's number of NULLs:
# every row contributes 1 when the value is NULL and 0 otherwise.
null_counts = df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])
null_counts.show()


+----------+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+-------------+
|Unnamed: 0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|merch_zipcode|
+----------+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+-------------+
|         0|                    0|     0|       0|       0|  0|    0|   0|     0|     0|   0|    0|  0|  0|   0|       0|  0|  0|        0|        0|        0|         0|       0|       195973|
+----------+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+-------------+



In [6]:
# merch_zipcode is the only column with a non-zero count in the null-count
# table printed above (195973 of 1296675 rows), so the column is removed.
df = df.drop('merch_zipcode')


In [7]:
# List the current column names of df.
df.columns

['Unnamed: 0',
 'trans_date_trans_time',
 'cc_num',
 'merchant',
 'category',
 'amt',
 'first',
 'last',
 'gender',
 'street',
 'city',
 'state',
 'zip',
 'lat',
 'long',
 'city_pop',
 'job',
 'dob',
 'trans_num',
 'unix_time',
 'merch_lat',
 'merch_long',
 'is_fraud']

In [8]:
from pyspark.sql.functions import col, datediff, floor, months_between, to_date

# Age of the card holder in completed years on the day of the transaction.
#
# trans_date_trans_time is deliberately NOT overwritten with a date: the
# time-of-day features built later (hour(), hour buckets) need the original
# timestamp, and a date-only column would make hour() return 0 for every row.
# The date casts are therefore applied only inside the age expression.
#
# months_between / 12 counts whole calendar years, so the age changes exactly
# on the birthday; dividing a day count by 365 ignores leap days and rolls the
# age over a few days early.
df = df.withColumn(
    "age",
    floor(
        months_between(
            to_date(col("trans_date_trans_time")),
            to_date(col("dob")),
        ) / 12
    ),
)

# Drop the 'dob' column
df = df.drop("dob")


In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from geopy.distance import geodesic

# Define the Python function
def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the geodesic distance in kilometres between two points.

    Args:
        lat1, lon1: latitude and longitude of the first point.
        lat2, lon2: latitude and longitude of the second point.

    Returns:
        The distance in km as reported by geopy's ``geodesic(...).km``, or
        ``None`` when any coordinate is missing or geopy rejects the values.
    """
    # A missing coordinate yields a null distance instead of being handed to
    # geopy, so incomplete rows can never produce a made-up number.
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    try:
        return geodesic((lat1, lon1), (lat2, lon2)).km
    except (ValueError, TypeError):
        # Only bad coordinate values are treated as "no distance"; anything
        # else (e.g. geopy unavailable on a worker) must surface as an error
        # rather than silently turning the whole column into nulls.
        return None

# Wrap calculate_distance as a Spark UDF that returns a double
distance_udf = udf(calculate_distance, returnType=DoubleType())

# Add the 'distance' column, computed from the (lat, long) pair and the
# (merch_lat, merch_long) pair of each row
coordinate_columns = [col(c) for c in ("lat", "long", "merch_lat", "merch_long")]
df = df.withColumn("distance", distance_udf(*coordinate_columns))


In [10]:
from pyspark.sql.functions import col

# Transaction amount divided by (city population + 1); the +1 keeps the
# denominator non-zero when city_pop is 0
population_plus_one = col("city_pop") + 1
df = df.withColumn("amt_per_capita", col("amt") / population_plus_one)


In [11]:
from pyspark.sql.functions import col, when

# Step 1: Approximate 95th percentile of 'distance' (relative error 0.01)
q95 = df.approxQuantile("distance", [0.95], 0.01)[0]

# Step 2: Flag rows whose distance exceeds that threshold: 1 when above it,
# 0 otherwise
beyond_q95 = col("distance") > q95
df = df.withColumn("is_far", when(beyond_q95, 1).otherwise(0))


In [13]:
from pyspark.sql.functions import hour, dayofmonth, dayofweek, col

# Calendar parts of the transaction time.
#
# NOTE(review): hour() is only meaningful while trans_date_trans_time still
# carries a time of day; on a date-only column it is 0 for every row, so
# confirm the column is a timestamp at this point.
#
# dayofweek() numbers the days 1 = Sunday .. 7 = Saturday. (d + 5) % 7 turns
# that into Monday = 0 .. Sunday = 6; a plain "- 2" maps Sunday to -1.
df = df.withColumn("trans_hour", hour(col("trans_date_trans_time"))) \
       .withColumn("trans_day", dayofmonth(col("trans_date_trans_time"))) \
       .withColumn("trans_weekday", (dayofweek(col("trans_date_trans_time")) + 5) % 7)

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Step 1: Define the Python function
def hour_bucket(hour):
    """Map an hour of the day to a coarse part-of-day label.

    Args:
        hour: hour of the day as a number, or None.

    Returns:
        'night' for 0-5, 'morning' for 6-11, 'afternoon' for 12-17,
        'evening' for every other number, and None when ``hour`` is None.
    """
    # A null hour would otherwise fail the comparison below with a TypeError
    # and abort the whole Spark job; pass the null through instead.
    if hour is None:
        return None
    if 0 <= hour < 6:
        return 'night'
    elif 6 <= hour < 12:
        return 'morning'
    elif 12 <= hour < 18:
        return 'afternoon'
    else:
        return 'evening'

# Step 2: Wrap hour_bucket as a Spark UDF that returns a string
hour_bucket_udf = udf(hour_bucket, returnType=StringType())

# Step 3: Derive the 'hour_bucket' column from 'trans_hour'
df = df.withColumn(
    "hour_bucket",
    hour_bucket_udf(col("trans_hour")),
)


In [15]:
from pyspark.ml.feature import StringIndexer

# Step 1: Configure a StringIndexer that reads 'hour_bucket' and writes
# 'hour_bucket_encoded'
indexer = StringIndexer().setInputCol("hour_bucket").setOutputCol("hour_bucket_encoded")

# Step 2: Fit the indexer on df, then apply the fitted model to df
indexer_model = indexer.fit(df)
df = indexer_model.transform(df)


In [16]:
# Remove the string-valued hour_bucket column; the StringIndexer output
# column hour_bucket_encoded stays in df.
df = df.drop('hour_bucket')


In [17]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

# One-hot encode 'category' and 'gender' into plain 0.0/1.0 columns named
# "<column>_<label>", leaving out the first StringIndexer label of each column
# as the reference level (the equivalent of pandas' drop_first=True).

# STEP 1: StringIndexer + OneHotEncoder
# dropLast=False keeps one vector slot per label, so slot k always belongs to
# labels[k]. With dropLast=True the encoder removes the LAST label instead of
# the first, and naming slots 0..n-2 after labels[1:] shifts every column by
# one (e.g. "gender_M" would carry the indicator for labels[0]).
categorical_cols = ['category', 'gender']
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec", dropLast=False) for c in categorical_cols]

# STEP 2: Build and run pipeline
pipeline = Pipeline(stages=indexers + encoders)
model = pipeline.fit(df)
df = model.transform(df)

# STEP 3: Convert vector to array
def vector_to_array(v):
    """Return the ML vector ``v`` as a dense Python list of floats."""
    return v.toArray().tolist()

vec_to_array_udf = udf(vector_to_array, ArrayType(DoubleType()))
df = df.withColumn("category_array", vec_to_array_udf(col("category_vec")))
df = df.withColumn("gender_array", vec_to_array_udf(col("gender_vec")))

# STEP 4: Get label names
category_labels = model.stages[0].labels  # from category StringIndexer
gender_labels = model.stages[1].labels    # from gender StringIndexer

# Drop first to simulate drop_first=True like pandas
category_dummies = [f"category_{label}" for label in category_labels[1:]]
gender_dummies = [f"gender_{label}" for label in gender_labels[1:]]

# STEP 5: Create individual columns for category
# The names start at labels[1], so the matching array slots start at 1.
for slot, name in enumerate(category_dummies, start=1):
    df = df.withColumn(name, col("category_array")[slot])

# STEP 6: Create individual column for gender
for slot, name in enumerate(gender_dummies, start=1):
    df = df.withColumn(name, col("gender_array")[slot])

# STEP 7: Drop old and intermediate columns
df = df.drop("category", "category_index", "category_vec", "category_array")
df = df.drop("gender", "gender_index", "gender_vec", "gender_array")


In [18]:
from pyspark.sql.functions import count, col

# Step 1: Count the rows per city and name the count column 'city_freq'
city_freq_df = (
    df.groupBy("city")
    .count()
    .withColumnRenamed("count", "city_freq")
)

# Step 2: Attach each city's count to every row of that city (left join on 'city')
df = df.join(city_freq_df, "city", "left")


In [19]:
# List the current column names of df.
df.columns

['city',
 'Unnamed: 0',
 'trans_date_trans_time',
 'cc_num',
 'merchant',
 'amt',
 'first',
 'last',
 'street',
 'state',
 'zip',
 'lat',
 'long',
 'city_pop',
 'job',
 'trans_num',
 'unix_time',
 'merch_lat',
 'merch_long',
 'is_fraud',
 'age',
 'distance',
 'amt_per_capita',
 'is_far',
 'trans_hour',
 'trans_day',
 'trans_weekday',
 'hour_bucket_encoded',
 'category_grocery_pos',
 'category_home',
 'category_shopping_pos',
 'category_kids_pets',
 'category_shopping_net',
 'category_entertainment',
 'category_food_dining',
 'category_personal_care',
 'category_health_fitness',
 'category_misc_pos',
 'category_misc_net',
 'category_grocery_net',
 'category_travel',
 'gender_M',
 'city_freq']

In [20]:
# Report the DataFrame's dimensions as (row count, column count)
rows, cols = df.count(), len(df.columns)
print(f"Shape: ({rows}, {cols})")


Shape: (1296675, 43)


In [None]:
# Columns removed before export (21 names)
cols = [
    'Unnamed: 0',
    'city',
    'trans_date_trans_time',
    'cc_num',
    'merchant',
    'first',
    'last',
    'street',
    'state',
    'zip',
    'lat',
    'long',
    'city_pop',
    'job',
    'trans_num',
    'unix_time',
    'merch_lat',
    'merch_long',
    'trans_hour',
    'trans_day',
    'trans_weekday',
]
df = df.drop(*cols)

In [29]:
# List the current column names of df.
df.columns

['amt',
 'is_fraud',
 'age',
 'distance',
 'amt_per_capita',
 'is_far',
 'hour_bucket_encoded',
 'category_grocery_pos',
 'category_home',
 'category_shopping_pos',
 'category_kids_pets',
 'category_shopping_net',
 'category_entertainment',
 'category_food_dining',
 'category_personal_care',
 'category_health_fitness',
 'category_misc_pos',
 'category_misc_net',
 'category_grocery_net',
 'category_travel',
 'gender_M',
 'city_freq']

In [32]:
# Report the DataFrame's dimensions as (row count, column count)
rows, cols = df.count(), len(df.columns)
print(f"Shape: ({rows}, {cols})")


Shape: (1296675, 22)


In [33]:
# Collapse to a single partition so one part file is written, then save as
# CSV with a header row, replacing any previous output at this path
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("processed_fraud_data_single3.csv")
)