### PySpark 설정

In [1]:
# [+] pyspark 불러오기
from pyspark import SparkConf, SparkContext

In [3]:
# [+] Create the SparkConf and SparkContext objects.
# 'local' runs Spark in-process on this machine.
# getOrCreate() returns the already-running SparkContext if one exists, so
# re-executing this cell does not fail with "Cannot run multiple
# SparkContexts at once" (note: an existing context keeps its own config).
conf = SparkConf().setMaster('local').setAppName('reduction_operations')
sc = SparkContext.getOrCreate(conf=conf)

### reduce( )

In [4]:
# reduce(): combine the elements pairwise with a binary function (here: addition)
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda a, b: a + b)

15

In [5]:
# reduce() is applied within each partition first, then the per-partition
# results are combined.
# Number of partitions: 2
sc.parallelize([1, 2, 3, 4, 5], 2).reduce(lambda a, b: a + b)

15

In [6]:
# reduce() caveat: with a non-associative function the result can change
# depending on how the data is partitioned.
# Example: lambda x, y: (x * 2) + y
# With a single partition: ((((1*2 + 2)*2 + 3)*2 + 4)*2 + 5) = 57
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda a, b: (a * 2) + b)

57

In [7]:
# Single partition: (((1*2 + 2)*2 + 3)*2 + 4) = 26
sc.parallelize([1, 2, 3, 4]).reduce(lambda a, b: (a * 2) + b)

26

In [8]:
# Number of partitions: 2
# The shown result 18 matches partitions [1, 2] and [3, 4]: partial results
# 1*2 + 2 = 4 and 3*2 + 4 = 10, then combined as 4*2 + 10 = 18.
sc.parallelize([1, 2, 3, 4], 2).reduce(lambda a, b: (a * 2) + b)

18

### fold( )

In [9]:
# fold(): like reduce(), but starts from an explicit initial (zero) value.
# The zero value seeds each partition and is used again when the partition
# results are combined; with the single default partition here:
# 1 + (1 + 1 + 2 + 3 + 4) = 12
sc.parallelize([1, 2, 3, 4]).fold(1, lambda a, b: a + b)

12

In [10]:
# Number of partitions: 2
# The zero value is applied once per partition plus once in the final merge:
# 1 + (1 + 1 + 2) + (1 + 3 + 4) = 13
sc.parallelize([1, 2, 3, 4], 2).fold(1, lambda a, b: a + b)

13

In [None]:
# Markdown syntax notes (kept as comments so this code cell can run):
#   # Large heading
#   ## Medium heading
#   ### Small heading
#
#   +item
#   ++sub-item

### 10000개 정수 난수의 총합 계산

In [11]:
import numpy as np

In [12]:
# 10,000 random integers drawn from [0, 100) (upper bound exclusive)
x = np.random.randint(low=0, high=100, size=10_000)

In [14]:
# Slicing: preview the first 100 values
x[0:100]

array([69, 83, 51, 99, 45, 30, 95, 81, 74, 23, 77,  8,  8, 67, 97, 33, 65,
       81, 37, 70, 39, 50, 88, 93, 26, 72, 59, 74,  8, 86, 48,  8, 87, 57,
        4, 12, 57, 34,  8, 27, 63,  5, 68, 14, 28, 41, 45, 55, 95, 43, 89,
       57, 51, 17, 25, 71, 88, 50, 86, 18, 34, 82, 57, 16, 12, 79, 43, 19,
        0, 55, 26, 28,  1, 55,  9, 23, 39, 75, 82,  3, 88, 55, 18, 80, 13,
       34, 20, 25, 90, 18, 34, 36, 53,  3, 52, 59, 58, 29, 38, 85])

In [15]:
# Distribute the array across 8 partitions
rdd = sc.parallelize(x, numSlices=8)

In [16]:
# glom(): gather the elements of each partition into one list per partition,
# then print each partition's contents.
partitioned_data = rdd.glom().collect()
for i, partition in enumerate(partitioned_data):
    # Label each list with its own 0-based position in the collected result
    # (not a constant), so the partitions can be told apart.
    print(f"partition {i} : {partition}\n")

partition 1 : [69, 83, 51, 99, 45, 30, 95, 81, 74, 23, 77, 8, 8, 67, 97, 33, 65, 81, 37, 70, 39, 50, 88, 93, 26, 72, 59, 74, 8, 86, 48, 8, 87, 57, 4, 12, 57, 34, 8, 27, 63, 5, 68, 14, 28, 41, 45, 55, 95, 43, 89, 57, 51, 17, 25, 71, 88, 50, 86, 18, 34, 82, 57, 16, 12, 79, 43, 19, 0, 55, 26, 28, 1, 55, 9, 23, 39, 75, 82, 3, 88, 55, 18, 80, 13, 34, 20, 25, 90, 18, 34, 36, 53, 3, 52, 59, 58, 29, 38, 85, 28, 20, 79, 85, 90, 67, 51, 2, 32, 70, 60, 74, 88, 98, 57, 24, 6, 72, 4, 51, 40, 14, 57, 11, 60, 90, 82, 46, 16, 51, 68, 35, 33, 12, 24, 70, 90, 9, 97, 52, 92, 78, 67, 8, 94, 45, 27, 71, 86, 99, 85, 24, 21, 26, 58, 53, 64, 5, 14, 91, 73, 36, 20, 19, 10, 67, 34, 13, 73, 31, 65, 10, 27, 4, 99, 97, 15, 23, 86, 69, 87, 56, 94, 7, 97, 30, 3, 1, 42, 59, 89, 59, 52, 27, 17, 40, 26, 8, 81, 20, 57, 94, 11, 72, 58, 98, 86, 70, 55, 67, 74, 30, 35, 22, 76, 40, 78, 53, 59, 8, 20, 63, 34, 22, 20, 66, 43, 86, 15, 70, 39, 14, 61, 13, 74, 45, 52, 44, 66, 7, 48, 46, 32, 29, 10, 31, 94, 77, 52, 24, 27, 10, 21

In [17]:
# Total of all elements across every partition
rdd.reduce(lambda a, b: a + b)

497992

### groupBy( )

In [18]:
# groupBy(): group elements by the key returned from the given function
# (here the remainder mod 2, i.e. odd -> 1, even -> 0)
res = sc.parallelize([1, 1, 2, 3, 5, 8]).groupBy(lambda n: n % 2).collect()

In [20]:
# Print the groupBy() result: each key with its members materialized as a list
for parity, members in res:
    print(parity, list(members))

1 [1, 1, 3, 5]
0 [2, 8]


In [21]:
# The values cannot be displayed directly: each group is a ResultIterable,
# so evaluating `res` shows only the object reprs (see output below).
res

[(1, <pyspark.resultiterable.ResultIterable at 0x1bc78aeba30>),
 (0, <pyspark.resultiterable.ResultIterable at 0x1bc78c26070>)]

### aggregate( )

In [22]:
# reduce(), fold(): an error occurs when the function's output type differs
# from the RDD element type it receives as input.
# Here the lambda returns a tuple, which reduce() feeds back in as `x`
# alongside the next int, so `x + y` fails with
# "TypeError: can only concatenate tuple (not "int") to tuple".
sc.parallelize([1, 2, 3, 4]).reduce(lambda x, y: (x + y, x * y)) 
# Trying to turn single values into pairs this way fails,
# and the (Py4J-wrapped) error message makes the cause hard to infer.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 28) (E2006009 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Dev\Spark\spark-3.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 619, in main
  File "C:\Dev\Spark\spark-3.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 611, in process
  File "C:\Dev\Spark\spark-3.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\HNW\anaconda3\lib\site-packages\pyspark\rdd.py", line 997, in func
    yield reduce(f, iterator, initial)
  File "C:\Users\HNW\anaconda3\lib\site-packages\pyspark\util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\HNW\AppData\Local\Temp/ipykernel_18296/1066064347.py", line 2, in <lambda>
TypeError: can only concatenate tuple (not "int") to tuple

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:556)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:762)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:744)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:509)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Dev\Spark\spark-3.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 619, in main
  File "C:\Dev\Spark\spark-3.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 611, in process
  File "C:\Dev\Spark\spark-3.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\HNW\anaconda3\lib\site-packages\pyspark\rdd.py", line 997, in func
    yield reduce(f, iterator, initial)
  File "C:\Users\HNW\anaconda3\lib\site-packages\pyspark\util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\HNW\AppData\Local\Temp/ipykernel_18296/1066064347.py", line 2, in <lambda>
TypeError: can only concatenate tuple (not "int") to tuple

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:556)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:762)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:744)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:509)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [23]:
# fold() has the same limitation: it uses a single function both to fold
# elements and to merge partition results. When a (sum, count) tuple is
# merged, `x[0] + y` adds an int to a tuple and raises
# "TypeError: unsupported operand type(s) for +: 'int' and 'tuple'".
sc.parallelize([1, 2, 3, 4]).fold((0, 0), lambda x, y: (x[0] + y, x[1] + 1))

TypeError: unsupported operand type(s) for +: 'int' and 'tuple'

In [None]:
"""
    aggregate(): 입출력 형식 다른 RDD 에 대한 reduction을 수행
    - zeroValues: 파티션 초기 값
    - seqOp: 각 파티션에 대한 reduction 함수
    - combop: 파티션 결과를 합치는 reduction 함수
""" 

seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [None]:
# aggregate() 수행
