## Data Reading


### CSV

In [0]:
# Load the BigMart sales CSV: the first row supplies column names and Spark samples the data to infer column types.
df_csv = spark.read.csv(
    "/Volumes/workspace/default/data_lake/bigmart_sales.csv",
    header=True,
    inferSchema=True,
)

display(df_csv)


### JSON

In [0]:
# Load the drivers JSON file. With multiline=False Spark expects JSON Lines input
# (one complete JSON object per line). The JSON source infers its schema when none is
# supplied, and `header` / `inferSchema` are CSV-only options, so they are not passed here.
df_json = spark.read.format('json')\
    .option('multiline', False)\
    .load("/Volumes/workspace/default/data_lake/drivers.json")

display(df_json)

## Schema Definition

In [0]:
# Print the column names and types Spark inferred for the CSV read with inferSchema=True.
df_csv.printSchema()

### DDL SCHEMA

In [0]:
# Schema as a DDL string: comma-separated "column_name type" pairs.
# Passing it to .schema(...) replaces type inference. Note that Item_Weight is declared
# as string here, so numeric use of it later needs an explicit cast.
# NOTE(review): presumably listed in the CSV's column order (CSV columns are matched by position) — verify.
my_ddl_schema = '''
    Item_Identifier string,
    Item_Weight string,
    Item_Fat_Content string,
    Item_Visibility double,
    Item_Type string,
    Item_MRP double,
    Outlet_Identifier string,
    Outlet_Establishment_Year integer,
    Outlet_Size string,
    Outlet_Location_Type string,
    Outlet_Type string,
    Item_Outlet_Sales double
'''

In [0]:
df_csv = spark.read.format('csv')\
                .schema(my_ddl_schema)\
                .option('header', True)\
                .load("/Volumes/workspace/default/data_lake/bigmart_sales.csv")

In [0]:
df_csv.display()

In [0]:
df_csv.printSchema()

### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Programmatic schema: every column is a nullable StringType field, listed in the
# same order as the DDL schema string.
my_struct_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'Item_Identifier',
        'Item_Weight',
        'Item_Fat_Content',
        'Item_Visibility',
        'Item_Type',
        'Item_MRP',
        'Outlet_Identifier',
        'Outlet_Establishment_Year',
        'Outlet_Size',
        'Outlet_Location_Type',
        'Outlet_Type',
        'Item_Outlet_Sales',
    )
])

In [0]:
df_csv = spark.read.format('csv')\
                .schema(my_struct_schema)\
                .option('header', True)\
                .load("/Volumes/workspace/default/data_lake/bigmart_sales.csv")

In [0]:
df_csv.printSchema()

## TRANSFORMATIONS

### SELECT

In [0]:
df_csv.display()

In [0]:
df_select = df_csv.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content').display()

In [0]:
# useful with fns such as aggregations
df_select = df_csv.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content')).display()

### ALIAS

In [0]:
# Rename a column inside the projection with alias().
display(df_csv.select(col('Item_Identifier').alias('Item_ID')))

### FILTER

#### Scenario - 1
filter based on item fat content = Regular

In [0]:
# Keep only rows whose fat content is exactly 'Regular'.
display(df_csv.filter(col('Item_Fat_Content') == 'Regular'))

#### Scenario - 2
filter rows where Item_Type = 'Soft Drinks' and Item_Weight < 10

In [0]:
df_csv.display()

In [0]:
df_csv.filter((col('Item_Type') == 'Soft Drinks') & (col('Item_Weight').cast('Double') < 10)).display()

#### Scenario - 3
fetch rows where Outlet_Location_Type is 'Tier 1' or 'Tier 2' and Outlet_Size is null

In [0]:
# Tier 1 or Tier 2 outlets whose size is missing.
display(
    df_csv.filter(
        col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
        & col('Outlet_Size').isNull()
    )
)


### withColumnRenamed

In [0]:
# Show Item_Weight renamed to Item_wt; the result is not assigned, so df_csv keeps its original name.
display(df_csv.withColumnRenamed('Item_Weight', 'Item_wt'))

### withColumn

#### Scenario - 1
create a new column based on any existing column

In [0]:
# Add a constant column: lit() wraps the Python literal as a Column, so every row gets 'new'.
df_csv = df_csv.withColumn('flag', lit('new'))
display(df_csv)

create a new column from a transformation of existing columns

In [0]:
# Item_Weight and Item_MRP are StringType under my_struct_schema, so cast both explicitly
# (as the Soft Drinks filter does) instead of relying on implicit string-to-number coercion.
df_csv.withColumn(
    'multiply',
    col('Item_Weight').cast('double') * col('Item_MRP').cast('double'),
).display()

#### Scenario - 2
modify existing column

In [0]:
# Abbreviate the fat-content labels: 'Regular' -> 'Reg', then 'Low Fat' -> 'LF'.
# Only the displayed result changes; df_csv is not reassigned.
(
    df_csv
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg'))
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'LF'))
    .display()
)

### Type Casting
convert a column's data type

In [0]:
# Under my_struct_schema every column, Item_Weight included, is already StringType,
# so casting to StringType would be a no-op. Cast to DoubleType so later sorts on
# Item_Weight order numerically instead of lexicographically.
df_csv = df_csv.withColumn('Item_Weight', col('Item_Weight').cast(DoubleType()))
df_csv.printSchema()

### Sort/orderBy

#### Scenario - 1
sort based on item_weight in desc order

In [0]:
# Largest Item_Weight first.
display(df_csv.sort(desc('Item_Weight')))

#### Scenario - 2
sort based on item_visibility in asc order

In [0]:
# Smallest Item_Visibility first.
display(df_csv.sort(asc('Item_Visibility')))

#### scenario - 3
sort based on multiple columns

In [0]:
# Both sort keys descending; ascending takes one flag per column.
display(df_csv.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, False]))

#### Scenario - 4
desc order for first col, and asc order in second col

In [0]:
# Item_Weight descending, ties broken by Item_Visibility ascending.
display(df_csv.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, True]))

### LIMIT


In [0]:
# Show at most the first 10 rows.
display(df_csv.limit(10))

### DROP
to drop columns

#### Scenario - 1
drop single column

In [0]:
# Display df_csv without the Item_Visibility column.
display(df_csv.drop('Item_Visibility'))

#### Scenario - 2
drop multiple columns

In [0]:
# drop() accepts several column names at once.
display(df_csv.drop('Item_Visibility', 'Item_MRP'))

### DROP_DUPLICATES

#### Scenario - 1
de-duplicate on all columns (drop rows that are exact duplicates)

In [0]:
# With no subset, a row is a duplicate only if every column matches.
display(df_csv.dropDuplicates())

#### Scenario - 2
remove duplicates based on a subset of columns

In [0]:
df_csv.dropDuplicates(subset=['Item_Type']).display()

In [0]:
df_csv.distinct().display()

### UNION and UNION BY NAME

#### Preparing Dataframes

In [0]:
data1 = [('1', 'kad'), ('2', 'sid')]
schema1 = 'id STRING, name STRING'

df1 = spark.createDataFrame(data1, schema1)

data2 = [('3', 'clara'), ('4', 'codess')]
schema2 = 'id STRING, name STRING'

df2 = spark.createDataFrame(data2, schema2)

In [0]:
df1.display()
df2.display()


### UNION

In [0]:
# Stack df2's rows under df1's (columns paired by position).
display(df1.union(df2))

`union` matches columns by position, not by name, so changing the column order distorts the data. e.g.:

In [0]:
# Rebuild df1 with its columns in (name, id) order. union pairs columns by position,
# so df2's values end up under df1's headings: its ids under 'name', its names under 'id'.
schema1 = 'name STRING, id STRING'
data1 = [('kad', '1'), ('sid', '2')]

df1 = spark.createDataFrame(data1, schema1)
display(df1)
display(df1.union(df2))

### UnionByName
matches columns by name rather than by position, so column order does not matter

In [0]:
# Pair columns by name, so the swapped column order in df1 no longer matters.
display(df1.unionByName(df2))

### STRING FUNCTIONS

#### Initcap()

In [0]:
# initcap() upper-cases the first letter of each word; the alias names the function actually used.
# upper() and lower() are the all-uppercase / all-lowercase counterparts.
df_csv.select(initcap('Item_Type').alias('initcap_Item_Type')).display()

### DATE FUNCTIONS

#### Current_Date

In [0]:
# Stamp every row with the current date.
df_csv = df_csv.withColumn('curr_date', current_date())
display(df_csv)

#### Date_Add()

In [0]:
# week_after = curr_date + 7 days.
df_csv = df_csv.withColumn('week_after', date_add('curr_date', 7))
display(df_csv)

#### Date_Sub

In [0]:
# week_before = curr_date - 7 days (same result as date_add('curr_date', -7)).
df_csv = df_csv.withColumn('week_before', date_sub('curr_date', 7))

df_csv.display()

#### DATEDIFF

In [0]:
# Number of days from curr_date (start) to week_after (end).
df_csv = df_csv.withColumn('datediff', datediff(end='week_after', start='curr_date'))
display(df_csv)

#### Date_Format

In [0]:
# Replace week_before with its dd-MM-yyyy text rendering (the column becomes a string).
df_csv = df_csv.withColumn('week_before', date_format('week_before', 'dd-MM-yyyy'))
display(df_csv)

### HANDLING NULLS

#### dropping nulls

In [0]:
# drops records with null in all columns
df_csv.dropna('all').display()

In [0]:
# drops records with null in any column
df_csv.dropna('any').display()

In [0]:
# subsets columns with null
df_csv.dropna(subset=['Outlet_Size']).display()

#### filling nulls

In [0]:
df_csv.fillna('NotAvailabale').display()

In [0]:
df_csv.fillna('NotAvailable', subset=['Outlet_Size']).display()

### SPLIT and INDEXING

#### SPLIT

In [0]:
# split() turns each Outlet_Type string into an array of its space-separated parts.
display(df_csv.withColumn('Outlet_Type', split('Outlet_Type', ' ')))

#### INDEXING

In [0]:
# Keep only element 0 (the first word) of the split array.
display(df_csv.withColumn('Outlet_Type', split('Outlet_Type', ' ').getItem(0)))

### EXPLODE
creates one new row per element of an array (list) column

In [0]:
df_exp = df_csv.withColumn('Outlet_Type', split('Outlet_Type', ' '))
df_exp.display()

In [0]:
df_exp.withColumn('Outlet_Type', explode('Outlet_Type')).display()

### ARRAY_CONTAINS
pairs naturally with split() for array/list columns: it checks whether a given value is an element of the array in each row

In [0]:
df_exp.display()

In [0]:
df_exp.withColumn('Type1_flag', array_contains('Outlet_Type', 'Type1')).display()

### GROUP_BY

#### Scenario - 1
sum of item_mrp for each item_type

In [0]:
# Total Item_MRP per Item_Type. `sum` is pyspark.sql.functions.sum (the wildcard import shadows the builtin).
display(df_csv.groupBy('Item_Type').agg(sum('Item_MRP').alias('sum')))

#### Scenario - 2
average item_mrp for each item_type

In [0]:
# Mean Item_MRP per Item_Type.
display(df_csv.groupBy('Item_Type').agg(avg('Item_MRP').alias('average')))

#### Scenario - 3
grouping by multiple columns

In [0]:
# One output row per (Item_Type, Outlet_Size) combination.
display(
    df_csv
    .groupBy('Item_Type', 'Outlet_Size')
    .agg(sum('Item_MRP').alias('Total_MRP'))
)

#### Scenario - 4
grouping and aggregating multiple columns

In [0]:
# Several aggregates computed over the same groups in one pass.
display(
    df_csv
    .groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum('Item_MRP').alias('Total_MRP'),
        avg('Item_MRP').alias('Average_MRP'),
    )
)

### COLLECT_LIST
collects all values of a column within each group into a single list (duplicates are kept)

In [0]:
data = [('user1', 'book1'), ('user1', 'book2'), ('user2', 'book2'), ('user2', 'book4'), ('user3', 'book1')]

schema = 'user string, book string'

df_book = spark.createDataFrame(data, schema)

df_book.display()


In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()

### PIVOT
groups rows by one column, turns the distinct values of another column into new columns, and aggregates each cell

In [0]:
# Rows: Item_Type; columns: one per distinct Outlet_Size value; cells: average Item_MRP.
display(df_csv.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')))

### WHEN-OTHERWISE

#### Scenario - 1

In [0]:
df_csv = df_csv.withColumn('veg_flag', when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'))

df_csv.display()

In [0]:
df_csv.withColumn('veg_exp_flag', when((col('veg_flag') == 'Veg') & (col("Item_MRP") < 100), 'Veg_Inexpensive').when((col('veg_flag') == 'Veg') & (col("Item_MRP") > 100), 'Veg_expensive').otherwise('Non-Veg')).display()

### JOINS

In [0]:
dataj1 = [
    ('1', 'clara', 'd01'),
    ('2', 'john', 'd02'),
    ('3', 'maria', 'd03'),
    ('4', 'peter', 'd03'),
    ('5', 'sam', 'd05'),
    ('6', 'tim', 'd06')
]

schemaj1 = 'emp_id string, emp_name string, dept_id string'

df1 = spark.createDataFrame(dataj1, schemaj1)

dataj2 = [
    ('d01', 'HR'),
    ('d02', 'IT'),
    ('d03', 'ACCOUNTS'),
    ('d04', 'MARKETING'),
    ('d05', 'FINANCE')
]

schemaj2 = 'dept_id string, department string'

df2 = spark.createDataFrame(dataj2, schemaj2)

In [0]:
df1.display()
df2.display()

#### Inner Join

In [0]:
# Only employees whose dept_id appears in df2.
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='inner'))

#### Left Join

In [0]:
# Every employee; department columns are null where there is no match (e.g. d06).
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='left'))

#### Right Join

In [0]:
# Every department; employee columns are null where no employee matches (e.g. d04).
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='right'))

#### Anti Join

In [0]:
# Employees with no matching department; only df1's columns are returned.
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='anti'))

### WINDOW FUNCTIONS

#### ROW NUMBER()

In [0]:
display(df_csv)

In [0]:
from pyspark.sql.window import Window

In [0]:
# Sequential 1-based number over the whole DataFrame ordered by Item_Identifier
# (no partitionBy, so all rows form a single window).
display(
    df_csv.withColumn(
        'rowCol',
        row_number().over(Window.orderBy('Item_Identifier')),
    )
)

#### RANK() vs DENSE RANK()

In [0]:
# rank() leaves gaps after ties; dense_rank() numbers consecutively.
display(
    df_csv
    .withColumn('rank', rank().over(Window.orderBy(desc('Item_Identifier'))))
    .withColumn('dense_rank', dense_rank().over(Window.orderBy(desc('Item_Identifier'))))
)

#### Cumulative sum

In [0]:
df_csv.withColumn('cumsum', sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()

In [0]:
df_csv.withColumn('cumsum', sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.currentRow))).display()

In [0]:
df_csv.withColumn('cumsum', sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))).display()

### USER DEFINED FUNCTIONS (UDF)
for transformations that can't be done with built-in functions, or where the built-in functions would be too complex to express them.
PS: Python UDFs need a Python interpreter on every executor, and data has to be passed between Spark and Python, which leads to performance issues. Hence they are not recommended; built-in functions should always be preferred unless there is no workaround.

#### STEP - 1
create the udf function

In [0]:
def my_func(x):
    """Return the square of *x*, or None when *x* is None.

    Spark passes SQL NULL values to Python UDFs as None; squaring None would raise
    TypeError and fail the task, so NULL is propagated instead.
    """
    if x is None:
        return None
    return x * x

#### STEP - 2
wrap the Python function as a Spark UDF

In [0]:
# Wrap my_func as a Spark UDF. Without a returnType, udf() defaults to StringType, which
# would turn the squares into strings; declare DoubleType so the result is numeric.
# Feed it a double column so my_func returns floats (an int result may come back as null).
my_udf = udf(my_func, DoubleType())

#### STEP - 3
use the UDF like any built-in column function

In [0]:
# Item_MRP is StringType under my_struct_schema. Squaring a Python str raises TypeError
# inside the UDF, so cast the column to double before passing it in.
df_csv.withColumn('mynewcol', my_udf(col('Item_MRP').cast('double'))).display()

## DATA WRITING

### CSV

In [0]:
# Write df_csv as CSV part files under the transformed_bigmart_sales.csv directory.
# No mode is set, so the default (fail if the path already exists) applies.
(
    df_csv.write.format('csv')
    .save('/Volumes/workspace/default/data_lake/transformed_bigmart_sales.csv')
)

### Data Writing Modes

#### APPEND

In [0]:
df_csv.write.format('csv')\
    .mode('append')\
    .save('/Volumes/workspace/default/data_lake/transformed_bigmart_sales.csv')

In [0]:
df_csv.write.format('csv')\
    .mode('append')\
    .option('path', '/Volumes/workspace/default/data_lake/transformed_bigmart_sales.csv')\
    .save()

#### OVERWRITE

In [0]:
# overwrite: replace any existing data at the path.
(
    df_csv.write.format('csv')
    .mode('overwrite')
    .save('/Volumes/workspace/default/data_lake/transformed_bigmart_sales.csv')
)

#### ERROR

In [0]:
# error (errorifexists): raise if data already exists at the path, which it does
# after the writes above, so this cell is expected to fail.
(
    df_csv.write.format('csv')
    .mode('error')
    .save('/Volumes/workspace/default/data_lake/transformed_bigmart_sales.csv')
)

#### IGNORE

In [0]:
# ignore: silently skip the write when data already exists at the path.
(
    df_csv.write.format('csv')
    .mode('ignore')
    .save('/Volumes/workspace/default/data_lake/transformed_bigmart_sales.csv')
)

### PARQUET FILE FORMAT (COLUMNAR FILE FORMAT)
stores data in columns for highly efficient compression and encoding. useful for big data and for workloads with performance issues

In [0]:
# Write Parquet (columnar) output to its own directory. Otherwise the overwrite would
# replace the CSV output at transformed_bigmart_sales.csv with Parquet files under a .csv name.
df_csv.write.format('parquet')\
    .mode('overwrite')\
    .save('/Volumes/workspace/default/data_lake/transformed_bigmart_sales.parquet')

#### TABLE

In [0]:
# Save df_csv as a Delta table named my_table, replacing its contents if it already exists.
(
    df_csv.write.format('delta')
    .mode('overwrite')
    .saveAsTable('my_table')
)

### SPARK SQL
to use SQL, first register the DataFrame as a temporary view. The view belongs to the current Spark session and can be queried like a SQL view; it is dropped when the session ends.

#### createTempView()

In [0]:
# Register df_csv as a session-scoped temporary view named my_view. Unlike
# createTempView, createOrReplaceTempView does not fail when the cell is re-run
# and the view already exists.
df_csv.createOrReplaceTempView('my_view')

In [0]:
%sql

-- Query the my_view temporary view with plain SQL: only the 'Low Fat' rows.
select * from my_view where Item_Fat_Content = 'Low Fat'

to keep working with (or write) the result of a SQL transformation, run the query with spark.sql(), which returns a DataFrame

In [0]:
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Low Fat'")

In [0]:
df_sql.display()