# Pyspark

In this tutorial I will cover the following:

- various ways of loading a dataset into a PySpark DataFrame
- common operations such as data cleaning and adding columns
- finally, some machine learning practices

In [1]:
## Let's first import the pyspark library
import pyspark

In [2]:
## then import SparkSession to start the PySpark session
from pyspark.sql import SparkSession

In [3]:
## next step is to create a spark session using 'SparkSession'
spark = SparkSession.builder.appName('Dataframe').getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/06 10:20:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### There are various ways to load a dataset in PySpark

In [4]:
## read the dataset
df_pyspark = spark.read.option('header', 'true').csv('test.csv', inferSchema=True)

                                                                                

In [5]:
## let's check schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [6]:
## second method to load the dataset
df_pyspark = spark.read.csv('test.csv', header=True, inferSchema=True)
df_pyspark.show()

                                                                                

+-------+---+----------+
|   Name|Age|Experience|
+-------+---+----------+
|  Sunny| 24|         3|
|   Ravi| 29|         4|
|Praveen| 22|         2|
|   Ayan| 19|         1|
|Sarvesh| 32|         6|
|   Gyan| 26|         2|
|Sreyash| 27|         3|
|Parvati| 24|         1|
|Parmesh| 28|         4|
+-------+---+----------+



In [7]:
## check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



### selecting columns and indexing

In [8]:
df_pyspark.head(3)

[Row(Name='Sunny', Age=24, Experience=3),
 Row(Name='Ravi', Age=29, Experience=4),
 Row(Name='Praveen', Age=22, Experience=2)]

In [9]:
df_pyspark.show()

+-------+---+----------+
|   Name|Age|Experience|
+-------+---+----------+
|  Sunny| 24|         3|
|   Ravi| 29|         4|
|Praveen| 22|         2|
|   Ayan| 19|         1|
|Sarvesh| 32|         6|
|   Gyan| 26|         2|
|Sreyash| 27|         3|
|Parvati| 24|         1|
|Parmesh| 28|         4|
+-------+---+----------+



### selecting the Name column

In [10]:
df_pyspark.select('Name').show()

+-------+
|   Name|
+-------+
|  Sunny|
|   Ravi|
|Praveen|
|   Ayan|
|Sarvesh|
|   Gyan|
|Sreyash|
|Parvati|
|Parmesh|
+-------+



### selecting multiple columns

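To access a column named 'Experience' in a PySpark DataFrame, we can use the `select` method or index directly into the DataFrame. The two are not equivalent: `select` returns a new DataFrame containing only the 'Experience' column, whereas indexing returns a `Column` object, which has no `show` method and is meant to be used inside expressions such as `select`, `filter` or `withColumn`.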

In [11]:
## using select function
df_pyspark.select(['Experience']).show()

+----------+
|Experience|
+----------+
|         3|
|         4|
|         2|
|         1|
|         6|
|         2|
|         3|
|         1|
|         4|
+----------+



In [12]:
## using indexing
df_pyspark['Experience']

Column<'Experience'>

In [13]:
df_pyspark.select(['Name','Experience']).show()

+-------+----------+
|   Name|Experience|
+-------+----------+
|  Sunny|         3|
|   Ravi|         4|
|Praveen|         2|
|   Ayan|         1|
|Sarvesh|         6|
|   Gyan|         2|
|Sreyash|         3|
|Parvati|         1|
|Parmesh|         4|
+-------+----------+



### Datatypes

In [14]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

### pyspark describe

In [15]:
df_pyspark.describe()

DataFrame[summary: string, Name: string, Age: string, Experience: string]

In [16]:
df_pyspark.describe().show()

+-------+-----+------------------+------------------+
|summary| Name|               Age|        Experience|
+-------+-----+------------------+------------------+
|  count|    9|                 9|                 9|
|   mean| NULL|25.666666666666668| 2.888888888888889|
| stddev| NULL| 3.905124837953327|1.6158932858054431|
|    min| Ayan|                19|                 1|
|    max|Sunny|                32|                 6|
+-------+-----+------------------+------------------+
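
As a cross-check on the `describe()` output, the statistics for the `Age` column can be recomputed with Python's standard `statistics` module. The values are copied by hand from the table shown earlier. The comparison suggests that `stddev` in the summary is the sample standard deviation (dividing by n - 1) rather than the population one.

```python
import statistics

# Age values copied from the DataFrame shown earlier
ages = [24, 29, 22, 19, 32, 26, 27, 24, 28]

mean_age = statistics.mean(ages)
sample_std = statistics.stdev(ages)       # divides by n - 1
population_std = statistics.pstdev(ages)  # divides by n

print(mean_age)
print(sample_std)
print(population_std)
```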



                                                                                

### adding and deleting columns

In [17]:
## adding a column to the dataframe
df_pyspark = df_pyspark.withColumn('Experience After 2 years', df_pyspark['Experience']+2)

In [18]:
df_pyspark.show()

+-------+---+----------+------------------------+
|   Name|Age|Experience|Experience After 2 years|
+-------+---+----------+------------------------+
|  Sunny| 24|         3|                       5|
|   Ravi| 29|         4|                       6|
|Praveen| 22|         2|                       4|
|   Ayan| 19|         1|                       3|
|Sarvesh| 32|         6|                       8|
|   Gyan| 26|         2|                       4|
|Sreyash| 27|         3|                       5|
|Parvati| 24|         1|                       3|
|Parmesh| 28|         4|                       6|
+-------+---+----------+------------------------+



In [19]:
## drop the column
df_pyspark = df_pyspark.drop('Experience After 2 years')

In [20]:
df_pyspark.show()

+-------+---+----------+
|   Name|Age|Experience|
+-------+---+----------+
|  Sunny| 24|         3|
|   Ravi| 29|         4|
|Praveen| 22|         2|
|   Ayan| 19|         1|
|Sarvesh| 32|         6|
|   Gyan| 26|         2|
|Sreyash| 27|         3|
|Parvati| 24|         1|
|Parmesh| 28|         4|
+-------+---+----------+



### Renaming the columns

In [21]:
df_pyspark.withColumnRenamed('Name', 'New Name').show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|   Sunny| 24|         3|
|    Ravi| 29|         4|
| Praveen| 22|         2|
|    Ayan| 19|         1|
| Sarvesh| 32|         6|
|    Gyan| 26|         2|
| Sreyash| 27|         3|
| Parvati| 24|         1|
| Parmesh| 28|         4|
+--------+---+----------+



# Pyspark Handling Missing values and some statistics
- Dropping Columns
- Dropping Rows
- Various parameters of the drop functionality
- Handling Missing values by mean, median and mode

In [4]:
## read the dataset
df_pyspark1 = spark.read.csv('test1.csv', header=True, inferSchema=True)

                                                                                

In [5]:
df_pyspark1.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|  Sunny|  24|         3| 30000|
|   Ravi|  29|         4| 32000|
|Praveen|  22|         2| 25000|
|   Ayan|  19|         1| 23000|
|Sarvesh|  32|         6| 35000|
|   Gyan|  26|         2| 31000|
|Sreyash|  27|         3| 29500|
|Parvati|  24|         1| 23000|
|Parmesh|  28|         4| 24000|
| Mahesh|NULL|      NULL| 40000|
|   NULL|  34|        10| 38000|
|   NULL|  36|      NULL|  NULL|
+-------+----+----------+------+



In [6]:
## drop the column
df_pyspark1.drop('Name').show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  24|         3| 30000|
|  29|         4| 32000|
|  22|         2| 25000|
|  19|         1| 23000|
|  32|         6| 35000|
|  26|         2| 31000|
|  27|         3| 29500|
|  24|         1| 23000|
|  28|         4| 24000|
|NULL|      NULL| 40000|
|  34|        10| 38000|
|  36|      NULL|  NULL|
+----+----------+------+



In [7]:
## dropping rows with null values
df_pyspark1.na.drop().show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 24|         3| 30000|
|   Ravi| 29|         4| 32000|
|Praveen| 22|         2| 25000|
|   Ayan| 19|         1| 23000|
|Sarvesh| 32|         6| 35000|
|   Gyan| 26|         2| 31000|
|Sreyash| 27|         3| 29500|
|Parvati| 24|         1| 23000|
|Parmesh| 28|         4| 24000|
+-------+---+----------+------+



The drop function accepts the following parameters (shown with their defaults):

`.drop(how='any',thresh=None,subset=None)`
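How the three parameters interact can be sketched with a plain-Python model of the rule for keeping a row. This is only an illustration of the documented behaviour, not PySpark's implementation: `keep_row` is a hypothetical helper and `None` stands in for null. The key point is that when `thresh` is given, it takes precedence over `how`.

```python
def keep_row(row, how='any', thresh=None, subset=None):
    """Return True if a na.drop-style rule would keep this row (a dict)."""
    columns = subset if subset is not None else list(row)
    non_null = sum(row[c] is not None for c in columns)
    if thresh is not None:
        # thresh overrides how: keep rows with at least `thresh` non-null values
        return non_null >= thresh
    if how == 'any':
        # drop the row if any considered column is null
        return non_null == len(columns)
    if how == 'all':
        # drop the row only if every considered column is null
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")

# The three null-containing rows of test1.csv, written as dicts
rows = [
    {'Name': 'Mahesh', 'Age': None, 'Experience': None, 'Salary': 40000},
    {'Name': None, 'Age': 34, 'Experience': 10, 'Salary': 38000},
    {'Name': None, 'Age': 36, 'Experience': None, 'Salary': None},
]

kept_any = [keep_row(r) for r in rows]
kept_thresh2 = [keep_row(r, thresh=2) for r in rows]
kept_thresh3 = [keep_row(r, thresh=3) for r in rows]
kept_subset = [keep_row(r, subset=['Experience']) for r in rows]
print(kept_any, kept_thresh2, kept_thresh3, kept_subset)
```

Each list says, row by row, whether that row would survive the corresponding call.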

In [8]:
## how='any'
df_pyspark1.na.drop(how='any').show() # drops every row that contains at least one null value

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 24|         3| 30000|
|   Ravi| 29|         4| 32000|
|Praveen| 22|         2| 25000|
|   Ayan| 19|         1| 23000|
|Sarvesh| 32|         6| 35000|
|   Gyan| 26|         2| 31000|
|Sreyash| 27|         3| 29500|
|Parvati| 24|         1| 23000|
|Parmesh| 28|         4| 24000|
+-------+---+----------+------+



In [9]:
## threshold
df_pyspark1.na.drop(how='any', thresh=2).show() # keeps only rows with at least 2 non-null values; thresh overrides how

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|  Sunny|  24|         3| 30000|
|   Ravi|  29|         4| 32000|
|Praveen|  22|         2| 25000|
|   Ayan|  19|         1| 23000|
|Sarvesh|  32|         6| 35000|
|   Gyan|  26|         2| 31000|
|Sreyash|  27|         3| 29500|
|Parvati|  24|         1| 23000|
|Parmesh|  28|         4| 24000|
| Mahesh|NULL|      NULL| 40000|
|   NULL|  34|        10| 38000|
+-------+----+----------+------+



In [10]:
## threshold
df_pyspark1.na.drop(how='any', thresh=3).show() # keeps only rows with at least 3 non-null values; thresh overrides how

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 24|         3| 30000|
|   Ravi| 29|         4| 32000|
|Praveen| 22|         2| 25000|
|   Ayan| 19|         1| 23000|
|Sarvesh| 32|         6| 35000|
|   Gyan| 26|         2| 31000|
|Sreyash| 27|         3| 29500|
|Parvati| 24|         1| 23000|
|Parmesh| 28|         4| 24000|
|   NULL| 34|        10| 38000|
+-------+---+----------+------+



In [11]:
## subset
df_pyspark1.na.drop(how='any', subset=['Experience']).show() # drops only the rows where Experience is null

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 24|         3| 30000|
|   Ravi| 29|         4| 32000|
|Praveen| 22|         2| 25000|
|   Ayan| 19|         1| 23000|
|Sarvesh| 32|         6| 35000|
|   Gyan| 26|         2| 31000|
|Sreyash| 27|         3| 29500|
|Parvati| 24|         1| 23000|
|Parmesh| 28|         4| 24000|
|   NULL| 34|        10| 38000|
+-------+---+----------+------+



In [12]:
df_pyspark1.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|  Sunny|  24|         3| 30000|
|   Ravi|  29|         4| 32000|
|Praveen|  22|         2| 25000|
|   Ayan|  19|         1| 23000|
|Sarvesh|  32|         6| 35000|
|   Gyan|  26|         2| 31000|
|Sreyash|  27|         3| 29500|
|Parvati|  24|         1| 23000|
|Parmesh|  28|         4| 24000|
| Mahesh|NULL|      NULL| 40000|
|   NULL|  34|        10| 38000|
|   NULL|  36|      NULL|  NULL|
+-------+----+----------+------+



In [13]:
## filling the missing value
df_pyspark1.na.fill('Missing Values').show()

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|         Sunny|  24|         3| 30000|
|          Ravi|  29|         4| 32000|
|       Praveen|  22|         2| 25000|
|          Ayan|  19|         1| 23000|
|       Sarvesh|  32|         6| 35000|
|          Gyan|  26|         2| 31000|
|       Sreyash|  27|         3| 29500|
|       Parvati|  24|         1| 23000|
|       Parmesh|  28|         4| 24000|
|        Mahesh|NULL|      NULL| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      NULL|  NULL|
+--------------+----+----------+------+



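The fill only partly works here: a string fill value is applied to string columns only, so the nulls in the integer columns Age, Experience and Salary are left untouched. Filling those requires a numeric value.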

In [22]:
## listing the integer columns in subset does not help: a string value is ignored for non-string columns
df_pyspark1.na.fill('Missing Values', subset = ['Name', 'Age', 'Experience', 'Salary']).show()

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|         Sunny|  24|         3| 30000|
|          Ravi|  29|         4| 32000|
|       Praveen|  22|         2| 25000|
|          Ayan|  19|         1| 23000|
|       Sarvesh|  32|         6| 35000|
|          Gyan|  26|         2| 31000|
|       Sreyash|  27|         3| 29500|
|       Parvati|  24|         1| 23000|
|       Parmesh|  28|         4| 24000|
|        Mahesh|NULL|      NULL| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      NULL|  NULL|
+--------------+----+----------+------+



In [23]:
df_pyspark1.show()

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|         Sunny|  24|         3| 30000|
|          Ravi|  29|         4| 32000|
|       Praveen|  22|         2| 25000|
|          Ayan|  19|         1| 23000|
|       Sarvesh|  32|         6| 35000|
|          Gyan|  26|         2| 31000|
|       Sreyash|  27|         3| 29500|
|       Parvati|  24|         1| 23000|
|       Parmesh|  28|         4| 24000|
|        Mahesh|NULL|      NULL| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      NULL|  NULL|
+--------------+----+----------+------+



In [24]:
## replacing with the mean 
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Age', 'Experience', 'Salary'],
    outputCols = ["{}_imputed".format(c) for c in ['Age', 'Experience', 'Salary']]
).setStrategy("mean")

In [25]:
imputer.fit(df_pyspark1).transform(df_pyspark1).show()

+--------------+----+----------+------+-----------+------------------+--------------+
|          Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------------+----+----------+------+-----------+------------------+--------------+
|         Sunny|  24|         3| 30000|         24|                 3|         30000|
|          Ravi|  29|         4| 32000|         29|                 4|         32000|
|       Praveen|  22|         2| 25000|         22|                 2|         25000|
|          Ayan|  19|         1| 23000|         19|                 1|         23000|
|       Sarvesh|  32|         6| 35000|         32|                 6|         35000|
|          Gyan|  26|         2| 31000|         26|                 2|         31000|
|       Sreyash|  27|         3| 29500|         27|                 3|         29500|
|       Parvati|  24|         1| 23000|         24|                 1|         23000|
|       Parmesh|  28|         4| 24000|         28|   

In [26]:
imputer_median = Imputer(
    inputCols = ['Age', 'Experience', 'Salary'],
    outputCols = ["{}_imputed".format(c) for c in ['Age', 'Experience', 'Salary']]
).setStrategy("median")

In [27]:
imputer_median.fit(df_pyspark1).transform(df_pyspark1).show()

+--------------+----+----------+------+-----------+------------------+--------------+
|          Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------------+----+----------+------+-----------+------------------+--------------+
|         Sunny|  24|         3| 30000|         24|                 3|         30000|
|          Ravi|  29|         4| 32000|         29|                 4|         32000|
|       Praveen|  22|         2| 25000|         22|                 2|         25000|
|          Ayan|  19|         1| 23000|         19|                 1|         23000|
|       Sarvesh|  32|         6| 35000|         32|                 6|         35000|
|          Gyan|  26|         2| 31000|         26|                 2|         31000|
|       Sreyash|  27|         3| 29500|         27|                 3|         29500|
|       Parvati|  24|         1| 23000|         24|                 1|         23000|
|       Parmesh|  28|         4| 24000|         28|   

# Pyspark Dataframes

- Filter Operation
- &, |, ==
- ~

In [28]:
## read the dataset
df_pyspark2 = spark.read.csv('test2.csv', header=True, inferSchema=True)
df_pyspark2.show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 24|         3| 30000|
|   Ravi| 29|         4| 32000|
|Praveen| 22|         2| 25000|
|   Ayan| 19|         1| 23000|
|Sarvesh| 32|         6| 35000|
|   Gyan| 26|         2| 31000|
|Sreyash| 27|         3| 29500|
|Parvati| 24|         1| 23000|
|Parmesh| 28|         4| 24000|
+-------+---+----------+------+



### filter operation

In [32]:
## people whose salary is less than or equal to 25000
df_pyspark2.filter(df_pyspark2['Salary'] <= 25000).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|Praveen| 22|         2| 25000|
|   Ayan| 19|         1| 23000|
|Parvati| 24|         1| 23000|
|Parmesh| 28|         4| 24000|
+-------+---+----------+------+



In [33]:
df_pyspark2.filter(df_pyspark2['Salary'] <= 25000).select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|Praveen| 22|
|   Ayan| 19|
|Parvati| 24|
|Parmesh| 28|
+-------+---+



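Conditions can be combined with `&` (and) or `|` (or); each condition must be wrapped in its own parentheses. Here a lower bound is added to the previous filter: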

In [41]:
df_pyspark2.filter((df_pyspark2['Salary'] <= 25000) & 
                   (df_pyspark2['Salary']>=15000)).select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|Praveen| 22|
|   Ayan| 19|
|Parvati| 24|
|Parmesh| 28|
+-------+---+



In [42]:
## Not operation
df_pyspark2.filter(~(df_pyspark2['Salary'] <= 25000)).select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|  Sunny| 24|
|   Ravi| 29|
|Sarvesh| 32|
|   Gyan| 26|
|Sreyash| 27|
+-------+---+



# Pyspark GroupBy and Aggregate Functions

In [43]:
## read the dataset
df_pyspark3 = spark.read.csv('test3.csv', header=True, inferSchema=True)
df_pyspark3.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [44]:
df_pyspark3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [47]:
## Groupby operations
### total salary per person
df_pyspark3.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [48]:
## group by Departments to get the total salary per department
df_pyspark3.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [49]:
## group by Departments to get the mean salary per department
df_pyspark3.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [50]:
## how many employees work in each department
df_pyspark3.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [51]:
df_pyspark3.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [52]:
# maximum salary of each employee using groupBy and max

df_pyspark3.groupBy('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+

