# PYSPARK INTRO, READ YOUR DATA AND CREATE SparkSession

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark

In [4]:
import pandas as pd

In [109]:
df_pandas =  pd.read_csv('C:\\Users\\laptop\\data_cleaning_python_2883183-main\\cart.csv')
df_pandas

Unnamed: 0,date,name,amount,price
0,2021-03-01,carrot,7.0,5.73
1,2021-03-01,egg,12.0,1.7
2,2021-03-01,milk,,3.57
3,2021-03-01,potato,2.0,
4,,tomato,6.0,1.52
5,2021-03-02,potato,3.0,2.17
6,2021-03-03,,5.0,3.68


In [57]:
## Returns the first three rows but if not specified, returns the first five rows
df_pandas.head(3)

Unnamed: 0,date,name,amount,price
0,2021-03-01,carrot,7.0,5.73
1,2021-03-01,egg,12.0,1.7
2,2021-03-01,milk,,3.57


In [58]:
## Summary of the dataframe
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   date    6 non-null      object 
 1   name    6 non-null      object 
 2   amount  6 non-null      float64
 3   price   6 non-null      float64
dtypes: float64(2), object(2)
memory usage: 352.0+ bytes


In [60]:
## shows the total rows and column values in the dataframe
df_pandas.shape

(7, 4)

In [9]:
from pyspark.sql import SparkSession

In [11]:
spark = SparkSession.builder.appName('Practise').getOrCreate()
spark

In [12]:
df_pyspark = spark.read.csv('cart.csv', header=True)

In [54]:
df_pyspark.show()

+----------+------+------+-----+
|      date|  name|amount|price|
+----------+------+------+-----+
|2021-03-01|carrot|     7| 5.73|
|2021-03-01|   egg|    12|  1.7|
|2021-03-01|  milk|  null| 3.57|
|2021-03-01|potato|     2| null|
|      null|tomato|     6| 1.52|
|2021-03-02|potato|     3| 2.17|
|2021-03-03|  null|     5| 3.68|
+----------+------+------+-----+



In [18]:
df_pyspark = spark.read.option('header', 'true').csv('cart.csv')
df_pyspark.show()

+----------+------+------+-----+
|      date|  name|amount|price|
+----------+------+------+-----+
|2021-03-01|carrot|     7| 5.73|
|2021-03-01|   egg|    12|  1.7|
|2021-03-01|  milk|  null| 3.57|
|2021-03-01|potato|     2| null|
|      null|tomato|     6| 1.52|
|2021-03-02|potato|     3| 2.17|
|2021-03-03|  null|     5| 3.68|
+----------+------+------+-----+



In [19]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [20]:
df_pyspark.head(3)

[Row(date='2021-03-01', name='carrot', amount='7', price='5.73'),
 Row(date='2021-03-01', name='egg', amount='12', price='1.7'),
 Row(date='2021-03-01', name='milk', amount=None, price='3.57')]

In [25]:
df_pyspark.printSchema()

root
 |-- date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- price: string (nullable = true)



In [29]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   date    6 non-null      object 
 1   name    6 non-null      object 
 2   amount  6 non-null      float64
 3   price   6 non-null      float64
dtypes: float64(2), object(2)
memory usage: 352.0+ bytes


# PYSPARK SESSION


*** Part one

**In this video, we will cover;**

**--Pyspark Dataframe
--Reading the dataset
--Checking the datatypes of the column(schema)
--Selecting columns and indexing
--Adding columns
--Dropping columns**

In [35]:
from pyspark.sql import SparkSession

In [37]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()
spark

In [47]:
## read the dataset
df = spark.read.option('header', 'true').csv('test1.csv', inferSchema=True)
df

DataFrame[Name: string, age: int, Experience: int, Salary: int]

In [48]:
## check  the schema
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [63]:
df = spark.read.csv('test1.csv', header=True, inferSchema=True)
df.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [64]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [65]:
## check the type of dataframe
type(df)

pyspark.sql.dataframe.DataFrame

In [85]:
## Pick up columns
df.select('Name').show()

+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
+---------+



In [87]:
## pick up multiple columns
df.select(['Name', 'Experience']).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [88]:
## pick the datatypes
df.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [89]:
## decsribe the data
df.describe()

DataFrame[summary: string, Name: string, age: string, Experience: string, Salary: string]

In [91]:
df.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



In [92]:
## Same with pandas
df_pandas.describe()

Unnamed: 0,amount,price
count,6.0,6.0
mean,5.833333,3.061667
std,3.544949,1.599167
min,2.0,1.52
25%,3.5,1.8175
50%,5.5,2.87
75%,6.75,3.6525
max,12.0,5.73


In [99]:
### Adding Columns in data frame
Add_df=df.withColumn('Experience After 2years', df['Experience']+2)
Add_df.show()

+---------+---+----------+------+-----------------------+
|     Name|age|Experience|Salary|Experience After 2years|
+---------+---+----------+------+-----------------------+
|    Krish| 31|        10| 30000|                     12|
|Sudhanshu| 30|         8| 25000|                     10|
|    Sunny| 29|         4| 20000|                      6|
|     Paul| 24|         3| 20000|                      5|
|   Harsha| 21|         1| 15000|                      3|
|  Shubham| 23|         2| 18000|                      4|
+---------+---+----------+------+-----------------------+



In [100]:
### Drop the Column from the dataframe above
Add_df.drop('Experience After 2years').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [102]:
### Rename the columns
df.withColumnRenamed('Name', 'New Name').show()

+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



*** Part Two

**Pyspark Handling Missing Values**

*Dropping columns*
*Dropping Rows*
*Various Parameter in Dropping functionalities*
*Handling Missing values by Mean, Median and Mode*

In [111]:
df_misnvalue = spark.read.csv('test2.csv', header=True, inferSchema=True)
df_misnvalue.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [112]:
## Dropping columns
df_misnvalue.drop('Name').show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|null|      null| 40000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



In [113]:
### Dropping rows 
df_misnvalue.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [117]:
### .drop() has different operation which is a. How == any or all
#1. how ==any--
df_misnvalue.na.drop(how='any').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [118]:
### drop rows with thresh-- with the given parameter values, it tells how many null that satisfy the condition to be deleted
df_misnvalue.na.drop(how='any',thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [120]:
### Drop rows with subset-- it indicate the column in the frame we want to delete the null
df_misnvalue.na.drop(how='any',subset=['Age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
|     null| 36|      null|  null|
+---------+---+----------+------+



In [141]:
## Filling the missing value
df_misnvalue.na.fill('Missing Value').show()

+-------------+----+----------+------+
|         Name| age|Experience|Salary|
+-------------+----+----------+------+
|        Krish|  31|        10| 30000|
|    Sudhanshu|  30|         8| 25000|
|        Sunny|  29|         4| 20000|
|         Paul|  24|         3| 20000|
|       Harsha|  21|         1| 15000|
|      Shubham|  23|         2| 18000|
|       Mahesh|null|      null| 40000|
|Missing Value|  34|        10| 38000|
|Missing Value|  36|      null|  null|
+-------------+----+----------+------+



In [130]:
df_misnvalue.na.fill('Series', ['age',  'Experience', 'Salary']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [None]:
## Replacing the null values with mean, median and mode using the imputer function

In [138]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['age', 'Experience', 'Salary'],
                 outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]).setStrategy("mean")

In [139]:
## Add imputation cols to df(Dataframe)
imputer.fit(df_misnvalue).transform(df_misnvalue).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+-

# PYSPARK-DATAFRAME-Filter operation

***

## Pyspark Dataframe

- Filter Operation
- &,|,==
- -

In [142]:
from pyspark.sql import SparkSession

In [143]:
spark = SparkSession.builder.appName('dataframe').getOrCreate()
spark

In [145]:
## Read out your data
df_pyspark = spark.read.csv('test1.csv', header=True, inferSchema=True)
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## Filter operations

In [148]:
## Salary of the people less than or equal to 2000
df_pyspark.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [149]:
## from your result, select the columns needed;
df_pyspark.filter("Salary<=20000").select(['Name', 'age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [160]:
## Set multiple conditions
df_pyspark.filter(~(df_pyspark.Salary<=20000)|
                  (df_pyspark.Salary>=15000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## GROUPBY AND AGGREGATE FUNCTIONS

In [161]:
Agg_pyspark = spark.read.csv('test3.csv',header=True,inferSchema=True)
Agg_pyspark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [162]:
Agg_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [165]:
## Remember Aggregate and groupby work together
##Groupby to find the Maximum Salary
Agg_pyspark.groupby('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [166]:
## Groupby to find Department with the max salary
Agg_pyspark.groupby('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [175]:
## Groupby to find Department with the mean salary
Agg_pyspark.groupby('Departments').mean('Salary').show()

+------------+-----------+
| Departments|avg(Salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [183]:
#### Groupby to find numbers of employees based on the departments
Agg_pyspark.groupby('Departments').agg({'Name':'count'}).show()

+------------+-----------+
| Departments|count(Name)|
+------------+-----------+
|         IOT|          2|
|    Big Data|          4|
|Data Science|          4|
+------------+-----------+



In [192]:
Agg_pyspark.groupby('Departments').count().withColumnRenamed('count', 'No of employees').show()

+------------+---------------+
| Departments|No of employees|
+------------+---------------+
|         IOT|              2|
|    Big Data|              4|
|Data Science|              4|
+------------+---------------+



In [194]:
## Apply direct aggregate function
Agg_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [195]:
## Determine who gets the maximum salary
Agg_pyspark.groupby('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



## Example of Pyspark ML

In [196]:
training=spark.read.csv('test1.csv',header=True,inferSchema=True)
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [197]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [198]:
training.columns

['Name', 'age', 'Experience', 'Salary']

[Age,Experience]-----> new feature----->independent feature

In [202]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")

In [213]:
output=featureassembler.transform(training)
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [204]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

In [205]:
finalized_data=output.select("Independent Features", "Salary")
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [206]:
from pyspark.ml.regression import LinearRegression
##train test split
train_data, test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor=regressor.fit(train_data)

In [208]:
### coefficients
regressor.coefficients

DenseVector([-58.8235, 1666.6667])

In [209]:
### Prediction
pred_results=regressor.evaluate(test_data)

In [210]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [24.0,3.0]| 20000|18784.313725490192|
|          [30.0,8.0]| 25000|26764.705882352937|
+--------------------+------+------------------+



In [214]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(1490.1960784313724, 2296039.984621296)