# This notebook performs feature engineering.

In [1]:
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "5g")
    .config("spark.executor.memory", "3g")
    .getOrCreate()
)

23/08/20 13:48:13 WARN Utils: Your hostname, Didis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.13.100.27 instead (on interface en0)
23/08/20 13:48:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 13:48:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/20 13:48:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


`Correct FHV dataset schema`

In [2]:
sdf_jan = spark.read.parquet('../data/landing/2022-01.parquet')
sdf_correct = spark.read.parquet('../data/landing/2022-02.parquet')

# Casting schema
consistent_col_casting = [F.col(col_name).alias(col_name.lower()) for col_name in sdf_correct.columns]
sdf_correct = sdf_jan.select(*consistent_col_casting)
sdf_schema = sdf_correct.schema

# Write corrected files into RAW folder
for year in [2022, 2023]:
    if year == 2022:
        MONTH = range(1,13)
    else:
        MONTH = range(1,4)
        
    for month in MONTH:
        path = f'../data/landing/{str(year)}-{str(month).zfill(2)}.parquet'
        out_path = f'../data/raw/{str(year)}-{str(month).zfill(2)}.parquet'

        sdf_malformed = spark.read.parquet(path)

        sdf_malformed = sdf_malformed \
            .select([F.col(c).cast(sdf_schema[i].dataType) for i, c in enumerate(sdf_malformed.columns)])
        
        sdf_malformed \
            .coalesce(1) \
            .write \
            .mode('overwrite') \
            .parquet(out_path)

                                                                                

Verify all data in 2022 are consistent.

In [6]:
sdf_2022 = spark.read.parquet('../data/raw/2022-*')
sdf_2023 = spark.read.parquet('../data/raw/2023-*')
print("# of instances:", sdf_2022.count())
sdf_2022.show(1, vertical = True, truncate = 100)

# of instances: 212416083
-RECORD 0-----------------------------------
 hvfhs_license_num    | HV0003              
 dispatching_base_num | B03404              
 originating_base_num | B03404              
 request_datetime     | 2022-01-01 00:05:31 
 on_scene_datetime    | 2022-01-01 00:05:40 
 pickup_datetime      | 2022-01-01 00:07:24 
 dropoff_datetime     | 2022-01-01 00:18:28 
 PULocationID         | 170                 
 DOLocationID         | 161                 
 trip_miles           | 1.18                
 trip_time            | 664                 
 base_passenger_fare  | 24.9                
 tolls                | 0.0                 
 bcf                  | 0.75                
 sales_tax            | 2.21                
 congestion_surcharge | 2.75                
 airport_fee          | 0.0                 
 tips                 | 0.0                 
 driver_pay           | 23.03               
 shared_request_flag  | N                   
 shared_match_flag    | N    

`Feature engineering for FHV dataset`

Create features of interest

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.functions import hour

def feature_eng(sdf):
    # Add a "PUhour" column 
    sdf = sdf.withColumn("PUhour", hour("pickup_datetime"))

    # Add feature "wait time" to capture the time between request and pickup.
    sdf = sdf.withColumn("wait_time", unix_timestamp("pickup_datetime") - unix_timestamp('request_datetime'))

    # Add feature "pay_per_min", money driver receives per second of the trip
    sdf = sdf.withColumn("pay_per_min", (col("driver_pay")+col("tips")) / (col("wait_time")+col("trip_time")) * 60)

    # Add feature "pay_per_mile", money driver receives per passenger mile of the trip
    sdf = sdf.withColumn("pay_per_mile", (col("driver_pay")+col("tips")) / col("trip_miles"))

    # Combine "driver_pay" and "tips" in one field
    sdf = sdf.withColumn("driver_pay", (col("driver_pay") + col("tips")))
    
    return sdf

sdf_2022 = feature_eng(sdf_2022)
sdf_2023 = feature_eng(sdf_2023)
sdf_2022.show(1, vertical = True, truncate = 100)

-RECORD 0-----------------------------------
 hvfhs_license_num    | HV0003              
 dispatching_base_num | B03404              
 originating_base_num | B03404              
 request_datetime     | 2022-01-01 00:05:31 
 on_scene_datetime    | 2022-01-01 00:05:40 
 pickup_datetime      | 2022-01-01 00:07:24 
 dropoff_datetime     | 2022-01-01 00:18:28 
 PULocationID         | 170                 
 DOLocationID         | 161                 
 trip_miles           | 1.18                
 trip_time            | 664                 
 base_passenger_fare  | 24.9                
 tolls                | 0.0                 
 bcf                  | 0.75                
 sales_tax            | 2.21                
 congestion_surcharge | 2.75                
 airport_fee          | 0.0                 
 tips                 | 0.0                 
 driver_pay           | 23.03               
 shared_request_flag  | N                   
 shared_match_flag    | N                   
 access_a_

Delete redundant features

In [8]:
remove = ['hvfhs_license_num',
          'dispatching_base_num',
          'originating_base_num',
          'on_scene_datetime',
          'request_datetime',
          'pickup_datetime',
          'dropoff_datetime',
          'base_passenger_fare',
          'tolls',
          'tips',
          'bcf',
          'sales_tax',
          'congestion_surcharge',
          'airport_fee',
          'shared_request_flag',
          'access_a_ride_flag',
          'wav_match_flag',
          'wav_request_flag',
          'shared_match_flag']
sdf_2022 = sdf_2022.select([col for col in sdf_2022.columns if col not in remove])
sdf_2023 = sdf_2023.select([col for col in sdf_2023.columns if col not in remove])
sdf_2022.show(1, vertical=True, truncate=100)

-RECORD 0--------------------------
 PULocationID | 170                
 DOLocationID | 161                
 trip_miles   | 1.18               
 trip_time    | 664                
 driver_pay   | 23.03              
 PUhour       | 0                  
 wait_time    | 113                
 pay_per_min  | 1.7783783783783784 
 pay_per_mile | 19.516949152542374 
only showing top 1 row



In [20]:
print("Total number of records originally:", sdf_2022.count())

Total number of records originally: 212416083


`Filter invalid records`

Valid records are:
1. Trip duration more than 1 minute.
2. Trip distance larger than 0.3 miles.
3. Driver pay is positive.
4. Wait time is positive.
5. Pick up location inside NYC region.
6. pay_per_min less than 10.
7. pay_per_mile less than 40

In [9]:
# Count records with trip duration less than 1 minute 

sdf_2022.createOrReplaceTempView('taxi')
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    trip_time < 60

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)



Proportion: 9.69418120755009e-05
Amount: 20592


                                                                                

In [10]:
# Count records with trip distance less than 0.3 miles
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    trip_miles < 0.3

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)



Proportion: 0.001816595026846437
Amount: 385874


                                                                                

In [16]:
# Count records with driver pay less than 0
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    driver_pay <= 0

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)



Proportion: 0.00530790787626001
Amount: 1127485


                                                                                

In [17]:
# Count records with negative wait time
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    wait_time < 0

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)



Proportion: 0.012526372591099894
Amount: 2660803


                                                                                

In [22]:
# Count records with pickup location outside NYC region
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    PULocationID <= 0 OR 
    PULocationID > 263

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)

Proportion: 5.0019752976990916e-05
Amount: 10625


In [23]:
# Check the 0.999 quantile of pay_per_minute
sdf_2022.approxQuantile("pay_per_min", [0.999], 0.0001)

                                                                                

[4.737142857142857]

In [26]:
# Count records with pay_per_minute exceed ~twice its 0.999 quantile
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    pay_per_min >= 

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)



Proportion: 0.00035760945653065263
Amount: 75962


                                                                                

In [28]:
# Check the 0.99 quantile of pay_per_mile
sdf_2022.approxQuantile("pay_per_min", [0.99], 0.0001)

                                                                                

[2.0484769928710307]

In [29]:
# Count records with pay_per_mile exceed ~twice its 0.99 quantile
sql_query = spark.sql("""
SELECT 
    *
FROM
    taxi
WHERE 
    pay_per_mile >= 40

""")

count = sql_query.count()

print("Proportion:", count / 212416083)
print("Amount:", count)



Proportion: 0.0012933107329730772
Amount: 274720


                                                                                

In [30]:
sdf_2022 = sdf_2022.filter(
        (F.col('trip_time') >= 60) &
        (F.col('trip_miles') >= 0.3) &
        (F.col('driver_pay') > 0) &
        (F.col('PULocationID') <= 263) & 
        (F.col('PULocationID') >= 1) &
        (F.col('wait_time') >= 0) &
        (F.col('pay_per_min') < 10) & 
        (F.col('pay_per_mile') < 40)       
)


In [33]:
count = sdf_2022.count()
print("Proportion of removal:", (212416083 - count) / 212416083)
print("Amount of removal:", 212416083 - count)




Proportion of removal: 0.02023320898917056
Amount of removal: 4297859


                                                                                

In [39]:
sdf_2022.show(1, vertical=True, truncate=100)
sdf_2022.write.parquet("../data/curated/FHV_train.parquet")
sdf_2023.write.parquet("../data/curated/FHV_test.parquet")

-RECORD 0--------------------------
 PULocationID | 170                
 DOLocationID | 161                
 trip_miles   | 1.18               
 trip_time    | 664                
 driver_pay   | 23.03              
 PUhour       | 0                  
 wait_time    | 113                
 pay_per_min  | 1.7783783783783784 
 pay_per_mile | 19.516949152542374 
only showing top 1 row



                                                                                

`Pre-processing of the LANDUSE dataset`

In [40]:
landuse = spark.read.parquet('../data/external/landuse.parquet')
print("# of original instances", landuse.count())
landuse.show(1, vertical=True, truncate=100)

# of original instances 859068
-RECORD 0-----------------------------------
 borough              | BK                  
 block                | 7104                
 lot                  | 338                 
 cd                   | 315                 
 ct2010               | 400                 
 cb2010               | 1000                
 schooldist           | 21                  
 council              | 47                  
 zipcode              | 11223               
 firecomp             | E254                
 policeprct           | 61                  
 healtharea           | 8610                
 sanitboro            | 3                   
 sanitsub             | 1D                  
 address              | 336 AVENUE T        
 zonedist1            | R4                  
 spdist1              | OP                  
 splitzone            | false               
 bldgclass            | A5                  
 landuse              | 1                   
 easements            | 

In [41]:
print("Number of columns in PLUTO:", len(landuse.columns))

Number of columns in PLUTO: 90


`Feature engineering`

In [42]:
# Remove records whose sum of each landuse area exceeds the total building area
landuse = landuse.filter(
        (F.col('comarea') + F.col('resarea') + F.col('officearea') + F.col('retailarea') <= F.col('bldgarea'))
)

print("# of remaining records", landuse.count())

# Only keep these features, remove redundant features
landuse_col = [
           'numbldgs',
           'latitude',
           'longitude',
           'comarea',
           'resarea',
           'officearea',
           'retailarea',
           'bldgarea'
           ]
landuse = landuse.select([col for col in landuse.columns if col in landuse_col])
landuse.show(1, vertical=True, truncate=100)

# of remaining records 731182
-RECORD 0-----------------
 bldgarea   | 992         
 comarea    | 0           
 resarea    | 992         
 officearea | 0           
 retailarea | 0           
 numbldgs   | 1           
 latitude   | 40.5991031  
 longitude  | -73.9725388 
only showing top 1 row



`Assign each building to one of the location ID defined by the taxi zones, based on the buildings' latitude and longitude`

Convert latitude and longitude to geometry using geopandas.

In [43]:
import pandas as pd
import geopandas as gpd

In [44]:
pd_landuse = landuse.toPandas()
gdf_landuse = gpd.GeoDataFrame(pd_landuse, geometry=gpd.points_from_xy(pd_landuse.longitude, pd_landuse.latitude), crs="EPSG:4326")
gdf_landuse.head()

                                                                                

Unnamed: 0,bldgarea,comarea,resarea,officearea,retailarea,numbldgs,latitude,longitude,geometry
0,992,0,992,0,0,1,40.5991031,-73.9725388,POINT (-73.97254 40.59910)
1,992,0,992,0,0,1,40.5991085,-73.9724847,POINT (-73.97248 40.59911)
2,2775,0,2775,0,0,1,40.6890427,-73.9069255,POINT (-73.90693 40.68904)
3,7975,0,7975,0,0,1,40.7033412,-73.9417709,POINT (-73.94177 40.70334)
4,2775,0,1850,0,0,1,40.6890811,-73.9068858,POINT (-73.90689 40.68908)


In [45]:
# Read in the location and zone files 
sf = gpd.read_file("../data/taxi_zones/taxi_zones.shp")
zones = pd.read_csv("../data/taxi_zones/taxi+_zone_lookup.csv")
# Convert geometry to latitude and longitude (referenced from W2 tutorial)
sf['geometry'] = sf['geometry'].to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
# Inner join location and zone files on "location ID"
gdf = gpd.GeoDataFrame(
    pd.merge(zones, sf, on='LocationID', how='inner')
)

In [46]:
# Merge the landuse data with taxi_zone so that the locationID each building belongs to
gdf_landuse = gpd \
              .sjoin(gdf_landuse, gdf[['LocationID', 'geometry']], op='within') \
              .drop('latitude', axis=1) \
              .drop('longitude', axis=1)
gdf_landuse.head(5)

  if await self.run_code(code, result, async_=asy):
Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: EPSG:4326
Right CRS: +proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs + ...

  .sjoin(gdf_landuse, gdf[['LocationID', 'geometry']], op='within') \


Unnamed: 0,bldgarea,comarea,resarea,officearea,retailarea,numbldgs,geometry,index_right,LocationID
0,992,0,992,0,0,1,POINT (-73.97254 40.59910),20,21
1,992,0,992,0,0,1,POINT (-73.97248 40.59911),20,21
21,2200,0,2200,0,0,2,POINT (-73.98459 40.60198),20,21
23,3131,0,3131,0,0,1,POINT (-73.98529 40.60301),20,21
25,2200,0,2200,0,0,2,POINT (-73.98443 40.60116),20,21


Use a query to group buildings by locationID and calculate the proportion of different usage of areas in the region

In [47]:
sdf_landuse = spark.createDataFrame(gdf_landuse.to_wkt())
sdf_landuse.createOrReplaceTempView('building')

In [50]:
# Create features which represents the proportion of each landuse within each location ID 
sql_query = spark.sql("""
SELECT 
    LocationID, 
    sum(bldgarea) as total_bldg,
    sum(comarea) / total_bldg as prop_commertial,
    sum(resarea) / total_bldg as prop_residential,
    sum(officearea) / total_bldg as prop_office,
    sum(retailarea) / total_bldg as prop_retail
From
    building
GROUP BY
    LocationID
""")

landuse = sql_query
landuse.show(1, vertical=True, truncate=100)

23/08/20 14:24:00 WARN TaskSetManager: Stage 139 contains a task of very large size (4257 KiB). The maximum recommended task size is 1000 KiB.
[Stage 139:>                                                      (0 + 10) / 10]

-RECORD 0---------------------------------
 LocationID       | 22                    
 total_bldg       | 3.0588994E7           
 prop_commertial  | 0.05687781036538828   
 prop_residential | 0.8993525579821291    
 prop_office      | 7.012326067343045E-4  
 prop_retail      | 3.2498616986227137E-4 
only showing top 1 row



                                                                                

In [52]:
# Store the curated PLUTO dataframe
landuse.write.parquet("../data/curated/PLUTO.parquet")

23/08/20 14:24:45 WARN TaskSetManager: Stage 143 contains a task of very large size (4257 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

23/08/20 16:12:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 270551 ms exceeds timeout 120000 ms
23/08/20 16:12:27 WARN SparkContext: Killing executors is not supported by current scheduler.
23/08/20 16:12:35 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B