Skip to content

Latest commit

 

History

History
153 lines (145 loc) · 24.6 KB

04.Spark编程.md

File metadata and controls

153 lines (145 loc) · 24.6 KB

Spark核心原理

构建DAG图

  • Pasted image 20231102122227构建DAG图。Spark应用程序进行很多RDD操作,其中包括各种转换操作,它们描述了RDD之间的依赖关系,当遇到行动操作触发Job的提交,提交的是根据RDD依赖关系构建的DAG图,DAG图提交给DAGScheduler进行解析。
  • DAGScheduler进行任务划分。DAGScheduler会收到之前生成的DAG图,然后对该图进行拆分,拆分的依据是DAG图中一个RDD到下一个RDD之间的操作步骤是否为宽依赖,是宽依赖则拆开为Stage。
  • TaskScheduler调度任务。TaskScheduler接收了来自DAGScheduler发送的TaskSet,然后把收到的TaskSet中的Task(Task的数量与输入的RDD的partition数量一样,partition是RDD的最小单元,RDD是分布在各个节点上的partition组成,因为每一个task只是处理一个partition上的数据),发送到集群Worker节点上去执行。
  • Worker执行任务。Worker节点中的Executor收到TaskScheduler发送过来的Task后会执行Task。Task以多线程方式运行,每个线程负责一个任务。

RDD基础

  • RDD是Spark的重要组成部分,通过学习RDD,有利于理解分布式计算的实质以及Spark计算框架的实现。弹性分布式数据集(Resilient Distributed Dataset,RDD)是只读的记录分区的集合,能横跨集群的所有节点进行并行计算,是一种基于工作集的应用抽象。这个数据集的全部或者部分可以缓存在内存中,在多次计算间重用,所谓弹性就是指在内存不够时可以与磁盘进行交换。
  • RDD可以让用户将数据存储到磁盘和内存中,并能控制数据的分区,且提供了丰富的API操作整个集群进行数据挖掘。逻辑上认为RDD是一个不可变的分布式对象集合,而集合中的每个元素可以是用户自定义的任意数据结构。RDD通过其依赖关系形成Spark的调度顺序,然后通过RDD的操作形成整个Spark程序。
  • RDD基本特征: Spark一切操作都是基于RDD,RDD就是Spark输入的数据。RDD有五个特征,其中分区、函数、依赖是三个基本特征,分区策略和优先位置是两个可选特征。、
    • 分区:
      • 将RDD划分成多个分区(partitions)分布到集群的节点上,分区的多少涉及到对RDD进行并行计算的粒度,每一个分区的数据能够进行并行计算,RDD的并行度默认从父RDD传给子RDD。RDD本质上是逻辑分区记录的集合,在集群中一个RDD可以包含多个分布在不同节点上的分区,每个分区是一个dataset片段。
      • 在对RDD操作中,用户可以使用partitions.size方法查看该RDD划分的分区数目。可以通过手动设置分区数目。1、方法一是在RDD调用textFile或者parallelize和makeRDD方法时手动指定分区个数即可。2、方法二是在通过转换操作得到新的RDD时,直接调用repartition方法强制改变RDD的分区数量即可。
    • 函数:
      • 每一个分区都有一个计算函数,Spark RDD的计算是以分区为基本单位的。
    • 依赖(dependency):
      • 依赖具体分为宽依赖和窄依赖。由于RDD每次转换都会生成新的RDD,窄依赖的RDD之间就会形成类似于流水线(pipeline)一样的前后依赖关系,以流水线的方式计算所有父分区,不会造成网络之间的数据混合。但宽依赖不类似于流水线,它会涉及数据混合,首先计算好所有父分区数据,然后在节点之间进行Shuffle(数据混洗)。
    • 分区策略:
      • 键-值对(key-value)形式的RDD根据Partitioner策略进行分区,类似于MapReduce中的Partitioner接口,根据key来决定分配位置。它只针对key-value的形式,如果不是key-value的形式,它就不会有具体的Partitioner。
    • 优先位置:
      • 优先位置列表存储每个Partition的优先位置,对于一个HDFS文件而言,这个列表就是每个Partition所在的块的位置。依据大数据中“计算向数据靠拢”的原则,Spark本身在进行任务调度的时候会尽可能地将任务分配到其所要处理数据块的存储位置。
  • RDD的依赖关系: Spark将依赖关系分为窄依赖(narrow dependency)和宽依赖(wide dependency)。窄依赖和宽依赖的作用主要体现在两方面
    • 一是实现了RDD良好的容错性能;
    • 二是在调度中构建DAG(有向无环图)作为不同Stage的划分。
    • 窄依赖: 窄依赖指的是子RDD( Child RDD )的一个分区只依赖于某个父RDD( Parent RDD(s))中的一个分区( Partition )。Pasted image 20231102123856
    • 窄依赖分为两类:第一类是一对一的依赖关系,如图中的map、filter和join with inputs copartitioned。第二类是范围的依赖关系,它仅仅被UnionRDD使用。UnionRDD是将多个RDD拼接合成一个RDD,即每个parent RDD的Partition的相对顺序不变,只不过每个父RDD的分区在UnionRDD中的分区的起始位置不同
      • 宽依赖: 宽依赖指的是子RDD ( Child RDD)的每一个分区( Partition)都依赖于某个父RDD ( Parent RDD(s))的一个以上分区。Pasted image 20231102124018
    • 总是将存在窄依赖关系的RDD划分在同一个stage,因为相对于宽依赖,窄依赖对优化更有利,
      • 窄依赖优于宽依赖主要基于两个原因:
        • 首先,从数据混洗的角度考虑,窄依赖的RDD可以通过相同的键进行联合分区,每个父RDD的分区只会传入到一个子RDD的分区中,整个操作都可以在同一个集群节点上以流水线(pipeline)形式执行。例如,执行了map操作后,紧接着执行filter,不会造成网络之间的数据混洗。相反,宽依赖的RDD在运行过程中将一个父RDD的分区传入到不同的子RDD分区中,会涉及数据混洗,需要所有的父分区均为可用,可能还需要进行跨节点传递数据。
        • 其次,从数据恢复的角度考虑,窄依赖的数据恢复更有效,只需要重新计算丢失的父分区,而且可以并行地在不同节点上重新计算。而宽依赖涉及RDD各级的多个父分区,可能导致计算冗余。

创建RDD

  • Spark提供了两种创建RDD的方式:
    • 从内存中已有数据创建RDD(例如:数组);
      • 一种是转化Seq集合为RDD;
        • parallelize
        • makeRDD
      • 另一种是从已有RDD转化成新的RDD
    • 从外部存储创建RDD,包括本地的文件系统,所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase、Tachyon等。

RDD转换与操作

  • 从输入中逻辑上生成A、C两个RDD,经过一系列“转换”操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,spark只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“行动”操作时,Spark才会根据RDD的依赖关系生产DAG,并从起点开始真正的计算。

  • Lazy特性 lazy 可以避免很多不必要的中间临时数据,这比较符合分布式并行计算的需求;另一个层面是调度层面,最后一步要计算时,可以看到前面的所有步骤,看见的步骤越多,进行优化的机会就越多,所以Spark是基于Lazy特性进行操作、基于“血统”(Lineage)来构建整个调度系统的,最终形成了DAG图。

  • Transformation(转换) 操作是根据已经存在的数据集创建一个新的数据集,是数据集的逻辑操作,但不会触发一次真正的计算。 Pasted image 20231102130434 Pasted image 20231102132143

  • Transaction:

    • Value型Transformation算子Value型数据的算子封装在RDD类中可以直接使用。
    • map: 将原来RDD的每个数据项通过map中的用户自定义函数f转换成一个新的RDD,map操作不会改变RDD的分区数目
      1698903638083
    • filter: 对元素进行过滤,对每个元素应用给定的函数,返回值为true的元素在RDD中保留,为false的则被过滤掉; 1698904016827
    • flatMap: flatMap的操作是将函数应用于RDD之中的每一个元素, 将返回的迭代器(数组、列表等)中的所有元素构成新的RDD。简单来讲,使用flatMap,就是先map再flat, 数据会先完成跟map一样的功能,为每一条输人返回一个迭代器 (可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。这个转换通常用来切分单词。 1698904274881 Pasted image 20231102134827 ata = sc.parallelize(List((1,3),(4,5))) val sort_data = data.sortBy(x=>x._2,false,1)`
      • union: 将两个RDD的元素合并成一个,不进行去重操作,而且两个RDD中每个元素中的个数和类型需要保持一致。`scala> val rdd_a = sc.parallelize(Array("apple","orange","pineapple","pineapple"))scala> val rdd_b = sc.parallelize(Array("apple","orange","grape"))scala> val rdd_union = rdd_a.union(rdd_b).foreach(println)
      • distinct: distinct算子对RDD的转换,旨在将RDD中数据去重,剩下的元素组成一个新的RDD。
      • repartition(numParations): 这个操作将重洗所有的数据,强行改变分区数。
      • subtract(otherDataset): 该方法是在两个RDD间进行的,其主要获取两个RDD之间的差集
      • intersection: 找出两个RDD的共同元素,也就是找出两个RDD的交集
      • cartesian: 笛卡尔积就是将两个集合的元素两两组合成一组
      • zip: 将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
      • 键值对型Transformation算子虽然大部分Spark的RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。Key-Value对数据类型的算子封装在PairRDDFunctions类中,用户需要引用import.org.apache.spark.SparkContext._ 才能使用。
        • 创建键值对RDD:Pasted image 20231102141140有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD。当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作,传递的函数需要返回键值对。
      • 做为键值对类型的RDD,包含了键跟值两个部分。Spark提供了两个方法分别获取键值对RDD的键跟值。keys返回一个仅包含键的RDD,values返回一个仅包含值的RDD。
        • groupKey() 当在一个由键值对(K,V)组成的数据集上调用时,按照key进行分组,返回一个(K,Iterable< V >)键值对的数据集 `scala> val rdd3 = sc.parallelize(Array((1,3),(2,6),(2,2),(3,6)))scala> val rdd_gbk = rdd3.groupByKey().foreach(println)(1,CompactBuffer(3))(3,CompactBuffer(6))(2,CompactBuffer(6, 2))
        • reduceByKey: 当在一个键值对(K,V)数据集上调用时,按照key将数据分组,使用给定的func聚合values值,返回一个键值对(K,V)数据集,其中func函数的类型必须是(V,V)=>V。`scala> val rdd1 = sc.parallelize(Array((1,2),(2,3),(2,6),(3,8),(3,10)))scala> val rdd2 = rdd1.reduceByKey((x,y)=>x+y).foreach(println)(1,2)(3,18)(2,9)
        • mapVules: mapValues是针对键值对(Key,Value)类型的数据中的Value进行Map操作,而不对Key进行处理Pasted image 20231102145648
      • sortByKey: 返回一个以key值排序(升序或者降序)的(K,V)键值对组成的数据集,其中布尔代数ascending参数决定升序还是降序,若为true则升序,若为false则降序,默认为升序
        • join: 把键值对数据相同键的值整合起来
        • CombineByKey: 由于聚合操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。(1)如果以前没出现过,则执行的是ceateCombiner方法,cealombinen会在新遇到的键对应的累加器中赋予初始值,否则执行mergeValue方法。(2)对于已经出现过的键(key) ,调用mergeValue来进行聚合操作,对该键的累加器对应的当前值(C格式)与新值(V格式)进行合并。(3)由于每个分区都是独立处理的,因此对于同一个键可以有 多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果(全是C格式)进行合并。
      • Action
        • Action(行动)操作是一种算法的描述,它通过SparkContext的runJob方法提交作业(Job),触发RDD DAG的执行并将数据输出到Spark系统。Action在RDD上进行计算之后返回一个值到Driver,这样设计能让Spark运行得更加高效。
      • Pasted image 20231103100459
      • Pasted image 20231103100511
      • reduce(func:(T,T)=>T):Tval rdd4=parallelize(Array(3,4,5,6,7)) val rdd5 = rdd4.reduce((x,y)=>x+y)
        • fold(zeroValue:T)(func:(T,T)=>T):T 与reduce类似,聚合每个分区的元素,然后使用具有关联性的操作,以及一个初始值,将每个分区聚合的结果进行归并,不同的是每次对分区内的value聚集时,分区内初始化的值为zeroValue。data1.(0)((A,B)=>A+B)
        • lookup 作用于K-V类型的RDD上,返回指定K的所有V值
        • collect 以数组的形式返回RDD中所有的元素,适用于小数据处理后的返回
        • saveAsTextFile 把RDD保存到HDFS中。对每个元素调用toString方法将其转换成一个文件中的文本行
        • take(num) 获取RDD前面num条记录,返回类型为数组
        • count() 计算RDD中所有元素个数
        • foreach(func) 在数据集的每个元素上都运行func函数。要注意如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端,比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
        • countByKey() 只能运行在键值对类型(K,V)上,对每个key相等的元素个数进行计数。

文件的读取与存储

  • Spark支持多种输入输出源,一部分原因是因为Spark本身是基于Hadoop生态圈而构建,特别是Spark可以通过Hadoop MapReduce所使用的InputFormat和OutputFormat接口访问数据。对于存储在本地文件系统或者分布式文件系统中的数据,Spark可以访问多种不同的文件格式,包括文本文件、JSON、SequenceFile等。
    
格式名称 结构化 描述
文本文件 普通文本
json 半结构化 常见的基本文本的格式,半结构化;大多数库都要求每行一条记录
csv 通常在电子表格英语中使用
SequenceFile 一种用于键值对数据的常见Hadoop文件格式

从诸如文本文件的非结构化的文件,到诸如JSON格式的半结构化文件,再到诸如SequenceFile这样的结构化文件,Spark都支持。

  1. 文本文件的读取:val math = sc.textFile("result_math.txt")
    1. 若是将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容,inputFile为一个包含多个文件的目录,读取格式是sc.wholeTextFiles(inputFile),读取给定目录中的所有文件。
    2. val input=sc.wholeTextFiles("file///home/word-court")
    3. 因为Spark采用了惰性机制,在执行转换操作的时候,即使输入了错误的语句,spark-shell也不会马上报错(假设word123.txt不存在)
  2. 文本文件的保存:保存文本文件用saveAsTextFile(outputFile)。其中outputFile为保存文件目录,将RDD中的内容都输入到路径对应的文件中,每个元素为一行。
    1. textFile.saveAsTextFile("file:///home/word-count/writeback")
  3. 因为Spark通常在分布式环境下进行,RDD会存在多个分区,由多个任务对这些分区进行并行计算,每个分区的计算结果都会保存到一个单独的文件中。Spark将传入的路径作为目录对待,会在那个目录下输出多个文件,这样,Spark就可以从多个节点上并行输出了。
  4. JSON文件读取:
    • JSON是一种轻量级的数据交换格式,它采用完全独立于编程语言的文本格式来存储和表示数据。将JSON文件当做普通文本文件读取,inputFile为文件目录,读取格式是sc.textFile(inputFile)。这种方法要求文件每行是一条JSON记录,如果记录跨行,就需要读取整个文件,对每个文件进行解析。Pasted image 20231112182638Pasted image 20231112182654
  • JOSN文件存储: - res=Jobject(List((person,Jobject((List((name,JString(Marilyn)),(age,Jint(33)))))))) - compact(render(res))
  1. csv文件读取:
    • CSV文件是逗号分隔值文件,每行都有固定数目的字段,字段间用逗号隔开(在制表符分隔值文件即TSV 文件中,用制表符隔开)。读取CSV/TSV 数据和读取JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,然后CSV库对CSV数据进行解析,再对数据进行处理。读取格式是sc.textFile(inputFile),inputFile为文件目录。
    • Pasted image 20231115151138
    • 使用csv库输出到文件或者输出器,可以使用strinngWriter或者Stringio来将结果放到RDD中,使用Spark的文本文件API写出即调用saveAsTextFile(outputfile)保存文件 Pasted image 20231115151443
  2. SquenceFile的存储
    • SequenceFile文件是Hadoop用来存储二进制形式的[Key,Value]对而设计的一种文件。 SequenceFile文件存储非常简单,首先保证有一个键值对类型的RDD,然后直接调用saveAsSequenceFile(path)保存数据。
    • 读取:Spark有专门读取SequenceFile的接口,可以调用SparkContext中的sequenceFile(path,keyClass,valueClass,miniPartition)实现。 SequenceFile使用的是Hadoop的Writable类型,所以keyClass和valueClass参数必须定义为正确的Writable类。

RDD缓存与容错机制

  • Spark的存储介质包括内存和磁盘等;RDD以分区为单位进行持久化或缓存,是Spark主要的特征之一,持久化/缓存是迭代式计算和交互式应用的关键技术,通常可以使部分计算的计算速度提升10倍以上
  • 在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
  • 为了避免重复计算的开销,就涉及到持久化(缓存)机制的问题,即第一次行动操作得到的结果,如果能被第二次行动操作使用,则不需要从头开始计算,直接使用持久化的结果。
  • 可以通过使用persist()或者cache()方法标记持久化的RDD,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点的内存或磁盘中被后面的行动操作重复使用,Spark的cache()方法默认为将RDD缓存在内存中,实际上是调用persist(MEMORY_ONLY)方法。
  • Spark的缓存机制是容错的,如果RDD的任意分区丢失,它将会自动通过最初创建的转换操作重新计算,不需要全部重新计算,只需要计算丢失的部分。
级    别 需要内存空间 CPU计算时间 是否在内存中 是否在磁盘上 备    注
MEMORY_ONLY 数据仅保留在内存中
MEMOTY_ONLY_SER 数据序列化后保存在内存中
MEMORY_AND_DISK 中等 部分 部分 数据先写到内存,内存放不下则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分 序列化的数据先写到内存,内存不足则溢写到磁盘
DISK_ONLY 数据仅存在磁盘上
  • RDD的存储级别应该根据需要以及环境具体情况设定,在RDD参与第一次计算后,RDD就会根据设置的存储级别保持RDD计算后的值在内存中或磁盘上。
  • 只有未曾设置存储级别的RDD才能设置存储级别,设置了存储级别的RDD不能修改其存储级别。
  • 针对仅存储数据在内存中的存储策略,如果内存不足的话,Spark会使用LRU(最近最少使用)缓存策略清除最老的分区,为新的RDD提供空间
  • 移除数据: RDD可以随意在RAM中进行缓存,因此它提供了更快速的数据访问。目前,缓存的粒度为RDD级别,只能缓存全部的RDD。Spark自动监视每个节点上使用的缓存,在集群中没有足够的内存时,Spark会根据缓存的情况确定一个LRU的数据分区进行删除。如果想要手动删除RDD,而不想等待它从缓存中消失,可以使用RDD的unpersist()方法移除数据,unpersist()方法是立即生效的。
  • Spark中对于数据保存除了持久化操作之外还存在检查点(Checkpoint)方式。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。
  • 缓存的方式虽然也可以以文件形式保存在磁盘中,但是磁盘会出现损坏,文件也会出现丢失,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点(Checkpoint)机制
  • Checkpoint的产生就是为了相对而言更加可靠的持久化操作,在Checkpoint可以指定把数据放在本地并且是多副本方式,但是在正常的生产环境下是放在 HDFS,这就天然借助了HDFS高容错、高可靠的特性来完成了最大化的可靠持久化数据的方式,从而降低数据被破坏或者丢失的风险,也减少了数据重新计算时的开销。
    • checkpoint运行原理:
      • 首先在Job结束后,会判断是否需要Checkpoint。如果需要,就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;
      • 然后启动一个新的Job,将目标RDD写入新创建的目录;
      • 接着创建一个org.apache.spark.rdd.CheckpointRDD;
      • 最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。Pasted image 20231115153045
    • 这样RDD的Checkpoint完成,其中Checkpoint的数据可以通过CheckpointRDD的readFromFile读取。 注:在Spark中,某RDD进行Checkpoint操作后会将此RDD的依赖关系清空,该RDD的父RDD就是CheckpointRDD,故在后面的计算再使用该RDD时,若数据丢失,可以从Checkpoint中读取数据,不需要重新计算。