###   Data Reading

In [None]:
# List the contents of the CSV folder on DBFS.
# NOTE(review): presumably this folder only exists once the "Data Writing"
# cells further down have been run at least once — confirm.
dbutils.fs.ls('/FileStore/tables/CSV')

In [None]:
# Load the BigMart sales CSV: the first row is treated as the header and the
# column types are inferred from the data.
df = (
    spark.read.format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load("/FileStore/tables/BigMart_Sales.csv")
)

In [None]:
df.display()

In [None]:
df.show()

### Reading JSON

In [None]:
dbutils.fs.ls("/FileStore/tables/")

In [None]:
# Load the drivers JSON file. multiLine=False means each line of the file is
# parsed as its own JSON record.
# The `inferSchema` and `header` options only apply to the CSV source; the JSON
# source ignores them and infers the schema itself when none is supplied, so
# they are not passed here.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load("/FileStore/tables/drivers.json")
)

## Schema Definition

In [None]:
df.printSchema()

## Converting Item_Weight datatype from double to string

In [None]:
# DDL-formatted schema for the BigMart sales CSV. Item_Weight is declared as
# STRING on purpose (the file stores it as a decimal number) to demonstrate
# overriding the inferred type. Column names must match the CSV header,
# otherwise the resulting DataFrame carries the misspelled name.
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING,
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INTEGER,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING,
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE
                '''
                


In [None]:
# Re-read the CSV, this time applying the explicit DDL schema instead of
# inferring the column types.
df = (
    spark.read.format('csv')
    .option("header", True)
    .schema(my_ddl_schema)
    .load("/FileStore/tables/BigMart_Sales.csv")
)

In [None]:
df.display()

In [None]:
df.printSchema()

### StructType Schema

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# StructType equivalent of the DDL schema: same column names and same types.
# Numeric columns are typed as numbers so that later sorts, comparisons,
# aggregations and Python UDFs operate on numeric values rather than on text.
# Item_Weight stays a string, mirroring the DDL schema.
my_struct_schema = StructType(
    [
        StructField('Item_Identifier', StringType(), True),
        StructField('Item_Weight', StringType(), True),
        StructField('Item_Fat_Content', StringType(), True),
        StructField('Item_Visibility', DoubleType(), True),
        StructField('Item_Type', StringType(), True),
        StructField('Item_MRP', DoubleType(), True),
        StructField('Outlet_Identifier', StringType(), True),
        StructField('Outlet_Establishment_Year', IntegerType(), True),
        StructField('Outlet_Size', StringType(), True),
        StructField('Outlet_Location_Type', StringType(), True),
        StructField('Outlet_Type', StringType(), True),
        StructField('Item_Outlet_Sales', DoubleType(), True),
    ]
)

In [None]:
# Re-read the CSV using the StructType schema defined above.
df = (
    spark.read.format('csv')
    .option("header", True)
    .schema(my_struct_schema)
    .load("/FileStore/tables/BigMart_Sales.csv")
)

In [None]:
df.printSchema()

## SELECT

In [None]:
df.columns

In [None]:
# Project three columns, referenced by name.
df.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']).display()

In [None]:
df.select(col('Item_Identifier'),col('Item_Weight'),col('Item_Fat_Content')).display()

## Alias

In [None]:
df.select(col('Item_Identifier').alias('Item_ID')).display()

### FILTER

## Scenarios
1) Filter the data with fat content = Regular
2) Slice the data with item type = Soft Drinks and weight<10
3) Fetch the data with Tier in (Tier1 or Tier2) and Outlet Size is Null

In [None]:
df.display()

In [None]:
df.filter(col("Item_Fat_Content")=="Regular").display()

In [None]:
# Scenario 2: soft drinks weighing less than 10 (where() is an alias of filter()).
df.where(
    (col("Item_Type") == "Soft Drinks") & (col("Item_Weight") < 10)
).display()

In [None]:
# Scenario 3: rows located in Tier 1 or Tier 2 whose Outlet_Size is missing.
# Two chained filters are combined with AND.
(
    df.filter(col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
    .filter(col('Outlet_Size').isNull())
    .display()
)

### withColumnRenamed
Changing the name of the column Item_Weight to Item_Wt

In [None]:
df.withColumnRenamed('Item_Weight','Item_Wt').display()

### withColumn
## Scenarios
1) New Column
2) Modifying existing column

In [None]:
df = df.withColumn('flag',lit("new"))

In [None]:
df.display()

In [None]:
df.withColumn('Unit_Cost',col('Item_MRP')/col('Item_Weight')).display()

Modifying the existing column

In [None]:
# Abbreviate the fat-content labels: "Regular" becomes "Reg", then "Low Fat"
# becomes "Lf". The replacements are nested so the column is rewritten once.
df.withColumn(
    'Item_Fat_Content',
    regexp_replace(
        regexp_replace(col('Item_Fat_Content'), "Regular", "Reg"),
        "Low Fat",
        "Lf",
    ),
).display()

### Type Casting

In [None]:
# Replace Item_Weight with a string-typed version of itself.
# NOTE(review): my_struct_schema already declares Item_Weight as StringType, so
# with the DataFrame loaded above this cast leaves the type unchanged.
df = df.withColumn('Item_Weight',col('Item_Weight').cast(StringType()))

In [None]:
df.printSchema()

### Sort

### Scenario 1
descending order

In [None]:
# Item_Weight is held as a string, and sorting strings is lexicographic
# ("9.3" would rank above "21.35"). Cast to double so the descending order is
# numeric.
df.sort(col('Item_Weight').cast('double').desc()).display()

### Scenario 2
Ascending Order

In [None]:
df.sort(col('Item_Visibility').asc()).display()

### Scenario 3
Sorting based on multiple columns

In [None]:
# Sort by weight, then visibility, both descending. The columns are cast to
# double first so the comparison is numeric rather than lexicographic on the
# string representation.
df.sort(
    col('Item_Weight').cast('double').desc(),
    col('Item_Visibility').cast('double').desc(),
).display()

### Scenario 4
Sort the DataFrame by Item_Weight in descending order and Item_Visibility in ascending order

In [None]:
# Sort by weight descending, then visibility ascending. The columns are cast
# to double first so the comparison is numeric rather than lexicographic on
# the string representation.
df.sort(
    col('Item_Weight').cast('double').desc(),
    col('Item_Visibility').cast('double').asc(),
).display()

### Limit

In [None]:
df.limit(10).display()

### Drop

In [None]:
df.drop('Item_Visibility').display()

In [None]:
df.drop('Item_Visibility','Item_Type').display()

### drop_duplicates

## Scenario1

In [None]:
df.dropDuplicates().display()

## Scenario 2

In [None]:
df.drop_duplicates(subset=['Item_Type']).display()

In [None]:
df.distinct().display()   ##similar to dropDuplicates()

### Union and Union By Name

In [None]:
# Two small sample frames with the same (id, name) layout, used to
# demonstrate union().
data1 = [
    ('1', 'kad'),
    ('2', 'sid'),
]
data2 = [
    ('3', 'chandra'),
    ('4', 'vallika'),
]

schema1 = 'id STRING, name STRING'
schema2 = 'id STRING, name STRING'

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

In [None]:
df1.display()

In [None]:
df2.display()

In [None]:
df1.union(df2).display()

In [None]:
data1 = [('kad','1'),
         ('sid','2')]

schema1 = 'name STRING, id STRING'

df1 = spark.createDataFrame(data1,schema1)
df1.display()

In [None]:
# union() matches columns by position, not by name. df1 is now (name, id)
# while df2 is still (id, name), so this result mixes names and ids within the
# same column — shown here to motivate unionByName() below.
df1.union(df2).display()

In [None]:
# unionByName() aligns the columns by name, so the differing column order of
# df1 and df2 no longer matters.
df1.unionByName(df2).display()

### String Functions

### INITCAP

In [None]:
df.select(initcap('Item_Type')).display()

#### lower

In [None]:
df.select(lower('Item_Type')).display()

#### upper

In [None]:
df.select(upper('Item_Type')).display()

### Date Functions

#### current_date

In [None]:
df = df.withColumn('Curr_Date',current_date())
df.display()

#### date_add

In [None]:
# Add a column holding the date 7 days after Curr_Date. The column is
# referenced with the exact casing it was created with, so the lookup does not
# depend on Spark's case-insensitive column resolution.
df = df.withColumn('Week_After', date_add('Curr_Date', 7))

df.display()

#### date_sub

In [None]:
df.withColumn('Week_Before',date_sub('Curr_Date',7)).display()

In [None]:
df = df.withColumn('Week_Before',date_add('Curr_Date',-7))

df.display()

#### datediff

In [None]:
df = df.withColumn('Date_Diff',datediff('Curr_Date','Week_Before'))

df.display()

#### date_format

In [None]:
# Overwrite Week_Before with its 'dd-MM-yyyy' text rendering. date_format()
# returns a string, so after this cell Week_Before is no longer a date column.
df = df.withColumn('Week_Before',date_format('Week_Before','dd-MM-yyyy'))
df.display()

### Handling Nulls

### Dropping Nulls

- any: Drops the record if any column value is null
- all: Drops the record if all column values are null

In [None]:
# Drop only the rows in which every column is null.
df.dropna(how='all').display()

In [None]:
# Drop every row that has a null in at least one column.
df.dropna(how='any').display()

In [None]:
df.dropna(subset=['Outlet_Size']).display()

### Filling Null

In [None]:
# Replace nulls with the text 'Not Available'. Because the fill value is a
# string, only string-typed columns are filled; nulls in columns of other
# types are left as they are.
df.fillna('Not Available').display()

In [None]:
df.fillna('NotAvailable',subset=['Outlet_Size']).display()

### Split and Indexing

#### split

In [None]:
df.withColumn('Outlet_Type',split(col('Outlet_Type')," ")).display()

#### Indexing

In [None]:
# Split Outlet_Type on spaces and keep the second token (index 1).
df.withColumn(
    'Outlet_Type',
    split(col('Outlet_Type'), " ").getItem(1),
).display()

#### Explode

In [None]:
df_exp = df.withColumn('Outlet_Type',split(col('Outlet_Type')," "))
df_exp.display()

In [None]:
# explode() emits one output row per element of the Outlet_Type array, so each
# input row is repeated once for every token produced by the split above.
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()

### Array_contains

In [None]:
df_exp.display()

In [None]:
df_exp.withColumn('Type1_Flag',array_contains('Outlet_Type','Type1')).display()

#### Group_By

In [None]:
df.groupBy('Item_Type').agg(avg('Item_MRP').alias('Agg_MRP')).display()

#### Group By on two columns

In [None]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

In [None]:
# Total and average MRP for every (Item_Type, Outlet_Size) combination.
(
    df.groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum('Item_MRP').alias('Total_MRP'),
        avg('Item_MRP').alias('Avg_MRP'),
    )
    .display()
)

### Collect_list

In [None]:
# Sample (user, book) pairs used to demonstrate collect_list().
schema = 'user STRING, book STRING'

data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]

df_book = spark.createDataFrame(data, schema)

df_book.display()

In [None]:
df_book.groupBy('user').agg(collect_list('book')).display()

### pivot

In [None]:
# One row per Item_Type, one column per Outlet_Size value, each cell holding
# the average Item_MRP for that combination.
(
    df.groupBy('Item_Type')
    .pivot('Outlet_Size')
    .agg(avg('Item_MRP'))
    .display()
)

### when-otherwise  - similar to case-when statement in sql
This is used when we want to create a column whose value depends on a condition

###Scenario 1
If Item_Type is Meat then it is non-veg otherwise it is veg

In [None]:
# Tag each row 'Non-Veg' when Item_Type is 'Meat', and 'Veg' otherwise.
df = df.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)
df.display()

###Scenario 2
Creating a new column veg_exp_flag based on 2 conditions
- Condition1: veg_flag == Veg and Item_MRP >= 100 - Veg_Expensive
- Condition2: veg_flag == Veg and Item_MRP < 100 - Veg_Inexpensive
- Otherwise non-veg

In [None]:
# Split the 'Veg' rows by price: an MRP of 100 or more is 'Veg_Expensive',
# below 100 is 'Veg_Inexpensive'. Every remaining row is labelled 'Non_Veg'.
# The two conditions are mutually exclusive, so their order does not matter.
(
    df.withColumn(
        'veg_exp_flag',
        when((col('veg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')
        .when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
        .otherwise('Non_Veg'),
    )
    .display()
)

###Joins
- Inner Join
- Left join
- Right join
- Full join
- Anti join

In [None]:
# Employee and department sample frames for the join examples. Employee 6
# points at department d06, which has no department row, and department d04
# has no employees, so the join types below give visibly different results.
schemaj1 = 'emp_id string, emp_name string, dept_id string'
schemaj2 = 'dept_id string, dept_name string'

dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]

df1 = spark.createDataFrame(dataj1, schemaj1)
df2 = spark.createDataFrame(dataj2, schemaj2)


In [None]:
df1.display()

In [None]:
df2.display()

###Inner Join

In [None]:
df1.join(df2,df1['dept_id']==df2['dept_id']).display()

In [None]:
# Inner join on dept_id, keeping a single dept_id column (taken from df1)
# next to the employee fields and the department name.
(
    df1.join(df2, df1['dept_id'] == df2['dept_id'])
    .select(
        df1['emp_id'],
        df1['emp_name'],
        df1['dept_id'],
        df2['dept_name'],
    )
    .display()
)

###Left Join

In [None]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'left').display()

### Right Join

In [None]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'right').display()

### FULL join

In [None]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'full').display()

### Anti Join

In [None]:
# Anti join: keeps only the df1 rows that have no matching dept_id in df2, and
# returns df1's columns only. With the sample data that is the employee in d06.
df1.join(df2,df1['dept_id']==df2['dept_id'],'anti').display()

In [None]:
df2.join(df1,df1['dept_id']==df2['dept_id'],'anti').display()

### window functions

- Row Number()  - Popular use case is remove duplicates
- Rank()
- Dense Rank()
- Cumulative Sum

In [None]:
from pyspark.sql.window import Window

In [None]:
#orderby ascending
df.withColumn('rowCol',row_number().over(Window.orderBy('Item_Identifier'))).display()
#order by descending
#df.withColumn('rowCol',row_number().over(Window.orderBy(desc('Item_Identifier')))).display()

###rank()

In [None]:
df.withColumn('Item_Identifier_Rank',rank().over(Window.orderBy('Item_Identifier'))).display()

###dense_rank()

In [None]:
df.withColumn('Item_Identifier_Dense_R',dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()

In [None]:
df.withColumn('Rank',percent_rank().over(Window.orderBy('Item_Identifier'))).display()

### Cumulative Sum

In [None]:
# Running total of Item_MRP ordered by Item_Type. When only orderBy is given,
# Spark applies a RANGE frame from unbounded preceding to the current row, so
# all rows sharing an Item_Type receive the same total.
df.withColumn('Cum_sum',sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()

In [None]:
# Row-by-row running total: the explicit ROWS frame (unbounded preceding to the
# current row) accumulates one row at a time.
# NOTE(review): rows with the same Item_Type have no defined relative order, so
# the intermediate totals may differ between runs — add a tiebreaker column to
# the ordering if stable output matters.
df.withColumn('Cum_sum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

###User Defined Functions

### STEP 1: Create a Python function

In [None]:
def my_func(x):
    """Return the square of ``x``, or ``None`` when ``x`` is ``None``.

    Spark hands SQL NULLs to Python UDFs as ``None``; passing ``None`` through
    keeps a null input from raising ``TypeError`` inside the UDF.
    """
    if x is None:
        return None
    return x * x

### STEP 2: Convert it into a PySpark UDF

In [None]:
# Wrap the plain Python function as a Spark UDF. No return type is passed, so
# the UDF uses udf()'s default return type (StringType per the PySpark docs).
my_udf = udf(my_func)

In [None]:
# Item_MRP is loaded as a string by my_struct_schema; a Python str cannot be
# multiplied by itself, so the column is cast to double before it reaches the
# UDF.
df.withColumn('Square_MRP', my_udf(col('Item_MRP').cast('double'))).display()

### Data Writing

###CSV

### Writing modes
- append
- overwrite
- error
- ignore

In [None]:
# Write the DataFrame as CSV using the writer's default save mode.
df.write.save('/FileStore/tables/CSV/data.csv', format='csv')

In [None]:
dbutils.fs.ls('/FileStore/tables/CSV/')

In [None]:
# Append the DataFrame to the existing CSV output location.
(
    df.write.format('csv')
    .mode('append')
    .save('/FileStore/tables/CSV/data.csv')
)

### Table

In [None]:
# Save the DataFrame as the Parquet-backed table `my_table`, replacing any
# existing contents.
df.write.saveAsTable('my_table', format='parquet', mode='overwrite')

In [None]:
# Query the saved table and render the rows. Without display() the cell would
# only show the DataFrame object, not its contents.
spark.sql('select * from my_table').display()