# Reading Kinesis Data with Spark Streaming (Scala)

Data is continuously being written to Kinesis. A raw record in the stream looks like this:
```json
{
    "ordertime": 1573573055,
    "orderid": 23,
    "itemid": "Item_1231231",
    "orderunits": 15,
    "address": {
        "city": "City_a",
        "state": "State_xxx",
        "zipcode": 10000
    }
}
```

The end goals are:
1. Land the Kinesis data in S3.
2. Partition the output by the **ordertime** field, for example `ot=2019101123/`.
3. Flatten the records before writing them to S3. The target format is:
```json
{
    "ordertime": 1573573055,
    "orderid": 23,
    "itemid": "Item_1231231",
    "orderunits": 15,
    "address_city": "City_a",
    "address_state": "State_xxx",
    "address_zipcode": 10000
}
```
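
The flattening rule in item 3 (join the parent and child field names with `_`) can be illustrated without Spark. The following is a minimal sketch on plain Scala maps; `FlattenSketch` and `flatten` are hypothetical names used only for illustration. The application further down applies the same renaming to a DataFrame schema instead.

```scala
object FlattenSketch {
  // Recursively flatten nested maps, joining parent and child keys with "_".
  // Hypothetical helper for illustration; not part of the Spark application.
  def flatten(record: Map[String, Any], prefix: String = ""): Map[String, Any] =
    record.flatMap {
      case (key, nested: Map[_, _]) =>
        // Descend into the nested map, extending the prefix with this key.
        flatten(nested.asInstanceOf[Map[String, Any]], s"$prefix${key}_")
      case (key, value) =>
        // Leaf value: emit it under the prefixed key.
        Map(s"$prefix$key" -> value)
    }
}
```

Applied to a record with a nested `address` map, the sketch produces the keys `address_city`, `address_state`, and `address_zipcode`, matching the target layout above.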

## Add project dependencies

* In Jupyter, use `%%configure` to reference the external package.
* When submitting with spark-submit, the command is `spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 --class ShiHengKinesisSparkApp s3://<path-to-the-jar-file-in-s3.jar>`

## Create the directory structure

```shell
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/ShiHengKinesisSparkApp.scala
```

## Write the build configuration

```shell
$ cat ./build.sbt
name := "ShiHengKinesisSparkApp"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
```

The version numbers for `scalaVersion` and `libraryDependencies` can be looked up with the command below:

```shell
$ spark-shell --version
```
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
          /_/

    Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_222

```scala
// src/main/scala/ShiHengKinesisSparkApp.scala
import java.nio.charset.StandardCharsets

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.joda.time.DateTime

object ShiHengKinesisSparkApp {

    // Flatten the first array or struct column found, then recurse until none remain.
    def flattenDataframe(df: DataFrame): DataFrame = {
        val fieldNames = df.schema.fieldNames
        val flattenedOnce = df.schema.fields.collectFirst {
            case StructField(fieldName, _: ArrayType, _, _) =>
                // Array column: one output row per element (a null row is kept for empty arrays).
                val otherFields = fieldNames.filter(_ != fieldName)
                df.selectExpr(otherFields :+ s"explode_outer($fieldName) as $fieldName": _*)
            case StructField(fieldName, structType: StructType, _, _) =>
                // Struct column: promote each child to a top-level column named parent_child.
                val childFieldNames = structType.fieldNames.map(child => s"$fieldName.$child")
                val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
                val renamedCols = newFieldNames.map(x => col(x).as(x.replace(".", "_")))
                df.select(renamedCols: _*)
        }
        flattenedOnce.map(flattenDataframe).getOrElse(df)
    }

    def main(args: Array[String]): Unit = {
        // Epoch seconds -> partition value. The pattern yyyyMMddHHmm gives minute-level
        // partitions, evaluated in the JVM default time zone.
        val unixToDT = udf { (ordertime: Long) =>
            new DateTime(ordertime * 1000).toString("yyyyMMddHHmm").toLong
        }

        val appName     = "shiHengKinesisSparkApp"
        val streamName  = "orders"
        val endpointUrl = "https://kinesis.cn-northwest-1.amazonaws.com.cn"
        val regionName  = "cn-northwest-1"
        val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
        require(credentials != null, "No AWS credentials found. Please specify credentials using one of the methods specified " +
          "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
        val kinesisClient = new AmazonKinesisClient(credentials)
        kinesisClient.setEndpoint(endpointUrl)
        // Create one receiver (input DStream) per shard.
        val numShards  = kinesisClient.describeStream(streamName).getStreamDescription.getShards().size()
        val numStreams = numShards

        // Spark Streaming batch interval, also used as the Kinesis checkpoint interval
        val batchInterval = Milliseconds(5000)
        val kinesisCheckpointInterval = batchInterval

        val sparkConf = new SparkConf().setAppName(appName) // for spark-submit
        // val sparkConf = sc // for spark-shell

        val ssc = new StreamingContext(sparkConf, batchInterval)
        // ssc.checkpoint("/tmp/checkpoint") // leave unset: setting a checkpoint here caused an error

        val kinesisStreams = (0 until numStreams).map { i =>
          KinesisInputDStream.builder
            .streamingContext(ssc)
            .streamName(streamName)
            .endpointUrl(endpointUrl)
            .regionName(regionName)
            .initialPosition(new Latest())
            .checkpointAppName(appName)
            .checkpointInterval(kinesisCheckpointInterval)
            .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
            .build()
        }

        // Merge the per-shard streams into a single DStream.
        val unionStreams = ssc.union(kinesisStreams)
        val spark = SparkSession.builder().getOrCreate()

        unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
          println("**********************************************")
          if (rdd.isEmpty()) {
            println("No data input!!!")
          } else {
            println(s"Records in batch: ${rdd.count()}")
            // Decode each Kinesis record explicitly as UTF-8 rather than the platform default.
            val lines   = rdd.map(byteArray => new String(byteArray, StandardCharsets.UTF_8))
            val linesDF = spark.read.json(lines)
            // linesDF.show()
            // linesDF.printSchema
            val linesFlattenDF = flattenDataframe(linesDF)
            val linesResultDF  = linesFlattenDF.withColumn("ot", unixToDT(linesFlattenDF("ordertime")))
            // linesResultDF.show()

            // Use append mode to avoid `AnalysisException: path already exists` on later batches.
            // The output is written as CSV; use .json(...) instead for the JSON layout shown earlier.
            linesResultDF.write.mode("append").partitionBy("ot").csv("s3://joeshi-poc/binc/parted-prod/")
          }
          println("**********************************************")
        })
        ssc.start()
        ssc.awaitTermination()
    }
}
```
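
The `unixToDT` UDF formats the timestamp in the JVM default time zone, so the same `ordertime` can fall into different `ot` partitions on clusters configured with different zones. The following is a minimal sketch, assuming `java.time` is acceptable in place of Joda-Time, that makes the zone an explicit argument; `PartitionKeySketch` and `partitionKey` are hypothetical names and not part of the application.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object PartitionKeySketch {
  // Same minute-level pattern as the unixToDT UDF in the application.
  private val pattern = DateTimeFormatter.ofPattern("yyyyMMddHHmm")

  // Convert epoch seconds to the numeric partition value in the given time zone.
  // Hypothetical helper for illustration only.
  def partitionKey(epochSeconds: Long, zone: ZoneId): Long =
    Instant.ofEpochSecond(epochSeconds).atZone(zone).format(pattern).toLong
}
```

For the sample record's `ordertime` of `1573573055`, `partitionKey(1573573055L, ZoneId.of("UTC"))` gives `201911121537`, while `ZoneId.of("Asia/Shanghai")` gives `201911122337`.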

## Compile the Scala program

[Reference](https://medium.com/@tedherman/compile-scala-on-emr-cb77610559f0)

### Install SBT

SBT is used here to compile the program. The latest release is available on [GitHub](https://github.com/sbt/sbt/releases/).

```shell
wget https://github.com/sbt/sbt/releases/download/v1.3.3/sbt-1.3.3.tgz
```

### Extract the archive and update PATH

```shell
tar -xf sbt-1.3.3.tgz
export PATH=$PATH:`pwd`/sbt/bin
```

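### Link the lib folder

Create a symlink named `lib` that points to Spark's jar directory. sbt treats jars under `lib/` as unmanaged dependencies, and without them the Spark classes cannot be resolved and compilation fails.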
```shell
cd <project/path>
ln -s /usr/lib/spark/jars lib
```
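
As an alternative to the symlink, sbt can resolve the Spark modules itself if they are declared as `provided` dependencies. The following `build.sbt` is a sketch under the assumption that the build host can reach a Maven repository; it is not the setup used in this walkthrough. If a class such as `org.joda.time.DateTime` is not found at compile time, its artifact would need to be added explicitly.

```scala
// build.sbt (alternative sketch, not the original setup)
name := "ShiHengKinesisSparkApp"
version := "1.0"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // "provided": needed to compile, supplied by the cluster at runtime
  "org.apache.spark" %% "spark-sql"                   % "2.4.4" % "provided",
  "org.apache.spark" %% "spark-streaming"             % "2.4.4" % "provided",
  // still passed at submit time via --packages
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
)
```

Because `sbt package` only packages the project's own classes, the `--packages` flag is still required when submitting the job.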

### Compile and package

```shell
sbt compile
sbt package
```

## Submit the job

The packaged jar is written to `target/scala-2.11/`. [Reference](https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications)

```shell
spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 target/scala-2.11/shihengkinesissparkapp_2.11-1.0.jar
```

## Stop the application

Stop the application through YARN:
1. Log in to the EMR master node.
2. Run `yarn application -kill <application-id>`.

## Wrap-up

Open S3 and inspect the generated files.