In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('hw_5') \
    .getOrCreate()

In [17]:
spark.version

'3.0.3'

In [5]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-27 17:33:57--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.160.113
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.160.113|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-27 17:34:21 (30.5 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [18]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)])

In [25]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [26]:
df.repartition(24).write.parquet('data/pq/fhvhv', mode='overwrite')

                                                                                

In [27]:
!ls -lh /home/khv/code/data/pq/fhvhv

total 208M
-rw-r--r-- 1 khv khv    0 Feb 27 18:12 _SUCCESS
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00000-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00001-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00002-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00003-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00004-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00005-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00006-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00007-be245855-98ba-4519-a032-b2a774918482-c000.snappy.parquet
-rw-r--r-- 1 khv khv 8.7M Feb 27 18:12 part-00008-be245855-98ba-4519-a032-b

In [28]:
df = spark.read.parquet('data/pq/fhvhv')

In [29]:
df.registerTempTable('data')

In [31]:
#Q3
spark.sql("""
SELECT COUNT(*) AS cnt
  FROM data
 WHERE pickup_datetime >= '2021-02-15 00:00:00'
   AND pickup_datetime < '2021-02-16 00:00:00';
""").show()



+------+
|   cnt|
+------+
|367170|
+------+



                                                                                

In [40]:
df = df.withColumn("trip_duration_ms", \
                   (F.col("dropoff_datetime").cast("long") - F.col("pickup_datetime").cast("long")))

In [45]:
#Q4
df.select( \
          F.col('pickup_datetime').cast('date'), \
          'trip_duration_ms'
         ).orderBy(df.trip_duration_ms.desc()).show()



+---------------+----------------+
|pickup_datetime|trip_duration_ms|
+---------------+----------------+
|     2021-02-11|           75540|
|     2021-02-17|           57221|
|     2021-02-20|           44039|
|     2021-02-03|           40653|
|     2021-02-19|           37577|
|     2021-02-25|           35010|
|     2021-02-20|           34806|
|     2021-02-18|           34612|
|     2021-02-18|           34555|
|     2021-02-10|           34169|
|     2021-02-10|           32476|
|     2021-02-25|           32439|
|     2021-02-21|           32223|
|     2021-02-09|           32087|
|     2021-02-06|           31447|
|     2021-02-02|           30913|
|     2021-02-10|           30856|
|     2021-02-09|           30732|
|     2021-02-21|           30660|
|     2021-02-05|           30511|
+---------------+----------------+
only showing top 20 rows



                                                                                

In [46]:
#Q5
spark.sql("""
SELECT 
    dispatching_base_num
    , COUNT(*) AS cnt
  FROM data
 GROUP BY 
     dispatching_base_num
 ORDER BY 
     COUNT(*) DESC;
""").show()



+--------------------+-------+
|dispatching_base_num|    cnt|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

In [53]:
df_pu_zones = spark.read.parquet('zones/')

df_pu_zones = df_pu_zones \
    .withColumnRenamed('Borough', 'pickup_borough') \
    .withColumnRenamed('Zone', 'pickup_zone')

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [56]:
df_do_zones = spark.read.parquet('zones/')

df_do_zones = df_do_zones \
    .withColumnRenamed('Borough', 'dropoff_borough') \
    .withColumnRenamed('Zone', 'dropoff_zone')

In [65]:
df_join_zones = df \
    .join(df_pu_zones, df.PULocationID == df_pu_zones.LocationID, how='outer') \
    .join(df_do_zones, df.DOLocationID == df_do_zones.LocationID, how='outer')

In [66]:
df_join_zones.registerTempTable('data_zones')

In [67]:
#Q6
spark.sql("""
SELECT 
    pickup_borough
    , pickup_zone
    , dropoff_borough
    , dropoff_zone
    , COUNT(*) AS cnt
  FROM data_zones
 GROUP BY 
     pickup_borough
    , pickup_zone
    , dropoff_borough
    , dropoff_zone
 ORDER BY 
     COUNT(*) DESC;
""").show()



+--------------+--------------------+---------------+--------------------+-----+
|pickup_borough|         pickup_zone|dropoff_borough|        dropoff_zone|  cnt|
+--------------+--------------------+---------------+--------------------+-----+
|      Brooklyn|       East New York|       Brooklyn|       East New York|45041|
|      Brooklyn|        Borough Park|       Brooklyn|        Borough Park|37329|
|      Brooklyn|            Canarsie|       Brooklyn|            Canarsie|28026|
|      Brooklyn| Crown Heights North|       Brooklyn| Crown Heights North|25976|
|      Brooklyn|           Bay Ridge|       Brooklyn|           Bay Ridge|17934|
|        Queens|     Jackson Heights|         Queens|     Jackson Heights|14688|
|        Queens|             Astoria|         Queens|             Astoria|14688|
|     Manhattan|Central Harlem North|      Manhattan|Central Harlem North|14481|
|      Brooklyn|      Bushwick South|       Brooklyn|      Bushwick South|14424|
|      Brooklyn|Flatbush/Dit



In [None]:
spark.stop()