In [2]:
import os

import findspark

# Prefer an explicit SPARK_HOME from the environment; otherwise fall back to the
# local install this notebook was written against. Raw string: the Windows path
# contains `\S` and `\s`, which are invalid escape sequences in a normal string
# literal (DeprecationWarning, SyntaxWarning on Python 3.12+).
SPARK_HOME = os.environ.get('SPARK_HOME', r'C:\Spark\spark-3.1.3-bin-hadoop3.2')
findspark.init(SPARK_HOME)

# pyspark must be imported only after findspark.init() has put it on sys.path.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

# Local Spark session (master "local[*]"), named 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [3]:
# Raw string so the Windows separator before `green` is not parsed as the invalid escape `\g`.
df_green = spark.read.parquet(r'file\green_tripdata_2021-01.parquet')

In [4]:
# Expose the RDD of Row objects behind the DataFrame and display its repr.
green_rows_rdd = df_green.rdd
green_rows_rdd

MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0

In [5]:
# Keep only the three columns the revenue aggregation needs, then switch to the RDD API.
rdd_green = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [6]:
from datetime import datetime

# Earliest pickup time to keep; the input file is the January 2021 green-taxi extract.
start = datetime(year=2021, month=1, day=1)


def filter_outliers(row):
    """Return True if the trip's pickup time is on or after ``start``.

    Rows with a missing (None) ``lpep_pickup_datetime`` are treated as outliers
    and dropped, instead of raising TypeError inside the Spark task.

    :param row: object exposing ``lpep_pickup_datetime`` (e.g. a pyspark ``Row``).
    :return: bool
    """
    pickup = row.lpep_pickup_datetime
    # The column is nullable in the Parquet schema; `None >= datetime` raises
    # TypeError, which would fail the whole job.
    return pickup is not None and pickup >= start

In [7]:
# Peek at the first five rows to confirm the selected columns and their Python types.
rdd_green.take(num=5)

[Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 7, 15, 56), PULocationID=43, total_amount=6.8),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 7, 25, 59), PULocationID=166, total_amount=16.86),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 7, 45, 57), PULocationID=41, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 6, 57, 51), PULocationID=168, total_amount=9.3),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 7, 16, 36), PULocationID=265, total_amount=-52.8)]

In [8]:
# Sanity-check the date filter; reuse filter_outliers so the predicate lives in one place.
rdd_green.filter(filter_outliers).take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 7, 15, 56), PULocationID=43, total_amount=6.8)]

In [9]:
def prepare_for_grouping(row):
    """Map a trip row to a ``((hour, zone), (amount, 1))`` pair for ``reduceByKey``.

    The pickup timestamp is truncated to the start of its hour, so trips from
    the same hour and pickup zone share one key. The trailing ``1`` lets the
    reducer count trips while it sums revenue.
    """
    hour_bucket = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (hour_bucket, row.PULocationID), (row.total_amount, 1)

In [10]:
# Preview the keyed (hour, zone) -> (amount, 1) pairs before aggregation.
(
    rdd_green
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .take(10)
)

[((datetime.datetime(2021, 1, 1, 7, 0), 43), (6.8, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 166), (16.86, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 41), (8.3, 1)),
 ((datetime.datetime(2021, 1, 1, 6, 0), 168), (9.3, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 265), (-52.8, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 265), (52.8, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 265), (216.36, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 75), (5.76, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 225), (3.8, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 225), (42.05, 1))]

In [11]:
def calculate_revenue(left_value, right_value):
    """Merge two ``(amount, count)`` partial aggregates for ``reduceByKey``.

    :param left_value: ``(amount, count)`` tuple.
    :param right_value: ``(amount, count)`` tuple.
    :return: ``(summed amount, summed count)``.
    """
    (amount_a, trips_a), (amount_b, trips_b) = left_value, right_value
    return amount_a + amount_b, trips_a + trips_b

In [12]:
# Preview per-(hour, zone) totals: summed revenue and trip count.
(
    rdd_green
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .take(10)
)

[((datetime.datetime(2021, 1, 1, 7, 0), 43), (6.8, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 166), (22.66, 2)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 41), (8.3, 1)),
 ((datetime.datetime(2021, 1, 1, 6, 0), 168), (9.3, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 265), (216.36, 3)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 75), (34.36, 3)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 225), (85.41, 4)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 244), (19.3, 1)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 74), (86.57, 7)),
 ((datetime.datetime(2021, 1, 1, 7, 0), 42), (28.02, 3))]

In [13]:
from collections import namedtuple

In [14]:
# One aggregated output record per (hour, zone). The typename matches the bound
# name so the repr reads `RevenueRow(...)` and pickle can locate the class under
# the name it is bound to. Note: the `count` field shadows tuple.count().
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [18]:
def unwrap(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``."""
    key, value = row[0], row[1]
    return RevenueRow(hour=key[0], zone=key[1], revenue=value[0], count=value[1])

In [19]:
from pyspark.sql import types

In [20]:
# Explicit schema for the aggregated revenue DataFrame; every column is nullable.
green_revenue_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    )
])

In [23]:
# Full RDD pipeline: drop pre-2021 pickups -> key by (hour, zone) -> sum revenue
# and trip count per key -> flatten into RevenueRow -> DataFrame with explicit schema.
DF_green_revenue = (
    rdd_green
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(green_revenue_schema)
)

In [24]:
# Display the first 20 aggregated rows (Spark's default show() arguments, spelled out).
DF_green_revenue.show(n=20, truncate=True)

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-01-01 07:00:00|  43|               6.8|    1|
|2021-01-01 07:00:00| 166|             22.66|    2|
|2021-01-01 07:00:00|  41|               8.3|    1|
|2021-01-01 06:00:00| 168|               9.3|    1|
|2021-01-01 07:00:00| 265|            216.36|    3|
|2021-01-01 07:00:00|  75|             34.36|    3|
|2021-01-01 07:00:00| 225|             85.41|    4|
|2021-01-01 07:00:00| 244|              19.3|    1|
|2021-01-01 07:00:00|  74|             86.57|    7|
|2021-01-01 07:00:00|  42|             28.02|    3|
|2021-01-01 07:00:00| 116|             55.59|    4|
|2021-01-01 07:00:00|   7|             61.47|    2|
|2021-01-01 07:00:00| 152|             84.92|    1|
|2021-01-01 07:00:00|  82|              11.8|    1|
|2021-01-01 07:00:00| 259|              29.0|    1|
|2021-01-01 07:00:00| 247|             37.12|    2|
|2021-01-01 

In [53]:
# Read every Parquet file matching the glob file\green\*. Raw string keeps the
# `\g` and `\*` sequences literal instead of treating them as invalid escapes.
df_green_partition = spark.read.parquet(r'file\green\*')
# Alternative: split the single-month frame into 3 partitions instead:
# df_green_partition = df_green.repartition(3)

In [54]:
# Trip columns carried into the RDD for the per-partition experiment below.
columns = [
    'VendorID',
    'lpep_pickup_datetime',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
]

rdd = df_green_partition.select(*columns).rdd

In [55]:
def apply_to_model_in_batch(partition):
    """Count the rows in a single partition.

    Intended for ``RDD.mapPartitions``: it consumes the partition's row iterator
    and returns a one-element list, so each partition contributes one value.
    The name suggests it will later run a model over the batch; for now it
    only counts rows.
    """
    return [sum(1 for _ in partition)]

In [56]:
# One row count per partition, collected to the driver.
rdd.mapPartitions(f=apply_to_model_in_batch).collect()

[76518, 64572]

In [43]:
# Print the column names, types and nullability of the multi-file green DataFrame.
df_green_partition.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

