In [2]:
import configparser
import os, glob
from datetime import datetime
import pandas as pd 
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType


In [3]:
spark = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
        ).getOrCreate()

23/04/15 01:16:59 WARN Utils: Your hostname, m-hassib resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlp3s0)
23/04/15 01:16:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/m-hassib/.ivy2/cache
The jars for the packages stored in: /home/m-hassib/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-163116d5-4660-4e7e-a651-c0e86857ce62;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commons-col

23/04/15 01:17:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/15 01:17:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### 1. Scope the Project and Gather Data
For this project, I gather data from two sources: the New York City Taxi and Limousine Commission (TLC) trip record data, and weather data scraped from https://www.wunderground.com/.

The end use case is an analytics table for analyzing the relationship between weather conditions and trips in New York City.


### The TLC data
The TLC data provides information on taxi and for-hire vehicle trips in New York City, including pickup and drop-off times and locations, fares, and payment types. This project works with the for-hire vehicle trip data.

#### 1. Trips data

In [6]:
trip_data = spark.read.parquet("data/tlc/fhvhv_tripdata_2023-01.parquet")
trip_data.show()

                                                                                

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+-----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee| tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+

#### 2. High Volume Licenses
This table contains information about high volume licenses for ride-sharing companies operating in New York City. The table includes license numbers, base names, and affiliated app companies for each license. The table is useful for tracking the licenses of high volume ride-sharing companies operating in NYC and for analyzing their market share in the city.

In [8]:
hvl_data = spark.read.csv("data/tlc/hvl_data.csv", header=True)
hvl_data.show()

+-----------------+--------------+--------------------+-----------+
|hv_license_number|license_number|           base_name|affiliation|
+-----------------+--------------+--------------------+-----------+
|           HV0002|        B02914|     VULCAN CARS LLC|       Juno|
|           HV0002|        B02907|        SABO ONE LLC|       Juno|
|           HV0002|        B02908|        SABO TWO LLC|       Juno|
|           HV0002|        B03035|           OMAHA LLC|       Juno|
|           HV0005|        B02510|        TRI-CITY LLC|       Lyft|
|           HV0005|        B02844|ENDOR CAR & DRIVE...|       Lyft|
|           HV0003|        B02877|        ZWOLF-NY LLC|       Uber|
|           HV0003|        B02866|         ZWEI-NY LLC|       Uber|
|           HV0003|        B02882|      ZWANZIG-NY LLC|       Uber|
|           HV0003|        B02869|         ZEHN-NY LLC|       Uber|
|           HV0003|        B02617|          WEITER LLC|       Uber|
|           HV0003|        B02876|     VIERZEHN-

#### 3. Zone Lookup Table
Each trip record contains fields for the pickup and drop-off locations of the trip, populated with numbers ranging from 1 to 263.
These numbers correspond to taxi zones, which the lookup table maps to a borough, a zone name, and a service zone.

In [10]:
zone_data = spark.read.csv("data/tlc/zone_lookup.csv", header=True)
zone_data.show()

+-----------+-------------+--------------------+-------------+
|location_id|      borough|                zone| service_zone|
+-----------+-------------+--------------------+-------------+
|          1|          EWR|      Newark Airport|          EWR|
|          2|       Queens|         Jamaica Bay|    Boro Zone|
|          3|        Bronx|Allerton/Pelham G...|    Boro Zone|
|          4|    Manhattan|       Alphabet City|  Yellow Zone|
|          5|Staten Island|       Arden Heights|    Boro Zone|
|          6|Staten Island|Arrochar/Fort Wad...|    Boro Zone|
|          7|       Queens|             Astoria|    Boro Zone|
|          8|       Queens|        Astoria Park|    Boro Zone|
|          9|       Queens|          Auburndale|    Boro Zone|
|         10|       Queens|        Baisley Park|    Boro Zone|
|         11|     Brooklyn|          Bath Beach|    Boro Zone|
|         12|    Manhattan|        Battery Park|  Yellow Zone|
|         13|    Manhattan|   Battery Park City|  Yello

### Weather Data
The wunderground data provides hourly observations of weather conditions in New York City, including temperature, precipitation, and wind speed.

In [12]:
weather_data = spark.read.csv("data/weather/weather.csv", header=True)
weather_data.show()

+--------+-----------+---------+--------+----+----------+---------+--------+------+-------------+----------+
|    time|temperature|dew_point|humidity|wind|wind_speed|wind_gust|pressure|precip|    condition|      date|
+--------+-----------+---------+--------+----+----------+---------+--------+------+-------------+----------+
| 1:51 AM|      47 °F|    26 °F|    44 %| SSW|     8 mph|    0 mph|30.28 in|0.0 in|Mostly Cloudy|2023-01-29|
| 2:51 AM|      46 °F|    26 °F|    46 %| SSW|     7 mph|    0 mph|30.29 in|0.0 in|Partly Cloudy|2023-01-29|
| 3:51 AM|      42 °F|    28 °F|    58 %|   S|     5 mph|    0 mph|30.29 in|0.0 in|Partly Cloudy|2023-01-29|
| 4:51 AM|      42 °F|    29 °F|    60 %|   S|     6 mph|    0 mph|30.29 in|0.0 in|Partly Cloudy|2023-01-29|
| 5:51 AM|      42 °F|    29 °F|    60 %|   S|     5 mph|    0 mph|30.28 in|0.0 in|Partly Cloudy|2023-01-29|
| 6:51 AM|      42 °F|    31 °F|    65 %| SSE|     5 mph|    0 mph|30.27 in|0.0 in|Partly Cloudy|2023-01-29|
| 7:51 AM|      42 

### 2. Explore and Assess the Data
Upon exploring the data, I found that there were missing values in some columns and duplicate rows in the TLC data. I also noticed inconsistencies in the date and time formats between the two data sources, which will need to be addressed in the data cleaning process.

To clean the data, I will need to remove any duplicate rows, fill in any missing values, and convert the date and time formats to be consistent between the two data sources.

#### Trips Data

In [14]:
trips_null_counts_exprs = [F.sum(F.col(c).isNull().cast('int')).alias(c) for c in trip_data.columns]
trips_null_counts = trip_data.agg(*trips_null_counts_exprs)
trips_null_counts.show()



+-----------------+--------------------+--------------------+----------------+-----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime|on_scene_datetime|pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+----------------+-----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----

                                                                                

In [15]:
# Drop duplicate rows
trip_data = trip_data.drop_duplicates()

# Fill missing originating_base_num values from dispatching_base_num, and add a
# `weather` column (request time truncated to the hour) that references the weather data.
# date_format with an hour-level pattern already drops minutes and seconds.
trip_data = trip_data.withColumn('originating_base_num', F.coalesce('originating_base_num', 'dispatching_base_num'))\
                        .withColumn('weather', F.date_format('request_datetime', 'yyyy-MM-dd HH'))
    
trip_data.show()



23/04/14 23:58:40 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
23/04/14 23:58:41 WARN TaskMemoryManager: Failed to allocate a page (60871574 bytes), try again.



+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+-----+----------+-------------------+-----------------+------------------+----------------+--------------+-------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee| tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|      weather|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+-----

                                                                                

#### Weather Data


In [19]:
def convert_time(time_str):
    """Convert a 12-hour time such as '1:51 AM' to a zero-padded 24-hour hour string."""
    # Taking the hour modulo 12 maps both 12 AM and 12 PM to 0 before the PM offset is added
    hour = int(time_str.split(':')[0]) % 12
    if 'PM' in time_str:
        hour += 12
    return f'{hour:02d}'

# Define UDFs
convert_time_udf = F.udf(convert_time, StringType())
# Keep only the leading number and drop the unit, e.g. '47 °F' -> 47
split_udf = F.udf(lambda x: int(x.split(' ')[0]), IntegerType())

# Normalize the time and date, build the weather_id key, and clean the measurement columns
weather_data = weather_data.withColumn("time", convert_time_udf("time")) \
    .withColumn("date", F.date_format("date", 'yyyy-MM-dd'))\
    .withColumn("weather_id", F.concat_ws(" ", "date", "time")) \
    .drop("dew_point", "wind", "wind_gust", "pressure", "precip") \
    .withColumn("humidity", split_udf("humidity")) \
    .withColumn("wind_speed", split_udf("wind_speed")) \
    .withColumn("temperature", split_udf("temperature")) \
    .drop_duplicates(['weather_id'])

weather_data.show()


+----+-----------+--------+----------+-------------+----------+-------------+
|time|temperature|humidity|wind_speed|    condition|      date|   weather_id|
+----+-----------+--------+----------+-------------+----------+-------------+
|  00|         50|      54|         0|Mostly Cloudy|2023-01-01|2023-01-01 00|
|  01|         53|      93|         3|   Light Rain|2023-01-01|2023-01-01 01|
|  02|         54|      86|         3|   Light Rain|2023-01-01|2023-01-01 02|
|  03|         54|      83|        12|   Light Rain|2023-01-01|2023-01-01 03|
|  04|         54|      80|        10|   Light Rain|2023-01-01|2023-01-01 04|
|  05|         53|      77|         6|   Light Rain|2023-01-01|2023-01-01 05|
|  06|         53|      74|        12|         Fair|2023-01-01|2023-01-01 06|
|  07|         52|      77|         9|Partly Cloudy|2023-01-01|2023-01-01 07|
|  08|         50|      71|         9|Partly Cloudy|2023-01-01|2023-01-01 08|
|  09|         51|      66|        15|Mostly Cloudy|2023-01-01|2
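
The hour conversion can also be expressed with the standard library's `datetime.strptime`, which handles the 12 AM and 12 PM edge cases itself. The sketch below is plain Python rather than a Spark UDF, and the helper names `observation_key`, `trip_key`, and `leading_int` are hypothetical. It only illustrates that a weather observation and a trip request in the same clock hour produce the same `yyyy-MM-dd HH` key.

```python
from datetime import datetime


def observation_key(date_str: str, time_str: str) -> str:
    """Build a 'yyyy-MM-dd HH' key from a weather row such as ('2023-01-29', '1:51 AM')."""
    # %I parses the 12-hour clock and %p the AM/PM marker, so 12 AM -> 00 and 12 PM -> 12.
    parsed = datetime.strptime(f"{date_str} {time_str.strip()}", "%Y-%m-%d %I:%M %p")
    return parsed.strftime("%Y-%m-%d %H")


def trip_key(request_datetime: datetime) -> str:
    """Truncate a trip's request time to the hour, mirroring the `weather` column."""
    return request_datetime.strftime("%Y-%m-%d %H")


def leading_int(value: str) -> int:
    """Extract the leading integer from strings such as '47 °F', '44 %' or '8 mph'."""
    # split() with no argument splits on any whitespace, including non-breaking spaces.
    return int(value.split()[0])


# A 1:51 AM observation and a 1:05 AM trip request share the same key.
print(observation_key("2023-01-29", "1:51 AM"), trip_key(datetime(2023, 1, 29, 1, 5)))
```

Because each observation is keyed by its clock hour, a trip requested at 1:05 AM is matched to the 1:51 AM observation, not the 12:51 AM one. Whether that is the desired matching rule is a modeling choice.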

                                                                                

### 3. The Data Model
For this project, I have decided to use a star schema. With a star schema based on the TLC trip record data and the weather data, a variety of analytical queries can be run to gain insights into the for-hire vehicle industry in New York City.

##### The schema contains one fact table and four dimension tables:
- Fact table:
    - Trips:
        - trip_id (Primary key)
        - hvfhs_license_num (Foreign key referencing HVFHS)
        - dispatching_base_num (Foreign key referencing HVFHS.base_num)
        - originating_base_num (Foreign key referencing HVFHS.base_num)
        - request_datetime (Foreign key referencing DateTimes)
        - pickup_datetime (Foreign key referencing DateTimes)
        - dropoff_datetime (Foreign key referencing DateTimes)
        - PULocationID (Foreign key referencing Locations)
        - DOLocationID (Foreign key referencing Locations)
        - weather (Foreign key referencing weather)
        - trip_miles
        - trip_time
        - base_passenger_fare
        - tolls
        - bcf
        - sales_tax
        - congestion_surcharge
        - airport_fee
        - tips
        - driver_pay
        - shared_request_flag
        - shared_match_flag

- Dimension tables:
    - HVFHS:
        - hv_license_num (Primary key)
        - base_num (Primary key)
        - affiliation
        - base_name

    - DateTimes:
        - datetime_id (Primary key)
        - full_datetime
        - date
        - hour
        - day
        - month
        - week
        - day_of_week

    - Locations:
        - location_id (Primary key)
        - borough
        - zone
        - service_zone

    - weather:
        - weather_id (Primary key)
        - date
        - time
        - temperature
        - humidity
        - wind_speed
        - condition

I chose this model because it allows for easy querying and analysis of the data. To load the data into this model, I will use an ETL (Extract, Transform, Load) process: extract the data from the original sources, transform it to match the data model, and load it into the corresponding tables.
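
As an illustration of the transform step, the following sketch derives one row of the DateTimes dimension from a single timestamp. It is plain Python under stated assumptions, not the project's Spark code: the function name `datetime_dimension_row` is hypothetical, and the format of `datetime_id` (a compact sortable string) is an assumption, since the model above does not specify how the key is built.

```python
from datetime import datetime


def datetime_dimension_row(ts: datetime) -> dict:
    """Derive one DateTimes row from a timestamp (column names follow the model above)."""
    return {
        # Assumption: a compact, sortable string serves as the surrogate key.
        "datetime_id": ts.strftime("%Y%m%d%H%M%S"),
        "full_datetime": ts.isoformat(sep=" "),
        "date": ts.date().isoformat(),
        "hour": ts.hour,
        "day": ts.day,
        "month": ts.month,
        "week": ts.isocalendar()[1],     # ISO week number
        "day_of_week": ts.isoweekday(),  # 1 = Monday ... 7 = Sunday
    }


row = datetime_dimension_row(datetime(2023, 1, 29, 1, 51, 0))
print(row)
```

The day-of-week convention is worth fixing explicitly, because Spark's `dayofweek` function numbers days from 1 = Sunday while `isoweekday` numbers them from 1 = Monday.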

### 4. Run ETL to Model the Data
To create the data pipelines and the data model, I will use Python and PySpark. The Python scripts extract and transform the data, then load the transformed data into a data lake on S3.

### 5. Complete Project Write Up
The goal of this project is to create an analytics data lake that can be used to analyze the relationship between weather conditions and for-hire vehicle trips in New York City. Example queries that could be run on this data lake include:

- What is the average fare for trips on rainy days versus sunny days?
- What is the total revenue for each weather condition?
- How does the number of trips vary with temperature?
- What are the average trip distance and time for each weather condition?
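
To show the shape of such queries, here is a minimal sketch that uses an in-memory SQLite database instead of the Spark data lake. The table and column names follow the data model above, the trip rows are made up for illustration, and treating `base_passenger_fare` as "revenue" is an assumption. The same SQL could in principle be run through `spark.sql` after registering the tables as temporary views.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE weather (weather_id TEXT PRIMARY KEY, temperature INTEGER, condition TEXT);
    CREATE TABLE trips (
        trip_id INTEGER PRIMARY KEY,
        weather TEXT REFERENCES weather (weather_id),
        trip_miles REAL,
        base_passenger_fare REAL
    );
    -- Trip rows are made up for illustration only; they are not taken from the TLC data.
    INSERT INTO weather VALUES ('2023-01-01 01', 53, 'Light Rain'), ('2023-01-01 06', 53, 'Fair');
    INSERT INTO trips VALUES
        (1, '2023-01-01 01', 2.0, 10.0),
        (2, '2023-01-01 01', 4.0, 20.0),
        (3, '2023-01-01 06', 3.0, 12.0);
""")

# Join the fact table to the weather dimension and aggregate per condition.
query = """
    SELECT w.condition,
           COUNT(*)                   AS trip_count,
           AVG(t.base_passenger_fare) AS avg_fare,
           SUM(t.base_passenger_fare) AS total_fare,
           AVG(t.trip_miles)          AS avg_miles
    FROM trips AS t
    JOIN weather AS w ON t.weather = w.weather_id
    GROUP BY w.condition
    ORDER BY w.condition
"""
results = conn.execute(query).fetchall()
for row in results:
    print(row)
```

Each question in the list reduces to one join between the fact table and a dimension, followed by a `GROUP BY`, which is the access pattern a star schema is meant to keep simple.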

Airflow could be incorporated into this project to manage the ETL workflows and automate the data pipeline processes.

I chose a star schema for this project because it allows for easy querying and analysis of the data. If the data were increased by 100x, I would consider using a distributed computing system, such as AWS EMR, to handle the increased data volume.

If the pipelines were run on a daily basis by 7am, I would use Airflow to schedule and automate the ETL workflows.

If the database needed to be accessed by 100+ people, I would consider using a cloud-based solution, such as Amazon Redshift or Google BigQuery, to ensure scalability and availability. I would also implement security measures, such as role-based access control