### Json Reading

In [0]:
df_json = spark.read.format('json').option('header',True)\
    .option('multiline',False)\
        .load('/Volumes/workspace/default/bigmart_volume/drivers.json')

In [0]:
df_json.display()

### Data Reading

In [0]:
df = spark.read.format('csv').option("header", "true").csv("/Volumes/workspace/default/bigmart_volume/BigMart Sales.csv")







In [0]:
/Volumes/workspace/default/bigmart_volume

In [0]:
df.show()

In [0]:
display(df)

In [0]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)



In [0]:
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE 

                ''' 

### StructType() Schema

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import * 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

### SELECT 

In [0]:
from pyspark.sql.functions import col
#df.select(col('Item_Identifier'),col('Item_Weight'),col('Item_Fat_Content')).display()

In [0]:
df.select(col('Item_Identifier').alias('Item_ID')).display()

### Filter Condition

In [0]:
df.filter(col('Item_Fat_Content')=='Regular').display(df.limit(5))

In [0]:
df.filter((col('Item_Type')=='Soft Drinks') & (col('Item_Fat_Content')=='Regular')).display()

In [0]:
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).display()

### Column Renamed

In [0]:

df.withColumnRenamed('Item_Weight','Item_Wt').display()


### withColumn

In [0]:
from pyspark.sql.functions import lit
df.withColumn('flag',lit("new")).display()

In [0]:
from pyspark.sql.functions import col
df.withColumn('multiply',col('Item_Weight').cast('float')*col('Item_MRP').cast('float')).display()

In [0]:
from pyspark.sql.functions import regexp_replace, col
df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),"Regular","Reg"))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),"Low Fat","Lf")).display()



### TypeCasting

In [0]:
df.withColumn('Item_Weight',col('Item_Weight').cast(StringType()))

DataFrame[Item_Identifier: string, Item_Weight: string, Item_Fat_Content: string, Item_Visibility: string, Item_Type: string, Item_MRP: string, Outlet_Identifier: string, Outlet_Establishment_Year: string, Outlet_Size: string, Outlet_Location_Type: string, Outlet_Type: string, Item_Outlet_Sales: string]

In [0]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)



### Sort

In [0]:
df.sort(col('Item_Weight').desc()).display()

In [0]:
df.sort(['Item_Weight','Item_Visibility'],ascending = [0,0]).display()

In [0]:
df.sort(['Item_weight','Item_Visibility'], ascending = [0,1]).display()

### Drop_duplicates

In [0]:
df.dropDuplicates().display()

In [0]:
df.drop_duplicates(subset=['Item_Type']).display(df.limit(5))

### UNION and UNION BY NAME


In [0]:
data1 = [('1','kad'),
        ('2','sid')]
schema1 = 'id STRING, name STRING' 

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING, name STRING' 

df2 = spark.createDataFrame(data2,schema2)


In [0]:
df1.display()

id,name
1,kad
2,sid


In [0]:
df2.display()

id,name
3,rahul
4,jas


## Union

In [0]:
df1.union(df2).display()


id,name
1,kad
2,sid
3,rahul
4,jas


In [0]:
data1 = [('kad','1',),
        ('sid','2',)]
schema1 = 'name STRING, id STRING' 

df1 = spark.createDataFrame(data1,schema1)

df1.display()

name,id
kad,1
sid,2


In [0]:
df1.union(df2).display()

name,id
kad,1
sid,2
3,rahul
4,jas


### Union By Name

In [0]:
df1.unionByName(df2).display()

name,id
kad,1
sid,2
rahul,3
jas,4


### String Functions
Initcap(), Lower(), Upper()

In [0]:
df.select(upper('Item_Type').alias('Item_Upper_Type')).display()

### Date Functions
Current_date

In [0]:
df = df.withColumn('curr_date',current_date())
df.display()

### Date_Add

In [0]:
df = df.withColumn('week_after',date_add('curr_date',7))
df.display()

### Date_Sub

In [0]:
df.withColumn('week_before',date_sub('curr_date',7)).display()

In [0]:
df = df.withColumn('week_before',date_add('curr_date',-7)) 
df.display()

### DateDiff

In [0]:
df.withColumn('datediff',datediff('week_after','curr_date')).display()

### Date Format

In [0]:
df.withColumn('week_before',date_format('week_before','dd-MM-yyyy')).display()

### Handling Nulls

In [0]:
df.dropna('all').display()

In [0]:
df.dropna('any').display()


In [0]:
df.dropna(subset=['Outlet_Size']).display().display()


### Filling Null

In [0]:
df.fillna('NotAvailable').display()

In [0]:
df.fillna('NotAvailable',subset=['Outlet_Size']).display()

### Split and Indexing

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')).display()

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')[1]).display()

### Explode

In [0]:
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
df_exp.display()

In [0]:
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()

### Group By

In [0]:
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

In [0]:
df.groupBy('Item_Type').agg(avg('Item_MRP')).display()

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP'),avg('Item_MRP')).display()