### Data Reading - JSON

In [0]:
# Read the drivers JSON file into a DataFrame.
# 'header' and 'inferSchema' are CSV reader options; the JSON source has no
# header row and infers the schema on its own, so they are dropped here.
# multiLine=False: every line of the file is parsed as one JSON record.
df_json = (
    spark.read.format('json')
    .option('multiLine', 'False')
    .load('/Volumes/cert_prep_catalog/default/rawdata/drivers.json')
)


In [0]:
# Render the DataFrame loaded from drivers.json.
df_json.display()

### Data Reading

In [0]:
# List the files in the raw-data volume. The bare attribute reference
# `dbutils.fs.ls` never invoked the function, so nothing was listed.
dbutils.fs.ls('/Volumes/cert_prep_catalog/default/rawdata/')

In [0]:
# Read the BigMart sales CSV with a header row and inferred column types.
# Load the CSV file itself rather than the whole rawdata/ directory: that
# directory also holds drivers.json, which the CSV reader would otherwise
# try to parse as additional rows.
df = (
    spark.read.format('csv')
    .option('inferSchema', 'True')
    .option('header', 'True')
    .load('/Volumes/cert_prep_catalog/default/rawdata/BigMart Sales.csv')
)

In [0]:
# Print the leading rows of df as a plain-text table.
df.show()

In [0]:
# Render df with the notebook's display() output.
df.display()

### Schema Definition

In [0]:
# Print the column names and data types of df.
df.printSchema()

### DDL Schema

In [0]:
# Schema for the BigMart sales CSV written as a DDL string: one
# "<column name> <Spark SQL type>" entry per column, comma separated.
# NOTE(review): Item_Weight is declared STRING even though later cells cast it
# to double before comparing/multiplying — confirm this is intentional.
my_ddl_schema = """
Item_Identifier STRING,
Item_Weight STRING,
Item_Fat_Content STRING,
Item_Visibility DOUBLE,
Item_Type STRING,
Item_MRP DOUBLE,
Outlet_Identifier STRING,
Outlet_Establishment_Year INT,
Outlet_Size STRING,
Outlet_Location_Type STRING,
Outlet_Type STRING,
Item_Outlet_Sales DOUBLE
"""


In [0]:
# Re-read the BigMart CSV, applying the DDL schema string instead of
# letting Spark infer the column types.
df = (
    spark.read
    .format('csv')
    .schema(my_ddl_schema)
    .option('header', 'True')
    .load('/Volumes/cert_prep_catalog/default/rawdata/BigMart Sales.csv')
)


In [0]:
# Print the leading rows read with the DDL schema.
df.show()


In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *  

In [0]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    IntegerType
)
from pyspark.sql.functions import col 

In [0]:
# StructType version of the BigMart schema. Column types mirror the DDL
# schema string (my_ddl_schema) so that both schema definitions produce the
# same DataFrame; previously every field was StringType, which left the
# imported DoubleType/IntegerType unused and made numeric columns strings.
# Every field is nullable (third argument True).
my_stct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', DoubleType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', IntegerType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', DoubleType(), True),
])

In [0]:
# Re-read the BigMart CSV, this time applying the StructType schema.
df = (
    spark.read
    .format('csv')
    .schema(my_stct_schema)
    .option('header', 'True')
    .load('/Volumes/cert_prep_catalog/default/rawdata/BigMart Sales.csv')
)

In [0]:
# Print the schema that resulted from applying my_stct_schema.
df.printSchema()

### SELECT

In [0]:
# Select three columns by name. Keep the DataFrame in df_sel and display it
# separately: display() returns None, so chaining it into the assignment
# left df_sel bound to None instead of the selected DataFrame.
df_sel = df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')
df_sel.display()

In [0]:
# Same three-column selection, expressed with Column objects built by col().
df.select(
    *[
        col(column_name)
        for column_name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')
    ]
).display()

### ALIAS

In [0]:
# Select Item_Identifier and expose it under the name 'ItemId'.
df.select(col('Item_Identifier').alias('ItemId')).display()

In [0]:
# Display df itself; the select/alias above was not assigned back to df.
df.display()

### FILTER

## Scenario - 1: Fetch the records where Item_Fat_Content = 'Regular'

In [0]:
# Keep only the rows whose Item_Fat_Content equals 'Regular'.
df.filter(col('Item_Fat_Content')=='Regular').display()

## Scenario - 2: Filter rows where Item_Type is 'Soft Drinks' and Item_Weight is less than 10

In [0]:
# Soft Drinks rows whose Item_Weight, cast to double, is below 10.
# Each comparison is parenthesised because & binds tighter than == and <.
df.filter(
    (col('Item_Type') == 'Soft Drinks')
    & (col('Item_Weight').cast('double') < 10)
).display()

## Scenario - 3: Filter rows where Outlet_Location_Type is 'Tier 1' or 'Tier 2' and Outlet_Size is null

In [0]:
# Rows located in Tier 1 or Tier 2 whose Outlet_Size is null.
df.filter(
    (col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
    & (col('Outlet_Size').isNull())
).display()

### Withcolumnrenamed

In [0]:
# Show df with Item_Weight renamed to Item_wt (result is not assigned back).
df.withColumnRenamed('Item_Weight','Item_wt').display()

### Withcolumn - New column creation/ modifying columns

### Scenario - 1: new column: A flag column

In [0]:
# Add a constant column 'flag' holding the literal "new" on every row.
df=df.withColumn('flag',lit("new"))

In [0]:
# Display df, which now carries the 'flag' column.
df.display()

In [0]:
# Show df with an extra column 'multiply' = Item_MRP * Item_Weight.
# Both operands are cast to double before multiplying.
df.withColumn(
    'multiply',
    col('Item_MRP').cast('double') * col('Item_Weight').cast('double')
).display()

### Scenario  -2 : modify the existing columns

In [0]:
# Abbreviate Item_Fat_Content values: 'Regular' -> 'Reg', 'Low Fat' -> 'LF'.
# The column is referenced by its exact name 'Item_Fat_Content'; the previous
# lower-case 'item_Fat_Content' still matched the column (name resolution is
# case-insensitive by default) but withColumn re-labels the replaced column
# with the name it is given, changing the casing of the output column.
df.withColumn('Item_Fat_Content', regexp_replace('Item_Fat_Content', 'Regular', 'Reg'))\
  .withColumn('Item_Fat_Content', regexp_replace('Item_Fat_Content', 'Low Fat', 'LF')).display()

### Type Casting

In [0]:

# Cast Item_Weight from string to double. The column is read as a string by
# the schema above, so casting it to StringType changed nothing and left the
# sorts that follow comparing weights as text rather than as numbers.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(DoubleType()))

In [0]:
# Print the schema to check the column types after the cast.
df.printSchema()

### Sorting -

In [0]:
# Sort by Item_Weight in descending order.
df.sort(col('Item_Weight').desc()).display()

### Scenario - 2: Sort on Item_Visibility in ascending order

In [0]:
# Sort by Item_Visibility in ascending order.
df.sort(col("Item_Visibility").asc()).display()

### Scenario - 3: Sort on multiple columns (Item_Weight and Item_Visibility)

In [0]:
# Sort on two columns, both in descending order; the ascending flags line
# up positionally with the column list.
df.sort(
    ['Item_weight', 'Item_Visibility'],
    ascending=[False, False],
).display()

In [0]:
# Sort by Item_Weight descending, then Item_Visibility ascending.
df.sort(
    ['Item_Weight', 'Item_Visibility'],
    ascending=[False, True],
).display()

### Limit

In [0]:
# Display at most 10 rows of df.
df.limit(10).display()

## Drop - remove columns from a DataFrame

In [0]:
# Show df without the visibility column (result is not assigned back).
df.drop('item_visibility').display()

### Scenario - 2: Drop multiple columns

In [0]:
# Drop several columns in one call by passing each name as an argument.
df.drop('Item_Visibility','Item_Type').display()

### Drop duplicates

In [0]:
# Remove rows that are duplicated across all columns.
df.dropDuplicates().display()

### scenario -2: duplicates removal on subset of columns

In [0]:
# De-duplicate considering only Item_Type, leaving one row per Item_Type value.
df.dropDuplicates(subset=["Item_Type"]).display()

In [0]:
# Display the distinct rows of df.
df.distinct().display()

### Union and unionByName

In [0]:
# Two small DataFrames with the same (id, name) string schema, used as
# inputs for the union example.

# First DataFrame: ids 1 and 2.
schema1 = 'id STRING, name STRING'
data1 = [
    ('1', 'kad'),
    ('2', 'sid'),
]
df1 = spark.createDataFrame(data1, schema1)

# Second DataFrame: ids 3 and 4.
schema2 = 'id STRING, name STRING'
data2 = [
    ('3', 'rahul'),
    ('4', 'jas'),
]
df2 = spark.createDataFrame(data2, schema2)

### union

In [0]:
# Append the rows of df2 to df1 and display the combined result.
df1.union(df2).display()


### String Functions

### InitCap

In [0]:
# Apply initcap to the item type column and display the result.
df.select(initcap('item_type')).display()

In [0]:
# Lower-case the item type column and display the result.
df.select(lower('item_type')).display()

In [0]:
# Same lower-casing, with the output column named 'lower_item_type'.
df.select(lower('item_type').alias('lower_item_type')).display()

### Date Functions

### Currentdate

In [0]:

# Add a 'curr_date' column populated by current_date() and keep it on df.
df = df.withColumn('curr_date',current_date())

df.display()

date_add

In [0]:
# Add 'week_after' = curr_date plus 7 days and keep it on df.
df=df.withColumn('week_after',date_add('curr_date',7))
df.display()

### date_sub

In [0]:
# Show df with 'sub' = curr_date minus 7 days (result is not assigned back).
df.withColumn('sub',date_sub('curr_date',7)).display()

In [0]:
# Same subtraction written as date_add with a negative offset (-7 days);
# this time the 'sub' column is kept on df.
df=df.withColumn('sub',date_add('curr_date',-7))
df.display()

### datediff - interval between two dates

In [0]:
# Add 'datediff' = date_diff between week_after and curr_date.
df=df.withColumn('datediff',date_diff('week_after','curr_date'))
df.display()

### Dateformat

### Handling Nulls

### Dropping Nulls

In [0]:
# Drop only the rows in which every column is null.
df.dropna(how='all').display()

In [0]:
# Drop every row that has a null in at least one column.
df.dropna(how='any').display()

In [0]:
# Drop rows whose Outlet_Size is null; other columns are not checked.
df.dropna(subset=['Outlet_Size']).display()

### Filling nulls

In [0]:
# Replace null values with the string 'NotAvailable' (no column subset given).
df.fillna('NotAvailable').display()

In [0]:
# Replace nulls with 'Not Available' in the Outlet_Size column only.
df.fillna('Not Available',subset=['Outlet_Size']).display()

### SPLIT

In [0]:

# Replace Outlet_Type with the array obtained by splitting it on spaces.
df.withColumn('Outlet_Type',split('Outlet_Type',' ')).display()

### INDEXING

In [0]:

# Split Outlet_Type on spaces and keep only the element at index 1.
df.withColumn('Outlet_Type',split('Outlet_Type',' ')[1]).display()

### Explode

In [0]:
# Keep the split version in df_exp so the following cells can work on the
# array-typed Outlet_Type column.
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))

df_exp.display()

In [0]:
# Explode the Outlet_Type array into one row per element.
# The output column is named 'Outlet_Type' exactly: the previous 'Outlet_type'
# still replaced the column (name resolution is case-insensitive by default)
# but withColumn re-labels the replaced column with the name it is given.
df_exp.withColumn('Outlet_Type', explode('Outlet_Type')).display()

### ArrayContains

In [0]:
# Add a boolean column telling whether the Outlet_Type array contains 'Type1'.
df_exp.withColumn('type_1Flag',array_contains('Outlet_Type','Type1')).display()

### Group BY

### Scenario -1: We have item type, i want to find sum of mrp for each type

In [0]:
# Sum of Item_MRP per Item_Type, rounded to 2 decimals.
# round and sum here are the pyspark.sql.functions versions brought in by the
# star import earlier in the notebook, not the Python builtins.
df.groupBy("Item_Type").agg(round(sum("Item_MRP"),2)).display()

In [0]:
# Average Item_MRP per Item_Type, rounded to 2 decimals.
df.groupBy("Item_Type").agg(round(avg("Item_MRP"),2)).display()

### scenario - 3:

In [0]:
# Group on two columns (Item_Type, Outlet_Size) and sum Item_MRP as Total_MRP.
df.groupBy("Item_Type",'Outlet_Size').agg(sum("Item_MRP").alias('Total_MRP')).display()

### Scenario -4:

In [0]:
# Several aggregates in a single agg() call: total and average Item_MRP for
# each (Item_Type, Outlet_Size) group.
df.groupBy('Item_Type', 'Outlet_Size').agg(
    sum('Item_MRP').alias('Total_MRP'),
    avg('Item_MRP').alias('avg'),
).display()

### Pivot

In [0]:
# Pivot: one row per Item_Type, one column per Outlet_Size value, each cell
# holding the average Item_MRP.
df.groupBy("Item_Type").pivot('Outlet_Size').agg(avg('Item_MRP')).display()

### When - Otherwise (similar to SQL CASE WHEN)