In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [None]:
df_green = spark.read.parquet('./data/green_tripdata_2021-01')

In [None]:
rdd = df_green.select('lpep_pickup_datetime', 'PULocationID', 'total_amount').rdd
rdd.take(5)

In [None]:
from datetime import datetime

def filter_outliers(row):
    return row.lpep_pickup_datetime >= datetime(year=2020, month=1, day=1)

In [None]:
rdd = rdd.filter(filter_outliers)
rdd.take(10)

In [None]:
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)
    
    return (key, value)

In [None]:
rdd = rdd.map(prepare_for_grouping)
rdd.take(10)

In [None]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value 
      
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count) 

In [None]:
rdd = rdd.reduceByKey(calculate_revenue)
rdd.take(10)

In [None]:
from collections import namedtuple
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

In [None]:
rdd = rdd.map(unwrap)
rdd.take(10)

In [None]:
from pyspark.sql import types
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [None]:
df_result = rdd.toDF(result_schema)
df_result.show()

In [None]:
df_result.write.parquet('./data/report/revenue/green_rdd')

Map Partition

In [2]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']
df_green = spark.read.parquet('./data/green_tripdata_2021-01')
duration_rdd = df_green.select(columns).rdd
duration_rdd.take(10)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2021, 1, 27, 9, 10, 53), PULocationID=43, DOLocationID=229, trip_distance=3.09),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2021, 1, 9, 7, 18, 37), PULocationID=244, DOLocationID=240, trip_distance=6.13),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2021, 1, 16, 12, 56, 9), PULocationID=25, DOLocationID=33, trip_distance=1.02),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2021, 1, 18, 20, 31, 5), PULocationID=95, DOLocationID=95, trip_distance=1.83),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2021, 1, 2, 16, 39, 30), PULocationID=82, DOLocationID=95, trip_distance=2.17),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2021, 1, 22, 14, 4, 28), PULocationID=166, DOLocationID=142, trip_distance=2.55),
 Row(VendorID=1, lpep_pickup_datetime=datetime.datetime(2021, 1, 11, 20, 24, 52), PULocationID=97, DOLocationID=89, trip_distance=4.2),
 Row(VendorID=2, lpep_pickup_datetime=dateti

In [3]:
import pyspark.pandas as pd

def apply_model_in_batch(partition):
    df = pd.DataFrame(partition, columns=columns)
    cnt = len(df)
    return [cnt]



In [4]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 3) (LAPTOP-66MED8N2 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\pandas\utils.py", line 27, in require_minimum_pandas_version
    import pandas
ModuleNotFoundError: No module named 'pandas'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1227, in main
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 90, in read_command
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\cloudpickle\cloudpickle.py", line 649, in subimport
    __import__(name)
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\pandas\__init__.py", line 34, in <module>
    require_minimum_pandas_version()
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\pandas\utils.py", line 34, in require_minimum_pandas_version
    raise ImportError(
ImportError: Pandas >= 1.0.5 must be installed; however, it was not found.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\pandas\utils.py", line 27, in require_minimum_pandas_version
    import pandas
ModuleNotFoundError: No module named 'pandas'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1227, in main
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 90, in read_command
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\cloudpickle\cloudpickle.py", line 649, in subimport
    __import__(name)
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\pandas\__init__.py", line 34, in <module>
    require_minimum_pandas_version()
  File "D:\data-engineering-zoomcamp\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\pandas\utils.py", line 34, in require_minimum_pandas_version
    raise ImportError(
ImportError: Pandas >= 1.0.5 must be installed; however, it was not found.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
