In [1]:
sc

In [2]:
lines = sc.textFile("../resources/testfile.md")

#### Creating an RDD: parallelizing a collection

In [3]:
x = sc.parallelize(["hello", "spark"])
x

ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:194

In [4]:
x.collect()

['hello', 'spark']

#### filter() transformation

In [5]:
pythonLines = lines.filter(lambda line: "Python" in line)
pythonLines.first()

u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
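Transformations such as filter() are lazy. Nothing is read until an action like first() asks for a result. As a rough single-machine analogy (plain Python, no Spark; the sample lines and the `examined` counter are made up for illustration), a generator behaves the same way, and first() only needs to pull elements until the first match:

```python
# Hypothetical stand-in for a few lines of testfile.md
sample_lines = [
    "# Apache Spark",
    "",
    "high-level APIs in Scala, Java, Python, and R",
    "## Interactive Python Shell",
]

examined = []  # records every line the filter actually inspects

def contains_python(line):
    examined.append(line)
    return "Python" in line

# Like lines.filter(...): builds a lazy pipeline and inspects nothing yet
python_lines = (line for line in sample_lines if contains_python(line))
assert examined == []

# Like .first(): pull just one matching element
first_match = next(python_lines)
print(first_match)    # high-level APIs in Scala, Java, Python, and R
print(len(examined))  # 3: scanning stopped at the first match
```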

#### Actions: count(), take(), collect(), first()

In [6]:
pythonLines.count()

3

In [8]:
pythonLines.take(2)

[u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 u'## Interactive Python Shell']
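count() and collect() must visit every partition, whereas take(n) (and first(), which in PySpark is take(1)) can stop once it has n elements. Below is a simplified plain-Python model that assumes partitions are scanned strictly one at a time in order. Spark actually scans a growing number of partitions per round, so treat this as a sketch rather than its exact algorithm; the partition contents are made up.

```python
def take(partitions, n):
    """Collect up to n elements, visiting partitions in order and stopping early."""
    result = []
    visited = 0
    for part in partitions:
        if len(result) >= n:
            break
        visited += 1
        for item in part:
            result.append(item)
            if len(result) == n:
                break
    return result, visited

def count(partitions):
    # count() has to touch every partition
    return sum(len(part) for part in partitions)

partitions = [["a", "b"], ["c"], ["d", "e"], []]
print(take(partitions, 2))  # (['a', 'b'], 1): only the first partition was read
print(count(partitions))    # 5
```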

#### map()

In [9]:
two = sc.parallelize(["hello spark", "I am learning"])
lists = two.map(lambda x: x.split(" "))
print(lists.count())
lists.collect()

2


[['hello', 'spark'], ['I', 'am', 'learning']]

#### flatMap(): like map(split), except the resulting lists are merged into a single RDD

In [10]:
words = two.flatMap(lambda line: line.split(" "))
print(words.count())
words.collect()

5


['hello', 'spark', 'I', 'am', 'learning']
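The heading's claim can be checked in plain Python (no Spark): flatMap(f) yields the same elements as map(f) followed by flattening the resulting lists into one sequence. The helpers `map_` and `flat_map` are illustrative names, not Spark APIs.

```python
from itertools import chain

data = ["hello spark", "I am learning"]

def map_(f, xs):
    # exactly one output element per input element
    return [f(x) for x in xs]

def flat_map(f, xs):
    # each input may produce zero or more outputs; the outputs are concatenated
    return [y for x in xs for y in f(x)]

split = lambda line: line.split(" ")
mapped = map_(split, data)
flat = flat_map(split, data)

print(len(mapped), mapped)  # 2 [['hello', 'spark'], ['I', 'am', 'learning']]
print(len(flat), flat)      # 5 ['hello', 'spark', 'I', 'am', 'learning']
assert flat == list(chain.from_iterable(mapped))
```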

#### cartesian(): Cartesian product transformation

In [12]:
pythonLines.cartesian(two).collect()

[(u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
  'hello spark'),
 (u'## Interactive Python Shell', 'hello spark'),
 (u'Alternatively, if you prefer Python, you can use the Python shell:',
  'hello spark'),
 (u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
  'I am learning'),
 (u'## Interactive Python Shell', 'I am learning'),
 (u'Alternatively, if you prefer Python, you can use the Python shell:',
  'I am learning')]
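cartesian() pairs every element of the first RDD with every element of the second, so the result size is the product of the two counts (3 × 2 = 6 above), which grows quickly for large RDDs. A plain-Python equivalent using `itertools.product`; Spark's output order depends on partitioning, so the check below compares sets rather than lists:

```python
from itertools import product

python_lines = [
    "high-level APIs in Scala, Java, Python, and R, and an optimized engine that",
    "## Interactive Python Shell",
    "Alternatively, if you prefer Python, you can use the Python shell:",
]
two = ["hello spark", "I am learning"]

# every (line, phrase) combination
pairs = list(product(python_lines, two))
print(len(pairs))  # 6 == len(python_lines) * len(two)
```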

#### sample(): sampling

In [20]:
sam = words.sample(True, 0.5)
print(sam.collect())

['hello', 'hello', 'I', 'am']


In [22]:
sam2 = words.sample(False, 0.5)
print(sam2.collect())

['hello', 'I', 'am']
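The two results differ because of the first argument. With `withReplacement=False`, each element is kept independently with probability `fraction` (Bernoulli sampling). With `withReplacement=True`, each element is emitted a Poisson-distributed number of times with mean `fraction`, which is how 'hello' can appear twice. Either way `fraction` is the *expected* share, so the sample size varies between runs. A plain-Python sketch of both schemes, assumed to mirror what PySpark's sampler does; the function names and seeds are illustrative:

```python
import math
import random

def poisson(rng, mean):
    # Knuth's method: count uniform draws until their product falls to e^-mean
    limit = math.exp(-mean)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= limit:
            return k
        k += 1

def sample(data, with_replacement, fraction, seed=42):
    rng = random.Random(seed)
    out = []
    for item in data:
        if with_replacement:
            # the item may be emitted 0, 1, 2, ... times
            out.extend([item] * poisson(rng, fraction))
        elif rng.random() < fraction:
            # the item is kept at most once
            out.append(item)
    return out

words = ["hello", "spark", "I", "am", "learning"]
print(sample(words, True, 0.5))
print(sample(words, False, 0.5))
```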


#### Actions: reduce() and fold()

In [33]:
rdd = sc.parallelize(range(10))
rdd.reduce(lambda x, y: x + y)

45

In [34]:
rdd.fold(100, lambda x, y: x + y)

545

In [35]:
rdd.getNumPartitions()

4

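There are 4 partitions here, and each partition's fold starts from the initial value 100. The result is 500 more than the plain sum (545 vs. 45): 4 × 100 comes from the per-partition folds, and the remaining 100 shows that the partition results are then merged by one more fold that also starts from the initial value 100.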
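The arithmetic can be reproduced with a plain-Python model of fold(), assuming it works as the paragraph above describes: each partition is folded starting from `zero`, then the per-partition results are folded once more, again starting from `zero`. The split of range(10) into 4 partitions is an illustrative assumption; any split into 4 partitions gives the same total. This is also why the zero value should be the identity of the operation (0 for addition).

```python
from functools import reduce

def fold(partitions, zero, op):
    # step 1: fold each partition separately, each starting from `zero`
    partial = [reduce(op, part, zero) for part in partitions]
    # step 2: merge the partial results, once more starting from `zero`
    return reduce(op, partial, zero)

partitions = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]  # assumed split of range(10)
add = lambda x, y: x + y

print(fold(partitions, 100, add))  # 545 = 45 + 4 * 100 + 100
print(fold(partitions, 0, add))    # 45, same as reduce()
```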

#### aggregate()

In [39]:
sumCount = rdd.aggregate((0, 0),
              (lambda acc, value: (acc[0] + value, acc[1] + 1)),
              (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
             )
print(sumCount)
float(sumCount[0]) / sumCount[1]

(45, 10)


4.5

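aggregate(zeroValue, seqOp, combOp): seqOp and combOp both work much like fold(). seqOp runs within each partition and produces a **(partition sum, partition count)** result; combOp then merges the results of all partitions into **(sum over the whole RDD, count of the whole RDD)**.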
```
seqOp:  step 1: (0,0), 1st element x => (x, 1)
        step 2: (x,1), 2nd element y => (x+y, 2)
        ...

combOp: (0,0) + (partition 1 sum, partition 1 count) + ...
```
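A plain-Python model of aggregate(), assuming (as with fold()) that seqOp starts from `zero` in each partition and combOp merges the per-partition results starting from `zero` again; that is why `(0, 0)` is the right zero value here. The partition split is illustrative.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    # seqOp: fold each partition into a (sum, count) accumulator
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    # combOp: merge the per-partition accumulators
    return reduce(comb_op, per_partition, zero)

partitions = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]  # assumed split of range(10)
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, n = aggregate(partitions, (0, 0), seq_op, comb_op)
print((total, n), total / n)  # (45, 10) 4.5
```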

### Creating a pair RDD

In [46]:
pairRDD = lines.map(lambda x: (x.split(" ")[0], x))
pairRDD.take(3)

[(u'#', u'# Apache Spark'),
 (u'', u''),
 (u'Spark',
  u'Spark is a fast and general cluster computing system for Big Data. It provides')]

In [48]:
pair = sc.parallelize([(1, 3), (2, 5)])
pair.collect()

[(1, 3), (2, 5)]

#### combineByKey()

In [74]:
comRDD = sc.parallelize([("fancy", 3), ("fancy", 5), ("chuan", 3), ("what", 9)])
sumCount = comRDD.combineByKey(
                (lambda x: (x, 1)),
                (lambda kv, v: (kv[0] + v, kv[1] + 1)),
                (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda kxy: (kxy[0], kxy[1][0]/kxy[1][1])).collectAsMap()

{'chuan': 3, 'fancy': 4, 'what': 9}

In [75]:
sumCount.map(lambda (key, xy): (key, xy[0]/xy[1])).collect()

[('chuan', 3), ('what', 9), ('fancy', 4)]

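Note the final map step: `sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collect()` does not work. map passes each element (a 2-tuple) to the function as a single argument, so the parameters need parentheses, `lambda (key, xy): ...`, for the tuple to be unpacked automatically. This tuple-parameter unpacking only exists in Python 2 (PEP 3113 removed it in Python 3); under Python 3, index into the tuple as in the previous cell. Likewise, `/` on integers is integer division in Python 2 (hence 4, 3, 9); in Python 3 it returns floats.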
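A plain-Python model of combineByKey() with the same three functions, assuming the data is already split into partitions (the split below is illustrative). Within a partition, the first value seen for a key goes through `createCombiner`, later values are folded in with `mergeValue`, and `mergeCombiners` then merges accumulators for the same key across partitions. The final step indexes into each `(key, (sum, count))` pair instead of unpacking it in the lambda's parameter list, so it also runs on Python 3.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # per partition: build one accumulator per key
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)
    # across partitions: merge accumulators that share a key
    merged = {}
    for combiners in per_partition:
        for key, acc in combiners.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

partitions = [[("fancy", 3)], [("fancy", 5), ("chuan", 3)], [("what", 9)]]
sum_count = combine_by_key(
    partitions,
    lambda v: (v, 1),                         # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # mergeCombiners
)
# Python 3 friendly: index into the pair instead of lambda (key, xy): ...
averages = dict(map(lambda kv: (kv[0], kv[1][0] / kv[1][1]), sum_count.items()))
print(averages)  # {'fancy': 4.0, 'chuan': 3.0, 'what': 9.0}
```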

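#### Controlling the number of partitions
- Specify the number of partitions when creating the RDD. If you don't, the default depends on the cluster environment; for example, when Spark runs locally with local[3], the default is 3 partitions.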

In [79]:
# 4 partitions by default in this environment
comRDD.getNumPartitions()

4

In [80]:
# explicitly request 3 partitions
comRDD2 = sc.parallelize([("fancy", 3), ("fancy", 5), ("chuan", 3), ("what", 9)], 3)
comRDD2.getNumPartitions()

3

In [82]:
# request 10 partitions: a single cluster node can hold multiple partitions
comRDD3 = sc.parallelize([("fancy", 3), ("fancy", 5), ("chuan", 3), ("what", 9)], 10)
comRDD3.getNumPartitions()

10
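To see what an explicit partition count does to a small collection, here is a plain-Python sketch that splits a list into `n` contiguous, nearly equal slices with the index formula `i * len // n`. The formula is an assumption modelled on how Spark slices parallelized collections, and Spark's actual boundaries may differ. With 10 partitions and only 4 elements, most partitions end up empty.

```python
def slice_into_partitions(data, n):
    # hypothetical helper: partition i covers indices [i*len//n, (i+1)*len//n)
    length = len(data)
    return [data[i * length // n:(i + 1) * length // n] for i in range(n)]

pairs = [("fancy", 3), ("fancy", 5), ("chuan", 3), ("what", 9)]
three = slice_into_partitions(pairs, 3)
ten = slice_into_partitions(pairs, 10)

print([len(p) for p in three])  # [1, 1, 2]
print([len(p) for p in ten])    # [0, 0, 1, 0, 1, 0, 0, 1, 0, 1]
```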

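- To change the number of partitions of an existing RDD, use coalesce(). repartition() also works, but it performs worse than coalesce() when reducing the partition count: repartition() is coalesce() with shuffle enabled, so it always does a full shuffle, while coalesce() can merge existing partitions without one.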

In [81]:
comRDD.coalesce(2).getNumPartitions()

2

In [78]:
words.coalesce?
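A rough plain-Python model of the difference, under simplifying assumptions. `coalesce` (without shuffle) only merges neighbouring partitions, so every original partition stays intact inside a new one, and it cannot increase the partition count. `repartition` redistributes every element, here round-robin, which in Spark corresponds to a full shuffle. Spark's real coalesce also considers data locality when grouping partitions and its shuffle placement differs, so both functions are illustrative stand-ins, not Spark's algorithms.

```python
def coalesce(partitions, n):
    # without a shuffle, the number of partitions can only go down
    n = min(n, len(partitions))
    size = len(partitions)
    # merge groups of neighbouring partitions; no element leaves its group
    groups = [partitions[i * size // n:(i + 1) * size // n] for i in range(n)]
    return [[x for part in group for x in part] for group in groups]

def repartition(partitions, n):
    # every element may move to a different partition (a full shuffle in Spark)
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for x in part:
            out[i % n].append(x)
            i += 1
    return out

parts = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
print(coalesce(parts, 2))     # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(repartition(parts, 2))  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```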