In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [113]:
!pip install pyspark

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()

df = spark.read.csv("/content/drive/My Drive/compute_datasets/Data.csv", header=True, inferSchema=True)

print(f"Shape of Data: {df.count(), len(df.columns)}")
df.show(10)

Shape of Data: (10, 4)
+-------+----+------+---------+
|Country| Age|Salary|Purchased|
+-------+----+------+---------+
| France|  44| 72000|      Yes|
|  Spain|  27| 48000|      Yes|
|Germany|  30| 54000|       No|
|  Spain|  38| 61000|       No|
|Germany|  40|  NULL|      Yes|
| France|  35| 58000|      Yes|
|  Spain|NULL| 52000|       No|
| France|  48| 79000|      Yes|
|Germany|  50| 83000|       No|
| France|  37| 67000|      Yes|
+-------+----+------+---------+



In [114]:
import pyspark.sql.functions as psql

# Checking for missing values
missing_values = [psql.count(
                      psql.when(
                          psql.isnan(col) | psql.col(col).isNull(), col
                      )
                  ).alias(col) for col in df.columns]

df.select(missing_values).show()

mean_age = df.select(psql.mean(df['age'])).collect()[0][0]
mean_sal = df.select(psql.mean(df['salary'])).collect()[0][0]

# Updating the dataset with mean values as average of ages and salary makes more sense than mode or median
updates = {
    "age": round(mean_age),
    "salary": round(mean_sal)
}
df = df.na.fill(updates)

df.show(10)

+-------+---+------+---------+
|Country|Age|Salary|Purchased|
+-------+---+------+---------+
|      0|  1|     1|        0|
+-------+---+------+---------+

+-------+---+------+---------+
|Country|Age|Salary|Purchased|
+-------+---+------+---------+
| France| 44| 72000|      Yes|
|  Spain| 27| 48000|      Yes|
|Germany| 30| 54000|       No|
|  Spain| 38| 61000|       No|
|Germany| 40| 63778|      Yes|
| France| 35| 58000|      Yes|
|  Spain| 39| 52000|       No|
| France| 48| 79000|      Yes|
|Germany| 50| 83000|       No|
| France| 37| 67000|      Yes|
+-------+---+------+---------+



In [115]:
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler

# VectorAssembler is a must use for Numeric cols

assembler = VectorAssembler(
    inputCols=["Age"],
    outputCol="AgeVec",
    handleInvalid="keep"
)
df = assembler.transform(df)

assembler = VectorAssembler(
    inputCols=["Salary"],
    outputCol="SalaryVec",
    handleInvalid="keep"
)
df = assembler.transform(df)

# Using MinMaxScaler for Both numerical cols as we don't age and salary both to be negative
# Using StringIndexer for Country col which is pyspark's label encoder as the number of countries can increase
# Using String Indexer for Purchased as well as It will assign 1 & 0 for Yes & No(or vice versa)

scaler = MinMaxScaler(inputCol="SalaryVec", outputCol="SalaryVec_scaled")
df = scaler.fit(df).transform(df).drop("SalaryVec")

scaler = MinMaxScaler(inputCol="AgeVec", outputCol="AgeVec_scaled")
df = scaler.fit(df).transform(df).drop("AgeVec")


In [116]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Country", outputCol="Country_indexed")
df = indexer.fit(df).transform(df).drop("Country")

indexer = StringIndexer(inputCol="Purchased", outputCol="Purchased_indexed")
df = indexer.fit(df).transform(df).drop("Purchased")

df.show()

+---+------+--------------------+--------------------+---------------+-----------------+
|Age|Salary|    SalaryVec_scaled|       AgeVec_scaled|Country_indexed|Purchased_indexed|
+---+------+--------------------+--------------------+---------------+-----------------+
| 44| 72000|[0.6857142857142857]|[0.7391304347826086]|            0.0|              0.0|
| 27| 48000|               [0.0]|               [0.0]|            2.0|              0.0|
| 30| 54000|[0.17142857142857...|[0.13043478260869...|            1.0|              1.0|
| 38| 61000|[0.37142857142857...|[0.4782608695652174]|            2.0|              1.0|
| 40| 63778|            [0.4508]|[0.5652173913043478]|            1.0|              0.0|
| 35| 58000|[0.2857142857142857]|[0.34782608695652...|            0.0|              0.0|
| 39| 52000|[0.11428571428571...|[0.5217391304347826]|            2.0|              1.0|
| 48| 79000|[0.8857142857142857]|[0.9130434782608695]|            0.0|              0.0|
| 50| 83000|         

In [117]:
df_transformed = df.withColumn("log_transformed_Salary", psql.log(df["Salary"]))\
                   .withColumn("log_transformed_Age", psql.log(df["Age"]))
df_transformed.show()

+---+------+--------------------+--------------------+---------------+-----------------+----------------------+-------------------+
|Age|Salary|    SalaryVec_scaled|       AgeVec_scaled|Country_indexed|Purchased_indexed|log_transformed_Salary|log_transformed_Age|
+---+------+--------------------+--------------------+---------------+-----------------+----------------------+-------------------+
| 44| 72000|[0.6857142857142857]|[0.7391304347826086]|            0.0|              0.0|    11.184421397998193|  3.784189633918261|
| 27| 48000|               [0.0]|               [0.0]|            2.0|              0.0|    10.778956289890028|  3.295836866004329|
| 30| 54000|[0.17142857142857...|[0.13043478260869...|            1.0|              1.0|    10.896739325546411| 3.4011973816621555|
| 38| 61000|[0.37142857142857...|[0.4782608695652174]|            2.0|              1.0|    11.018629143155449| 3.6375861597263857|
| 40| 63778|            [0.4508]|[0.5652173913043478]|            1.0|      

In [118]:
# This feature could potenially show if the Salary increases with Age
df_transformed = df_transformed.withColumn("Salary/Age",
                                           df_transformed["Salary"] /
                                           df_transformed["Age"])

# Grouping continuous variables (e.g., age into categories)

df_transformed = df_transformed.withColumn("age_group",
                                      psql.when(df_transformed["age"] < 30, "young")
                                      .when((df_transformed["age"] >= 30) & (df_transformed["age"] < 50), "middle_aged")
                                      .otherwise("old"))

# Display age groups
df_transformed.select(["age_group", "Salary/Age"]).show(10)


+-----------+------------------+
|  age_group|        Salary/Age|
+-----------+------------------+
|middle_aged|1636.3636363636363|
|      young|1777.7777777777778|
|middle_aged|            1800.0|
|middle_aged|1605.2631578947369|
|middle_aged|           1594.45|
|middle_aged| 1657.142857142857|
|middle_aged|1333.3333333333333|
|middle_aged|1645.8333333333333|
|        old|            1660.0|
|middle_aged|1810.8108108108108|
+-----------+------------------+



In [119]:
from pyspark.sql.functions import col, expr, mean

# Z Score based removal based on age
mean_val = df_transformed.select(mean(col("Age"))).collect()[0][0]
stddev_val = df_transformed.select(expr("stddev(Age)")).collect()[0][0]

df_outliers_removed_Z = df_transformed.filter((df_transformed["Age"] > (mean_val - 3 * stddev_val)) &
                                       (df_transformed["Age"] < (mean_val + 3 * stddev_val)))

print("Z Score based removal based on age")
df_outliers_removed_Z.show(10)

# Z Score based removal based on Salary
mean_val = df_transformed.select(mean(col("Salary"))).collect()[0][0]
stddev_val = df_transformed.select(expr("stddev(Salary)")).collect()[0][0]

df_outliers_removed_Z = df_transformed.filter((df_transformed["Salary"] > (mean_val - 3 * stddev_val)) &
                                       (df_transformed["Salary"] < (mean_val + 3 * stddev_val)))

print("Z Score based removal based on Salary")
df_outliers_removed_Z.show(10)

# IQR Based removal based on salary (Same can be done with age)
q1 = df_transformed.approxQuantile("Salary", [0.25], 0.01)[0]
q3 = df_transformed.approxQuantile("Salary", [0.75], 0.01)[0]
iqr = q3 - q1

df_outliers_removed_iqr = df_transformed.filter((df_transformed["Salary"] > (q1 - 1.5 * iqr)) &
                                           (df_transformed["Salary"] < (q3 + 1.5 * iqr)))

print("IQR Based removal based on salary")
df_outliers_removed_iqr.show(10)


# All the below results show same dimensions as that of the original df
# It suggests that the given data is free of Outliers.

Z Score based removal based on age
+---+------+--------------------+--------------------+---------------+-----------------+----------------------+-------------------+------------------+-----------+
|Age|Salary|    SalaryVec_scaled|       AgeVec_scaled|Country_indexed|Purchased_indexed|log_transformed_Salary|log_transformed_Age|        Salary/Age|  age_group|
+---+------+--------------------+--------------------+---------------+-----------------+----------------------+-------------------+------------------+-----------+
| 44| 72000|[0.6857142857142857]|[0.7391304347826086]|            0.0|              0.0|    11.184421397998193|  3.784189633918261|1636.3636363636363|middle_aged|
| 27| 48000|               [0.0]|               [0.0]|            2.0|              0.0|    10.778956289890028|  3.295836866004329|1777.7777777777778|      young|
| 30| 54000|[0.17142857142857...|[0.13043478260869...|            1.0|              1.0|    10.896739325546411| 3.4011973816621555|            1800.0|

In [123]:
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=12)

print(f"Training set count: {train_data.count()}")
print(f"Testing set count: {test_data.count()}")

train_data.show()
test_data.show()

# Train Test Split is necessary as it helps us to realize how efficient our model is on the data it has never seen before,
# which is what matters the most when creating and ML Model for any classification or regression.

Training set count: 8
Testing set count: 2
+---+------+--------------------+--------------------+---------------+-----------------+----------------------+-------------------+------------------+-----------+
|Age|Salary|    SalaryVec_scaled|       AgeVec_scaled|Country_indexed|Purchased_indexed|log_transformed_Salary|log_transformed_Age|        Salary/Age|  age_group|
+---+------+--------------------+--------------------+---------------+-----------------+----------------------+-------------------+------------------+-----------+
| 27| 48000|               [0.0]|               [0.0]|            2.0|              0.0|    10.778956289890028|  3.295836866004329|1777.7777777777778|      young|
| 35| 58000|[0.2857142857142857]|[0.34782608695652...|            0.0|              0.0|    10.968198289528557| 3.5553480614894135| 1657.142857142857|middle_aged|
| 37| 67000|[0.5428571428571428]|[0.43478260869565...|            0.0|              0.0|    11.112447898373103| 3.6109179126442243|1810.810810