In [29]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as F

In [4]:
credentials_location = '/home/realadmin/.google/creds/zcamp-spark-28a6151fa616.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "/home/realadmin/spark/spark-3.5.5-bin-hadoop3/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [5]:
sc = SparkContext.getOrCreate(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/03/07 22:35:54 WARN Utils: Your hostname, biccboii resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/03/07 22:35:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/03/07 22:35:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [9]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [11]:
spark.version

'3.5.5'

In [13]:
df = spark.read.parquet('gs://zcamp-nyc-taxi/pq/yellow/2024/*')

                                                                                

In [14]:
df = df.repartition(4)

In [15]:
df.write.parquet('gs://zcamp-nyc-taxi/yellow/2024/10/', mode='overwrite')

                                                                                

In [33]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [16]:
df.createOrReplaceTempView('trips')

In [20]:
df_result = spark.sql("""
    SELECT * FROM trips LIMIT 10;
""").show()

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-10-07 16:40:43|  2024-10-07 18:10:56|              1|         14.8|        99|                 N|         127|         225|           1|       47.5|  0.0|    0.5|       0.

In [55]:
df.filter(F.to_date(df.tpep_pickup_datetime) == "2024-10-15").count()

                                                                                

128893

In [23]:
df_15th = spark.sql("""
    SELECT COUNT(1) FROM trips
    WHERE CAST(tpep_pickup_datetime AS Date) = '2024-10-15'
""").show()



+--------+
|count(1)|
+--------+
|  128893|
+--------+



                                                                                

In [37]:
df_long = spark.sql("""
    SELECT DATEDIFF(hour,tpep_pickup_datetime, tpep_dropoff_datetime) as length
    FROM trips
    ORDER BY length DESC
    LIMIT 1;
""").show()



+------+
|length|
+------+
|   162|
+------+



                                                                                

In [43]:
df_zones = spark.read.csv('taxi_zone_lookup.csv', header=True)

In [44]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [41]:
df_zones.createOrReplaceTempView('zones')

In [47]:
df_trips_zones = df.join(df_zones, df.PULocationID == df_zones.LocationID).drop('LocationID')

In [48]:
df_trips_zones.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'Borough',
 'Zone',
 'service_zone']

In [49]:
df_trips_zones.createOrReplaceTempView('trips_zones')

In [53]:
df_least = spark.sql("""
    SELECT PULocationID
    ,Zone
    ,COUNT(1) AS total_pickups
    FROM trips_zones
    GROUP BY PULocationID, Zone
    ORDER BY total_pickups ASC
    LIMIT 1;
""").show()



+------------+--------------------+-------------+
|PULocationID|                Zone|total_pickups|
+------------+--------------------+-------------+
|         105|Governor's Island...|            1|
+------------+--------------------+-------------+



                                                                                