In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('pyspark_tutorial').getOrCreate()

25/06/20 00:04:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
spark

# Reading csv file

In [30]:
df_path = 'gs://dataproc-staging-us-central1-1055421222203-cwj2i6jy/notebooks/jupyter/Data/BigMart Sales.csv'

In [113]:
df = spark.read.format('csv').option('inferSchema', True).option('Header',True).load('gs://dataproc-staging-us-central1-1055421222203-cwj2i6jy/notebooks/jupyter/Data/BigMart Sales.csv')

In [9]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [10]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



# Reading json File

In [12]:
json_path = 'gs://dataproc-staging-us-central1-1055421222203-cwj2i6jy/notebooks/jupyter/Data/drivers.json'

In [15]:
df_json = spark.read.format('json').option('inferSchema', True)\
                                    .option('Header',True)\
                                    .option('multiLine', False)\ # the json file is single line json so we are keeping MultiLine as False
                                    .load(json_path)

In [16]:
df_json.show()

+----+----------+--------+----------+--------------------+-----------+------+--------------------+
|code|       dob|driverId| driverRef|                name|nationality|number|                 url|
+----+----------+--------+----------+--------------------+-----------+------+--------------------+
| HAM|1985-01-07|       1|  hamilton|   {Lewis, Hamilton}|    British|    44|http://en.wikiped...|
| HEI|1977-05-10|       2|  heidfeld|    {Nick, Heidfeld}|     German|    \N|http://en.wikiped...|
| ROS|1985-06-27|       3|   rosberg|     {Nico, Rosberg}|     German|     6|http://en.wikiped...|
| ALO|1981-07-29|       4|    alonso|  {Fernando, Alonso}|    Spanish|    14|http://en.wikiped...|
| KOV|1981-10-19|       5|kovalainen|{Heikki, Kovalainen}|    Finnish|    \N|http://en.wikiped...|
| NAK|1985-01-11|       6|  nakajima|  {Kazuki, Nakajima}|   Japanese|    \N|http://en.wikiped...|
| BOU|1979-02-28|       7|  bourdais|{Sébastien, Bourd...|     French|    \N|http://en.wikiped...|
| RAI|1979

In [18]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



# Reading csv with user defined Schema

#### Usig SQL Style Schema

In [34]:
my_schema = '''Item_Identifier STRING,
Item_Weight STRING,
Item_Fat_Content STRING,
Item_Visibility DOUBLE,
Item_Type STRING,
Item_MRP DOUBLE,
Outlet_Identifier STRING,
Outlet_Establishment_Year INT,
Outlet_Size STRING,
Outlet_Location_Type STRING,
Outlet_Type STRING,
Item_Outlet_Sales DOUBLE '''


#### Using StructType Style Schema

In [28]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

my_schema = StructType([
    StructField("Item_Identifier", StringType(), True),
    StructField("Item_Weight", DoubleType(), True),
    StructField("Item_Fat_Content", StringType(), True),
    StructField("Item_Visibility", DoubleType(), True),
    StructField("Item_Type", StringType(), True),
    StructField("Item_MRP", DoubleType(), True),
    StructField("Outlet_Identifier", StringType(), True),
    StructField("Outlet_Establishment_Year", IntegerType(), True),
    StructField("Outlet_Size", StringType(), True),
    StructField("Outlet_Location_Type", StringType(), True),
    StructField("Outlet_Type", StringType(), True),
    StructField("Item_Outlet_Sales", DoubleType(), True)
])

In [35]:
df= spark.read.format('csv')\
                            .schema(my_schema)\
                            .option('Header', True)\
                            .load(df_path)

In [36]:
df.printSchema() # Item_Weight we changed from double to String

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



# Selecting columns from a df

In [37]:
df.select('Item_Identifier','Item_Weight','Item_Fat_Content').show()



+---------------+-----------+----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|
+---------------+-----------+----------------+
|          FDA15|        9.3|         Low Fat|
|          DRC01|       5.92|         Regular|
|          FDN15|       17.5|         Low Fat|
|          FDX07|       19.2|         Regular|
|          NCD19|       8.93|         Low Fat|
|          FDP36|     10.395|         Regular|
|          FDO10|      13.65|         Regular|
|          FDP10|       null|         Low Fat|
|          FDH17|       16.2|         Regular|
|          FDU28|       19.2|         Regular|
|          FDY07|       11.8|         Low Fat|
|          FDA03|       18.5|         Regular|
|          FDX32|       15.1|         Regular|
|          FDS46|       17.6|         Regular|
|          FDF32|      16.35|         Low Fat|
|          FDP49|          9|         Regular|
|          NCB42|       11.8|         Low Fat|
|          FDP49|          9|         Regular|
|          DR

In [41]:
# using col to select - this is standardizedapproach

from pyspark.sql.functions import *

df.select(col('Item_Identifier'),col('Item_Weight'),col('Item_Fat_Content')).show()



+---------------+-----------+----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|
+---------------+-----------+----------------+
|          FDA15|        9.3|         Low Fat|
|          DRC01|       5.92|         Regular|
|          FDN15|       17.5|         Low Fat|
|          FDX07|       19.2|         Regular|
|          NCD19|       8.93|         Low Fat|
|          FDP36|     10.395|         Regular|
|          FDO10|      13.65|         Regular|
|          FDP10|       null|         Low Fat|
|          FDH17|       16.2|         Regular|
|          FDU28|       19.2|         Regular|
|          FDY07|       11.8|         Low Fat|
|          FDA03|       18.5|         Regular|
|          FDX32|       15.1|         Regular|
|          FDS46|       17.6|         Regular|
|          FDF32|      16.35|         Low Fat|
|          FDP49|          9|         Regular|
|          NCB42|       11.8|         Low Fat|
|          FDP49|          9|         Regular|
|          DR

# Aliasing

In [45]:
df.select(col('Item_Identifier').alias('Item_Id')).show()

+-------+
|Item_Id|
+-------+
|  FDA15|
|  DRC01|
|  FDN15|
|  FDX07|
|  NCD19|
|  FDP36|
|  FDO10|
|  FDP10|
|  FDH17|
|  FDU28|
|  FDY07|
|  FDA03|
|  FDX32|
|  FDS46|
|  FDF32|
|  FDP49|
|  NCB42|
|  FDP49|
|  DRI11|
|  FDU02|
+-------+
only showing top 20 rows



# Filter


### Case 1


In [46]:
#select records where Item_Fat_Content is regular

df.filter(col('Item_Fat_Content')=='Regular').show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228|
|          FDX07|       19.2|         Regular|            0.0|Fruits and Vegeta...| 182.095|           OUT010|                     1998|       null|              Tier 3|    Gro

### Case 2

In [48]:
# Filter based on two columns using & operator

df.filter( (col('Item_Type') == 'Soft Drinks') & (col('Item_Weight')<10)).show() 

+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|  Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          DRC01|       5.92|         Regular|    0.019278216|Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228|
|          DRZ11|       8.85|         Regular|    0.113123893|Soft Drinks|122.5388|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|        1609.9044|
|          DRF4

### Case 3


In [52]:
df.filter( (col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).show() 

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDH17|       16.2|         Regular|    0.016687114|        Frozen Foods| 96.9726|           OUT045|                     2002|       null|              Tier 2|Supermarket Type1|        1076.5986|
|          FDU28|       19.2|         Regular|     0.09444959|        Frozen Foods|187.8214|           OUT017|                     2007|       null|              Tier 2|Superma

# Renaming Column


### WithColumnRenamed

In [54]:
df.withColumnRenamed('Item_Weight','Item_WT').show()

+---------------+-------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_WT|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|    9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|   5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         

# Creating a new column or modify column 

### withColumn

### case 1 - creating a new column and apply some transformation

In [62]:
df = df.withColumn('flag', lit('new'))
# creating a new column flag and it will have a string value new
# lit() is literals which is used to give constant value

In [63]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138| new|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|        

In [65]:
df.withColumn('multiply_wt_mrp', col('Item_Weight')*col('Item_MRP')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+------------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|   multiply_wt_mrp|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+------------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138| new|2323.2255600000003|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drin

### case 2 - Modifying existing column and performing some transformations

In [68]:
# Modifying Item fat content column where value with Regular will be replaced with Reg
df.withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'),'Regular','Reg'))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'LF')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|              LF|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138| new|
|          DRC01|       5.92|             Reg|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|        

# Type Casting

In [70]:
# Changing dtype of Item_weight col to string
df = df.withColumn('Item_Weight', col("Item_Weight").cast(StringType()))

In [72]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)
 |-- flag: string (nullable = false)



# Sorting Or Ordering Data

### case 1

In [74]:
# sorting Item weight col in descending order
df.sort(col('Item_Weight').desc()).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDR13|      9.895|         Regular|    0.028837829|      Canned|117.8492|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|        1506.0396| new|
|          DRD49|      9.895|              LF|    0.167831064| Soft Drinks|237.4564|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|         

### case 2

In [76]:
df.sort(col('Item_Visibility').asc()).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          DRB48|      16.75|         Regular|            0.0|         Soft Drinks| 39.3822|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|         353.5398| new|
|          FDS32|      17.75|         Regular|            0.0|Fruits and Vegeta...|139.9838|           OUT045|                     2002|       null|        

### case 3 -  Sorting based on multiple columns

In [77]:
# sorting based on multiple columns
# Both Columns are sorted in descending order
df.sort(['Item_Weight','Item_Visibility'], ascending = [0,0]).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          DRD49|      9.895|         Low Fat|    0.168780385|         Soft Drinks|236.8564|           OUT017|                     2007|       null|              Tier 2|Supermarket Type1|         4767.128| new|
|          DRD49|      9.895|         Low Fat|     0.16817143|         Soft Drinks|237.7564|           OUT045|                     2002|       null|        

In [78]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)
 |-- flag: string (nullable = false)



### case 4 - Sorting based on multiple columns on different order

In [79]:
# Item_Weight is sorted in descending order , Item_Visibility sorted in ascending order

df.sort(['Item_Weight','Item_Visibility'], ascending=[0,1]).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDR13|      9.895|         Regular|    0.028696932|              Canned|117.0492|           OUT013|                     1987|       High|              Tier 3|Supermarket Type1|         810.9444| new|
|          FDR13|      9.895|         Regular|    0.028765486|              Canned|115.3492|           OUT049|                     1999|     Medium|        

# Limit

In [80]:
# Used when we dont want all the records

df.limit(10).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138| new|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|        

In [82]:
df.sort(['Item_Weight','Item_Visibility'], ascending=[0,1]).limit(10).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDR13|      9.895|         Regular|    0.028696932|      Canned|117.0492|           OUT013|                     1987|       High|              Tier 3|Supermarket Type1|         810.9444| new|
|          FDR13|      9.895|         Regular|    0.028765486|      Canned|115.3492|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|        1

# Drop

### case1 - drop a column

In [83]:
df.drop(col('Item_Visibility')).show()

+---------------+-----------+----------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+-----------+----------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|         Low Fat|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138| new|
|          DRC01|       5.92|         Regular|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228| new|
|          FDN15|       1

### Case 2 - Drop Multiple columns

In [89]:
df.drop('Item_Visibility', 'Item_Weight').show()

+---------------+----------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Fat_Content|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|flag|
+---------------+----------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|         Low Fat|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138| new|
|          DRC01|         Regular|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228| new|
|          FDN15|         Low Fat|                Meat| 141.618|           OUT049|   

# Drop Duplicates

### case 1 - drop duplicate records from a dataframe

In [87]:
df.count()

8523

In [88]:
df.dropDuplicates().count()

                                                                                

8523

### case 2 - drop duplicates based on subset of columns

In [90]:
# droping duplicates based on Item type column

df.dropDuplicates(subset=['Item_Type']).count()

16

# Distinct

In [91]:
df.distinct().count()

8523

### distinct based on multiple columns

In [93]:
df.select('Item_Type', 'Item_Visibility').distinct().count()

8006

# Union and Union Byname

In [95]:
data1 = [(1,'Kaido'), 
         (2,'VegaPunk')]

schema = 'id String, Name String'



In [96]:
df1 = spark.createDataFrame(data1,schema)

In [97]:
df1.show()


[Stage 66:>                                                         (0 + 1) / 1]

+---+--------+
| id|    Name|
+---+--------+
|  1|   Kaido|
|  2|VegaPunk|
+---+--------+



                                                                                

In [101]:
data2 = [(3,'Abi'),(4,'frieren')]

schema2 = 'id string, name string'

df2 = spark.createDataFrame(data2,schema2)

In [102]:
df2.show()

+---+-------+
| id|   name|
+---+-------+
|  3|    Abi|
|  4|frieren|
+---+-------+



### applying union on df1 and df2

In [103]:
df1.union(df2).show()

+---+--------+
| id|    Name|
+---+--------+
|  1|   Kaido|
|  2|VegaPunk|
|  3|     Abi|
|  4| frieren|
+---+--------+



In [104]:
# reversing order in data1

data1 = [('Kaido',1), 
         ('VegaPunk',2)]

schema = 'Name String,id String'

df1 = spark.createDataFrame(data1,schema)

df1.show()


+--------+---+
|    Name| id|
+--------+---+
|   Kaido|  1|
|VegaPunk|  2|
+--------+---+



In [107]:
df1.union(df2).show()

# this is not in correct order so it will mess the data

+--------+-------+
|    Name|     id|
+--------+-------+
|   Kaido|      1|
|VegaPunk|      2|
|       3|    Abi|
|       4|frieren|
+--------+-------+



# unionByName

In [106]:
# unionByName will make union based on column name

df1.unionByName(df2).show()

+--------+---+
|    Name| id|
+--------+---+
|   Kaido|  1|
|VegaPunk|  2|
|     Abi|  3|
| frieren|  4|
+--------+---+



# String Functions

### Initcap - it will convert string column to proper format like words will start with captital

In [108]:
df.select(initcap('Item_Type')).show()

+--------------------+
|  initcap(Item_Type)|
+--------------------+
|               Dairy|
|         Soft Drinks|
|                Meat|
|Fruits And Vegeta...|
|           Household|
|        Baking Goods|
|         Snack Foods|
|         Snack Foods|
|        Frozen Foods|
|        Frozen Foods|
|Fruits And Vegeta...|
|               Dairy|
|Fruits And Vegeta...|
|         Snack Foods|
|Fruits And Vegeta...|
|           Breakfast|
|  Health And Hygiene|
|           Breakfast|
|         Hard Drinks|
|               Dairy|
+--------------------+
only showing top 20 rows



### Upper - converts to capital case

In [109]:
df.select(upper('Item_Type')).show()

+--------------------+
|    upper(Item_Type)|
+--------------------+
|               DAIRY|
|         SOFT DRINKS|
|                MEAT|
|FRUITS AND VEGETA...|
|           HOUSEHOLD|
|        BAKING GOODS|
|         SNACK FOODS|
|         SNACK FOODS|
|        FROZEN FOODS|
|        FROZEN FOODS|
|FRUITS AND VEGETA...|
|               DAIRY|
|FRUITS AND VEGETA...|
|         SNACK FOODS|
|FRUITS AND VEGETA...|
|           BREAKFAST|
|  HEALTH AND HYGIENE|
|           BREAKFAST|
|         HARD DRINKS|
|               DAIRY|
+--------------------+
only showing top 20 rows



In [111]:
df.select(lower('Item_Type').alias('upper_Item_Type')).show()

+--------------------+
|     upper_Item_Type|
+--------------------+
|               dairy|
|         soft drinks|
|                meat|
|fruits and vegeta...|
|           household|
|        baking goods|
|         snack foods|
|         snack foods|
|        frozen foods|
|        frozen foods|
|fruits and vegeta...|
|               dairy|
|fruits and vegeta...|
|         snack foods|
|fruits and vegeta...|
|           breakfast|
|  health and hygiene|
|           breakfast|
|         hard drinks|
|               dairy|
+--------------------+
only showing top 20 rows



# Date Functions

### current date - to get current date

In [114]:
df = df.withColumn('curr_date',current_date())

In [115]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-22|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2

### Date_Add() - adding dates to the column

In [116]:
df = df.withColumn('week_after',date_add('curr_date',2))

In [117]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-22|2025-06-24|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2

### Date_sub  - subtracts days from date column

In [118]:
df.withColumn('week_before', date_sub('curr_date',7)).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|week_before|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+-----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-23|2025-06-25| 2025-06-16|
|          DRC01|       5.92|         Re

In [120]:
# using date add we can do the same thing as datesub

df.withColumn('week_before', date_add('curr_date',-7)).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|week_before|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+-----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-23|2025-06-25| 2025-06-16|
|          DRC01|       5.92|         Re

### Datediff - To get the difference between two date columns

In [122]:
df = df.withColumn('dateDifference', datediff('curr_date','week_after'))

In [123]:
df.limit(5).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-23|2025-06-25|            -2|
|          DRC01|       5.92

### Date_Format - Changing the format of Date column

In [125]:
df = df.withColumn('week_after', date_format('week_after', 'dd-mm-yyyy')) 

In [128]:
df.limit(5).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-23|25-00-2025|            -2|
|          DRC01|       5.92

# Handling Nulls

### case1 - dropping nulls

#### dropna()

In [129]:
df.dropna('all').count()
# if entire row is null it will drop the row

8523

In [130]:
df.dropna('any').count()
# if any of the column record is null it will drop the whole row

4650

In [132]:
df.dropna(subset=['Outlet_Size']).count()
# if Outlet_Size have any null the entire row is dropped 

6113

### case 2 - Replacing null values

#### fillna()

In [133]:
df.fillna('NotAvailable').show()

# This will replace all nulls in the dataframe with default value

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+------------+--------------------+-----------------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year| Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+------------+--------------------+-----------------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|      Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-23|25-00-2025|            -2|
|          DRC01|       

In [134]:
df.fillna('NotAvailable', subset=['Outlet_Size']).show()

# using subset it will replace nulls in spescific column with a value

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+------------+--------------------+-----------------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year| Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+------------+--------------------+-----------------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|      Medium|              Tier 1|Supermarket Type1|         3735.138|2025-06-23|25-00-2025|            -2|
|          DRC01|       

# Split and Indexing

### split()

In [135]:
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')).show()

# this will split the value with a delimiter ' ' and will display within a list

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|         Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|[Supermarket, Type1]|         3735.138|2025-06-23|25-00-2025|            -2|
|          DRC01

### Indexing along with split()

In [136]:
df.withColumn('Outlet_Type',split('Outlet_type', ' ')[1]).limit(5).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|      Type1|         3735.138|2025-06-23|25-00-2025|            -2|
|          DRC01|       5.92|         Regular|    0.

# Explode function

In [137]:
df = df.withColumn('Outlet_Type', split('Outlet_Type',' '))

In [138]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|         Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|[Supermarket, Type1]|         3735.138|2025-06-23|25-00-2025|            -2|
|          DRC01

In [139]:
# Outlet_Type is in list we want that in seperrate columns

df.withColumn('Outlet_Type', explode('Outlet_Type')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket|         3735.138|2025-06-23|25-00-2025|            -2|
|          FDA15|        9.3|         Low Fat|    0.

# Array_Contains

### Array_Contains checks if a value is present or not in am array column

In [140]:
df.withColumn('type_1_flag', array_contains('Outlet_Type', 'Type1')).show()
# this only works when we have a column with list value

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|         Outlet_Type|Item_Outlet_Sales| curr_date|week_after|dateDifference|type_1_flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------------+-----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|[Supermarket, Type1]|         3735.138|2025-06-23|25-00-2

# GroupBy and Aggregation

### sum()

In [143]:
df.groupby('Item_Type').agg(sum('Item_MRP')).alias('sum_mrp').show()

+--------------------+------------------+
|           Item_Type|     sum(Item_MRP)|
+--------------------+------------------+
|       Starchy Foods|21880.027399999995|
|        Baking Goods| 81894.73640000001|
|              Breads| 35379.11979999999|
|Fruits and Vegeta...|178124.08099999998|
|                Meat|59449.863799999956|
|         Hard Drinks|29334.676599999995|
|         Soft Drinks|58514.164999999964|
|           Household|135976.52539999998|
|           Breakfast|        15596.6966|
|               Dairy|101276.45959999996|
|         Snack Foods|175433.92040000003|
|              Others|22451.891600000006|
|             Seafood| 9077.870000000003|
|              Canned|  90706.7269999999|
|        Frozen Foods|118558.88140000001|
|  Health and Hygiene|        68025.8388|
+--------------------+------------------+



### avg()

In [144]:
df.groupBy('Item_Type').agg(avg('Item_MRP').alias('Avg')).show()

+--------------------+------------------+
|           Item_Type|               Avg|
+--------------------+------------------+
|       Starchy Foods|147.83802297297294|
|        Baking Goods|126.38076604938273|
|              Breads| 140.9526685258964|
|Fruits and Vegeta...|144.58123457792206|
|                Meat|139.88203247058814|
|         Hard Drinks|137.07792803738315|
|         Soft Drinks|131.49250561797746|
|           Household|149.42475318681318|
|           Breakfast|141.78815090909092|
|               Dairy|148.49920762463336|
|         Snack Foods|146.19493366666669|
|              Others|132.85142958579885|
|             Seafood|141.84171875000004|
|              Canned|139.76383204930647|
|        Frozen Foods|138.50336612149533|
|  Health and Hygiene|130.81892076923077|
+--------------------+------------------+



### groupby based on multiple column

In [145]:
df.groupby('Item_Type', 'Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).show()

+--------------------+-----------+------------------+
|           Item_Type|Outlet_Size|         Total_MRP|
+--------------------+-----------+------------------+
|       Starchy Foods|     Medium| 7124.136199999997|
|Fruits and Vegeta...|     Medium|59047.217200000014|
|       Starchy Foods|       null|         6040.6402|
|              Breads|       null|        10011.5004|
|        Baking Goods|       null|23433.838799999994|
|Fruits and Vegeta...|       null|49758.730999999985|
|        Frozen Foods|       High|12588.291000000001|
|         Soft Drinks|       High| 6456.165199999999|
|           Breakfast|      Small|3917.0407999999998|
|                Meat|     Medium| 20326.45059999999|
|Fruits and Vegeta...|       High| 20671.34759999999|
|                Meat|       High| 5627.036400000002|
|        Baking Goods|       High| 9431.749199999998|
|           Household|     Medium| 42688.57439999998|
|                Meat|       null|16158.166000000005|
|         Hard Drinks|      

### Group by multiple columns and multiple aggregations

In [148]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total'),\
                                          avg('Item_MRP').alias('Avg')).show()

+--------------------+-----------+------------------+------------------+
|           Item_Type|Outlet_Size|             Total|               Avg|
+--------------------+-----------+------------------+------------------+
|       Starchy Foods|     Medium| 7124.136199999997| 148.4195041666666|
|Fruits and Vegeta...|     Medium|59047.217200000014| 142.9714702179177|
|       Starchy Foods|       null|         6040.6402|140.48000465116277|
|              Breads|       null|        10011.5004|139.04861666666667|
|        Baking Goods|       null|23433.838799999994|126.66939891891889|
|Fruits and Vegeta...|       null|49758.730999999985|142.57516045845267|
|        Frozen Foods|       High|12588.291000000001|         136.82925|
|         Soft Drinks|       High| 6456.165199999999|131.75847346938772|
|           Breakfast|      Small|3917.0407999999998|130.56802666666667|
|                Meat|     Medium| 20326.45059999999|136.41913154362408|
|Fruits and Vegeta...|       High| 20671.3475999999