# Uber Data Processing
#### This notebook fetches data from a table from the 'Glue' database, cleans it, creates fact and dimension tables and writes the data back to S3 bucket and glue database.


## Section 1: Spark setup and reading data

#### 1.1: Setting up spark session

In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::703507205356:role/glue-uber-data-etl
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 536a76ba-1733-47d2-a0c2-024fb065b734
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 536a76ba-1733-47d2-a0c2-024fb065b734 to get into ready status...
Session 536a76ba-1733-47d2-a0c2-024fb065b734 has been created.



#### 1.2: Reading the data from glue database

In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='uber_database_glue_full', table_name='uber_database_s3_full')
dyf.printSchema()

root
|-- VendorID: long
|-- tpep_pickup_datetime: timestamp
|-- tpep_dropoff_datetime: timestamp
|-- passenger_count: double
|-- trip_distance: double
|-- RatecodeID: double
|-- store_and_fwd_flag: string
|-- PULocationID: long
|-- DOLocationID: long
|-- payment_type: long
|-- fare_amount: double
|-- extra: double
|-- mta_tax: double
|-- tip_amount: double
|-- tolls_amount: double
|-- improvement_surcharge: double
|-- total_amount: double
|-- congestion_surcharge: double
|-- airport_fee: double


#### 1.3: Converting the DynamicFrame to a Spark DataFrame and display a sample of the data


In [26]:
trips = dyf.toDF()




## Section 2: Data cleaning and transformations

#### 2.1: Adding a column named trip_id

In [27]:
from pyspark.sql.functions import monotonically_increasing_id
trips = trips.withColumn("trip_id", monotonically_increasing_id())




#### 2.2: Only Keeping or filtering out data that has 2022 as it's year, outliers are left behind

In [28]:
from pyspark.sql.functions import year


# Create a new column named 'pickup_year'
trips = trips.withColumn('pickup_year', year('tpep_pickup_datetime'))

# Filter rows with year 2022
trips = trips.filter(trips['pickup_year'] == 2022)




#### 2.3: Dropping Duplicates

In [29]:
trips = trips.dropDuplicates()




#### 2.4: Removing unwanted columns

In [31]:
trips = trips.drop(*['airport_fee','store_and_fwd_flag', 'congestion_surcharge','extra',
                     'mta_tax','tolls_amount','improvement_surcharge'])




#### 2.5: Converting columns to date time

In [32]:
from pyspark.sql.functions import col, to_timestamp, dayofmonth

trips = trips.withColumn("tpep_pickup_datetime_converted", to_timestamp(col("tpep_pickup_datetime"))) \
             .withColumn("tpep_dropoff_datetime_converted", to_timestamp(col("tpep_dropoff_datetime")))

# Extract day of the month
trips = trips.withColumn("pickup_day", dayofmonth(col("tpep_pickup_datetime_converted"))) \
             .withColumn("dropoff_day", dayofmonth(col("tpep_dropoff_datetime_converted")))





#### 2.6: Creating a new column for day of the week

In [33]:
from pyspark.sql.functions import date_format


trips = trips.withColumn('pickup_day_of_week', date_format('tpep_pickup_datetime_converted', 'EEEE'))

# Add dropoff_day_of_week column
trips = trips.withColumn('dropoff_day_of_week', date_format('tpep_dropoff_datetime_converted', 'EEEE'))





#### 2.7: Creating a new column named payment_type_name

In [34]:
from pyspark.sql.functions import when, lit

payment_type_name = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}

trips = trips.withColumn('payment_type_name', lit("Unknown"))
for key, value in payment_type_name.items():
    trips = trips.withColumn('payment_type_name', when(col('payment_type') == key, value).otherwise(col('payment_type_name')))




#### 2.8:  Removing rows with fare amount < 0

In [35]:
trips = trips.filter(col('fare_amount') >= 0.0)




#### 2.9: Adding columns for Borough using taxi-zone-lookup.csv file

In [36]:
s3_path = "s3://uber-data-engg-bucket/uber_data/TLC_LookupFile/taxi-zone-lookup.csv"




In [37]:
df_taxi_zone_lookup = spark.read.csv(s3_path, header=True, inferSchema=True)




In [38]:
# If the columns LocationID and Zone are not dropped, it leads to duplicate records after joining.
# To overcome this problem, the join condition requires to drop columns LocationID and Zone to avoid multiple join columns.

# Creating PUBorough:
trips = trips.join(df_taxi_zone_lookup, trips.PULocationID == df_taxi_zone_lookup.LocationID, 'left')
trips = trips.withColumnRenamed('Borough', 'PUBorough')
trips = trips.drop('LocationID','Zone')

#Creating DUBorough:
trips = trips.join(df_taxi_zone_lookup, trips.DOLocationID == df_taxi_zone_lookup.LocationID, 'left')
trips = trips.withColumnRenamed('Borough', 'DOBorough')
trips = trips.drop('LocationID','Zone')




## Section 3: Data Modeling: Converting dataframe into fact and dimension tables

#### 3.1: datetime_dim table 

In [40]:
# PySpark equivalent of selecting fixed columns is using .select.
# Adding a new column is done using .withColumn.

from pyspark.sql.functions import hour, dayofmonth, month, year, dayofweek



# Select the relevant columns and create a new DataFrame 'datetime_dim'
datetime_dim = trips.select('tpep_pickup_datetime', 'tpep_dropoff_datetime')

# Extract date and time components
datetime_dim = datetime_dim \
    .withColumn('pick_hour', hour('tpep_pickup_datetime')) \
    .withColumn('pick_day', dayofmonth('tpep_pickup_datetime')) \
    .withColumn('pick_month', month('tpep_pickup_datetime')) \
    .withColumn('pick_year', year('tpep_pickup_datetime')) \
    .withColumn('pick_weekday', dayofweek('tpep_pickup_datetime')) \
    .withColumn('drop_hour', hour('tpep_dropoff_datetime')) \
    .withColumn('drop_day', dayofmonth('tpep_dropoff_datetime')) \
    .withColumn('drop_month', month('tpep_dropoff_datetime')) \
    .withColumn('drop_year', year('tpep_dropoff_datetime')) \
    .withColumn('drop_weekday', dayofweek('tpep_dropoff_datetime'))

# # Add datetime_id column
datetime_dim = datetime_dim.withColumn('datetime_id', monotonically_increasing_id())

# # # Reorder columns
datetime_dim = datetime_dim.select('datetime_id', 'tpep_pickup_datetime', 'pick_hour', 'pick_day', 'pick_month', 'pick_year',
                                   'pick_weekday', 'tpep_dropoff_datetime', 'drop_hour', 'drop_day', 'drop_month',
                                   'drop_year', 'drop_weekday')





#### 3.2: passenger_count_dim table

In [41]:
passenger_count_dim = trips.select('passenger_count') \
    .withColumn('passenger_count_id', monotonically_increasing_id())

# Reorder columns
passenger_count_dim = passenger_count_dim.select('passenger_count_id', 'passenger_count')





#### 3.3: trip_distance_dim table

In [42]:
# Select the 'trip_distance' column and add a new column 'trip_distance_id'
trip_distance_dim = trips.select('trip_distance') \
    .withColumn('trip_distance_id', monotonically_increasing_id())

# Reorder columns
trip_distance_dim = trip_distance_dim.select('trip_distance_id', 'trip_distance')




#### 3.4: rate_code_dim table

In [43]:
rate_code_type = {
    1:"Standard rate",
    2:"JFK",
    3:"Newark",
    4:"Nassau or Westchester",
    5:"Negotiated fare",
    6:"Group ride"
}

rate_code_dim = trips.select('RatecodeID').withColumn('rate_code_id', monotonically_increasing_id())



rate_code_dim = rate_code_dim.withColumn('rate_code_name', lit(""))
for key, value in rate_code_type.items():
    rate_code_dim = rate_code_dim.withColumn('rate_code_name', when(col('RatecodeID') == key, value)\
                                             .otherwise(col('rate_code_name')))




#### 3.5: payment_type_dim table

In [44]:
payment_type_name = {
    1:"Credit card",
    2:"Cash",
    3:"No charge",
    4:"Dispute",
    5:"Unknown",
    6:"Voided trip"
}

payment_type_dim = trips.select('payment_type')\
                .withColumn('payment_type_id', monotonically_increasing_id())



payment_type_dim = payment_type_dim.withColumn('payment_type_name', lit(""))
for key, value in payment_type_name.items():
    payment_type_dim = payment_type_dim.withColumn('payment_type_name', when(col('payment_type') == key, value)\
                                             .otherwise(col('payment_type_name')))




#### 3.6: pickup_location_dim table

In [45]:
pickup_location_dim = trips.select('PULocationID', 'PUBorough') \
    .withColumn('pickup_location_id', monotonically_increasing_id())

# Reorder columns
pickup_location_dim = pickup_location_dim.select('pickup_location_id', 'PULocationID', 'PUBorough')




#### 3.7: dropoff_location_dim table

In [46]:
dropoff_location_dim = trips.select('DOLocationID', 'DOBorough') \
    .withColumn('dropoff_location_id', monotonically_increasing_id())

# Reorder columns
dropoff_location_dim = dropoff_location_dim.select('dropoff_location_id', 'DOLocationID', 'DOBorough')




#### 3.8: fact_table

In [47]:
fact_table = trips \
    .join(passenger_count_dim, trips['trip_id'] == passenger_count_dim['passenger_count_id'], 'inner') \
    .join(trip_distance_dim, trips['trip_id'] == trip_distance_dim['trip_distance_id'], 'inner') \
    .join(rate_code_dim, trips['trip_id'] == rate_code_dim['rate_code_id'], 'inner') \
    .join(datetime_dim, trips['trip_id'] == datetime_dim['datetime_id'], 'inner') \
    .join(payment_type_dim, trips['trip_id'] == payment_type_dim['payment_type_id'], 'inner') \
    .join(pickup_location_dim, trips['trip_id'] == pickup_location_dim['pickup_location_id'], 'inner') \
    .join(dropoff_location_dim, trips['trip_id'] == dropoff_location_dim['dropoff_location_id'], 'inner') \
    .select('trip_id', 'VendorID', 'datetime_id', 'passenger_count_id', 'trip_distance_id', 
            'rate_code_id', 'payment_type_id', 'fare_amount', 'tip_amount', 'total_amount', 'pickup_location_id',
            'dropoff_location_id')





## Section 4: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog

#### 4.1: Converting PySpark DataFrame to a Glue DynamicFrame

In [48]:
# Convert dataframe back to dynamicframe:
from awsglue.dynamicframe import DynamicFrame
fact_table = DynamicFrame.fromDF(fact_table, glueContext, "fact_table")
passenger_count_dim = DynamicFrame.fromDF(passenger_count_dim, glueContext, "passenger_count_dim")
trip_distance_dim = DynamicFrame.fromDF(trip_distance_dim, glueContext, "trip_distance_dim")
rate_code_dim = DynamicFrame.fromDF(rate_code_dim, glueContext, "rate_code_dim")
datetime_dim = DynamicFrame.fromDF(datetime_dim, glueContext, "datetime_dim")
payment_type_dim = DynamicFrame.fromDF(payment_type_dim, glueContext, "payment_type_dim")
pickup_location_dim = DynamicFrame.fromDF(pickup_location_dim, glueContext, "pickup_location_dim")
dropoff_location_dim = DynamicFrame.fromDF(dropoff_location_dim, glueContext, "dropoff_location_dim")




#### 4.2: Creating a dictionary of dynamic frames

In [49]:
dict_of_tables = {'fact_table': fact_table, 
                  'passenger_count_dim': passenger_count_dim, 
                  'trip_distance_dim': trip_distance_dim,
                  'rate_code_dim': rate_code_dim, 
                  'datetime_dim': datetime_dim, 
                  'payment_type_dim': payment_type_dim,
                  'pickup_location_dim':pickup_location_dim,
                  'dropoff_location_dim':dropoff_location_dim}




#### 4.3: Defining a function to write the data into S3 bucket and to also dump it into a glue database

In [50]:
import os

for name, table in dict_of_tables.items():
    print(name, table)
    
    # Applying repartition eliminates the parallel processing power of Spark. Commenting the below code to improve performance
    # Repartition to a single partition
    # table = table.repartition(1)
    output_path = f"s3://uber-data-engg-bucket/uber_data/Output_ETL_full/athena_output_parquet_full/{name}"

    # Check if the path exists
    if not os.path.exists(output_path):
        # Create the directory if it doesn't exist
        os.makedirs(output_path)
        
    s3output = glueContext.getSink(
                                      path=output_path,
                                      connection_type="s3",
                                      updateBehavior="UPDATE_IN_DATABASE",
                                      partitionKeys=[],
                                      compression="snappy",
                                      enableUpdateCatalog=True,
                                      transformation_ctx="s3output",
                                    )
    
    s3output.setCatalogInfo(
      catalogDatabase="uber_database_glue_full", catalogTableName=f"etl_uber_{name}_full"
    )
    s3output.setFormat("glueparquet")
    s3output.writeFrame(table)

fact_table <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f49290>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f46590>
passenger_count_dim <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f49610>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f67e50>
trip_distance_dim <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f496d0>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56fb4a0350>
rate_code_dim <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f49350>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56fb4a0750>
datetime_dim <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f49710>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f67b90>
payment_type_dim <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f49590>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f67b10>
pickup_location_dim <awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f49550>
<awsglue.dynamicframe.DynamicFrame object at 0x7f56f9f674d0>
dropoff_location_dim <awsglue.dyn