In [1]:
from pyspark import SparkContext

In [2]:
sparkContext = SparkContext()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/08 00:13:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# 数据准备

lineRDD = sparkContext.textFile("../../res/wikiOfSpark.txt")
cleanWordRDD = (
    lineRDD
    .flatMap(lambda line: line.split(" "))
    .filter(lambda word: word != "")
)

## map 以元素为粒度的数据转换

<hr />

```scala
def f(word: String): (String, Int) = {
    return (word, 1)
}

val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
```
<hr />

转换成 python

In [4]:
def f(word):
    return (word, 1)


kvRDD = cleanWordRDD.map(f)

kvRDD.take(3)

                                                                                

[('Apache', 1), ('Spark', 1), ('From', 1)]

## mapPartitions：以数据分区为粒度的数据转换

map 实现方式

<hr />

```scala
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
    // 获取MD5对象实例
    val md5 = MessageDigest.getInstance("MD5")
    // 使用MD5计算哈希值
    val hash = md5.digest(word.getBytes).mkString
    // 返回哈希值与数字1的Pair
    (hash, 1)
}
```
<hr />

转换成 python

In [5]:
from hashlib import md5


def f(word):
    hash = md5(word.encode("utf8"))
    return (hash.hexdigest(), 1)


kvRDD = cleanWordRDD.map(f)

kvRDD.take(5)

[('e9713ae04a02a810d6f33dd956f42794', 1),
 ('8cde774d6f7333752ed72cacddb05126', 1),
 ('5da618e8e4b89c66fe86e32cdafde142', 1),
 ('82b4ef4154f1823e73cf6191e307196c', 1),
 ('8fc42c6ddf9966db3b09e84365034357', 1)]

使用 mapPartitions 优化

<hr />

```scala
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
    // 注意！这里是以数据分区为粒度，获取MD5对象实例
    val md5 = MessageDigest.getInstance("MD5")
    val newPartition = partition.map( word => {
    // 在处理每一条数据记录的时候，可以复用同一个Partition内的MD5对象
    (md5.digest(word.getBytes()).mkString,1)
    })
    newPartition
})
```
<hr />

转 Python

In [6]:
def f(partition):
    for word in partition:
        hash = md5(word.encode("utf8"))
        yield (hash.hexdigest(), 1)


kvRDD = cleanWordRDD.mapPartitions(f)

kvRDD.take(5)


[('e9713ae04a02a810d6f33dd956f42794', 1),
 ('8cde774d6f7333752ed72cacddb05126', 1),
 ('5da618e8e4b89c66fe86e32cdafde142', 1),
 ('82b4ef4154f1823e73cf6191e307196c', 1),
 ('8fc42c6ddf9966db3b09e84365034357', 1)]

In [7]:
def f(idx, partition):
    for word in partition:
        hash = md5(word.encode("utf8"))
        yield (hash.hexdigest(), 1)


kvRDD = cleanWordRDD.mapPartitionsWithIndex(f)

kvRDD.take(5)

[('e9713ae04a02a810d6f33dd956f42794', 1),
 ('8cde774d6f7333752ed72cacddb05126', 1),
 ('5da618e8e4b89c66fe86e32cdafde142', 1),
 ('82b4ef4154f1823e73cf6191e307196c', 1),
 ('8fc42c6ddf9966db3b09e84365034357', 1)]

## flatMap：从元素到集合、再从集合到元素

<hr />

```scala
val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
    // 将行转换为单词数组
    val words: Array[String] = line.split(" ")
    // 将单个单词数组，转换为相邻单词数组
    for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
})
```
<hr />

转换成 python

In [8]:
def f(line):
    words = line.split(" ")
    for i in range(len(words) - 1):
        yield words[i] + "-" + words[i + 1]


wordPairRDD = lineRDD.flatMap(f)

wordPairRDD.take(5)

['Apache-Spark',
 'From-Wikipedia,',
 'Wikipedia,-the',
 'the-free',
 'free-encyclopedia']

## filter：过滤

<hr />

```scala
// 定义特殊字符列表
val list: List[String] = List("&", "|", "#", "^", "@")
 
// 定义判定函数f
def f(s: String): Boolean = {
    val words: Array[String] = s.split("-")
    val b1: Boolean = list.contains(words(0))
    val b2: Boolean = list.contains(words(1))
    return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
}
 
// 使用filter(f)对RDD进行过滤
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)
```
<hr />

转换成 python

In [9]:
special_chars = ["&", "|", "#", "^", "@"]


def f(str):
    words = str.split("-")
    b1 = words[0] in special_chars
    b2 = words[1] in special_chars

    # return not (b1 or b2)
    return b1 or b2


cleanedPairRDD = wordPairRDD.filter(f)

cleanedPairRDD.take(5)

['1.3.0-|', '|-Apache', 'Cassandra-|', '|-Pluralsight".', '"MLlib-|']