In [96]:
from pyspark.ml.feature import PCA, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

In [97]:
# Start (or reuse) the Spark session used by the rest of this notebook
spark = (
    SparkSession.builder
    .appName("Dimensionality Reduction with Spark for CSV")
    .getOrCreate()
)

# Load the CSV, treating the first line as the header. No schema is supplied
# or inferred here, so every column is read as a string (see printSchema below).
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("./fraud_data.csv")  # Replace with the path to your CSV file
)
df.printSchema()

# print() returns None, so the count is computed first and printed afterwards;
# assigning the result of print() would leave num_cols_df set to None.
num_cols_df = len(df.columns)
print(num_cols_df)


root
 |-- Month: string (nullable = true)
 |-- WeekOfMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- AccidentArea: string (nullable = true)
 |-- DayOfWeekClaimed: string (nullable = true)
 |-- MonthClaimed: string (nullable = true)
 |-- WeekOfMonthClaimed: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Fault: string (nullable = true)
 |-- PolicyType: string (nullable = true)
 |-- VehicleCategory: string (nullable = true)
 |-- VehiclePrice: string (nullable = true)
 |-- FraudFound_P: string (nullable = true)
 |-- PolicyNumber: string (nullable = true)
 |-- RepNumber: string (nullable = true)
 |-- Deductible: string (nullable = true)
 |-- DriverRating: string (nullable = true)
 |-- Days_Policy_Accident: string (nullable = true)
 |-- Days_Policy_Claim: string (nullable = true)
 |-- PastNumberOfClaims: string (nullable = true)
 

In [98]:
# Preview the first 10 rows of the data (show(n) limits rows, not columns)
df.show(n=10)


+-----+-----------+---------+------+------------+----------------+------------+------------------+------+-------------+---+-------------+--------------------+---------------+---------------+------------+------------+---------+----------+------------+--------------------+-----------------+------------------+------------+-----------------+-----------------+--------------+---------+-------------------+-------------------+------------+----+----------+
|Month|WeekOfMonth|DayOfWeek|  Make|AccidentArea|DayOfWeekClaimed|MonthClaimed|WeekOfMonthClaimed|   Sex|MaritalStatus|Age|        Fault|          PolicyType|VehicleCategory|   VehiclePrice|FraudFound_P|PolicyNumber|RepNumber|Deductible|DriverRating|Days_Policy_Accident|Days_Policy_Claim|PastNumberOfClaims|AgeOfVehicle|AgeOfPolicyHolder|PoliceReportFiled|WitnessPresent|AgentType|NumberOfSuppliments|AddressChange_Claim|NumberOfCars|Year|BasePolicy|
+-----+-----------+---------+------+------------+----------------+------------+-----------------

In [99]:
# Select the feature columns: every column except the first one ("Month").
# The open-ended slice avoids hard-coding the total column count.
# NOTE(review): confirm that skipping "Month" is intended.
# NOTE(review): the selection includes FraudFound_P (looks like the target
# label) and PolicyNumber (looks like a row identifier) — confirm that both
# should be part of the PCA input.
feature_cols = df.columns[1:]
print(feature_cols)
print(len(feature_cols))

['WeekOfMonth', 'DayOfWeek', 'Make', 'AccidentArea', 'DayOfWeekClaimed', 'MonthClaimed', 'WeekOfMonthClaimed', 'Sex', 'MaritalStatus', 'Age', 'Fault', 'PolicyType', 'VehicleCategory', 'VehiclePrice', 'FraudFound_P', 'PolicyNumber', 'RepNumber', 'Deductible', 'DriverRating', 'Days_Policy_Accident', 'Days_Policy_Claim', 'PastNumberOfClaims', 'AgeOfVehicle', 'AgeOfPolicyHolder', 'PoliceReportFiled', 'WitnessPresent', 'AgentType', 'NumberOfSuppliments', 'AddressChange_Claim', 'NumberOfCars', 'Year', 'BasePolicy']
32


In [100]:
# An increase in the number of columns is expected here: every feature column
# gains a numeric "<name>_index" companion, plus one assembled vector column.

# Fit one StringIndexer per feature column on the original DataFrame, then
# apply them one after another to build a new DataFrame with the indexed columns.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index").fit(df)
    for name in feature_cols
]
indexed_df = df
for indexer in indexers:
    indexed_df = indexer.transform(indexed_df)

# Names of the indexed feature columns, in the same order as feature_cols
indexed_feature_cols = [name + "_index" for name in feature_cols]

# Combine the indexed feature columns into a single vector column "features"
assembler = VectorAssembler(inputCols=indexed_feature_cols, outputCol="features")
df_with_features = assembler.transform(indexed_df)

# print() returns None, so the count is computed first and printed afterwards;
# assigning the result of print() would leave num_cols_df set to None.
num_cols_df = len(df_with_features.columns)
print(num_cols_df)


66


In [101]:
# Apply PCA for dimensionality reduction: project the "features" vector onto
# the top 3 principal components, written to the "pcaFeatures" column.
# NOTE(review): the features are not standardized before PCA, so indexed
# columns with a large value range may dominate the components — consider
# scaling the features first.
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(df_with_features)
reduced_df = pca_model.transform(df_with_features)

# Display the reduced dimensionality DataFrame (original columns are kept,
# with "pcaFeatures" appended)
reduced_df.show()

# print() returns None, so the count is computed first and printed afterwards;
# assigning the result of print() would leave num_cols_reduced_df set to None.
num_cols_reduced_df = len(reduced_df.columns)
print(num_cols_reduced_df)

# Stop Spark session. After this, DataFrames created above can no longer be
# evaluated; re-run the session-creation cell before re-running any other cell.
spark.stop()

+-----+-----------+---------+---------+------------+----------------+------------+------------------+------+-------------+---+-------------+--------------------+---------------+---------------+------------+------------+---------+----------+------------+--------------------+-----------------+------------------+------------+-----------------+-----------------+--------------+---------+-------------------+-------------------+------------+----+----------+-----------------+---------------+----------+------------------+----------------------+------------------+------------------------+---------+-------------------+---------+-----------+----------------+---------------------+------------------+------------------+------------------+---------------+----------------+------------------+--------------------------+-----------------------+------------------------+------------------+-----------------------+-----------------------+--------------------+---------------+-------------------------+---------