# 12장 RDD

In [1]:
spark.sparkContext

In [2]:
spark.range(10).rdd

MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:0

In [3]:
spark.range(10).toDF('id').rdd.map(lambda row: row[0])

PythonRDD[12] at RDD at PythonRDD.scala:53

In [4]:
spark.range(10).rdd.toDF()

DataFrame[id: bigint]

# 12.3.2 로컬 컬렉션으로 RDD 생성하기

### 두 개의 파티션을 가진 병렬 컬렉션 객체 만들기

In [5]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(' ')

words = spark.sparkContext.parallelize(myCollection, 2)

In [6]:
words.setName('myWords')
words.name()

'myWords'

In [7]:
words

myWords ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195

# 12.5 트랜스포메이션

In [8]:
## 12.5.1 distinct
words.distinct().count()

10

In [9]:
## 12.5.2 filter
def startsWithS(individual):
    return individual.startswith('S')

words.filter(lambda word: startsWithS(word).collect())

PythonRDD[30] at RDD at PythonRDD.scala:53

In [10]:
## 12.5.3 map
words2 = words.map(lambda word: (word, word[0], word.startswith('S')))
words2.filter(lambda record: record[2]).take(5)

[('Spark', 'S', True), ('Simple', 'S', True)]

In [11]:
words2.take(5)

[('Spark', 'S', True),
 ('The', 'T', False),
 ('Definitive', 'D', False),
 ('Guide', 'G', False),
 (':', ':', False)]

In [12]:
## flat map
words.flatMap(lambda word: list(word)).take(5)

['S', 'p', 'a', 'r', 'k']

In [13]:
## 12.5.4 sortBy
words.sortBy(lambda word: len(word) * -1).take(10)

['Definitive',
 'Processing',
 'Simple',
 'Spark',
 'Guide',
 'Data',
 'Made',
 'The',
 'Big',
 ':']

In [14]:
## 12.5.5 randomSplit
fiftyFiftySplit = words.randomSplit([0.5, 0.5])
fiftyFiftySplit[0].take(5)

['Spark', 'Definitive', 'Data', 'Processing', 'Made']

In [15]:
fiftyFiftySplit[1].take(5)

['The', 'Guide', ':', 'Big', 'Simple']

## 12.6 액션

In [16]:
## 12.6.1 reduce
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y)

210

In [17]:
def wordLengthReducer(leftWord, rightWord):
    if len(leftWord) > len(rightWord):
        return leftWord
    else:
        return rightWord
    
words.reduce(wordLengthReducer)

'Processing'

In [18]:
## 12.6.2 count
words.count()

10

In [19]:
## countApprox
confidence = 0.95
timeoutMilliseconds = 400
words.countApprox(timeoutMilliseconds, confidence)

10

In [20]:
## countApproxDistinct
words.countApproxDistinct(0.05)

10

In [21]:
## countByValue
words.countByValue()

defaultdict(int,
            {'Spark': 1,
             'The': 1,
             'Definitive': 1,
             'Guide': 1,
             ':': 1,
             'Big': 1,
             'Data': 1,
             'Processing': 1,
             'Made': 1,
             'Simple': 1})

In [22]:
## 12.6.3 first
words.first()

'Spark'

In [23]:
## 12.6.4 max, min
spark.sparkContext.parallelize(range(1, 21)).max()

20

In [24]:
spark.sparkContext.parallelize(range(1, 21)).min()

1

In [25]:
## 12.6.5 take
words.take(5)

['Spark', 'The', 'Definitive', 'Guide', ':']

In [26]:
words.takeOrdered(5)

[':', 'Big', 'Data', 'Definitive', 'Guide']

In [27]:
words.top(5)

['The', 'Spark', 'Simple', 'Processing', 'Made']

In [28]:
withReplacement = True
numberToTake = 6
randomSeed = 100
words.takeSample(withReplacement, numberToTake, randomSeed)

['Data', 'Definitive', 'Data', 'The', 'Definitive', 'Spark']

# 12.7 파일 저장하기

In [30]:
## 12.7.1 saveAsTextFile
words.saveAsTextFile('file:///home/ubuntu/book')

# 12.8 캐싱

In [31]:
words.cache()

myWords ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195

In [32]:
words.getStorageLevel()

StorageLevel(False, True, False, False, 1)

# 12.9 체크포인팅

In [33]:
spark.sparkContext.setCheckpointDir("file:///home/ubuntu/checkpoint")

In [34]:
words.checkpoint()

# 12.10 RDD를 시스템 명령을 전송하기

In [35]:
## 파티션 당 5개의 로우
words.pipe('wc -l').collect()

['5', '5']

In [36]:
## 12.10.1 mapPartitions
words.mapPartitions(lambda part: [1]).sum()

2

In [37]:
## 파티션 인덱스르 통해 각 레코드가 속한 데이터셋이 어디에 있는지 알아보기
def indexedFunc(partitionIndex, withinPartIterator):
    return ['partition: {} => {}'.format(partitionIndex, x) for x in withinPartIterator]
words.mapPartitionsWithIndex(indexedFunc).collect()

['partition: 0 => Spark',
 'partition: 0 => The',
 'partition: 0 => Definitive',
 'partition: 0 => Guide',
 'partition: 0 => :',
 'partition: 1 => Big',
 'partition: 1 => Data',
 'partition: 1 => Processing',
 'partition: 1 => Made',
 'partition: 1 => Simple']

In [38]:
## 12.10.3 glom
spark.sparkContext.parallelize(['Hello', 'World'], 2).glom().collect()

[['Hello'], ['World']]

# 13 RDD 고급 개념

In [39]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(' ')
words = spark.sparkContext.parallelize(myCollection, 2)

# 13.1 키-값 형태의 RDD

In [40]:
words.map(lambda word : (word.lower(), 1))

## 13.1.1 keyBy
keyword = words.keyBy(lambda word: word.lower()[0])

In [41]:
## 13.1.2 값 매핑하기
keyword.mapValues(lambda word: word.upper()).collect()

[('s', 'SPARK'),
 ('t', 'THE'),
 ('d', 'DEFINITIVE'),
 ('g', 'GUIDE'),
 (':', ':'),
 ('b', 'BIG'),
 ('d', 'DATA'),
 ('p', 'PROCESSING'),
 ('m', 'MADE'),
 ('s', 'SIMPLE')]

In [42]:
keyword.flatMapValues(lambda word: word.upper()).collect()

[('s', 'S'),
 ('s', 'P'),
 ('s', 'A'),
 ('s', 'R'),
 ('s', 'K'),
 ('t', 'T'),
 ('t', 'H'),
 ('t', 'E'),
 ('d', 'D'),
 ('d', 'E'),
 ('d', 'F'),
 ('d', 'I'),
 ('d', 'N'),
 ('d', 'I'),
 ('d', 'T'),
 ('d', 'I'),
 ('d', 'V'),
 ('d', 'E'),
 ('g', 'G'),
 ('g', 'U'),
 ('g', 'I'),
 ('g', 'D'),
 ('g', 'E'),
 (':', ':'),
 ('b', 'B'),
 ('b', 'I'),
 ('b', 'G'),
 ('d', 'D'),
 ('d', 'A'),
 ('d', 'T'),
 ('d', 'A'),
 ('p', 'P'),
 ('p', 'R'),
 ('p', 'O'),
 ('p', 'C'),
 ('p', 'E'),
 ('p', 'S'),
 ('p', 'S'),
 ('p', 'I'),
 ('p', 'N'),
 ('p', 'G'),
 ('m', 'M'),
 ('m', 'A'),
 ('m', 'D'),
 ('m', 'E'),
 ('s', 'S'),
 ('s', 'I'),
 ('s', 'M'),
 ('s', 'P'),
 ('s', 'L'),
 ('s', 'E')]

In [43]:
## 13.1.3 키와 값 추출하기
keyword.keys().collect()

['s', 't', 'd', 'g', ':', 'b', 'd', 'p', 'm', 's']

In [44]:
keyword.values().collect()

['Spark',
 'The',
 'Definitive',
 'Guide',
 ':',
 'Big',
 'Data',
 'Processing',
 'Made',
 'Simple']

In [45]:
## 12.1.4 lookup
keyword.lookup('s')

['Spark', 'Simple']

In [46]:
## 13.1.5 sampleByKey
import random

distinctChars = words.flatMap(lambda word: list(word.lower())).distinct().collect()
sampleMap = dict(map(lambda c: (c, random.random()), distinctChars))

words.map(lambda word: (word.lower()[0], word)).sampleByKey(True, sampleMap, 6).collect()

[('t', 'The'), ('t', 'The'), ('d', 'Definitive'), ('m', 'Made')]

## 13.2 집계

In [47]:
chars = words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))

def maxFunc(left, right):
    return max(left, right)
def addFunc(left, right):
    return left + right
nums = sc.parallelize(range(1, 31), 5)

In [48]:
KVcharacters.take(5)

[('s', 1), ('p', 1), ('a', 1), ('r', 1), ('k', 1)]

In [49]:
## countByKey
KVcharacters.countByKey()

defaultdict(int,
            {'s': 4,
             'p': 3,
             'a': 4,
             'r': 2,
             'k': 1,
             't': 3,
             'h': 1,
             'e': 7,
             'd': 4,
             'f': 1,
             'i': 7,
             'n': 2,
             'v': 1,
             'g': 3,
             'u': 1,
             ':': 1,
             'b': 1,
             'o': 1,
             'c': 1,
             'm': 2,
             'l': 1})

In [50]:
## groupByKey
from functools import reduce
KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1]))).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [51]:
## reduceByKey
KVcharacters.reduceByKey(addFunc).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [52]:
## 13.2.3 기타 집계 메서드
## aggregate
nums.aggregate(0, maxFunc, addFunc)

90

In [53]:
depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)

90

In [54]:
nums.take(100)

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30]

In [55]:
## aggregateByKey
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()

[('s', 3),
 ('p', 2),
 ('r', 1),
 ('h', 1),
 ('d', 2),
 ('i', 4),
 ('g', 2),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 3),
 ('k', 1),
 ('t', 2),
 ('e', 4),
 ('f', 1),
 ('n', 1),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [56]:
## combineByKey
def valToCombiner(value):
    return [value]
def mergeValuesFunc(vals, valToAppend):
    vals.append(valToAppend)
    return vals
def mergeCombinerFunc(vals1, vals2):
    return vals1 + vals2
outputPartitions = 6

KVcharacters.combineByKey(valToCombiner, mergeValuesFunc, mergeCombinerFunc, outputPartitions).collect()

[('s', [1, 1, 1, 1]),
 ('d', [1, 1, 1, 1]),
 ('l', [1]),
 ('v', [1]),
 (':', [1]),
 ('p', [1, 1, 1]),
 ('r', [1, 1]),
 ('c', [1]),
 ('k', [1]),
 ('t', [1, 1, 1]),
 ('n', [1, 1]),
 ('u', [1]),
 ('o', [1]),
 ('h', [1]),
 ('i', [1, 1, 1, 1, 1, 1, 1]),
 ('g', [1, 1, 1]),
 ('b', [1]),
 ('a', [1, 1, 1, 1]),
 ('e', [1, 1, 1, 1, 1, 1, 1]),
 ('f', [1]),
 ('m', [1, 1])]

In [57]:
## foldByKey 결합 함수와 항등원인 '제로값'을 이용해 각 키의 값을 병함
## 제로값은 결과에 여러 번 사용될 수 있으나 결과를 변경할 수 없는 값 (덧셈에서는 0 , 곱셉에서는 1)
KVcharacters.foldByKey(0, addFunc).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

# 13.3 cogroup

In [59]:
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
charRDD = distinctChars.map(lambda c: (c, random.random()))
charRDD2 = distinctChars.map(lambda c: (c, random.random()))

charRDD.cogroup(charRDD2).take(5)

[('s',
  (<pyspark.resultiterable.ResultIterable at 0x7f55c574de90>,
   <pyspark.resultiterable.ResultIterable at 0x7f55c4827fd0>)),
 ('p',
  (<pyspark.resultiterable.ResultIterable at 0x7f55c4827e90>,
   <pyspark.resultiterable.ResultIterable at 0x7f55c40a8d90>)),
 ('r',
  (<pyspark.resultiterable.ResultIterable at 0x7f55c4803d10>,
   <pyspark.resultiterable.ResultIterable at 0x7f55c4803790>)),
 ('i',
  (<pyspark.resultiterable.ResultIterable at 0x7f55c4748190>,
   <pyspark.resultiterable.ResultIterable at 0x7f55c40a8f90>)),
 ('g',
  (<pyspark.resultiterable.ResultIterable at 0x7f55c5e20590>,
   <pyspark.resultiterable.ResultIterable at 0x7f55c5e203d0>))]

# 13.4 조인

In [62]:
## 13.4.1 내부 조인
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10

KVcharacters.join(keyedChars).count()

51

In [63]:
KVcharacters.join(keyedChars, outputPartitions).count()

51

In [64]:
## 13.4.2 zip
numRange = sc.parallelize(range(10), 2)
words.zip(numRange).collect()

[('Spark', 0),
 ('The', 1),
 ('Definitive', 2),
 ('Guide', 3),
 (':', 4),
 ('Big', 5),
 ('Data', 6),
 ('Processing', 7),
 ('Made', 8),
 ('Simple', 9)]

# 13.5 파티션 제어하기

In [65]:
## 13.5.1 coalesce
words.coalesce(1).getNumPartitions()

1

In [66]:
## 13.5.2 repartition
words.repartition(10)

MapPartitionsRDD[191] at coalesce at NativeMethodAccessorImpl.java:0

In [69]:
## 13.5.4 사용자 정의 파티셔닝
path = "file:///home/ubuntu/Spark-The-Definitive-Guide-master/data/retail-data/all/online-retail-dataset.csv"
df = spark.read.option("header", 'true').option('inferSchema', 'true').csv(path)

In [70]:
rdd = df.coalesce(10).rdd
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [71]:
def partitionFunc(key):
    import random
    
    if key == 17850 or key == 12583:
        return 0
    else:
        return random.randint(1,2)

keyedRDD = rdd.keyBy(lambda row: row[6])

keyedRDD\
.partitionBy(3, partitionFunc)\
.map(lambda x: x[0])\
.glom()\
.map(lambda x: len(set(x)))\
.take(5)

[2, 4302, 4303]

# 14장 분산형 공유 변수

# 14.1 브로드캐스트 변수

In [72]:
my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(' ')
words = sc.parallelize(my_collection, 2)

In [73]:
supplementalData = {"Spark":1000, "Definitive":200, "Big":-300, "Simple":100}

suppBroadcast = spark.sparkContext.broadcast(supplementalData)
suppBroadcast.value

{'Spark': 1000, 'Definitive': 200, 'Big': -300, 'Simple': 100}

In [74]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
.sortBy(lambda wordPair: wordPair[1])\
.collect()

[('Big', -300),
 ('The', 0),
 ('Guide', 0),
 (':', 0),
 ('Data', 0),
 ('Processing', 0),
 ('Made', 0),
 ('Simple', 100),
 ('Definitive', 200),
 ('Spark', 1000)]

# 14.2 어큐뮬레이터

In [75]:
## 14.2.1 기본 예제
path = "file:///home/ubuntu/Spark-The-Definitive-Guide-master/data/flight-data/parquet/2010-summary.parquet"
flights = spark.read.parquet(path)

In [76]:
accChina = sc.accumulator(0)

In [77]:
def accChinaFunc(flight_row):
    destination = flight_row['DEST_COUNTRY_NAME']
    origin = flight_row['ORIGIN_COUNTRY_NAME']
    
    if destination == 'China':
        accChina.add(flight_row['count'])
    if origin == 'China':
        accChina.add(flight_row['count'])
        
flights.foreach(lambda flight_row: accChinaFunc(flight_row))

In [78]:
## 스파크 ui 에서 확인 가능
accChina.value

953