# JaegerTracesIngest

<div>This notebook illustrates and explains, step by step, how to retrieve Jaeger data from the protobuf API v2. The code is a Scala translation/adaptation of the official Java library for Jaeger Data Analytics (jaeger-analytics-java).</div><div><br></div><div>The main goal of this notebook is to show how to build, from the Jaeger API, a data model that fits our needs; it does not cover graph creation or metric extraction from traces (yet?).<br></div>

## Connection with Jaeger Instance


<div><div>Establish a connection with the <u>gRPC API v2</u> of the Jaeger instance installed in the namespace (reachable via the service <code>jaeger-query</code> on port <code>16686</code> behind the path <code>/jaeger</code>).</div><div><br></div><div>Some useful links:<br></div><ul><li><b>A brief overview of Jaeger APIs:</b> [https://www.jaegertracing.io/docs/1.18/apis/#grpcprotobuf-stable]</li><li><b>Jaeger gRPC API Proto Definition</b>: [https://github.com/jaegertracing/jaeger-idl/blob/master/proto/api_v2/query.proto]</li><li><b>The Java library for Jaeger Data Analytics</b>: [https://github.com/jaegertracing/jaeger-analytics-java]</li><li><b>Reference Jupyter Notebook</b>: [https://github.com/jaegertracing/jaeger-analytics-java/blob/master/jupyter/jaeger-query.ipynb]</li></ul></div>We create a gRPC channel to the Jaeger API v2 and build a **Trace Request** on the root service; this request is used throughout the rest of the notebook.<br>


```scala
import io.grpc.{ManagedChannel, ManagedChannelBuilder}
import io.jaegertracing.api_v2.Model.{Span => ProtoSpan}
import io.jaegertracing.api_v2.Query._
import io.jaegertracing.api_v2.QueryServiceGrpc
import io.jaegertracing.api_v2.QueryServiceGrpc.QueryServiceBlockingStub

import scala.collection.JavaConverters._

class JaegerAPIClient(jaegerQueryHostPort: String) {
  // Spans left unprocessed by the previous batch (e.g. spans of unfinished traces)
  private var orphanSpans: List[ProtoSpan] = List.empty[ProtoSpan]

  // Plaintext (no TLS) gRPC channel to the jaeger-query service
  private val channel: ManagedChannel = ManagedChannelBuilder
    .forTarget(jaegerQueryHostPort)
    .usePlaintext()
    .build()

  val queryService: QueryServiceBlockingStub =
    QueryServiceGrpc.newBlockingStub(channel)

  /** Returns an iterator over the spans of the traces matching `serviceName`. */
  def queryTraces(serviceName: String): Iterator[ProtoSpan] = {
    val query = TraceQueryParameters.newBuilder()
      .setServiceName(serviceName)
      .build()

    val jaegerTraceRequest = FindTracesRequest.newBuilder()
      .setQuery(query)
      .build()

    // FindTraces is a server-streaming call: each chunk carries several spans
    for {
      spanChunk <- queryService.findTraces(jaegerTraceRequest).asScala
      protoSpan <- spanChunk.getSpansList.asScala
    } yield protoSpan
  }

  /** Applies a `process` function to a batch of Jaeger spans.
    *
    * `process` takes all the spans of the batch as input and returns a collection
    * of `Either[Iterable[ProtoSpan], A]`:
    *  - the `Right` values are individual elements of type `A`, returned by `batchProcess`;
    *  - the `Left` values are collections of spans that could not be processed yet but
    *    should be processed later (e.g. spans of unfinished traces). They are saved in
    *    `orphanSpans` and added to the spans retrieved by the next call to `batchProcess`.
    *
    * This can be used to build traces out of spans: either a trace can be built from the
    * spans at hand, or it is not complete yet and building it is deferred to the next batch.
    */
  def batchProcess[A](
      batchSize: Int,
      serviceName: String,
      process: Iterable[ProtoSpan] => Iterable[Either[Iterable[ProtoSpan], A]])
    : Iterable[A] = {
    // Note: each call re-issues the FindTraces query and keeps its first batchSize spans
    val newSpanBatch = this.queryTraces(serviceName).take(batchSize).toList

    // Orphans of the previous batch are processed together with the new spans
    val spanBatch = this.orphanSpans ++ newSpanBatch
    this.orphanSpans = List.empty[ProtoSpan]

    val processedSpans = process(spanBatch)

    this.orphanSpans = processedSpans
      .collect { case Left(protoSpans) => protoSpans }
      .toList
      .flatten
    println(s"Got ${this.orphanSpans.size} spans unprocessed in this batch")

    processedSpans.collect { case Right(a) => a }
  }
}

object JaegerAPIClient {
  def apply(jaegerQueryHostPort: String): JaegerAPIClient =
    new JaegerAPIClient(jaegerQueryHostPort)
}
```
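The carry-over contract of `batchProcess` can be tried out without a running Jaeger instance. The following is a minimal, dependency-free sketch under stated assumptions: the gRPC stream is replaced by a plain `Iterator`, `ProtoSpan` by a type parameter `S`, and both the class name `CarryOverBatcher` and the `(traceId, expectedSpanCount)` span encoding are hypothetical, chosen only so that deciding whether a trace is complete stays trivial.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, dependency-free sketch of the orphan carry-over performed by
// JaegerAPIClient.batchProcess: any Iterator[S] stands in for the gRPC span stream.
class CarryOverBatcher[S](source: Iterator[S]) {
  // Items returned inside a Left by the process function, retried with the next batch
  private var orphans: List[S] = Nil

  def batchProcess[A](batchSize: Int)(
      process: Iterable[S] => Iterable[Either[Iterable[S], A]]): List[A] = {
    // Pull at most batchSize new items from the shared source
    val fresh = ListBuffer.empty[S]
    while (fresh.size < batchSize && source.hasNext) fresh += source.next()

    // Previous orphans are processed together with the new items
    val results = process(orphans ++ fresh).toList
    orphans = results.collect { case Left(items) => items }.flatten
    results.collect { case Right(a) => a }
  }

  def pendingOrphans: Int = orphans.size
}

// Hypothetical span encoding: (traceId, number of spans the trace is expected to have)
val byTrace: Iterable[(String, Int)] => Iterable[Either[Iterable[(String, Int)], String]] =
  spans =>
    spans.groupBy(_._1).map { case (traceId, group) =>
      if (group.size == group.head._2) Right(traceId) else Left(group)
    }

// Usage: trace "a" needs two spans, trace "b" only one
val batcher = new CarryOverBatcher(Iterator(("a", 2), ("b", 1), ("a", 2)))
val first = batcher.batchProcess(2)(byTrace)  // "b" is complete, the lone "a" span is kept
val second = batcher.batchProcess(2)(byTrace) // the orphan "a" span meets its sibling
```

Unlike `JaegerAPIClient.batchProcess`, which issues a new `FindTraces` query on every call, this sketch keeps reading from the same iterator, so each batch sees spans it has not seen before.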

## Mapping ProtoBuf data to Standard Java/Scala API


The `Converter` object below is a Scala adaptation, tailored to my own needs, of the `Converter` class defined in `io.jaegertracing.analytics.model`.


It holds helper functions that translate protobuf objects into standard JVM types: IDs become `String`s, timestamps and durations become `java.time` types (compatible with JVM 8, as used by this notebook), and tags and references become plain Scala collections. It parses:


* Jaeger *SpanIDs* and *TraceIDs* as `String`
* Protobuf Timestamps as `LocalDateTime` (interpreted in UTC)
* Protobuf Durations as `java.time.Duration`
* *Jaeger Tags* (typed key-value pairs) into `Map[String, String]`, every value being converted to its string form
* *Jaeger Span References* into `Tuple3(String, String, String)` where:
    * the first item is the TraceID
    * the second is the SpanID
    * the third is the type of relationship (either "CHILD_OF" or "FOLLOWS_FROM")

> 
> <div>Note: the target types of these conversions are designed to be compatible with as many Gremlin backends as possible, especially Neo4j (tests are still to be done).<br></div>
> 




```scala
import com.google.protobuf.{
  Timestamp => ProtoTimestamp,
  Duration => ProtoDuration,
  ByteString => ProtoByteString
}
import io.jaegertracing.api_v2.Model.{KeyValue, ValueType, SpanRef, SpanRefType}
import io.jaegertracing.analytics.model.{Converter => JaegerBaseConverter}

import java.time.{LocalDateTime, ZoneOffset, Duration}
import java.util.{List => JList}
import scala.collection.JavaConverters._

object Converter {
  // Delegates ID formatting to the Jaeger analytics library
  def toStringId(pBs: ProtoByteString): String =
    JaegerBaseConverter.toStringId(pBs)

  // Protobuf Timestamp (seconds + nanos since the epoch) read as a UTC LocalDateTime
  def toLocalDateTime(pTs: ProtoTimestamp): LocalDateTime =
    LocalDateTime.ofEpochSecond(pTs.getSeconds, pTs.getNanos, ZoneOffset.UTC)

  def toDuration(pDur: ProtoDuration): Duration =
    Duration.ofSeconds(pDur.getSeconds, pDur.getNanos)

  // Rewritten here because this method is private in JaegerBaseConverter.
  // Every typed tag value is converted to its string representation.
  def toMap(kvs: JList[KeyValue]): Map[String, String] = {
    for { kv <- kvs.asScala } yield
      kv.getVType match {
        case ValueType.STRING  => (kv.getKey, kv.getVStr)
        case ValueType.BOOL    => (kv.getKey, kv.getVBool.toString)
        case ValueType.INT64   => (kv.getKey, kv.getVInt64.toString)
        case ValueType.FLOAT64 => (kv.getKey, kv.getVFloat64.toString)
        case ValueType.BINARY  => (kv.getKey, kv.getVBinary.toStringUtf8)
        case _                 => (kv.getKey, "unrecognized")
      }
  }.toMap

  // (TraceID, SpanID, relationship type) for each span reference
  def toTuples3(srs: JList[SpanRef]): List[(String, String, String)] = {
    for { sr <- srs.asScala } yield
      sr.getRefType match {
        case SpanRefType.CHILD_OF =>
          (toStringId(sr.getTraceId), toStringId(sr.getSpanId), "CHILD_OF")
        case SpanRefType.FOLLOWS_FROM =>
          (toStringId(sr.getTraceId), toStringId(sr.getSpanId), "FOLLOWS_FROM")
        case _ =>
          (toStringId(sr.getTraceId), toStringId(sr.getSpanId), "unrecognized")
      }
  }.toList
}
```
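The conversion rules can be checked in isolation with plain JDK types. This sketch mirrors `toLocalDateTime`, `toDuration` and `toMap`, but it takes raw seconds and nanoseconds instead of protobuf messages and replaces Jaeger's `KeyValue` with a hypothetical `TagValue` type (binary values are left out for brevity); `ConversionSketch` is an illustrative name, not part of any library.

```scala
import java.time.{Duration, LocalDateTime, ZoneOffset}

// Hypothetical stand-in for Jaeger's typed KeyValue (binary values left out)
sealed trait TagValue
final case class StrTag(v: String) extends TagValue
final case class BoolTag(v: Boolean) extends TagValue
final case class IntTag(v: Long) extends TagValue
final case class FloatTag(v: Double) extends TagValue

object ConversionSketch {
  // Seconds and nanoseconds since the Unix epoch, read as a UTC wall-clock time
  def toLocalDateTime(seconds: Long, nanos: Int): LocalDateTime =
    LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC)

  // Seconds plus a nanosecond adjustment, as in Converter.toDuration
  def toDuration(seconds: Long, nanos: Int): Duration =
    Duration.ofSeconds(seconds, nanos.toLong)

  // Every typed value is flattened to its string form, as in Converter.toMap
  def toMap(tags: Seq[(String, TagValue)]): Map[String, String] =
    tags.map {
      case (key, StrTag(v))   => key -> v
      case (key, BoolTag(v))  => key -> v.toString
      case (key, IntTag(v))   => key -> v.toString
      case (key, FloatTag(v)) => key -> v.toString
    }.toMap
}

// Usage: 1600000000 s after the epoch is 2020-09-13T12:26:40 UTC
val start = ConversionSketch.toLocalDateTime(1600000000L, 0)
val tags = ConversionSketch.toMap(Seq("http.status_code" -> IntTag(200), "error" -> BoolTag(false)))
```

Flattening every value to a `String` keeps the property map homogeneous, at the cost of losing the original value type.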

## Definition of the *Analytics Trace-Data-Model*


> 
> This model is far from mature and was designed to fit my personal data-analytics needs. It should be seen as a seed, a starting point for creating your own model. Unlike the model defined in the Jaeger analytics Java library, this model does not cover Logs or Events (yet?).
> 
> 
> 


We define a set of Scala classes modeling *Jaeger* traces; they will be used as labels in our future property graph and are instantiated from protobuf data. In general terms, the model can be summed up by the following statement:

> In *OpenTelemetry*, a Trace is a DAG of Spans, where each Span represents the latency measurement of a single Operation executed on a particular Resource.
> 
> 


The classes defined are:


* `Operation`: models an operation, which can be shared by several Spans of a Trace.
* `Resource`: represents an executing instance of a program, i.e. something that consumes resources. The OpenTelemetry Resource semantic conventions are defined at https://github.com/open-telemetry/opentelemetry-specification/blob/v0.6.0/specification/resource/semantic_conventions/README.md
* `ReferenceId`: a pointer from a Span to another Span (trace ID, span ID and relationship type).
* `Span`: in *OpenTelemetry*, Spans represent a latency measurement (an operation *duration* associated with a given *startTime*); they also carry other relevant data, for example the number of bytes exchanged over the network. The *OpenTelemetry* definition of a Span can be found at https://github.com/open-telemetry/opentelemetry-specification/blob/v0.6.0/specification/trace/api.md#span
* `Trace`: the aggregation of Spans describing the propagation of Remote Procedure Calls or other network calls in a distributed system; these Spans form a directed acyclic graph that captures the dependencies between Resources.
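To illustrate the DAG statement, here is a small sketch that rebuilds parent-to-children edges from span references and finds the root spans. `MiniSpan` and `SpanDag` are hypothetical names, every reference is treated as a parent link (the CHILD_OF / FOLLOWS_FROM distinction is ignored for brevity), and references to spans absent from the trace are skipped.

```scala
// Hypothetical minimal span: its id and the ids of the spans it references
final case class MiniSpan(spanId: String, parentIds: List[String])

object SpanDag {
  // Edges parent -> children, keeping only parents that belong to the trace
  def children(spans: List[MiniSpan]): Map[String, List[String]] = {
    val known = spans.map(_.spanId).toSet
    spans
      .flatMap(s => s.parentIds.filter(known).map(parent => parent -> s.spanId))
      .groupBy(_._1)
      .map { case (parent, pairs) => parent -> pairs.map(_._2) }
  }

  // Roots reference no span that belongs to the trace
  def roots(spans: List[MiniSpan]): List[String] = {
    val known = spans.map(_.spanId).toSet
    spans.filterNot(_.parentIds.exists(known)).map(_.spanId)
  }
}

// Usage: a -> (b, c) and b -> d
val trace = List(MiniSpan("a", Nil), MiniSpan("b", List("a")), MiniSpan("c", List("a")), MiniSpan("d", List("b")))
val edges = SpanDag.children(trace)
val rootIds = SpanDag.roots(trace)
```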

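<div>Each class's companion object defines an <code>of()</code> method (the Scala counterpart of a <i>static</i> factory) that wraps the constructor with Protobuf data pre-processing, so model instances can be created directly from API data. For <code>Trace</code>, <code>of()</code> returns an <code>Either</code>: <code>Right(trace)</code> when every referenced span is present, <code>Left(spans)</code> otherwise.<br></div>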




```scala
import io.jaegertracing.api_v2.Model.{Span => ProtoSpan, SpanRef, SpanRefType}

import java.time.{Duration, LocalDateTime}
import scala.collection.JavaConverters._

// Definition of Operation
case class Operation(name: String)

object Operation {
  def of(ps: ProtoSpan): Operation = Operation(ps.getOperationName)
}

// Definition of the reference pointers used in Spans
case class ReferenceId(traceId: String, spanId: String, name: String)

object ReferenceId {
  def of(tuple3: (String, String, String)): ReferenceId =
    ReferenceId(traceId = tuple3._1, spanId = tuple3._2, name = tuple3._3)

  def of(sr: SpanRef): ReferenceId = {
    val name = sr.getRefType match {
      case SpanRefType.CHILD_OF     => "CHILD_OF"
      case SpanRefType.FOLLOWS_FROM => "FOLLOWS_FROM"
      case _                        => "unrecognized"
    }
    ReferenceId(Converter.toStringId(sr.getTraceId), Converter.toStringId(sr.getSpanId), name)
  }
}

// Definition of Resources (called Processes in the Jaeger model)
case class Resource(name: String, attributes: Map[String, String])

object Resource {
  def of(ps: ProtoSpan): Resource = {
    val process = ps.getProcess
    Resource(process.getServiceName, Converter.toMap(process.getTagsList))
  }
}

// Definition of Spans
case class Span(
    spanId: String,
    traceId: String,
    startTime: LocalDateTime,
    duration: Duration,
    references: List[ReferenceId],
    attributes: Map[String, String],
    operationKey: String,
    resourceKey: String
)

object Span {
  // getOperationKey / getResourceKey compute the keys linking the span to its Operation and Resource
  def of(ps: ProtoSpan,
         getOperationKey: ProtoSpan => String,
         getResourceKey: ProtoSpan => String): Span =
    Span(
      spanId = Converter.toStringId(ps.getSpanId),
      traceId = Converter.toStringId(ps.getTraceId),
      startTime = Converter.toLocalDateTime(ps.getStartTime),
      duration = Converter.toDuration(ps.getDuration),
      references = ps.getReferencesList.asScala.map(sr => ReferenceId.of(sr)).toList,
      attributes = Converter.toMap(ps.getTagsList),
      operationKey = getOperationKey(ps),
      resourceKey = getResourceKey(ps)
    )
}

// Definition of Trace as an aggregation of Spans
case class Trace(traceId: String,
                 spans: List[Span],
                 operations: Map[String, Operation],
                 resources: Map[String, Resource])

object Trace {
  /** Builds a Trace, or returns the spans unchanged (Left) when the trace is
    * incomplete, i.e. some span references a span missing from `protoSpans`.
    */
  def of(traceId: String,
         protoSpans: List[ProtoSpan]): Either[List[ProtoSpan], Trace] = {
    val getOperationIndex = (ps: ProtoSpan) => ps.getOperationName
    val getResourceIndex = (ps: ProtoSpan) => ps.getProcess.getServiceName

    val operations = protoSpans.map(ps => getOperationIndex(ps) -> Operation.of(ps)).toMap
    val resources = protoSpans.map(ps => getResourceIndex(ps) -> Resource.of(ps)).toMap
    val spans = protoSpans.map(ps => Span.of(ps, getOperationIndex, getResourceIndex))

    // Every reference must point to a span present in this group
    val isCompleteTrace = spans.forall { span =>
      span.references.forall(ref =>
        spans.exists(s => s.traceId == ref.traceId && s.spanId == ref.spanId))
    }

    if (isCompleteTrace) Right(Trace(traceId, spans, operations, resources))
    else Left(protoSpans)
  }
}
```
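The completeness test in `Trace.of` rescans the span list for every reference, which is quadratic in the number of spans. A sketch of the same check built on a `Set` of identifiers follows; `Ref`, `LiteSpan` and `Completeness` are hypothetical stand-ins for `ReferenceId` and `Span`, and the rule is intended to match `isCompleteTrace` (every reference must target a span of the same group).

```scala
// Hypothetical stand-ins for ReferenceId and Span, keeping only identifiers
final case class Ref(traceId: String, spanId: String)
final case class LiteSpan(traceId: String, spanId: String, references: List[Ref])

object Completeness {
  // Complete when every reference targets a span of the same batch.
  // Building a Set first makes each lookup constant time on average.
  def isComplete(spans: List[LiteSpan]): Boolean = {
    val present: Set[Ref] = spans.map(s => Ref(s.traceId, s.spanId)).toSet
    spans.forall(_.references.forall(present))
  }
}

// Usage: "b" is a child of "a"; without "a" the trace is not complete yet
val parent = LiteSpan("t1", "a", Nil)
val child = LiteSpan("t1", "b", List(Ref("t1", "a")))
val full = Completeness.isComplete(List(parent, child))
val partial = Completeness.isComplete(List(child))
```

One consequence of this rule: a reference that can never be satisfied (for instance one pointing to a span of another trace, or to a span that was dropped) keeps its trace in the `Left` branch, so those spans would be carried over as orphans indefinitely.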

## Extracting Traces out of the flow of Jaeger Spans from API


<div>Once the model is defined, Jaeger data can be processed and parsed into it. Here we fetch 1,000 Spans from the API, group them by trace ID and build a Trace from each group; groups that are not complete yet are kept as orphan spans for the next batch.<br></div>

```scala
import io.jaegertracing.api_v2.Model.{Span => ProtoSpan}

// Keep the client in a value: it stores the orphan spans between successive batches
val client = JaegerAPIClient("jaeger-query.monitoring.svc.cluster.local:16686/jaeger")

val traces = client
  .batchProcess[Trace](
    batchSize = 1000,
    serviceName = "frontend",
    process = protoSpans =>
      protoSpans
        .groupBy(span => Converter.toStringId(span.getTraceId))
        .map { case (traceId, traceProtoSpans) => Trace.of(traceId, traceProtoSpans.toList) }
  )
  .toList

println(s"Generated ${traces.size} traces out of 1000 spans")
println(s"  • Creating graphs from traces with more than 8 spans")

for (t <- traces if t.spans.size > 8) {
  println(s"    - Trace ${t.traceId} has ${t.spans.size} spans")
}
```

```
Got 36 spans unprocessed in this batch
Generated 40 traces out of 1000 spans
  • Creating graphs from traces with more than 8 spans
    - Trace b0e4eff88619efa28dc2e9963739fdd4 has 41 spans
    - Trace 4e443a734a6e03875b6e158b18c4d758 has 41 spans
    - Trace 414ac60b2f34931e055019c7e6552b12 has 44 spans
    - Trace 5c8dbc94d520ffb97bc061cce4e3e54b has 68 spans
    - Trace 18355e7e75cf4de2d08809ae2b1c5c3c has 44 spans
    - Trace 35560e83460b921a73f60ccae05b7b0e has 41 spans
    - Trace d2c1dbfb33045899016e92f8e9131e44 has 41 spans
    - Trace 4b653fe4ec33ddd21d34279c76c132d1 has 42 spans
    - Trace 5fde8ab17acbb9892a19100adcb41646 has 42 spans
    - Trace 84fc4694185d99e9e81b106fd04f4190 has 41 spans
    - Trace 279d607a736a1d3a362850d44e299af8 has 41 spans
    - Trace 23d0908f373282d7432bee599a5ceafd has 42 spans
    - Trace 9563232b32df8129e3e00b448cd36ede has 41 spans
    - Trace 944a15007d91c9fa2421e0b81c2b76a3 has 41 spans
    - Trace d6a0d774da37f8763408ff422e3e731b has 41 spans
```

<div>Spans are now gathered into Traces; the <code>Trace</code> class can be tuned to add custom behavior that fits your analytics needs.</div><div><br></div><div>Voilà, Voilà<br></div>