Skip to content

Commit

Permalink
remove SSERenderable classes (#938)
Browse files Browse the repository at this point in the history
Simplify the messages returned back from the stream api and
use the same model objects for both the server and the client.
  • Loading branch information
brharrington committed Oct 13, 2018
1 parent 9ae3eaf commit d5bd34b
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 205 deletions.
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

/**
* Datapoint read in from the LWC service.
*
Expand All @@ -29,6 +31,7 @@ package com.netflix.atlas.eval.model
* @param value
* Value for the datapoint.
*/
case class LwcDatapoint(timestamp: Long, id: String, tags: Map[String, String], value: Double) {
case class LwcDatapoint(timestamp: Long, id: String, tags: Map[String, String], value: Double)
extends JsonSupport {
val `type`: String = "datapoint"
}
Expand Up @@ -29,3 +29,14 @@ import com.netflix.atlas.json.JsonSupport
case class LwcDiagnosticMessage(id: String, message: DiagnosticMessage) extends JsonSupport {
val `type`: String = "diagnostic"
}

object LwcDiagnosticMessage {

def info(id: String, message: String): LwcDiagnosticMessage = {
apply(id, DiagnosticMessage.info(message))
}

def error(id: String, message: String): LwcDiagnosticMessage = {
apply(id, DiagnosticMessage.error(message))
}
}
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

/**
* Pair representing the expression and step size for data being requested from the LWCAPI
* service. A set of data expressions corresponding with this request will be returned as
Expand All @@ -25,6 +27,6 @@ package com.netflix.atlas.eval.model
* @param step
* The step size used for this stream of data.
*/
case class LwcExpression(expression: String, step: Long) {
case class LwcExpression(expression: String, step: Long) extends JsonSupport {
val `type`: String = "expression"
}
Expand Up @@ -15,9 +15,11 @@
*/
package com.netflix.atlas.eval.model

import akka.util.ByteString
import com.fasterxml.jackson.databind.JsonNode
import com.netflix.atlas.akka.DiagnosticMessage
import com.netflix.atlas.json.Json
import com.netflix.atlas.json.JsonSupport

/**
* Helpers for working with messages coming back from the LWCAPI service.
Expand All @@ -38,4 +40,23 @@ object LwcMessages {
case _ => Json.decode[DiagnosticMessage](data)
}
}

def toSSE(msg: JsonSupport): ByteString = {
val prefix = msg match {
case _: LwcSubscription => subscribePrefix
case _: LwcDatapoint => metricDataPrefix
case _: LwcDiagnosticMessage => diagnosticPrefix
case _: LwcHeartbeat => heartbeatPrefix
case _ => defaultPrefix
}
prefix ++ ByteString(msg.toJson) ++ suffix
}

val subscribePrefix = ByteString("info: subscribe ")
val metricDataPrefix = ByteString("data: metric ")
val diagnosticPrefix = ByteString("data: diagnostic ")
val heartbeatPrefix = ByteString("data: heartbeat ")
val defaultPrefix = ByteString("data: ")

private val suffix = ByteString("\r\n\r\n")
}
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

/**
* Subscription message that is returned by the LWC service.
*
Expand All @@ -23,6 +25,6 @@ package com.netflix.atlas.eval.model
* @param metrics
* Data expressions that result from the root expression.
*/
case class LwcSubscription(expression: String, metrics: List[LwcDataExpr]) {
case class LwcSubscription(expression: String, metrics: List[LwcDataExpr]) extends JsonSupport {
val `type`: String = "subscription"
}
Expand Up @@ -46,7 +46,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {
import LwcToAggrDatapoint._
import com.netflix.atlas.eval.model.LwcMessages._

// Default to a decent size so it is unlikely there'll be a need to allocate
// a larger array
Expand Down Expand Up @@ -132,10 +132,3 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}
}
}

object LwcToAggrDatapoint {
private val subscribePrefix = ByteString("info: subscribe ")
private val metricDataPrefix = ByteString("data: metric ")
private val diagnosticPrefix = ByteString("data: diagnostic ")
private val heartbeatPrefix = ByteString("data: heartbeat ")
}
Expand Up @@ -21,9 +21,9 @@ import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import com.netflix.atlas.akka.CustomDirectives._
import com.netflix.atlas.akka.WebApi
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.lwcapi.StreamApi._
import com.netflix.spectator.api.Registry
import com.typesafe.scalalogging.StrictLogging

Expand All @@ -39,47 +39,43 @@ class EvaluateApi(registry: Registry, sm: StreamSubscriptionManager)
endpointPath("lwc" / "api" / "v1" / "evaluate") {
post {
parseEntity(json[EvaluateRequest]) { req =>
evaluate(req.timestamp, req.toSSE)
payloadSize.record(req.metrics.size)
val timestamp = req.timestamp
req.metrics.foreach { m =>
val datapoint = LwcDatapoint(timestamp, m.id, m.tags, m.value)
evaluate(m.id, datapoint)
}
req.messages.foreach { m =>
evaluate(m.id, m)
}
complete(HttpResponse(StatusCodes.OK))
}
}
}
}

private def evaluate(timestamp: Long, items: List[(String, SSERenderable)]): Unit = {
payloadSize.record(items.size)
items.foreach { item =>
val (id, msg) = item
val queues = sm.handlersForSubscription(id)
if (queues.nonEmpty) {
queues.foreach { queue =>
logger.trace(s"sending $msg to $queue")
queue.offer(msg)
}
} else {
logger.trace(s"no subscriptions, ignoring $msg")
ignoredCounter.increment()
private def evaluate(id: String, msg: JsonSupport): Unit = {
val queues = sm.handlersForSubscription(id)
if (queues.nonEmpty) {
queues.foreach { queue =>
logger.trace(s"sending $msg to $queue")
queue.offer(msg)
}
} else {
logger.trace(s"no subscriptions, ignoring $msg")
ignoredCounter.increment()
}
}
}

object EvaluateApi {
type TagMap = Map[String, String]

case class Item(id: String, tags: TagMap, value: Double) extends JsonSupport
case class Item(id: String, tags: TagMap, value: Double)

case class EvaluateRequest(
timestamp: Long,
metrics: List[Item] = Nil,
messages: List[LwcDiagnosticMessage] = Nil
) extends JsonSupport {

def toSSE: List[(String, SSERenderable)] = {
val builder = List.newBuilder[(String, SSERenderable)]
builder ++= metrics.map(m => m.id -> SSEMetric(timestamp, m))
builder ++= messages.map(m => m.id -> SSEGenericJson("diagnostic", m))
builder.result()
}
}
) extends JsonSupport
}
Expand Up @@ -17,6 +17,7 @@ package com.netflix.atlas.lwcapi

import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete
import com.netflix.atlas.json.JsonSupport
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.Future
Expand All @@ -30,11 +31,10 @@ import scala.concurrent.Future
* @param queue
* Underlying queue that will receive the messsages.
*/
class QueueHandler(id: String, queue: SourceQueueWithComplete[SSERenderable])
extends StrictLogging {
class QueueHandler(id: String, queue: SourceQueueWithComplete[JsonSupport]) extends StrictLogging {

def offer(msg: SSERenderable): Future[QueueOfferResult] = {
logger.trace(s"enqueuing message for $id: ${msg.toSSE}")
def offer(msg: JsonSupport): Future[QueueOfferResult] = {
logger.trace(s"enqueuing message for $id: ${msg.toJson}")
queue.offer(msg)
}

Expand Down
114 changes: 8 additions & 106 deletions atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/StreamApi.scala
Expand Up @@ -31,16 +31,12 @@ import akka.stream.ThrottleMode
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.netflix.atlas.akka.CustomDirectives._
import com.netflix.atlas.akka.DiagnosticMessage
import com.netflix.atlas.akka.StreamOps
import com.netflix.atlas.akka.WebApi
import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcSubscription
import com.netflix.atlas.json.Json
import com.netflix.atlas.eval.model.LwcMessages
import com.netflix.atlas.json.JsonSupport
import com.netflix.iep.NetflixEnvironment
import com.netflix.spectator.api.Registry
Expand Down Expand Up @@ -85,24 +81,20 @@ class StreamApi @Inject()(

// Drop any other connections that may already be using the same id
sm.unregister(streamId).foreach { queue =>
queue.offer(
SSEShutdown(
s"Dropped: another connection is using the same stream-id: $streamId",
unsub = false
)
)
val msg = DiagnosticMessage.info(s"dropped: another connection is using id: $streamId")
queue.offer(msg)
queue.complete()
}

// Create queue to allow messages coming into /evaluate to be passed to this stream
val (queue, pub) = StreamOps
.queue[SSERenderable](registry, "StreamApi", queueSize, OverflowStrategy.dropHead)
.toMat(Sink.asPublisher[SSERenderable](true))(Keep.both)
.queue[JsonSupport](registry, "StreamApi", queueSize, OverflowStrategy.dropHead)
.toMat(Sink.asPublisher[JsonSupport](true))(Keep.both)
.run()

// Send initial setup messages
val handler = new QueueHandler(streamId, queue)
queue.offer(SSEHello(streamId, instanceId))
queue.offer(DiagnosticMessage.info(s"setup stream $streamId on $instanceId"))
sm.register(streamId, handler)

// Heartbeat messages to ensure that the socket is never idle
Expand All @@ -127,16 +119,15 @@ class StreamApi @Inject()(
.map(_.metadata.frequency)
.distinct
.map { step =>
val heartbeat = LwcHeartbeat(stepAlignedTime(step), step)
SSEGenericJson("heartbeat", heartbeat)
LwcHeartbeat(stepAlignedTime(step), step)
}
Source(steps)
}

val source = Source
.fromPublisher(pub)
.merge(heartbeatSrc)
.map(msg => ChunkStreamPart(ByteString(msg.toSSE) ++ suffix))
.map(msg => ChunkStreamPart(LwcMessages.toSSE(msg)))
.via(StreamOps.monitorFlow(registry, "StreamApi"))
.watchTermination() { (_, f) =>
f.onComplete {
Expand All @@ -153,96 +144,7 @@ class StreamApi @Inject()(
}
}

trait SSERenderable {

def toSSE: String

def toJson: String
}

object StreamApi {

private val instanceId = NetflixEnvironment.instanceId()

private val suffix = ByteString("\r\n\r\n")

case class ExpressionsRequest(expressions: List[ExpressionMetadata]) extends JsonSupport

object ExpressionsRequest {

def fromJson(json: String): ExpressionsRequest = {
val decoded = Json.decode[ExpressionsRequest](json)
if (decoded.expressions == null || decoded.expressions.isEmpty)
throw new IllegalArgumentException("Missing or empty expressions array")
decoded
}
}

abstract class SSEMessage(msgType: String, what: String, content: JsonSupport)
extends SSERenderable {

def toSSE = s"$msgType: $what ${content.toJson}"

def getWhat: String = what
}

// Hello message
case class HelloContent(streamId: String, instanceId: String) extends JsonSupport

case class SSEHello(streamId: String, instanceId: String)
extends SSEMessage("info", "hello", HelloContent(streamId, instanceId)) {

def toJson: String = {
Json.encode(DiagnosticMessage.info(s"setup stream $streamId on $instanceId"))
}
}

// Generic message string
case class SSEGenericJson(what: String, msg: JsonSupport) extends SSEMessage("data", what, msg) {

def toJson: String = msg.toJson
}

// Shutdown message
case class ShutdownReason(reason: String) extends JsonSupport

case class SSEShutdown(reason: String, private val unsub: Boolean = true)
extends SSEMessage("info", "shutdown", ShutdownReason(reason)) {

def toJson: String = {
Json.encode(DiagnosticMessage.info(s"shutting down stream on $instanceId: $reason"))
}

def shouldUnregister: Boolean = unsub
}

// Subscribe message
case class SubscribeContent(expression: String, metrics: List[ExpressionMetadata])
extends JsonSupport

case class SSESubscribe(expr: String, metrics: List[ExpressionMetadata])
extends SSEMessage("info", "subscribe", SubscribeContent(expr, metrics)) {

def toJson: String = {
Json.encode(
LwcSubscription(expr, metrics.map(m => LwcDataExpr(m.id, m.expression, m.frequency)))
)
}
}

case class SSEMetricContent(timestamp: Long, id: String, tags: EvaluateApi.TagMap, value: Double)
extends JsonSupport

// Evaluate message
case class SSEMetric(timestamp: Long, data: EvaluateApi.Item)
extends SSEMessage(
"data",
"metric",
SSEMetricContent(timestamp, data.id, data.tags, data.value)
) {

def toJson: String = {
Json.encode(LwcDatapoint(timestamp, data.id, data.tags, data.value))
}
}
}

0 comments on commit d5bd34b

Please sign in to comment.