In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os

# Point findspark / pyspark at the JDK and at the Spark distribution unpacked above.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
})

In [3]:
# Make pyspark importable from SPARK_HOME (set in the previous cell).
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create the single SparkSession used by the whole notebook.
# SparkContext-level settings (app name, memory) only take effect on the FIRST
# getOrCreate(): later builders just get this same session back, so configure here.
# In local[*] mode tasks run inside the driver JVM, which makes spark.driver.memory
# (not spark.executor.memory) the setting that bounds the available heap.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("fhvhv-trips-analysis")
    .config("spark.driver.memory", "3g")
    .getOrCreate()
)

In [4]:
from pyspark.sql.session import SparkSession  # same class that pyspark.sql re-exports

In [5]:
# A SparkSession already exists at this point, so getOrCreate() returns it as-is.
# SparkContext-level builder options passed here (app name, executor memory) would
# NOT be applied to the running context, so none are passed: set them on the builder
# that creates the session, before the first getOrCreate().
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

In [6]:
spark.sparkContext  # display the active SparkContext (same object bound to `sc`)

In [7]:
sc.version  # Spark version of the running context

'3.0.3'

In [8]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-01 12:30:30--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.227.177
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.227.177|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv.1’


2022-03-01 12:30:41 (68.6 MB/s) - ‘fhvhv_tripdata_2021-02.csv.1’ saved [733822658/733822658]



In [9]:
# Libraries for the data-processing cells below.
import pandas as pd
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as types
from pyspark.sql import SparkSession

In [10]:
# Explicit schema for the fhvhv CSV so datetimes and location IDs get proper types;
# every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [11]:
# Load the raw trips CSV with the explicit schema, then confirm the column types.
df = spark.read.csv(
    '/content/fhvhv_tripdata_2021-02.csv',
    schema=schema,
    header=True,
)
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [12]:
# Persist the raw CSV as 24 Parquet part files under /content/parquet/ — the same
# absolute location the next cells read from.
# mode('overwrite') keeps the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as the directory exists. Overwriting also removes
# anything else previously written below /content/parquet/.
df.repartition(24) \
    .write.mode('overwrite').parquet('/content/parquet/')

In [15]:
df_fhvhv = spark.read.format('parquet').load('/content/parquet/*')  # trips, back from Parquet

In [16]:
df_fhvhv.schema.names  # column names, in schema order

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [17]:
# Calendar date of each pickup; the SQL below filters on it.
pickup_day = F.to_date(F.col('pickup_datetime'))
df_fhvhv = df_fhvhv.withColumn('pickup_date', pickup_day)

In [18]:
# Check that pickup_date was added as a date column, then peek at the first rows.
df_fhvhv.printSchema()
df_fhvhv.show(n=10)

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- pickup_date: date (nullable = true)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|           HV0003|              B02887|2021-02-06 01:18:35|2021-02-06 01:40:34|         163|         235|   null| 2021-02-06|
|           HV0005|              B02510|2021-02-05 07:13:06|2021-02-05 07:31:56|         225|         181|   null|

In [19]:
# Expose the trips to Spark SQL as the view `trips_data`.
# createOrReplaceTempView replaces the deprecated registerTempTable and, like it,
# replaces any existing view of the same name, so re-running the cell is safe.
df_fhvhv.createOrReplaceTempView('trips_data')

In [20]:
# How many trips were picked up on 15 February 2021?
spark.sql("""
SELECT COUNT(*) AS trips
FROM trips_data
WHERE pickup_date = DATE '2021-02-15'
""").show()

+------+
| trips|
+------+
|367170|
+------+



In [21]:
# Longest trips first: the raw timestamp difference (an interval) shown next to the
# same duration in whole seconds, which drives the ordering.
longest_trips_query = """
SELECT
    pickup_date,
    pickup_datetime,
    dropoff_datetime,
    dropoff_datetime - pickup_datetime AS trip_duration,
    CAST(dropoff_datetime AS BIGINT) - CAST(pickup_datetime AS BIGINT) AS trip_duration_int
FROM trips_data
ORDER BY trip_duration_int DESC
"""
spark.sql(longest_trips_query).show(10)

+-----------+-------------------+-------------------+--------------------+-----------------+
|pickup_date|    pickup_datetime|   dropoff_datetime|       trip_duration|trip_duration_int|
+-----------+-------------------+-------------------+--------------------+-----------------+
| 2021-02-11|2021-02-11 13:40:44|2021-02-12 10:39:44| 20 hours 59 minutes|            75540|
| 2021-02-17|2021-02-17 15:54:53|2021-02-18 07:48:34|15 hours 53 minut...|            57221|
| 2021-02-20|2021-02-20 12:08:15|2021-02-21 00:22:14|12 hours 13 minut...|            44039|
| 2021-02-03|2021-02-03 20:24:25|2021-02-04 07:41:58|11 hours 17 minut...|            40653|
| 2021-02-19|2021-02-19 23:17:44|2021-02-20 09:44:01|10 hours 26 minut...|            37577|
| 2021-02-25|2021-02-25 17:13:35|2021-02-26 02:57:05|9 hours 43 minute...|            35010|
| 2021-02-20|2021-02-20 01:36:13|2021-02-20 11:16:19|9 hours 40 minute...|            34806|
| 2021-02-18|2021-02-18 15:24:19|2021-02-19 01:01:11|9 hours 36 minute

In [22]:
# Trip volume per dispatching base, busiest bases first.
spark.sql("""
SELECT dispatching_base_num, COUNT(1) AS trips
FROM trips_data
GROUP BY dispatching_base_num
ORDER BY trips DESC
""").show(10)

+--------------------+-------+
|dispatching_base_num|  trips|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
+--------------------+-------+
only showing top 10 rows



In [23]:
!wget --directory-prefix=data/raw/zones https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-03-01 13:19:40--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.135.40
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.135.40|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘data/raw/zones/taxi+_zone_lookup.csv.1’


2022-03-01 13:19:40 (130 MB/s) - ‘data/raw/zones/taxi+_zone_lookup.csv.1’ saved [12322/12322]



In [24]:
# Schema for the zone lookup CSV: integer LocationID plus three string columns, all nullable.
zones_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

In [25]:
# Load the zone lookup with its explicit schema.
df_zones = spark.read.csv(
    'data/raw/zones/taxi+_zone_lookup.csv',
    schema=zones_schema,
    header=True,
)

In [26]:
# Confirm the zone table's column types and look at its first rows.
df_zones.printSchema()
df_zones.show(n=10)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+-------------

In [27]:
def _zone_lookup(prefix):
    """Return df_zones reduced to ``<prefix>LocationID`` and ``<prefix>Zone``.

    The prefix keeps the pickup and dropoff copies of the zone table from
    clashing when both are joined onto the same trips DataFrame.
    """
    return df_zones.selectExpr(
        'LocationID AS {}LocationID'.format(prefix),
        'Zone AS {}Zone'.format(prefix),
    )


df_zpu = _zone_lookup('zpu')  # pickup-side zone names
df_zdo = _zone_lookup('zdo')  # dropoff-side zone names

In [28]:
# Attach pickup and dropoff zone names to every trip. These are inner joins, so a
# trip whose location ID has no match in the zone table is dropped.
df_join = (
    df_fhvhv
    .join(df_zpu, on=df_fhvhv['PULocationID'] == df_zpu['zpuLocationID'], how='inner')
    .join(df_zdo, on=df_fhvhv['DOLocationID'] == df_zdo['zdoLocationID'], how='inner')
)

In [29]:
df_join.show(n=10)  # first trips with their pickup/dropoff zone names

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+--------------------+-------------+--------------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|zpuLocationID|             zpuZone|zdoLocationID|             zdoZone|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+--------------------+-------------+--------------------+
|           HV0003|              B02887|2021-02-06 01:18:35|2021-02-06 01:40:34|         163|         235|   null| 2021-02-06|          163|       Midtown North|          235|University Height...|
|           HV0005|              B02510|2021-02-05 07:13:06|2021-02-05 07:31:56|         225|         181|   null| 2021-02-05|          225|  Stuyvesant Heights|          181|          Park Slope|
|           HV0

In [31]:
# Save the trip/zone join without the helper columns.
# mode('overwrite') lets the cell be re-run; the default save mode fails once the
# directory exists.
# NOTE(review): this output sits inside /content/parquet/, the directory the trips are
# read from via the glob '/content/parquet/*'; re-reading that glob after this write
# would pick these files up as well. Consider writing to a sibling directory.
df_join \
    .drop('SR_Flag', 'pickup_date', 'zpuLocationID', 'zdoLocationID') \
    .write.mode('overwrite').parquet('/content/parquet/locations-pairs')

In [32]:
# Expose the joined trips to Spark SQL as the view `locations_table`.
# createOrReplaceTempView replaces the deprecated registerTempTable and safely
# replaces an existing view of the same name on re-run.
df_join.createOrReplaceTempView('locations_table')

In [33]:
# Most frequent pickup/dropoff zone pairs, labelled "<pickup>/<dropoff>";
# a missing zone name is shown as 'Unknown'.
top_pairs_query = """
SELECT
    CONCAT_WS('/', COALESCE(zpuZone, 'Unknown'), COALESCE(zdoZone, 'Unknown')) AS locations,
    COUNT(1) AS trips
FROM locations_table
GROUP BY locations
ORDER BY trips DESC
"""
spark.sql(top_pairs_query).show(10)

+--------------------+-----+
|           locations|trips|
+--------------------+-----+
|East New York/Eas...|45041|
|Borough Park/Boro...|37329|
|   Canarsie/Canarsie|28026|
|Crown Heights Nor...|25976|
| Bay Ridge/Bay Ridge|17934|
|Jackson Heights/J...|14688|
|     Astoria/Astoria|14688|
|Central Harlem No...|14481|
|Bushwick South/Bu...|14424|
|Flatbush/Ditmas P...|13976|
+--------------------+-----+
only showing top 10 rows

