In [1]:
import java.util.UUID
import org.joda.time.DateTime
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{Row, SQLContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.ext.JodaTimeSerializers

In [2]:
// ZooKeeper quorum used by the receiver-based Kafka consumer (passed to KafkaUtils.createStream).
val zookeeper: String = "10.100.198.200:2181"
// A fresh random consumer-group id on every run of this cell.
// NOTE(review): presumably chosen so each run is a new consumer group rather than resuming
// previously committed offsets — confirm this is intended.
val group: String = UUID.randomUUID.toString
val topic: String = "plant-data"
// Consumer thread count for the topic; handed to createStream through topicMap.
val threads: Int = 2
val topicMap: Map[String, Int] = Map(topic -> threads)

## StreamingContext erstellen (Batch-Intervall: 1 Sekunde, Checkpoint-Verzeichnis `checkpoint`)

In [4]:
// Micro-batch length: the stream below produces one RDD per second.
val batchInterval = Seconds(1)
// Build the streaming context on top of the notebook-provided SparkContext `sc`.
val ssc = new StreamingContext(sc, batchInterval)
// Relative checkpoint directory named "checkpoint".
// NOTE(review): the windowed operations in this notebook do not use inverse-reduce or state,
// so checkpointing may not be strictly required — confirm before removing.
ssc.checkpoint("checkpoint")

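## Einen Kafka-Stream erstellen
Hinweis: Die Kafka-Dependency wurde bereits beim Installieren des Kernels angegeben. Die Nachrichten werden anschließend in einem gleitenden 5-Sekunden-Fenster gesammelt.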

In [5]:
// Receiver-based Kafka stream of (key, message) pairs, coordinated through ZooKeeper.
val values : DStream[(String, String)] = KafkaUtils.createStream(ssc, zookeeper, group, topicMap)
// Sliding 5-second window over the raw messages; with no slide argument it emits one RDD per
// batch interval. The Kafka key is dropped: every message gets a placeholder key (None) and a
// count of 1, ready for the per-key sum/count reduction later on.
val window : DStream[(Option[_], (String, Int))] =
  values
    .window(Seconds(5))
    .map(record => (None, (record._2, 1)))

## Modell eines Datenpunkts – `DataPoint`

In [6]:
/** A single measurement as carried in one Kafka JSON message.
  *
  * @param plantId   identifier of the plant that produced the measurement
  * @param timestamp measurement time, kept as the raw string from the JSON (not parsed)
  * @param metric    name of the measured quantity
  * @param value     numeric measurement value
  */
case class DataPoint(
  plantId: String,
  timestamp: String,
  metric: String,
  value: Double
)

In [7]:
import scala.util.{Failure, Success, Try}

/** Parses every windowed Kafka message into a [[DataPoint]] and re-keys it by
  * "<plantId>_<metric>", producing (Some(key), (value, count)) with the count carried through.
  *
  * A message that is not valid JSON, or lacks the DataPoint fields, is skipped and reported on
  * stderr instead of throwing and failing the whole streaming batch.
  */
val dpWindow : DStream[(Option[_], (Double, Int))] =
  window.mapPartitions[(Option[_], (Double, Int))] { records =>
    // Build the json4s Formats once per partition rather than once per record.
    implicit val formats: Formats = DefaultFormats ++ JodaTimeSerializers.all
    records.flatMap { case (_, (value, count)) =>
      Try(parse(value).extract[DataPoint]) match {
        case Success(dp) =>
          Iterator.single((Some(dp.plantId + "_" + dp.metric), (dp.value, count)))
        case Failure(err) =>
          // One malformed message must not take down the batch; drop it and say so.
          System.err.println(s"Skipping unparseable record: ${err.getMessage}")
          Iterator.empty
      }
    }
  }

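## Die eigentliche Berechnung des Mittelwerts
Pro Schlüssel (`plantId_metric`) werden im aktuellen Fenster Summe und Anzahl der Werte aufaddiert; der Mittelwert ergibt sich als Summe / Anzahl.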

In [8]:
/** Per-key mean over the current window: add up the (value, count) pairs of each key, then
  * divide the summed value by the summed count.
  */
val avgCalc = dpWindow.transform { batch =>
  val totals = batch.reduceByKey { (left, right) =>
    (left._1 + right._1, left._2 + right._2)
  }
  totals.mapValues { case (sum, count) => sum / count }
}

In [9]:
// Print up to the first 10 (key, mean) pairs of every batch to the driver's stdout.
avgCalc.print(10)

In [None]:
// Start the streaming computation defined in the cells above (must come before awaiting).
ssc.start()
// Block this cell until the StreamingContext stops; as the output below shows, the cell keeps
// printing batches and never completes on its own.
// NOTE(review): for notebook use, consider awaitTerminationOrTimeout plus ssc.stop(...).
ssc.awaitTermination()

-------------------------------------------
Time: 1475650380000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650381000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650382000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650383000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650384000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650385000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650386000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650387000 ms
-------------------------------------------

-------------------------------------------
Time: 1475650388000 ms
-------------------------------------