How to add a Sink to Structured Streaming #2

Open
dubin555 opened this issue Sep 30, 2018 · 0 comments
Labels
spark Apache Spark

Comments

dubin555 commented Sep 30, 2018

Background:

In a data platform team, once requirements start pouring in, writing custom code for every single request is no longer realistic. At that point you usually have to work with the business teams, and ideally there is a visual development UI backed by a service layer, where all of the user's data-processing logic is expressed through configuration files or plain SQL. Users then need a way to see real-time results in order to verify that their code or configuration is correct.

Existing approaches and their shortcomings:

Some blog posts describe using a websocket to fetch the current Spark logs, including the runtime logs, so you can switch the streaming sink to console mode and collect part of the output that way.
The downside is that what console mode gives you is really just the result of dataframe.show(), which makes interactive visualization on the front end rather poor. So, can we grab a sample of the data at runtime and send it back in something like JSON?
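
For reference, the console sink mentioned above is the built-in "console" format; a minimal usage sketch is below (the "rate" test source is used here only to have some data flowing):

// Sketch of the console sink being discussed: every micro-batch is printed
// with dataframe.show(), i.e. a plain text table, which is awkward for a
// front end to consume.
val df = spark.readStream
  .format("rate")                    // built-in test source: (timestamp, value)
  .option("rowsPerSecond", "5")
  .load()

val consoleQuery = df.writeStream
  .format("console")                 // prints show()-style output per batch
  .outputMode("append")
  .start()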

Source code

First, let's look at how the console sink is implemented, starting from the start method:

def start(): StreamingQuery = {
    ...
    if (source == "memory") {
      ...
    } else if (source == "foreach") {
      ...
    } else {
      val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
      val sink = ds.newInstance() match {
        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
        case _ =>
          val ds = DataSource(
            df.sparkSession,
            className = source,
            options = extraOptions.toMap,
            partitionColumns = normalizedParCols.getOrElse(Nil))
          ds.createSink(outputMode)
      }
      ...

Inside it there is a lookupDataSource method:

       case sources =>
          // There are multiple registered aliases for the input. If there is single datasource
          // that has "org.apache.spark" package in the prefix, we use it considering it is an
          // internal datasource within Spark.
          val sourceNames = sources.map(_.getClass.getName)
          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
          if (internalSources.size == 1) {
            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
            internalSources.head.getClass
          } else {
            throw new AnalysisException(s"Multiple sources found for $provider1 " +
              s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
          }

So, when multiple sources are registered for the same alias, the class name has to start with org.apache.spark to win the tie-break. Following the console implementation, let's write our own Debug sink:

// Imports assume the Spark 2.3 DataSourceV2 streaming writer API, which is
// what the built-in ConsoleWriter uses.
import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.streaming.sources.{PackedRowCommitMessage, PackedRowWriterFactory}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType
import org.json4s.jackson.Serialization

class DebugWriter(schema: StructType, options: DataSourceOptions) extends StreamWriter {

  assert(SparkSession.getActiveSession.isDefined)
  protected val spark = SparkSession.getActiveSession.get

  implicit val formats = org.json4s.DefaultFormats

  // Called on the driver once all partitions of a micro-batch have committed.
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    printRows(messages, schema, s"Batch: $epochId")
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

  // Reuse the factory the console sink uses: it packs each partition's rows
  // into the commit message sent back to the driver.
  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory

  protected def printRows(
      commitMessages: Array[WriterCommitMessage],
      schema: StructType,
      printMessage: String): Unit = {
    // Unpack the rows the executors shipped back with their commit messages.
    val rows = commitMessages.collect {
      case PackedRowCommitMessage(rs) => rs
    }.flatten

    // Take a small sample and render it as JSON instead of show()'s text table.
    val sample = spark
      .createDataFrame(rows.take(10).toList.asJava, schema)
      .toJSON
      .collect()

    val message = Serialization.writePretty(Map("message" -> sample))

    // scalastyle:off println
    println("-------------------------------------------")
    println(message)
    println("-------------------------------------------")
    // scalastyle:on println
  }
}
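
The DebugWriter alone is not enough for start() to pick the sink up: the format also needs a provider class on the StreamWriteSupport path shown earlier. The issue does not show that part; a sketch modeled on Spark's ConsoleSinkProvider might look like the following, where the package name, the class name DebugSinkProvider, and the "debug" alias are all placeholders, and the Spark 2.3 API is assumed to match the code above:

// Hypothetical provider, modeled on Spark's ConsoleSinkProvider.
// Keeping the package under org.apache.spark is what makes the sink win the
// "internal datasource" tie-break in lookupDataSource if the alias collides.
package org.apache.spark.sql.debug

import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class DebugSinkProvider extends DataSourceV2
  with StreamWriteSupport
  with DataSourceRegister {

  // Matches the "case w: StreamWriteSupport" branch of start() shown above.
  override def createStreamWriter(
      queryId: String,
      schema: StructType,
      mode: OutputMode,
      options: DataSourceOptions): StreamWriter = {
    new DebugWriter(schema, options)
  }

  // Optional alias for .format("debug"); it is only resolved if the class is
  // also listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
  override def shortName(): String = "debug"
}

Without the service-loader entry you can still use the sink by passing the fully qualified class name to .format(...).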

The checkpoint location has to be specified explicitly, because in the source you can see useTempCheckpointLocation = source == "console", i.e. a temporary checkpoint is only provided for the console sink; a random directory under /tmp is good enough.
With that, a JSON-formatted sample of the DataFrame can be printed out for debugging. We can push it to Redis or Kafka for the front end to draw charts from, which makes it easy for users to debug, inspect column types, and so on.
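
Putting it together, using the sink from a query might look like the sketch below; df stands for any streaming DataFrame, and the format string assumes the placeholder DebugSinkProvider above:

// Hypothetical usage of the Debug sink; the checkpoint path is just a
// throwaway directory under /tmp, as noted above.
val query = df.writeStream
  .format("org.apache.spark.sql.debug.DebugSinkProvider")   // or "debug" if registered
  .option("checkpointLocation", s"/tmp/debug-sink-${java.util.UUID.randomUUID()}")
  .outputMode("append")
  .start()

query.awaitTermination()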
