In [2]:
import pyspark
from pyspark.sql import SparkSession
import warnings
from pyspark.sql.types import StructField, StructType, IntegerType, DateType, StringType, TimestampType, DoubleType, LongType
import pyspark.sql.functions as F
warnings.filterwarnings("ignore")

In [3]:
# Create (or reuse) the Spark session for this notebook.
# "local[*]" runs Spark locally with one worker thread per logical core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("intro")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/04 11:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the staged Parquet dataset of green-taxi trips.
# Parquet stores its own schema and column names, so the CSV-only "header"
# option that used to be set here had no effect and has been dropped.
df = spark.read.parquet("./data/stage/green_tripdata_2020-01_partitioned")
df.show(5)

                                                                                

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-12 10:42:30|  2020-01-12 11:00:17|                 N|      null|         127|         239|           null|          7.9|       24.5|  0.0|    0.

In [11]:
# Add date-only versions of the pickup and dropoff timestamps.
df = (
    df
    .withColumn("lpep_pickup_date", F.to_date(F.col("lpep_pickup_datetime")))
    .withColumn("lpep_dropoff_date", F.to_date(F.col("lpep_dropoff_datetime")))
)

In [15]:
# Count the trips whose pickup date is 2020-01-01 (DataFrame API).
picked_up_on_jan_1 = F.col("lpep_pickup_date") == F.lit("2020-01-01")
df.where(picked_up_on_jan_1).count()

10970

In [16]:
# Register the trips DataFrame as a temporary view so SQL queries can
# refer to it by the name `green_data`.
df.createOrReplaceTempView(name="green_data")

In [20]:
# Count the trips picked up on 2020-01-01, this time through Spark SQL
# against the `green_data` view.
query = """
SELECT COUNT(*) AS n_trips FROM green_data WHERE lpep_pickup_date == DATE('2020-01-01');
"""
n_trips_df = spark.sql(query)
n_trips_df.show()

+-------+
|n_trips|
+-------+
|  10970|
+-------+



In [26]:
# Monthly revenue report: for every (pickup zone, calendar month) pair,
# sum each revenue component and average the passenger count and trip distance.
# GROUP BY 1, 2 refers to the first two SELECT expressions (zone and month).
monthly_revenue_sql = """
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', lpep_pickup_datetime) AS revenue_month,
    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,
    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    green_data
GROUP BY
    1, 2
"""
df_result = spark.sql(monthly_revenue_sql)
df_result.show()

[Stage 26:>                                                         (0 + 4) / 4]

+------------+-------------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         190|2020-01-01 00:00:00|  3786.0200000000004|   

                                                                                

In [30]:
# Trips per pickup day, oldest day first.
# The output includes a handful of rows dated well outside January 2020.
query = """
SELECT date_trunc('day', lpep_pickup_date) AS pickup_day,
       COUNT(*) AS n_trips 
       FROM green_data
       GROUP BY 1 ORDER BY 1 ASC;
"""
daily_trips_df = spark.sql(query)
daily_trips_df.show()

+-------------------+-------+
|         pickup_day|n_trips|
+-------------------+-------+
|2008-12-31 00:00:00|      1|
|2009-01-01 00:00:00|     12|
|2010-09-23 00:00:00|      3|
|2019-12-18 00:00:00|      1|
|2019-12-31 00:00:00|     18|
|2020-01-01 00:00:00|  10970|
|2020-01-02 00:00:00|  15023|
|2020-01-03 00:00:00|  16062|
|2020-01-04 00:00:00|  14295|
|2020-01-05 00:00:00|  12732|
|2020-01-06 00:00:00|  16323|
|2020-01-07 00:00:00|  17027|
|2020-01-08 00:00:00|  17586|
|2020-01-09 00:00:00|  18491|
|2020-01-10 00:00:00|  18307|
|2020-01-11 00:00:00|  15175|
|2020-01-12 00:00:00|  13442|
|2020-01-13 00:00:00|  14288|
|2020-01-14 00:00:00|  13977|
|2020-01-15 00:00:00|  14157|
+-------------------+-------+
only showing top 20 rows



In [32]:
# Read the taxi zone lookup CSV, taking column names from its first row.
# No schema is supplied, so every column (including LocationID) is read as a string.
# NOTE(review): absolute, machine-specific path — consider making it relative
# to the project's data directory like the Parquet load.
location_lookup = spark.read.csv(
    "/home/Ezz/week_5/data/taxi+_zone_lookup.csv",
    header=True,
)
location_lookup.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [41]:
# Join with the DataFrame API: attach the pickup and dropoff zone names to each trip.
# The zone lookup is very small, so Spark is presumably able to broadcast a copy
# to each executor instead of shuffling the trips — confirm with merged_df.explain().
temp_df = df.select(
    "VendorID", "lpep_pickup_date", "lpep_dropoff_datetime",
    "PULocationID", "DOLocationID", "passenger_count", "total_amount", "trip_distance",
)

# The lookup is joined twice, so each use gets its own alias to keep columns unambiguous.
pickup_lookup = location_lookup.alias("a")
dropoff_lookup = location_lookup.alias("b")

merged_df = (
    temp_df.alias("base")
    .join(pickup_lookup, on=F.col("base.PULocationID") == F.col("a.LocationID"), how="left")
    .join(dropoff_lookup, on=F.col("base.DOLocationID") == F.col("b.LocationID"), how="left")
    .select(
        F.col("a.Zone").alias("PUZone"),
        F.col("b.Zone").alias("DOZone"),
        "trip_distance",
    )
)
merged_df.columns

['PUZone', 'DOZone', 'trip_distance']

In [42]:
# Preview the first 10 joined rows.
merged_df.show(n=10)

+--------------------+--------------------+-------------+
|              PUZone|              DOZone|trip_distance|
+--------------------+--------------------+-------------+
|              Inwood|Upper West Side S...|          7.9|
|        Forest Hills|   Kew Gardens Hills|         2.06|
|Long Island City/...|            Woodside|         2.54|
|    Elmhurst/Maspeth|            Woodside|         0.13|
|      Manhattanville|   East Harlem North|          1.2|
|Washington Height...|Central Harlem North|         1.77|
|   East Harlem North|       Melrose South|         2.24|
|    Brooklyn Heights|    Brooklyn Heights|         0.47|
|    Brooklyn Heights|East Flatbush/Far...|         6.97|
|        Clinton Hill|         Boerum Hill|         1.66|
+--------------------+--------------------+-------------+
only showing top 10 rows



In [43]:
# using spark sql
location_lookup.createOrReplaceTempView("lookup")
merged_df = spark.sql("""
SELECT a.Zone AS PUZone,
       b.Zone AS DOZone,
       base.trip_distance
FROM green_data AS base
LEFT JOIN lookup AS a ON base.PULocationID = a.LocationID
LEFT JOIN lookup AS b ON base.DOLocationID = b.LocationID
""")

In [44]:
# Preview the first 10 rows of the SQL-based join.
merged_df.show(n=10)

+--------------------+--------------------+-------------+
|              PUZone|              DOZone|trip_distance|
+--------------------+--------------------+-------------+
|              Inwood|Upper West Side S...|          7.9|
|        Forest Hills|   Kew Gardens Hills|         2.06|
|Long Island City/...|            Woodside|         2.54|
|    Elmhurst/Maspeth|            Woodside|         0.13|
|      Manhattanville|   East Harlem North|          1.2|
|Washington Height...|Central Harlem North|         1.77|
|   East Harlem North|       Melrose South|         2.24|
|    Brooklyn Heights|    Brooklyn Heights|         0.47|
|    Brooklyn Heights|East Flatbush/Far...|         6.97|
|        Clinton Hill|         Boerum Hill|         1.66|
+--------------------+--------------------+-------------+
only showing top 10 rows

