### Download data and upload to S3


In [None]:
import os
import requests
import boto3
from os.path import basename
from dotenv import load_dotenv

load_dotenv()

# NYC TLC source files: January 2023 yellow and green trips plus the zone lookup table.
urls = {
    "yellow": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet",
    "green": "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet",
    "zones": "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
}

bucket_name = os.getenv("BUCKET_NAME")
s3_prefix = os.getenv("S3_RAW_PREFIX")

if not bucket_name:
    raise ValueError("BUCKET_NAME not set in environment")
# Fail fast with a clear message instead of a TypeError while building the key below.
if s3_prefix is None:
    raise ValueError("S3_RAW_PREFIX not set in environment")

# (connect, read) timeouts in seconds: a stalled connection raises instead of
# hanging the cell indefinitely.
REQUEST_TIMEOUT = (10, 300)

s3 = boto3.client('s3')

for url in urls.values():
    filename = basename(url)
    # S3 keys always use "/" (os.path.join would insert "\" on Windows).
    # Stripping slashes makes "raw", "raw/" and "/raw" all yield "raw/<file>";
    # an empty prefix puts the file at the bucket root.
    s3_key = "/".join(part for part in (s3_prefix.strip("/"), filename) if part)

    # Streaming download; the context manager releases the connection even if
    # the upload raises.
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        # Let urllib3 undo any Content-Encoding so S3 stores the file's own bytes.
        response.raw.decode_content = True
        s3.upload_fileobj(response.raw, bucket_name, s3_key)

    print(f"Uploaded {filename} to s3://{bucket_name}/{s3_key}")

print("All url was successfully uploaded")

Uploaded yellow_tripdata_2023-01.parquet to s3://mypracawsbucketsc2/raw/yellow_tripdata_2023-01.parquet
Uploaded green_tripdata_2023-01.parquet to s3://mypracawsbucketsc2/raw/green_tripdata_2023-01.parquet
Uploaded taxi+_zone_lookup.csv to s3://mypracawsbucketsc2/raw/taxi+_zone_lookup.csv
All url was successfully uploaded


### Create a Spark session configured for S3 access

In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
import findspark

# Maven coordinates resolved when the JVM starts: the S3A filesystem + AWS SDK
# and the PostgreSQL JDBC driver. Defined once so the launcher arguments and the
# session config below cannot drift apart.
# NOTE(review): hadoop-aws 3.3.1 appears to have been built against
# aws-java-sdk-bundle 1.11.901; pairing it with 1.11.375 can cause
# NoSuchMethodError in some S3A code paths — confirm against the local Hadoop version.
SPARK_PACKAGES = (
    "org.apache.hadoop:hadoop-aws:3.3.1",
    "com.amazonaws:aws-java-sdk-bundle:1.11.375",
    "org.postgresql:postgresql:42.2.27",
)

# Must be set before the first SparkSession launches the JVM; packages cannot be
# added to an already-running session.
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(SPARK_PACKAGES)} pyspark-shell"

findspark.init()

def create_spark_session(app_name="S3 CSV Reader", log_level=None):
    """Create (or reuse) a SparkSession configured to read ``s3a://`` paths.

    Args:
        app_name: application name for the session.
        log_level: optional level passed to ``SparkContext.setLogLevel``
            (e.g. "WARN", "INFO"). ``None`` leaves Spark's default untouched.
            The previous version put "spark.sparkContext.setLogLevel" in the
            config dict, which is not a config key and therefore had no effect.

    Returns:
        The SparkSession. If one already exists, ``getOrCreate`` returns it and
        only runtime SQL configurations from ``spark_conf`` take effect.
    """
    spark_conf = {
        "spark.jars.packages": ",".join(SPARK_PACKAGES),
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        # Use the AWS SDK's default credential provider chain.
        "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
        "spark.sql.adaptive.enabled": "true",
    }

    builder = SparkSession.builder.appName(app_name)
    for k, v in spark_conf.items():
        builder = builder.config(k, v)

    spark = builder.getOrCreate()
    # Log level is a SparkContext method call, not a config entry.
    if log_level is not None:
        spark.sparkContext.setLogLevel(log_level)
    return spark

spark = create_spark_session()

25/07/04 07:33:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Check that we can read all data


In [2]:
# Build the path from the same env vars the upload cell used (BUCKET_NAME,
# S3_RAW_PREFIX) instead of a hard-coded bucket, so the notebook is portable.
raw_base = "/".join(filter(None, [
    f"s3a://{os.environ['BUCKET_NAME']}",
    os.environ["S3_RAW_PREFIX"].strip("/"),
]))

yellow_df = spark.read.parquet(f"{raw_base}/yellow_tripdata_2023-01.parquet")
yellow_df.show()
yellow_df.columns

25/07/04 07:00:20 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [7]:
# Build the path from the same env vars the upload cell used (BUCKET_NAME,
# S3_RAW_PREFIX) instead of a hard-coded bucket, so the notebook is portable.
raw_base = "/".join(filter(None, [
    f"s3a://{os.environ['BUCKET_NAME']}",
    os.environ["S3_RAW_PREFIX"].strip("/"),
]))

green_df = spark.read.parquet(f"{raw_base}/green_tripdata_2023-01.parquet")
green_df.show()
green_df.columns

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2023-01-01 00:26:10|  2023-01-01 00:37:11|                 N|       1.0|         166|         143|            1.0|         2.58|       14.9|  1.0|    0.

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [4]:
# Build the path from the same env vars the upload cell used (BUCKET_NAME,
# S3_RAW_PREFIX) instead of a hard-coded bucket, so the notebook is portable.
raw_base = "/".join(filter(None, [
    f"s3a://{os.environ['BUCKET_NAME']}",
    os.environ["S3_RAW_PREFIX"].strip("/"),
]))

taxi_df = spark.read.csv(f"{raw_base}/taxi+_zone_lookup.csv", header=True, inferSchema=True)
taxi_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

### Join pick-up and drop-off locations (PULocationID, DOLocationID) with the taxi zone lookup


In [104]:
from pyspark.sql import functions as F

# Seed for the random preview at the end of the cell, so the sampled rows are reproducible.
PREVIEW_SEED = 42

# Register the DataFrames as global temp views so the SQL can reference them
# as global_temp.<view_name>.
taxi_df.createOrReplaceGlobalTempView('taxi_zone')
green_df.createOrReplaceGlobalTempView('green_taxi')
yellow_df.createOrReplaceGlobalTempView('yellow_taxi')


def _trips_sql(view, ts_prefix, taxi_type):
    """Return a SELECT that enriches one taxi view with zone names, route id and duration.

    The green and yellow datasets share this shape except for the timestamp
    column prefix, so one template replaces two copy-pasted CTEs.

    Args:
        view: fully qualified view name, e.g. 'global_temp.green_taxi'.
        ts_prefix: prefix of the pickup/dropoff timestamp columns
            ('lpep' for green, 'tpep' for yellow).
        taxi_type: literal written to the taxi_type column.

    Returns:
        SQL text producing one row per trip; trip_duration is in minutes.
    """
    return f"""
        SELECT
            t.PULocationID AS pick_up_location,
            t.DOLocationID AS drop_off_location,
            pz.zone AS pick_up_zone,
            dz.zone AS drop_off_zone,
            CONCAT(t.PULocationID, '_', t.DOLocationID) AS route_id,
            (UNIX_TIMESTAMP(t.{ts_prefix}_dropoff_datetime)
             - UNIX_TIMESTAMP(t.{ts_prefix}_pickup_datetime)) / 60 AS trip_duration,
            t.trip_distance,
            '{taxi_type}' AS taxi_type
        FROM {view} t
        LEFT JOIN global_temp.taxi_zone pz ON t.PULocationID = pz.LocationID
        LEFT JOIN global_temp.taxi_zone dz ON t.DOLocationID = dz.LocationID
    """


# Union the per-trip rows first, then aggregate once; grouping by taxi_type keeps
# green and yellow routes separate, so the result matches aggregating each
# dataset separately and unioning the aggregates.
# NOTE(review): COUNT(route_id) skips rows whose route_id is NULL (e.g. a NULL
# location id); use COUNT(*) if such rows should be counted.
SQL_QUERY = f"""
WITH trips AS (
    {_trips_sql('global_temp.green_taxi', 'lpep', 'green')}
    UNION ALL
    {_trips_sql('global_temp.yellow_taxi', 'tpep', 'yellow')}
)
SELECT
    pick_up_zone,
    drop_off_zone,
    route_id,
    COUNT(route_id) AS total_trips,
    ROUND(AVG(trip_duration), 2) AS avg_trip_duration,
    ROUND(AVG(trip_distance), 2) AS avg_trip_distance,
    taxi_type
FROM trips
GROUP BY route_id, pick_up_zone, drop_off_zone, taxi_type
"""

transformed_df = spark.sql(SQL_QUERY)
transformed_df.show()
transformed_df.orderBy(F.rand(seed=PREVIEW_SEED)).show(10)

                                                                                

+--------------------+--------------------+--------+-----------+-----------------+-----------------+---------+
|        pick_up_zone|       drop_off_zone|route_id|total_trips|avg_trip_duration|avg_trip_distance|taxi_type|
+--------------------+--------------------+--------+-----------+-----------------+-----------------+---------+
|             Astoria|     Jackson Heights|   7_129|         71|            14.54|             2.56|    green|
| Crown Heights North| Crown Heights North|   61_61|         40|             9.55|            67.23|    green|
|   East Harlem North| Lincoln Square East|  74_142|        123|            21.02|              3.7|    green|
|            Elmhurst|        Forest Hills|   82_95|        112|            36.29|             1.96|    green|
|Flatbush/Ditmas Park|        Coney Island|   89_55|          4|             37.6|             1.73|    green|
|           Bay Ridge|       Dyker Heights|   14_67|          1|            11.82|              0.0|    green|
|



+--------------------+--------------------+--------+-----------+-----------------+-----------------+---------+
|        pick_up_zone|       drop_off_zone|route_id|total_trips|avg_trip_duration|avg_trip_distance|taxi_type|
+--------------------+--------------------+--------+-----------+-----------------+-----------------+---------+
|         Boerum Hill|        West Village|  25_249|          2|            20.42|             3.84|    green|
|      Brighton Beach|         JFK Airport|  29_132|          1|             17.8|            13.68|   yellow|
|        Clinton East|             Norwood|  48_174|          6|            27.81|            11.38|   yellow|
|             Madison|        Clinton Hill|  149_49|          1|            70.13|              5.8|   yellow|
|  DUMBO/Vinegar Hill|        East Chelsea|   66_68|          7|            24.04|             5.11|    green|
|Greenwich Village...|            Kips Bay| 114_137|        760|            18.73|             1.92|   yellow|
|

                                                                                

### Save the aggregated data as Parquet to S3


In [None]:
from dotenv import load_dotenv
load_dotenv()

bucket_name = os.getenv("BUCKET_NAME")
# Separate name so the raw prefix (s3_prefix) set by the upload cell is not overwritten.
processed_prefix = os.getenv("S3_PROCESSED_PREFIX")
if not bucket_name:
    raise ValueError("BUCKET_NAME not set in environment")
if processed_prefix is None:
    # Without this check the path silently became ".../None/task_3_data_parquet/".
    raise ValueError("S3_PROCESSED_PREFIX not set in environment")

# "append" adds another copy of the aggregates on every run, so re-running this
# cell duplicates rows for anything reading this path; use "overwrite" for
# idempotent re-runs.
WRITE_MODE = "append"

# Single source of truth for the write target and the log message (they used to
# disagree on the separator); stripping slashes avoids "processed//task_3...".
output_path = "/".join(filter(None, [
    bucket_name,
    processed_prefix.strip("/"),
    "task_3_data_parquet",
])) + "/"

transformed_df.write \
    .mode(WRITE_MODE) \
    .parquet(f"s3a://{output_path}")

print(f"Data successfully written to s3://{output_path}")

                                                                                

Data successfully written to s3://mypracawsbucketsc2/processed//task_3_data_parquet/


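### Data from Athena
1. Confirm that the number of trips is greater than zero. The query returns the 10 zone pairs with the fewest trips; its result is transcribed into the next cell.

```sql
SELECT pick_up_zone, drop_off_zone, total_trips
FROM "task_3_glue_db"."task_3_data_parquet"
WHERE total_trips > 0
ORDER BY total_trips ASC
LIMIT 10;
```

Note: the `WHERE total_trips > 0` clause filters out any non-positive counts, so this query alone cannot reveal them. Dropping the `WHERE` clause, or counting rows with `total_trips <= 0`, would make the check conclusive.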


In [110]:
import pandas as pd

# Athena result of query 1 above, transcribed by hand as
# (pick_up_zone, drop_off_zone, total_trips) rows in the order Athena returned them.
lowest_trip_rows = [
    ("Jackson Heights", "East Chelsea", 1),
    ("Sunset Park East", "Central Harlem North", 1),
    ("Rosedale", "Hillcrest/Pomonok", 1),
    ("Bay Ridge", "Dyker Heights", 1),
    ("Sheepshead Bay", "JFK Airport", 1),
    ("Old Astoria", "Yorkville West", 1),
    ("Brooklyn Heights", "Lincoln Square West", 1),
    ("Briarwood/Jamaica Hills", "JFK Airport", 1),
    ("North Corona", "Briarwood/Jamaica Hills", 1),
    ("Steinway", "East Harlem South", 1),
]

df = pd.DataFrame(lowest_trip_rows, columns=["pick_up_zone", "drop_off_zone", "total_trips"])
df

Unnamed: 0,pick_up_zone,drop_off_zone,total_trips
0,Jackson Heights,East Chelsea,1
1,Sunset Park East,Central Harlem North,1
2,Rosedale,Hillcrest/Pomonok,1
3,Bay Ridge,Dyker Heights,1
4,Sheepshead Bay,JFK Airport,1
5,Old Astoria,Yorkville West,1
6,Brooklyn Heights,Lincoln Square West,1
7,Briarwood/Jamaica Hills,JFK Airport,1
8,North Corona,Briarwood/Jamaica Hills,1
9,Steinway,East Harlem South,1


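### Data from Athena
2. Get statistics such as the top 10 zone pairs with the highest trip counts.

```sql
SELECT pick_up_zone, drop_off_zone, total_trips
FROM "task_3_glue_db"."task_3_data_parquet"
GROUP BY pick_up_zone, drop_off_zone, total_trips
ORDER BY total_trips DESC
LIMIT 10;
```

Note: `total_trips` is also in the `GROUP BY`, so this behaves like `SELECT DISTINCT`. Green and yellow counts for the same zone pair are therefore not summed. They appear as separate rows, or collapse into one row when the two counts happen to be equal. Grouping by the two zones with `SUM(total_trips)` would combine them.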

In [112]:
import pandas as pd

# Athena result of query 2 above, transcribed by hand as
# (pick_up_zone, drop_off_zone, total_trips) rows, highest trip count first.
top_trip_rows = [
    ("Upper East Side South", "Upper East Side North", 22303),
    ("Upper East Side North", "Upper East Side South", 18981),
    ("N/A", "N/A", 15354),
    ("Upper East Side North", "Upper East Side North", 14926),
    ("Upper East Side South", "Upper East Side South", 14546),
    ("Upper East Side South", "Midtown Center", 9408),
    ("Midtown Center", "Upper East Side South", 9320),
    ("Midtown Center", "Upper East Side North", 8599),
    ("Lenox Hill West", "Upper East Side North", 8299),
    ("Lincoln Square East", "Upper West Side South", 8198),
]

df = pd.DataFrame(top_trip_rows, columns=["pick_up_zone", "drop_off_zone", "total_trips"])
df

Unnamed: 0,pick_up_zone,drop_off_zone,total_trips
0,Upper East Side South,Upper East Side North,22303
1,Upper East Side North,Upper East Side South,18981
2,,,15354
3,Upper East Side North,Upper East Side North,14926
4,Upper East Side South,Upper East Side South,14546
5,Upper East Side South,Midtown Center,9408
6,Midtown Center,Upper East Side South,9320
7,Midtown Center,Upper East Side North,8599
8,Lenox Hill West,Upper East Side North,8299
9,Lincoln Square East,Upper West Side South,8198
