<h3>Part-1: Installation, Creating Session, Reading Data, DataFrames</h3>

In [None]:
# pip install pyspark

In [4]:
# import pyspark

In [11]:
# A SparkSession is the entry point for everything that follows, so create it first.
from pyspark.sql import SparkSession

# The first getOrCreate() call is slow because the session has to be started;
# later calls are quick because the running session is handed back.
session_builder = SparkSession.builder.appName('PractiseSpark')
spark = session_builder.getOrCreate()

# On a local machine there is a single cluster only (on a cloud platform several
# clusters and instances can be created); the Master field of the summary
# displayed below reads local[*].
spark

In [19]:
# Plain read with no options: the columns come back as _c0/_c1 and the
# Name/Age header line is treated as an ordinary data row.
csv_reader = spark.read
df_pyspark = csv_reader.csv('TestData.csv')
print(df_pyspark)
df_pyspark.show()

DataFrame[_c0: string, _c1: string]
+-----+---+
|  _c0|_c1|
+-----+---+
| Name|Age|
|  Ram| 21|
|Shyam| 22|
|Sunny| 32|
+-----+---+



In [25]:
# Better: tell the reader that the first line of the file holds the column names.
header_aware_reader = spark.read.option('header', 'true')
df_pyspark = header_aware_reader.csv('TestData.csv')

print(df_pyspark)        # column names are now Name and Age
df_pyspark.show()        # and the data rows no longer include the header line
print(type(df_pyspark))

DataFrame[Name: string, Age: string]
+-----+---+
| Name|Age|
+-----+---+
|  Ram| 21|
|Shyam| 22|
|Sunny| 32|
+-----+---+

<class 'pyspark.sql.dataframe.DataFrame'>


In [26]:
import pandas as pd

# Same file loaded through pandas, to contrast the two DataFrame classes:
#   PySpark -> pyspark.sql.dataframe.DataFrame
#   pandas  -> pandas.core.frame.DataFrame
pandas_df = pd.read_csv('TestData.csv')
type(pandas_df)

pandas.core.frame.DataFrame

In [28]:
# Back to PySpark.
# Print each column's name, data type and nullability as a tree.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



<h3>Part-2: Reading Data Column Wise. Adding/Dropping/Renaming Columns</h3>

In [31]:
from pyspark.sql import SparkSession

# The session has already been created, so getOrCreate() comes back quickly this time.
# Because everything runs locally, there is only the one master node.
builder = SparkSession.builder.appName('PractiseSpark')
spark = builder.getOrCreate()
spark

In [33]:
# Header-aware read, but without schema inference.
reader_with_header = spark.read.option('header','true')
df_spark = reader_with_header.csv('TestData2.csv')

print(df_spark)
df_spark.show()
# Every column is reported as string here; passing inferSchema=True to the
# reader makes PySpark work out the column types instead.
df_spark.printSchema()

DataFrame[Name: string, age: string, Experience: string, Salary: string]
+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [36]:
# Same read as before, now asking PySpark to infer the column types.
reader_with_header = spark.read.option('header','true')
df_spark = reader_with_header.csv('TestData2.csv', inferSchema = True)

print(df_spark)
# (df_spark.show() is skipped in this cell; only the schema is of interest.)
# age, Experience and Salary now come back as integers rather than strings.
df_spark.printSchema()

DataFrame[Name: string, age: int, Experience: int, Salary: int]
root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [40]:
# Alternative spelling: hand the options to csv() directly as keyword arguments.
csv_options = dict(header = True, inferSchema=True)
df_spark = spark.read.csv('TestData2.csv', **csv_options)
df_spark.show()  # works just as well as the option()-based read

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [42]:
# List the column names, then return the first three rows as Row objects.
column_names = df_spark.columns
print(column_names)
df_spark.head(3)

['Name', 'age', 'Experience', 'Salary']


[Row(Name='Krish', age=31, Experience=10, Salary=30000),
 Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000),
 Row(Name='Sunny', age=29, Experience=4, Salary=20000)]

In [52]:
# Show only the names. select() returns another DataFrame, so .show() works on it.
names_df = df_spark.select('Name')
names_df.show()

# Note: df_spark['Name'] is not a substitute for select() here -- presumably
# because indexing yields a column expression rather than a DataFrame (verify).
type(names_df)  # a DataFrame again (not displayed: this is not the cell's last expression)

# Several columns can be selected at once by passing a list; .show() can be applied here too.
df_spark.select(['Name', 'Age'])

+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
|   Mahesh|
|     NULL|
|     NULL|
+---------+



DataFrame[Name: string, Age: int]

In [58]:
# Column name / type pairs.
column_types = df_spark.dtypes
print(column_types)

# describe() hands back yet another DataFrame, so .show() can be called on it.
summary_df = df_spark.describe()
print(summary_df)

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]
DataFrame[summary: string, Name: string, age: string, Experience: string, Salary: string]


In [59]:
# Render the summary statistics (count, mean, stddev, min, max) as a table.
summary_df = df_spark.describe()
summary_df.show()

+-------+------+------------------+------------------+-----------------+
|summary|  Name|               age|        Experience|           Salary|
+-------+------+------------------+------------------+-----------------+
|  count|     7|                 8|                 7|                8|
|   mean|  NULL|              28.5| 5.428571428571429|          25750.0|
| stddev|  NULL|5.3718844791323335|3.8234863173611093|9361.776388210581|
|    min|Harsha|                21|                 1|            15000|
|    max| Sunny|                36|                10|            40000|
+-------+------+------------------+------------------+-----------------+



In [64]:
# Adding & Dropping Columns
# withColumn() is not an in-place operation: it returns a new DataFrame and
# leaves df_spark unchanged, so the result is kept under its own name.
# NOTE: the column-name spelling is kept exactly as-is because the drop() cell
# removes the column by this same name.
experience_plus_two = df_spark['Experience'] + 2
df_new = df_spark.withColumn('Expreience After 2 Years', experience_plus_two)
df_new.show()

+---------+----+----------+------+------------------------+
|     Name| age|Experience|Salary|Expreience After 2 Years|
+---------+----+----------+------+------------------------+
|    Krish|  31|        10| 30000|                      12|
|Sudhanshu|  30|         8| 25000|                      10|
|    Sunny|  29|         4| 20000|                       6|
|     Paul|  24|         3| 20000|                       5|
|   Harsha|  21|         1| 15000|                       3|
|  Shubham|  23|         2| 18000|                       4|
|   Mahesh|NULL|      NULL| 40000|                    NULL|
|     NULL|  34|        10| 38000|                      12|
|     NULL|  36|      NULL|  NULL|                    NULL|
+---------+----+----------+------+------------------------+



In [68]:
# Dropping columns: drop() also returns a new DataFrame, so rebind the name.
column_to_remove = 'Expreience After 2 Years'
df_new = df_new.drop(column_to_remove)
df_new.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [72]:
# Renaming Columns.
# withColumnRenamed() is not in-place either; df_spark keeps its original column names.
renamed_df = df_spark.withColumnRenamed('Experience', 'YOE')
renamed_df.show()

+---------+----+----+------+
|     Name| age| YOE|Salary|
+---------+----+----+------+
|    Krish|  31|  10| 30000|
|Sudhanshu|  30|   8| 25000|
|    Sunny|  29|   4| 20000|
|     Paul|  24|   3| 20000|
|   Harsha|  21|   1| 15000|
|  Shubham|  23|   2| 18000|
|   Mahesh|NULL|NULL| 40000|
|     NULL|  34|  10| 38000|
|     NULL|  36|NULL|  NULL|
+---------+----+----+------+



<h3>Part-3: Handling Missing Values</h3>

In [1]:
# Missing values can be handled by dropping the affected rows or columns,
# or by replacing them with the mean/median/mode.

In [2]:
from pyspark.sql import SparkSession

# Obtain the session for this part (created if none exists yet) and display its summary.
part3_builder = SparkSession.builder.appName('PractiseSpark')
spark = part3_builder.getOrCreate()
spark

In [4]:
# This data set contains NULL values, which is what this part works on.
reader = spark.read
df = reader.csv('TestData2.csv', header=True, inferSchema=True)
df.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [5]:
# NULL handling lives on the DataFrame's `na` property, which offers:
#   df.na.drop()
#   df.na.fill()
#   df.na.replace()

# With no arguments, drop() deletes every row that has at least one NULL
# value in any column.
rows_without_nulls = df.na.drop()
rows_without_nulls.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [6]:
# Drop rows based on NULLs in a chosen subset of columns.
df.na.drop(how = 'any', thresh = 2, subset=['Age','Experience']).show()

# how: 'any' drops a row if any checked value is NULL; 'all' only if every checked value is NULL.
# thresh: a row is deleted unless it has at least 2 non-NULL values.
# subset: only the 'Age' & 'Experience' columns are checked for NULL; when the
#         condition fails, the entire row is deleted.
# NOTE(review): the PySpark dropna docs say that thresh, when given, overrides how,
# so how='any' presumably has no effect in this call -- confirm for the version in use.

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



In [7]:
# Filling the Missing Values

# A string fill value does not work on age or Experience because those are
# integer columns; in the output only the NULLs in Name are replaced.
string_filled = df.na.fill('Missing Value')
string_filled.show()

# An integer fill value restricted to Age and Experience does work:
# their NULLs become 0.
zero_filled = df.na.fill(0, ['Age', 'Experience'])
zero_filled.show()

+-------------+----+----------+------+
|         Name| age|Experience|Salary|
+-------------+----+----------+------+
|        Krish|  31|        10| 30000|
|    Sudhanshu|  30|         8| 25000|
|        Sunny|  29|         4| 20000|
|         Paul|  24|         3| 20000|
|       Harsha|  21|         1| 15000|
|      Shubham|  23|         2| 18000|
|       Mahesh|NULL|      NULL| 40000|
|Missing Value|  34|        10| 38000|
|Missing Value|  36|      NULL|  NULL|
+-------------+----+----------+------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|         0| 40000|
|     NULL| 34|        10| 38000|
|     NULL| 36|         0|  NULL|
+---------+---+----------+------+



In [9]:
# Filling missing data with mean/median etc...
# This is done with the Imputer class (sklearn offers an imputer as well).
# Display the data again as a reminder of where the NULLs are.
df.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [11]:
# Impute: assign (a value) to something by inference from the value
# of the products or processes to which it contributes.
from pyspark.ml.feature import Imputer

# Each numeric column gets a companion "<column>_imputed" output column.
numeric_columns = ['age', 'Experience', 'Salary']
imputed_columns = [f'{column}_imputed' for column in numeric_columns]

imputer = Imputer(
    inputCols=numeric_columns,
    outputCols=imputed_columns,
).setStrategy("mean")

# Fit on the data first, then transform the same data with the fitted model.
imputer_model = imputer.fit(df)
imputer_model.transform(df).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|NULL|      NULL| 40000|         28|                 5|         40000|
|     NULL|  34|        10| 38000|         34|                10|         38000|
|     NULL|  36|      NULL|  NULL|         36|                 5|         25750|
+---------+----+----------+-

<h3>Part-4: Filter Operations</h3>

In [12]:
from pyspark.sql import SparkSession

# Obtain the session for the filter examples and display its summary.
part4_builder = SparkSession.builder.appName('PractiseSpark')
spark = part4_builder.getOrCreate()
spark

In [14]:
# Load the data set used for the filter examples, with header and inferred types.
load_options = dict(header=True, inferSchema=True)
df = spark.read.csv('TestData3.csv', **load_options)
df.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [17]:
# Q1. People having salary < 20000
# The condition is given as a SQL-style string; only Name and Age are kept.
low_paid = df.filter('Salary < 20000')
low_paid.select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [23]:
# Another syntax, using column expressions (in pandas we used to write .loc[]).
# Conditions are combined with &, and ~ negates a condition ("not").
salary_below_20k = df['Salary'] < 20000
not_older_than_22 = ~(df['Age'] > 22)
df.filter(salary_below_20k & not_older_than_22).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Harsha| 21|         1| 15000|
+------+---+----------+------+



<h3>Part-5: Group By & Aggregate Functions</h3>

In [25]:
from pyspark.sql import SparkSession

# Obtain the session for the group-by examples and display its summary.
part5_builder = SparkSession.builder.appName('PractiseSpark')
spark = part5_builder.getOrCreate()
spark

In [26]:
# One row per (Name, Departments) pair with the salary for that pair.
reader = spark.read
df = reader.csv('TestData4.csv', header=True, inferSchema=True)
df.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [34]:
# Check who is having max salary: total the salary per person and compare the sums.
# (help(df.groupBy('Name').sum) shows the documentation for this aggregation.)
grouped_by_name = df.groupBy('Name')
grouped_by_name.sum('salary').show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [41]:
# Check avg salary of each department.
# (Using .count() on the same grouping gives the no. of people in each department.)
grouped_by_department = df.groupBy('Departments')
grouped_by_department.mean('salary').show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [43]:
# Several aggregations at once, written as {column name: aggregate function}.
# If groupBy is left out, agg() is applied on the whole DataFrame instead.
aggregations = {'Salary': 'sum', 'name' : 'count'}
df.groupBy('Departments').agg(aggregations).show()

+------------+-----------+-----------+
| Departments|sum(Salary)|count(name)|
+------------+-----------+-----------+
|         IOT|      15000|          2|
|    Big Data|      15000|          4|
|Data Science|      43000|          4|
+------------+-----------+-----------+



<h3>Part-6: Spark MLlib</h3>

In [44]:
# Spark ML can be used through two different techniques: the RDD technique
# and the DataFrame APIs (the more popular one now).
from pyspark.sql import SparkSession

part6_builder = SparkSession.builder.appName('PractiseSpark')
spark = part6_builder.getOrCreate()
spark

In [49]:
# Load the regression data and confirm the numeric columns were inferred as integers.
ml_read_options = dict(header=True, inferSchema=True)
df = spark.read.csv('TestData3.csv', **ml_read_options)

df.show()
df.printSchema()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [48]:
# ML Question1. Based on Age and Experience predict Salary.
# Basic question, not much data preprocessing, transformation or 
# standardization is required here.
# Later we will see that it is a Linear Regression Problem.

In [51]:
# PySpark is a bit different from sklearn. In sklearn we do the
# train/test split first (dividing into dependent/independent features, etc.).
# In PySpark we must first group the independent features into a single
# vector column; the tool that does this is called VectorAssembler.

In [54]:
from pyspark.ml.feature import VectorAssembler

# Group the two input columns into a single "Independent Features" vector
# column; that column is the model input, and Salary is the output feature.
feature_columns = ['age', 'Experience']
featureAssembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="Independent Features",
)
trainingData = featureAssembler.transform(df)
trainingData.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [55]:
# Keep only what the model needs: the assembled feature vector and the label.
model_columns = ["Independent Features", "Salary"]
trainingData = trainingData.select(*model_columns)
trainingData.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [56]:
from pyspark.ml.regression import LinearRegression

# Fix the seed so the 75% / 25% split (and everything fitted on it) is the same
# on every Restart & Run All; without it each run trains on different rows.
# NOTE(review): the data set is tiny, so confirm this seed leaves rows in both splits.
SPLIT_SEED = 42
train_data, test_data = trainingData.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator and the fitted model under separate names.
# `regressor` stays the fitted model because the following cells read
# regressor.coefficients / regressor.intercept and call regressor.evaluate().
linear_regression = LinearRegression(featuresCol="Independent Features", labelCol="Salary")
regressor = linear_regression.fit(train_data)

In [57]:
# Display the fitted model's coefficient vector.
regressor.coefficients # Will learn later.

DenseVector([-64.8464, 1584.7554])

In [58]:
# Display the fitted model's intercept term.
regressor.intercept

15414.10693970376

In [59]:
# Evaluate the fitted model on the held-out rows and show label vs. prediction.
pred_results = regressor.evaluate(test_data)
predictions_df = pred_results.predictions
predictions_df.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [24.0,3.0]| 20000|18612.05915813422|
+--------------------+------+-----------------+



In [64]:
# Error metrics on the test split (will learn about them in AAIC).
mean_abs_error = pred_results.meanAbsoluteError
mean_sq_error = pred_results.meanSquaredError
(mean_abs_error, mean_sq_error)

(1387.9408418657804, 1926379.7805190913)