<a href="https://colab.research.google.com/github/AyurSarawgi/PySpark_Notes/blob/main/PySpark_Practice_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
#@title Default title text
!pip install pyspark



In [6]:
import pyspark

In [7]:
#For Creating SparkSession
from pyspark.sql import SparkSession

#For using pyspark dataTypes
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import DateType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql import types

#For getting partition Id
from pyspark.sql.functions import spark_partition_id 

In [8]:
spark=SparkSession.builder.appName('Practice-1').getOrCreate()

# ***1.READING A FILE***

In [9]:
df_pyspark=spark.read.csv('test.csv')
df_pyspark.show()

+---------+--------------------+----------+----------+--------------------+-----+
|      _c0|                 _c1|       _c2|       _c3|                 _c4|  _c5|
+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
|        1|               Chais|         1|         1|  10 boxes x 20 bags|   18|
|        2|               Chang|         1|         1|  24 - 12 oz bottles|   19|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles|   10|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars|   22|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars|   25|
|     null|                null|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars|   40|
|     null|     

### 1.1 Reading Files With Header

In [10]:
df_pyspark=spark.read.option('header','true').csv('test.csv')
df_pyspark.show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags|   18|
|        2|               Chang|         1|         1|  24 - 12 oz bottles|   19|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles|   10|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars|   22|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars|   25|
|     null|                null|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars|   40|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.|   97|
|       10|     

In [11]:
df_pyspark=spark.read.csv('test.csv',header=True)
df_pyspark.show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags|   18|
|        2|               Chang|         1|         1|  24 - 12 oz bottles|   19|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles|   10|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars|   22|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars|   25|
|     null|                null|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars|   40|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.|   97|
|       10|     

### 1.2 Reading N Rows From Top Only (in form of list)

In [12]:
df_pyspark.head(4)

[Row(ProductID='1', ProductName='Chais', SupplierID='1', CategoryID='1', Unit='10 boxes x 20 bags', Price='18'),
 Row(ProductID='2', ProductName='Chang', SupplierID='1', CategoryID='1', Unit='24 - 12 oz bottles', Price='19'),
 Row(ProductID='3', ProductName='Aniseed Syrup', SupplierID='1', CategoryID='2', Unit='12 - 550 ml bottles', Price='10'),
 Row(ProductID='4', ProductName="Chef Anton's Cajun Seasoning", SupplierID='2', CategoryID='2', Unit='48 - 6 oz jars', Price='22')]

### 1.3 Information Regarding Columns

In [13]:
df_pyspark.printSchema()

root
 |-- ProductID: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- SupplierID: string (nullable = true)
 |-- CategoryID: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Price: string (nullable = true)



### 1.4 Setting Column DataType According To Data Present

In [14]:
df_pyspark=spark.read.csv('test.csv',header=True,inferSchema=True)
df_pyspark.show()

df_pyspark.printSchema()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null|                null|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

# ***2. ALL ABOUT COLUMNS***

### 2.1 All Column Names

In [15]:
df_pyspark.columns

['ProductID', 'ProductName', 'SupplierID', 'CategoryID', 'Unit', 'Price']

### 2.2 Selecting Columns

In [16]:
df_pyspark.select('ProductID').show()

+---------+
|ProductID|
+---------+
|        1|
|        2|
|        3|
|        4|
|        5|
|        6|
|     null|
|        8|
|     null|
|       10|
|       11|
|       12|
|       13|
|       14|
|       15|
|       16|
|       17|
|       18|
|       19|
|       20|
+---------+
only showing top 20 rows



In [17]:
df_pyspark.select('ProductID','ProductName').show()

+---------+--------------------+
|ProductID|         ProductName|
+---------+--------------------+
|        1|               Chais|
|        2|               Chang|
|        3|       Aniseed Syrup|
|        4|Chef Anton's Caju...|
|        5|Chef Anton's Gumb...|
|        6|Grandma's Boysenb...|
|     null|                null|
|        8|Northwoods Cranbe...|
|     null|     Mishi Kobe Niku|
|       10|               Ikura|
|       11|      Queso Cabrales|
|       12|Queso Manchego La...|
|       13|               Konbu|
|       14|                Tofu|
|       15|        Genen Shouyu|
|       16|             Pavlova|
|       17|        Alice Mutton|
|       18|    Carnarvon Tigers|
|       19|Teatime Chocolate...|
|       20|Sir Rodney's Marm...|
+---------+--------------------+
only showing top 20 rows



In [18]:
df_pyspark.select('ProductName','ProductID').show()

+--------------------+---------+
|         ProductName|ProductID|
+--------------------+---------+
|               Chais|        1|
|               Chang|        2|
|       Aniseed Syrup|        3|
|Chef Anton's Caju...|        4|
|Chef Anton's Gumb...|        5|
|Grandma's Boysenb...|        6|
|                null|     null|
|Northwoods Cranbe...|        8|
|     Mishi Kobe Niku|     null|
|               Ikura|       10|
|      Queso Cabrales|       11|
|Queso Manchego La...|       12|
|               Konbu|       13|
|                Tofu|       14|
|        Genen Shouyu|       15|
|             Pavlova|       16|
|        Alice Mutton|       17|
|    Carnarvon Tigers|       18|
|Teatime Chocolate...|       19|
|Sir Rodney's Marm...|       20|
+--------------------+---------+
only showing top 20 rows



In [19]:
df_pyspark.select(['ProductName','ProductID']).show()

+--------------------+---------+
|         ProductName|ProductID|
+--------------------+---------+
|               Chais|        1|
|               Chang|        2|
|       Aniseed Syrup|        3|
|Chef Anton's Caju...|        4|
|Chef Anton's Gumb...|        5|
|Grandma's Boysenb...|        6|
|                null|     null|
|Northwoods Cranbe...|        8|
|     Mishi Kobe Niku|     null|
|               Ikura|       10|
|      Queso Cabrales|       11|
|Queso Manchego La...|       12|
|               Konbu|       13|
|                Tofu|       14|
|        Genen Shouyu|       15|
|             Pavlova|       16|
|        Alice Mutton|       17|
|    Carnarvon Tigers|       18|
|Teatime Chocolate...|       19|
|Sir Rodney's Marm...|       20|
+--------------------+---------+
only showing top 20 rows



### 2.3 DataTypes Of Columns

In [20]:
df_pyspark.dtypes

[('ProductID', 'int'),
 ('ProductName', 'string'),
 ('SupplierID', 'int'),
 ('CategoryID', 'int'),
 ('Unit', 'string'),
 ('Price', 'double')]

### 2.4 Adding A Column In DataFrame

In [21]:
df_pyspark=df_pyspark.withColumn('New_PriceList', df_pyspark['Price']+2)
df_pyspark.show()

+---------+--------------------+----------+----------+--------------------+-----+-------------+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|New_PriceList|
+---------+--------------------+----------+----------+--------------------+-----+-------------+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|         20.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|         21.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|         12.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|         24.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|        23.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|         27.0|
|     null|                null|      null|      null|                null| null|         null|
|        8|Northwoods Cranbe...|        

### 2.5 Deleting A Column From DataFrame

In [22]:
df_pyspark=df_pyspark.drop('New_PriceList')
df_pyspark.show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null|                null|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

### 2.6 Renaming The Column Name

In [23]:
df_pyspark.withColumnRenamed('ProductName', 'P_Name').columns

['ProductID', 'P_Name', 'SupplierID', 'CategoryID', 'Unit', 'Price']

# ***3. OTHERS***

### 3.1 Describing DataFrame

In [24]:
df_pyspark.describe()

DataFrame[summary: string, ProductID: string, ProductName: string, SupplierID: string, CategoryID: string, Unit: string, Price: string]

In [25]:
df_pyspark.describe().show()

+-------+-----------------+-------------+-----------------+-----------------+-----------------+------------------+
|summary|        ProductID|  ProductName|       SupplierID|       CategoryID|             Unit|             Price|
+-------+-----------------+-------------+-----------------+-----------------+-----------------+------------------+
|  count|               75|           76|               75|               75|               75|                75|
|   mean|39.82666666666667|         null|            13.92|             4.08|             null|29.156133333333333|
| stddev|22.07565207223593|         null|8.156840935929427|2.403600902250059|             null|  34.1642773721377|
|    min|                1| Alice Mutton|                1|                1|        1 kg pkg.|               2.5|
|    max|               77|Zaanse koeken|               29|                8|750 cc per bottle|             263.5|
+-------+-----------------+-------------+-----------------+-----------------+---

# ***4. HANDLING WITH NULL & MISSING VALUES***

In [26]:
df_pyspark.show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null|                null|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

### 4.1 Drpooing Rows Containing Null Value

In [27]:
df_pyspark.na.drop().show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|       14|                Tofu|         6|         7|    40 - 100 g pkgs.|23.25|
|       15|        Genen Shouyu|         6|         2| 24 - 250 ml bottles| 15.5|
|       16|     



1.   .drop('any') //delete all rows containing any null value
2.   .drop('all') //delete all rows which contains all the values as null
3.   .drop('any',thresh=2) //delete all rows containing atleast two non-null values
4.   .drop('any',subset=['ProductId']) //delete all rows in which ProductId is null





In [28]:
# .drop(how=='any') //delete all rows containing any null value

df_pyspark.na.drop('any').show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|       14|                Tofu|         6|         7|    40 - 100 g pkgs.|23.25|
|       15|        Genen Shouyu|         6|         2| 24 - 250 ml bottles| 15.5|
|       16|     

In [29]:
# .drop(how=='all') //delete all rows which contains all the values as null

df_pyspark.na.drop('all').show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|               Ikura|      null|         8|    12 - 200 ml jars| 31.0|
|       11|     

In [30]:
# drop('any',thresh=2) //delete all rows containing atleast two non-null values

df_pyspark.na.drop('any',thresh=2).show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|               Ikura|      null|         8|    12 - 200 ml jars| 31.0|
|       11|     

In [31]:
# .drop('any',subset=['ProductId']) //delete all rows in which ProductId is null

df_pyspark.na.drop('any',subset=['ProductId']).show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|       10|               Ikura|      null|         8|    12 - 200 ml jars| 31.0|
|       11|      Queso Cabrales|         5|      null|           1 kg pkg.| 21.0|
|       12|Queso

### 4.2 Filling A Value to All Missing / Null Values in String Columns only

In [32]:
# Fill the String '****Value Missing****' in the String Columns

df_pyspark.na.fill('****VALUE_MISSING****').show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null|****VALUE_MISSING...|      null|      null|****VALUE_MISSING...| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

### 4.2 Filling A Value to All Missing / Null Values In A Particular String Columns only  //Integer columns will not show any error

In [33]:
df_pyspark.na.fill('$$$VALUE_MISSING$$$','ProductName').show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null| $$$VALUE_MISSING$$$|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

In [34]:
df_pyspark.na.fill('$$$VALUE_MISSING$$$',['ProductName','Unit']).show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null| $$$VALUE_MISSING$$$|      null|      null| $$$VALUE_MISSING$$$| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

In [35]:
df_pyspark.na.fill('$$$VALUE_MISSING$$$',['ProductName','Price']).show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|     null| $$$VALUE_MISSING$$$|      null|      null|                null| null|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

### 4.3 Filling The Integer Column NULL Values with Mean / Meadian / Mode Of Presented Values In Them By Using Imputer Function

In [36]:
from pyspark.ml.feature import Imputer

In [37]:
# Filling Mean Values at NULL

imputer=Imputer(
    inputCols=['ProductID','SupplierID','CategoryID','Price'],
    outputCols=['ProductID','SupplierID','CategoryID','Price']
).setStrategy('mean')

imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+--------------------+----------+----------+--------------------+------------------+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|             Price|
+---------+--------------------+----------+----------+--------------------+------------------+
|        1|               Chais|         1|         1|  10 boxes x 20 bags|              18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles|              19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles|              10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars|              22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|             21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars|              25.0|
|       39|                null|        13|         4|                null|29.156133333333333|
|        8|Northwoods Cranbe...|         3|       

In [38]:
# Filling Median Values at NULL

imputer=Imputer(
    inputCols=['ProductID','SupplierID','CategoryID','Price'],
    outputCols=['ProductID','SupplierID','CategoryID','Price']
).setStrategy('median')

imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|       40|                null|        14|         4|                null| 19.5|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|       40|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

In [39]:
# Filling Mode Values at NULL

imputer=Imputer(
    inputCols=['ProductID','SupplierID','CategoryID','Price'],
    outputCols=['ProductID','SupplierID','CategoryID','Price']
).setStrategy('mode')

imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+--------------------+----------+----------+--------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit|Price|
+---------+--------------------+----------+----------+--------------------+-----+
|        1|               Chais|         1|         1|  10 boxes x 20 bags| 18.0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles| 19.0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles| 10.0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars| 22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes|21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars| 25.0|
|        1|                null|         7|         3|                null| 14.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars| 40.0|
|        1|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.| 97.0|
|       10|     

# ***5. Filter Operations***

In [40]:
df_pyspark.filter(df_pyspark['Price']>20).show()

+---------+--------------------+----------+----------+--------------------+------+
|ProductID|         ProductName|SupplierID|CategoryID|                Unit| Price|
+---------+--------------------+----------+----------+--------------------+------+
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars|  22.0|
|        5|Chef Anton's Gumb...|         2|         2|            36 boxes| 21.35|
|        6|Grandma's Boysenb...|         3|         2|      12 - 8 oz jars|  25.0|
|        8|Northwoods Cranbe...|         3|         2|     12 - 12 oz jars|  40.0|
|     null|     Mishi Kobe Niku|         4|         6|    18 - 500 g pkgs.|  97.0|
|       10|               Ikura|      null|         8|    12 - 200 ml jars|  31.0|
|       11|      Queso Cabrales|         5|      null|           1 kg pkg.|  21.0|
|       12|Queso Manchego La...|         5|         4|                null|  38.0|
|       14|                Tofu|         6|         7|    40 - 100 g pkgs.| 23.25|
|   

### 5.1 Selecting Particular Columns With Filter Operation

In [41]:
df_pyspark.filter('Price>20').select(['ProductID','ProductName','Unit']).show(5)

+---------+--------------------+----------------+
|ProductID|         ProductName|            Unit|
+---------+--------------------+----------------+
|        4|Chef Anton's Caju...|  48 - 6 oz jars|
|        5|Chef Anton's Gumb...|        36 boxes|
|        6|Grandma's Boysenb...|  12 - 8 oz jars|
|        8|Northwoods Cranbe...| 12 - 12 oz jars|
|     null|     Mishi Kobe Niku|18 - 500 g pkgs.|
+---------+--------------------+----------------+
only showing top 5 rows



### 5.2 Filtering With & (AND), | (OR) , ~ (NOT)

In [42]:
# & (And Operator)

df_pyspark.filter((df_pyspark['Price']>20) & (df_pyspark['ProductID']> 50)).select('ProductID','Price').show()

+---------+-----+
|ProductID|Price|
+---------+-----+
|       51| 53.0|
|       53| 32.8|
|       55| 24.0|
|       56| 38.0|
|       59| 55.0|
|       60| 34.0|
|       61| 28.5|
|       62| 49.3|
|       63| 43.9|
|       64|33.25|
|       65|21.05|
|       69| 36.0|
|       71| 21.5|
|       72| 34.8|
+---------+-----+



In [43]:
# | (Or Operator)

df_pyspark.filter((df_pyspark['Price']==20) | (df_pyspark['ProductID']<5)).select('ProductID','Price').show()

+---------+-----+
|ProductID|Price|
+---------+-----+
|        1| 18.0|
|        2| 19.0|
|        3| 10.0|
|        4| 22.0|
|       49| 20.0|
+---------+-----+



In [44]:
# ~ (Not Operator)

df_pyspark.filter(~ (df_pyspark['Price']>15)).select('Price').show()

+-----+
|Price|
+-----+
| 10.0|
|  9.2|
| 10.0|
|  9.0|
|  4.5|
| 14.0|
| 12.5|
|  2.5|
| 14.0|
| 9.65|
| 14.0|
|  9.5|
| 12.0|
|  9.5|
|12.75|
|  7.0|
| 7.45|
|13.25|
| 14.0|
| 12.5|
+-----+
only showing top 20 rows



# ***6. GROUP BY AND AGGREGATE FUNCTIONS***

Aggregate funtions: - sum(), count(), mean(), agg(), max(), min()

In [45]:
#Importing new data frame

df_pyspark=spark.read.csv('test1.csv',header=True,inferSchema=True)
df_pyspark.show()

+----------+--------------------+------------------+--------------------+------------+----------+-----------+
|CustomerID|        CustomerName|       ContactName|             Address|        City|PostalCode|    Country|
+----------+--------------------+------------------+--------------------+------------+----------+-----------+
|         1| Alfreds Futterkiste|      Maria Anders|       Obere Str. 57|      Berlin|     12209|    Germany|
|         2|Ana Trujillo Empa...|      Ana Trujillo|Avda. de la Const...| M�xico D.F.|      5021|     Mexico|
|         3|Antonio Moreno Ta...|    Antonio Moreno|      Mataderos 2312| M�xico D.F.|      5023|     Mexico|
|         4|     Around the Horn|      Thomas Hardy|     120 Hanover Sq.|      London|   WA1 1DP|         UK|
|         5|  Berglunds snabbk�p|Christina Berglund|      Berguvsv�gen 8|       Lule�|  S-958 22|     Sweden|
|         6|Blauer See Delika...|        Hanna Moos|      Forsterstr. 57|    Mannheim|     68306|    Germany|
|         

### 6.1 GroupBy & Aggregate on particular columns

In [46]:
#getting country-wise customer count

df_pyspark.groupBy('Country').count().orderBy('count').show()

+-----------+-----+
|    Country|count|
+-----------+-----+
|     Poland|    1|
|    Ireland|    1|
|     Norway|    1|
|     Sweden|    2|
|    Austria|    2|
|    Finland|    2|
|    Denmark|    2|
|Switzerland|    2|
|   Portugal|    2|
|    Belgium|    2|
|     Canada|    3|
|  Argentina|    3|
|      Italy|    3|
|  Venezuela|    4|
|      Spain|    5|
|     Mexico|    5|
|         UK|    7|
|     Brazil|    9|
|    Germany|   11|
|     France|   11|
+-----------+-----+
only showing top 20 rows



In [47]:
#getting city-wise group by country... customer count 

df_pyspark.groupBy('Country','City').count().show()

+-----------+--------------+-----+
|    Country|          City|count|
+-----------+--------------+-----+
|     Mexico|   M�xico D.F.|    5|
|    Denmark|         �rhus|    1|
|    Belgium|     Charleroi|    1|
|Switzerland|          Bern|    1|
|        USA|   Albuquerque|    1|
|    Germany|Frankfurt a.M.|    1|
|         UK|        London|    6|
|    Austria|      Salzburg|    1|
|      Italy|       Bergamo|    1|
|        USA|         Boise|    1|
|        USA| San Francisco|    1|
|     Poland|         Walla|    1|
|    Germany|     Cunewalde|    1|
|    Germany|          K�ln|    1|
|      Spain|       Sevilla|    1|
|     Brazil|       Resende|    1|
|     France|    Versailles|    1|
|      Spain|        Madrid|    3|
|     Norway|       Stavern|    1|
|     Brazil|     S�o Paulo|    4|
+-----------+--------------+-----+
only showing top 20 rows



### 6.2 Another Method To Use Aggregate Functions

In [48]:
#max country name
#max customerID
#max customerID country-wise

df_pyspark.agg({'Country':'max'}).show()

df_pyspark.agg({'CustomerID':'max'}).show()

df_pyspark.groupBy('Country').agg({'CustomerID':'max'}).show()

+------------+
|max(Country)|
+------------+
|   Venezuela|
+------------+

+---------------+
|max(CustomerID)|
+---------------+
|             91|
+---------------+

+-----------+---------------+
|    Country|max(CustomerID)|
+-----------+---------------+
|     Sweden|             24|
|    Germany|             86|
|     France|             85|
|  Argentina|             64|
|    Belgium|             76|
|    Finland|             90|
|      Italy|             66|
|     Norway|             70|
|      Spain|             69|
|    Denmark|             83|
|    Ireland|             37|
|  Venezuela|             47|
|        USA|             89|
|     Mexico|             80|
|         UK|             72|
|Switzerland|             68|
|     Canada|             51|
|     Brazil|             88|
|     Poland|             91|
|   Portugal|             60|
+-----------+---------------+
only showing top 20 rows



# ***7. DATAFRAME API***

In [49]:
# Reading new dataset
df_pyspark.na.drop().show()

+----------+--------------------+------------------+--------------------+------------+----------+-----------+
|CustomerID|        CustomerName|       ContactName|             Address|        City|PostalCode|    Country|
+----------+--------------------+------------------+--------------------+------------+----------+-----------+
|         1| Alfreds Futterkiste|      Maria Anders|       Obere Str. 57|      Berlin|     12209|    Germany|
|         2|Ana Trujillo Empa...|      Ana Trujillo|Avda. de la Const...| M�xico D.F.|      5021|     Mexico|
|         3|Antonio Moreno Ta...|    Antonio Moreno|      Mataderos 2312| M�xico D.F.|      5023|     Mexico|
|         4|     Around the Horn|      Thomas Hardy|     120 Hanover Sq.|      London|   WA1 1DP|         UK|
|         5|  Berglunds snabbk�p|Christina Berglund|      Berguvsv�gen 8|       Lule�|  S-958 22|     Sweden|
|         6|Blauer See Delika...|        Hanna Moos|      Forsterstr. 57|    Mannheim|     68306|    Germany|
|         

In [50]:
df_product_table=spark.read.option('header','true').csv('product_table3.csv')
df_product_table.show()

+---------+--------------------+----------+----------+-------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|               Unit|Price|
+---------+--------------------+----------+----------+-------------------+-----+
|        1|               Chais|         1|         1| 10 boxes x 20 bags|   18|
|        2|               Chang|         1|         1| 24 - 12 oz bottles|   19|
|        3|       Aniseed Syrup|         1|         2|12 - 550 ml bottles|   10|
|        4|Chef Anton's Caju...|         2|         2|     48 - 6 oz jars|   22|
|        5|Chef Anton's Gumb...|         2|         2|           36 boxes|21.35|
|        6|                null|         3|         2|     12 - 8 oz jars|   25|
|        7|Uncle Bob's Organ...|         3|         7|    12 - 1 lb pkgs.|   30|
|        8|                null|         3|      null|    12 - 12 oz jars|   40|
|        9|     Mishi Kobe Niku|         4|         6|   18 - 500 g pkgs.|   97|
|       10|                n

In [51]:
df_product_table.na.drop('any').show()

+---------+--------------------+----------+----------+-------------------+-----+
|ProductID|         ProductName|SupplierID|CategoryID|               Unit|Price|
+---------+--------------------+----------+----------+-------------------+-----+
|        1|               Chais|         1|         1| 10 boxes x 20 bags|   18|
|        2|               Chang|         1|         1| 24 - 12 oz bottles|   19|
|        3|       Aniseed Syrup|         1|         2|12 - 550 ml bottles|   10|
|        4|Chef Anton's Caju...|         2|         2|     48 - 6 oz jars|   22|
|        5|Chef Anton's Gumb...|         2|         2|           36 boxes|21.35|
|        6|                null|         3|         2|     12 - 8 oz jars|   25|
|        7|Uncle Bob's Organ...|         3|         7|    12 - 1 lb pkgs.|   30|
|        9|     Mishi Kobe Niku|         4|         6|   18 - 500 g pkgs.|   97|
+---------+--------------------+----------+----------+-------------------+-----+



# ***8.WRITING TO DIFFERENT FILE FORMAT***

In [53]:
df_order= spark.read.csv('/content/csv/order.csv',header=True,inferSchema= True)

#Official syntax
'''
df.write \
  .format('json') \
  .mode('overwrite') \
  .option('path','/content/json/') \
  .save()

MODE- 1.Overwrite = Cleans up the target directory, 
      2.Append = Append the data files, 
      3.Ignore = Writes the data if directory is empty else nothing, 
      4.errorIfExists = Throw error if having some data at the location
'''
#Shortcut
df_order.write.json('/content/json/')
df_order.write.parquet('/content/parquet/')
'''
#Official
df_order.write \
  .format('json') \
  .mode('overwrite') \
  .option('path','/content/json/') \
  .save()

df_order.write \
  .format('parquet') \
  .mode('overwrite') \
  .option('path','/content/parquet/') \
  .save()
  '''

"\n#Official\ndf_order.write   .format('json')   .mode('overwrite')   .option('path','/content/json/')   .save()\n\ndf_order.write   .format('parquet')   .mode('overwrite')   .option('path','/content/parquet/')   .save()\n  "

# ***9. CREATING DATAFRAME SCHEMAS FOR DIFFERENT FORMAT FILES***

In [54]:
df_OrderCsv=df_order
df_OrderCsv.printSchema()
df_OrderJson=spark.read.json('/content/json/order.json')  #headers are not there and auto recognize schema in Json
df_OrderJson.printSchema()
df_OrderParq=spark.read.parquet('/content/parquet/order.parquet',header=True) #Already contains info about dataTypes
df_OrderParq.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipperID: integer (nullable = true)

root
 |-- CustomerID: long (nullable = true)
 |-- EmployeeID: long (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- OrderID: long (nullable = true)
 |-- ShipperID: long (nullable = true)

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipperID: integer (nullable = true)



**Using inferSchema, other schemas are ok but Date is still a string**

##To resolve this, two types of schema applying methods are there

## 9.1. Programatically Define Schema
Spark dataframe schema is a structType which is made up of a list of structField


In [55]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import DateType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql import types
# Defining schema for all the columns programatically
internalSchema= StructType([
                      StructField('OrderID',IntegerType()),
                      StructField('CustomerID',IntegerType()),
                      StructField('EmployeeID',IntegerType()),
                      StructField('OrderDate',DateType()),
                      StructField('ShipperID',IntegerType())

])

In [56]:
df_OrderCsv= spark.read.schema(internalSchema).csv('/content/csv/order.csv',header=True)
df_OrderCsv.printSchema()
df_OrderJson=spark.read.schema(internalSchema).json('/content/json/order.json')
df_OrderJson.printSchema()
df_OrderParq=spark.read.schema(internalSchema).parquet('/content/parquet/order.parquet',header=True)
df_OrderParq.printSchema()

#If getting error for date type format, use --> .option('dateFormat','M/D/Y')

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipperID: integer (nullable = true)

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipperID: integer (nullable = true)

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipperID: integer (nullable = true)



## 9.2. Using DDL (Explicitly Define Schema)

In [57]:
ddlSchema= "OrderID INT,CustomerID INT,EmployeeID INT,OrderDate DATE,ShipperID INT"
df_OrderCsv= spark.read.schema(ddlSchema).csv('/content/csv/order.csv',header=True)
df_OrderCsv.printSchema()
df_OrderJson=spark.read.schema(ddlSchema).json('/content/json/order.json')
df_OrderJson.printSchema()
df_OrderParq=spark.read.schema(ddlSchema).parquet('/content/parquet/order.parquet',header=True)
df_OrderParq.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipperID: integer (nullable = true)

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipperID: integer (nullable = true)

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipperID: integer (nullable = true)



# ***OTHER***

# ==> Getting no of partitions dataFrame has

In [58]:
num= df_pyspark.rdd.getNumPartitions()
print("Number of partitions before" +str(num))

num= df_OrderParq.rdd.getNumPartitions()
print(num)

from pyspark.sql.functions import spark_partition_id

print("Partitions count of records before")
df_pyspark.groupBy(spark_partition_id()).count().show()


Number of partitions before1
1
Partitions count of records before
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|   91|
+--------------------+-----+



# ==> Changing the Num of Partitions of a DF (Random Partition)


In [60]:
df_pyspark= df_pyspark.repartition(5)
numb= df_pyspark.rdd.getNumPartitions()
print("Number of partitions after " +str(numb))

from pyspark.sql.functions import spark_partition_id 

print("Partitions count of records")
df_pyspark.groupBy(spark_partition_id()).count().show()

#If we will wirte to a directory, then we will get 

Number of partitions after 5
Partitions count of records
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|   18|
|                   1|   19|
|                   2|   18|
|                   3|   18|
|                   4|   18|
+--------------------+-----+



# ==> Partitioned by on columns



### PartitionBy single column

In [61]:
df_pyspark.write.format('csv')\
          .mode('overwrite')\
          .option('path','/content/single_Partition/')\
          .partitionBy('Country')\
          .save()

### PartitionBy two columns (i.e sub-partition)

In [62]:
df_pyspark.write.format('csv')\
          .mode('overwrite')\
          .option('path','/content/sub_Partition/')\
          .partitionBy('Country','City')\
          .save()

# ==> Limiting Max Num of Records per file while writing

In [63]:
df_pyspark.write.format('csv')\
          .mode('overwrite')\
          .option('path','/content/maxRecordsPerFile_50/')\
          .option('maxRecordsPerFile',50)\
          .save()