In [17]:
from pyspark.sql import SparkSession

#Creation of the SparkSession, min workers 2, max 6
spark = SparkSession.builder \
    .appName("EDA") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.port", "4040") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "6") \
    .getOrCreate()

In [18]:
spark

In [19]:
#read csv with headers.
df = spark.read.csv("accepted_2007_to_2018Q4.csv", header=True, inferSchema=True)

In [20]:
#Import functions for Exploration
from pyspark.sql.functions import col, mean, stddev, min, max, count

In [21]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string 

In [22]:
#Real number of rows and columns
print("Number of rows:", df.count())
print("Number of columns:", len(df.columns))

Number of rows: 2260701
Number of columns: 151


In [23]:
#Stadistical analisis in the column loan_amnt
df.select(
    mean(col("loan_amnt")).alias("mean"),
    stddev(col("loan_amnt")).alias("standard deviation"),
    min(col("loan_amnt")).alias("Min"),
    max(col("loan_amnt")).alias("Max")
).show()

+------------------+------------------+-----+-------+
|              mean|standard deviation|  Min|    Max|
+------------------+------------------+-----+-------+
|15046.931227849467| 9190.245488232786|500.0|40000.0|
+------------------+------------------+-----+-------+



In [24]:
#Count of every term
df.groupBy("term").agg(count("*").alias("count")).show()

+----------+-------+
|      term|  count|
+----------+-------+
| 36 months|1609754|
| 60 months| 650914|
|      NULL|     33|
+----------+-------+



In [25]:
#Import functions for the PCA
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA

In [26]:
#Extract numeric columns from the original df
numeric_columns = [column for column, dtype in df.dtypes if dtype in ["int", "double"]]

In [27]:
numeric_columns

['loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'int_rate',
 'installment',
 'inq_last_12m',
 'acc_open_past_24mths',
 'bc_open_to_buy',
 'chargeoff_within_12_mths',
 'delinq_amnt',
 'mo_sin_old_il_acct',
 'mo_sin_old_rev_tl_op',
 'mo_sin_rcnt_rev_tl_op',
 'mo_sin_rcnt_tl',
 'mort_acc',
 'mths_since_recent_bc',
 'mths_since_recent_bc_dlq',
 'mths_since_recent_inq',
 'mths_since_recent_revol_delinq',
 'num_accts_ever_120_pd',
 'num_actv_bc_tl',
 'num_actv_rev_tl',
 'num_bc_sats',
 'num_bc_tl',
 'num_il_tl',
 'num_op_rev_tl',
 'num_rev_accts',
 'num_rev_tl_bal_gt_0',
 'num_sats',
 'num_tl_120dpd_2m',
 'num_tl_30dpd',
 'num_tl_90g_dpd_24m',
 'num_tl_op_past_12m',
 'pct_tl_nvr_dlq',
 'percent_bc_gt_75',
 'pub_rec_bankruptcies',
 'tax_liens',
 'tot_hi_cred_lim',
 'total_bal_ex_mort',
 'total_bc_limit',
 'total_il_high_credit_limit',
 'revol_bal_joint',
 'sec_app_fico_range_low',
 'sec_app_fico_range_high',
 'sec_app_inq_last_6mths',
 'sec_app_mort_acc',
 'sec_app_open_acc',
 'sec_app_rev

In [28]:
# Since the VectorAssembler doesn't support missing values, we can just erase those rows with NAN
df_cleaned = df.na.drop(subset=numeric_columns)

In [29]:
#Create Assembler
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")
df_assembled = assembler.transform(df_cleaned)

In [30]:
#Create Scaler with STD
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

In [33]:
#Number of component for the PCA
n_components = 10
pca = PCA(k=n_components, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(df_scaled)
df_pca = pca_model.transform(df_scaled)

In [35]:
#Show variance for each principal component
explained_variance = pca_model.explainedVariance
total_variance = sum(explained_variance)
explained_variance_ratio = [var / total_variance for var in explained_variance]

print("Variance explained by principal component:")
for i, ratio in enumerate(explained_variance_ratio):
    print(f"Principal component {i+1}: {ratio:.4f}")

Variance explained by principal component:
Principal component 1: 0.2644
Principal component 2: 0.1483
Principal component 3: 0.1021
Principal component 4: 0.0983
Principal component 5: 0.0831
Principal component 6: 0.0749
Principal component 7: 0.0668
Principal component 8: 0.0592
Principal component 9: 0.0531
Principal component 10: 0.0498


In [39]:
#Number of rows and columns after the PCA
print("Number of rows:", df_pca.count())
print("Number of columns:", len(df_pca.columns))

Number of rows: 8893
Number of columns: 154


In [40]:
spark.stop()