### Challenge 04: Transform data using Notebooks and Spark clusters 

In this scenario, the data engineer performs data cleaning, transformation, and aggregation tasks to prepare the data for downstream analysis.

**NOTE:** <mark>You can use Scala, PySpark, or Spark SQL, or any combination of these languages.</mark>

**Goal:** Notebook parametrization

**Actions:**
- Set parameters
    - p_year = 2015
    - p_month = 01
    - p_table_prefix = green
    - table_name = p_table_prefix+p_year+p_month
- Set the code cell as a parameter cell so that external processes such as Pipelines can use it

**Success Criteria:**
- Variable table_name exists and returns the value green201501
- Parameter cell is turned on
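
A minimal sketch of the parameter cell described above, assuming the parameters are kept as strings. In Python a bare `01` is not a valid integer literal, and a string also preserves the leading zero of the month. The f-string is one way to build `table_name`; it is an illustration, not the workshop's reference solution.

```python
# Parameter cell (sketch): values are strings so that the leading zero
# of the month survives concatenation.
p_year = "2015"
p_month = "01"
p_table_prefix = "green"

# Compose the table name from prefix, year, and month.
table_name = f"{p_table_prefix}{p_year}{p_month}"

print(table_name)
```

If an external process passes the month as an integer instead, formatting it with `f"{p_month:02d}"` would restore the zero padding.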

In [21]:
print(table_name)

green201501


**Goal:** Cleanse the data and filter out invalid records for further analysis.

**Actions:**
- Attach bronzerawdata lakehouse (default lakehouse) and silvercleansed lakehouse
- Load the data from the source (variable table_name) and then:
    - Filter out invalid records, that is, records where the trip distance or the fare amount is less than or equal to zero (valid records have trip_distance > 0 & fare_amount > 0)
    - Change data types (tables green202301 and green2015 are not consistent):
        - lpep_pickup_datetime and lpep_dropoff_datetime to TIMESTAMP 
        - RatecodeID, passenger_count, payment_type to LONG
        - congestion_surcharge to INTEGER

- Write output into lakehouse: **silvercleansed**, table: **{table_name}_cleansed** 

**Success Criteria:**
- table green201501_cleansed exists and contains 1479797 records
    - 28696 records were removed.
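
The cleansing rules above can be written down as plain data before touching Spark. The sketch below builds Spark SQL expression strings for the filter and the casts. The helper name `build_cast_exprs`, the shortened column list, and the choice of `BIGINT`/`INT` as the Spark SQL spellings of LONG/INTEGER are assumptions made for this illustration. Applying the strings with `df.filter(...)` and `df.selectExpr(...)` is one possible approach and appears only as a comment.

```python
# Target types from the task description, spelled as Spark SQL type names
# (BIGINT is Spark SQL's 64-bit LONG, INT its 32-bit INTEGER).
TARGET_TYPES = {
    "lpep_pickup_datetime": "TIMESTAMP",
    "lpep_dropoff_datetime": "TIMESTAMP",
    "RatecodeID": "BIGINT",
    "passenger_count": "BIGINT",
    "payment_type": "BIGINT",
    "congestion_surcharge": "INT",
}

# A record is valid only when both measures are strictly positive.
VALID_RECORD_FILTER = "trip_distance > 0 AND fare_amount > 0"


def build_cast_exprs(columns, target_types):
    """Return one SQL expression per column, casting those listed in target_types.

    Hypothetical helper for this sketch; other columns pass through unchanged.
    """
    exprs = []
    for name in columns:
        if name in target_types:
            exprs.append(f"CAST(`{name}` AS {target_types[name]}) AS `{name}`")
        else:
            exprs.append(f"`{name}`")
    return exprs


# Example with a shortened, assumed column list.
example_columns = ["VendorID", "lpep_pickup_datetime", "RatecodeID", "fare_amount"]
cast_exprs = build_cast_exprs(example_columns, TARGET_TYPES)
for expr in cast_exprs:
    print(expr)

# In a notebook with a Spark session, the strings could be applied like this:
#   df = df.filter(VALID_RECORD_FILTER).selectExpr(*build_cast_exprs(df.columns, TARGET_TYPES))
```

Keeping the type mapping in one dictionary makes it easy to reuse the same rules for both the 2015 and the 2023 tables, whose source schemas differ.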


In [22]:
from pyspark.sql.functions import col, when

# Load data from source
df = spark.read.load#<your code>
df_count = df.count()

# Remove invalid records
df = df.filter#<your code>


# change datatypes
df = df.withColumn("lpep_pickup_datetime",#<your code>
df = df.withColumn("lpep_dropoff_datetime",#<your code>
df = df.withColumn("RatecodeID",#<your code>
df = df.withColumn("passenger_count",#<your code>
df = df.withColumn("payment_type",#<your code>
df = df.withColumn("congestion_surcharge",#<your code>


df_count_after_cleaning = df.count()

number_of_deleted_records = df_count - df_count_after_cleaning

print(f"Removed {number_of_deleted_records} records")

# Write cleansed data to destination
df.write.format#<your code>
print(f"Written {df_count_after_cleaning} records")

# Display cleansed data
display(df)

Removed 28696 records
Written 1479797 records


**Goal:** Add additional derived columns and filters

**Actions:**
- Use the {table_name}_cleansed table or the DataFrame from the previous step
- Add derived columns
    - pickupDate - convert datetime to date for visualizations and reporting
    - weekDay - day number of the week
    - weekDayName - abbreviated day name
    - dayofMonth - day number of the month
    - pickupHour - hour of pickup time
    - trip_duration - duration of the trip in minutes
        - computed as lpep_dropoff_datetime minus lpep_pickup_datetime
    - timeBins - Binned time of the day:
        - pickupHour >=7 & pickupHour <=10 is MorningRush \
        - pickupHour >=11 & pickupHour <=15 is Afternoon \
        - pickupHour >=16 & pickupHour <=19 is EveningRush \
        - pickupHour <=6 | pickupHour >=20 is Night


- Add filter conditions:
    - fare_amount is between 0 and 100
    - trip_distance is between 0 and 100
    - trip_duration is less than 3 hours (180 minutes)
    - passenger_count is between 1 and 8

- Write output into lakehouse: **silvercleansed**, table: **{table_name}_enriched** 

**Success Criteria:**
- table green201501_enriched exists and contains 1477385 records
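
The binning, duration, and filter rules above can be checked on single values before they are expressed as Spark column expressions. The sketch below uses plain Python. The function names are hypothetical, and treating the "between" ranges as inclusive is an assumption.

```python
from datetime import datetime


def time_bin(pickup_hour):
    """Map an hour of day (0-23) to the timeBins label from the task."""
    if 7 <= pickup_hour <= 10:
        return "MorningRush"
    if 11 <= pickup_hour <= 15:
        return "Afternoon"
    if 16 <= pickup_hour <= 19:
        return "EveningRush"
    # Remaining hours: pickup_hour <= 6 or pickup_hour >= 20.
    return "Night"


def trip_duration_minutes(pickup, dropoff):
    """Duration of the trip in minutes (dropoff minus pickup)."""
    return (dropoff - pickup).total_seconds() / 60


def passes_filters(fare_amount, trip_distance, trip_duration, passenger_count):
    """Filter conditions from the task; range bounds are assumed to be inclusive."""
    return (
        0 <= fare_amount <= 100
        and 0 <= trip_distance <= 100
        and trip_duration < 180
        and 1 <= passenger_count <= 8
    )


# Hypothetical sample trip used only to exercise the functions.
pickup = datetime(2015, 1, 1, 8, 15)
dropoff = datetime(2015, 1, 1, 8, 45)
duration = trip_duration_minutes(pickup, dropoff)

print(time_bin(pickup.hour), duration)
print(passes_filters(12.5, 3.2, duration, 1))
```

The four bins cover every hour from 0 to 23, so the final `return "Night"` plays the role of the last `when` branch in the notebook cell.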
    


In [23]:
from pyspark.sql.functions import col, when, dayofweek, date_format, hour, unix_timestamp, round, dayofmonth, lit

# Add additional columns and filters
nytaxidf_prep = df.withColumn('pickupDate', #<your code>\
                            .withColumn("weekDay", #<your code>\
                            .withColumn("weekDayName", #<your code>\
                            .withColumn("dayofMonth", #<your code>\
                            .withColumn("pickupHour", #<your code>\
                            .withColumn("trip_duration", #<your code>\
                            .withColumn("timeBins", when((col("pickupHour") >=7) & (col("pickupHour")<=10) ,"MorningRush")\
                            .when((col("pickupHour") #<your code>\
                            .when((col("pickupHour") #<your code>\
                            .when((col("pickupHour") #<your code>\
                            .filter(#<your code>
                            

# Write enriched data to destination
nytaxidf_prep.write.format#<your code>

df_count_nytaxidf_prep = nytaxidf_prep.count()

print(f"Written {df_count_nytaxidf_prep} records")

# Display enriched data
display(nytaxidf_prep) 


Written 1476971 records


**Goal:** Find the discount for every taxi trip

**Actions:**
- Load data from greenDiscountsPerDay table
    - Unpivot columns to rows
- Join greenDiscountsPerDay with {table_name}_enriched on VendorID = VendorID and pickupDate = date
- Write output into lakehouse: **silvercleansed**, table: **{table_name}_discounts**
    - output columns: VendorID, lpep_pickup_datetime, PULocationID, DOLocationID, RateCodeID, passenger_count, trip_distance, trip_duration, pickupDate, weekDayName, weekDay, dayOfMonth, timeBins, payment_type, total_amount, tip_amount, fare_amount, discount    

**Success Criteria:**
- table green201501_discounts exists and contains 1477385 records
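
The unpivot and join steps above can be illustrated without Spark. The sketch below assumes that greenDiscountsPerDay is stored wide, with one row per vendor and one column per date; the actual layout is not shown in this notebook, and all sample values and the helper name `unpivot` are hypothetical. It uses a left join so that every trip is kept, which is a choice made for this sketch.

```python
# Assumed wide layout: one row per vendor, one column per date (hypothetical values).
wide_discounts = [
    {"VendorID": 1, "2015-01-01": 0.10, "2015-01-02": 0.05},
    {"VendorID": 2, "2015-01-01": 0.00, "2015-01-02": 0.20},
]


def unpivot(rows, id_column, var_name, value_name):
    """Turn every non-id column into its own row (the same idea as pandas.melt)."""
    long_rows = []
    for row in rows:
        for column, value in row.items():
            if column == id_column:
                continue
            long_rows.append({id_column: row[id_column], var_name: column, value_name: value})
    return long_rows


long_discounts = unpivot(wide_discounts, "VendorID", "date", "discount")

# Index discounts by the join key (VendorID, date) for a simple lookup join.
discount_by_key = {(r["VendorID"], r["date"]): r["discount"] for r in long_discounts}

# Hypothetical enriched trips; only the join columns and one measure are shown.
trips = [
    {"VendorID": 2, "pickupDate": "2015-01-02", "fare_amount": 9.5},
    {"VendorID": 1, "pickupDate": "2015-01-01", "fare_amount": 14.0},
]

# Left join: every trip is kept, with discount None when no match exists.
trips_with_discount = [
    {**trip, "discount": discount_by_key.get((trip["VendorID"], trip["pickupDate"]))}
    for trip in trips
]

for row in trips_with_discount:
    print(row)
```

Because the join key is unique per vendor and date in the long table, the join does not multiply rows, so the number of trips stays the same before and after.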
    


In [24]:
df_gdpd = spark.read.load#<your code>

In [25]:
display(df_gdpd)

In [26]:
import pandas as pd

# Melt the discounts DataFrame to long format
discounts_pd_df = pd.melt(#<your code>)

discounts_spark_df = spark.createDataFrame(#<your code>)

display(discounts_spark_df)

In [27]:
from pyspark.sql.functions import col

# Create aliases for your DataFrames
df1_alias = nytaxidf_prep.alias("df1")
df2_alias = discounts_spark_df.alias("df2")

# Define the join condition using the aliases
join_condition = #<your code>

# Perform the join using the aliases
result_df = df1_alias.join#<your code>

# Select only the desired columns
result_df = result_df.select(
    "df1.VendorID", "df1.lpep_pickup_datetime", "df1.PULocationID", "df1.DOLocationID",
    "df1.RateCodeID", "df1.passenger_count", "df1.trip_distance", "df1.trip_duration",
    "df1.pickupDate", "df1.weekDayName", "df1.weekDay", "df1.dayOfMonth", "df1.timeBins",
    "df1.payment_type", "df1.total_amount", "df1.tip_amount", "df1.fare_amount",
    "df2.discount",
)

# Save the results to a new delta table
result_df.write.format#<your code>

df_count_result_df = result_df.count()

print(f"Written {df_count_result_df} records")

# Display data
display(result_df)

Written 1476971 records


**Goal:** Notebook parametrization: 2023 taxi data

**Actions:**
- Go to the first cell and change the parameter p_year to 2023
- Run all cells

**Success Criteria:**
- Tables green202301_cleansed, green202301_discounts, green202301_enriched were created
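
As a quick check of the rerun described above, the expected table names can be derived from the changed parameter. This sketch assumes string parameters and the three suffixes used in this challenge; the variable `expected_tables` is hypothetical.

```python
# Same composition as in the parameter cell, with p_year changed to 2023.
p_year = "2023"
p_month = "01"
p_table_prefix = "green"
table_name = f"{p_table_prefix}{p_year}{p_month}"

# Suffixes written by the three transformation steps in this challenge.
expected_tables = [f"{table_name}_{suffix}" for suffix in ("cleansed", "enriched", "discounts")]
print(expected_tables)
```

The resulting list can be compared with the tables shown in the silvercleansed lakehouse after running all cells.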