In [1]:
import os

import findspark

In [2]:
# Point findspark at the local Spark installation. Prefer the SPARK_HOME
# environment variable when it is set so the notebook runs on other machines;
# otherwise fall back to the original hard-coded Windows install path.
findspark.init(os.environ.get('SPARK_HOME', 'C:/spark-3.0.2-bin-hadoop2.7'))

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import types

In [8]:
# Local Spark session that uses every available core (local[*]).
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

In [10]:
# Spark release the session is running; presumably should match the findspark install.
spark_version = spark.version
spark_version

'3.0.2'

In [11]:
# Explicit schema for the FHVHV trip CSV: two string identifiers, pickup and
# dropoff timestamps, integer pickup/dropoff zone IDs and a string SR_Flag.
# Every field is nullable.
schema = (
    types.StructType()
    .add('hvfhs_license_num', types.StringType(), True)
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
)

In [12]:
%%time
df = spark.read\
     .option('header', 'true')\
     .schema(schema)\
     .csv('C:/Users/Hoe/Desktop/Learning/DataTalksClub_DE_ZoomCamp/Dataset/fhvhv_tripdata_2021-02.csv')

Wall time: 1.76 s


In [13]:
# df.count() triggers a full scan of the CSV; len(df.columns) is metadata only.
n_rows = df.count()
n_cols = len(df.columns)
print(f'number of rows: {n_rows} & number of columns: {n_cols}')

number of rows: 11613942 & number of columns: 7


In [14]:
# Peek at the first five parsed trips.
df.show(n=5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
+-----------------+--------------------+-------------------+-------------------+

In [15]:
# Confirm the columns were read with the types declared in `schema`.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



### Question 3: How many trips started on 15 February 2021?

In [16]:
%%time
df.filter((df['pickup_datetime'] >= '2021-02-15 00:00:00') & (df['pickup_datetime'] <= '2021-02-15 23:59:59')).count()

367170

### Question 4: What is the pickup time of the longest trip (by duration in seconds)?

In [17]:
# Trip duration in whole seconds: cast both timestamps to epoch seconds and subtract.
# withColumn replaces an existing DiffInSeconds column, so re-running this cell is safe.
pickup_epoch = df.pickup_datetime.cast("long")
dropoff_epoch = df.dropoff_datetime.cast("long")
df = df.withColumn('DiffInSeconds', dropoff_epoch - pickup_epoch)

In [18]:
# DiffInSeconds should now appear as a long column at the end of the schema.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- DiffInSeconds: long (nullable = true)



In [19]:
# Expose the trips DataFrame to Spark SQL as the temporary view `df`.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView
# is the supported replacement and can be re-run without error.
df.createOrReplaceTempView('df')

In [20]:
%%time
spark.sql(
"""
SELECT DiffInSeconds
FROM df 
ORDER BY DiffInSeconds DESC
"""

).show()

+-------------+
|DiffInSeconds|
+-------------+
|        75540|
|        57221|
|        44039|
|        40653|
|        37577|
|        35010|
|        34806|
|        34612|
|        34555|
|        34169|
|        32476|
|        32439|
|        32223|
|        32087|
|        31447|
|        30913|
|        30856|
|        30732|
|        30660|
|        30511|
+-------------+
only showing top 20 rows

Wall time: 1min 32s


In [21]:
%%time
spark.sql(
"""
SELECT pickup_datetime
FROM df 
WHERE DiffInSeconds = 75540
"""

).show()

+-------------------+
|    pickup_datetime|
+-------------------+
|2021-02-11 13:40:44|
+-------------------+

Wall time: 2min 28s


### Question 5: Which dispatching base has the most trips?

In [22]:
# (Re)expose the trips DataFrame as the SQL view `df` so this section can run on its own.
# createOrReplaceTempView replaces the deprecated registerTempTable and is idempotent.
df.createOrReplaceTempView('df')

In [23]:
%%time
spark.sql(
"""
SELECT dispatching_base_num, count(*) AS count
FROM df 
GROUP BY dispatching_base_num
ORDER BY count DESC
"""

).show()

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows

Wall time: 6.75 s


### Question 6: What is the most common pickup / dropoff zone pair?

In [24]:
%%time
taxi_lookup = spark.read\
     .option('header', 'true')\
     .csv('C:/Users/Hoe/Desktop/Learning/DataTalksClub_DE_ZoomCamp/taxi+_zone_lookup.csv')

Wall time: 334 ms


In [25]:
# Peek at the first five lookup rows.
taxi_lookup.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [26]:
# Expose the zone lookup to Spark SQL as the temporary view `taxi_lookup`.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be re-run safely.
taxi_lookup.createOrReplaceTempView('taxi_lookup')

In [27]:
%%time
spark.sql(
"""
SELECT CONCAT(coalesce(b.Zone, 'Unknown') , ' / ' , coalesce(c.Zone, 'Unknown')) AS location, count(*) AS COUNT
FROM df a
LEFT JOIN taxi_lookup b on a.PULocationID = b.LocationID
LEFT JOIN taxi_lookup c on a.DOLocationID = c.LocationID
GROUP BY CONCAT(coalesce(b.Zone, 'Unknown') , ' / ' , coalesce(c.Zone, 'Unknown'))
ORDER BY COUNT DESC
"""

).show()

+--------------------+-----+
|            location|COUNT|
+--------------------+-----+
|East New York / E...|45041|
|Borough Park / Bo...|37329|
| Canarsie / Canarsie|28026|
|Crown Heights Nor...|25976|
|Bay Ridge / Bay R...|17934|
|Jackson Heights /...|14688|
|   Astoria / Astoria|14688|
|Central Harlem No...|14481|
|Bushwick South / ...|14424|
|Flatbush/Ditmas P...|13976|
|South Ozone Park ...|13716|
|Brownsville / Bro...|12829|
|    JFK Airport / NA|12542|
|Prospect-Lefferts...|11814|
|Forest Hills / Fo...|11548|
|Bushwick North / ...|11491|
|Bushwick South / ...|11487|
|Crown Heights Nor...|11462|
|Crown Heights Nor...|11342|
|Prospect-Lefferts...|11308|
+--------------------+-----+
only showing top 20 rows

Wall time: 9.97 s
