#### Data Reading (CSV)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%fs ls

In [0]:
%fs ls '/Volumes/pyspark/source/ff_source'

In [0]:
# Load the BigMart sales CSV: the first row supplies the column names and
# Spark is asked to infer each column's type from the data.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/pyspark/source/ff_source/BigMart Sales.csv")
)

In [0]:
# Keep only the rows whose Item_Fat_Content is exactly "Low Fat".
df1 = df.filter(
    col("Item_Fat_Content") == "Low Fat"
)

In [0]:
# Render the filtered rows (method form, as used by the other cells in this notebook).
df1.display()

#### Data Reading (JSON)

In [0]:
# Read the drivers JSON file. "header" is a CSV-only option and has no meaning
# for the JSON source, so it is not set here. multiLine=False (the default)
# makes Spark treat every line of the file as one JSON record.
df2 = (
    spark.read.format("json")
    .option("multiLine", False)
    .load("/Volumes/pyspark/source/ff_source/drivers.json")
)
display(df2)

#### Schema Definition

In [0]:
# Print the column names and types of df1 (types were inferred when the CSV was read).
df1.printSchema()

#### DDL Schema

In [0]:
# Schema for the BigMart sales CSV written as a DDL string: a comma-separated
# list of "<column name> <type>" pairs, passed to the reader via .schema().
# NOTE(review): Item_Weight is declared as string here, while my_struct_schema
# declares it DoubleType - confirm the difference is intentional.
my_ddl_schema = '''

            Item_Identifier string,
            Item_Weight string,
            Item_Fat_Content string,
            Item_Visibility double,
            Item_Type string,
            Item_MRP double,
            Outlet_Identifier string,
            Outlet_Establishment_Year integer,
            Outlet_Size string,
            Outlet_Location_Type string,
            Outlet_Type string,
            Item_Outlet_Sales double'''

In [0]:
# Re-read the CSV, this time applying the DDL schema instead of inferring types.
df3 = (
    spark.read.format("csv")
    .schema(my_ddl_schema)
    .option("header", "true")
    .load("/Volumes/pyspark/source/ff_source/BigMart Sales.csv")
)

In [0]:
# Show the rows read with the DDL schema.
display(df3)

In [0]:
# Confirm that the column types now match the DDL schema.
df3.printSchema()

#### StructType() Schema

In [0]:
# Explicit schema for the BigMart sales CSV, built from StructField objects
# (name, data type, nullable).
# Item_Fat_Content holds text labels - this notebook filters it on "Low Fat"
# and "Regular" - so it is declared StringType. Declaring it DoubleType makes
# the values unparseable; under the CSV reader's default PERMISSIVE mode they
# would be read back as null.
my_struct_schema = StructType([
    StructField("Item_Identifier", StringType(), True),
    StructField("Item_Weight", DoubleType(), True),
    StructField("Item_Fat_Content", StringType(), True),
    StructField("Item_Visibility", DoubleType(), True),
    StructField("Item_Type", StringType(), True),
    StructField("Item_MRP", DoubleType(), True),
    StructField("Outlet_Identifier", StringType(), True),
    StructField("Outlet_Establishment_Year", IntegerType(), True),
    StructField("Outlet_Size", StringType(), True),
    StructField("Outlet_Location_Type", StringType(), True),
    StructField("Outlet_Type", StringType(), True),
    StructField("Item_Outlet_Sales", DoubleType(), True),
])

In [0]:
# Re-read the CSV with the StructType schema and show the result.
df3 = (
    spark.read.format("csv")
    .schema(my_struct_schema)
    .option("header", "true")
    .load("/Volumes/pyspark/source/ff_source/BigMart Sales.csv")
)
display(df3)

In [0]:
# Confirm that the column types now match the StructType schema.
df3.printSchema()

#### SELECT Statement

In [0]:
# Project three columns using col() expressions; select() also accepts the
# bare column names as plain strings.
df.select(
    col("Item_Fat_Content"),
    col("Item_MRP"),
    col("Item_Weight"),
).display()

#### ALIAS function

In [0]:
# Select two columns and give each a shorter output name with alias().
df.select(
    col("Item_Fat_Content").alias("fatcontent"),
    col("Item_Identifier").alias("identifier"),
).display()

#### FILTER Transformation

In [0]:
# Show the unfiltered data as a baseline for the filter scenarios that follow.
display(df)

##### Scenario - 1

Applying normal filter condition

In [0]:
# Single equality condition: rows whose Item_Fat_Content is "Regular".
df.filter(
    col("Item_Fat_Content") == "Regular"
).display()

##### Scenario - 2

Using multiple conditions to filter the data with the and ( & ) operator

In [0]:
# Two conditions combined with &: each comparison needs its own parentheses
# because & binds more tightly than == and <.
df.filter(
    (col('Item_Type') == 'Soft Drinks')
    & (col('Item_Weight') < 10.0)
).display()

##### Scenario - 3

Using isNull() and isin() with the filter function

In [0]:
# Rows with a missing Outlet_Size whose location type is Tier 1 or Tier 2.
df.filter(
    col("Outlet_Size").isNull()
    & col("Outlet_Location_Type").isin('Tier 1', 'Tier 2')
).display()

#### withColumnRenamed

Used to rename a column

In [0]:
# Rename Item_Weight to Item_Wt in the displayed result; df itself is not reassigned.
df.withColumnRenamed(
    'Item_Weight',
    'Item_Wt',
).display()

#### withColumn

Used to create a new column or to modify an existing one

In [0]:
# Add a constant column: lit() wraps the Python value 'new' as a column expression.
df.withColumn(
    'flag',
    lit('new'),
).display()

In [0]:
# Add a derived column: the product of Item_Weight and Item_MRP for each row.
df.withColumn(
    'total_price',
    col('Item_Weight') * col('Item_MRP'),
).display()

#### Scenario - 2 (withColumn)

In [0]:
# Overwrite Item_Fat_Content in two passes of regexp_replace:
# 'Regular' becomes 'Reg', then 'Low Fat' becomes 'Lf'.
(
    df
    .withColumn('Item_Fat_Content', regexp_replace('Item_Fat_Content', 'Regular', 'Reg'))
    .withColumn('Item_Fat_Content', regexp_replace('Item_Fat_Content', 'Low Fat', 'Lf'))
    .display()
)

In [0]:
# Relabel Item_Fat_Content with a conditional expression: 'Low Fat' -> 'lf',
# 'Regular' -> 'Reg', and any other value is kept unchanged.
df.withColumn(
    'Item_Fat_Content',
    when(col("Item_Fat_Content") == 'Low Fat', 'lf')
    .when(col("Item_Fat_Content") == 'Regular', 'Reg')
    .otherwise(col('Item_Fat_Content')),
).display()

In [0]:
# Replace Item_Type with its upper-cased value.
df.withColumn(
    'Item_Type',
    upper(col("Item_Type")),
).display()

#### Type Casting

Used to change the data type of a column

In [0]:
# Cast Item_Weight to string on a separate DataFrame instead of rebinding `df`.
# Overwriting `df` here would make every later cell see Item_Weight as a
# string, so the sorts on Item_Weight further down would order the values
# lexicographically rather than numerically.
df_weight_as_string = df.withColumn('Item_Weight', col("Item_Weight").cast(StringType()))
df_weight_as_string.display()

#### Sort/orderBy

Used to sort the data by one or more columns

In [0]:
# Sort by Item_Visibility; sort() orders ascending when no direction is given.
df.sort(
    "Item_Visibility"
).display()

In [0]:
# Sort by Item_Weight in descending order.
df.sort(
    col("Item_Weight").desc()
).display()

In [0]:
# Sort on two columns, both descending. The `ascending` list is matched to the
# columns by position; False is the boolean spelling of the flag 0.
df.sort(
    ['Item_Weight', 'Item_Visibility'],
    ascending=[False, False],
).display()

In [0]:
# Sort on two columns: Item_Weight descending, then Item_Visibility ascending.
# The `ascending` list is matched to the columns by position.
df.sort(
    ['Item_Weight', 'Item_Visibility'],
    ascending=[False, True],
).display()