# Apache Spark RDD API Examples

Python (PySpark) version of the examples at http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html  
Not all functions from the original page are covered.

---
另一個簡中系列 (another series, written in Simplified Chinese):
- [Spark Python API函数学习：pyspark API(1) – 过往记忆](https://www.iteblog.com/archives/1395.html)
- [Spark Python API函数学习：pyspark API(2) – 过往记忆](https://www.iteblog.com/archives/1396.html)
- [Spark Python API函数学习：pyspark API(3) – 过往记忆](https://www.iteblog.com/archives/1399.html)
- [Spark Python API函数学习：pyspark API(4) – 过往记忆](https://www.iteblog.com/archives/1400.html)

## aggregate

### aggregate integer

In [1]:
# Spread the integers 1..6 over 2 partitions.
data = sc.parallelize(range(1, 7), 2)

In [2]:
def myfunc(index, it):
    """Yield one '[partID:<index>, val:<element>]' string per element of ``it``."""
    part_label = str(index)
    for element in it:
        yield "[partID:%s, val:%s]" % (part_label, str(element))

In [3]:
# Tag every element with the index of the partition that holds it.
data.mapPartitionsWithIndex(myfunc).collect()

['[partID:0, val:1]',
 '[partID:0, val:2]',
 '[partID:0, val:3]',
 '[partID:1, val:4]',
 '[partID:1, val:5]',
 '[partID:1, val:6]']

In [4]:
# The first lambda (max) folds the elements inside each partition, starting
# from the zero value 0; the second lambda (+) merges the partition results,
# again starting from the zero value.
data.aggregate(0, lambda x,y: max(x,y), lambda x,y: x+y)
# 0 + max(0,1,2,3) + max(0, 4, 5, 6) = 0 + 3 + 6 = 9

9

In [5]:
# Same aggregation with zero value 5: it takes part in every per-partition
# max and once more in the final sum.
data.aggregate(5, lambda x,y: max(x,y), lambda x,y: x+y)
# 5 + max(5, 1, 2, 3) + max(5, 4, 5, 6) = 5 + 5 + 6 = 16

16

### aggregate string

In [6]:
# A string is parallelized character by character: 'a'..'f' over 2 partitions.
data = sc.parallelize('abcdef', 2)

In [7]:
def myfunc(index, it):
    """Label each element of a partition with the partition's index."""
    template = "[partID:%s, val:%s]"
    for ch in it:
        yield template % (str(index), str(ch))

In [8]:
# Tag every character with the index of the partition that holds it.
data.mapPartitionsWithIndex(myfunc).collect()

['[partID:0, val:a]',
 '[partID:0, val:b]',
 '[partID:0, val:c]',
 '[partID:1, val:d]',
 '[partID:1, val:e]',
 '[partID:1, val:f]']

In [9]:
# Both functions concatenate; with '' as zero value the result is simply the
# characters of both partitions joined: '' + 'abc' + 'def'.
data.aggregate('', lambda x,y: x+y, lambda x,y: x+y)

'abcdef'

In [10]:
# The zero value 'x' is prepended once per partition and once more when the
# partition results are merged: 'x' + 'xabc' + 'xdef'.
data.aggregate('x', lambda x,y: x+y, lambda x,y: x+y)

'xxabcxdef'

### aggregate others

In [11]:
# Four strings of different lengths over 2 partitions.
# Presumably split as ['12', '23'] and ['345', '4567'] (consistent with the
# results shown below) -- confirm with mapPartitionsWithIndex if in doubt.
data = sc.parallelize(['12', '23', '345', '4567'], 2)

In [12]:
# The accumulator is a string, so len(x) is the length of the previous
# result's string form, not the previous maximum itself.
# Assuming partitions ['12', '23'] and ['345', '4567']:
#   partition 0: '' -> '2' -> '2'
#   partition 1: '' -> '3' -> '4'
#   merge: '' + '2' + '4'
data.aggregate('', lambda x,y: str(max(len(x), len(y))), lambda x,y: x+y)

'24'

In [13]:
# Same data with min: the first step always yields '0' (len('') == 0), and the
# second step compares len('0') == 1 with the next string's length.
# Assuming partitions ['12', '23'] and ['345', '4567']:
#   partition 0: '' -> '0' -> '1'
#   partition 1: '' -> '0' -> '1'
data.aggregate('', lambda x,y: str(min(len(x), len(y))), lambda x,y: x+y)

'11'

In [14]:
# Same as before, but the last element is now an empty string.
data = sc.parallelize(['12', '23', '345', ''], 2)

In [15]:
# Assuming partitions ['12', '23'] and ['345', '']:
#   partition 0: '' -> '0' -> '1'
#   partition 1: '' -> '0' -> '0'   (min(len('0'), len('')) == 0)
data.aggregate('', lambda x,y: str(min(len(x), len(y))), lambda x,y: x+y)

'10'

In [16]:
# Swap the last two elements: the empty string now precedes '345'.
data = sc.parallelize(['12', '23', '', '345'], 2)

In [17]:
# Assuming partitions ['12', '23'] and ['', '345']:
#   partition 0: '' -> '0' -> '1'
#   partition 1: '' -> '0' -> '1'   (min(len('0'), len('345')) == 1)
# so the order of the elements inside a partition changes the result.
data.aggregate('', lambda x,y: str(min(len(x), len(y))), lambda x,y: x+y)

'11'

## aggregateByKey

In [18]:
# Six (animal, number) pairs over 2 partitions.
pairRDD = sc.parallelize([('cat', 2), ('cat', 5), ('mouse', 4), ('cat', 2), ('dog', 12), ('mouse', 2)], 2)

In [19]:
def myfunc(index, it):
    """Describe every item of a partition together with the partition index."""
    for pair in it:
        label = (str(index), str(pair))
        yield '[partID:%s, val:%s]' % label

In [20]:
# Tag every pair with the index of the partition that holds it.
pairRDD.mapPartitionsWithIndex(myfunc).collect()

["[partID:0, val:('cat', 2)]",
 "[partID:0, val:('cat', 5)]",
 "[partID:0, val:('mouse', 4)]",
 "[partID:1, val:('cat', 2)]",
 "[partID:1, val:('dog', 12)]",
 "[partID:1, val:('mouse', 2)]"]

In [21]:
# Per key: max within each partition (starting from 0), then the per-partition
# maxima are added together.
#   cat: max(0, 2, 5) + max(0, 2) = 7;  mouse: 4 + 2 = 6;  dog: 12
pairRDD.aggregateByKey(0, max, lambda x,y: x+y).collect()

[('mouse', 6), ('dog', 12), ('cat', 7)]

In [22]:
# With zero value 100 every per-partition max is 100, so each key ends up
# with 100 times the number of partitions it appears in
# (cat and mouse: 2 partitions, dog: 1).
pairRDD.aggregateByKey(100, max, lambda x,y: x+y).collect()

[('mouse', 200), ('dog', 100), ('cat', 200)]

## cartesian

In [23]:
# Two small RDDs: 0..4 and 5..9.
x = sc.parallelize(range(5))
y = sc.parallelize(range(5, 10))

In [24]:
# Every pairing of an element of x with an element of y (5 * 5 = 25 tuples).
x.cartesian(y).collect()

[(0, 5),
 (0, 6),
 (0, 7),
 (0, 8),
 (0, 9),
 (1, 5),
 (1, 6),
 (1, 7),
 (1, 8),
 (1, 9),
 (2, 5),
 (2, 6),
 (2, 7),
 (2, 8),
 (2, 9),
 (3, 5),
 (3, 6),
 (3, 7),
 (3, 8),
 (3, 9),
 (4, 5),
 (4, 6),
 (4, 7),
 (4, 8),
 (4, 9)]

## checkpoint

In [25]:
# Directory for checkpoint data; 'somewhere' looks like a placeholder --
# presumably it should be replaced by a real path.
sc.setCheckpointDir('somewhere')

In [26]:
# Small RDD holding 0..3.
data = sc.parallelize(range(4))

In [27]:
# Request checkpointing of this RDD.
# NOTE(review): presumably nothing is written until an action runs -- confirm
# against the Spark documentation.
data.checkpoint()

In [28]:
# Run an action on the RDD: it holds 4 elements.
data.count()

4

## coalesce & repartition

In [29]:
# Start with 10 partitions and shrink to 2. The second argument to coalesce
# is False here; per the note below, passing True corresponds to repartition.
y = sc.parallelize(range(10), 10)
z = y.coalesce(2, False)
# y.repartition(2) == y.coalesce(2, True)

In [30]:
# Confirm that the coalesced RDD has 2 partitions.
z.getNumPartitions()

2

## cogroup & groupWith

In [31]:
# cogroup collects, for every key, the values from each of the two RDDs.
# Here both RDDs share the same keys (1 occurs twice).
a = sc.parallelize([1,2,1,4], 1)
b = a.map(lambda x: (x, 'b'))
c = a.map(lambda x: (x, 'c'))
for key, val in b.cogroup(c).collect():
    # list(group) materializes each group for printing. The comprehension no
    # longer uses the names x / y, which Python 2 would leak into the notebook
    # namespace, and the formatted print() call produces the same text on
    # Python 2 and Python 3.
    print('%s %s' % (key, [list(group) for group in val]))

2 [['b'], ['c']]
4 [['b'], ['c']]
1 [['b', 'b'], ['c', 'c']]


In [32]:
# cogroup on two RDDs whose key sets only partly overlap: a key that is
# missing from one side gets an empty group for that side.
x = sc.parallelize([(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'kiwi')], 2)
y = sc.parallelize([(5, 'computer'), (1, 'laptop'), (1, 'desktop'), (4, 'iPad')], 2)
for key, val in x.cogroup(y).collect():
    # list(group) materializes each group for printing. Avoiding x / y as
    # comprehension variables keeps the two RDDs from being overwritten
    # (Python 2 leaks comprehension variables), and the formatted print() call
    # produces the same text on Python 2 and Python 3.
    print('%s %s' % (key, [list(group) for group in val]))

4 [['kiwi'], ['iPad']]
1 [['apple'], ['laptop', 'desktop']]
5 [[], ['computer']]
2 [['banana'], []]
3 [['orange'], []]


## collect & collectAsMap

In [33]:
# Six strings over 2 partitions; collect() brings them all back as one list.
c = sc.parallelize(['Gnu', 'Cat', 'Rat', 'Dog', 'Gnu', 'Rat'], 2)
c.collect()

['Gnu', 'Cat', 'Rat', 'Dog', 'Gnu', 'Rat']

In [34]:
# zip pairs the two RDDs element by element: (1, 4), (2, 6), (1, 7), (3, 6).
# collectAsMap returns a dict, so for the repeated key 1 only one value
# survives (7 in the output below).
a = sc.parallelize([1, 2, 1, 3], 1)
b = sc.parallelize([4, 6, 7, 6], 1)
c = a.zip(b)
c.collectAsMap()

{1: 7, 2: 6, 3: 6}

## combineByKey

In [35]:
# c holds (number, animal) pairs such as (1, 'dog') and (2, 'gnu').
# combineByKey arguments, in order:
#   lambda x: [x]         start a list from the first value seen for a key
#   lambda x,y: x + [y]   append a further value to that list
#   lambda x,y: x+y       concatenate two lists built for the same key
#                         (presumably the ones coming from different partitions)
a = sc.parallelize(['dog', 'cat', 'gnu', 'salmon', 'rabbit', 'turkey', 'wolf', 'bear', 'bee'], 3)
b = sc.parallelize([1, 1, 2, 2, 2, 1, 2, 2, 2], 3)
c = b.zip(a)
d = c.combineByKey(lambda x: [x], lambda x,y: x + [y], lambda x,y: x+y)

In [36]:
# Show the animals grouped under each key.
d.collect()

[(1, ['dog', 'cat', 'turkey']),
 (2, ['gnu', 'salmon', 'rabbit', 'wolf', 'bear', 'bee'])]