In [2]:
import findspark
findspark.init()

from pyspark import SparkContext

sc = SparkContext()

In [3]:
sc

In [19]:
data = range(1, 101)

# 해당 데이터를 8개의 파티션으로 나눔
rangeRDD = sc.parallelize(data, 8)

# 해당 RDD 타입 확인
print('type of RangeRDD: {0}'.format(type(rangeRDD)))

type of RangeRDD: <class 'pyspark.rdd.PipelinedRDD'>


In [20]:
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(c: Iterable[~T], numSlices: Optional[int] = None) -> pyspark.rdd.RDD[~T] method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using range
    is recommended if the input represents a range for performance.
    
    .. versionadded:: 0.7.0
    
    Parameters
    ----------
    c : :class:`collections.abc.Iterable`
        iterable collection to distribute
    numSlices : int, optional
        the number of partitions of the new RDD
    
    Returns
    -------
    :class:`RDD`
        RDD representing distributed collection.
    
    Examples
    --------
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(range(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]
    
    Deal with a list of strings.
    
    >>> strings = ["a", "b", "c"]
    >>> sc.parallelize(strings, 2).glom().collect()
    [['a'], [

In [21]:
# RDD의 파티션 숫자 확인
rangeRDD.getNumPartitions()

8

In [22]:
print(rangeRDD.toDebugString())

print('rangeRDD id: {0}'.format(rangeRDD.id()))

b'(8) PythonRDD[4] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287 []'
rangeRDD id: 4


In [23]:
rangeRDD.setName('My first RDD')

# RDD에 이름 지정
print(rangeRDD.toDebugString())
# help(rangeRDD)

b'(8) My first RDD PythonRDD[4] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287 []'


#### map transformation이 적용되면 base RDD에 있는 각각의 item은 새로운 RDD로 재탄생함. base RDD가 20개의 elements를 가지고 있다면 새로운 RDD 역시 20개의 elements를 가짐.

##### spark는 lazy evaluation 방식이라 transformation이 발생해도 실제 데이터는 변경되지 않음.
##### collect() 함수를 통해서 분배된 데이터들을 새로운 list로 합치고 transformation도 적용.
##### 주의! collect()함수는 적은 양의 데이터 일 경우에만 사용함!!

In [26]:
def sub(value):
    return (value - 1)

# RDD에 sub함수를 적용시킴
subRDD = rangeRDD.map(sub)

# 실제 action이 이루어지고 sub함수가 적용된 리스트 리턴
print(subRDD.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]


In [25]:
# filter() : 함수 결과가 참인 경우에만 요소들을 통과시킴. 
# 결과로 새로운 RDD를 생성. action 아님

def ten(value):
    if(value < 10):
        return True
    else:
        return False
    
filteredRDD = subRDD.filter(ten)
print(filteredRDD.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [27]:
# Lambda() : 런타인에서 이름을 할당 받을 필요가 없는 한 줄 짜리 익명함수

lambdaRDD = subRDD.filter(lambda x:x < 10)
print(lambdaRDD.collect())

evenRDD = lambdaRDD.filter(lambda x: x%2 == 0)
print(evenRDD.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 2, 4, 6, 8]


In [36]:
print(filteredRDD.first())
print(filteredRDD.take(4))
print()

# 해당 개수에 데이터를 정렬을 해서 가져옴(오름, 내림차순)
print(filteredRDD.takeOrdered(4, lambda s:-s))
print(filteredRDD.top(4, lambda s:-s))
print()

from operator import add
print(filteredRDD.reduce(add))
print(filteredRDD.reduce(lambda a,b : a+b))
print(filteredRDD.reduce(lambda a,b : a-b))
print(filteredRDD.repartition(4).reduce(lambda a,b : a-b))
print(filteredRDD.repartition(4).reduce(lambda a,b : a+b))

0
[0, 1, 2, 3]

[9, 8, 7, 6]
[0, 1, 2, 3]

45
45
-45
-45
45


In [38]:
# takeSample() : dataset으로부터 랜덤으로 원소를 리턴한다.
# withReplacement 파라미터가 있는데 True일 경우 동일한 원소가 여러번 리턴가능
# seed 파라미터는 랜덤 넘버를 생성할 때 seed값으로 설정, action이기 때문에 실제 메모리에서 계산

print(filteredRDD.takeSample(withReplacement=True, num=6, seed=500))
repetitiveRDD = sc.parallelize([1,2,3,1,2,3,1,2,1,2,3,3,3,4,5,4,6])

# 값으로 그룹화 한 후 그 개수를 count함
print(repetitiveRDD.countByValue())

[7, 6, 6, 4, 8, 6]
defaultdict(<class 'int'>, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})


##### flatmap() : map()이 적용된 RDD는 iterator로 만들어진 새로운 RDD를 얻는데, iterator안에 포함된 값으로 RDD를 구성하기 원할 경우에 flatmap()을 사용

In [41]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

wordsRDD

ParallelCollectionRDD[63] at readRDDFromFile at PythonRDD.scala:287

In [42]:
# map()을 수행한 결과가 collection, 튜플로 리턴

wordsRDDMap = wordsRDD.map(lambda x: (x, x+'s'))
print(wordsRDDMap.collect())

[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]


In [44]:
# flatmap()을 수행한 결과 iteraor 안에 포함된 값으로 리턴

wordsRDDMap = wordsRDD.flatMap(lambda x: (x, x+'s'))
print(wordsRDDMap.collect())

['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']


In [45]:
# reduceByKey() : pair RDD로 이루어져 있는 경우에 적용가능, 대규모 분산 dataset에 대해 매우 효과적임
# node를 통해 data가 셔플이 일어나기 전에 각각 파티션에서 키를 통해 출력 데이터를 결합할 수 있기 때문.
# ex) [(a, 1), (b, 2)] --> 키는 튜플의 첫 번째 원소, 값은 두 번째 원소

# groupByKey() : 모든 key-value 쌍이 셔플함. (spark는 메모리가 차면 disk로 자동 swap하지만 일괄적으로 동작,
# 따라서 out-of-memory가 발생 할 수 있음)

import math

pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
print(pairRDD.groupByKey().mapValues(lambda x: list(x)).collect())
print(pairRDD.groupByKey().mapValues(lambda x: math.fsum(x)).collect())
print(pairRDD.reduceByKey(add).collect())

[('b', [1]), ('a', [1, 2])]
[('b', 1.0), ('a', 3.0)]
[('b', 1), ('a', 3)]


##### 효율적인 RDD 메모리 관리를 위해선 컨텐츠를 메모리에 보관하는게 유리하다. 하지만 너무 많으면 spark에서는 자동적으로 RDD를 삭제
##### cache() 함수를 통해 만들어진 RDD를 메모리에 상주 시킬 수 있다. 가장 적은 빈도로 사용했던(LRU) RDD부터 삭제를 시작함.

In [46]:
filteredRDD.setName('My Filtered RDD')
filteredRDD.cache()
print(filteredRDD.is_cached)

True


In [48]:
# toDebugString() : 현재 생성된 RDD 정보를 보여줌

print(filteredRDD.toDebugString())
filteredRDD.unpersist()
print()

# getStorageLevel() : RDD가 현재 어느 위치에 저장되어 있는지(mem or disk)
print(filteredRDD.getStorageLevel())
filteredRDD.cache()
print()

print(filteredRDD.getStorageLevel())

b'(8) My Filtered RDD PythonRDD[6] at collect at C:\\Users\\qkrwn\\AppData\\Local\\Temp\\ipykernel_17140\\2634476502.py:11 [Memory Serialized 1x Replicated]\n |  ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:287 [Memory Serialized 1x Replicated]'

Serialized 1x Replicated

Memory Serialized 1x Replicated
