- 
                Notifications
    
You must be signed in to change notification settings  - Fork 1
 
Open
Labels
Description
最近在用 Spark SQL 做一些事情,但是数据格式的转换花了一些功夫。如果你希望查询结果转化为 JSON 格式,那么直接使用 Row 肯定是不行的。通过 Spark SQL 查询得到的数据是 Array[Row],需要结合 Schema 方可构造出 Array[Map] 这样的数据。下面这段代码可以用来做这样的转换。转换完成之后,通过其他一些 Scala 的 JSON 序列化工具(例如 lift-json)即可得到 JSON 格式的数据。
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import scala.collection.mutable.{ArrayBuffer}
object SparkRowFormatter {
  def formatRowsWithSchema(rowArr: Array[Row], schema: StructType) = {
    rowArr.map(r => formatStruct(schema.fields, r))
  }
  private def formatStruct(schema: Seq[StructField], r: Row) = {
    val paired = schema.zip(r)
    paired.foldLeft(Map[String, Any]())((s, p) => s ++ formatItem(p))
  }
  private def formatItem(p: Pair[StructField, Any]): Map[String, Any] = {
    p match {
      case (sf, a) =>
        sf.dataType match {
          case ArrayType(et, _) =>
            Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
          case StructType(s) =>
            Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row])))
          case _ => Map(sf.name -> a)
        }
    }
  }
  private def formatArray(et: DataType, arr: ArrayBuffer[Any]): Seq[Any] = {
    et match {
      case StructType(s) => arr.map(e => formatStruct(s, e.asInstanceOf[Row]))
      case ArrayType(t, _) =>
        arr.map(e => formatArray(t, e.asInstanceOf[ArrayBuffer[Any]]))
      case _ => arr
    }
  }
}