In [104]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [105]:
df_fhvhv = spark.read \
    .option("header", "true") \
        .csv('../code/data/raw/fhvhv/2021/06')

In [106]:
df_fhvhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [107]:
df_fhvhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [108]:
import pandas as pd

In [109]:
df_fhvhv_pd = pd.read_csv('../code/data/raw/fhvhv/2021/06/fhvhv_tripdata_2021_06.csv.gz', nrows = 1000)

In [110]:
df_fhvhv_pd['Affiliated_base_number'] = df_fhvhv_pd['Affiliated_base_number'].fillna('')
df_fhvhv_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 7 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   dispatching_base_num    1000 non-null   object
 1   pickup_datetime         1000 non-null   object
 2   dropoff_datetime        1000 non-null   object
 3   PULocationID            1000 non-null   int64 
 4   DOLocationID            1000 non-null   int64 
 5   SR_Flag                 1000 non-null   object
 6   Affiliated_base_number  1000 non-null   object
dtypes: int64(2), object(5)
memory usage: 54.8+ KB


In [111]:
df_fhvhv_pd.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag,Affiliated_base_number
0,B02764,2021-06-01 00:02:41,2021-06-01 00:07:46,174,18,N,B02764
1,B02764,2021-06-01 00:16:16,2021-06-01 00:21:14,32,254,N,B02764
2,B02764,2021-06-01 00:27:01,2021-06-01 00:42:11,240,127,N,B02764
3,B02764,2021-06-01 00:46:08,2021-06-01 00:53:45,127,235,N,B02764
4,B02510,2021-06-01 00:45:42,2021-06-01 01:03:33,144,146,N,


In [112]:
spark.createDataFrame(df_fhvhv_pd).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [113]:
from pyspark.sql import types

In [114]:
fhvhv_schema = types.StructType([
types.StructField('dispatching_base_num', types.StringType(), True), 
types.StructField('pickup_datetime', types.TimestampType(), True), 
types.StructField('dropoff_datetime', types.TimestampType(), True), 
types.StructField('PULocationID', types.IntegerType(), True), 
types.StructField('DOLocationID', types.IntegerType(), True), 
types.StructField('SR_Flag', types.StringType()), 
types.StructField('Affiliated_base_number', types.StringType(), True)
])


year = 2021

for month in range(6, 7):
    print(f'processing data for {year}/{month}')

    input_path = f'../code/data/raw/fhvhv/{year}/{month:02d}/'
    output_path = f'../code/data/pq/fhvhv/{year}/{month:02d}/'

    df_fhvhv = spark.read \
        .option("header", "true") \
        .schema(fhvhv_schema) \
        .csv(input_path)

    df_fhvhv \
        .repartition(12) \
        .write.parquet(output_path)

Question 3:  
How many taxi trips were there on June 15?  
Consider only trips that started on June 15.  

In [115]:
df_fhvhv_pq = spark.read.parquet('../code/data/pq/fhvhv/2021/06*')

In [116]:
df_fhvhv_pq.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02889|2021-06-04 20:51:44|2021-06-04 21:10:12|         239|         158|      N|                B02889|
|              B02800|2021-06-04 15:50:15|2021-06-04 16:19:29|          75|         116|      N|                  null|
|              B02510|2021-06-02 21:03:38|2021-06-02 21:10:12|         167|         168|      N|                  null|
|              B02867|2021-06-02 12:51:57|2021-06-02 13:05:09|         151|         142|      N|                B02867|
|              B02869|2021-06-21 09:51:45|2021-06-21 10:09:17|         106|          65|      N|                B02869|
+--------------------+------------------

In [117]:
from pyspark.sql.functions import count, date_format, col

In [118]:
df_fhvhv_pq.select(date_format(col("pickup_datetime"), "dd.MM.yyyy").alias("pickup_date")) \
    .filter('pickup_date == "15.06.2021"').count()

                                                                                

452470

In [119]:
df_fhvhv_pq.registerTempTable('fhvhv_data')



In [120]:
spark.sql("""


SELECT
(bigint(dropoff_datetime) - bigint(pickup_datetime)) /3600 as test
FROM fhvhv_data
ORDER BY (bigint(dropoff_datetime) - bigint(pickup_datetime)) DESC
LIMIT 1
"""


).show()

[Stage 91:>                                                         (0 + 4) / 4]

+----------------+
|            test|
+----------------+
|66.8788888888889|
+----------------+



                                                                                

## Question 6: 

**Most frequent pickup location zone**
Using the zone lookup data and the fhvhv June 2021 data,  
what is the name of the most frequent pickup location zone?  

In [121]:
!wget -nc https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

File ‘taxi_zone_lookup.csv’ already there; not retrieving.



In [122]:
taxi_zones = spark.read \
            .option("Header", "True") \
            .csv('taxi_zone_lookup.csv')

In [123]:
taxi_zones.createOrReplaceTempView('taxi_zones_view')

In [124]:
spark.sql("""


SELECT 
*
FROM fhvhv_data
LIMIT 1
"""


).show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02889|2021-06-04 20:51:44|2021-06-04 21:10:12|         239|         158|      N|                B02889|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+



In [125]:
spark.sql("""
SELECT 
taxi_zones_view.Zone,
COUNT(*)
FROM fhvhv_data 
LEFT JOIN taxi_zones_view
ON fhvhv_data.PULocationID = taxi_zones_view.LocationID
GROUP BY taxi_zones_view.Zone
ORDER BY COUNT(*) DESC
LIMIT 10

""").show()

[Stage 98:>                                                         (0 + 4) / 4]

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
| Crown Heights North|  231279|
|        East Village|  221244|
|         JFK Airport|  188867|
|      Bushwick South|  187929|
|       East New York|  186780|
|TriBeCa/Civic Center|  164344|
|   LaGuardia Airport|  161596|
|            Union Sq|  158937|
|        West Village|  154698|
|             Astoria|  152493|
+--------------------+--------+



                                                                                