本文介绍spark中RDD的基本操作。RDD全称是Resilient Distributed Datasets（即弹性分布式数据集），它是spark的一种抽象数据类型。


## RDD的创建

创建RDD的方法一般有两种，第一种方式是从外部读取数据集，另一种是在程序里生成。下面结合两个例子来演示spark中如何创建RDD。



### 程序内部创建RDD
下面通过一个例子来说明spark中创建RDD的方法，该例中我们首先在程序里初始化一个由一组整数组成的RDD，接着将这组整数平方。

In [1]:
from pyspark import SparkContext
sc = SparkContext(appName='square the numbers')
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x:x * x)
for num in squared.collect():
    print '%i '%num

1 
4 
9 
16 


`SparkContext`是spark的上下文，任何spark程序都需要申请一个spark上下文来运行；通过`parallelize`方法，我们可以快速地在程序中生成一个RDD数据集；`collect`函数用于将rdd以list的形式载入到驱动程序的内存。


### 外部读取数据集创建RDD



In [2]:
lines = sc.textFile('/home/hschen/Data/wordcount.txt')
for i, line in enumerate(lines.collect()):
    print 'line %d:%s'%(i+1, line)

line 1:We've all heard the scare stories about North Korea: the homemade nuclear arsenal built while their people starve and then aimed imprecisely at the rest of the world, a 
line 2:leader so deluded he makes L Ron Hubbard look like a man excessively overburdened with self-doubt and their deep-seated belief that foreign capitalists will invade at any 
line 3:moment and steal all their bauxite.
line 4:The popular portrayal of this Marxist nation is something like one of the more harrowing episodes of M*A*S*H, only with the cast of wacky characters replaced by twitchy, 
line 5:    heavily armed Stalinist meth addicts
line 6:    Cracked would like to take a moment to celebrate the good things about North Korea though, the things that the country's enemies prefer to suppress as part of their politically 
line 7:    motivated jealousy. Like how no different to you and me, there's nothing every North Korean likes more after an 18 hour shift at the phosphorus plant than a nice beer to go wi

`sc.textFile()`方法用于从外部读取文本文件并创建RDD，该RDD由文本文件的所有行组成。

# RDD基本操作

RDD的操作符可以分为`transform`和`action`两类。

-  `transform` 

transform的作用是将一个RDD映射到另一个RDD。

-  `action` 

由于spark的计算采用的是lazy evaluation的机制，`transform`只是定义了一系列的变换操作，只有当程序执行`action`操作时才会有实质上的计算，其结果被返回给驱动程序或写入文件系统。

如果你无法区分一个函数属于哪种操作，可以查看该函数的返回值。如果返回值是RDD，那么它是transform操作；如果返回的是其他的数据类型，那么就是action操作。

## Spark的惰性计算机制

前面提到，Spark的计算模型是一种惰性计算（Lazy Evaluation）的方式，这意味着`transform`操作并不会马上得到执行，而是等到`action`操作被调用时才一并执行。

## 常用的transform操作

常用的transform操作有：

- `map`：对RDD中的每个元素执行相同的操作，并返回由操作的结果构成的RDD
- `filter`：根据条件过滤数据，筛选条件判断为true的元素
- `flatMap`：类似python的itertools.chain，把结果中所有可迭代对象里面的元素放在同一个可迭代对象内
- `distinct`：rdd集合去重
- `sample`：rdd集合随机抽样（有放回或无放回）



### map操作符

假设我们有一个RDD数据集{1,2,3,3}，我们通过map函数给RDD中每个元素加1

In [3]:
nums = sc.parallelize([1,2,3,3])
result = nums.map(lambda x:x + 1)
print result.collect()

[2, 3, 4, 4]


### filter操作符

假设我们想找出RDD中大于1的元素

In [4]:
result = nums.filter(lambda x:x > 1)
print result.collect()

[2, 3, 3]


### flatMap

In [5]:
result = nums.flatMap(lambda x:(x, 100*x, x**2))
print result.collect()

[1, 100, 1, 2, 200, 4, 3, 300, 9, 3, 300, 9]


### distinct
我们用distinct操作符来对RDD中的元素进行去重

In [6]:
result = nums.distinct()
print result.collect()

[1, 2, 3]


### sample

我们可以用sample操作符来对RDD中的元素进行无放回的采样

In [7]:
for r in range(5):
    result = nums.sample(False, 0.5)
    print 'round %d'%r
    print result.collect()

round 0
[1, 2, 3]
round 1
[1, 2]
round 2
[1, 2, 3]
round 3
[2, 3, 3]
round 4
[3, 3]


## 常用的action操作

常用的action操作
- `reduce`:对RDD中的两个元素执行操作，返回相同类型
- `collect`：返回rdd中所有的元素，执行collect操作时，驱动程序会从各个执行器收集数据并写入到内存，值得注意的是如果内存不够，该操作会失败，并发生内存溢出
- `take(n)`:取rdd的前n个元素
- `top(n)`：取rdd的top n个元素
- `count`：rdd中有多少元素
- `countByValue`：rdd中每个元素出现次数
- `takeOrdered`：按指定顺序返回RDD中前n个元素
- `takeSample`：随机返回n个元素（放回或无放回）
- `fold`：与reduce功能相同，不同之处在于0作为初始值，而reduce以rdd的第一个元素作为初始值
- `aggregate`：与reduce作用类似，不同的是可以返回与输入RDD不同的类型
- `foreach`：遍历RDD中的元素。注意，如果对RDD执行foreach，只会在Executor端有效，而并不是Driver端。比如：rdd.foreach(println)，只会在Executor的stdout中打印出来，Driver端是看不到的。

### reduce

用reduce操作求RDD中所有元素之和

In [8]:
nums.reduce(lambda x,y:x+y)

9

其中x代表返回值，y表示对RDD中每个元素的遍历

### collect

collect用于返回RDD中的所有元素

In [9]:
print nums.collect()

[1, 2, 3, 3]


### take

用take操作符返回RDD的前2个元素

In [10]:
print nums.take(2)

[1, 2]


### top

我们可以用top操作符取得RDD中top n的元素

In [11]:
print nums.top(2)

[3, 3]


### count


In [12]:
print nums.count()

4


### countByValue

In [13]:
result = nums.countByValue()
for k,v in result.iteritems():
    print 'key:{0},count:{1}'.format(k, v)

key:1,count:1
key:2,count:1
key:3,count:2


### takeOrdered

按降序排列，返回前3个元素

In [14]:
print nums.takeOrdered(3, lambda x:-x)

[3, 3, 2]


### takeSample

有放回地随机取出10个元素

In [15]:
print nums.takeSample(True, 10)

[2, 2, 3, 2, 2, 1, 1, 2, 1, 2]


### fold


In [16]:
nums.fold(0, lambda x, y: x + y)

9

### aggregate

aggregate与reduce不同的是可以返回与输入RDD不同的类型。这个例子中，我们用aggregate操作符分别计算RDD集合的累加和计数，然后求得RDD中元素的平均值

In [17]:
seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])
neutral_zero_value = (0, 0)
result = nums.aggregate(neutral_zero_value, seqOp,combOp)
total, cnt = result
print 'average is:{}'.format(total/ float(cnt))

average is:2.25


解读一下这段代码。上文介绍`reduce`操作符的时候提到匿名函数 
```py
lambda x,y:x+y
```

中x代表函数的返回值，y用于遍历RDD中各个元素。这里，neutral_zero_value指定了返回值的初值（tuple类型），因此seqOp中的x是一个tuple，x[0]表示累加值，x[1]表示计数值，而y依旧表示RDD中各元素的遍历。combOp的作用是对各个Executor上的结果进行合并，比如Executor1上返回结果是(5,3)，Executor2上返回的结果是(4,1)，那么combOp的结果是(5+4,3+1)，即(9,4)

### foreach



In [18]:
cnt = sc.accumulator(0)
def accumulate(x):
    global cnt
    cnt += x
nums.foreach(accumulate)
print cnt.value

9


## 练习：用spark实现wordcount

In [19]:
tokens = lines.flatMap(lambda x:x.split())
word_count = tokens.countByValue()
word_count_sort = sorted(word_count.iteritems(),key=lambda x:x[1], reverse=True)
for word, count in word_count_sort:
    # 过滤掉词频小于5的单词
    if count < 5:
        break
    print 'word:{0},count:{1}'.format(word, count)

word:the,count:22
word:to,count:11
word:of,count:11
word:and,count:11
word:a,count:10
word:their,count:6
word:with,count:6
word:North,count:5


# 参考

1.Learning Spark

2.http://www.jianshu.com/p/15739e95a46e

3.https://www.iteblog.com/archives/1396.html