In [23]:
import findspark
findspark.init('C:\spark-3.4.0-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [24]:
df_green = spark.read.options(inferSchema = 'True', header = 'True').csv("file/green_tripdata_2021-01.csv")

In [25]:
df_green.rdd

MapPartitionsRDD[92] at javaToPython at NativeMethodAccessorImpl.java:0

In [26]:
rdd_green = df_green.select('lpep_pickup_datetime', 'PULocationID', 'total_amount').rdd

In [27]:
from datetime import datetime
start = datetime(year=2021, month=1, day=1)
def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [28]:
rdd_green.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 15, 56), PULocationID=43, total_amount=6.8),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 25, 59), PULocationID=166, total_amount=16.86),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 45, 57), PULocationID=41, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 12, 31, 23, 57, 51), PULocationID=168, total_amount=9.3),
 Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 16, 36), PULocationID=265, total_amount=-52.8)]

In [29]:
rdd_green.filter(lambda row: row.lpep_pickup_datetime >= start ).take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 15, 56), PULocationID=43, total_amount=6.8)]

In [30]:
def prepare_for_grouping(row): 
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

In [31]:
rdd_green \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .take(10)

[((datetime.datetime(2021, 1, 1, 0, 0), 43), (6.8, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 166), (16.86, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 41), (8.3, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 265), (-52.8, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 265), (52.8, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 265), (216.36, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 75), (5.76, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 225), (3.8, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 225), (42.05, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 244), (19.3, 1))]

In [32]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [33]:
rdd_green \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue)\
    .take(10)

[((datetime.datetime(2021, 1, 1, 0, 0), 43), (6.8, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 41), (8.3, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 265), (216.36, 3)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 75), (34.36, 3)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 225), (85.41, 4)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 7), (61.47, 2)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 259), (29.0, 1)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 247), (37.12, 2)),
 ((datetime.datetime(2021, 1, 1, 0, 0), 17), (102.34, 3)),
 ((datetime.datetime(2021, 1, 1, 1, 0), 152), (14.55, 1))]

In [38]:
type(rdd_green)

pyspark.rdd.RDD

In [39]:
from collections import namedtuple

In [40]:
RevenueRow = namedtuple('Revenue_green', ['hour', 'zone', 'revenue', 'count'])

In [41]:
def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

In [42]:
from pyspark.sql import types

In [43]:
green_revenue_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [44]:
DF_green_revenue = rdd_green \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue)\
    .map(unwrap)\
    .toDF(green_revenue_schema)

In [45]:
DF_green_revenue.show()

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-01-01 00:00:00|  43|               6.8|    1|
|2021-01-01 00:00:00|  41|               8.3|    1|
|2021-01-01 00:00:00| 265|            216.36|    3|
|2021-01-01 00:00:00|  75|             34.36|    3|
|2021-01-01 00:00:00| 225|             85.41|    4|
|2021-01-01 00:00:00|   7|             61.47|    2|
|2021-01-01 00:00:00| 259|              29.0|    1|
|2021-01-01 00:00:00| 247|             37.12|    2|
|2021-01-01 00:00:00|  17|            102.34|    3|
|2021-01-01 01:00:00| 152|             14.55|    1|
|2021-01-01 01:00:00|  42|             37.89|    2|
|2021-01-01 01:00:00| 236|               5.8|    1|
|2021-01-01 01:00:00| 244|             14.76|    1|
|2021-01-01 01:00:00| 166|              9.36|    1|
|2021-01-01 01:00:00|  14|               7.8|    1|
|2021-01-01 01:00:00|  74|              31.6|    2|
|2021-01-01 

In [48]:
df_green_partition = spark.read.options(inferSchema = 'True', header = 'True').csv("file/green_tripdata_2021-01.csv")
# df_green_partition = df_green.repartition(3) chia partition

In [49]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

rdd = df_green_partition \
    .select(columns) \
    .rdd

In [50]:
import pandas as pd

In [51]:
def model_predict(df):
    time = round(df.trip_distance * 5,3)
    return time

In [52]:
def trip_distance_km(df):
    trip_km = round(df.trip_distance * 1.609344,3)
    return trip_km

In [53]:
def time_prediction(rows):
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    trip_km = trip_distance_km(df)
    df['time_taxi_prediction'] = predictions
    df['trip_km'] = trip_km
    for row in df.itertuples():
        yield row

In [54]:
df_predicts = rdd \
    .mapPartitions(time_prediction)\
    .toDF()

In [55]:
df_predicts.show()
df_predicts.select('trip_km','time_taxi_prediction').show()

+-----+--------+--------------------+------------+------------+-------------+--------------------+-------+
|Index|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|time_taxi_prediction|trip_km|
+-----+--------+--------------------+------------+------------+-------------+--------------------+-------+
|    0|     2.0|                  {}|          43|         151|         1.01|                5.05|  1.625|
|    1|     2.0|                  {}|         166|         239|         2.53|               12.65|  4.072|
|    2|     2.0|                  {}|          41|          42|         1.12|                 5.6|  1.802|
|    3|     2.0|                  {}|         168|          75|         1.99|                9.95|  3.203|
|    4|     2.0|                  {}|         265|         265|          0.0|                 0.0|    0.0|
|    5|     2.0|                  {}|         265|         265|          0.0|                 0.0|    0.0|
|    6|     2.0|                  {}|