# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [2]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

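# Resolve only the job arguments that were actually passed, because
# getResolvedOptions fails when a requested argument is missing from sys.argv.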
possible = ["JOB_NAME", "database_name", "table_name", "output_path", "WRITE_MODE", "push_down_predicate"]
present = [p for p in possible if f"--{p}" in sys.argv]
args = getResolvedOptions(sys.argv, present) if present else {}

JOB_NAME  = args.get("JOB_NAME", "ETL Part2")
DB_NAME   = args.get("database_name", "nyc_taxi_database")
TBL_NAME  = args.get("table_name", "data")
OUT_S3    = (args.get("output_path", "s3://homework16-data-bucket/output/").rstrip("/") + "/")
WRITE_MODE = args.get("WRITE_MODE", "append").lower()   # append | overwrite
PREDICATE = args.get("push_down_predicate", "")

# initialize Glue and Spark
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(JOB_NAME, args)
spark.conf.set("spark.sql.shuffle.partitions", "80")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")


You are already connected to a glueetl session 40ef9c09-5311-409c-aa73-cdf8a11f66e3.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


Setting Glue version to: 5.0


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 5
Setting new number of workers to: 5
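
The argument handling in the setup cell can be illustrated without a Glue runtime. The sketch below is a pure-Python stand-in, not the `awsglue` implementation: `resolve_optional` is a hypothetical helper that assumes arguments arrive as `--name value` pairs and falls back to a default when a name is absent, which is the effect the `present` filter and the `args.get(..., default)` calls aim for.

```python
def resolve_optional(argv, defaults):
    """Return a copy of defaults, overridden by any `--name value` pairs in argv.

    Hypothetical helper for illustration only; it understands just the
    space-separated `--name value` form.
    """
    resolved = dict(defaults)
    for name in defaults:
        flag = f"--{name}"
        if flag in argv:
            position = argv.index(flag)
            # Ignore a trailing flag that has no value after it.
            if position + 1 < len(argv):
                resolved[name] = argv[position + 1]
    return resolved


# Example values are invented for the demonstration.
defaults = {"table_name": "data", "WRITE_MODE": "append"}
argv = ["script.py", "--table_name", "trips"]
settings = resolve_optional(argv, defaults)
print(settings)
```

Only `table_name` is overridden here; `WRITE_MODE` keeps its default because the flag was not passed.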



In [2]:
# Demo cell for testing the data transformation code; it is disabled by wrapping it in a string literal
# specify where the data is stored in S3
"""
S3_INPUT = "s3://homework16-data-bucket/"   
DRY_RUN  = False                                 # do not need to generate data

# set up RDS connection
RDS_JDBC_URL = "jdbc:mysql://nyc-taxi-mysql.ck3aqgcwi4y2.us-east-1.rds.amazonaws.com:3306/nyc_taxi"  
RDS_TABLE    = "hw16_ETL_Table"
"""




In [3]:
# show the data
"""
df = spark.read.parquet(S3_INPUT)
df.show()
"""

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [7]:
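# read data from the Glue Data Catalog
dyf = glueContext.create_dynamic_frame.from_catalog(
    database = DB_NAME,
    table_name = TBL_NAME,
    transformation_ctx = "src_dyf",
    # optional partition filter from the job arguments (an empty string means no filter)
    push_down_predicate = PREDICATE,
    # group small input files within each partition into reads of about 128 MB
    additional_options = {
        "groupFiles": "inPartition",
        "groupSize": "134217728",
    },
)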
df = dyf.toDF()

from pyspark.sql import functions as f
# Clean the data: drop rows with a missing or negative trip_distance, parse the
# pickup and dropoff timestamps, and derive pickup_date (yyyy-MM-dd) for partitioning.
df = (df
      .filter((f.col("trip_distance").isNotNull()) & (f.col("trip_distance") >= 0))
      .withColumn("pickup_ts",  f.to_timestamp(f.col("tpep_pickup_datetime")))
      .withColumn("dropoff_ts", f.to_timestamp(f.col("tpep_dropoff_datetime")))
      .withColumn("pickup_date", f.date_format(f.col("pickup_ts"), "yyyy-MM-dd"))
)
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------------+-------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|          pickup_ts|         dropoff_ts|pickup_date|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------------+-------

In [39]:
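from awsglue.dynamicframe import DynamicFrame

def write_out_by_date(df_out, subdir):
    """Write df_out as CSV with a header under OUT_S3 + subdir, partitioned by pickup_date."""
    # keep rows with the same pickup_date in the same Spark partition before writing
    df_out = df_out.repartition(50, "pickup_date")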
    dyf_out = DynamicFrame.fromDF(df_out, glueContext, f"dyf_{subdir.strip('/').replace('/','_')}")
    glueContext.write_dynamic_frame.from_options(
        frame = dyf_out,
        connection_type = "s3",
        format = "csv",
        connection_options = {
            "path": OUT_S3 + subdir, 
            "partitionKeys": ["pickup_date"], 
        },
        format_options = {"withHeader": True},
        transformation_ctx = f"sink_{subdir.strip('/').replace('/','_')}",
    )
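
The naming logic inside `write_out_by_date` can be checked on its own. The sketch below reproduces the `strip`/`replace` expression used for the transformation context and builds an example output prefix. It assumes the partitioned sink lays files out in Hive-style `pickup_date=<value>/` folders; `sink_names` and the bucket name are hypothetical and exist only for this illustration.

```python
def sink_names(out_root, subdir, pickup_date):
    """Return the sink context name and an example partition prefix for one output."""
    # Same normalisation as in write_out_by_date: "a/b/" becomes "a_b".
    ctx = subdir.strip("/").replace("/", "_")
    root = out_root.rstrip("/") + "/"
    # Assumed Hive-style partition folder for the pickup_date partition key.
    prefix = f"{root}{subdir}pickup_date={pickup_date}/"
    return f"sink_{ctx}", prefix


name, prefix = sink_names("s3://example-bucket/output", "weekend_records/", "2025-01-04")
print(name)
print(prefix)
```

Deriving the context name from the subdirectory keeps each sink's name distinct as long as the subdirectories differ.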




In [40]:
# Time-Based Filtering: keep only trips whose pickup fell on a weekend.
# dayofweek returns the day of the week as a number from 1 (Sunday) to 7 (Saturday), so weekends are 1 and 7
df_w = df.filter(f.dayofweek(f.col("tpep_pickup_datetime")).isin(1, 7))\
         .select("VendorID", "pickup_date", "pickup_ts", "trip_distance", "fare_amount", "total_amount")

write_out_by_date(df_w, "weekend_records/")
df_w.show()

+--------+-----------+-------------------+-------------+-----------+------------+
|VendorID|pickup_date|          pickup_ts|trip_distance|fare_amount|total_amount|
+--------+-----------+-------------------+-------------+-----------+------------+
|       2| 2025-01-04|2025-01-04 00:01:36|         6.31|       28.2|       39.84|
|       1| 2025-01-04|2025-01-04 00:01:26|          6.8|       26.8|       38.55|
|       2| 2025-01-04|2025-01-04 00:02:21|          7.0|       28.9|       53.76|
|       1| 2025-01-04|2025-01-04 00:10:29|          7.1|       31.7|        42.7|
|       2| 2025-01-04|2025-01-04 00:00:15|         3.75|       22.6|       35.88|
|       2| 2025-01-04|2025-01-04 00:56:47|         1.61|       10.7|       18.84|
|       1| 2025-01-04|2025-01-04 00:39:08|          0.7|        7.9|       15.45|
|       2| 2025-01-04|2025-01-04 00:02:20|         1.37|        8.6|       16.32|
|       2| 2025-01-04|2025-01-04 00:00:59|         6.54|       28.2|       43.16|
|       1| 2025-
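
The weekend filter depends on Spark's `dayofweek` numbering, which differs from Python's own conventions. As a sanity check outside Spark, the sketch below converts Python's ISO weekday (Monday = 1 through Sunday = 7) into the Sunday = 1 through Saturday = 7 scheme; `spark_dayofweek` and `is_weekend` are illustrative helper names, not Spark functions.

```python
from datetime import date


def spark_dayofweek(d):
    """Return the weekday in the 1 = Sunday ... 7 = Saturday convention."""
    # isoweekday() gives Monday = 1 ... Sunday = 7; rotate so Sunday becomes 1.
    return d.isoweekday() % 7 + 1


def is_weekend(d):
    """True when the date is a Saturday (7) or a Sunday (1)."""
    return spark_dayofweek(d) in (1, 7)


for day in (date(2025, 1, 1), date(2025, 1, 4), date(2025, 1, 5)):
    print(day, spark_dayofweek(day), is_weekend(day))
```

2025-01-04, the date in the rows shown above, is a Saturday and therefore passes the filter.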

In [41]:
# Bucketing: Create buckets based on trip distance and count the number of trips in each bucket.
# label each trip with a half-open distance range: [0, 1), [1, 5), [5, 10), [10, 15), and 15 or more
df_distance = df.withColumn("distance_bucket",
                          f.when(f.col("trip_distance") < 1, "0-1")
                          .when(f.col("trip_distance") < 5, "1-5")
                          .when(f.col("trip_distance") < 10, "5-10")
                          .when(f.col("trip_distance") < 15, "10-15")
                          .otherwise("15+"))

# map each bucket label to a numeric key so the buckets sort by distance rather than alphabetically
df_bucket = df_distance.withColumn("bucket_order",
                                  f.when(f.col("distance_bucket") == "0-1", 0)
                                  .when(f.col("distance_bucket") == "1-5", 1)
                                  .when(f.col("distance_bucket") == "5-10", 2)
                                  .when(f.col("distance_bucket") == "10-15", 3)
                                  .otherwise(4))

# count the trips in each bucket for every pickup date
df_cb = df_bucket.groupBy("bucket_order", "pickup_date").agg(f.count(f.col("bucket_order")).alias("number_of_trips")).orderBy("bucket_order")

write_out_by_date(df_cb, "buckets/")
df_cb.show()

+------------+-----------+---------------+
|bucket_order|pickup_date|number_of_trips|
+------------+-----------+---------------+
|           0| 2025-01-08|          32055|
|           0| 2025-01-04|          24798|
|           0| 2025-01-02|          21057|
|           0| 2025-01-07|          27950|
|           0| 2025-01-12|          21167|
|           0| 2025-01-01|          17523|
|           0| 2025-01-10|          29370|
|           0| 2025-01-09|          33166|
|           0| 2025-01-05|          18780|
|           0| 2025-01-03|          23212|
|           0| 2025-01-11|          28796|
|           0| 2025-01-06|          21115|
|           0| 2024-12-31|              3|
|           0| 2025-01-23|          38368|
|           0| 2025-01-21|          32616|
|           0| 2025-01-19|          25363|
|           0| 2025-01-20|          19983|
|           0| 2025-01-14|          32918|
|           0| 2025-01-17|          32151|
|           0| 2025-01-16|          37522|
+----------
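
The boundary behaviour of the `when` chain is easy to get wrong, so here is a plain-Python mirror of it. The sketch assumes the same thresholds as the cell above; `distance_bucket` and `BUCKET_ORDER` are illustrative names, and the sample distances are invented.

```python
from collections import Counter

# Numeric sort key per label, matching bucket_order in the cell above.
BUCKET_ORDER = {"0-1": 0, "1-5": 1, "5-10": 2, "10-15": 3, "15+": 4}


def distance_bucket(distance):
    """Return the label of the half-open range that contains distance."""
    if distance < 1:
        return "0-1"
    if distance < 5:
        return "1-5"
    if distance < 10:
        return "5-10"
    if distance < 15:
        return "10-15"
    return "15+"


sample = [0.7, 1.0, 1.6, 6.31, 7.1, 15.0]
counts = Counter(BUCKET_ORDER[distance_bucket(d)] for d in sample)
print(sorted(counts.items()))
```

A distance that sits exactly on a threshold, such as 1.0 or 15.0, falls into the upper bucket because each comparison is strict.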

In [42]:
# Time-Series Analysis: Aggregate data to calculate total fare and total trip count per hour.
# truncate each pickup time to the start of its hour, then sum the fares and count the trips per hour
df_hourly = df.withColumn("trip_hour", f.date_trunc("hour", f.col("tpep_pickup_datetime"))) \
              .groupBy("trip_hour", "pickup_date").agg(f.sum("fare_amount").alias("total_fare_hourly"), f.count("*").alias("total_trip_hourly")).orderBy("trip_hour")

write_out_by_date(df_hourly, "fare_trip_hourly/")
df_hourly.show()

+-------------------+-----------+------------------+-----------------+
|          trip_hour|pickup_date| total_fare_hourly|total_trip_hourly|
+-------------------+-----------+------------------+-----------------+
|2024-12-31 20:00:00| 2024-12-31|              60.8|                3|
|2024-12-31 21:00:00| 2024-12-31|              39.8|                3|
|2024-12-31 23:00:00| 2024-12-31|300.99999999999994|               15|
|2025-01-01 00:00:00| 2025-01-01|121168.00999999975|             7344|
|2025-01-01 01:00:00| 2025-01-01|141337.82999999978|             8468|
|2025-01-01 02:00:00| 2025-01-01|109413.60999999993|             7257|
|2025-01-01 03:00:00| 2025-01-01| 58335.50999999995|             4915|
|2025-01-01 04:00:00| 2025-01-01| 41073.96000000002|             2918|
|2025-01-01 05:00:00| 2025-01-01|23179.539999999986|             1429|
|2025-01-01 06:00:00| 2025-01-01| 24637.22999999999|             1220|
|2025-01-01 07:00:00| 2025-01-01|22889.960000000017|             1116|
|2025-
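
Totals such as `300.99999999999994` in the output above are artifacts of summing binary floating-point values. The sketch below shows the same hourly aggregation in plain Python and rounds each total to cents at the end; this is one possible presentation choice, not something the notebook does. The helper name `hourly_totals` and the sample trips are invented for the illustration.

```python
from collections import defaultdict
from datetime import datetime


def hourly_totals(trips):
    """Aggregate (pickup datetime, fare) pairs into {hour_start: (total_fare, trip_count)}."""
    fares = defaultdict(float)
    counts = defaultdict(int)
    for pickup, fare in trips:
        # Equivalent in spirit to truncating the timestamp to the hour.
        hour = pickup.replace(minute=0, second=0, microsecond=0)
        fares[hour] += fare
        counts[hour] += 1
    # Round only the final sum so rounding error does not accumulate per row.
    return {hour: (round(fares[hour], 2), counts[hour]) for hour in sorted(fares)}


trips = [
    (datetime(2025, 1, 1, 0, 18, 38), 10.0),
    (datetime(2025, 1, 1, 0, 59, 59), 7.2),
    (datetime(2025, 1, 1, 1, 0, 0), 5.5),
]
totals = hourly_totals(trips)
for hour, (total_fare, trip_count) in totals.items():
    print(hour, total_fare, trip_count)
```

In the Spark version, wrapping the sum in `f.round(..., 2)` would give a comparable result.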

In [43]:
# Geospatial Analysis: Calculate the Haversine distance between the pickup and dropoff zone centroids of each trip.
# load the taxi zone centroids, which were precomputed with the Python geopandas package
zone_path = "s3://hw16-zones-bucket/taxi_zones_centroids.parquet"
zones = spark.read.parquet(zone_path)

# build pickup and dropoff lookup tables that map a LocationID to its centroid longitude and latitude
pickup = zones.select(f.col("LocationID").alias("PULocationID"), f.col("longitude").alias("pickup_longitude"), f.col("latitude").alias("pickup_latitude"))
dropoff = zones.select(f.col("LocationID").alias("DOLocationID"), f.col("longitude").alias("dropoff_longitude"), f.col("latitude").alias("dropoff_latitude"))

# join both lookup tables to the trips to attach the pickup and dropoff longitude and latitude
join_expression_pick = pickup["PULocationID"] == df["PULocationID"]
join_expression_drop = dropoff["DOLocationID"] == df["DOLocationID"]
df_trips = df.join(pickup, join_expression_pick, how="left")
df_trips = df_trips.join(dropoff, join_expression_drop, how="left")
df_trips = df_trips.select("pickup_date", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude")
df_trips.show()

# calculate the Haversine distance in kilometers
R = f.lit(6371)  # mean Earth radius in km
do_lat = f.radians(f.col("dropoff_latitude"))
pu_lat = f.radians(f.col("pickup_latitude"))
diff_lat = do_lat - pu_lat
diff_lon = f.radians(f.col("dropoff_longitude")) - f.radians(f.col("pickup_longitude"))
term1 = f.sqrt((1/2) * (1 - f.cos(diff_lat) + f.cos(do_lat) * f.cos(pu_lat) * (1 - f.cos(diff_lon))))
distance = 2 * f.asin(term1) * R

# add to the column
df_trips = df_trips.withColumn("Haversine_Distance", distance)
write_out_by_date(df_trips, "distance/")
df_trips.show()

+-----------+------------------+------------------+------------------+------------------+
|pickup_date|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|
+-----------+------------------+------------------+------------------+------------------+
| 2025-01-01|-73.96514581878095| 40.75672890813223|-73.96563472764679| 40.76861505664575|
| 2025-01-01|-73.95701192571859| 40.78043629001211|-73.96563472764679| 40.76861505664575|
| 2025-01-01|-73.95963501136171| 40.76694805479654|-73.95963501136171| 40.76694805479654|
| 2025-01-01|-73.94139929841424| 40.84170843286747|-73.94139929841424| 40.84170843286747|
| 2025-01-01|-73.94139929841424| 40.84170843286747|-73.94852183123933| 40.82701260635661|
| 2025-01-01|-73.97863191501314| 40.78396140232239|-73.99991779023728|  40.7484273300986|
| 2025-01-01|-73.97849158432054| 40.74774573203895|-73.97849158432054| 40.74774573203895|
| 2025-01-01| -73.9904579133977| 40.74033737258096|-73.99089636602756|40.718938301131104|
| 2025-01-
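
The distance expression above uses the cosine form of the haversine, hav(x) = (1 - cos x) / 2. As a cross-check outside Spark, the sketch below computes the same quantity in plain Python in both the cosine form and the more familiar sine-squared form, using the first coordinate pair from the output above. The function names are illustrative, and the clamping to [0, 1] is an added safeguard against rounding, not part of the notebook.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, same value as R in the cell above


def haversine_cosine_form(lat1, lon1, lat2, lon2):
    """Great-circle distance in km using hav(x) = (1 - cos x) / 2."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dlat = p2 - p1
    dlon = math.radians(lon2) - math.radians(lon1)
    hav = 0.5 * (1 - math.cos(dlat) + math.cos(p1) * math.cos(p2) * (1 - math.cos(dlon)))
    hav = min(1.0, max(0.0, hav))  # guard asin/sqrt against rounding error
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(hav))


def haversine_sine_form(lat1, lon1, lat2, lon2):
    """Great-circle distance in km using hav(x) = sin^2(x / 2)."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dlat = p2 - p1
    dlon = math.radians(lon2) - math.radians(lon1)
    hav = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2
    hav = min(1.0, max(0.0, hav))
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(hav))


# First pickup/dropoff centroid pair from the output above (latitude, longitude).
pickup = (40.75672890813223, -73.96514581878095)
dropoff = (40.76861505664575, -73.96563472764679)
d_cos = haversine_cosine_form(*pickup, *dropoff)
d_sin = haversine_sine_form(*pickup, *dropoff)
print(d_cos, d_sin)
```

Both forms agree to well below a metre for this pair. The sine-squared form is usually preferred for very short distances because `1 - cos(x)` loses precision when `x` is tiny.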

In [45]:
# Calculate trip duration (minutes) and average speed (miles per minute)
df_duration = df.withColumn("trip_duration_min", (f.unix_timestamp(f.col("tpep_dropoff_datetime")) / 60 - f.unix_timestamp(f.col("tpep_pickup_datetime")) / 60))
df_time_speed = df_duration.withColumn("speed(mile_min)", f.col("trip_distance") / f.col("trip_duration_min"))\
                           .select("pickup_date", "VendorID", "trip_duration_min", "speed(mile_min)")
write_out_by_date(df_time_speed, "duration_speed/")
df_time_speed.show()

+-----------+--------+------------------+-------------------+
|pickup_date|VendorID| trip_duration_min|    speed(mile_min)|
+-----------+--------+------------------+-------------------+
| 2025-01-01|       1| 8.350000001490116| 0.1916167664328705|
| 2025-01-01|       1|2.5499999970197678| 0.1960784316017095|
| 2025-01-01|       1| 1.949999999254942|0.30769230780987106|
| 2025-01-01|       2| 5.566666666418314|0.09341317365686218|
| 2025-01-01|       2| 3.533333335071802|0.18679245273828315|
| 2025-01-01|       2|20.033333335071802|0.13128119799193536|
| 2025-01-01|       1|1.4666666649281979|0.27272727305054173|
| 2025-01-01|       1|12.400000002235174| 0.1290322580412573|
| 2025-01-01|       1| 19.66666666790843| 0.1423728813469427|
| 2025-01-01|       2| 9.566666666418314| 0.1787456446039434|
| 2025-01-01|       2| 7.600000001490116|0.30131578941460585|
| 2025-01-01|       2|  3.41666666790843|0.16390243896482107|
| 2025-01-01|       2|12.966666664928198|0.15347043703857097|
| 2025-0
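
The duration and speed arithmetic can be verified by hand on a single trip. The sketch below uses the first row shown earlier in the notebook (pickup 00:18:38, dropoff 00:26:59, 1.6 miles) and adds an explicit guard for non-positive durations, which the Spark cell does not have. `duration_and_speed` is an illustrative helper name.

```python
from datetime import datetime


def duration_and_speed(pickup, dropoff, miles):
    """Return (duration in minutes, speed in miles per minute or None)."""
    minutes = (dropoff - pickup).total_seconds() / 60
    if minutes <= 0:
        # Speed is undefined when the trip has no positive duration.
        return minutes, None
    return minutes, miles / minutes


pickup = datetime(2025, 1, 1, 0, 18, 38)
dropoff = datetime(2025, 1, 1, 0, 26, 59)
minutes, speed = duration_and_speed(pickup, dropoff, 1.6)
print(minutes, speed)
```

The result matches the first output row above, about 8.35 minutes and 0.1916 miles per minute, up to floating-point noise. Multiplying the speed by 60 converts it to miles per hour.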

In [54]:
# Identify peak hours, restricted to trips that were charged a CBD congestion fee
# count the pickups in each hour of each date and sort by count to find the rush hours
df_congestion = df.filter(f.col("cbd_congestion_fee") > 0)
df_congestion = df_congestion.withColumn("pickup_hour", f.hour("tpep_pickup_datetime"))
df_congestion = df_congestion.groupBy("pickup_hour", "pickup_date").count().orderBy(f.desc("count"))
write_out_by_date(df_congestion, "peak_hour/")
df_congestion.show()

+-----------+-----------+-----+
|pickup_hour|pickup_date|count|
+-----------+-----------+-----+
|         18| 2025-01-16|10071|
|         18| 2025-01-23| 9569|
|         18| 2025-01-30| 8962|
|         18| 2025-01-31| 8907|
|         17| 2025-01-16| 8670|
|         23| 2025-01-25| 8500|
|         18| 2025-01-22| 8362|
|         17| 2025-01-23| 8311|
|         21| 2025-01-23| 8252|
|         18| 2025-01-24| 8218|
|         19| 2025-01-16| 8091|
|         17| 2025-01-31| 8003|
|         21| 2025-01-16| 7930|
|         19| 2025-01-23| 7878|
|         19| 2025-01-31| 7825|
|         18| 2025-01-17| 7764|
|         22| 2025-01-23| 7732|
|         18| 2025-01-15| 7728|
|         22| 2025-01-25| 7715|
|         21| 2025-01-22| 7670|
+-----------+-----------+-----+
only showing top 20 rows
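
The peak-hour ranking is a group-and-count followed by a descending sort. A minimal plain-Python sketch of the same idea uses `collections.Counter`; the helper name `rank_pickup_hours` and the sample timestamps are invented for the illustration.

```python
from collections import Counter
from datetime import datetime


def rank_pickup_hours(pickups):
    """Count pickups per (hour, date) and return them from busiest to quietest."""
    counts = Counter((ts.hour, ts.date().isoformat()) for ts in pickups)
    return counts.most_common()


pickups = [
    datetime(2025, 1, 16, 18, 5),
    datetime(2025, 1, 16, 18, 40),
    datetime(2025, 1, 16, 17, 59),
]
ranking = rank_pickup_hours(pickups)
print(ranking)
```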


In [57]:
# detect fare anomalies with the IQR method, whose quartile-based fences are not distorted by extreme values
# a value below q1 - 1.5*IQR or above q3 + 1.5*IQR is treated as abnormal
# a negative amount is also treated as abnormal
q1, q3 = df.approxQuantile("total_amount", [0.25, 0.75], 0.01)
iqr = q3 - q1
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr

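# flag negative amounts explicitly, because the lower fence can itself be negative
is_anomaly = (f.col("total_amount") < 0) | (f.col("total_amount") < lower) | (f.col("total_amount") > upper)
df_anomalies = df.select("pickup_date", "total_amount")\
                 .withColumn("amount_anomalies", is_anomaly)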
write_out_by_date(df_anomalies, "fare_anomalies/")
df_anomalies.show(100, truncate = False)

+-----------+------------+----------------+
|pickup_date|total_amount|amount_anomalies|
+-----------+------------+----------------+
|2025-01-01 |18.0        |false           |
|2025-01-01 |12.12       |false           |
|2025-01-01 |12.1        |false           |
|2025-01-01 |9.7         |false           |
|2025-01-01 |8.3         |false           |
|2025-01-01 |24.1        |false           |
|2025-01-01 |11.75       |false           |
|2025-01-01 |19.1        |false           |
|2025-01-01 |27.1        |false           |
|2025-01-01 |16.4        |false           |
|2025-01-01 |16.4        |false           |
|2025-01-01 |12.96       |false           |
|2025-01-01 |19.2        |false           |
|2025-01-01 |12.9        |false           |
|2025-01-01 |38.9        |false           |
|2025-01-01 |22.7        |false           |
|2025-01-01 |25.55       |false           |
|2025-01-01 |-8.54       |true            |
|2025-01-01 |12.2        |false           |
|2025-01-01 |20.6        |false 
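
The IQR rule can be tried on a handful of values without Spark. The sketch below uses `statistics.quantiles` for exact quartiles, whereas the cell above uses `approxQuantile`, which returns approximations. The sample mixes a few amounts from the output above with one invented large value (250.0); `iqr_fences` and `is_fare_anomaly` are illustrative names.

```python
import statistics


def iqr_fences(values, k=1.5):
    """Return (lower, upper) fences at q1 - k*IQR and q3 + k*IQR."""
    q1, _median, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def is_fare_anomaly(amount, lower, upper):
    """Flag negative amounts and amounts outside the IQR fences."""
    return amount < 0 or amount < lower or amount > upper


amounts = [-8.54, 8.3, 9.7, 12.1, 12.12, 18.0, 19.1, 24.1, 27.1, 250.0]
lower, upper = iqr_fences(amounts)
flags = {a: is_fare_anomaly(a, lower, upper) for a in amounts}
print(lower, upper)
print(flags)
```

In this small sample the lower fence is itself negative and lies below -8.54, so the fence alone would not flag that amount. This is why the sketch checks the sign separately.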

In [58]:
job.commit()


