In [11]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types
import pandas as pd

In [12]:
credentials_location = '/home/pedroh/.google/credentials/dataenginner-zoomcamp-7ade4ac1d455.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [13]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=test, master=local[*]) created by __init__ at /tmp/ipykernel_57677/2776130930.py:1 

In [14]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [46]:
fhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.StringType(), True),
    types.StructField("dropOff_datetime", types.StringType(), True),
    types.StructField("PUlocationID", types.LongType(), True),
    types.StructField("DOlocationID", types.LongType(), True),
    types.StructField("SR_Flag", types.DoubleType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [77]:
df_fhv = spark.read.format('parquet').schema(fhv_schema).load('gs://taxi-data-ph/fhv/fhv_tripdata_2019-10.parquet')

In [78]:
from pyspark.sql.functions import *

In [79]:
df_fhv = df_fhv.withColumn("pickup_datetime", to_timestamp("pickup_datetime")).withColumn("dropOff_datetime", to_timestamp("dropOff_datetime"))

In [50]:
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: long (nullable = true)
 |-- DOlocationID: long (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [51]:
df_fhv.rdd.getNumPartitions()

4

In [52]:
df_fhv.repartition(6).write.parquet("gs://taxi-data-ph/pq/fhv/", mode="overwrite")

                                                                                

In [56]:
df_fhv.select("*").where("month(pickup_datetime) = 10 and day(pickup_datetime) = 15").count()

                                                                                

62610

In [80]:
df_fhv = df_fhv.withColumn("trip_time", ((col("dropOff_datetime").cast("long") - col("pickup_datetime").cast("long"))/3600))

In [84]:
df_fhv.createOrReplaceTempView("fhv")

In [86]:
spark.sql("""
            select day(pickup_datetime) as day, max(trip_time)
            from fhv
            group by 1
            order by 2 desc
    
""").show()



+---+------------------+
|day|    max(trip_time)|
+---+------------------+
| 11|          631152.5|
| 28|          631152.5|
| 31| 87672.44083333333|
|  1| 70128.02805555555|
| 17|            8794.0|
| 26| 8784.166666666666|
| 30|1464.5344444444445|
| 25|1056.8266666666666|
|  2| 769.2313888888889|
| 23| 745.6166666666667|
|  3|          745.3825|
|  4| 744.6166666666667|
|  7| 744.1666666666666|
|  5| 697.1808333333333|
|  6| 674.0077777777777|
|  8| 625.0822222222222|
| 16| 604.0666666666667|
|  9| 601.3102777777777|
| 10| 577.3888888888889|
| 12|          528.9125|
+---+------------------+
only showing top 20 rows



                                                                                

In [90]:
zone_df = spark.read.csv("/home/pedroh/dataengineer-zoomcamp2024/week6/code/notebooks/taxi+_zone_lookup.csv", header = True)

In [91]:
zone_df.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [92]:
join_df = df_fhv.join(zone_df, df_fhv.PUlocationID == zone_df.LocationID, "inner")

In [93]:
join_df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+--------------------+----------+-------+---------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|           trip_time|LocationID|Borough|           Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+--------------------+----------+-------+---------------+------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|                 0.2|       264|Unknown|             NV|         N/A|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013| 0.03138888888888889|       264|Unknown|             NV|         N/A|
|              B00014|2019-10-01 00:11:43|2019-10-01 00

[Stage 22:>                                                         (0 + 1) / 1]                                                                                

In [101]:
join_df.select("LocationID", "Zone").groupBy("LocationID", "Zone").count().orderBy("count").show()



+----------+--------------------+-----+
|LocationID|                Zone|count|
+----------+--------------------+-----+
|         2|         Jamaica Bay|    1|
|       105|Governor's Island...|    2|
|       111| Green-Wood Cemetery|    5|
|        30|       Broad Channel|    8|
|       120|     Highbridge Park|   14|
|        12|        Battery Park|   15|
|       207|Saint Michaels Ce...|   23|
|        27|Breezy Point/Fort...|   25|
|       154|Marine Park/Floyd...|   26|
|         8|        Astoria Park|   29|
|       128|    Inwood Hill Park|   39|
|       253|       Willets Point|   47|
|        96|Forest Park/Highl...|   53|
|        34|  Brooklyn Navy Yard|   57|
|        59|        Crotona Park|   62|
|        58|        Country Club|   77|
|        99|     Freshkills Park|   89|
|       190|       Prospect Park|   98|
|        54|     Columbia Street|  105|
|       217|  South Williamsburg|  110|
+----------+--------------------+-----+
only showing top 20 rows

