# Pyspark 广播和累加器
对于并行处理，Apache Spark使用共享变量。当驱动程序将任务发送到集群上的执行程序时，共享变量的副本将在集群的每个节点上运行，以便可以将其用于执行任务。  
Apache Spark支持两种类型的共享变量:  
Broadcast  
Accumulato

## 广播
Broadcast 广播变量用于跨所有节点保存数据副本。此变量缓存在所有计算机上，而不是在具有任务的计算机上发送。

In [3]:
from pyspark import SparkContext
sc = SparkContext("local","broadcast app")
word = sc.broadcast(["scala","java","python","spark"])
print(word.value) # Broadcast变量有一个名为value的属性，它存储数据并用于返回广播值。
print(word.value[2])

['scala', 'java', 'python', 'spark']
python


## 累加器
Accumulate 累加器变量用于通过关联和交换操作聚合信息。例如，您可以使用累加器进行求和操作或计数器（在MapReduce中）。

In [10]:
from pyspark import SparkContext
# sc = SparkContext("local","Accumulator app")
num = sc.accumulator(10)
def f(x):
   global num
   num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
print(num.value)
print(rdd.collect())

150
[20, 30, 40, 50]


# PySpark SparkConf

以下是SparkConf最常用的一些属性

set（key，value） - 设置配置属性。

setMaster（value） - 设置主URL。

setAppName（value） - 设置应用程序名称。

get（key，defaultValue = None） - 获取密钥的配置值。

setSparkHome（value） - 在工作节点上设置Spark安装路径。

In [2]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("local")
sc = SparkContext(conf=conf)

# PySpark SparkFiles
在Apache Spark中，您可以使用 sc.addFile 上传文件（sc是您的默认SparkContext），并使用 SparkFiles.get 获取工作者的路径。  
SparkFiles解析通过 SparkContext.addFile（） 添加的文件的路径。  
SparkFiles包含以下类方法:  
get(filename)  
getrootdirectory()

In [6]:
from pyspark import SparkFiles
path = "/home/ace/Desktop/github/data_anasylsis/pyspark/5000_points.txt"
pathname = "500_points.txt"
sc.addFile(path)
print(SparkFiles.get(pathname))
print(SparkFiles.getRootDirectory())

/tmp/spark-d4ba64bd-845f-400f-81a4-2f9086435ecb/userFiles-629fa65e-3b3d-47e7-a601-b13ea7e71e55/500_points.txt
/tmp/spark-d4ba64bd-845f-400f-81a4-2f9086435ecb/userFiles-629fa65e-3b3d-47e7-a601-b13ea7e71e55


# PySpark MLlib
mllib.classification - 支持二进制分类，多类分类和回归分析的各种方法。分类中一些最流行的算法是 随机森林，朴素贝叶斯，决策树 等。

mllib.clustering - 聚类是一种无监督的学习问题，您可以根据某些相似概念将实体的子集彼此分组。

mllib.fpm - 频繁模式匹配是挖掘频繁项，项集，子序列或其他子结构，这些通常是分析大规模数据集的第一步。 多年来，这一直是数据挖掘领域的一个活跃的研究课题。

mllib.linalg - 线性代数的MLlib实用程序。

mllib.recommendation - 协同过滤通常用于推荐系统。 这些技术旨在填写用户项关联矩阵的缺失条目。它目前支持基于模型的协同过滤，其中用户和产品由一小组可用于预测缺失条目的潜在因素描述。 spark.mllib使用交替最小二乘（ALS）算法来学习这些潜在因素。

mllib.regression - 线性回归属于回归算法族。 回归的目标是找到变量之间的关系和依赖关系。使用线性回归模型和模型摘要的界面类似于逻辑回归案例。

# PySpark Serializers
序列化用于Apache Spark的性能调优。通过网络发送或写入磁盘或持久存储在内存中的所有数据都应序列化。  
 
Marshal Serializer  此序列化程序比PickleSerializer更快，但支持更少的数据类型。  
Pickle Serializer   此序列化程序几乎支持任何Python对象，但可能不如更专业的序列化程序快

In [8]:
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc.stop()
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer()) # 使用MarshalSerializer序列化数据
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
