Import the libraries

In [None]:
# Import the libraries
import pandas as pd
from pyspark.sql import SparkSession # Create spark environment
from sklearn.datasets import load_iris

Load the dataset

In [24]:
# Read the sample CSV into a pandas DataFrame.
# Note: despite the `iris_` prefix, the file read here is sample_null.csv.
iris_pandas = pd.read_csv(filepath_or_buffer='../data/sample_null.csv')
print(type(iris_pandas))
iris_pandas.head(n=5)

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,name,age,experience,salary
0,Krish,31.0,10.0,30000.0
1,Sudhanshu,30.0,8.0,25000.0
2,Sunny,29.0,4.0,20000.0
3,Paul,24.0,3.0,20000.0
4,Harsha,21.0,1.0,15000.0


In [25]:
# Start (or reuse) a Spark session, then read the same sample CSV as a Spark
# DataFrame, treating the first line as a header and inferring column types.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)
iris_spark = spark.read.csv(
    path='../data/sample_null.csv',
    inferSchema=True,
    header=True,
)
print(type(iris_spark))
iris_spark.show(n=5)

<class 'pyspark.sql.dataframe.DataFrame'>
+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
+---------+---+----------+------+
only showing top 5 rows



Get the schema of dataframe

In [26]:
# Print each column's name, inferred type and nullability as a tree
iris_spark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [27]:
# Same type information as a list of (column name, type name) pairs
iris_spark.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int'), ('salary', 'int')]

In [28]:
# Summary statistics per column (count, mean, stddev, min, max).
# Counts differ between columns because some rows contain NULL values.
iris_spark.describe().show()

+-------+------+------------------+------------------+-----------------+
|summary|  name|               age|        experience|           salary|
+-------+------+------------------+------------------+-----------------+
|  count|     7|                 8|                 7|                8|
|   mean|  NULL|              28.5| 5.428571428571429|          25750.0|
| stddev|  NULL|5.3718844791323335|3.8234863173611093|9361.776388210581|
|    min|Harsha|                21|                 1|            15000|
|    max| Sunny|                36|                10|            40000|
+-------+------+------------------+------------------+-----------------+



Slicing by column

In [29]:
# Keep only the `name` and `age` columns and preview the first five rows
iris_spark.select('name', 'age').show(n=5)

+---------+---+
|     name|age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
|   Harsha| 21|
+---------+---+
only showing top 5 rows



Add new column

In [33]:
# Derive a new column holding the square of `age`.
# withColumn returns a new DataFrame, so `iris_spark` is left as it was.
iris_spark_filtered = iris_spark.withColumn(
    colName='age_squared',
    col=iris_spark['age'] ** 2,
)
iris_spark_filtered.show(n=5)

+---------+---+----------+------+-----------+
|     name|age|experience|salary|age_squared|
+---------+---+----------+------+-----------+
|    Krish| 31|        10| 30000|      961.0|
|Sudhanshu| 30|         8| 25000|      900.0|
|    Sunny| 29|         4| 20000|      841.0|
|     Paul| 24|         3| 20000|      576.0|
|   Harsha| 21|         1| 15000|      441.0|
+---------+---+----------+------+-----------+
only showing top 5 rows



Drop existing column

In [35]:
# Preview the frame without `age_squared`. The result is not assigned, so
# `iris_spark_filtered` still carries the column for the cells that follow.
iris_spark_filtered.drop('age_squared').show(5)

+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
+---------+---+----------+------+
only showing top 5 rows



Rename column

In [36]:
# Preview the frame with `age_squared` renamed to `age_powered`.
# The result is not assigned, so `iris_spark_filtered` keeps the original name.
iris_spark_filtered.withColumnRenamed('age_squared', 'age_powered').show(5)

+---------+---+----------+------+-----------+
|     name|age|experience|salary|age_powered|
+---------+---+----------+------+-----------+
|    Krish| 31|        10| 30000|      961.0|
|Sudhanshu| 30|         8| 25000|      900.0|
|    Sunny| 29|         4| 20000|      841.0|
|     Paul| 24|         3| 20000|      576.0|
|   Harsha| 21|         1| 15000|      441.0|
+---------+---+----------+------+-----------+
only showing top 5 rows



Drop missing values

In [37]:
# Display the rows to see where the NULL (missing) values are
iris_spark.show()

+---------+----+----------+------+
|     name| age|experience|salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [38]:
# `thresh=2` keeps only rows that have at least 2 non-null values;
# rows with fewer non-null values are dropped.
# `how` is deliberately not passed: when `thresh` is given it overrides `how`,
# so specifying both would be misleading.
iris_spark.na.drop(thresh=2).show()

+---------+----+----------+------+
|     name| age|experience|salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
+---------+----+----------+------+



Handle missing values

In [49]:
# 1. Replace missing values with a constant: NULL names become 'Unknown'.
# `iris_spark` is reassigned, so later cells see the filled `name` column.
iris_spark = iris_spark.na.fill(value='Unknown', subset=['name'])
iris_spark.show()

+---------+----+----------+------+
|     name| age|experience|salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|  Unknown|  34|        10| 38000|
|  Unknown|  36|      NULL|  NULL|
+---------+----+----------+------+



In [53]:
# 2. Replace missing numeric values using an Imputer
from pyspark.ml.feature import Imputer

column_names = ['age', 'experience', 'salary']

# Each input column gets a sibling `<column>_imputed` output column in which
# missing entries are replaced by the mean of that column's non-missing values.
imputer = Imputer(
    strategy='mean',
    inputCols=column_names,
    outputCols=[name + '_imputed' for name in column_names],
)

# Fit learns the per-column means; transform appends the imputed columns.
imputer_model = imputer.fit(iris_spark)
iris_spark_imputed = imputer_model.transform(iris_spark)
iris_spark_imputed.show()


+---------+----+----------+------+-----------+------------------+--------------+
|     name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|NULL|      NULL| 40000|         28|                 5|         40000|
|  Unknown|  34|        10| 38000|         34|                10|         38000|
|  Unknown|  36|      NULL|  NULL|         36|                 5|         25750|
+---------+----+----------+------+-----------+------------------+--------------+

Filter operation

In [60]:
# Row filter plus column selection: people with salary <= 20000 and age <= 25,
# showing only their name, age and salary.
(
    iris_spark
    .filter((iris_spark['salary'] <= 20000) & (iris_spark['age'] <= 25))
    .select('name', 'age', 'salary')
    .show()
)

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|   Paul| 24| 20000|
| Harsha| 21| 15000|
|Shubham| 23| 18000|
+-------+---+------+



Aggregation

In [62]:
# Read the aggregation sample CSV into a Spark DataFrame
# (header row used for column names, column types inferred).
spark = (
    SparkSession.builder
    .appName('Aggregation')
    .getOrCreate()
)
agg_spark = spark.read.csv(
    path='../data/sample_aggregation.csv',
    inferSchema=True,
    header=True,
)
print(type(agg_spark))
agg_spark.show(n=5)

<class 'pyspark.sql.dataframe.DataFrame'>
+------+------------+------+
|  Name| Departments|salary|
+------+------------+------+
| Krish|Data Science| 10000|
| Krish|         IOT|  5000|
|Mahesh|    Big Data|  4000|
| Krish|    Big Data|  4000|
|Mahesh|Data Science|  3000|
+------+------------+------+
only showing top 5 rows



In [67]:
# Average salary per department (mean() averages the numeric columns per group)
agg_spark.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



Modelling using PySpark

In [69]:
# Read the cleaned sample CSV used for modelling into a Spark DataFrame
# (header row used for column names, column types inferred).
spark = (
    SparkSession.builder
    .appName('Modelling')
    .getOrCreate()
)
ml_spark = spark.read.csv(
    path='../data/sample_clean.csv',
    inferSchema=True,
    header=True,
)
print(type(ml_spark))
ml_spark.show(n=5)

<class 'pyspark.sql.dataframe.DataFrame'>
+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
+---------+---+----------+------+
only showing top 5 rows



In [78]:
# Pack the independent features into a single vector column
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(
    outputCol='independent_features',
    inputCols=['age', 'experience'],
)

# Assemble the vector, keep only features + salary, and expose salary as `target`.
ml_spark_output = (
    vector_assembler.transform(ml_spark)
    .select('independent_features', 'salary')
    .withColumnRenamed('salary', 'target')
)
ml_spark_output.show()

+--------------------+------+
|independent_features|target|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [79]:
# Fit a linear regression on a reproducible train/test split
from pyspark.ml.regression import LinearRegression

# randomSplit is random: without a fixed seed every run produces different
# train/test rows, and therefore different coefficients and error metrics.
# NOTE(review): the modelling frame is very small, so a split can leave one
# side nearly empty — confirm this seed gives a usable split.
SPLIT_SEED = 42
train_data, test_data = ml_spark_output.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator under its own name; `regressor` is the fitted
# model that the following cells inspect and evaluate.
linear_regression = LinearRegression(
    featuresCol='independent_features',
    labelCol='target'
)
regressor = linear_regression.fit(train_data)

25/03/22 12:59:48 WARN Instrumentation: [9d0c9c17] regParam is zero, which might cause numerical instability and overfitting.
25/03/22 12:59:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/22 12:59:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/03/22 12:59:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [80]:
# Fitted coefficients, one per entry of `independent_features` (age, experience)
regressor.coefficients

DenseVector([47.619, 1285.7143])

In [81]:
# Fitted intercept term of the linear model
regressor.intercept

13619.047619047662

In [82]:
# Evaluate the fitted model on the held-out test rows.
# Despite its name, `y_pred` is the evaluation result object; the per-row
# predictions live in its `.predictions` DataFrame.
y_pred = regressor.evaluate(test_data)
y_pred.predictions.show()

+--------------------+------+-----------------+
|independent_features|target|       prediction|
+--------------------+------+-----------------+
|          [23.0,2.0]| 18000|17285.71428571428|
|         [31.0,10.0]| 30000|27952.38095238097|
+--------------------+------+-----------------+



In [83]:
# Error metrics on the test split: (mean absolute error, mean squared error)
y_pred.meanAbsoluteError, y_pred.meanSquaredError

(1380.9523809523762, 2351473.922902466)