# Transformations

## Data Reading

In [0]:
# Load the grocery CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
df_grocery = spark.read.load(
    "/Volumes/workspace/practice_data/grocery_data",
    format="csv",
    inferSchema=True,
    header=True,
)

In [0]:
# Show the loaded grocery DataFrame in the notebook output.
# NOTE(review): .display() is supplied by the notebook runtime (presumably
# Databricks), not by core PySpark — confirm before running elsewhere.
df_grocery.display()

In [0]:
# Load the drivers JSON data. No schema is supplied, so the JSON reader infers
# one from the data. `header` and `inferSchema` are CSV reader options that the
# JSON source does not use, so they are not set here.
# multiLine=False: every line of the input is parsed as one JSON record.
df_drivers = (
    spark.read.format("json")
    .option("multiLine", False)
    .load("/Volumes/workspace/practice_data/drivers")
)

In [0]:
# Show the loaded drivers DataFrame in the notebook output.
# NOTE(review): .display() is supplied by the notebook runtime (presumably
# Databricks), not by core PySpark — confirm before running elsewhere.
df_drivers.display()

### Schema definition

In [0]:
# Print the column names and types of both DataFrames, grocery first.
for _frame in (df_grocery, df_drivers):
    _frame.printSchema()

#### DDL Schema

In [0]:
# Schema for the grocery CSV written as a DDL string: comma-separated
# "column_name TYPE" pairs. It is handed to DataFrameReader.schema() when the
# file is re-read, replacing schema inference.
my_ddl_schema = '''
    Item_Identifier STRING,
    Item_Weight DOUBLE,
    Item_Fat_Content STRING,
    Item_Visibility DOUBLE,
    Item_Type STRING,
    Item_MRP DOUBLE,
    Outlet_Identifier STRING,
    Outlet_Establishment_Year INTEGER,
    Outlet_Size STRING,
    Outlet_Location_Type STRING,
    Outlet_Type STRING,
    Item_Outlet_Sales DOUBLE
'''

In [0]:
# Re-read the grocery CSV with the DDL schema applied instead of inference,
# then show the rows and the resulting column types.
df_grocery = (
    spark.read.format("csv")
    .schema(my_ddl_schema)
    .option("header", True)
    .load("/Volumes/workspace/practice_data/grocery_data")
)
df_grocery.display()
df_grocery.printSchema()

#### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# The grocery schema expressed programmatically. Each (name, type) pair below
# becomes StructField(column_name, data_type, nullable); every column is
# declared nullable.
my_strct_schema = StructType([
    StructField(column_name, data_type, True)
    for column_name, data_type in (
        ('Item_Identifier', StringType()),
        ('Item_Weight', DoubleType()),
        ('Item_Fat_Content', StringType()),
        ('Item_Visibility', DoubleType()),
        ('Item_Type', StringType()),
        ('Item_MRP', DoubleType()),
        ('Outlet_Identifier', StringType()),
        ('Outlet_Establishment_Year', IntegerType()),
        ('Outlet_Size', StringType()),
        ('Outlet_Location_Type', StringType()),
        ('Outlet_Type', StringType()),
        ('Item_Outlet_Sales', DoubleType()),
    )
])

In [0]:
# Re-read the grocery CSV using the StructType schema, then show the rows and
# the resulting column types.
df_grocery = spark.read.load(
    "/Volumes/workspace/practice_data/grocery_data",
    format="csv",
    schema=my_strct_schema,
    header=True,
)
df_grocery.display()
df_grocery.printSchema()

## Select

In [0]:
# Project three columns, referenced by their names.
(
    df_grocery
    .select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')
    .display()
)

In [0]:
# Equivalent selection using col() column objects instead of column-name strings:
#df_self = df_grocery.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content'))
#df_self.display()

## ALIAS

In [0]:
# Select three columns, exposing Item_Identifier under the name Item_ID.
# Only the aliased column needs a Column object; the others are given by name.
(
    df_grocery
    .select(
        col('Item_Identifier').alias('Item_ID'),
        'Item_Weight',
        'Item_Fat_Content',
    )
    .display()
)

## Filter/Where


1) Filter data with fat content = Regular
2) Slice the data with item type = Soft Drinks and Weight < 10
3) Fetch the data where the outlet location is Tier 1 or Tier 2 and Outlet Size is null

In [0]:
# Scenario 1
# Keep only the rows whose fat content is exactly 'Regular'.
(
    df_grocery
    .where(col('Item_Fat_Content') == 'Regular')
    .display()
)

In [0]:
# Scenario 2
# Soft Drinks weighing less than 10. The column is referenced with its exact
# schema spelling (Item_Weight) so the filter does not depend on Spark's
# case-insensitive column resolution. Each comparison is parenthesised because
# `&` binds tighter than `==` and `<` in Python.
df_grocery.filter(
    (col('Item_Type') == 'Soft Drinks') & (col('Item_Weight') < 10)
).display()

In [0]:
# Scenario 3
# Rows with no Outlet_Size recorded whose location type is 'Tier 1' or 'Tier 2'.
(
    df_grocery
    .where(
        (col('Outlet_Size').isNull())
        & (col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
    )
    .display()
)

### withColumnRenamed

In [0]:
# Show the data with Item_Weight renamed to Item_Wt. The result is not
# assigned, so df_grocery itself keeps the original column name.
(
    df_grocery
    .withColumnRenamed('Item_Weight', 'Item_Wt')
    .display()
)

### withColumn


1) Create new column and save a constant
2) Modify values in existing columns

In [0]:
# Scenario 1
# Add a `flag` column holding the literal string 'new' on every row. The result
# is assigned back to df_grocery, so every later cell sees the extra column.
df_grocery = df_grocery.withColumn(
    'flag',
    lit('new'),
)
df_grocery.display()

In [0]:
# Collect the rows whose Item_Fat_Content is missing, then show them.
null_values = df_grocery.where(col('Item_Fat_Content').isNull())
null_values.display()

In [0]:
# Scenario 2
# Overwrite Item_Fat_Content, replacing every match of the regex 'LF' with
# 'Low Fat'. The result is assigned back to df_grocery.
df_grocery = df_grocery.withColumn(
    'Item_Fat_Content',
    regexp_replace('Item_Fat_Content', 'LF', 'Low Fat'),
)
df_grocery.display()

## Typecasting

In [0]:
# Convert Item_Weight to a string column in place.
# Note: df_grocery is reassigned, so every later cell sees Item_Weight as a
# string; numeric comparisons or sorts on it need a cast back to a number.
df_grocery = df_grocery.withColumn(
    'Item_Weight',
    col('Item_Weight').cast('string'),
)
df_grocery.display()

## Sort/orderBy


1) Sort by Item_Weight in descending order
2) Sort by Item_Visibility in ascending order
3) Sort by multiple columns

In [0]:
# Scenario 1
# Item_Weight was cast to a string in the Typecasting cell, and strings sort
# lexicographically ("9.8" would rank above "21.35"). The sort key is cast back
# to double so the ordering is numeric; the displayed column itself is unchanged.
df_grocery.sort(col('Item_Weight').cast('double').desc()).display()

In [0]:
# Scenario 2
# Order the rows by Item_Visibility, smallest first.
(
    df_grocery
    .orderBy(col('Item_Visibility').asc())
    .display()
)

In [0]:
# Scenario 3
# Sort by Item_Visibility ascending, breaking ties by Item_Type descending.
# The same ordering can be written with name strings and a direction list:
#   df_grocery.sort(['Item_Visibility', 'Item_Type'], ascending=[True, False])
(
    df_grocery
    .sort(col('Item_Visibility').asc(), col('Item_Type').desc())
    .display()
)

## Limit

In [0]:
# Show at most 10 rows of the grocery data.
(
    df_grocery
    .limit(10)
    .display()
)

## Drop


1) Drop one column
2) Drop multiple columns

In [0]:
# Scenario 1
# Show the data without the Item_Weight column; df_grocery is not reassigned.
(
    df_grocery
    .drop('Item_Weight')
    .display()
)

In [0]:
# Scenario 2
# Show the data with three columns removed in a single drop() call.
(
    df_grocery
    .drop(
        'Item_Identifier',
        'Outlet_Type',
        'Outlet_Identifier',
    )
    .display()
)

## Drop duplicates


1) Drop the whole row -- all the duplicates are dropped
2) Drop duplicates based on a subset of columns. Only one row is kept for each distinct combination of values in those columns

In [0]:
# Scenario 1
# Remove rows that are duplicates across every column.
(
    df_grocery
    .drop_duplicates()
    .display()
)

In [0]:
# Scenario 2
# De-duplicate on the (Item_Type, Outlet_Type) pair only; the other columns
# are not considered when deciding whether two rows are duplicates.
(
    df_grocery
    .drop_duplicates(['Item_Type', 'Outlet_Type'])
    .display()
)

### Distinct

In [0]:
# Show only the distinct rows of the grocery data.
(
    df_grocery
    .distinct()
    .display()
)