In [1]:
import pyspark 
from pyspark.sql import SparkSession, functions as F  

In [2]:
# Create (or reuse) the Spark session for this notebook, running locally
# with master "local[*]" under the application name "groupby".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("groupby")
    .getOrCreate()
)

In [3]:
# Display the session object to confirm it was created.
spark

In [18]:
# Load the green dataset: every Parquet path matching two wildcard levels
# below Dataset/pq/green.
green_parquet_glob = 'Dataset/pq/green/*/*'
df_green = spark.read.parquet(green_parquet_glob)

In [19]:
# List the green dataset's column names before writing SQL against them.
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [20]:
# Expose df_green to Spark SQL under the name "green".
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
df_green.createOrReplaceTempView("green")

In [23]:
# Revenue and record count per (pickup zone, pickup hour) for the "green" view,
# largest total amount first.
# GROUP BY 1,2 and ORDER BY 3 refer to SELECT-list positions: zone, hour, amount.
green_revenue_query = """
    SELECT PULocationID AS zone, 
            date_trunc('hour', lpep_pickup_datetime) AS hour,

            SUM(total_amount) AS amount,
            COUNT(1) AS number_records
    FROM green
    GROUP BY 1,2
    ORDER BY 3 desc
    """
green_revenue = spark.sql(green_revenue_query)

In [24]:
# Print the top rows of the green aggregate (sorted by amount, descending).
green_revenue.show()



+----+-------------------+------------------+--------------+
|zone|               hour|            amount|number_records|
+----+-------------------+------------------+--------------+
|  75|2020-02-04 16:00:00| 2281.749999999999|           123|
|  75|2020-01-08 16:00:00| 2245.769999999999|           115|
|  75|2020-01-23 16:00:00|2225.6399999999994|           123|
|  17|2021-05-28 16:00:00|           2135.55|             2|
|  75|2020-02-28 17:00:00|2124.8499999999985|           118|
|  75|2020-01-28 16:00:00| 2099.389999999998|           130|
|  75|2020-01-09 17:00:00| 2098.539999999998|           122|
|  75|2020-01-21 16:00:00| 2084.509999999999|           114|
|  75|2020-02-06 16:00:00| 2066.149999999998|           123|
|  75|2020-02-27 17:00:00|2018.7499999999984|           141|
|  75|2020-01-13 17:00:00|2015.6699999999973|           127|
|  75|2020-01-29 16:00:00|1999.0399999999975|           121|
|  75|2020-02-27 18:00:00|1985.0299999999993|           129|
|  75|2020-01-31 16:00:0

                                                                                

In [None]:
# The original cell referenced `green_in`, a name that is never assigned in this
# notebook, so it raised NameError on a fresh kernel. Show the green revenue
# aggregate instead, which is the green result computed by the cells above.
green_revenue.show()

In [25]:
# Load the yellow dataset: every Parquet path matching two wildcard levels
# below Dataset/pq/yellow.
yellow_parquet_glob = 'Dataset/pq/yellow/*/*'
df_yellow = spark.read.parquet(yellow_parquet_glob)

In [26]:
# List the yellow dataset's column names; note the pickup/dropoff columns use
# the tpep_ prefix here, whereas the green dataset uses lpep_.
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [27]:
# Expose df_yellow to Spark SQL under the name "yellow".
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
df_yellow.createOrReplaceTempView("yellow")

In [29]:
# Revenue and record count per (pickup zone, pickup hour) for the "yellow" view,
# largest total amount first.
# GROUP BY 1,2 and ORDER BY 3 refer to SELECT-list positions: zone, hour, amount.
yellow_revenue_query = """
    SELECT PULocationID AS zone, 
            date_trunc('hour', tpep_pickup_datetime) AS hour,

            SUM(total_amount) AS amount,
            COUNT(1) AS number_records
    FROM yellow
    GROUP BY 1,2
    ORDER BY 3 desc
    """
yellow_revenue = spark.sql(yellow_revenue_query)

In [30]:
# Print the top rows of the yellow aggregate (sorted by amount, descending).
yellow_revenue.show()



+----+-------------------+------------------+--------------+
|zone|               hour|            amount|number_records|
+----+-------------------+------------------+--------------+
| 193|2020-03-10 09:00:00|1000026.4500000001|             3|
|  41|2020-10-07 10:00:00| 998602.2700000003|            21|
| 166|2020-03-04 17:00:00| 673021.1200000012|           117|
| 142|2020-05-04 20:00:00| 429733.2499999999|            12|
| 170|2020-12-26 13:00:00| 399255.8399999999|            57|
| 142|2020-08-14 17:00:00| 188859.9099999998|            79|
|  41|2020-11-17 06:00:00|151590.91999999998|             8|
| 132|2020-01-05 21:00:00|40236.339999999895|           725|
| 132|2020-01-26 20:00:00|38895.609999999935|           708|
| 132|2020-01-20 18:00:00| 38346.72999999991|           693|
| 132|2020-01-12 22:00:00| 37748.56999999992|           711|
| 132|2020-01-20 16:00:00| 37534.08999999995|           638|
| 132|2020-01-20 22:00:00| 37197.59999999996|           652|
| 132|2020-01-20 20:00:0

                                                                                

In [31]:
# Drill into one outlier bucket from yellow_revenue: pickup zone 41 during the
# hour starting 2020-10-07 10:00:00, and list the fare components of each trip.
#
# The hour is selected with a half-open timestamp range on the bare column
# rather than date_trunc('hour', col) = "<string>". The rows returned are the
# same, but a plain column comparison can be pushed down to the Parquet scan,
# and single-quoted TIMESTAMP literals do not depend on double quotes being
# parsed as strings or on an implicit string-to-timestamp cast.
yellow_investigate = spark.sql("""
    SELECT VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, payment_type, fare_amount,
           extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount
    FROM yellow
    WHERE tpep_pickup_datetime >= TIMESTAMP '2020-10-07 10:00:00'
      AND tpep_pickup_datetime <  TIMESTAMP '2020-10-07 11:00:00'
      AND PULocationID = 41
""")

In [32]:
# Print the individual trips selected for the outlier bucket.
yellow_investigate.show()



+--------+--------------------+---------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2020-10-07 10:18:42|  2020-10-07 10:28:59|           1|        9.0|  0.0|    0.5|       2.0|         0.0|                  0.3|        14.3|
|       2| 2020-10-07 10:58:03|  2020-10-07 11:01:10|           1|        4.5|  0.0|    0.5|      1.06|         0.0|                  0.3|        6.36|
|       2| 2020-10-07 10:36:45|  2020-10-07 10:56:22|           1|       12.5|  0.0|    0.5|      2.66|         0.0|                  0.3|       15.96|
|       1| 2020-10-07 10:14:10|  2020-10-07 10:20:43|           0|        6.5|  0.0|    

                                                                                

In [35]:
# Save the yellow hourly revenue report as Parquet, replacing any existing output.
yellow_revenue.write.mode('overwrite').parquet('Dataset/report/revenue/yellow')

                                                                                

25/04/10 08:39:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [36]:
# Save the green hourly revenue report as Parquet, replacing any existing output.
green_revenue.write.mode('overwrite').parquet('Dataset/report/revenue/green')

                                                                                

25/04/10 08:40:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
