DATA READING

Reading JSON file

In [0]:
# Read the drivers JSON file into a DataFrame.
# No explicit schema is supplied, so the JSON source infers one from the data.
# 'inferSchema' and 'header' are CSV reader options that the JSON source does
# not define, so they are not set here.
# multiLine=False: each line of the file is parsed as one JSON record.
df_json = spark.read.format('json')\
                    .option('multiLine',False)\
                    .load('/Volumes/workspace/default/daya/drivers.json')
df_json.display()

Reading CSV file

In [0]:
# Load the BigMart sales CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('/Volumes/workspace/default/daya/BigMart Sales.csv')
)
df.display()

Schema Definition

In [0]:
# Print the column names and types of the DataFrame read with inferSchema.
df.printSchema()

DDL SCHEMA

In [0]:
# Schema for the BigMart sales CSV written as a DDL string:
# comma-separated "<column name> <type>" pairs, passed to .schema() when reading.
# NOTE(review): Item_Weight is declared STRING while the other measures are
# DOUBLE/INT; a later cell casts Item_Weight to double - confirm this is intentional.
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE 

                ''' 

In [0]:
# Re-read the CSV, this time applying the DDL schema string instead of
# inferring column types; the header row is still used for column names.
csv_reader = spark.read.format('csv').schema(my_ddl_schema).option('header', True)
df = csv_reader.load('/Volumes/workspace/default/daya/BigMart Sales.csv')
df.display()

In [0]:
# Print the schema of the DataFrame read with the DDL schema string.
df.printSchema()

StructType() Schema

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *  
     
# StructType schema for the BigMart sales CSV: every column is a nullable
# string, listed in the order the fields are applied to the file.
my_strct_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'Item_Identifier',
        'Item_Weight',
        'Item_Fat_Content',
        'Item_Visibility',
        'Item_Type',
        'Item_MRP',
        'Outlet_Identifier',
        'Outlet_Establishment_Year',
        'Outlet_Size',
        'Outlet_Location_Type',
        'Outlet_Type',
        'Item_Outlet_Sales',
    )
])

# Read the CSV once more using the StructType schema, so every column
# arrives as a string; the header row supplies nothing but is skipped as data.
df = (
    spark.read.format('csv')
    .schema(my_strct_schema)
    .option('header', True)
    .load('/Volumes/workspace/default/daya/BigMart Sales.csv')
)

In [0]:
# Print the schema of the DataFrame read with the StructType schema.
df.printSchema()

TRANSFORMATIONS


SELECT

In [0]:
# Show the current DataFrame before applying any transformations.
df.display()

In [0]:
# Project only the identifier, weight and fat-content columns.
df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content').display()

ALIAS

In [0]:
# Select Item_Identifier and expose it under the name Item_ID in the result.
df.select(col('Item_Identifier').alias('Item_ID')).display()


FILTER

Scenario - 1

In [0]:
# Keep only the rows whose fat content is exactly 'Regular'.
df.where(col('Item_Fat_Content') == 'Regular').display()


Scenario - 2

In [0]:
from pyspark.sql.functions import col

# Soft drinks with a weight below 10. Item_Weight is cast to double so the
# comparison is numeric; both conditions must hold for a row to be kept.
(
    df
    .where(col('Item_Type') == 'Soft Drinks')
    .where(col('Item_Weight').cast('double') < 10)
    .display()
)

Scenario - 3

In [0]:
# Rows with a missing Outlet_Size whose location type is 'Tier 1' or 'Tier 2'.
(
    df
    .where(col('Outlet_Size').isNull())
    .where(col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
    .display()
)
     

withColumnRenamed

In [0]:
# Show df with Item_Weight renamed to Item_Wt; df itself is not reassigned.
df.withColumnRenamed('Item_Weight','Item_Wt').display()

withColumn

Scenario - 1

In [0]:
# Add a 'flag' column holding the literal string "new" on every row.
# df is reassigned, so every later cell sees the extra column.
df = df.withColumn('flag',lit("new")) 
df.display()

In [0]:
# Show df with a derived 'multiply' column: weight times MRP, both cast to
# double first so the product is numeric. df itself is not reassigned.
(
    df
    .withColumn(
        'multiply',
        col('Item_Weight').cast('double') * col('Item_MRP').cast('double'),
    )
    .display()
)


Scenario - 2

In [0]:

# Shorten the fat-content labels in place: "Regular" becomes "Reg", then
# "Low Fat" becomes "Lf". df is reassigned with the rewritten column.
fat_content_reg = regexp_replace(col('Item_Fat_Content'), "Regular", "Reg")
df = df.withColumn('Item_Fat_Content', fat_content_reg)

fat_content_lf = regexp_replace(col('Item_Fat_Content'), "Low Fat", "Lf")
df = df.withColumn('Item_Fat_Content', fat_content_lf)

df.display()

Type Casting

In [0]:
# Replace Item_Weight with its value cast to double.
# df is reassigned, so later cells see the new column type.
df = df.withColumn('Item_Weight', col('Item_Weight').cast('double')) 

In [0]:
# Print the schema again to check the Item_Weight cast.
df.printSchema()

SORT

Scenario - 1

In [0]:
# Order rows by Item_Weight, largest first.
df.orderBy(col('Item_Weight').desc()).display()

Scenario - 2


In [0]:
# Order rows by Item_Visibility, smallest first.
# The StructType schema declares Item_Visibility as a string, so the sort key
# is cast to double; without the cast the rows are ordered lexicographically.
df.sort(col('Item_Visibility').cast('double').asc()).display()

Scenario - 3

In [0]:
# Order by Item_Weight descending, breaking ties by Item_Visibility descending.
# The StructType schema declares Item_Visibility as a string, so it is cast to
# double in the sort key to get numeric rather than lexicographic ordering.
df.sort(
    col('Item_Weight').desc(),
    col('Item_Visibility').cast('double').desc(),
).display()


Scenario - 4

In [0]:
# Order by Item_Weight descending, breaking ties by Item_Visibility ascending.
# The column is spelled 'Item_Weight' exactly as in the schema, so the lookup
# does not depend on Spark's case-insensitive column resolution.
# The StructType schema declares Item_Visibility as a string, so it is cast to
# double in the sort key to get numeric rather than lexicographic ordering.
df.sort(
    col('Item_Weight').desc(),
    col('Item_Visibility').cast('double').asc(),
).display()

Limit

In [0]:
# Show at most 10 rows of df.
df.limit(10).display()


DROP

Scenario - 1

In [0]:
# Show df without the Item_Visibility column; df itself is not reassigned.
df.drop('Item_Visibility').display()

Scenario - 2

In [0]:
# Show df without the Item_Visibility and Item_Type columns; df itself is not reassigned.
df.drop('Item_Visibility','Item_Type').display()