In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=132bd84b880d7b4631c2a2b42f6fbff97d7cf8436e3cd9581f5df2123e423193
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("PySpark Regression Example") \
    .master("local[*]") \
    .getOrCreate()

In [3]:
#spark.stop()

In [4]:
df = spark.read.csv("loan_prediction.csv", header=True, inferSchema=True)
df.show()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|LP001015|  Male|    Yes|         0|    Graduate|           No|           5720|                0|       110|             360|             1|        Urban|
|LP001022|  Male|    Yes|         1|    Graduate|           No|           3076|             1500|       126|             360|             1|        Urban|
|LP001031|  Male|    Yes|         2|    Graduate|           No|           5000|             1800|       208|             360|             1|        Urban|
|LP001035|  Male|    Yes|         2|    Graduate|           No|       

In [5]:
df

DataFrame[Loan_ID: string, Gender: string, Married: string, Dependents: string, Education: string, Self_Employed: string, ApplicantIncome: int, CoapplicantIncome: int, LoanAmount: int, Loan_Amount_Term: int, Credit_History: int, Property_Area: string]

In [6]:
# Number of rows
num_rows = df.count()

# Number of columns
num_cols = len(df.columns)

# Print shape
print(f"Shape: ({num_rows}, {num_cols})")


Shape: (367, 12)


In [7]:
## Printing schema
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: integer (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)



In [8]:
df.describe().show()

+-------+--------+------+-------+-----------------+------------+-------------+-----------------+------------------+------------------+------------------+------------------+-------------+
|summary| Loan_ID|Gender|Married|       Dependents|   Education|Self_Employed|  ApplicantIncome| CoapplicantIncome|        LoanAmount|  Loan_Amount_Term|    Credit_History|Property_Area|
+-------+--------+------+-------+-----------------+------------+-------------+-----------------+------------------+------------------+------------------+------------------+-------------+
|  count|     367|   356|    367|              357|         367|          344|              367|               367|               362|               361|               338|          367|
|   mean|    NULL|  NULL|   NULL|0.555205047318612|        NULL|         NULL|4805.599455040872|1569.5776566757493|136.13259668508286|342.53739612188366|0.8254437869822485|         NULL|
| stddev|    NULL|  NULL|   NULL|0.788131727070386|        NULL| 

In [9]:
# count no of  null values in all column
from pyspark.sql.functions import col, isnan, when, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|      0|    11|      0|        10|        0|           23|              0|                0|         5|               6|            29|            0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+



In [10]:
# to check individaul column null values
df.select(count(when(col("Self_Employed").isNull(), "Self_Employed")).alias("null_count")).show()


+----------+
|null_count|
+----------+
|        23|
+----------+



In [11]:
# drop records having null values
# df_clean = df.na.drop()

# drop records in specific column
df = df.na.drop(subset=["LoanAmount"])
df.select(count(when(col("LoanAmount").isNull(), "LoanAmount")).alias("null_count")).show()

+----------+
|null_count|
+----------+
|         0|
+----------+



In [12]:
# calculate mean value of ApplicantIncome
from pyspark.sql.functions import mean

mean_income = df.select(mean(col("ApplicantIncome")))  # Calculate mean
mean_income.collect()[0][0]

4769.513812154696

In [14]:
# calculate man value of Loan_Amount_term
from pyspark.sql.functions import mean

mean_Loan_amount = df.select(mean(col("Loan_Amount_Term"))).collect()[0][0]
print(mean_Loan_amount)  # Calculate mean
df= df.fillna({"Loan_Amount_Term": mean_Loan_amount})  # Fill missing values

341.939226519337


In [15]:
print(type(mean_Loan_amount))  # Should be 'int' or 'float'

<class 'float'>


In [16]:
# replacing all null values in numerical column with mean
from pyspark.sql.functions import mean, col

# Identify numerical columns
numeric_cols = [field.name for field in df.schema.fields if field.dataType.simpleString() in ['int', 'double', 'float']]

# Compute mean for each numerical column
mean_values = df.select([mean(col(c)).alias(c) for c in numeric_cols]).collect()[0].asDict()

# Fill null values with corresponding mean
df = df.fillna(mean_values)

df.show()


+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|LP001015|  Male|    Yes|         0|    Graduate|           No|           5720|                0|       110|             360|             1|        Urban|
|LP001022|  Male|    Yes|         1|    Graduate|           No|           3076|             1500|       126|             360|             1|        Urban|
|LP001031|  Male|    Yes|         2|    Graduate|           No|           5000|             1800|       208|             360|             1|        Urban|
|LP001035|  Male|    Yes|         2|    Graduate|           No|       

In [17]:
# count no null values in all column
from pyspark.sql.functions import col, isnan, when, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|      0|    11|      0|         9|        0|           23|              0|                0|         0|               0|             0|            0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+



In [18]:
# replacing null values in all categorical column with mode value
from pyspark.sql.functions import col, count

def get_mode(df, column):
    """Returns the most frequent value (mode) of a categorical column."""
    mode_value = df.groupBy(column).count().orderBy(col("count").desc()).first()[0]
    return mode_value

# Identify categorical columns
categorical_cols = [field.name for field in df.schema.fields if field.dataType.simpleString() == "string"]

# Compute mode for each categorical column and fill nulls
for col_name in categorical_cols:
    mode_val = get_mode(df, col_name)
    df = df.fillna({col_name: mode_val})

df.show()


+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|LP001015|  Male|    Yes|         0|    Graduate|           No|           5720|                0|       110|             360|             1|        Urban|
|LP001022|  Male|    Yes|         1|    Graduate|           No|           3076|             1500|       126|             360|             1|        Urban|
|LP001031|  Male|    Yes|         2|    Graduate|           No|           5000|             1800|       208|             360|             1|        Urban|
|LP001035|  Male|    Yes|         2|    Graduate|           No|       

In [19]:
# count null values in all column
from pyspark.sql.functions import col, isnan, when, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|      0|     0|      0|         0|        0|            0|              0|                0|         0|               0|             0|            0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+



In [20]:
# check outlier in loanamount column
from pyspark.sql.functions import mean, stddev, col

# Compute mean and standard deviation
stats = df.select(
    mean(col("LoanAmount")).alias("mean"),
    stddev(col("LoanAmount")).alias("stddev")
).collect()[0]

mean_value = stats["mean"]
stddev_value = stats["stddev"]
print(mean_value)
print(stddev_value)
# Compute Z-score and filter outliers
df_outliers = df.filter(
    ((col("LoanAmount") - mean_value) / stddev_value > 3) |  # Right-side outliers (Z > 3)
    ((col("LoanAmount") - mean_value) / stddev_value < -3)   # Left-side outliers (Z < -3)
)

df_outliers.show()


136.13259668508286
61.366652393018235
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
| Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|LP001428|  Male|    Yes|        3+| Graduate|           No|          72529|                0|       360|             360|             1|        Urban|
|LP001483|  Male|    Yes|        3+| Graduate|           No|          13518|                0|       390|             360|             1|        Rural|
|LP001791|  Male|    Yes|         0| Graduate|          Yes|          32000|                0|       550|             360|             0|    Semiurban|
|LP002059|  Male|    Yes|         2| Graduate|    

In [18]:
df.schema.fields

[StructField('Loan_ID', StringType(), False),
 StructField('Gender', StringType(), False),
 StructField('Married', StringType(), False),
 StructField('Dependents', StringType(), False),
 StructField('Education', StringType(), False),
 StructField('Self_Employed', StringType(), False),
 StructField('ApplicantIncome', IntegerType(), True),
 StructField('CoapplicantIncome', IntegerType(), True),
 StructField('LoanAmount', IntegerType(), True),
 StructField('Loan_Amount_Term', IntegerType(), True),
 StructField('Credit_History', IntegerType(), True),
 StructField('Property_Area', StringType(), False)]

In [21]:
# finding oulier bt IQR method & replacing with upper & lower boundary
from pyspark.sql.functions import when

def cap_outliers(df, column):
    q1 = df.approxQuantile(column, [0.25], 0.01)[0]
    q3 = df.approxQuantile(column, [0.75], 0.01)[0]
    iqr = q3 - q1

    lower_bound = q1 - (1.5 * iqr)
    upper_bound = q3 + (1.5 * iqr)

    # Replace outliers with upper/lower bound
    return df.withColumn(column,
                         when(col(column) < lower_bound, lower_bound)
                         .when(col(column) > upper_bound, upper_bound)
                         .otherwise(col(column)))

# Apply function to cap outliers in a column
df = cap_outliers(df, "LoanAmount")
df.show()


+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|LP001015|  Male|    Yes|         0|    Graduate|           No|           5720|                0|     110.0|             360|             1|        Urban|
|LP001022|  Male|    Yes|         1|    Graduate|           No|           3076|             1500|     126.0|             360|             1|        Urban|
|LP001031|  Male|    Yes|         2|    Graduate|           No|           5000|             1800|     208.0|             360|             1|        Urban|
|LP001035|  Male|    Yes|         2|    Graduate|           No|       

In [22]:
df.show()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+
|LP001015|  Male|    Yes|         0|    Graduate|           No|           5720|                0|     110.0|             360|             1|        Urban|
|LP001022|  Male|    Yes|         1|    Graduate|           No|           3076|             1500|     126.0|             360|             1|        Urban|
|LP001031|  Male|    Yes|         2|    Graduate|           No|           5000|             1800|     208.0|             360|             1|        Urban|
|LP001035|  Male|    Yes|         2|    Graduate|           No|       

# **Feature** **Engineering**

In [23]:
categorical_cols = [field.name for field in df.schema.fields if field.dataType.simpleString() == 'string']
categorical_cols

['Loan_ID',
 'Gender',
 'Married',
 'Dependents',
 'Education',
 'Self_Employed',
 'Property_Area']

In [24]:
# convert married colimn
from pyspark.ml.feature import StringIndexer

# Convert categorical column "married" to numerical
indexer = StringIndexer(inputCol="Married", outputCol="Married_Index")
df = indexer.fit(df).transform(df)

df.select("Married", "Married_Index").show()


+-------+-------------+
|Married|Married_Index|
+-------+-------------+
|    Yes|          0.0|
|    Yes|          0.0|
|    Yes|          0.0|
|    Yes|          0.0|
|     No|          1.0|
|    Yes|          0.0|
|     No|          1.0|
|    Yes|          0.0|
|    Yes|          0.0|
|     No|          1.0|
|     No|          1.0|
|    Yes|          0.0|
|     No|          1.0|
|    Yes|          0.0|
|     No|          1.0|
|     No|          1.0|
|    Yes|          0.0|
|    Yes|          0.0|
|    Yes|          0.0|
|     No|          1.0|
+-------+-------------+
only showing top 20 rows



In [25]:
columns_catg = ['Gender',
 'Married',
 'Dependents',
 'Education',
  'Self_employed',
  'Property_Area']

for col_name in columns_catg:
    print(f"Value counts for {col_name}:")
    df.groupBy(col_name).count().show()


Value counts for Gender:
+------+-----+
|Gender|count|
+------+-----+
|Female|   69|
|  Male|  293|
+------+-----+

Value counts for Married:
+-------+-----+
|Married|count|
+-------+-----+
|     No|  134|
|    Yes|  228|
+-------+-----+

Value counts for Dependents:
+----------+-----+
|Dependents|count|
+----------+-----+
|         0|  207|
|         1|   56|
|        3+|   40|
|         2|   59|
+----------+-----+

Value counts for Education:
+------------+-----+
|   Education|count|
+------------+-----+
|Not Graduate|   83|
|    Graduate|  279|
+------------+-----+

Value counts for Self_employed:
+-------------+-----+
|Self_employed|count|
+-------------+-----+
|           No|  325|
|          Yes|   37|
+-------------+-----+

Value counts for Property_Area:
+-------------+-----+
|Property_Area|count|
+-------------+-----+
|        Urban|  138|
|    Semiurban|  114|
|        Rural|  110|
+-------------+-----+



In [26]:
from pyspark.ml.feature import StringIndexer

# Convert categorical column "Gender" to numerical
indexer = StringIndexer(inputCol="Gender", outputCol="Gender_Index")
df = indexer.fit(df).transform(df)

df.select("Gender", "Gender_Index").show()


+------+------------+
|Gender|Gender_Index|
+------+------------+
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|Female|         1.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|Female|         1.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
|  Male|         0.0|
+------+------------+
only showing top 20 rows



In [27]:
# use onehot enconding to convert categorical column education inti numerical column
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Step 1: Convert String column to Numeric Index
indexer = StringIndexer(inputCol="Education", outputCol="Education_Index")
df = indexer.fit(df).transform(df)

# Step 2: Apply One-Hot Encoding
encoder = OneHotEncoder(inputCol="Education_Index", outputCol="Education_OHE", dropLast=False)
df = encoder.fit(df).transform(df)

# Show results
df.select("Education", "Education_Index", "Education_OHE").show()

+------------+---------------+-------------+
|   Education|Education_Index|Education_OHE|
+------------+---------------+-------------+
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|Not Graduate|            1.0|(2,[1],[1.0])|
|Not Graduate|            1.0|(2,[1],[1.0])|
|Not Graduate|            1.0|(2,[1],[1.0])|
|Not Graduate|            1.0|(2,[1],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|Not Graduate|            1.0|(2,[1],[1.0])|
|Not Graduate|            1.0|(2,[1],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Graduate|            0.0|(2,[0],[1.0])|
|    Gradu

In [28]:
df = df.drop("Loan_ID")
df.show()


+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-------------+------------+---------------+-------------+
|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Married_Index|Gender_Index|Education_Index|Education_OHE|
+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-------------+------------+---------------+-------------+
|  Male|    Yes|         0|    Graduate|           No|           5720|                0|     110.0|             360|             1|        Urban|          0.0|         0.0|            0.0|(2,[0],[1.0])|
|  Male|    Yes|         1|    Graduate|           No|           3076|             1500|     126.0|             360|             1|        Urban|          0.0|         0.0|            0.0|

In [29]:
# convert dependent column
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Convert String to Index First
indexer = StringIndexer(inputCol="Dependents", outputCol="Dependent_index")
df = indexer.fit(df).transform(df)

# Apply One-Hot Encoding
encoder = OneHotEncoder(inputCol="Dependent_index", outputCol="Dependent_OHE", dropLast=False)
df = encoder.fit(df).transform(df)

df.select("Dependents", "Dependent_index", "Dependent_OHE").show()


+----------+---------------+-------------+
|Dependents|Dependent_index|Dependent_OHE|
+----------+---------------+-------------+
|         0|            0.0|(4,[0],[1.0])|
|         1|            2.0|(4,[2],[1.0])|
|         2|            1.0|(4,[1],[1.0])|
|         2|            1.0|(4,[1],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
|         1|            2.0|(4,[2],[1.0])|
|         2|            1.0|(4,[1],[1.0])|
|         2|            1.0|(4,[1],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
|         1|            2.0|(4,[2],[1.0])|
|        3+|            3.0|(4,[3],[1.0])|
|         2|            1.0|(4,[1],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
|         1|            2.0|(4,[2],[1.0])|
|         2|            1.0|(4,[1],[1.0])|
|        3+|            3.0|(4,[3],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
|         0|            0.0|(4,[0],[1.0])|
+----------

In [30]:
# convert Self_Employed
from pyspark.ml.feature import StringIndexer

# Convert categorical column "Gender" to numerical
indexer = StringIndexer(inputCol="Self_Employed", outputCol="Employed_Index")
df = indexer.fit(df).transform(df)

df.select("Self_Employed", "Employed_Index").show()

+-------------+--------------+
|Self_Employed|Employed_Index|
+-------------+--------------+
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|          Yes|           1.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
|           No|           0.0|
+-------------+--------------+
only showing top 20 rows



In [31]:
# convert property_area column
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Convert String to Index First
indexer = StringIndexer(inputCol="Property_Area", outputCol="property_index")
df = indexer.fit(df).transform(df)

# Apply One-Hot Encoding
encoder = OneHotEncoder(inputCol="property_index", outputCol="property_OHE", dropLast=False)
df = encoder.fit(df).transform(df)

df.select("Property_Area", "property_index", "property_OHE").show()


+-------------+--------------+-------------+
|Property_Area|property_index| property_OHE|
+-------------+--------------+-------------+
|        Urban|           0.0|(3,[0],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|    Semiurban|           1.0|(3,[1],[1.0])|
|        Rural|           2.0|(3,[2],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|    Semiurban|           1.0|(3,[1],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|    Semiurban|           1.0|(3,[1],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|    Semiurban|           1.0|(3,[1],[1.0])|
|    Semiurban|           1.0|(3,[1],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|    Semiurban|           1.0|(3,[1],[1.0])|
|        Urban|           0.0|(3,[0],[1.0])|
|    Semiu

In [32]:
categorical_cols = [field.name for field in df.schema.fields if field.dataType.simpleString() == 'string']
categorical_cols

['Gender',
 'Married',
 'Dependents',
 'Education',
 'Self_Employed',
 'Property_Area']

In [33]:
df.columns

['Gender',
 'Married',
 'Dependents',
 'Education',
 'Self_Employed',
 'ApplicantIncome',
 'CoapplicantIncome',
 'LoanAmount',
 'Loan_Amount_Term',
 'Credit_History',
 'Property_Area',
 'Married_Index',
 'Gender_Index',
 'Education_Index',
 'Education_OHE',
 'Dependent_index',
 'Dependent_OHE',
 'Employed_Index',
 'property_index',
 'property_OHE']

In [34]:
df = df.drop('Gender',
 'Married',
 'Dependents',
 'Education',
 'Self_Employed',
 'Property_Area',
 'Education_Index',
 'Dependent_index',
 'Employed_Index',
 'property_index',
  )
df.show()

+---------------+-----------------+----------+----------------+--------------+-------------+------------+-------------+-------------+-------------+
|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Married_Index|Gender_Index|Education_OHE|Dependent_OHE| property_OHE|
+---------------+-----------------+----------+----------------+--------------+-------------+------------+-------------+-------------+-------------+
|           5720|                0|     110.0|             360|             1|          0.0|         0.0|(2,[0],[1.0])|(4,[0],[1.0])|(3,[0],[1.0])|
|           3076|             1500|     126.0|             360|             1|          0.0|         0.0|(2,[0],[1.0])|(4,[2],[1.0])|(3,[0],[1.0])|
|           5000|             1800|     208.0|             360|             1|          0.0|         0.0|(2,[0],[1.0])|(4,[1],[1.0])|(3,[0],[1.0])|
|           2340|             2546|     100.0|             360|             0|          0.0|         0.0|(2,[0],

In [35]:
df.columns

['ApplicantIncome',
 'CoapplicantIncome',
 'LoanAmount',
 'Loan_Amount_Term',
 'Credit_History',
 'Married_Index',
 'Gender_Index',
 'Education_OHE',
 'Dependent_OHE',
 'property_OHE']

In [37]:
## Descriptive analysis
# df.describe().toPandas().transpose()

In [38]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['ApplicantIncome',
 'CoapplicantIncome',
 'Loan_Amount_Term',
 'Credit_History',
 'Married_Index',
 'Gender_Index',
 'Education_OHE',
 'Dependent_OHE',
 'property_OHE'], outputCol = 'features')
v_df = vectorAssembler.transform(df)
v_df = v_df.select(['features', 'LoanAmount'])
v_df.show(3)

+--------------------+----------+
|            features|LoanAmount|
+--------------------+----------+
|(15,[0,2,3,6,8,12...|     110.0|
|(15,[0,1,2,3,6,10...|     126.0|
|(15,[0,1,2,3,6,9,...|     208.0|
+--------------------+----------+
only showing top 3 rows



In [40]:
# splitting data in training & testing
splits = v_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [41]:
splits = df.randomSplit([0.7, 0.3])
splits

[DataFrame[ApplicantIncome: int, CoapplicantIncome: int, LoanAmount: double, Loan_Amount_Term: int, Credit_History: int, Married_Index: double, Gender_Index: double, Education_OHE: vector, Dependent_OHE: vector, property_OHE: vector],
 DataFrame[ApplicantIncome: int, CoapplicantIncome: int, LoanAmount: double, Loan_Amount_Term: int, Credit_History: int, Married_Index: double, Gender_Index: double, Education_OHE: vector, Dependent_OHE: vector, property_OHE: vector]]

In [42]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='LoanAmount', maxIter=10)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0033207875718255114,0.005007277107368784,0.07497097302829422,4.627223998898075,-8.364533232037278,-5.958092801590558,5.951632949185885,-5.951632949186764,-6.010225672407012,6.282409822422225,-1.0883883001807813,7.673425416226339,-2.2908773091359977,0.2903175463764191,2.4516975064474553]
Intercept: 80.59276250686743


In [43]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 40.526641
r2: 0.248820


In [44]:
lr_predictions = lr_model.transform(test_df)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="LoanAmount",metricName="r2")

lr_predictions.select("prediction","LoanAmount","features").show(5)
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+----------+--------------------+
|        prediction|LoanAmount|            features|
+------------------+----------+--------------------+
| 98.21963224147272|     104.0|(15,[0,1,2,3,4,6,...|
|109.69784949252715|     125.0|(15,[0,1,2,3,4,6,...|
|130.59391226521961|     155.0|(15,[0,1,2,3,4,6,...|
|123.42244761748228|     138.0|(15,[0,1,2,3,4,6,...|
| 121.5695232775011|     122.0|(15,[0,1,2,3,4,6,...|
+------------------+----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.139098


In [45]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.4999999999999991, 0.3927538900002707, 0.3783377936141834, 0.3756360597388744, 0.37560156939960443, 0.37559118112101597, 0.37559054419519544, 0.37559022621419125, 0.3755902140759315, 0.3755902116819052, 0.37559021166976025]
+-------------------+
|          residuals|
+-------------------+
| 12.045270016204086|
|  46.72997077971473|
|  -5.44344633547081|
| 47.659834783744714|
|-12.274228647919315|
| 28.643690456503123|
|-11.404009748174843|
| 21.285713280275957|
| -96.23356086715859|
|-28.553335024512478|
|  46.16561378356704|
|  67.66965186082211|
| 19.626853002021505|
| -1.521079066133126|
| 10.839182281236901|
|-16.017924222406748|
| -20.86081260980643|
|  75.72533441237243|
|  41.51214022322294|
|-23.483858653843924|
+-------------------+
only showing top 20 rows



In [46]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","LoanAmount","features").show()

+------------------+----------+--------------------+
|        prediction|LoanAmount|            features|
+------------------+----------+--------------------+
| 98.21963224147272|     104.0|(15,[0,1,2,3,4,6,...|
|109.69784949252715|     125.0|(15,[0,1,2,3,4,6,...|
|130.59391226521961|     155.0|(15,[0,1,2,3,4,6,...|
|123.42244761748228|     138.0|(15,[0,1,2,3,4,6,...|
| 121.5695232775011|     122.0|(15,[0,1,2,3,4,6,...|
| 138.6807680830941|     200.0|(15,[0,1,2,3,4,6,...|
| 128.0428964011846|     130.0|(15,[0,1,2,3,4,6,...|
| 140.9989155945713|     187.0|(15,[0,1,2,3,4,6,...|
|187.16755244534778|      40.0|(15,[0,1,2,3,4,6,...|
|125.29850914972965|      77.0|(15,[0,1,2,3,4,7,...|
|126.18874980374854|     100.0|(15,[0,1,2,3,4,7,...|
|207.81737990965235|     237.5|(15,[0,1,2,3,5,6,...|
|  146.463040866679|     173.0|(15,[0,1,2,3,5,6,...|
|106.29086745761151|     181.0|(15,[0,1,2,3,5,7,...|
|131.36860987269637|      90.0|(15,[0,1,2,3,6,8,...|
|126.39241404431365|     113.0|(15,[0,1,2,3,6,

## Decision tree regression

In [55]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'LoanAmount')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="LoanAmount", predictionCol="prediction", metricName="r2")
r2 = dt_evaluator.evaluate(dt_predictions)
print("R2 score = %g" % r2)

R2 score = 0.040272


In [53]:
train_df.show()

+--------------------+----------+
|            features|LoanAmount|
+--------------------+----------+
|(15,[0,1,2,3,4,6,...|     138.0|
|(15,[0,1,2,3,4,6,...|     180.0|
|(15,[0,1,2,3,4,6,...|     120.0|
|(15,[0,1,2,3,4,6,...|     187.0|
|(15,[0,1,2,3,4,6,...|     100.0|
|(15,[0,1,2,3,4,6,...|     152.0|
|(15,[0,1,2,3,4,6,...|     110.0|
|(15,[0,1,2,3,4,6,...|     152.0|
|(15,[0,1,2,3,4,6,...|     130.0|
|(15,[0,1,2,3,4,6,...|     106.0|
|(15,[0,1,2,3,4,6,...|     187.0|
|(15,[0,1,2,3,4,6,...|     209.0|
|(15,[0,1,2,3,4,7,...|     125.0|
|(15,[0,1,2,3,4,7,...|     108.0|
|(15,[0,1,2,3,4,7,...|     123.0|
|(15,[0,1,2,3,5,6,...|     104.0|
|(15,[0,1,2,3,5,6,...|     108.0|
|(15,[0,1,2,3,5,6,...|     200.0|
|(15,[0,1,2,3,5,6,...|     165.0|
|(15,[0,1,2,3,5,6,...|     122.0|
+--------------------+----------+
only showing top 20 rows



In [57]:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
dt_hyper=DecisionTreeRegressor(featuresCol = 'features', labelCol = 'LoanAmount')

#Hyper-Parameter Tuning
paramGrid_dt = ParamGridBuilder() \
    .addGrid(dt_hyper.maxDepth, [5, 7]) \
    .addGrid(dt_hyper.maxBins, [10,20]) \
    .build()
crossval_dt = CrossValidator(estimator=dt_hyper,
                             estimatorParamMaps=paramGrid_dt,
                             evaluator=RegressionEvaluator(labelCol="LoanAmount"),
                             numFolds=5)

#fit model to data train
dt_model_hyper = crossval_dt.fit(train_df)

In [59]:
#Transform model to data test
dt_prediction_hyper= dt_model_hyper.transform(test_df)

In [61]:
#View prediction, label and featues from prediction
dt_prediction_hyper.select("prediction","LoanAmount","features").show(5)

+------------------+----------+--------------------+
|        prediction|LoanAmount|            features|
+------------------+----------+--------------------+
|             114.9|     104.0|(15,[0,1,2,3,4,6,...|
|             114.9|     125.0|(15,[0,1,2,3,4,6,...|
|164.53703703703704|     155.0|(15,[0,1,2,3,4,6,...|
|118.92156862745098|     138.0|(15,[0,1,2,3,4,6,...|
|             114.9|     122.0|(15,[0,1,2,3,4,6,...|
+------------------+----------+--------------------+
only showing top 5 rows



In [64]:
#Calculate RMSE
eval_rmse=RegressionEvaluator(labelCol="LoanAmount", predictionCol="prediction", metricName="rmse")
print("Root Mean Squared Error (RMSE) on Decision Tree Model=%g" % eval_rmse.evaluate(dt_prediction_hyper))

Root Mean Squared Error (RMSE) on Decision Tree Model=47.7435


## Gradient-boosted tree regression

In [48]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'LoanAmount', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'LoanAmount', 'features').show(5)

+------------------+----------+--------------------+
|        prediction|LoanAmount|            features|
+------------------+----------+--------------------+
|153.81934944740289|     138.0|(15,[0,1,2,3,4,6,...|
| 86.38288206307196|     125.0|(15,[0,1,2,3,4,6,...|
| 210.2617166655304|     155.0|(15,[0,1,2,3,4,6,...|
|124.97702474519572|     138.0|(15,[0,1,2,3,4,6,...|
| 161.6157894479196|     200.0|(15,[0,1,2,3,4,6,...|
+------------------+----------+--------------------+
only showing top 5 rows



In [49]:
gbt_evaluator = RegressionEvaluator(
    labelCol="LoanAmount", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 46.8072
