-
Notifications
You must be signed in to change notification settings - Fork 2
/
SalesServer.scala
71 lines (53 loc) · 2.08 KB
/
SalesServer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.logicalgenetics.reports
import java.time.Duration
import java.util
import com.logicalgenetics.Config
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import org.json4s.{DefaultFormats, Formats}
import org.scalatra._
import org.scalatra.json._
import scala.collection.JavaConverters._
import scala.collection.mutable
object SalesCache {
lazy val sales : mutable.Map[String, String] = mutable.Map[String, String]()
def update(bars: Seq[(String, String)]): Seq[(String, String)] = {
sales ++= bars.toMap
sales.toList
}
}
class SalesServer extends ScalatraServlet with JacksonJsonSupport {
protected implicit lazy val jsonFormats: Formats = DefaultFormats
lazy val consumer: KafkaConsumer[String, GenericRecord] = {
val properties = new util.Properties()
properties.put("bootstrap.servers", Config.servers)
properties.put("schema.registry.url", Config.schemaRegistry)
properties.put("group.id", "takings.by.bar")
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[KafkaAvroDeserializer])
properties.put("auto.offset.reset", "latest")
new KafkaConsumer[String, GenericRecord](properties)
}
before() {
contentType = formats("json")
}
def fetchRows: List[ConsumerRecord[String, GenericRecord]] = {
Iterator.continually(consumer.poll(Duration.ofSeconds(1)))
.takeWhile(_.count() > 0)
.flatMap(_.iterator().asScala)
.toList
}
get("/") {
consumer.subscribe(util.Arrays.asList("takings_by_bar_last_min"))
val rows = fetchRows
val updates = rows
.map(record => record.value())
.map(x => (x.get("BAR").toString, x.get("SALES").toString))
val bars = SalesCache
.update(updates)
.map {case (bar, sales) => Map("bar" -> bar.toInt, "sales" -> sales.toInt)}
Map("count" -> bars.length, "processed" -> rows.length, "bars" -> bars)
}
}