In [4]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
# Local Spark session using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [8]:
# Glob over two directory levels under data/pq/green (presumably year/month — confirm).
green_parquet_glob = 'data/pq/green/*/*'
df_green = spark.read.parquet(green_parquet_glob)

In [None]:
# Reference SQL (a bare string; nothing is executed here) that the RDD
# pipeline in this notebook is meant to reproduce: revenue and trip count
# per (pickup hour, pickup zone) for trips starting on or after 2020-01-01.
'''
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
'''

In [9]:
# Keep only the three columns the aggregation needs, then switch to the
# RDD API (an RDD of Row objects).
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [34]:
# Print the first five rows of the raw green-taxi data.
df_green.show(n=5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-12 18:15:04|  2020-01-12 18:19:52|                 N|         1|          41|          41|              1|         0.78|        5.5|  0.0|    0.

In [24]:
# Pull ten Row objects to the driver to inspect their Python types.
rdd.take(num=10)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, total_amount=7.88),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 20, 24, 10), PULocationID=173, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 8, 16, 53), PULocationID=74, total_amount=23.46),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 14, 47, 15), PULocationID=25, total_amount=7.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 10, 8), PULocationID=259, total_amount=25.54),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 18, 17, 46, 45), PULocationID=177, total_amount=13.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 17, 20, 8, 44), PULocationID=65, total_amount=11.16),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 13, 10, 47), PULocationID=165, total_amount=20.56),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 15, 36), PULocationID=170, total_amount=49.05),
 Row(lpep_pickup_datetime=datetime.datetime(202

In [25]:
from datetime import datetime

In [26]:
# Inclusive lower bound on pickup time, mirroring the SQL
# `WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'`.
start = datetime(year=2020, month=1, day=1)


def filter_outliers(row):
    """Keep rows whose pickup time is on or after ``start``.

    Args:
        row: a record exposing a ``lpep_pickup_datetime`` attribute
            (e.g. a ``pyspark.sql.Row``).

    Returns:
        True if the pickup time is present and ``>= start``; False otherwise.
    """
    pickup = row.lpep_pickup_datetime
    # Comparing None with a datetime raises TypeError and would abort the
    # whole Spark job; SQL's WHERE drops NULL timestamps, so do the same.
    if pickup is None:
        return False
    return pickup >= start

In [28]:
# Bring a small local sample to the driver and keep its first record
# for inspection.
rows = rdd.take(num=10)
row = rows[0]

In [29]:
# Display one record to see the field names and Python types the row-level helpers receive.
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, total_amount=7.88)

In [None]:
# Next, prepare the RDD for grouping: map each row to a ((hour, zone), (amount, count)) key/value pair.

In [32]:
def prepare_for_grouping(row):
    """Turn a trip row into a ``((hour, zone), (amount, count))`` pair.

    The key is the pickup timestamp truncated to the start of its hour,
    paired with the pickup zone. The value carries the trip amount and a
    count of 1, so values can be combined pairwise by ``reduceByKey``.
    """
    pickup_hour = row.lpep_pickup_datetime.replace(
        minute=0, second=0, microsecond=0
    )
    return (pickup_hour, row.PULocationID), (row.total_amount, 1)

In [35]:
def calculate_revenue(left_value, right_value):
    """Merge two ``(amount, count)`` partial aggregates for ``reduceByKey``.

    Amounts follow SQL ``SUM`` semantics: None is treated as missing, so
    ``None + x == x`` and ``None + None == None``. A null ``total_amount``
    therefore no longer raises TypeError mid-job. Counts are always added,
    like ``COUNT(1)``.

    Args:
        left_value: ``(amount, count)`` tuple; amount may be None.
        right_value: ``(amount, count)`` tuple; amount may be None.

    Returns:
        ``(amount, count)`` tuple for the merged group.
    """
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    if left_amount is None:
        output_amount = right_amount
    elif right_amount is None:
        output_amount = left_amount
    else:
        output_amount = left_amount + right_amount
    output_count = left_count + right_count

    return (output_amount, output_count)

In [42]:
from collections import namedtuple

# Flat record built by `unwrap` for each aggregated (hour, zone) group.
RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')

In [41]:
def unwrap(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair into a RevenueRow."""
    key = row[0]
    value = row[1]
    # Positional order matches the RevenueRow fields: hour, zone, revenue, count.
    return RevenueRow(key[0], key[1], value[0], value[1])

In [55]:
from pyspark.sql import types

# Output schema for the aggregated rows; every column is nullable.
result_schema = (
    types.StructType()
    .add('hour', types.TimestampType(), True)
    .add('zone', types.IntegerType(), True)
    .add('revenue', types.DoubleType(), True)
    .add('count', types.IntegerType(), True)
)

In [58]:
# RDD version of the reference SQL: drop trips before `start`, key each
# trip by (hour, zone), sum (amount, count) per key, then flatten into a
# DataFrame with an explicit schema.
df_fromrdd_to_DF = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [62]:
# mode='overwrite' so a second Restart & Run All does not fail with
# "path already exists" (the default save mode errors if the path exists).
df_fromrdd_to_DF.write.parquet('tmp/green_revenue', mode='overwrite')

23/08/10 15:37:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                