# Apache Spark Practice 

### Starting the spark Session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark Prac')\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/30 06:49:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/30 06:49:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Loading the dataset

In [2]:
# loading 
df = spark.read.format('csv') \
    .option('header', 'true' ) \
    .option('inferSchema', 'true') \
    .option('sep', ',') \
    .load('./Train_Dataset.csv')
df.show()

                                                                                

23/03/30 06:49:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------+-------------+---------+----------+-----------+---------+-----------+-------------+------------+----------------+------------------+------------------+---------------------+-------------+------------------+-------------------+--------------------------+--------+-------------+-----------------+-------+-------------+----------+-------------+-----------------+-----------------+---------------------+------------------+-----------------------+------------------------+--------------------------+-----------------------+--------------------+--------------+--------------+--------------+---------------------+------------+-------------+-------+
|      ID|Client_Income|Car_Owned|Bike_Owned|Active_Loan|House_Own|Child_Count|Credit_Amount|Loan_Annuity|Accompany_Client|Client_Income_Type|  Client_Education|Client_

### Data Cleaning and Manipulation

In [3]:
# Checking the number of rows 
df.count()

121856

In [4]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Client_Income: string (nullable = true)
 |-- Car_Owned: integer (nullable = true)
 |-- Bike_Owned: integer (nullable = true)
 |-- Active_Loan: integer (nullable = true)
 |-- House_Own: integer (nullable = true)
 |-- Child_Count: string (nullable = true)
 |-- Credit_Amount: string (nullable = true)
 |-- Loan_Annuity: string (nullable = true)
 |-- Accompany_Client: string (nullable = true)
 |-- Client_Income_Type: string (nullable = true)
 |-- Client_Education: string (nullable = true)
 |-- Client_Marital_Status: string (nullable = true)
 |-- Client_Gender: string (nullable = true)
 |-- Loan_Contract_Type: string (nullable = true)
 |-- Client_Housing_Type: string (nullable = true)
 |-- Population_Region_Relative: string (nullable = true)
 |-- Age_Days: string (nullable = true)
 |-- Employed_Days: string (nullable = true)
 |-- Registration_Days: string (nullable = true)
 |-- ID_Days: string (nullable = true)
 |-- Own_House_Age: integer (nullabl


Let us check the columns that are strings, and select the ones that need to be modified


In [5]:
from pyspark.sql.functions import col

In [6]:

# Selecting string columns
string_columns = [c[0] for c in df.dtypes if c[1] == 'string']

# Selecting the dataframe with string columns
string_df = df.selectExpr(*string_columns)
string_df.printSchema()


root
 |-- Client_Income: string (nullable = true)
 |-- Child_Count: string (nullable = true)
 |-- Credit_Amount: string (nullable = true)
 |-- Loan_Annuity: string (nullable = true)
 |-- Accompany_Client: string (nullable = true)
 |-- Client_Income_Type: string (nullable = true)
 |-- Client_Education: string (nullable = true)
 |-- Client_Marital_Status: string (nullable = true)
 |-- Client_Gender: string (nullable = true)
 |-- Loan_Contract_Type: string (nullable = true)
 |-- Client_Housing_Type: string (nullable = true)
 |-- Population_Region_Relative: string (nullable = true)
 |-- Age_Days: string (nullable = true)
 |-- Employed_Days: string (nullable = true)
 |-- Registration_Days: string (nullable = true)
 |-- ID_Days: string (nullable = true)
 |-- Client_Occupation: string (nullable = true)
 |-- Application_Process_Hour: string (nullable = true)
 |-- Client_Permanent_Match_Tag: string (nullable = true)
 |-- Client_Contact_Work_Tag: string (nullable = true)
 |-- Type_Organization: 

Changing the datatype of all the columns that need to be strings

In [7]:
df = df.withColumn('Child_Count', col('Child_Count').cast('integer'))\
    .withColumn('Credit_Amount', col('Credit_Amount').cast('integer'))\
    .withColumn('Loan_Annuity', col('Loan_Annuity').cast('double'))\
    .withColumn('Population_Region_Relative', col('Population_Region_Relative').cast('double'))\
    .withColumn('Age_Days', col('Age_Days').cast('integer'))\
    .withColumn('Employed_days', col('Employed_days').cast('integer'))\
    .withColumn('ID_Days', col('ID_Days').cast('integer'))\
    .withColumn('Application_Process_Hour', col('Application_Process_Hour').cast('integer'))\
    .withColumn('Score_Source_3', col('Score_Source_3').cast('double'))
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Client_Income: string (nullable = true)
 |-- Car_Owned: integer (nullable = true)
 |-- Bike_Owned: integer (nullable = true)
 |-- Active_Loan: integer (nullable = true)
 |-- House_Own: integer (nullable = true)
 |-- Child_Count: integer (nullable = true)
 |-- Credit_Amount: integer (nullable = true)
 |-- Loan_Annuity: double (nullable = true)
 |-- Accompany_Client: string (nullable = true)
 |-- Client_Income_Type: string (nullable = true)
 |-- Client_Education: string (nullable = true)
 |-- Client_Marital_Status: string (nullable = true)
 |-- Client_Gender: string (nullable = true)
 |-- Loan_Contract_Type: string (nullable = true)
 |-- Client_Housing_Type: string (nullable = true)
 |-- Population_Region_Relative: double (nullable = true)
 |-- Age_Days: integer (nullable = true)
 |-- Employed_days: integer (nullable = true)
 |-- Registration_Days: string (nullable = true)
 |-- ID_Days: integer (nullable = true)
 |-- Own_House_Age: integer (nu

#### Checking for and dealing with null values

In [9]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+---+-------------+---------+----------+-----------+---------+-----------+-------------+------------+----------------+------------------+----------------+---------------------+-------------+------------------+-------------------+--------------------------+--------+-------------+-----------------+-------+-------------+----------+-------------+-----------------+-----------------+---------------------+------------------+-----------------------+------------------------+--------------------------+-----------------------+-----------------+--------------+--------------+--------------+---------------------+------------+-------------+-------+
| ID|Client_Income|Car_Owned|Bike_Owned|Active_Loan|House_Own|Child_Count|Credit_Amount|Loan_Annuity|Accompany_Client|Client_Income_Type|Client_Education|Client_Marital_Status|Client_Gender|Loan_Contract_Type|Client_Housing_Type|Population_Region_Relative|Age_Days|Employed_days|Registration_Days|ID_Days|Own_House_Age|Mobile_Tag|Homephone_Tag|Workphone_Work

                                                                                

Let us first drop all the null values where all the values are missing


In [10]:
df.show()

+--------+-------------+---------+----------+-----------+---------+-----------+-------------+------------+----------------+------------------+------------------+---------------------+-------------+------------------+-------------------+--------------------------+--------+-------------+-----------------+-------+-------------+----------+-------------+-----------------+-----------------+---------------------+------------------+-----------------------+------------------------+--------------------------+-----------------------+--------------------+--------------+--------------+--------------+---------------------+------------+-------------+-------+
|      ID|Client_Income|Car_Owned|Bike_Owned|Active_Loan|House_Own|Child_Count|Credit_Amount|Loan_Annuity|Accompany_Client|Client_Income_Type|  Client_Education|Client_Marital_Status|Client_Gender|Loan_Contract_Type|Client_Housing_Type|Population_Region_Relative|Age_Days|Employed_days|Registration_Days|ID_Days|Own_House_Age|Mobile_Tag|Homephone_T

For the binary variables we are going to impute the missing values with the mode

In [15]:
from pyspark.sql.functions import when, approx_count_distinct # A replacement of mode in newer versions of spark
binary_cols = ['Car_Owned', 'Bike_Owned', 'Active_Loan', 'House_Own']

# Computing the mode of eahch selected column
modes = df.agg(*[approx_count_distinct(c).alias(c) for c in binary_cols])

# Filling in the missing values with the mode
df = df.fillna(modes.first().asDict(), subset=[binary_cols])
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+---+-------------+---------+----------+-----------+---------+-----------+-------------+------------+----------------+------------------+----------------+---------------------+-------------+------------------+-------------------+--------------------------+--------+-------------+-----------------+-------+-------------+----------+-------------+-----------------+-----------------+---------------------+------------------+-----------------------+------------------------+--------------------------+-----------------------+-----------------+--------------+--------------+--------------+---------------------+------------+-------------+-------+
| ID|Client_Income|Car_Owned|Bike_Owned|Active_Loan|House_Own|Child_Count|Credit_Amount|Loan_Annuity|Accompany_Client|Client_Income_Type|Client_Education|Client_Marital_Status|Client_Gender|Loan_Contract_Type|Client_Housing_Type|Population_Region_Relative|Age_Days|Employed_days|Registration_Days|ID_Days|Own_House_Age|Mobile_Tag|Homephone_Tag|Workphone_Work

                                                                                

Replacing child count with mean

In [17]:
from pyspark.sql.functions import mean
# Computing the mode of eahch selected column
means = df.agg(mean('Child_Count'))

# Extracting the mean from the dataframe
mean_val = means.collect()[0][0]

# Filling in the missing values with the mode
df = df.fillna(mean_val, subset=['Child_Count'])
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+---+-------------+---------+----------+-----------+---------+-----------+-------------+------------+----------------+------------------+----------------+---------------------+-------------+------------------+-------------------+--------------------------+--------+-------------+-----------------+-------+-------------+----------+-------------+-----------------+-----------------+---------------------+------------------+-----------------------+------------------------+--------------------------+-----------------------+-----------------+--------------+--------------+--------------+---------------------+------------+-------------+-------+
| ID|Client_Income|Car_Owned|Bike_Owned|Active_Loan|House_Own|Child_Count|Credit_Amount|Loan_Annuity|Accompany_Client|Client_Income_Type|Client_Education|Client_Marital_Status|Client_Gender|Loan_Contract_Type|Client_Housing_Type|Population_Region_Relative|Age_Days|Employed_days|Registration_Days|ID_Days|Own_House_Age|Mobile_Tag|Homephone_Tag|Workphone_Work

                                                                                

Now for the remaining numerical variables, we are going to icheck the distributions. If they are normally distributed, we will use mean. If they are skewed, we will use mode

In [60]:
import matplotlib.pyplot as plt
# Getting a list of all the numerical values (Ingeters, doubles or floats)
numeric_cols = [c for c, t in df.dtypes if t in ['int', 'float', 'double']]
numerical_df = df[numeric_cols]
numerical_df.hist(layout=(4,5), color='blue', figsize=(5,10), grid=False)
plt.suptitle('Histogram plot for all numeric variables')

AttributeError: 'DataFrame' object has no attribute 'hist'

In [39]:
from pyspark.sql.functions import expr
# Getting a list of all the numerical values (Ingeters, doubles or floats)
numeric_cols = [c for c, t in df.dtypes if t in ['int', 'float', 'double']]

# Getting the Median from the dataframe
median_expr = [expr(f"percentile_approx('{c}', 0.5") for c in numeric_cols]
median_value = df.select(*median_expr).first()

# Filling the missing values with median
df = df.fillna(median_value.asDict(), subset=[numeric_cols])

ParseException: 
Syntax error at or near end of input: extra input end of input(line 1, pos 27)

== SQL ==
percentile_approx('ID', 0.5
---------------------------^^^
