### 필요 라이브러리 import

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, expr
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType
from datetime import datetime

### 데이터 수집
- 23년 1월 ~ 25년 6월까지의 For-Hire Vehicle.parquet 데이터
- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [25]:
spark = (
    SparkSession
      .builder
      .appName("ClusterMode")
      .master("spark://spark-master:7077")       # ← 클러스터 마스터 지정
      .config("spark.driver.bindAddress", "0.0.0.0")  # 또는 spark-master
      .getOrCreate()
)

In [26]:
schema = StructType([
    StructField("request_datetime",    TimestampType(), True),
    StructField("trip_miles",          DoubleType(),    True),
    StructField("base_passenger_fare", DoubleType(),    True),
    StructField("tips",                DoubleType(),    True)
])

In [27]:
df = (
    spark.read
        .schema(schema)
        .parquet("./dataset")
        .withColumn("date", to_date(col("request_datetime")))
        .withColumn("revenue", expr("base_passenger_fare + tips"))
)

In [28]:
rdd = df.rdd

### 날짜, 총수익, 거리로 필터링한 RDD생성
- 날짜 : request_datetime 컬럼을 datetime.date()이용하여 날짜별로 변환
- 총수익 : base_passenger_fare + tips을 통하여 러프한 전체 수입을 나타냄(0보다 큰 수만 남김)
- 거리 : trip_miles로 나타냄(0보다 큰 수만 남김)

In [29]:
cols = rdd.filter(lambda row: row.date is not None)

In [30]:
cols = rdd.filter(lambda row: row.trip_miles is not None)

In [31]:
cols = rdd.filter(lambda row: row.revenue is not None)

In [32]:
cols = rdd.map(lambda row: (row.date, row.trip_miles, row.revenue))

In [33]:
cols = rdd.filter(lambda col: col[1] > 0)

In [34]:
cols = rdd.filter(lambda col: col[2] > 0)

In [35]:
cols.take(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 71) (172.18.0.3 executor 56): java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = 553100815431272095, local class serialVersionUID = -6188971942555435033
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:580)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = 553100815431272095, local class serialVersionUID = -6188971942555435033
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:580)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)


In [23]:
cols.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 61) (172.18.0.3 executor 1): java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = 553100815431272095, local class serialVersionUID = -6188971942555435033
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:580)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class incompatible: stream classdesc serialVersionUID = 553100815431272095, local class serialVersionUID = -6188971942555435033
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:580)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)


## 날짜 컬럼 기준으로 택시이용자 수, 강수량, 적설량, 평균기온 테이블 제작

In [11]:
# 날짜, 강수량, 적설량, 평균기온 컬럼만 가진 테이블 제작, 날짜 타입을 Date로 변환
weather_cp = (
    weather_df
      .filter(col("NAME") == "NY CITY CENTRAL PARK, NY US")
      .withColumn(
          "TAVG",
          (
            ( (col("TMAX") + col("TMIN")) / 2 - 32 ) * 5 / 9
          )
      )
      .withColumn("DATE", to_date(col("DATE"), "yyyy-MM-dd"))
      .select("DATE", "PRCP", "SNOW", "TAVG")
)