In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
# Create a local SparkContext, or reuse the existing one when this cell is re-run.
# A plain SparkContext(conf=conf) raises "Cannot run multiple SparkContexts at once"
# if a context already exists in this kernel; getOrCreate keeps the cell idempotent.
conf = SparkConf().setMaster("local").setAppName("App")
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Upload the local file to the HDFS /spark directory first, e.g.:
#   hadoop fs -copyFromLocal /data/data.txt /spark/
# The input location lives in one constant so it can be changed without touching the load call.
DATA_PATH = 'hdfs://localhost:9000/spark/data.txt'
lines = sc.textFile(DATA_PATH)

In [4]:
# Count the occurrences of each word.
# str.split() with no argument splits on any run of whitespace and drops empty
# strings, so repeated/leading/trailing spaces or blank lines do not create '' "words"
# (split(" ") would).
wordCount = (
    lines.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
wordCount.collect()

[('one', 2), ('two', 2), ('three', 2), ('four', 2), ('five', 2), ('six', 1)]

In [5]:
# Count characters: sum the length of every line in the RDD.
lineLengths = lines.map(len)
lineLengths.reduce(lambda total, n: total + n)

47

In [6]:
# Number of lines that contain the letter 'e' somewhere in them.
lines.filter(lambda text: 'e' in text).count()

4

In [7]:
# Largest number of words found on any single line.
# The builtin max replaces the fragile `a > b and a or b` idiom; split() with no
# argument ignores whitespace runs, so empty tokens are not counted as words.
lines.map(lambda line: len(line.split())).reduce(max)

3

In [8]:
# Persistence: build an RDD from a local list and mark it for caching.
# cache() returns the same RDD, so it can be chained; the count() action then
# materializes it so later actions on `rdd` can reuse the cached data.
slist = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(slist).cache()
rdd.count()

3

In [9]:
# Collect the cached RDD to the driver and render it as one comma-separated string.
",".join(rdd.collect())

'Hadoop,Spark,Hive'

In [10]:
# Key-value (pair) RDD loaded from the file: one (word, 1) pair per word.
# split() with no argument avoids '' keys from repeated/trailing whitespace or blank lines.
pairRDD1 = lines.flatMap(lambda line: line.split()).map(lambda word: (word, 1))

In [11]:
# Pair RDD created from a parallelized collection: map each element of `rdd` to (element, 1).
pairRDD2 = rdd.map(lambda w: (w, 1))

In [12]:
# Merge the values that share a key by summing them.
pairRDD1.reduceByKey(lambda left, right: left + right).collect()

[('one', 2), ('two', 2), ('three', 2), ('four', 2), ('five', 2), ('six', 1)]

In [13]:
# Group the values that share the same key.
# groupByKey yields lazy ResultIterable objects whose repr is just a memory address;
# mapValues(list) materializes each group so the collected result is readable.
pairRDD1.groupByKey().mapValues(list).collect()

[('one', [1, 1]),
 ('two', [1, 1]),
 ('three', [1, 1]),
 ('four', [1, 1]),
 ('five', [1, 1]),
 ('six', [1])]

In [14]:
# Build a new RDD holding only the key of every pair (duplicates kept), then collect it.
pairRDD1.keys().collect()

['one',
 'two',
 'three',
 'four',
 'five',
 'six',
 'four',
 'five',
 'one',
 'two',
 'three']

In [15]:
# Build a new RDD holding only the value of every pair, then collect it.
pairRDD1.values().collect()

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

In [16]:
# Sort the pairs by key in ascending order (the sortByKey default, stated explicitly).
pairRDD1.sortByKey(ascending=True).collect()

[('five', 1),
 ('five', 1),
 ('four', 1),
 ('four', 1),
 ('one', 1),
 ('one', 1),
 ('six', 1),
 ('three', 1),
 ('three', 1),
 ('two', 1),
 ('two', 1)]

In [17]:
# Transform only the value of each pair (here: add 1), leaving keys unchanged.
pairRDD1.mapValues(lambda v: v + 1).collect()

[('one', 2),
 ('two', 2),
 ('three', 2),
 ('four', 2),
 ('five', 2),
 ('six', 2),
 ('four', 2),
 ('five', 2),
 ('one', 2),
 ('two', 2),
 ('three', 2)]