In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
# Attach to the already-running SparkSession, or create one if none exists yet.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [10]:
# Load the FHV trip records stored as Parquet under fhv/2019/10.
df_fhv = spark.read.format('parquet').load('fhv/2019/10')

In [11]:
# Peek at the first rows (these are DataFrame.show's defaults, spelled out).
df_fhv.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|     B00889         |2019-10-01 15:38:29|2019-10-01 15:56:54|         129|          92|   null|       B00889         |
|              B01239|2019-10-02 10:17:37|2019-10-02 10:33:46|         264|         241|   null|                B01239|
|              B01745|2019-10-01 14:07:24|2019-10-01 14:19:02|         264|         215|   null|                B01745|
|              B00256|2019-10-02 13:05:34|2019-10-02 13:54:04|         264|         264|   null|                B00256|
|              B03060|2019-10-01 13:33:31|2019-10-01 13:49:13|         264|         155|   null|                B02875|
|              B02120|2019-10-02 16:00:0

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Expose the trips DataFrame to Spark SQL under the name `fhv_data`.
df_fhv.createOrReplaceTempView(name='fhv_data')

In [13]:
# Count trips whose pickup falls on 2019-10-15.
# The half-open range [2019-10-15 00:00, 2019-10-16 00:00) selects the same rows as
# `CAST(pickup_datetime AS DATE) = '2019-10-15'` (both are evaluated in the session
# time zone). It compares the raw column against typed literals instead of casting
# every row, which keeps the filter eligible for pushdown to the Parquet scan.
query = """
    SELECT COUNT(*)
    FROM fhv_data
    WHERE pickup_datetime >= TIMESTAMP '2019-10-15 00:00:00'
      AND pickup_datetime <  TIMESTAMP '2019-10-16 00:00:00'
"""

In [14]:
# Build the DataFrame for the query above; Spark evaluates lazily, so no job runs until an action.
result = spark.sql(sqlQuery=query)

In [15]:
# An ungrouped COUNT(*) yields exactly one row with one column; pull out that scalar.
count_value = result.first()[0]

In [16]:
message = f"The number of taxi trips on 2019-10-15 is: {count_value}"
print(message)

The number of taxi trips on 2019-10-15 is: 62610


In [17]:
# Longest trip: (dropoff - pickup) in seconds via UNIX_TIMESTAMP, divided by 3600 to get hours.
# NOTE(review): the recorded result (~631k hours, i.e. decades) points to corrupt
# dropoff timestamps in the source data; consider filtering outliers before trusting it.
query = """
    SELECT 
        MAX((UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600) as max_trip_length
    FROM fhv_data
"""

result = spark.sql(sqlQuery=query)

# The ungrouped aggregate produces a single row; read the aliased column by name.
first_row = result.first()
max_trip_length = first_row['max_trip_length']

print(f"The length of the longest trip in hours is: {max_trip_length}")

The length of the longest trip in hours is: 631152.5


In [18]:
# First look at the zone lookup: only the header option is set, so no types are inferred.
df_zone = spark.read.csv('taxi_zone_lookup.csv', header=True)
df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [19]:
# Inspect the schema Spark produced: with neither an explicit schema nor schema
# inference, every column of the CSV comes back as StringType.
df_zone.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [20]:
import pandas as pd
# Let pandas infer column dtypes from the same CSV, as a reference for typing the Spark schema.
df_pandas = pd.read_csv(filepath_or_buffer='taxi_zone_lookup.csv')
df_pandas.dtypes

LocationID       int64
Borough         object
Zone            object
service_zone    object
dtype: object

In [21]:
from pyspark.sql import types
# Explicit zone-lookup schema: LocationID is an integer (pandas inferred int64);
# the remaining columns are strings. Every field is nullable.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

In [22]:
# Re-read the lookup with the explicit schema so LocationID is typed as an integer.
df_zone = spark.read.csv('taxi_zone_lookup.csv', schema=schema, header=True)
df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [34]:
# Persist the typed zone lookup as Parquet.
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') raises if the directory already exists, which breaks
# Restart & Run All. Caveat: overwriting only works when df_zone was NOT itself
# read from this same path (Spark refuses to overwrite a path it is reading from).
df_zone.write.mode('overwrite').parquet('fhv_zone/2019/10/')

In [35]:
# Read the zone lookup back from Parquet; the column types travel with the data.
df_zone = spark.read.format('parquet').load('fhv_zone/2019/10/')

In [37]:
# Confirm the Parquet round-trip kept LocationID as an integer column.
df_zone.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [97]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import col

# Register the trips under the SQL name used by the zone join below.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it also matches how `fhv_data` is registered earlier.
df_fhv.createOrReplaceTempView('fhv_data_df')



In [100]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import col

# Register the zone lookup under the SQL name `zones` for the join below.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated since Spark 2.0.
df_zone.createOrReplaceTempView('zones')

In [108]:
# Ten least-frequent pickup zones.
# - Keep the DataFrame and display it separately: `.show()` returns None, so
#   chaining it onto the assignment left `df_result` as None.
# - The dropoff-zone join was never referenced, so it is dropped.
# - The zone name breaks ties in the count so the LIMIT cutoff is deterministic.
df_result = spark.sql("""
SELECT
    pul.Zone,
    COUNT(1) AS trip_count
FROM
    fhv_data_df fhv
    LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
GROUP BY
    pul.Zone
ORDER BY
    trip_count ASC,
    pul.Zone ASC
LIMIT 10
""")
df_result.show()

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
+--------------------+--------+

