# Spark: The Definitive Guide
https://github.com/NEN-STUDY/Spark/blob/main/Part4/Chapter15.ipynb

<img src = "https://drive.google.com/uc?id=13TlsSwaad8ZRXYMgkgcAlF10ZMpydz_s">

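- action: an operation that triggers computation and returns a result to the driver, where it can be printed or displayed
  - count()
  - take()
- transformation: lazily defines a new RDD (e.g. map(), filter()); evaluating it only shows an RDD description, not human-readable data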
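The split between lazy transformations and eager actions can be sketched in plain Python. `MiniRDD` below is a hypothetical toy class, not part of PySpark: its transformations only record a function, and nothing is computed until an action such as `collect()` or `count()` runs the recorded pipeline.

```python
class MiniRDD:
    """Hypothetical toy model of an RDD; not the PySpark implementation."""

    def __init__(self, data, ops=()):
        self._data = list(data)   # source elements
        self._ops = tuple(ops)    # recorded (kind, func) pairs, not yet executed

    # Transformations: return a new MiniRDD and compute nothing.
    def map(self, func):
        return MiniRDD(self._data, self._ops + (("map", func),))

    def filter(self, pred):
        return MiniRDD(self._data, self._ops + (("filter", pred),))

    def __repr__(self):
        return f"MiniRDD with {len(self._ops)} pending op(s)"

    # Actions: execute the recorded pipeline and return plain Python values.
    def collect(self):
        out = []
        for item in self._data:
            keep = True
            for kind, func in self._ops:
                if kind == "map":
                    item = func(item)
                elif not func(item):   # kind == "filter"
                    keep = False
                    break
            if keep:
                out.append(item)
        return out

    def count(self):
        return len(self.collect())


calls = []

def plus_one(x):
    calls.append(x)   # record every time the function actually runs
    return x + 1

rdd = MiniRDD([1, 2, 3, 4]).map(plus_one).filter(lambda x: x % 2 == 0)
print(rdd)            # MiniRDD with 2 pending op(s); plus_one has not run yet
print(len(calls))     # 0
print(rdd.collect())  # [2, 4]
```

Real Spark adds partitioning, scheduling and fault tolerance on top, but the idea is the same: printing a transformed RDD shows a description, and only an action produces data.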

# **Working with RDD (Resilient Distributed Dataset)**

**`Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark`**

**`Author: Amin Karami (PhD, FHEA)`**

---

**Resilient Distributed Dataset (RDD)**: RDD is the fundamental data structure of Spark. It is a fault-tolerant (resilient), immutable, distributed collection of objects of any type.

source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=586f7a99dc295f0d64a132a7b6c348907b52b4397234a6e6051123219678dfac
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
# Linking with Spark
from pyspark import SparkContext, SparkConf

In [3]:
# Initializing Spark
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


# **Part 1: Create RDDs and Basic Operations**
# **There are two ways to create RDDs:**

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

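- RDDs (and their partitions) are read-only: a transformation never modifies an existing RDD, so to compute new values (for example, a derived column) you create a new RDD from the old one.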

In [4]:
# Generate random data:
import random

randomlist = random.sample(range(0, 40), 10)
print(randomlist)

[30, 32, 12, 0, 21, 2, 29, 14, 7, 36]


In [5]:
# Create RDD:
rdd1 = sc.parallelize(randomlist, 4) # partitions = 4
rdd1 # displays only the RDD's description, not its data

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [6]:
rdd1.collect() # collect() brings every element to the driver; for large data prefer take(n) or similar

[30, 32, 12, 0, 21, 2, 29, 14, 7, 36]

In [7]:
# Data distribution in partitions:
rdd1.getNumPartitions()

4

In [8]:
rdd1.glom().collect() # glom(): turns each partition into a list, showing how the data is split across partitions

[[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]]
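Why does the last partition hold four elements? The model below is an assumption based on PySpark's `SparkContext.parallelize` source (3.x): the Python list appears to be serialized in batches of `max(1, min(len(c) // numSlices, 1024))` elements, and the JVM then splits the list of batches into `numSlices` contiguous ranges. `simulate_parallelize` is a hypothetical helper, not a PySpark API, and other Spark versions may split differently.

```python
def simulate_parallelize(data, num_slices, max_batch=1024):
    """Hypothetical model of how PySpark appears to slice a local list into partitions."""
    data = list(data)
    # Assumed PySpark-style batch size: spreads small inputs evenly across slices
    batch_size = max(1, min(len(data) // num_slices, max_batch))
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]

    # Split the list of batches into contiguous ranges, one range per partition
    n = len(batches)
    partitions = []
    for i in range(num_slices):
        start, end = (i * n) // num_slices, ((i + 1) * n) // num_slices
        partitions.append([x for batch in batches[start:end] for x in batch])
    return partitions


print(simulate_parallelize([30, 32, 12, 0, 21, 2, 29, 14, 7, 36], 4))
# [[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]]
```

Here the batch size is 10 // 4 = 2, which gives five batches for four partitions, so the last partition receives two batches.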

In [9]:
rdd1.glom().take(2) # look at only the first two partitions

[[30, 32], [12, 0]]

In [10]:
# Print last partition
rdd1.glom().collect()[3]

[29, 14, 7, 36]

In [11]:
# count():
rdd1.count()

10

In [12]:
# first():
rdd1.first()

30

In [13]:
# top():
rdd1.top(2) # top(n) keeps the n largest elements of each partition and merges them on the driver, so only small n should be used

[36, 32]
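A rough plain-Python model of `top(n)`: PySpark computes the `n` largest items inside each partition (its source uses `heapq.nlargest`) and only merges those small per-partition lists on the driver, so the whole RDD is never gathered in one place. `simulate_top` is a hypothetical helper, and the partitions are copied from the `glom()` output above.

```python
import heapq

def simulate_top(partitions, n):
    """Hypothetical model of RDD.top(n): per-partition top-n, then a merge on the driver."""
    # Each partition contributes at most n candidates, not its whole contents
    per_partition = [heapq.nlargest(n, part) for part in partitions]
    # The driver merges the small candidate lists
    return heapq.nlargest(n, [x for cand in per_partition for x in cand])


partitions = [[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]]
print(simulate_top(partitions, 2))  # [36, 32]
```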

In [14]:
# distinct():
rdd1.distinct() # distinct() is a transformation, so this returns an RDD object rather than data

PythonRDD[12] at RDD at PythonRDD.scala:53

In [15]:
rdd1.distinct().collect()

[32, 12, 0, 36, 21, 29, 30, 2, 14, 7]
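The new order comes from a shuffle. In PySpark's source, `distinct()` is written as `map(lambda x: (x, None))`, then `reduceByKey(...)`, then taking the keys, so each value is sent to partition `hash(value) % numPartitions`, where duplicates meet and collapse. The sketch below models that, assuming `spark.default.parallelism` is unset so the parent's 4 partitions are kept; `simulate_distinct` is hypothetical. Spark does not guarantee ordering after a shuffle, although for this input the sketch happens to reproduce the order above.

```python
def simulate_distinct(partitions, num_partitions=None):
    """Hypothetical model of RDD.distinct(): hash-partition values, then drop duplicates."""
    if num_partitions is None:
        num_partitions = len(partitions)  # assumed: keep the parent's partition count
    # dict keys keep insertion order and ignore repeats, acting as a per-bucket "seen" set
    buckets = [dict() for _ in range(num_partitions)]
    for part in partitions:
        for value in part:
            buckets[hash(value) % num_partitions][value] = None
    return [list(b) for b in buckets]


parts = simulate_distinct([[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]])
print(parts)                                # [[32, 12, 0, 36], [21, 29], [30, 2, 14], [7]]
print([x for part in parts for x in part])  # [32, 12, 0, 36, 21, 29, 30, 2, 14, 7]
```

For small non-negative Python ints `hash(x) == x`, so 32, 12, 0 and 36 (all multiples of 4) land in partition 0.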

In [16]:
# map():
# API description: Return a new distributed dataset formed by passing each element of the source through a function func.
# map() creates a new RDD; rdd1 itself is not modified

def myfunc(item):
  return (item + 1) * 3

rdd_map = rdd1.map(myfunc)
rdd_map.collect()

[93, 99, 39, 3, 66, 9, 90, 45, 24, 111]

In [17]:
rdd_map.glom().collect() # map() keeps the partitioning: each partition is transformed element by element

[[93, 99], [39, 3], [66, 9], [90, 45, 24, 111]]

In [18]:
rdd1.map(lambda item: (item + 1) * 3).collect()

[93, 99, 39, 3, 66, 9, 90, 45, 24, 111]

In [19]:
# filter():
rdd_filter = rdd1.filter(lambda x: x % 3 == 0)
rdd_filter.glom().collect() # filter() keeps the number of partitions; a partition left with no elements is not dropped, it stays as an empty list

[[30], [12, 0], [21], [36]]

In [20]:
rdd_filter.count()

5

In [21]:
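rdd_filter.repartition(1) # the data is now small, so one partition is enough; repartition() is a transformation that performs a full shuffle (coalesce() can shrink without one)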

MapPartitionsRDD[27] at coalesce at NativeMethodAccessorImpl.java:0

In [22]:
# flatMap():
rdd1.map(lambda x: x % 3 == 0).collect() # map(): one output per element, so all 10 elements come back (as booleans)

[True, False, True, True, True, False, False, False, False, True]

In [23]:
rdd1.filter(lambda x: x % 3 == 0).collect() # filter(): keeps only the elements for which the predicate is True (5 of 10)

[30, 12, 0, 21, 36]

In [24]:
rdd1.map(lambda x: [x + 2, x + 5]).collect() # map(): each element becomes its own list, giving a list of lists

[[32, 35],
 [34, 37],
 [14, 17],
 [2, 5],
 [23, 26],
 [4, 7],
 [31, 34],
 [16, 19],
 [9, 12],
 [38, 41]]

In [25]:
rdd1.map(lambda x: [x + 2, x + 5]).glom().collect()

[[[32, 35], [34, 37]],
 [[14, 17], [2, 5]],
 [[23, 26], [4, 7]],
 [[31, 34], [16, 19], [9, 12], [38, 41]]]

In [26]:
rdd_flatmap = rdd1.flatMap(lambda x: [x + 2, x + 5])
rdd_flatmap.collect() # flatMap(): the per-element lists are flattened into a single list

[32, 35, 34, 37, 14, 17, 2, 5, 23, 26, 4, 7, 31, 34, 16, 19, 9, 12, 38, 41]

In [27]:
rdd_flatmap.glom().collect()

[[32, 35, 34, 37],
 [14, 17, 2, 5],
 [23, 26, 4, 7],
 [31, 34, 16, 19, 9, 12, 38, 41]]
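In plain Python terms, `map()` behaves like a list comprehension with one output per input, while `flatMap()` also flattens each returned iterable, much like `itertools.chain.from_iterable`. The hypothetical helpers below apply both to the partitions shown above; note that both keep the partition boundaries, as the two `glom()` outputs show.

```python
from itertools import chain

def simulate_map(partitions, func):
    """Hypothetical model: one output element per input element, partitions kept."""
    return [[func(x) for x in part] for part in partitions]

def simulate_flat_map(partitions, func):
    """Hypothetical model: func returns an iterable; results are flattened per partition."""
    return [list(chain.from_iterable(func(x) for x in part)) for part in partitions]

def pair(x):
    return [x + 2, x + 5]


partitions = [[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]]
print(simulate_map(partitions, pair)[0])       # [[32, 35], [34, 37]]
print(simulate_flat_map(partitions, pair)[0])  # [32, 35, 34, 37]
```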

In [38]:
rdd_flatmap.reduce(lambda x, y: x + y)
# reduce() first applies x + y within each partition (partitions 1 to 4),
# then combines the four partial results on the driver

436

In [39]:
# Descriptive statistics:
rdd1.max()

36

In [40]:
rdd1.min()

0

In [41]:
rdd1.mean()

18.3

In [42]:
round(rdd1.stdev(), 2)

12.43
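The value 12.43 is the population standard deviation (dividing by n); PySpark also provides `sampleStdev()`, which divides by n - 1. Python's standard `statistics` module can be used to check the statistics above on the same ten values:

```python
import statistics

data = [30, 32, 12, 0, 21, 2, 29, 14, 7, 36]  # the values held by rdd1 above

print(max(data), min(data), sum(data))    # 36 0 183
print(sum(data) / len(data))              # 18.3
print(round(statistics.pstdev(data), 2))  # 12.43  (population, like rdd1.stdev())
print(round(statistics.stdev(data), 2))   # 13.11  (sample, like rdd1.sampleStdev())
```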

In [43]:
rdd1.sum()

183

In [44]:
# mapPartitions():

def myfunc(partition):
  total = 0
  for item in partition:
    total += item
  return total # mapPartitions() expects an iterable back; returning a single int makes the job fail

rdd_mappartitions = rdd1.mapPartitions(myfunc).collect()
rdd_mappartitions

Py4JJavaError: ignored

- return, yield, generator
  - https://www.daleseo.com/python-yield/

In [47]:
def myfunc(partition):
  total = 0
  for item in partition:
    total += item
  yield total # yield instead of return: myfunc is now a generator (an iterable) that emits one sum per partition

rdd_mappartitions = rdd1.mapPartitions(myfunc).collect()
rdd_mappartitions

[62, 12, 23, 86]
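`mapPartitions()` calls the function once per partition with an iterator and expects an iterable back. A function that `return`s a single int gives Spark something it cannot iterate over (typically a `TypeError: 'int' object is not iterable` inside the `Py4JJavaError` above), while a generator is iterable. The plain-Python model below, with hypothetical helper names, shows the same contract:

```python
def partition_sums(partition):
    total = 0
    for item in partition:
        total += item
    yield total  # generator: exactly one value per partition

def partition_sums_broken(partition):
    return sum(partition)  # a plain int, which is not iterable

def simulate_map_partitions(partitions, func):
    """Hypothetical model of RDD.mapPartitions(): call func once per partition, flatten results."""
    out = []
    for part in partitions:
        out.extend(func(iter(part)))  # func's result must be iterable
    return out


partitions = [[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]]
print(simulate_map_partitions(partitions, partition_sums))  # [62, 12, 23, 86]

try:
    simulate_map_partitions(partitions, partition_sums_broken)
except TypeError as exc:
    print("TypeError:", exc)  # TypeError: 'int' object is not iterable
```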

In [45]:
# map() passes single elements (ints) to myfunc, which then tries to iterate over an int
rdd1.map(myfunc).collect()

Py4JJavaError: ignored

# **Part 2: Advanced RDD Transformations and Actions**

In [None]:
# union():
# API description: Return a new dataset that contains the union of the elements in the source dataset and the argument.
# A new RDD is created; duplicates are kept

In [48]:
rdd1.collect()

[30, 32, 12, 0, 21, 2, 29, 14, 7, 36]

In [49]:
rdd2 = sc.parallelize([1, 14, 20, 20, 28, 10, 13, 3], 2)
rdd2.collect()

[1, 14, 20, 20, 28, 10, 13, 3]

In [50]:
rdd_union = rdd1.union(rdd2)
rdd_union.collect()

[30, 32, 12, 0, 21, 2, 29, 14, 7, 36, 1, 14, 20, 20, 28, 10, 13, 3]

In [52]:
rdd_union.glom().collect()

[[30, 32], [12, 0], [21, 2], [29, 14, 7, 36], [1, 14, 20, 20], [28, 10, 13, 3]]

In [51]:
rdd_union.getNumPartitions()

6

In [54]:
# intersection():
rdd1.intersection(rdd2).collect()

[14]

In [56]:
rdd1.intersection(rdd2).getNumPartitions()

6

In [57]:
# Find empty partitions
rdd_intersection = rdd1.intersection(rdd2)
rdd_intersection.glom().collect()

[[], [], [14], [], [], []]
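Why five of the six partitions are empty: in PySpark's source, `intersection()` tags values from both RDDs, cogroups them by value (a shuffle), and keeps values seen on both sides. Assuming `spark.default.parallelism` is unset, the shuffle appears to use the partition count of the combined input, 4 + 2 = 6, and each value lands in partition `hash(value) % 6`, so the single common value 14 ends up in partition 14 % 6 = 2. `simulate_intersection` below is a hypothetical plain-Python model under those assumptions.

```python
def simulate_intersection(parts_a, parts_b):
    """Hypothetical model of RDD.intersection() via hash partitioning."""
    num_partitions = len(parts_a) + len(parts_b)  # assumed: partition count of the union
    # For each bucket, remember which side(s) each value came from
    buckets = [dict() for _ in range(num_partitions)]
    for side, parts in enumerate((parts_a, parts_b)):
        for part in parts:
            for value in part:
                buckets[hash(value) % num_partitions].setdefault(value, set()).add(side)
    # Keep each value seen on both sides, once
    return [[v for v, sides in b.items() if sides == {0, 1}] for b in buckets]


rdd1_parts = [[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]]
rdd2_parts = [[1, 14, 20, 20], [28, 10, 13, 3]]
print(simulate_intersection(rdd1_parts, rdd2_parts))  # [[], [], [14], [], [], []]
```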

In [58]:
counter = 0
for item in rdd_intersection.glom().collect():
  if len(item) == 0:
    counter += 1

counter

5

In [59]:
# coalesce(numPartitions): merges existing partitions into fewer ones without a full shuffle
rdd_intersection.coalesce(1).glom().collect()

[[14]]

In [60]:
# takeSample(withReplacement, num, [seed])
rdd1.takeSample(False, 5) # takeSample() is an action: the sample is returned to the driver as a list, so keep num small (the result varies between runs unless a seed is given)

[0, 36, 14, 12, 29]

In [61]:
# takeOrdered(n, [ordering])
rdd1.takeOrdered(5) # ascending (default)

[0, 2, 7, 12, 14]

In [62]:
rdd1.takeOrdered(5, key=lambda x: -x) # descending

[36, 32, 30, 29, 21]

In [63]:
# reduce():
rdd1.reduce(lambda x, y: x - y) # subtraction is not associative, so the result depends on how the data is partitioned

-5

In [83]:
print(sc.parallelize([1,2,3,4], 1).reduce(lambda x, y: x - y))
print(1 - 2 - 3 - 4)

-8
-8


In [85]:
print(sc.parallelize([1,2,3,4], 2).reduce(lambda x, y: x - y))
print((1 - 2) - (3 - 4))

0
0


In [87]:
print(sc.parallelize([1,2,3,4], 4).reduce(lambda x, y: x - y))
print(1 - 2 - 3 - 4)

-8
-8
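These results follow from how `reduce()` evaluates: PySpark folds each non-empty partition locally and then folds the per-partition results on the driver, in partition order. Because subtraction is neither associative nor commutative, the answer changes with the partitioning. The plain-Python model below (hypothetical helper; partitions taken from the outputs above) reproduces the numbers:

```python
from functools import reduce
from operator import sub

def simulate_reduce(partitions, func):
    """Hypothetical model of RDD.reduce(): reduce each partition, then reduce the partials."""
    partials = [reduce(func, part) for part in partitions if part]  # empty partitions are skipped
    return reduce(func, partials)


print(simulate_reduce([[1, 2, 3, 4]], sub))        # -8
print(simulate_reduce([[1, 2], [3, 4]], sub))      # 0, i.e. (1 - 2) - (3 - 4)
print(simulate_reduce([[1], [2], [3], [4]], sub))  # -8
print(simulate_reduce([[30, 32], [12, 0], [21, 2], [29, 14, 7, 36]], sub))  # -5
```

For rdd1 the partials are -2, 12, 19 and -28, and -2 - 12 - 19 - (-28) = -5. With an associative and commutative function such as addition, the partitioning does not matter.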


In [88]:
# reduceByKey():
rdd_Rbk = sc.parallelize([(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)], 2)
rdd_Rbk.glom().collect()

[[(1, 4), (7, 10), (5, 7), (1, 12)], [(7, 12), (7, 1), (9, 1), (7, 4)]]

In [89]:
rdd_Rbk.reduceByKey(lambda x, y: x + y).collect() # (1, 16), ...

[(1, 16), (7, 27), (5, 7), (9, 1)]
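`reduceByKey()` first combines values for the same key inside each partition (a map-side combine), shuffles only those partial results to partition `hash(key) % numPartitions`, and merges them there. With 2 partitions every key here is odd, so all keys hash to partition 1; that is consistent with the empty first partition that `rdd_group.glom()` shows later in this notebook. A plain-Python model with a hypothetical helper name:

```python
def simulate_reduce_by_key(partitions, func, num_partitions=None):
    """Hypothetical model of RDD.reduceByKey(): local combine, hash shuffle, final merge."""
    if num_partitions is None:
        num_partitions = len(partitions)
    # 1) Map-side combine: one partial value per key per input partition
    combined = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        combined.append(local)
    # 2) Shuffle partials to hash(key) % num_partitions, then 3) merge them
    buckets = [dict() for _ in range(num_partitions)]
    for local in combined:
        for key, value in local.items():
            bucket = buckets[hash(key) % num_partitions]
            bucket[key] = func(bucket[key], value) if key in bucket else value
    return [list(b.items()) for b in buckets]


pairs = [[(1, 4), (7, 10), (5, 7), (1, 12)], [(7, 12), (7, 1), (9, 1), (7, 4)]]
print(simulate_reduce_by_key(pairs, lambda x, y: x + y))
# [[], [(1, 16), (7, 27), (5, 7), (9, 1)]]
```

The map-side combine is the reason `reduceByKey()` usually moves far less data than `groupByKey()`.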

In [90]:
# Display the key-value pairs as a pandas DataFrame for readability
import pandas as pd

kv_df = pd.DataFrame({'Key': rdd_Rbk.keys().collect(),
                      'Value': rdd_Rbk.values().collect()})
kv_df

   Key  Value
0    1      4
1    7     10
2    5      7
3    1     12
4    7     12
5    7      1
6    9      1
7    7      4


In [91]:
# sortByKey():
rdd_Rbk.reduceByKey(lambda x, y: x + y).sortByKey().collect()

[(1, 16), (5, 7), (7, 27), (9, 1)]

In [92]:
rdd_Rbk.reduceByKey(lambda x, y: x + y).sortByKey(ascending=False).collect()

[(9, 1), (7, 27), (5, 7), (1, 16)]

In [93]:
# countByKey()
rdd_Rbk.countByKey()

defaultdict(int, {1: 2, 5: 1, 7: 4, 9: 1})

In [94]:
rdd_Rbk.countByKey().items()

dict_items([(1, 2), (7, 4), (5, 1), (9, 1)])

In [95]:
sorted(rdd_Rbk.countByKey().items())

[(1, 2), (5, 1), (7, 4), (9, 1)]
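`countByKey()` is an action that returns a `defaultdict` of counts to the driver, so it suits cases where the number of distinct keys is small. Its result is equivalent to counting the keys in plain Python, as this sketch with `collections.Counter` shows (the pairs are copied from `rdd_Rbk` above):

```python
from collections import Counter

pairs = [(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]
key_counts = Counter(key for key, _ in pairs)  # values are ignored; only keys are counted

print(sorted(key_counts.items()))  # [(1, 2), (5, 1), (7, 4), (9, 1)]
```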

In [96]:
# groupByKey():
rdd_group = rdd_Rbk.groupByKey() # groupByKey() shuffles every value across the network without combining them first; for aggregations prefer reduceByKey()
rdd_group.getNumPartitions()

2

In [97]:
rdd_group.collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x7fb7debff550>),
 (7, <pyspark.resultiterable.ResultIterable at 0x7fb7debff710>),
 (5, <pyspark.resultiterable.ResultIterable at 0x7fb7dec00710>),
 (9, <pyspark.resultiterable.ResultIterable at 0x7fb7dec00250>)]

In [98]:
rdd_group.glom().collect()

[[],
 [(1, <pyspark.resultiterable.ResultIterable at 0x7fb7dec08790>),
  (7, <pyspark.resultiterable.ResultIterable at 0x7fb7dec1d490>),
  (5, <pyspark.resultiterable.ResultIterable at 0x7fb7dec1d4d0>),
  (9, <pyspark.resultiterable.ResultIterable at 0x7fb7dec1ded0>)]]

In [99]:
for key, values in rdd_group.collect():
  print(key, list(values))

1 [4, 12]
7 [10, 12, 1, 4]
5 [7]
9 [1]


In [100]:
# lookup(key):
rdd_Rbk.lookup(7)

[10, 12, 1, 4]

In [101]:
# cache:
# By default, each transformed RDD may be recomputed each time you run an action on it.
# However, you may also persist an RDD in memory using the persist (or cache) method,
# in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.

rdd_Rbk.persist()

ParallelCollectionRDD[146] at readRDDFromFile at PythonRDD.scala:274

In [102]:
rdd_Rbk.unpersist()

ParallelCollectionRDD[146] at readRDDFromFile at PythonRDD.scala:274

In [103]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
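# Caching 1 TB of data with only 32 GB of memory does not fit: with the default MEMORY_ONLY level, partitions that do not fit are not cached and are recomputed each time they are needed, which is very slow.
# MEMORY_AND_DISK instead stores the partitions that do not fit in memory on disk and reads them from there.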

from pyspark.storagelevel import StorageLevel

rdd1.persist(StorageLevel.MEMORY_AND_DISK)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274