Skip to content

Latest commit

 

History

History
111 lines (109 loc) · 19.8 KB

01.Spark简介.md

File metadata and controls

111 lines (109 loc) · 19.8 KB

Spark简介

  • 快速,分布式,可扩展,容错的集群计算机框架
  • Spark是基于内存计算的大数据分布式计算框架
  • 低延迟的复杂分析
  • Spark是Hadoop Mapreduce的替代方案,MapReudce不适合迭代和交互式任务,Spark主要为交互式查询和迭代算法设计,支持内存存储和高效的容错恢复。Spark拥有MapReduce具有的优点,但不同于MapReduce,Spark中间输出结果可以保存在内存中,减少读写HDFS的次数

spark的特点

  • 快速:
    • 一般情况下,对于迭代次数较多的应用程序,Spark程序在内存中的运行速度是Hadoop MapReduce运行速度的100多倍。
    • Spark默认情况下的迭代的中间结果放到==内存==中,后续的运行作业利用这些结果进一步计算。而Hadoop的计算结果都需要存储到==磁盘==中。
  • 易用性:
    • Spark支持使用Scala、Python、Java及R语言快速编写应用。同时Spark提供超过80个高级运算符,使得编写并行应用程序变得容易并且可以在Scala、Python或R的交互模式下使用Spark。
  • 通用性:
    • Hadoop只提供Map和Reduce两种操作,而Spark提供的数据集操作的类型很多。转换操作内容有map、filter、flatMap、groupByKey、reduceByKey、union、join、mapValues、sort和partionBy等操作类型;行动操作包括collect、reduce、save等操作。另外,处理节点间的通信模型也不止有Shuffle一种模式,用户可以命名、物化、控制中间结果的存储、分区。
  • 随处运行:
    • 用户可以使用Spark的独立集群模式运行Spark,也可以在EC2(亚马逊弹性计算云)、Hadoop YARN或者Apache Mesos上运行Spark。并且可以从HDFS、Cassandra、HBase、Hive、Tachyon和任何分布式文件系统读取数据
  • 代码简洁

Spark生态圈

Pasted image 20231031105823

Sparkcore Spark Core是Spark生态系统的核心组件,是一种大数据分布式处理框架。它实现了MapReduce的算子map函数和reduce函数及计算模型,还提供filter、join、groupByKey等丰富的算子,同时,也实现了Spark的基本功能,包括任务调度、内存管理、错误恢复与存储系统交互等模块。

  • Spark 主要特征:
    • Spark Core提供多种运行模式,不仅可以使用自身运行模式处理任务,如本地模式、Standalone模式,还可以使用第三方资源调度框架来处理任务,比如YARN、MESOS等,通过使用这些部署模式分配资源,可以提高任务的并发执行效率。
    • Spark Core提供了DAG分布式并行计算框架,而且提供内存机制支持多次迭代计算或数据共享,减少迭代计算时读数据开销,提高迭代计算的性能。
    • Spark引入弹性分布式数据集(Resilient Distributed Datasets,RDD),实现了应用任务的调度、RPC、序列化和压缩,并为运行在其上层的组件提供API,另外这些对象集合是弹性的,若有部分数据集丢失,它可根据“血统”对丢失部分进行重建,提高数据的容错性。
    • Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了访问磁盘次数,提升了任务执行的效率,使得Spark适用于实时计算、流式计算等场景。
  • Spark SQL:
    • Spark SQL是Spark的一个结构化数据处理模块,可以看作是一个分布式SQL查询引擎。 Spark SQL提供的最核心的编程抽象就是DataFrame。DataFrame是以指定列(named columns)分布式组织数据集,它在概念上等同于关系数据库的一个表或R/Python的一个数据框架,但是提供更精细的优化方式。
    • 主要特征:
      • 支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
      • 多种性能优化技术:内存列存储(in-memory columnar storage)、字节码生成技术(byte-code generation)、cost model动态评估等。内存列存储意味着Spark SQL的数据不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列作为一个数据存储的单位。从而大大优化了内存使用的效率。
      • 组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以重新开发,并且动态扩展。
  • Spark Streaming:
    • Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafka、Flume、Twitter、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。Spark Streaming最大的优势是其处理引擎和RDD编程模型可以同时进行批处理和流处理。
    • 工作流程: Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。
  • Mllib/MLBASE:
    • MLBase是Spark生态圈里的一部分,专门负责机器学习,它的目标是降低机器学习的门槛。在性能方面,Spark在机器学习方面提供了高质量的算法,使MLlib能够快速运行,比MapReduce快100倍。同时,MLlib还在发展完善中,不断的有新的机器学习算法加入其中,为大数据编程提供更大的便利。
    • MLBASE组成:
      • MLRuntime:是Spark Core提供的分布式内存计算框架,运行由Optimizer优化过的算法进行数据计算,并且输出结果。
      • MLlib:是Spark实现一些机器学习算法和实用程序,比如说分类、回归、聚类、降维、协同过滤还有底层的优化方面。
      • ML Optimizer:它会把用户的数据用它认为最适合的内部已经实现好的机器学习算法以及相关参数处理,并且返回模型或者其他帮助分析的结果。
      • MLI:它是一个特征抽取和高级ML编程抽象算法实现的API的平台。
  • Graphx:
    • GraphX是Spark中图计算的核心组件,便于高效完成图计算的完整的流水作业。在高层次上, GraphX 通过引入一种具有附加到每个顶点和边的属性来扩展 Spark RDD。为了支持图计算,GraphX 公开了一组基本运算符(例如:subgraph,joinVertices和 aggregateMessages)以及Pregel API的优化变体。此外,GraphX还在不断增加其图算法。
  • Spark R:
    • SparkR是AMPLab发布的一个R开发包,为Apache Spark提供了轻量的前端。SparkR提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群通过R shell交互性地运行Job。

      Spark具有快速、可扩展和交互的特点,R具有统计、绘图的优势,R和Spark的有效结合,解决了R语言中无法级联扩展的难题,同时也丰富了Spark在机器学习方面能使用的Lib库。

Spark架构

Pasted image 20231031111621

架构

  • 客户端程序:用户提交作业的客户端。
  • Driver: 运行Application的main函数并创建SparkContext。Application指的是用户编写的Spark应用程序,包含一个Driver 功能的代码和分布在集群中多个 节点上的Executor代码。
  • SparkContext: 应用上下文,控制整个生命周期。
  • Cluster Manager:指的是在集群上获取资源的外部服务,即资源管理器。目前主要有Standalone 和Hadoop YARN, Standalone 是Spark原生的资源管理器,由Master负责资源的分配,也可以理解为使用Standalone时Cluster Manager是Master主节点。若是使用YARN模式,则是由ResourceManager负责资源的分配。
  • Spark Worker: 集群中任何可以运行Application的节点,运行一个或多个Executor进程。
  • Executor: 运行在Worker的Task执行器。Executor启动线程池运行Task, 并且负责将数据存在内存或磁盘上,每个Application都会申请各自的Executor来处理任务。
  • Task: 被送到某个Executor的具体工作任务。

Spark基本架构:

  • Spark架构采用分布式计算中的Master-Slave模型。集群中的主控节点称为Master,同样,集群中含有Worker进程的节点为Slave。Master负责控制整个集群的运行;Worker节点相当于分布式系统中的计算节点,它接收Master节点指令并返回计算进程到Master;Executor负责任务的执行;Client是用户提交应用的客户端;Driver负责协调提交后的分布式应用。
  • Driver是应用逻辑的起点,负责Task任务的分发和调度;Worker负责管理计算节点,并创建Executor来并行处理Task任务。Task执行过程中所需的文件和包由Driver序列化后传输给对应的Worker节点,Executor对相应分区的任务进行处理。
  • 本地模式:在本地运行模式中,Spark的所有进程都在一台机器上的JVM上运行。在本地运行模式下,作业划分调度后,任务集会发送到本地终端点,本地终端接收到任务后,会在本地启动Executor,这一切工作都在本地执行。
  • Standalone模式运行流程:Spark独自包揽除了存储文件操作之外的所有操作,包括集群管理,任务调度,程序计算等等,这种模式适合不大的程序,不需要yarn等将部署整的很复杂。专业点的描述就是,利用Spark自带的资源管理与调度器运行Spark集群。Standalone模式主要的节点有Client 节点、Master节点和Worker节点。Driver 既可以运行在Master节点上,也可以运行在本地Client端。当用spark-shell 交互式工具提交Spark 的Job时,Driver 在Master 节点上运行。当使用spark-submit工具提交Job或者在Eclipse、 IDEA等开发平台上使用“new SparkConf().setMaster(spark://master:7077)”方式运行Spark任务时, Driver 是运行在本地Client端上的。
  • 当以Sandalone模式向Spark集群提交作业时,作业的流程如下:
    • 首先,SparkContetxt 连接到Master,向Master注册并申请资源;
    • Worker定期发送心跳信息给Master并报告Executor 状态;
    • Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,启动StandaloneExecutorBackend;
    • StandaloneExecutorBackend向SparkContext注册;
    • SparkContext 将Application 代码发送给StandaloneExecutorBackend, 并且SparkContext解析Application代码,构建DAG图,并提交给DAG Scheduler,分解成Stage (当碰到Action操作时,就会催生Job,每个Job中含有一个或多个Stage),然后将Stage (或者称为TaskSet)提交给Task Scheduler, Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行。
    • StandaloneExecutorBackend会建立Executor 线程池,开始执行Task,并向SparkContext报告,直至Task完成。
    • 所有Task完成后,SparkContext 向Master注销,释放资源。
  • Spark on Yarn:
    • Spark若与别的分布式应用共享集群,Spark就需要运行在集群管理器上(例如Spark框架与MapReduce框架同时运行,如果不用第三方资源管理器进行资源分配,MapReduce分到的内存资源会很少,效率低下)。 资源管理器(Yet Another Resource Negotiator,YARN)是一个通用的资源管理系统,能够为上层应用提供统一资源管理和资源调度。YARN为集群在利用率、资源统一管理和数据共享等方面带来了巨大的好处。
    • YARN 将资源管理和作业调度及监控分成了两个独立的服务程序:全局的资源管理(Resource Manager,RM)和针对个人应用的Master(Application Master,AM)。
    • ResourceManager(RM):负责全局资源管理,接收Client端任务请求,接收和监控NodeManager的资源情况汇报,负责资源的分配与调度,启动和监控ApplicationMaster。
    • NodeManager(NM):可以看作节点的资源和任务管理器,启动Container运行Task计算,汇报资源、Container情况给RM,汇报任务处理情况给AM。
    • ApplicationMaster(AM):主要是负责单个Application(Job)的Task管理和调度,向RM申请资源,向NM发出launch Container指令,接收NM的Task处理状态信息。
    • Container:YARN中资源分配单位,资源使用Container表示,每个任务占用一个Container,并在Container中运行。
    • Spark on YARN模式根据Driver在集群中的位置分为两种模式,一种是YARN-Client模式(客户端模式),另一种是YARN-Cluster模式(集群模式)。
    • 在YARN运行模式中,不需要启动Spark独立集群,所以这个时候去访问http://master:8080也是访问不了的。
    • 启动YARN客户端模式的Spark shell命令bin/spark-shell --master yarn-client
    • 启动YARN集群模式可使用bin/spark-shell --master yarn-cluster
    • Pasted image 20231031113301
    • Pasted image 20231031113318
      • 在YARN-Cluster模式下,Spark Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,所以YARN-Cluster模式不适合运行交互类型的作业。然而在YARN-Client模式下,AM仅仅向YARN请求Executor,Client会和请求得到的Container通信来调度它们工作,也就是说Client不能离开。
      • 在YARN-Cluster模式下,Spark Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,所以YARN-Cluster模式不适合运行交互类型的作业。然而在YARN-Client模式下,AM仅仅向YARN请求Executor,Client会和请求得到的Container通信来调度它们工作,也就是说Client不能离开。
  • RDD:
    • RDD(Resilient Distributed Datasets弹性分布式数据集),是Spark中最重要的概念,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或磁盘中) Pasted image 20231031113452
  • RDD主要有两大类操作,分别为转换( Transformations)和操作( Actions)。转换主要是指把原始数据集加载到RDD以及把一个RDD转换为另外一个RDD,而操作主要指把RDD存储到硬盘或触发转换执行。例如,map 是一个Transformation操作,该操作作用于数据集上的每一个元素,并且返回一个新的RDD作为结果。而reduce是个 Action 操作,该操作通过一些函数聚合RDD中的所有元素并且返回最终的结果给Driver 程序。
  • TRANSFORMATION1698723411111
  • ACTION:1698723514830
  • RDD原理: - Pasted image 20231031113931
  • 所有的转换都是懒惰(Lazy)操作,它们只是记住了需要这样的转换操作,并不会马上执行,只有等到Actions操作的时候才会真正启动计算过程进行计算。举个具体的例子,如上图所示,先经过转换textFile把数据从HDFS加载到RDDA以及RDDC,这时其实RDDA或者RDDC中都是没有数据的。再到后面的转换flatMap、map、reduceByKey等,分别把RDDA转换为RDDB到RDDF以及RDDC到RDDE等,这些转换都是没有执行的。 可以理解为先做个计划,但是并没有具体执行,等到执行操作saveAsSequenceFile时,才开始真正触发并执行任务。
    • 宽依赖与窄依赖:
      • 图中的每个小方格代表一个分区, 而一个大方格(比如包含3个或两个小方格的大方格)代表一个RDD,竖线左边显示的是窄依赖,而右边是宽依赖。要知道宽窄依赖的区别,需要先了解父RDD ( Parent RDD)和子RDD ( Child RDD )。在图 “map,, filter”上方箭头左边的是父RDD,而右边的是子RDD。“union” 上方箭头左边的两个RDD都是其右边的RDD的父RDD,所以它是有两个父RDD的。Pasted image 20231031114107
      • 窄依赖指的是子RDD( Child RDD )的一个分区只依赖于某个父RDD( Parent RDD(s))中的一个分区( Partition )。
      • 宽依赖指的是子RDD ( Child RDD)的每一个分区( Partition)都依赖于某个父RDD ( Parent RDD(s))的一个以上分区。
  • Stage:
    • Spark中还有一个重要的概念, 即Stage。一般而言, 一个 Job会分成一定数量的Stage,各个Stage之间按照顺序执行。那么Stage是怎么划分的?在Spark中,一个 Job会被拆分成多组Task,每组任务就是一个 Stage。 而Spark中有两类Task, 分别是ShuffleMapTask和ResultTask。ShuffleMapTask 的输出是Shuffle所需的数据,ResultTask 的输出是最终的结果。因此Stage也以此为依据进行划分,简单地说,Stage是以Shuffle和Result这两种类型划分的,Shuffle 之前的所有变换是一个 Stage, Shuffle 之后的操作是另一个Stage。
    • 比如rdd.parallize(1 to 10).foreach(println)这个操作没有Shufle,直接就输出了,它的Task只有一个,即ResultTask, Stage 也只有一个。 如果是rdd.map(x=>(x,1)).reduceByKey(+). foreach(println),因为有reduceByKey操作,所以有一个Shuffle过程,那么reduceByKey之前的是一个Stage,执行ShuffleMapTask,输出Shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果Job中有多个shuffle,那么每个shuffle之前都是一个stage。
  • Spark核心原理:
    • Pasted image 20231031114357
  • 构建DAG图。Spark应用程序进行很多RDD操作,其中包括各种转换操作,它们描述了RDD之间的依赖关系,当遇到行动操作触发Job的提交,提交的是根据RDD依赖关系构建的DAG图,DAG图提交给DAGScheduler进行解析。
    • DAGScheduler进行任务划分。DAGScheduler会收到之前生成的DAG图,然后对该图进行拆分,拆分的依据是DAG图中一个RDD到下一个RDD之间的操作步骤是否为宽依赖,是宽依赖则拆开为Stage。Stage的提交是有顺序的,如下图,在Stage依赖关系中,它的Stage提交逻辑的Stage1与Stage2是可以并行的,在有计算资源的情况下会首先被提交,并且在这两个Stage都计算完成的情况下,再提交Stage3。
    • 依赖关系图:Pasted image 20231031114808
    • 提交顺序:Pasted image 20231031114820
    • TaskScheduler调度任务。TaskScheduler接收了来自DAGScheduler发送的TaskSet,然后把收到的TaskSet中的Task(Task的数量与输入的RDD的partition数量一样,partition是RDD的最小单元,RDD是分布在各个节点上的partition组成,因为每一个task只是处理一个partition上的数据),发送到集群Worker节点上去执行。
      • Worker执行任务。Worker节点中的Executor收到TaskScheduler发送过来的Task后会执行Task。Task以多线程方式运行,每个线程负责一个任务。