<a href="https://colab.research.google.com/github/JaiVR/Spotify_songs_Analysis/blob/main/ML_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("Spotify_popularity").getOrCreate()

print(f"Spark Version: {spark.version}")

Spark Version: 3.5.3


In [5]:
df = spark.read.csv("loan_data.csv", header=True, inferSchema=True)
df.show()

+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+
|person_age|person_gender|person_education|person_income|person_emp_exp|person_home_ownership|loan_amnt|      loan_intent|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|previous_loan_defaults_on_file|credit_score|
+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+
|        22|       female|          Master|        71948|             0|                 RENT|    35000|         PERSONAL|        16.02|               0.49|                         3|                            No|         561|
|        21|       female|     High School|        12282|             0|                

#Preprocessing

In [6]:
df.printSchema()

root
 |-- person_age: integer (nullable = true)
 |-- person_gender: string (nullable = true)
 |-- person_education: string (nullable = true)
 |-- person_income: integer (nullable = true)
 |-- person_emp_exp: integer (nullable = true)
 |-- person_home_ownership: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- loan_intent: string (nullable = true)
 |-- loan_int_rate: double (nullable = true)
 |-- loan_percent_income: double (nullable = true)
 |-- cb_person_cred_hist_length: integer (nullable = true)
 |-- previous_loan_defaults_on_file: string (nullable = true)
 |-- credit_score: integer (nullable = true)



In [7]:
numeric_columns = ['person_age',
 'person_income',
 'person_emp_exp',
 'loan_amnt',
 'loan_int_rate',
 'loan_percent_income',
 'cb_person_cred_hist_length','credit_score']

from pyspark.sql.functions import col

for col_name in numeric_columns:
    df = df.withColumn(col_name, col(col_name).cast("double"))

In [8]:
df.printSchema()

root
 |-- person_age: double (nullable = true)
 |-- person_gender: string (nullable = true)
 |-- person_education: string (nullable = true)
 |-- person_income: double (nullable = true)
 |-- person_emp_exp: double (nullable = true)
 |-- person_home_ownership: string (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- loan_intent: string (nullable = true)
 |-- loan_int_rate: double (nullable = true)
 |-- loan_percent_income: double (nullable = true)
 |-- cb_person_cred_hist_length: double (nullable = true)
 |-- previous_loan_defaults_on_file: string (nullable = true)
 |-- credit_score: double (nullable = true)



In [9]:
#checking missing values
from pyspark.sql.functions import col, sum
for column in df.columns:
  missing_count = df.filter(col(column).isNull()).count()
  print(f"'{column}' '=' {missing_count}")

'person_age' '=' 0
'person_gender' '=' 0
'person_education' '=' 0
'person_income' '=' 0
'person_emp_exp' '=' 0
'person_home_ownership' '=' 0
'loan_amnt' '=' 0
'loan_intent' '=' 0
'loan_int_rate' '=' 0
'loan_percent_income' '=' 0
'cb_person_cred_hist_length' '=' 0
'previous_loan_defaults_on_file' '=' 0
'credit_score' '=' 0


In [10]:
train, test = df.randomSplit([0.7, 0.3])

In [11]:
numerical_features_lst = train.columns
numerical_features_lst.remove('person_gender')
numerical_features_lst.remove('person_education')
numerical_features_lst.remove('person_home_ownership')
numerical_features_lst.remove('loan_intent')
numerical_features_lst.remove('previous_loan_defaults_on_file')
numerical_features_lst.remove('credit_score')

numerical_features_lst

['person_age',
 'person_income',
 'person_emp_exp',
 'loan_amnt',
 'loan_int_rate',
 'loan_percent_income',
 'cb_person_cred_hist_length']

In [12]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst,outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(2)

+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+------------------------+
|person_age|person_gender|person_education|person_income|person_emp_exp|person_home_ownership|loan_amnt|      loan_intent|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|previous_loan_defaults_on_file|credit_score|numerical_feature_vector|
+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+------------------------+
|      20.0|       female|       Associate|      42674.0|           0.0|                 RENT|   7000.0|          MEDICAL|        13.61|               0.16|                       2.0|                           Yes|       636.0|    [20.0,4267

In [13]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='numerical_feature_vector',
                        outputCol='scaled_numerical_feature_vector',
                        withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

train.show(3)

+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+------------------------+-------------------------------+
|person_age|person_gender|person_education|person_income|person_emp_exp|person_home_ownership|loan_amnt|      loan_intent|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|previous_loan_defaults_on_file|credit_score|numerical_feature_vector|scaled_numerical_feature_vector|
+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+------------------------+-------------------------------+
|      20.0|       female|       Associate|      42674.0|           0.0|                 RENT|   7000.0|          MEDICAL|        13.61|         

In [14]:
categorical_features_lst = train.columns
categorical_features_lst.remove('person_age')
categorical_features_lst.remove('person_income')
categorical_features_lst.remove('loan_amnt')
categorical_features_lst.remove('loan_int_rate')
categorical_features_lst.remove('loan_percent_income')
categorical_features_lst.remove('person_emp_exp')
categorical_features_lst.remove('scaled_numerical_feature_vector')
categorical_features_lst.remove('numerical_feature_vector')
categorical_features_lst.remove('credit_score')


categorical_features_lst

['person_gender',
 'person_education',
 'person_home_ownership',
 'loan_intent',
 'cb_person_cred_hist_length',
 'previous_loan_defaults_on_file']

In [15]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

for feature in categorical_features_lst:
    indexer = StringIndexer(inputCol=feature, outputCol=f"{feature}_index")
    indexer = indexer.fit(train)
    train = indexer.transform(train)
    test = indexer.transform(test)

one_hot_encoded_cols = []
for feature in categorical_features_lst:
    one_hot_encoder = OneHotEncoder(inputCol=f"{feature}_index", outputCol=f"{feature}_one_hot")
    one_hot_encoder = one_hot_encoder.fit(train)
    train = one_hot_encoder.transform(train)
    test = one_hot_encoder.transform(test)
    one_hot_encoded_cols.append(f"{feature}_one_hot")

assembler = VectorAssembler(inputCols=one_hot_encoded_cols, outputCol="categorical_features_vector")
train = assembler.transform(train)
test = assembler.transform(test)

train.show(3)

+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+------------------------+-------------------------------+-------------------+----------------------+---------------------------+-----------------+--------------------------------+------------------------------------+---------------------+------------------------+-----------------------------+-------------------+----------------------------------+--------------------------------------+---------------------------+
|person_age|person_gender|person_education|person_income|person_emp_exp|person_home_ownership|loan_amnt|      loan_intent|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|previous_loan_defaults_on_file|credit_score|numerical_feature_vector|scaled_numerical_feature_vector|person_gender_index|person_education_index|person_home_ownership_index|lo

In [16]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector',
                                       'categorical_features_vector'],
                            outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

#PCA

#Regression

In [17]:
#linear regression
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='scaled_numerical_feature_vector',
                      labelCol='credit_score')
lr = lr.fit(train)
lr

pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                      'predicted_credit_score')
pred_train_df.show(5)

pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_credit_score')

pred_test_df.show(5)

+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------------------------+------------+------------------------+-------------------------------+-------------------+----------------------+---------------------------+-----------------+--------------------------------+------------------------------------+---------------------+------------------------+-----------------------------+-------------------+----------------------------------+--------------------------------------+---------------------------+--------------------+----------------------+
|person_age|person_gender|person_education|person_income|person_emp_exp|person_home_ownership|loan_amnt|      loan_intent|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|previous_loan_defaults_on_file|credit_score|numerical_feature_vector|scaled_numerical_feature_vector|person_gender_index|person_ed

In [19]:
#linear regression metrics
predictions_and_actuals = pred_test_df[['predicted_credit_score',
                                        'credit_score']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)


from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Root Mean Squared Error: {0}
Mean Absolute Error:     {1}
'''.format(metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           )

print(s)




Root Mean Squared Error: 49.383455272933666
Mean Absolute Error:     39.591785021511505



#Classification

Classification before PCA

Classification after PCA

#Clustering