## Basics of PySpark

### This notebook covers the following topics:

## PART - 1
#### - PySpark DataFrame
#### - Reading the dataset
#### - Checking the data types (schema)
#### - Selecting columns
#### - Summary statistics with describe()
#### - Adding columns
#### - Dropping columns
#### - Renaming columns

Note: Set up a virtual environment before starting the session.

In [1]:
import pyspark

In [27]:
# For comparison, read test.csv with pandas
import pandas as pd

df = pd.read_csv("test.csv")
df

Unnamed: 0,Name,Age
0,Ironman,26
1,Aquaman,25
2,Spiderman,22
3,Hulk,28


In [5]:
from pyspark.sql import SparkSession

### Starting the PySpark session

In [15]:
# getOrCreate() returns the running session if one exists, otherwise it starts a new one
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [16]:
spark

In [40]:
# With no options, the header line is read as data and every column is typed as a string
df_pyspark = spark.read.csv('test.csv')

In [41]:
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [42]:
df_pyspark.show()

+---------+---+----------+
|      _c0|_c1|       _c2|
+---------+---+----------+
|     Name|Age|Experience|
|  Ironman| 26|        10|
|  Aquaman| 25|         2|
|Spiderman| 22|         4|
|     Hulk| 28|         6|
+---------+---+----------+



### Reading the dataset with a header row

In [46]:
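# header: use the first line as column names; inferSchema: detect each column's type from the data
df_pyspark = spark.read.option('header', 'true').csv('test.csv', inferSchema=True)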

In [47]:
df_pyspark.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|  Ironman| 26|        10|
|  Aquaman| 25|         2|
|Spiderman| 22|         4|
|     Hulk| 28|         6|
+---------+---+----------+



### Checking the schema (data types)

In [48]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [75]:
# Equivalent, shorter form: pass header and inferSchema as keyword arguments
df_pyspark = spark.read.csv('test.csv', header=True, inferSchema=True)
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



### Selecting columns

In [63]:
df_pyspark.select('Name','Experience').show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|  Ironman|        10|
|  Aquaman|         2|
|Spiderman|         4|
|     Hulk|         6|
+---------+----------+



In [67]:
# select() also accepts a list of column names
df_pyspark.select(['Name']).show()

+---------+
|     Name|
+---------+
|  Ironman|
|  Aquaman|
|Spiderman|
|     Hulk|
+---------+



### Checking data types and summary statistics

In [68]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [70]:
df_pyspark.describe().show()

+-------+---------+------------------+-----------------+
|summary|     Name|               Age|       Experience|
+-------+---------+------------------+-----------------+
|  count|        4|                 4|                4|
|   mean|     NULL|             25.25|              5.5|
| stddev|     NULL|2.5000000000000004|3.415650255319866|
|    min|  Aquaman|                22|                2|
|    max|Spiderman|                28|               10|
+-------+---------+------------------+-----------------+



### Adding columns

In [76]:
# withColumn returns a new DataFrame, so reassign it to keep the new column
df_pyspark = df_pyspark.withColumn('Experience after 2 years', df_pyspark['Experience'] + 2)

In [77]:
df_pyspark.show()

+---------+---+----------+------------------------+
|     Name|Age|Experience|Experience after 2 years|
+---------+---+----------+------------------------+
|  Ironman| 26|        10|                      12|
|  Aquaman| 25|         2|                       4|
|Spiderman| 22|         4|                       6|
|     Hulk| 28|         6|                       8|
+---------+---+----------+------------------------+



### Dropping columns

In [80]:
df_pyspark = df_pyspark.drop('Experience after 2 years')

In [82]:
df_pyspark.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|  Ironman| 26|        10|
|  Aquaman| 25|         2|
|Spiderman| 22|         4|
|     Hulk| 28|         6|
+---------+---+----------+



### Renaming columns

In [84]:
# Returns a renamed copy for display; df_pyspark itself keeps the column 'Name'
df_pyspark.withColumnRenamed('Name', 'New Name').show()

+---------+---+----------+
| New Name|Age|Experience|
+---------+---+----------+
|  Ironman| 26|        10|
|  Aquaman| 25|         2|
|Spiderman| 22|         4|
|     Hulk| 28|         6|
+---------+---+----------+



## PART - 2

#### - Dropping rows and columns
#### - Parameters of the drop functions
#### - Handling missing values (mean, median, mode)

In [92]:
df = spark.read.csv('test2.csv', header=True, inferSchema=True)  # another sample CSV file (test2.csv)
df.show()

+-----------+----+----------+------+
|       name| age|experience|salary|
+-----------+----+----------+------+
|       hulk|  28|         8| 50000|
|   superman|  38|        12| 80000|
|wonderwoman|  25|         5| 48000|
|       thor|NULL|      NULL| 40000|
|       NULL|  23|         3| 28000|
|       NULL|  29|      NULL|  NULL|
+-----------+----+----------+------+



In [93]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [89]:
df.describe().show()

+-------+-----------+-----------------+------------------+------------------+
|summary|       name|              age|        experience|            salary|
+-------+-----------+-----------------+------------------+------------------+
|  count|          4|                5|                 4|                 5|
|   mean|       NULL|             28.6|               7.0|           49200.0|
| stddev|       NULL|5.770615218501403|3.9157800414902435|19266.551326067674|
|    min|       hulk|               23|                 3|             28000|
|    max|wonderwoman|               38|                12|             80000|
+-------+-----------+-----------------+------------------+------------------+



### Dropping a column

In [91]:
df.drop('name').show()

+----+----------+------+
| age|experience|salary|
+----+----------+------+
|  28|         8| 50000|
|  38|        12| 80000|
|  25|         5| 48000|
|NULL|      NULL| 40000|
|  23|         3| 28000|
|  29|      NULL|  NULL|
+----+----------+------+



### Handling null values

In [95]:
# With no arguments, na.drop() removes every row that contains at least one null value

df.na.drop().show()

+-----------+---+----------+------+
|       name|age|experience|salary|
+-----------+---+----------+------+
|       hulk| 28|         8| 50000|
|   superman| 38|        12| 80000|
|wonderwoman| 25|         5| 48000|
+-----------+---+----------+------+



In [97]:
# how='all' drops a row only if every value in it is null; no row here is entirely null, so all six rows remain

df.na.drop(how="all").show()

+-----------+----+----------+------+
|       name| age|experience|salary|
+-----------+----+----------+------+
|       hulk|  28|         8| 50000|
|   superman|  38|        12| 80000|
|wonderwoman|  25|         5| 48000|
|       thor|NULL|      NULL| 40000|
|       NULL|  23|         3| 28000|
|       NULL|  29|      NULL|  NULL|
+-----------+----+----------+------+



In [101]:
# thresh=2 keeps only rows with at least 2 non-null values (when thresh is set, it overrides how)

df.na.drop(how='any',thresh=2).show()

+-----------+----+----------+------+
|       name| age|experience|salary|
+-----------+----+----------+------+
|       hulk|  28|         8| 50000|
|   superman|  38|        12| 80000|
|wonderwoman|  25|         5| 48000|
|       thor|NULL|      NULL| 40000|
|       NULL|  23|         3| 28000|
+-----------+----+----------+------+



In [105]:
# subset limits the null check to the listed columns: only rows with a null 'age' are dropped

df.na.drop(how='any',subset=['age']).show()

+-----------+---+----------+------+
|       name|age|experience|salary|
+-----------+---+----------+------+
|       hulk| 28|         8| 50000|
|   superman| 38|        12| 80000|
|wonderwoman| 25|         5| 48000|
|       NULL| 23|         3| 28000|
|       NULL| 29|      NULL|  NULL|
+-----------+---+----------+------+



### Filling missing values

In [111]:
# Replace nulls in the 'name' column with a placeholder string
df.na.fill('Missing values', 'name').show()

+--------------+----+----------+------+
|          name| age|experience|salary|
+--------------+----+----------+------+
|          hulk|  28|         8| 50000|
|      superman|  38|        12| 80000|
|   wonderwoman|  25|         5| 48000|
|          thor|NULL|      NULL| 40000|
|Missing values|  23|         3| 28000|
|Missing values|  29|      NULL|  NULL|
+--------------+----+----------+------+



In [113]:
# na.fill returned a new DataFrame; df itself still contains the nulls
df.show()

+-----------+----+----------+------+
|       name| age|experience|salary|
+-----------+----+----------+------+
|       hulk|  28|         8| 50000|
|   superman|  38|        12| 80000|
|wonderwoman|  25|         5| 48000|
|       thor|NULL|      NULL| 40000|
|       NULL|  23|         3| 28000|
|       NULL|  29|      NULL|  NULL|
+-----------+----+----------+------+



In [119]:
# Imputer replaces nulls with a per-column statistic; here the mean is used, and 'median' or 'mode' (Spark 3.1+) can be set instead

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['age','experience','salary'],
    outputCols = ["{}_imputed".format(c) for c in ['age','experience','salary']]
).setStrategy("mean")

In [120]:
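# fit() computes the column means; transform() adds the *_imputed columns.
# The imputed values keep the input type, so the integer age mean (28.6) appears as 28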

imputer.fit(df).transform(df).show()

+-----------+----+----------+------+-----------+------------------+--------------+
|       name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+-----------+----+----------+------+-----------+------------------+--------------+
|       hulk|  28|         8| 50000|         28|                 8|         50000|
|   superman|  38|        12| 80000|         38|                12|         80000|
|wonderwoman|  25|         5| 48000|         25|                 5|         48000|
|       thor|NULL|      NULL| 40000|         28|                 7|         40000|
|       NULL|  23|         3| 28000|         23|                 3|         28000|
|       NULL|  29|      NULL|  NULL|         29|                 7|         49200|
+-----------+----+----------+------+-----------+------------------+--------------+



## PART - 3

#### - Filter operations

In [121]:
df1 = spark.read.csv('test3.csv',header=True,inferSchema=True)
df1.show()

+-----------+---+----------+------+
|       name|age|experience|salary|
+-----------+---+----------+------+
|       hulk| 28|         8| 50000|
|   superman| 38|        12| 80000|
|wonderwoman| 25|         5| 48000|
+-----------+---+----------+------+



In [123]:
# Keep people whose salary is at most 50000 (condition written as a SQL expression string)

df1.filter("salary<=50000").show()

+-----------+---+----------+------+
|       name|age|experience|salary|
+-----------+---+----------+------+
|       hulk| 28|         8| 50000|
|wonderwoman| 25|         5| 48000|
+-----------+---+----------+------+



In [124]:
# Filter, then keep only the name and age columns

df1.filter("salary<=50000").select(['name','age']).show()

+-----------+---+
|       name|age|
+-----------+---+
|       hulk| 28|
|wonderwoman| 25|
+-----------+---+



In [128]:
# Combine conditions with & (and) or | (or); wrap each condition in parentheses

df1.filter((df1['salary']<=50000) | (df1['age'] < 27)).show()

+-----------+---+----------+------+
|       name|age|experience|salary|
+-----------+---+----------+------+
|       hulk| 28|         8| 50000|
|wonderwoman| 25|         5| 48000|
+-----------+---+----------+------+



In [131]:
# Negate a condition with ~ (not)

df1.filter(~(df1['salary']<=50000)).show()

+--------+---+----------+------+
|    name|age|experience|salary|
+--------+---+----------+------+
|superman| 38|        12| 80000|
+--------+---+----------+------+



## PART - 4

#### - Group by and aggregate functions

In [132]:
df2 = spark.read.csv("test4.csv",header=True,inferSchema=True)
df2.show()

+--------+------------+-------+
|    name| departments| salary|
+--------+------------+-------+
|    hulk|     finance|50000.0|
|   joker|          hr|30000.0|
|    hulk|       legal|38000.0|
|    loki|          hr|48000.0|
|sherlock|data science|52000.0|
|  batman|data science|60000.0|
|    loki|     product|35000.0|
+--------+------------+-------+



In [134]:
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- departments: string (nullable = true)
 |-- salary: double (nullable = true)



In [136]:
df2.describe().show()

+-------+--------+------------+------------------+
|summary|    name| departments|            salary|
+-------+--------+------------+------------------+
|  count|       7|           7|                 7|
|   mean|    NULL|        NULL| 44714.28571428572|
| stddev|    NULL|        NULL|10656.989658033293|
|    min|  batman|data science|           30000.0|
|    max|sherlock|     product|           60000.0|
+-------+--------+------------+------------------+



In [146]:
# Group by name and total each person's salary

df2.groupBy('name').sum('salary').show()

+--------+-----------+
|    name|sum(salary)|
+--------+-----------+
|    loki|    83000.0|
|   joker|    30000.0|
|  batman|    60000.0|
|sherlock|    52000.0|
|    hulk|    88000.0|
+--------+-----------+



In [149]:
# Total salary per department (sum() with no arguments sums every numeric column)

df2.groupBy('departments').sum().show()

+------------+-----------+
| departments|sum(salary)|
+------------+-----------+
|     finance|    50000.0|
|       legal|    38000.0|
|data science|   112000.0|
|          hr|    78000.0|
|     product|    35000.0|
+------------+-----------+



In [150]:
# Average salary per department
df2.groupBy('departments').mean().show()

+------------+-----------+
| departments|avg(salary)|
+------------+-----------+
|     finance|    50000.0|
|       legal|    38000.0|
|data science|    56000.0|
|          hr|    39000.0|
|     product|    35000.0|
+------------+-----------+



In [151]:
# Number of employees per department

df2.groupBy('departments').count().show()

+------------+-----+
| departments|count|
+------------+-----+
|     finance|    1|
|       legal|    1|
|data science|    2|
|          hr|    2|
|     product|    1|
+------------+-----+



In [152]:
# Aggregate over the whole DataFrame without grouping: total salary

df2.agg({'salary':'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|   313000.0|
+-----------+



In [157]:
# Highest salary per person
df2.groupBy('name').max().show()

+--------+-----------+
|    name|max(salary)|
+--------+-----------+
|    loki|    48000.0|
|   joker|    30000.0|
|  batman|    60000.0|
|sherlock|    52000.0|
|    hulk|    50000.0|
+--------+-----------+

