# RDD编程代码

In [1]:
from pyspark import SparkContext
sc = SparkContext('local', 'RDD Programming')

### RDD基础 

##### 使用SparkContext的parallelize()方法将程序中存在的集合转换为RDD 

In [2]:
lines = sc.parallelize(['pandas', 'i like pandas'])

In [8]:
type(lines)

pyspark.rdd.RDD

In [9]:
lines.collect()

['pandas', 'i like pandas']

##### 使用sc.textFile()将外部文件转换为RDD

In [10]:
jupyter_log = sc.textFile('../data/jupyter.log')

In [11]:
jupyter_log.count()

98

##### 转换操作是惰性求值，只有遇到行动操作之后才会计算 

In [12]:
# The path below does not exist, yet this line raises no error:
# nothing is read from disk until an action runs.
jupyter_log = sc.textFile('../data/jupyter.log_')

In [13]:
# The missing path is only reported once an action (count) is executed.
jupyter_log.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/jovyan/work/data/jupyter.log_
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


##### 转换操作例子，取log中包含'JupyterLab'的行数，使用转换操作filter()

In [19]:
jupyter_log = sc.textFile('../data/jupyter.log')
# filter() does not modify the data already in jupyter_log; it creates a
# new RDD. jupyter_log itself is reused in the later experiments.
jupyter_rdd = jupyter_log.filter(lambda line: 'JupyterLab' in line)

In [20]:
# count() is an action: it returns the number of elements in the RDD.
jupyter_rdd.count()

10

In [21]:
# collect() is also an action; here it returns the matching lines as a list.
jupyter_rdd.collect()

['[I 06:48:27.829 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab',
 '[I 06:48:27.829 LabApp] JupyterLab application directory is /opt/conda/share/jupyter/lab',
 '[W 06:48:27.830 LabApp] JupyterLab server extension not enabled, manually loading...',
 '[I 06:48:27.836 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab',
 '[I 06:48:27.836 LabApp] JupyterLab application directory is /opt/conda/share/jupyter/lab',
 '[I 06:49:07.791 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab',
 '[I 06:49:07.791 LabApp] JupyterLab application directory is /opt/conda/share/jupyter/lab',
 '[W 06:49:07.792 LabApp] JupyterLab server extension not enabled, manually loading...',
 '[I 06:49:07.797 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab',
 '[I 06:49:07.798 LabApp] JupyterLab application directory is /opt/conda/share/jupyter/lab']

In [22]:
# 使用行动操作take() 从RDD中获得一些例子

In [23]:
# take(5) is an action; print each of the sample lines it returns.
for line in jupyter_rdd.take(5):
    print(line)

[I 06:48:27.829 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab
[I 06:48:27.829 LabApp] JupyterLab application directory is /opt/conda/share/jupyter/lab
[W 06:48:27.830 LabApp] JupyterLab server extension not enabled, manually loading...
[I 06:48:27.836 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab
[I 06:48:27.836 LabApp] JupyterLab application directory is /opt/conda/share/jupyter/lab


### 向Spark传递函数

##### 使用lambda表达式

In [31]:
# Pass the predicate to filter() inline as a lambda expression.
jupyter_rdd = jupyter_log.filter(lambda line: 'JupyterLab' in line)
# first() is evaluated last so the notebook displays the first matching line.
jupyter_rdd.first()

'[I 06:48:27.829 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab'

In [27]:
# Predicate: True when the input contains 'JupyterLab', False otherwise.
def containJupyterLab(s):
    """Return whether ``'JupyterLab'`` occurs in ``s``."""
    keyword = 'JupyterLab'
    return keyword in s

In [28]:
jupyter_rdd = jupyter_log.filter(containJupyterLab)

In [30]:
jupyter_rdd.first()

'[I 06:48:27.829 LabApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab'

***传递函数的时候需要小心***，Python会在不经意间把函数所在的整个对象也序列化传出去。<br>
当你传递的对象是某个对象的成员，或者包含了对某个对象中一个字段的引用时（例如 self.field），<br>
Spark会将整个对象发到worker上，有可能比预想要发送的内容大。<br>
有时候传递的内容，***包含python不知道如何序列化传输的对象***，也会导致程序失败。

In [32]:
# Passing functions that reference a field.
# (This is a deliberately WRONG example -- do not write code like this!)
class SearchFunctions():
    """Anti-pattern demo: filter callables that drag ``self`` along.

    Both ``getMatches*`` methods hand ``rdd.filter`` a callable that
    references ``self``, so the whole instance has to be serialized
    together with the callable instead of just the query string.
    """

    def __init__(self, qurey):
        # The parameter keeps its original (misspelled) name so existing
        # keyword callers still work; the attribute uses the spelling that
        # the methods below actually read.
        self.query = qurey

    def isMatch(self, s):
        """Return True when the stored query occurs in ``s``."""
        return self.query in s

    def getMatchesFunctionReference(self, rdd):
        """Filter ``rdd`` using the bound method ``self.isMatch``."""
        # Careful: self.isMatch is a bound method, so it references all of self.
        return rdd.filter(self.isMatch)

    def getMatchesMemberReference(self, rdd):
        """Filter ``rdd`` using a lambda that reads ``self.query``."""
        # Careful: reading self.query inside the lambda references all of self.
        return rdd.filter(lambda x: self.query in x)

代替的方法是

In [None]:
# Passing a function that references a field -- the correct way.
class SearchFunctions():
    # NOTE: __init__ and the other methods of the class are omitted here.
    def getMatchesMemberReference(self, rdd):
        """Filter ``rdd`` for elements containing the query, without capturing ``self``."""
        # Copy the field into a local variable first; the lambda then closes
        # over that value only, not over self.
        query = self.query
        return rdd.filter(lambda x: query in x)