StreamingPro In Depth #47

Closed

zqhxuyuan opened this issue Sep 5, 2017 · 4 comments

zqhxuyuan commented Sep 5, 2017

Original post: http://zqhxuyuan.github.io/2017/09/04/2017-09-04-StreamingPro/

StreamingPro supports Spark, Spark Streaming, Spark Structured Streaming, and Flink. They all share a single entry class, StreamingApp:

object StreamingApp {
  def main(args: Array[String]): Unit = {
    val params = new ParamsUtil(args)
    require(params.hasParam("streaming.name"), "Application name should be set")
    PlatformManager.getOrCreate.run(params)
  }
}

The streaming.platform parameter selects the execution platform. Naturally, each execution engine ships in a different jar:

SHome=/Users/allwefantasy/streamingpro

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar    \
-streaming.name test    \
-streaming.platform spark_streaming \
-streaming.job.file.path file://$SHome/spark-streaming.json

bin/flink run -c streaming.core.StreamingApp \
/Users/allwefantasy/streamingpro/streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar \
-streaming.name god \
-streaming.platform flink_streaming \
-streaming.job.file.path file:///Users/allwefantasy/streamingpro/flink.json

The jar determines which Runtime gets loaded; the mapping from platform name to Runtime lives in PlatformManager's platformNameMapping variable.
Runtime is a trait whose most important members are the startRuntime and params methods. From here on we will refer to a Runtime as an execution engine:

trait StreamingRuntime {
  def startRuntime: StreamingRuntime
  def destroyRuntime(stopGraceful: Boolean, stopContext: Boolean = false): Boolean
  def streamingRuntimeInfo: StreamingRuntimeInfo
  def resetRuntimeOperator(runtimeOperator: RuntimeOperator)
  def configureStreamingRuntimeInfo(streamingRuntimeInfo: StreamingRuntimeInfo)
  def awaitTermination
  def startThriftServer
  def startHttpServer
  def params: JMap[Any, Any]
}

StreamingPro is, at its heart, still launched through spark-submit. The framework's overall flow lives in PlatformManager's run method. The main steps are (a rough sketch follows the list):

  1. Set up the configuration
  2. Create and obtain the runtime environment via reflection
  3. Obtain the dispatcher and all the strategies
  4. Start the REST server, the Thrift server, and register with ZooKeeper (all optional)
  5. Start the execution engine and wait for the job to finish
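
To make that flow easier to follow, here is a heavily simplified sketch of run; apart from getOrCreate, startHttpServer, startRuntime and awaitTermination, the helper and accessor names are assumptions for illustration, not the actual source:

// Simplified sketch of PlatformManager.run (illustration only; setupConfig,
// createRuntimeByReflection, findDispatcher and getParamsMap are hypothetical names).
def run(params: ParamsUtil): Unit = {
  setupConfig(params)                                        // 1. set up the configuration
  val runtime = createRuntimeByReflection(                   // 2. reflectively create the engine
    params.getParam("streaming.platform"),                   //    (sketched further below)
    params.getParamsMap)                                     //    getParamsMap assumed to expose the raw map
  val dispatcher = findDispatcher()                          // 3. dispatcher + all strategies
  if (params.getParam("streaming.rest") == "true") {         // 4. optional REST/Thrift/ZK services
    runtime.startHttpServer
  }
  runtime.startRuntime                                       // 5. start the engine
  runtime.awaitTermination                                   //    and wait for the job to finish
}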

For the concepts of Dispatcher and Strategy, see the author's ServiceframeworkDispatcher project.
Creating the execution engine via reflection means calling the getOrCreate method of the corresponding companion object, passing in params, and finally treating the result as a StreamingRuntime. A sketch of this reflective call follows the mapping below.

  def platformNameMapping = Map[String, String](
    SPAKR_S_S -> "streaming.core.strategy.platform.SparkStructuredStreamingRuntime",
    SPAKR_STRUCTURED_STREAMING -> "streaming.core.strategy.platform.SparkStructuredStreamingRuntime",
    FLINK_STREAMING -> "streaming.core.strategy.platform.FlinkStreamingRuntime",
    SPAKR_STREAMING -> "streaming.core.strategy.platform.SparkStreamingRuntime",
    SPARK -> "streaming.core.strategy.platform.SparkRuntime"
  )
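
A minimal sketch of how that reflective creation could look, assuming getOrCreate takes the params map; only platformNameMapping, getOrCreate and StreamingRuntime come from the text above, the rest is illustrative:

// Illustration only: resolve the Runtime class name from platformNameMapping and call
// the companion object's getOrCreate(params) via reflection.
def createRuntimeByReflection(platform: String,
                              params: java.util.Map[Any, Any]): StreamingRuntime = {
  val className = platformNameMapping(platform)             // e.g. "...platform.SparkRuntime"
  val objectClass = Class.forName(className + "$")          // the Scala companion object's class
  val instance = objectClass.getField("MODULE$").get(null)  // the singleton object itself
  val getOrCreate = objectClass.getMethod("getOrCreate", classOf[java.util.Map[Any, Any]])
  getOrCreate.invoke(instance, params).asInstanceOf[StreamingRuntime]
}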

Note: StreamingPro's Runtime is only the execution engine for the Spark job; loading strategies from the configuration file is the job of ServiceframeworkDispatcher.
Suppose we define the configuration file below. Because it uses short names, a ShortNameMapping also has to be defined:

{
  "convert-multi-csv-to-json": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [
      {
        "name": "testProcessor"
      }
    ],
    "ref": [],
    "compositor": [
      {
        "name": "testCompositor"
      }
    ],
    "configParams": {
    }
  }
}

DefaultShortNameMapping is defined as follows; this is how the spark entry in the configuration file is wired into ServiceframeworkDispatcher's loading process:

class DefaultShortNameMapping extends ShortNameMapping {
  private val compositorNameMap: Map[String, String] = Map[String, String](
    "spark" -> "serviceframework.dispatcher.test.DefaultStrategy",
    "testProcessor" -> "serviceframework.dispatcher.test.TestProcessor",
    "testCompositor" -> "serviceframework.dispatcher.test.TestCompositor"
  )
  override def forName(shortName: String): String = {
    if (compositorNameMap.contains(shortName)) compositorNameMap(shortName)
    else shortName
  }
}
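
A quick illustration of how forName behaves (not from the source, just a sanity check):

val mapping = new DefaultShortNameMapping()
mapping.forName("spark")            // => "serviceframework.dispatcher.test.DefaultStrategy"
mapping.forName("com.example.Full") // unknown short names pass through unchanged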

The core of ServiceframeworkDispatcher is StrategyDispatcher. When this class is created it reads the configuration file,
then parses the strategy, algorithm (processor), ref, compositor and configParams entries and constructs the corresponding objects.
ServiceframeworkDispatcher is a module-composition framework that mainly defines three interfaces: Compositor, Processor, and Strategy.

The Strategy interface exposes processor, ref and compositor, along with initialize and result methods:

trait Strategy[T] extends ServiceInj{
  def processor:JList[Processor[T]]
  def ref:JList[Strategy[T]]
  def compositor:JList[Compositor[T]]
  def name:String
  def initialize(name:String,alg:JList[Processor[T]],ref:JList[Strategy[T]],com:JList[Compositor[T]],params:JMap[Any,Any])
  def result(params:JMap[Any,Any]):JList[T]
  def configParams:util.Map[Any, Any]
  def stop = {}
}

Initializing a Strategy requires its algorithms, refs, compositors and configuration; this is done in StrategyDispatcher's createStrategy method.

Note the initialize calls below: when createAlgorithms and createCompositors build their objects,
they read the params configuration, which is a list of nested maps: JList[JMap[String, Any]].

  def createStrategy(name: String, desc: JMap[_, _]): Option[Strategy[T]] = {
    if (_strategies.contains(name)) return None;
    // Instantiate the strategy: resolve a short name to its full name first, then instantiate the concrete strategy class via Class.forName
    val strategy = Class.forName(shortNameMapping.forName(desc.get("strategy").asInstanceOf[String])).newInstance().asInstanceOf[Strategy[T]]
    // Read the configuration and materialize it as a Map[Any, Any]
    val configParams: JMap[Any, Any] = if (desc.containsKey("configParams")) desc.get("configParams").asInstanceOf[JMap[Any, Any]] else new java.util.HashMap()
    // Initialize the strategy; this requires creating the algorithms, refs and compositors
    strategy.initialize(name, createAlgorithms(desc), createRefs(desc), createCompositors(desc), configParams)
    _strategies.put(name, strategy)
    Option(strategy)
  }

  // Create the algorithms. A strategy gets its results from zero or more algorithms
  private def createAlgorithms(jobJMap: JMap[String, Any]): JList[Processor[T]] = {
    if (!jobJMap.contains("algorithm") && !jobJMap.contains("processor")) return new AList[Processor[T]]()
    val processors = if (jobJMap.contains("algorithm")) jobJMap("algorithm") else jobJMap("processor")
    processors.asInstanceOf[JList[JMap[String, Any]]].map {
      alg =>
        val name = shortName2FullName(alg)
        val processor = Class.forName(name).newInstance().asInstanceOf[Processor[T]]
        val params: JList[JMap[String, Any]] = if (alg.contains("params")) alg("params").asInstanceOf[JList[JMap[String, Any]]] else new AList[JMap[String, Any]]()
        processor.initialize(name, params)
        processor
    }
  }

  // Create the compositors. There can be several, invoked in order, much like a filter chain: the first one receives the results of the algorithms or strategies, and each subsequent compositor can only process the output of the previous one
  private def createCompositors(jobJMap: JMap[String, Any]): JList[Compositor[T]] = {
    if (!jobJMap.contains("compositor")) return new AList()
    val compositors = jobJMap.get("compositor")
    compositors.asInstanceOf[JList[JMap[String, Any]]].map {
      f =>
        val compositor = Class.forName(shortName2FullName(f)).newInstance().asInstanceOf[Compositor[T]]
        val params: JList[JMap[String, Any]] = if (f.contains("params")) f.get("params").asInstanceOf[JList[JMap[String, Any]]] else new AList[JMap[String, Any]]()
        compositor.initialize(f.get("typeFilter").asInstanceOf[JList[String]], params)
        compositor
    }
  }

The core of ServiceframeworkDispatcher is StrategyDispatcher, and the core of StrategyDispatcher is its dispatch method:

  def dispatch(params: JMap[Any, Any]): JList[T] = {
    findStrategies(clientType) match {
      case Some(strategies) =>
        strategies.flatMap { f => f.result(params) }
    }
  }
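
For intuition, driving the dispatcher might look roughly like the following; how the dispatcher instance and the SparkSession are obtained is assumed here for illustration only:

// Hypothetical usage of StrategyDispatcher.dispatch: the params map carries runtime
// objects (for example the SparkSession stored under "_session_", see SparkRuntime below)
// plus any dynamic overrides, and every configured strategy's result(params) is executed.
val params = new java.util.HashMap[Any, Any]()
params.put("_session_", sparkSession)                 // sparkSession assumed to exist in scope
params.put("streaming.sql.params.today", "20170101")  // dynamic override, see the table further below
val results = dispatcher.dispatch(params)             // dispatcher assumed to exist in scope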

The different execution engines implement their start-up differently:

class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener {
  override def startRuntime: StreamingRuntime = this

  var sparkSession: SparkSession = createRuntime
  def createRuntime = {
    // ... create the SparkSession; parameters decide whether Hive and CarbonData support are enabled
  }

  params.put("_session_", sparkSession) // store the SparkSession in params
  registerUDF  

  override def params: JMap[Any, Any] = _params
}

class SparkStreamingRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener { self =>
  var streamingContext: StreamingContext = createRuntime
  def createRuntime = {
    // create the StreamingContext and put the SparkSession into params
  }

  override def startRuntime = {
    streamingContext.start()
    this
  }
  override def awaitTermination = streamingContext.awaitTermination()
}

The class that actually drives StreamingPro's main flow, however, is SparkStreamingStrategy under streamingpro-commons.
Note: on spark-1.6 there is also a SparkStreamingStrategy class under streamingpro-spark.

class SparkStreamingStrategy[T] extends Strategy[T] with DebugTrait with JobStrategy {
  var _ref: util.List[Strategy[T]] = _
  var _compositor: util.List[Compositor[T]] = _
  var _processor: util.List[Processor[T]] = _
  var _configParams: util.Map[Any, Any] = _

  def result(params: util.Map[Any, Any]): util.List[T] = {
    ref.foreach { r => r.result(params) } // run the refs first
    if (compositor != null && compositor.size() > 0) {
      // the first Compositor produces the first intermediate result
      var middleR = compositor.get(0).result(processor, ref, null, params)
      // feed each intermediate result into the next Compositor:
      // the first Compositor's output becomes the second one's input, the second one's output becomes the third one's input, and so on,
      // so the Compositors execute as a chain
      for (i <- 1 until compositor.size()) {
        middleR = compositor.get(i).result(processor, ref, middleR, params)
      }
      middleR
    } else new util.ArrayList[T]()
  }  
}

Note: every job in the configuration file has a strategy-level configParams; refs also use this global configParams.
It is a Map[String, Any]. In addition, every Compositor and Processor has its own params configuration, which is a list.

In practice, the global configParams is what gets passed as the last argument to the result() methods of Strategy, Ref/Processor and Compositor.

    "compositor": [
      {
        "name": "testCompositor",
        "params": [
          {
            "sql": "select avg(value) avgAge from test",
            "outputTableName": "test3"
          },
          {
            "sql": "select sum(value) sumAge from test",
            "outputTableName": "test4"
          }
        ]
      }
    ],

Next, take the Compositor implementation class that reads multiple data sources as an example:

  • _configParams is set when the Compositor is created; it is a List[Map[String, Any]] corresponding to the params list in the configuration
  • If any values need to be replaced, the configuration is processed first
  • Next, the SparkSession is fetched from params (remember it was put into that map when the Runtime was created)
  • Then sparkSession.read.format(xx).options(Map).load(path) is executed
  • Finally, df.createOrReplaceTempView creates a Spark SQL temporary view named after outputTable

class MultiSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

    _configParams.foreach { sourceConfig =>
      val name = sourceConfig.getOrElse("name", "").toString

      val _cfg = sourceConfig.map(f => (f._1.toString, f._2.toString)).map { f =>
        (f._1, params.getOrElse(s"streaming.sql.source.${name}.${f._1}", f._2).toString)
      }.toMap

      val sourcePath = _cfg("path")
      val df = sparkSession(params).read.format(sourceConfig("format").toString).options(
        (_cfg - "format" - "path" - "outputTable").map(f => (f._1.toString, f._2.toString))).load(sourcePath)
      df.createOrReplaceTempView(_cfg.getOrElse("outputTable", _cfg.getOrElse("outputTableName", "")))
    }
    List()
  }
}

To support dynamic configuration replacement, the _cfg values get some processing: for example, if the key s"streaming.sql.source.${name}.${f._1}" above is present in params, its value replaces f._2.
The table below lists the replacement patterns StreamingPro supports (a small sketch of the lookup follows the table).

Config parameter: streaming.sql.source.[name].[param]
  Config example:   "path": "file:///tmp/sample_article.txt"
  Dynamic override: -streaming.sql.source.firstSource.path file:///tmp/wow.txt

Config parameter: streaming.sql.out.[name].[param]
  Config example:   "path": "file:///tmp/sample_article.txt"
  Dynamic override: -streaming.sql.out.firstSink.path file:///tmp/wow_20170101.txt

Config parameter: streaming.sql.params.[param-name]
  Config example:   "sql": "select * from test where hp_time=:today"
  Dynamic override: -streaming.sql.params.today "20170101"
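
To make the override lookup concrete, here is a small self-contained Scala sketch of the pattern used when building _cfg above (the values are made up purely for illustration):

// Command-line arguments end up in `params`; each field of a source config can be
// overridden by a key of the form streaming.sql.source.[name].[field].
val params: Map[Any, Any] =
  Map("streaming.sql.source.firstSource.path" -> "file:///tmp/wow.txt")
val sourceConfig: Map[String, Any] =
  Map("name" -> "firstSource", "path" -> "file:///tmp/sample_article.txt")

val name = sourceConfig("name")
val resolvedPath =
  params.getOrElse(s"streaming.sql.source.$name.path", sourceConfig("path")).toString
// resolvedPath == "file:///tmp/wow.txt": the command-line value wins over the config file.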

Suppose we have two input sources and one output sink configured as follows:

      {
        "name": "batch.sources",
        "params": [
          {
            "name":"firstSource",
            "path": "file:///tmp/sample_article.txt",
            "format": "com.databricks.spark.csv",
            "outputTable": "article",
            "header":true
          },
          {
              "name":"secondSource",
              "path": "file:///tmp/sample_article2.txt",
              "format": "com.databricks.spark.csv",
              "outputTable": "article2",
              "header":true
            }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "name":"firstSink",
            "path": "file:///tmp/sample_article.txt",
            "format": "com.databricks.spark.csv",
            "outputTable": "article",
            "header":true
          }
        ]
      }

A Source reads its input into a DataFrame and then registers a temporary view; other components, such as the SQL ones, work in the same way. That concludes the walkthrough of StreamingPro's overall flow.

@allwefantasy (Contributor)

Excellent write-up!

1 similar comment

@allantaylor81

Excellent write-up!

@robin-su

Excellent.

@foreverjay

Great article. Thanks to the author for open-sourcing this; it solves real pain points for both the algorithm side and the engineering side. Awesome.
