In [0]:
# Import pyspark sql types and functions
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Explicit schema for the BigMart sales CSV (12 nullable columns).
# Item_MRP is declared as DoubleType because later cells sum, average and
# compare it against numeric thresholds; as a string those operations would
# depend on implicit string-to-number casts.
# NOTE(review): Item_Weight, Item_Visibility, Outlet_Establishment_Year and
# Item_Outlet_Sales look numeric by name — confirm against the file before
# tightening their types as well.
my_struct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', StringType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', StringType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', StringType(), True),
])

In [0]:
# Load the BigMart sales CSV with the explicit schema defined above;
# the first line of the file is treated as the header row.
df = (
    spark.read
    .format('csv')
    .schema(my_struct_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Show the DataFrame that was just loaded from the CSV.
df.display()

### STRING FUNCTIONS

In [0]:
# Upper-case every Item_Type value; df itself is not reassigned, only the
# transformed result is displayed.
df.withColumn('Item_Type', upper(col('Item_Type'))).display()

In [0]:
# Lower-case every Item_Type value; df itself is not reassigned, only the
# transformed result is displayed.
df.withColumn('Item_Type', lower(col('Item_Type'))).display()

In [0]:
# Capitalise the first letter of each word in Item_Type; df itself is not
# reassigned, only the transformed result is displayed.
df.withColumn('Item_Type', initcap(col('Item_Type'))).display()

In [0]:
# The next cells select only the transformed column instead of replacing it.
# Capitalise the first letter of each word in Item_Type, exposed as 'Item'.
df.select(
    initcap(col('Item_Type')).alias('Item')
).display()

In [0]:
# Select Item_Type converted to upper case, exposed as 'ITEM'.
df.select(
    upper(col('Item_Type')).alias('ITEM')
).display()

In [0]:
# Select Item_Type converted to lower case, exposed as 'item'.
df.select(
    lower(col('Item_Type')).alias('item')
).display()

### DATE FUNCTIONS

In [0]:
# Append a 'curr_date' column populated by current_date().
df = df.withColumn(
    'curr_date',
    current_date(),
)

In [0]:
# Show the DataFrame, which now includes the 'curr_date' column.
df.display()

In [0]:
# Append 'week_after': the date 7 days after 'curr_date'.
df = df.withColumn(
    'week_after',
    date_add(col('curr_date'), 7),
)
df.display()

In [0]:
# Append 'week_before': the date 7 days before 'curr_date', obtained by
# passing a negative offset to date_add.
# Equivalent alternative: df.withColumn('week_before', date_sub('curr_date', 7))
df = df.withColumn(
    'week_before',
    date_add(col('curr_date'), -7),
)
df.display()

### DATE DIFF

In [0]:
# Append 'date_diff': number of days from 'week_before' to 'curr_date'
# (datediff takes the end date first, then the start date).
df = df.withColumn(
    'date_diff',
    datediff(col('curr_date'), col('week_before')),
)
df.display()

### DATE FORMAT

In [0]:
# Store 'week_after' as a 'dd-MM-yyyy' formatted string.
# The value is derived from 'curr_date' rather than from the existing
# 'week_after' column so that this cell is idempotent: re-running it would
# otherwise feed the already formatted 'dd-MM-yyyy' string back into
# date_format instead of a date.
df = df.withColumn(
    'week_after',
    date_format(date_add('curr_date', 7), 'dd-MM-yyyy'),
)
df.display()

## Handling Nulls

### Drop Nulls

In [0]:
# Drop every row that has a null in at least one column.
df.dropna(how='any').display()

In [0]:
# Drop only the rows in which every column is null.
df.dropna(how='all').display()

In [0]:
# Drop the rows where any column of the given subset (here only Outlet_Size)
# is null; 'any' is the default mode, spelled out for clarity.
df.dropna(how='any', subset=['Outlet_Size']).display()

### Filling Nulls

In [0]:
# Replace nulls with a placeholder across the whole DataFrame. Because the
# fill value is a string, it is applied to string-typed columns only.
df.fillna(value='NotAvailable').display()

In [0]:
# Replace nulls with a placeholder only in the listed columns (Outlet_Size).
# The placeholder is spelled 'NotAvailable', matching the fill value used
# for the whole-DataFrame example (the original literal had a stray capital V).
df.fillna('NotAvailable', subset=['Outlet_Size']).display()

### SPLIT and Indexing

In [0]:
# Split Outlet_Type on single spaces, turning the string into an array of words.
df.withColumn(
    'Outlet_Type',
    split(col('Outlet_Type'), ' '),
).display()

In [0]:
# Split Outlet_Type on single spaces and keep only the element at index 1
# (the second word).
df.withColumn(
    'Outlet_Type',
    split(col('Outlet_Type'), ' ').getItem(1),
).display()

### EXPLODE
#### Explodes each element of an array column into a separate row

In [0]:
# Build df_1: a copy of df whose Outlet_Type is an array of space-separated words.
df_1 = df.withColumn(
    'Outlet_Type',
    split(col('Outlet_Type'), ' '),
)
df_1.display()

In [0]:
# Emit one output row per element of the Outlet_Type array.
df_1.withColumn(
    'Outlet_Type',
    explode(col('Outlet_Type')),
).display()

### ARRAY_CONTAINS

In [0]:
# Rebuild df_1 with Outlet_Type split into an array of words (same expression
# as in the EXPLODE section).
df_1 = df.withColumn(
    'Outlet_Type',
    split(col('Outlet_Type'), ' '),
)

In [0]:
# Add a boolean 'Type1_flag' telling whether the Outlet_Type array contains
# the element 'Type1'.
df_1.withColumn(
    'Type1_flag',
    array_contains(col('Outlet_Type'), 'Type1'),
).display()

### Group By

In [0]:
# Total Item_MRP per Item_Type. `sum` here is the pyspark.sql.functions
# aggregate brought in by the wildcard import, not Python's built-in sum.
(
    df.groupBy('Item_Type')
    .agg(sum(col('Item_MRP')))
    .display()
)

In [0]:
# Average Item_MRP per Item_Type, exposed as 'ItemAvg_MRP'.
(
    df.groupBy('Item_Type')
    .agg(avg(col('Item_MRP')).alias('ItemAvg_MRP'))
    .display()
)

#### Group By - Collect List

In [0]:

# Small in-memory (username, book) sample used for the collect_list example.
book_data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book3'),
    ('user3', 'book4'),
    ('user3', 'book5'),
]
book_schema = 'username string, books string'
df_books = spark.createDataFrame(book_data, book_schema)
df_books.display()

In [0]:
# Gather each user's books into a single list-valued column.
(
    df_books.groupBy('username')
    .agg(collect_list(col('books')))
    .display()
)

### PIVOT

In [0]:
# Keep only the three columns needed for the pivot: the grouping key, the
# pivot key, and the value to aggregate.
df_for_pivot = df.select('Item_Type', 'Outlet_Size', 'Item_MRP')
df_for_pivot.display()

In [0]:
# One row per Item_Type, one column per distinct Outlet_Size value, each cell
# holding the average Item_MRP for that combination.
(
    df_for_pivot.groupBy('Item_Type')
    .pivot('Outlet_Size')
    .agg(avg(col('Item_MRP')))
    .display()
)

In [0]:
# Show df in its current state (the pivot cells above did not reassign it).
df.display()

### WHEN-OTHERWISE

#### Scenario 1

In [0]:
# Add 'Veg_flag': 'Non-Veg' when Item_Type is exactly 'Meat', 'Veg' for
# everything else.
df = df.withColumn(
    'Veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)

#### Scenario 2

In [0]:
# Classify rows by vegetarian flag and price:
#   Veg and Item_MRP <  100 -> 'Veg_Inexpensive'
#   Veg and Item_MRP >= 100 -> 'Veg_Expensive'
#   anything else           -> 'Non-Veg'
# The upper branch uses >= so that a vegetarian item priced at exactly 100 is
# no longer left unmatched and mislabelled 'Non-Veg' by the fallback.
# Item_MRP is cast to double explicitly so the comparison does not depend on
# implicit string-to-number coercion of the column.
df.withColumn(
    'Veg_exp_flag',
    when(
        (col('Veg_flag') == 'Veg') & (col('Item_MRP').cast('double') < 100),
        'Veg_Inexpensive',
    )
    .when(
        (col('Veg_flag') == 'Veg') & (col('Item_MRP').cast('double') >= 100),
        'Veg_Expensive',
    )
    .otherwise('Non-Veg'),
).display()

### JOINS

In [0]:
# Employee sample for the join examples: (emp_id, emp_name, dept_id).
# 'd06' has no matching row in the department sample below.
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'

# Department sample: (dept_id, department).
# 'd04' is not referenced by any employee above.
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
schemaj2 = 'dept_id STRING, department STRING'

df1 = spark.createDataFrame(dataj1, schemaj1)
df2 = spark.createDataFrame(dataj2, schemaj2)

In [0]:
# Show the employee DataFrame (emp_id, emp_name, dept_id).
df1.display()

In [0]:
# Show the department DataFrame (dept_id, department).
df2.display()

### INNER JOIN
#### Returns only the rows whose join key is present in both DataFrames