In [1]:
import pyspark

In [2]:
import pandas as pd
# Load the same CSV with pandas first, to compare with the Spark DataFrame later.
df_pd = pd.read_csv(filepath_or_buffer='sample_dataset.csv')
df_pd.head(n=5)

Unnamed: 0,Name,gender,age,Experience
0,Tom,Male,67.0,10.0
1,Sarah,Female,61.0,12.0
2,John,Male,80.0,14.0
3,Ann,Female,49.0,13.0
4,Jenny,Female,79.0,16.0


In [3]:
from pyspark.sql import SparkSession

In [4]:
# Create the SparkSession, the entry point used below for reading data (spark.read).
# getOrCreate() returns an already-active session if there is one instead of
# starting a second one. The app name is a free-form label (e.g. 'Practice').
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [5]:
# Display the active SparkSession object
spark

When you run Spark locally, everything runs on your single machine, so there is effectively only one "cluster".
On a cloud platform, you can create multiple clusters, each made up of several instances.

In [6]:
# Read the CSV without any options: the header line is treated as an ordinary
# data row, columns get generic names (_c0, _c1, ...) and every column is a string.
df_pyspark = spark.read.csv(path='sample_dataset.csv')
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [7]:
# show() prints the first rows as a table; note the CSV header line appears as a data row
df_pyspark.show()

+-----+------+----+----------+
|  _c0|   _c1| _c2|       _c3|
+-----+------+----+----------+
| Name|gender| age|Experience|
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
|Marie|  null|null|      null|
+-----+------+----+----------+



In [8]:
# header=True uses the first CSV line as column names. Without schema
# inference every column is still typed as string.
df_pyspark = spark.read.csv('sample_dataset.csv', header=True)
df_pyspark

DataFrame[Name: string, gender: string, age: string, Experience: string]

In [9]:
# the first line is now used as the header, so it no longer appears as a data row
df_pyspark.show()

+-----+------+----+----------+
| Name|gender| age|Experience|
+-----+------+----+----------+
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
|Marie|  null|null|      null|
+-----+------+----+----------+



In [10]:
# a Spark DataFrame, not a pandas DataFrame
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [11]:
# head(n) returns a Python list of the first n Row objects; all values are strings here
df_pyspark.head(2)

[Row(Name='Tom', gender='Male', age='67', Experience='10'),
 Row(Name='Sarah', gender='Female', age='61', Experience='12')]

In [12]:
# printSchema() prints each column's name, data type and nullability --
# roughly the Spark counterpart of pandas' df.info()
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)



## Perform some basic operations

- Checking the data types of the columns (schema)
- Selecting columns and indexing
- Using describe(), similar to pandas
- Adding columns
- Dropping columns
- Renaming columns

In [13]:
## read the dataset
# header: use the first line of the file as column names.
# inferSchema: let Spark look at the data and infer each column's type
# (age and Experience become integers). A CSV file itself carries no type
# information, so without this option every column is read as string.
# Both settings are given the same way (.option) for consistency.
df_pyspark = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('sample_dataset.csv')
)

In [14]:
## check the schema: age and Experience are now inferred as integers
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



Alternatively, the CSV can be read in a single call by passing the options as keyword arguments:

In [15]:
# Same result in one call: reader options passed as keyword arguments
df_pyspark = spark.read.csv(path='sample_dataset.csv', inferSchema=True, header=True)


In [16]:
## check the schema: same types as with the .option(...) reading style
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [17]:
# get column names as a plain Python list
df_pyspark.columns

['Name', 'gender', 'age', 'Experience']

In [18]:
# head(2) returns the first 2 rows as a list of Row objects, not as a DataFrame
# (unlike pandas, where head() returns a DataFrame)
df_pyspark.head(2)


[Row(Name='Tom', gender='Male', age=67, Experience=10),
 Row(Name='Sarah', gender='Female', age=61, Experience=12)]

In [19]:
# pick only a single column (i.e. age)
# select() returns the selected column as a new DataFrame
df_age = df_pyspark.select("age")
df_age

DataFrame[age: int]

In [20]:
# display the selected column; missing ages appear as null
df_age.show()

+----+
| age|
+----+
|  67|
|  61|
|  80|
|  49|
|  79|
|  81|
|null|
|  69|
|null|
+----+



In [21]:
# select two columns; select() also accepts the names as separate arguments
df_two_col = df_pyspark.select('Name', 'age')
df_two_col

DataFrame[Name: string, age: int]

In [22]:
# display the two selected columns
df_two_col.show()

+-----+----+
| Name| age|
+-----+----+
|  Tom|  67|
|Sarah|  61|
| John|  80|
|  Ann|  49|
|Jenny|  79|
| Jack|  81|
|Waugh|null|
| Andy|  69|
|Marie|null|
+-----+----+



In [23]:
# check column datatypes, returned as a list of (name, type) tuples
df_pyspark.dtypes

[('Name', 'string'),
 ('gender', 'string'),
 ('age', 'int'),
 ('Experience', 'int')]

For simplicity, we use the 'df_two_col' DataFrame, since it has only 2 columns.

In [24]:
# describe() computes count, mean, stddev, min and max for each column.
# count excludes nulls (age: 7 of 9 rows); mean/stddev are null for the string column.
df_two_col.describe().show()

+-------+-----+------------------+
|summary| Name|               age|
+-------+-----+------------------+
|  count|    9|                 7|
|   mean| null| 69.42857142857143|
| stddev| null|11.773659058213278|
|    min| Andy|                49|
|    max|Waugh|                81|
+-------+-----+------------------+



In [25]:
# add a derived column; withColumn returns a new DataFrame, so the result is reassigned.
# Null ages stay null after the addition.
df_two_col = df_two_col.withColumn('Age after 2 years', df_two_col.age + 2)
df_two_col

DataFrame[Name: string, age: int, Age after 2 years: int]

In [26]:
# the new column is appended after the existing ones
df_two_col.show()

+-----+----+-----------------+
| Name| age|Age after 2 years|
+-----+----+-----------------+
|  Tom|  67|               69|
|Sarah|  61|               63|
| John|  80|               82|
|  Ann|  49|               51|
|Jenny|  79|               81|
| Jack|  81|               83|
|Waugh|null|             null|
| Andy|  69|               71|
|Marie|null|             null|
+-----+----+-----------------+



In [27]:
# drop columns; drop() also accepts several names to drop multiple columns at once
df_two_col = df_two_col.drop('Age after 2 years')
df_two_col.show()

+-----+----+
| Name| age|
+-----+----+
|  Tom|  67|
|Sarah|  61|
| John|  80|
|  Ann|  49|
|Jenny|  79|
| Jack|  81|
|Waugh|null|
| Andy|  69|
|Marie|null|
+-----+----+



In [28]:
# rename a column: withColumnRenamed(existing_name, new_name)
df_two_col = df_two_col.withColumnRenamed('Name', 'name')
df_two_col.show()

+-----+----+
| name| age|
+-----+----+
|  Tom|  67|
|Sarah|  61|
| John|  80|
|  Ann|  49|
|Jenny|  79|
| Jack|  81|
|Waugh|null|
| Andy|  69|
|Marie|null|
+-----+----+



### PySpark Handling Missing Values

 - Dropping Rows
 - Various parameters of the drop functionality
 - Handling missing values by mean

In [29]:
# drop every row that contains at least one null/missing value.
# df_pyspark itself is untouched; the cleaned result goes to df.
df = df_pyspark.dropna()
df.show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|  Tom|  Male| 67|        10|
|Sarah|Female| 61|        12|
| John|  Male| 80|        14|
|  Ann|Female| 49|        13|
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
+-----+------+---+----------+



#### Parameters of the na.drop() function

na.drop() can drop null values in different ways, depending on how its parameters are configured.

 1. configure parameter '<b>how</b>':

        how='any' : drop a row if it contains at least one null value

        or

        how='all' : drop a row only if all of its values are null (a row with at least one non-null value is kept)

        * by default, 'how' is set to 'any'

In [30]:
# how='any': a single null anywhere in a row is enough to drop that row
df = df_pyspark.dropna(how='any')
df.show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|  Tom|  Male| 67|        10|
|Sarah|Female| 61|        12|
| John|  Male| 80|        14|
|  Ann|Female| 49|        13|
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
+-----+------+---+----------+



In [31]:
# row count of the original DataFrame (nothing dropped), for comparison
df_pyspark.count()

9

In [32]:
# how='all': drop a row only when every one of its values is null.
# No row here is entirely null, so all rows are kept.
df = df_pyspark.dropna(how='all')
df.show()

+-----+------+----+----------+
| Name|gender| age|Experience|
+-----+------+----+----------+
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
|Marie|  null|null|      null|
+-----+------+----+----------+



In [33]:
# still 9 rows: how='all' dropped nothing
df.count()

9

2. configure threshold '<b>thresh</b>':

       eg:
       thresh=2 :

At least 2 non-null values must be present in a row.
A row with at least 2 non-null values is kept, regardless of how many null values it also contains.

A row with fewer than 2 non-null values is dropped. (When thresh is given, it overrides the 'how' parameter.)

In [34]:
# drop rows that don't have at least 2 non-null values.
# When thresh is given it overrides `how`, so how='any' is not passed
# (it would be silently ignored and only mislead the reader).
df = df_pyspark.na.drop(thresh=2)
df.show()

+-----+------+----+----------+
| Name|gender| age|Experience|
+-----+------+----+----------+
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
+-----+------+----+----------+



In [35]:
# 8 rows: only Marie (a single non-null value) was dropped
df.count()

8

3. Configure <b>subset</b>:

    this is used to drop rows based on null values in the specified columns only

In [36]:
# only nulls in the 'age' column are considered; nulls in other columns are ignored
df = df_pyspark.dropna(subset=['age'])
df.show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|  Tom|  Male| 67|        10|
|Sarah|Female| 61|        12|
| John|  Male| 80|        14|
|  Ann|Female| 49|        13|
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
| Andy|  null| 69|        19|
+-----+------+---+----------+



In [37]:
# 7 rows: Waugh and Marie had a null age
df.count()

7

### Filling missing values

#### fill() method:

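This has 2 parameters:
1. value: the value used to replace nulls. It is only applied to columns whose type matches the value (a string fills string columns, a number fills numeric columns); other columns are silently left unchanged.
2. subset: an optional column name (or list of column names) to restrict the fill to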

In [38]:
# fill missing values with 'sample val'. A string value only fills string
# columns, so the nulls in the integer columns age/Experience remain.
df = df_pyspark.fillna('sample val')
df.show()

+-----+----------+----+----------+
| Name|    gender| age|Experience|
+-----+----------+----+----------+
|  Tom|      Male|  67|        10|
|Sarah|    Female|  61|        12|
| John|      Male|  80|        14|
|  Ann|    Female|  49|        13|
|Jenny|    Female|  79|        16|
| Jack|      Male|  81|        17|
|Waugh|sample val|null|        20|
| Andy|sample val|  69|        19|
|Marie|sample val|null|      null|
+-----+----------+----+----------+



In [39]:
# Try to fill missing values in the 'age' column with 'sample val'.
# NOTE: this fills nothing. 'age' is an integer column, and fill() skips subset
# columns whose type doesn't match the value, so the string is silently
# ignored -- the output below still shows null ages. To fill 'age', use a
# number instead, e.g. df_pyspark.na.fill(0, subset=['age']).
df = df_pyspark.na.fill('sample val', subset=['age'])
df.show()

+-----+------+----+----------+
| Name|gender| age|Experience|
+-----+------+----+----------+
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
|Marie|  null|null|      null|
+-----+------+----+----------+



In [40]:
# Try to fill missing values in multiple columns with 'sample val'.
# NOTE: this fills nothing. 'age' and 'Experience' are integer columns, and
# fill() skips subset columns whose type doesn't match the value, so the
# string is silently ignored -- the output below is unchanged. Use a number
# instead, e.g. df_pyspark.na.fill(0, subset=['age', 'Experience']).
df = df_pyspark.na.fill('sample val', subset=['age', 'Experience'])
df.show()

+-----+------+----+----------+
| Name|gender| age|Experience|
+-----+------+----+----------+
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
|Marie|  null|null|      null|
+-----+------+----+----------+



In [41]:
# fill null values with mean
from pyspark.ml.feature import Imputer

# Imputer replaces nulls in the input columns with the column mean and writes
# the result to new "<column>_imputed" columns (the inputs are kept).
impute_cols = ['age', 'Experience']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{name}_imputed" for name in impute_cols],
    strategy="mean",
)


In [42]:
# fit() computes the per-column means; transform() returns a NEW DataFrame with
# the *_imputed columns added (df_pyspark itself is unchanged), so keep it in
# df_imputed for later use. The imputed values appear as whole numbers
# (age 69, Experience 15) because the input columns are integers.
df_imputed = imputer.fit(df_pyspark).transform(df_pyspark)
df_imputed.show()

+-----+------+----+----------+-----------+------------------+
| Name|gender| age|Experience|age_imputed|Experience_imputed|
+-----+------+----+----------+-----------+------------------+
|  Tom|  Male|  67|        10|         67|                10|
|Sarah|Female|  61|        12|         61|                12|
| John|  Male|  80|        14|         80|                14|
|  Ann|Female|  49|        13|         49|                13|
|Jenny|Female|  79|        16|         79|                16|
| Jack|  Male|  81|        17|         81|                17|
|Waugh|  null|null|        20|         69|                20|
| Andy|  null|  69|        19|         69|                19|
|Marie|  null|null|      null|         69|                15|
+-----+------+----+----------+-----------+------------------+



## Filter Operations

In [43]:
# original data, including its null values, before filtering
df_pyspark.show()

+-----+------+----+----------+
| Name|gender| age|Experience|
+-----+------+----+----------+
|  Tom|  Male|  67|        10|
|Sarah|Female|  61|        12|
| John|  Male|  80|        14|
|  Ann|Female|  49|        13|
|Jenny|Female|  79|        16|
| Jack|  Male|  81|        17|
|Waugh|  null|null|        20|
| Andy|  null|  69|        19|
|Marie|  null|null|      null|
+-----+------+----+----------+



In [44]:
# drop rows with nulls first; the filter examples below run on this df
df = df_pyspark.dropna()
df.show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|  Tom|  Male| 67|        10|
|Sarah|Female| 61|        12|
| John|  Male| 80|        14|
|  Ann|Female| 49|        13|
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
+-----+------+---+----------+



#### 1st method

In [45]:
# 1st method: pass the condition as a SQL expression string.
# where() is an alias of filter(); keep records with age <= 70
df.where('age <= 70').show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|  Tom|  Male| 67|        10|
|Sarah|Female| 61|        12|
|  Ann|Female| 49|        13|
+-----+------+---+----------+



In [47]:
# show only certain columns (Name, Experience) of the filtered records
df.filter('age<=70').select(['Name', 'Experience']).show()

+-----+----------+
| Name|Experience|
+-----+----------+
|  Tom|        10|
|Sarah|        12|
|  Ann|        13|
+-----+----------+



#### 2nd method

In [48]:
# 2nd method: build the condition from a Column object
# (keep records with age <= 70)
df.filter(df.age <= 70).show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|  Tom|  Male| 67|        10|
|Sarah|Female| 61|        12|
|  Ann|Female| 49|        13|
+-----+------+---+----------+



In [49]:
# AND of two conditions with &. Each comparison needs its own parentheses
# when written inline, because Python's & binds more tightly than >=.
is_senior = df['age'] >= 70
is_experienced = df['Experience'] >= 15
df.filter(is_senior & is_experienced).show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
+-----+------+---+----------+



In [50]:
# OR of two conditions with |: keep rows matching either one. The parentheses
# are required because Python's | binds more tightly than >=.
df.filter(
    (df.age >= 70) | (df.Experience >= 15)
).show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
| John|  Male| 80|        14|
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
+-----+------+---+----------+



In [52]:
# inverse operation: ~ negates a condition (NOT); on this null-free df it keeps age > 70
df.filter(~(df.age <= 70)).show()

+-----+------+---+----------+
| Name|gender|age|Experience|
+-----+------+---+----------+
| John|  Male| 80|        14|
|Jenny|Female| 79|        16|
| Jack|  Male| 81|        17|
+-----+------+---+----------+



## PySpark GroupBy and Aggregate Functions

In [53]:
# employee dataset: header row for column names, inferred types (Salary becomes an integer)
df_emp = spark.read.csv(path='sample_employee_dataset.csv', inferSchema=True, header=True)


In [54]:
# preview the employee data
df_emp.show()

+-----+------------+------+
| Name| Departments|Salary|
+-----+------------+------+
| Jane|Data Science| 10000|
| Jane|         IOT|  5000|
| Lisa|    Big Data|  4000|
| Jane|    Big Data|  4000|
| Lisa|Data Science|  3000|
|  Roy|Data Science| 20000|
|  Roy|         IOT| 10000|
|  Roy|    Big Data|  5000|
|Alexy|Data Science| 10000|
|Alexy|    Big Data|  4000|
+-----+------------+------+



In [55]:
# Name and Departments are strings; Salary is an integer
df_emp.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



- groupBy and aggregate functions work together

- first we apply groupBy, then an aggregate function (sum, max, min, mean, ...) to each group

In [56]:
# group by 'Name' and sum each person's salaries.
# The column is named explicitly: a bare sum() aggregates every numeric column,
# which only gives this result because Salary happens to be the only one.
df_emp.groupBy('Name').sum('Salary').show()

+-----+-----------+
| Name|sum(Salary)|
+-----+-----------+
|  Roy|      35000|
|Alexy|      14000|
| Jane|      19000|
| Lisa|       7000|
+-----+-----------+



In [64]:
# identify the maximum salary of each person (column named explicitly)
df_emp.groupBy('Name').max('Salary').show()

+-----+-----------+
| Name|max(Salary)|
+-----+-----------+
|  Roy|      20000|
|Alexy|      10000|
| Jane|      10000|
| Lisa|       4000|
+-----+-----------+



In [65]:
# identify the minimum salary of each person (column named explicitly)
df_emp.groupBy('Name').min('Salary').show()

+-----+-----------+
| Name|min(Salary)|
+-----+-----------+
|  Roy|       5000|
|Alexy|       4000|
| Jane|       4000|
| Lisa|       3000|
+-----+-----------+



In [57]:
# group by Departments and sum the salaries (column named explicitly)
df_emp.groupBy('Departments').sum('Salary').show()

+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      17000|
|Data Science|      43000|
+------------+-----------+



In [58]:
# mean salary per department; the output column is named avg(Salary)
df_emp.groupBy('Departments').mean('Salary').show()

+------------+-----------+
| Departments|avg(Salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     4250.0|
|Data Science|    10750.0|
+------------+-----------+



# count how many employees (rows) there are in each department
df_emp.groupBy('Departments').count().show()

#### Directly using aggregate function

In [60]:
# get the total of all salaries; agg() with a {column: function} dict
# aggregates over the whole DataFrame without any grouping
df_emp.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      75000|
+-----------+

