# Advanced PySpark on Databricks - Part 1
* Notebook by Adam Lang
* Date: 12/24/2024

# Overview
* This is a detailed notebook on advanced data wrangling and data transformations using PySpark on Databricks.

### Reading in CSV Dataset
* Basic uploading and reading of a CSV file in Databricks.

In [0]:
## how to find filestore link
dbutils.fs.ls('/FileStore/tables/')

In [0]:
## read the CSV dataset into df
df = spark.read.format('csv')\
                .option('inferSchema', True)\
                .option('header', True)\
                .load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
## display df
df.show()

In [0]:
## in Databricks notebooks, .display() renders a richer table than .show()
df.display()

### Reading in JSON Dataset
* Working with JSON files in Databricks.
* This is a **single-line** JSON file (one JSON object per line), so we set the `multiLine` option to `False`.

In [0]:
## check file location
dbutils.fs.ls('/FileStore/tables/')

In [0]:
## read in JSON dataset --> using `\` to continue the statement across lines
## (`header` and `inferSchema` are CSV options; the JSON reader infers the schema by default)
df_json = spark.read.format('json')\
                            .option('multiLine', False)\
                            .load('/FileStore/tables/drivers.json')

In [0]:
## display json
df_json.display()

### Schema Definitions

In [0]:
## print schema for csv file
df.printSchema()

### DDL Schema
* DDL (Data Definition Language) originates from SQL. 
* This is quicker to use than `StructType()`

In [0]:
## convert Item_weight from DOUBLE --> STRING 
my_ddl_schema = '''
                   Item_Identifier STRING,
                   Item_Weight STRING, 
                   Item_Fat_Content STRING,
                   Item_Visibility DOUBLE,
                   Item_Type STRING,
                   Item_MRP DOUBLE,
                   Outlet_Identifier STRING,
                   Outlet_Establishment_Year INTEGER,
                   Outlet_Size STRING,
                   Outlet_Location_Type STRING,
                   Outlet_Type STRING,
                   Item_Outlet_Sales DOUBLE
                '''


Below, instead of setting the `inferSchema` option, we pass the custom schema defined above, `my_ddl_schema`, to `.schema()`.

In [0]:
## read CSV again without inferSchema ---> use custom `my_ddl_schema`
df = spark.read.format('csv')\
                .schema(my_ddl_schema)\
                .option('header', True)\
                .load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
## display
df.display()

In [0]:
## check schema
df.printSchema()

Summary:
* We can see the `Item_Weight` variable was successfully changed from DOUBLE to STRING.

### StructType() Schema
* This is the other way to define a schema: build it from `StructType` and `StructField` objects.

In [0]:
## import libraries
from pyspark.sql.types import * ## import all types
from pyspark.sql.functions import * ## import all

In [0]:
## transform all dtypes to STRING (one StructField per CSV column, in file order)
my_struct_schema = StructType([
                            StructField('Item_Identifier', StringType(), True),
                            StructField('Item_Weight', StringType(), True),
                            StructField('Item_Fat_Content', StringType(), True),
                            StructField('Item_Visibility', StringType(), True),
                            StructField('Item_Type', StringType(), True),
                            StructField('Item_MRP', StringType(), True),
                            StructField('Outlet_Identifier', StringType(), True),
                            StructField('Outlet_Establishment_Year', StringType(), True),
                            StructField('Outlet_Size', StringType(), True),
                            StructField('Outlet_Location_Type', StringType(), True),
                            StructField('Outlet_Type', StringType(), True),
                            StructField('Item_Outlet_Sales', StringType(), True)
])

In [0]:
## read CSV file again using schema above
df = spark.read.format('csv')\
                .schema(my_struct_schema)\
                .option('header', True)\
                .load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
## printSchema
df.printSchema()

Summary:
* We were able to read every column as STRING, this time using the `StructType()` method.

### SELECT function in PySpark
* This is similar to the SQL `SELECT` statement and is one of the most important functions for data transformation and wrangling in PySpark.

In [0]:
## lets see the columns again 
df.display()

1. First way to use `.select` is to simply input column names.

In [0]:
## create new df with ONLY first 3 cols
## (.display() returns None, so assign the DataFrame first, then display it)
df_sel = df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')
df_sel.display()

2. Second way to use `.select` is to use the `col` function.

In [0]:
## utilize the col() function for each column
df_sel = df.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content'))
df_sel.display()

### ALIAS 
* This is similar to aliasing in SQL, where you can assign a temporary alias name to a column.
* **Note: you need a column object, such as the one returned by `col()`, to specify the column you are aliasing!**

In [0]:
## alias a column to different name 
df.select(col('Item_Identifier').alias('Item_ID')).display()

## FILTER/WHERE
* As in SQL, you can filter a dataframe in PySpark.
* Here are 3 scenarios we will look at below:
1. Filter the data where fat content = Regular.
2. Slice the data where item type = Soft Drinks and weight < 10.
3. Fetch the data where the tier is Tier 1 or Tier 2 and the outlet size is null.

### Filter Scenario 1
* We want to filter the `Item_Fat_Content` column for ONLY values with `Regular`.

In [0]:
## filter column for 'Regular'
df.filter(col('Item_Fat_Content')=='Regular').display()

### Filter Scenario 2
* Slice the dataframe with item type column = Soft Drinks and weight < 10.
* So, there are 2 filters we need to apply, again very similar to a SQL query.

In [0]:
df.display()

## Change Data Types
* Above we read every column as STRING, so we need to restore the numeric types before comparing `Item_Weight` with a number.

In [0]:
## restore Item_Weight to DOUBLE (it was read as STRING above)
my_ddl_schema = '''
                   Item_Identifier STRING,
                   Item_Weight DOUBLE, 
                   Item_Fat_Content STRING,
                   Item_Visibility DOUBLE,
                   Item_Type STRING,
                   Item_MRP DOUBLE,
                   Outlet_Identifier STRING,
                   Outlet_Establishment_Year INTEGER,
                   Outlet_Size STRING,
                   Outlet_Location_Type STRING,
                   Outlet_Type STRING,
                   Item_Outlet_Sales DOUBLE
                '''


In [0]:
## read CSV again without inferSchema ---> use custom `my_ddl_schema`
df = spark.read.format('csv')\
                .schema(my_ddl_schema)\
                .option('header', True)\
                .load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
df.printSchema()

In [0]:
## display df
df.display()

In [0]:
## filter scenario 2
df.filter((col('Item_Type') == 'Soft Drinks') & (col('Item_Weight')<10)).display()

### Filter Scenario 3 
* Fetch the data where the tier is `Tier 1` or `Tier 2` and the outlet size is null.
* The columns we need for this analysis:
  * `Outlet_Size`
  * `Outlet_Location_Type`

* The functions we need for this:
  * `isin()`
  * `isNull()`


In [0]:
df.display()

In [0]:
## filter scenario 3
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).display()

### withColumnRenamed
* This is another powerful PySpark transformation. 
* This helps us rename columns at the dataframe level. 

In [0]:
## using `withColumnRenamed` -- current name, new name
df.withColumnRenamed('Item_Weight', 'Item_Wt').display()

### withColumn
* This is used to create or modify a column, usually in one of 3 scenarios:
  1. Adding a completely new column.
  2. Deriving a new column from existing columns.
  3. Overwriting an existing column in place.

#### Scenario 1
* Creating a brand new column.
* We need the `lit()` function, which wraps a Python value as a PySpark **literal** (constant) column. If a column is passed, it returns the column as is.
* Below we will create a new column called `flag` with the value `new`.

In [0]:
## create brand new column --> column name, lit(value to add)
## lit supports a list type now
df = df.withColumn('flag', lit('new'))

In [0]:
## display the transformation
df.display()

#### Scenario 2 - Create new column with existing column
* We will create a new column based off existing column data transformations.
* We will try multiplying `Item_Weight` and `Item_MRP`.

In [0]:
## print schema
df.printSchema()

In [0]:
## new col from existing col
df.withColumn('multiply', col('Item_Weight')*col('Item_MRP')).display()

#### Scenario 3 - Modify Existing column contents in place
* We will take the column `Item_Fat_Content` and modify the string values in place WITHOUT creating a new column. 
* There are multiple ways to do this but we will leverage the `regexp_replace()` function.

In [0]:
## scenario 3 -- modify column in place
## replace 'Regular' with 'Reg'
df.withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Regular","Reg"))\
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Low Fat", "LF")).display()

### Type Casting
* Casting converts a column from one data type to another.
* This helps before joins, where the key columns should share a type, and before aggregations.

In [0]:
## type casting 
df = df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))

In [0]:
## print schema
df.printSchema()

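Summary:
* We were able to cast the `Item_Weight` from a DOUBLE to a STRING.
* Because `df` was reassigned, `Item_Weight` stays a STRING in the cells that follow, so sorting on it compares text rather than numbers unless it is cast back to DOUBLE.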

# Sort/orderBy
* Sort by ascending or descending order. Similar to Pandas functions. 

## Scenario 1
* We will use the built-in functions:
    * `.sort()`
    * `.asc()`
    * `.desc()`

In [0]:
## sort col in descending order
df.sort(col('Item_Weight').desc()).display()

## Scenario 2 

In [0]:
## ascending sort
df.sort(col('Item_Visibility').asc()).display()

## Scenario 3 - Sorting multiple columns

In [0]:
## multiple col sort -- pass a list
## boolean list 0=False, 1=True
df.sort(['Item_Weight','Item_Visibility'], ascending=[0,0]).display()

## Scenario 4 - sort 2 columns in different orders

In [0]:
## 2 cols different order sort
df.sort(['Item_Weight','Item_Visibility'], ascending=[0,1]).display()

# Limit
* As in SQL, `limit()` restricts how many rows are returned.

In [0]:
## limit usage
df.limit(10).display()

# Drop
* Another common data wrangling function; `drop()` removes one or more columns from a df.

## Drop Scenario 1

In [0]:
## drop 1 column from df
df.drop('Item_Visibility').display()


## Drop Scenario 2 -- multiple columns

In [0]:
## drop multiple cols
df.drop('Item_Visibility','Item_Type').display()

# Drop_Duplicates
* Another useful function for removing duplicate rows, either across all columns or on a subset of columns.

## Scenario 1 - Entire row
* There are 2 options for this: `dropDuplicates()` (shown here) and `distinct()` (Scenario 3).

* Calling `dropDuplicates()` with no arguments is a quick way to "dedup" the data: it removes rows that are identical across every column.

In [0]:
df.dropDuplicates().display()

## Scenario 2 - Dropping duplicate subsets in specific column

In [0]:
## keep one row per distinct Item_Type -- pass a list of columns to subset
df.drop_duplicates(subset=['Item_Type']).display()

## Scenario 3 - `.distinct()`
* This will give us the distinct or unique rows only.

In [0]:
df.distinct().display()

# UNION and UNION BYNAME
* These are both used to combine 2 or more dataframes vertically, similar to the SQL `UNION` clause. 
* There are a few differences though:
1. `union()`
  * **Column Alignment:** Aligns columns based on their position in the DataFrame, regardless of their names.
  * **Schema Requirement:** Requires both DataFrames to have the **same number of columns and the same data types** for corresponding columns.

2. `unionByName()`
  * **Column Alignment:** Aligns columns based on their names, regardless of their position in the DataFrame.
  * **Schema Requirement:** Requires the same set of column names in both DataFrames by default. With `allowMissingColumns=True` (Spark 3.1+), a column missing from one DataFrame is filled with nulls.

In [0]:
## prepare 2 dummy df's to demo this
data1 = [('1','kad'),
        ('2','sid')]
schema1 = 'id STRING, name STRING' 

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING, name STRING'

## create df2 with custom schema above
df2 = spark.createDataFrame(data2, schema2)

In [0]:
## display df1
df1.display()

In [0]:
## display df2
df2.display()

Summary:
* We can easily use UNION function here because as mentioned above, the 2 dataframes have the same number of columns and same data types in those columns.

## UNION function

In [0]:
## union function
df1.union(df2).display()

* We will now see below why we need the UNION BY NAME function.

In [0]:
## redefine df1 but with reverse order of the columns
data1 = [('kad','1'),
        ('sid','2')]
schema1 = 'name STRING, id STRING' 

## create df1 again with new schema
df1 = spark.createDataFrame(data1,schema1)


## display df1
df1.display()

In [0]:
## now try union
df1.union(df2).display()

Summary:
* We can see that `union` does not check column names: it appends rows purely by position, so the `name` and `id` values end up mixed in the same columns.
* Now that we have reversed the columns, this is where `unionByName` comes in handy.

## Union by name
* Now we will see how `unionByName` matches columns by name, rather than by position, while performing the union.

In [0]:
## union by name
df1.unionByName(df2).display()

Summary:
* Wow! That is exciting, isn't it? We were able to take 2 columns and align them by their name regardless of their respective positions in each separate df.
* Both df's here have the same column names and data types. If one df is missing a column, `unionByName(..., allowMissingColumns=True)` (Spark 3.1+) fills it with nulls.
* **This is a very useful function to use with very large messy datasets and allows us to skip some intermediate transformations.**

# STRING Functions
* We will take a deep dive into a few important string functions in PySpark such as:
  * `INITCAP()`
  * `UPPER()`
  * `LOWER()`

In [0]:
## lets see the main df again
df.display()

## INITCAP() function
* Capitalizes the first letter of every word. 

In [0]:
## using INITCAP()
df.select(initcap('Item_Type').alias('initcap_type')).display()

## Lower() function

In [0]:
## Lower() function
df.select(lower('Item_Type').alias('lower_item_type')).display()

## UPPER() function

In [0]:
df.select(upper('Item_Type').alias('upper_item_type')).display()

# Date Functions
* These are pretty important for data wrangling in PySpark. 
* These include:
  1. CURRENT_DATE()
  2. DATE_ADD()
  3. DATE_SUB()
  4. DATEDIFF()
  5. DATE_FORMAT()

## CURRENT_DATE()

In [0]:
## current date function in action
df = df.withColumn('curr_date', current_date())


## display new col
df.display()

## DATE_ADD()

In [0]:
## date add in action
df = df.withColumn('week_after', date_add('curr_date', 7)) ## add 7 days


## display
df.display()

## DATE_SUB()
* First we will show how to subtract dates.
* Then we will demo an equivalent alternative: `date_add()` with a negative number of days.

In [0]:
## normal way to use DATE_SUB()
df = df.withColumn('week_before', date_sub('curr_date', 7)) #subtract 7 days

df.display()

In [0]:
## workaround
df = df.withColumn('week_before', date_add('curr_date', -7)) ## add -7 subtracts

df.display()

## DATEDIFF
* `datediff(end, start)` returns the number of days from `start` to `end`.

In [0]:
## interval diff between curr_date and week_after
df = df.withColumn('datediff', datediff('curr_date','week_after'))


df.display()

Summary
* `curr_date` is 7 days before `week_after`, so `datediff('curr_date','week_after')` returns -7. Swapping the arguments returns 7.

## DATE_FORMAT
* Formatting dates in PySpark.
* This is the PySpark counterpart of formatting dates with `strftime` from Python's `datetime` library. The result is a STRING column.

In [0]:
## date format function --> render the date (shown as yyyy-MM-dd) as a dd-MM-yyyy string
df = df.withColumn('week_before', date_format('week_before', 'dd-MM-yyyy'))

df.display()

# NULL Values in PySpark
* How to handle null values in PySpark.
* We can drop them or fill (impute) them, among other options.

## Dropping Null Values

First, we should count the null values in each column!

In [0]:
## first lets see where the null values are
## import Spark's sum under an alias to keep it distinct from Python's built-in sum()
from pyspark.sql.functions import col, sum as spark_sum

# Count nulls in each column
null_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the result
null_counts.display()


Summary
* Looks like the null value cols are:
  * `Item_Weight` --> 1463
  * `Outlet_Size` --> 2410

In [0]:
## 'all' using dropna --> drop a row only if ALL of its columns are null
df.dropna('all').display() 

In [0]:
## 'any' using dropna
df.dropna('any').display() #drop all records if there is a null in any col


Note: Using `subset` is usually the best choice, since it limits the null check to the columns you care about.

In [0]:
## subset with dropna
df.dropna(subset=['Outlet_Size']).display() ## drop rows ONLY where this column is null

## Filling null values
* There are 2 ways we can do this.
  1. Replace ALL null values in the dataset, regardless of column.
  2. Replace null values only in a subset of columns.

In [0]:
### 1. filling nulls --> replace ALL regardless
df.fillna('NotAvailable').display()


Summary
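* We can see that ALL nulls in the df were replaced with the value `NotAvailable`. A string fill value only applies to string columns, which works here because `Item_Weight` was cast to STRING earlier.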

In [0]:
## 2. subset fillna
df.fillna('NotAvailable', subset=['Outlet_Size']).display()

Summary
* We can see it ONLY replaced null values in the `Outlet_Size` column.

# SPLIT and Indexing
* These are more advanced functions for data wrangling in pyspark. 
1. SPLIT allows us to split a column into a list OR array
2. INDEXING allows us to index this array. 

Here we will take the column `Outlet_Type` and split it.

## SPLIT function

In [0]:
## split function --> split on a space ' '
df.withColumn('Outlet_Type', split('Outlet_Type',' ')).display()


Summary
* As we can see, each value in `Outlet_Type` was split into an array of words.

## INDEXING

In [0]:
## split function --> split on a space ' '
## NOW use indexing (zero-based) to return the element at index 1, i.e. the second word
df.withColumn('Outlet_Type', split('Outlet_Type',' ')[1]).display() 
