Skip to content

Commit

Permalink
Merge pull request jitsi#38 from jitsi/stats
Browse files Browse the repository at this point in the history
Aggregates Node stats
  • Loading branch information
bgrozev committed Mar 29, 2019
2 parents c5b899a + d588825 commit 990b359
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<dependency>
<groupId>org.jitsi</groupId>
<artifactId>jitsi-utils</artifactId>
<version>1.0-20190321.185837-2</version>
<version>1.0-20190328.184331-5</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/org/jitsi/nlj/PacketInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class EventTimeline(
*/
fun totalDelay(): Duration {
return referenceTime?.let {
return Duration.ofMillis(timeline.last().second - it)
return Duration.ofMillis(timeline.last().second)

} ?: Duration.ofMillis(0)
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class RtpReceiverImpl @JvmOverloads constructor(
private val logger = getLogger(classLogger, logLevelDelegate)
private var running: Boolean = true
private val inputTreeRoot: Node
private val incomingPacketQueue = PacketInfoQueue(id, executor, this::handleIncomingPacket)
private val incomingPacketQueue
= PacketInfoQueue("rtp-receiver-incoming-packet-queue", executor, this::handleIncomingPacket)
private val srtpDecryptWrapper = SrtpTransformerDecryptNode()
private val srtcpDecryptWrapper = SrtcpTransformerDecryptNode()
private val tccGenerator = TccGeneratorNode(rtcpSender)
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class RtpSenderImpl(
private val outgoingRtpRoot: Node
private val outgoingRtxRoot: Node
private val outgoingRtcpRoot: Node
private val incomingPacketQueue = PacketInfoQueue(id, executor, this::processPacket)
private val incomingPacketQueue = PacketInfoQueue("rtp-sender-incoming-packet-queue", executor, this::processPacket)
var numIncomingBytes: Long = 0
var firstPacketWrittenTime = -1L
var lastPacketWrittenTime = -1L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ abstract class AbstractSrtpTransformerNode(name: String) : MultipleOutputTransfo
}

override fun stop() {
super.stop()
cachedPackets.forEach { packetDiscarded(it) }
}
}
183 changes: 144 additions & 39 deletions src/main/kotlin/org/jitsi/nlj/transform/node/Node.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import org.jitsi.nlj.util.Util.Companion.getMbps
import org.jitsi.nlj.util.getLogger
import org.jitsi.rtp.Packet
import org.jitsi.rtp.PacketPredicate
import org.json.simple.JSONObject
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import kotlin.properties.Delegates
import kotlin.streams.toList
Expand Down Expand Up @@ -146,15 +148,14 @@ sealed class StatsKeepingNode(name: String): Node(name) {
private var lastPacketTime: Long = -1

/**
* Total nanoseconds spent processing packets in this node.
* Keeps stats for this [Node]
*/
private var totalProcessingDuration: Long = 0
private val stats = NodeStats()

private var numInputPackets = 0
private var numOutputPackets = 0
private var numInputBytes: Long = 0

private var numDiscardedPackets = 0
/**
* Avoid stopping more than once.
*/
private var stopped = false

/**
* The function that all subclasses should implement to do the actual
Expand All @@ -169,43 +170,43 @@ sealed class StatsKeepingNode(name: String): Node(name) {
doProcessPacket(packetInfo)
}

override fun getNodeStats(): NodeStatsBlock {
return NodeStatsBlock("Node $name ${hashCode()}").apply {
addStat("numInputPackets: $numInputPackets")
addStat("numOutputPackets: $numOutputPackets")
addStat("numDiscardedPackets: $numDiscardedPackets")
addStat("total time spent: ${Duration.ofNanos(totalProcessingDuration).toMillis()} ms")
addStat("average time spent per packet: ${Duration.ofNanos(totalProcessingDuration / Math.max(numInputPackets, 1)).toNanos()} ns")
addStat("$numInputBytes bytes over ${Duration.ofNanos(lastPacketTime - firstPacketTime).toMillis()} ms")
addStat("throughput: ${getMbps(numInputBytes, Duration.ofNanos(lastPacketTime - firstPacketTime))} mbps")
addStat("processing throughput: ${getMbps(numInputBytes, Duration.ofNanos(totalProcessingDuration))} mbps")
}
override fun getNodeStats() = NodeStatsBlock("Node $name ${hashCode()}").apply {
this@StatsKeepingNode.stats.appendTo(this)
val numBytes = this@StatsKeepingNode.stats.numInputBytes
addStat("$numBytes bytes over ${Duration.ofNanos(lastPacketTime - firstPacketTime).toMillis()} ms")
addStat(
"throughput: ${getMbps(numBytes, Duration.ofNanos(lastPacketTime - firstPacketTime))} mbps")
}

private fun onEntry(packetInfo: PacketInfo) {
startTime = System.nanoTime()
if (firstPacketTime == -1L) {
firstPacketTime = startTime
}
if (enableStatistics) {
startTime = System.nanoTime()
if (firstPacketTime == -1L) {
firstPacketTime = startTime
}

numInputPackets++
numInputBytes += packetInfo.packet.length
stats.numInputPackets++
stats.numInputBytes += packetInfo.packet.length

packetInfo.addEvent(nodeEntryString)
lastPacketTime = startTime
packetInfo.addEvent(nodeEntryString)
lastPacketTime = startTime
}
}

/**
* Should be called by sub classes when they finish processing of the input packet, but before they call into any
* other nodes, so that [Node] can keep track of its statistics.
*/
protected fun doneProcessing(packetInfo: PacketInfo?) {
val processingDuration = System.nanoTime() - startTime
totalProcessingDuration += processingDuration

packetInfo?.let {
numOutputPackets++
it.addEvent(nodeExitString)
if (enableStatistics) {
val processingDuration = System.nanoTime() - startTime
stats.totalProcessingDurationNs += processingDuration
stats.maxProcessingDurationNs = Math.max(stats.maxProcessingDurationNs, processingDuration)

packetInfo?.let {
stats.numOutputPackets++
it.addEvent(nodeExitString)
}
}
}

Expand All @@ -214,19 +215,123 @@ sealed class StatsKeepingNode(name: String): Node(name) {
* other nodes, so that [Node] can keep track of its statistics.
*/
protected fun doneProcessing(packetInfos: List<PacketInfo>) {
val processingDuration = System.nanoTime() - startTime
totalProcessingDuration += processingDuration

numOutputPackets += packetInfos.size
packetInfos.forEach {
it.addEvent(nodeExitString)
if (enableStatistics) {
val processingDuration = System.nanoTime() - startTime
stats.totalProcessingDurationNs += processingDuration
stats.maxProcessingDurationNs = Math.max(stats.maxProcessingDurationNs, processingDuration)

stats.numOutputPackets += packetInfos.size
packetInfos.forEach {
it.addEvent(nodeExitString)
}
}
}

protected fun packetDiscarded(packetInfo: PacketInfo) {
numDiscardedPackets++
stats.numDiscardedPackets++
BufferPool.returnBuffer(packetInfo.packet.buffer)
}


override fun stop() {
if (stopped) {
return
}
stopped = true

if (enableStatistics && stats.numInputPackets > 0) {
synchronized(globalStats) {
val classStats = globalStats.computeIfAbsent(this::class.toString()) { NodeStats() }
classStats.aggregate(stats)
}
}
}

companion object {
/**
* Maps a [Node]'s class name to a [NodeStats] object with aggregated stats for all instances of that class.
*/
private val globalStats: MutableMap<String, NodeStats> = ConcurrentHashMap()

var enableStatistics = true

/**
* Gets the aggregated statistics for all classes as a JSON map.
*/
fun getStatsJson(): JSONObject {
val jsonObject = JSONObject()
globalStats.forEach { className, stats ->
jsonObject[className] = stats.getJson()
}
return jsonObject
}
}

data class NodeStats(
/**
* Total nanoseconds spent processing packets in this node.
*/
var totalProcessingDurationNs: Long = 0,
var numInputPackets: Long = 0,
var numOutputPackets: Long = 0,
var numInputBytes: Long = 0,
var numDiscardedPackets: Long = 0,
/**
* The longest time it took to process a single packet.
*/
var maxProcessingDurationNs: Long = -1
) {
/**
* How many class instances have been aggregated
*/
private var numAggregates: Int = 0
private val averageProcessingTimePerPacketNs
get() = totalProcessingDurationNs / Math.max(numInputPackets, 1)
private val processingThroughputMbps
get() = getMbps(numInputBytes, Duration.ofNanos(totalProcessingDurationNs))
private val totalProcessingDurationMs
get() = Duration.ofNanos(totalProcessingDurationNs).toMillis()
private val maxProcessingDurationMs: Double
get() = maxProcessingDurationNs / 1000_000.0

fun appendTo(block: NodeStatsBlock) {
block.apply {
addStat("numInputPackets: $numInputPackets")
addStat("numOutputPackets: $numOutputPackets")
addStat("numDiscardedPackets: $numDiscardedPackets")
addStat("total time spent: $totalProcessingDurationMs} ms")
addStat("average time spent per packet: $averageProcessingTimePerPacketNs ns")
addStat("processing throughput: $processingThroughputMbps Mbps")
addStat("max packet process time: $maxProcessingDurationMs ms")
}
}

fun getJson(): JSONObject {
val jsonObject = JSONObject()
jsonObject["total_processing_duration_ms"] = totalProcessingDurationMs
jsonObject["input_packets"] = numInputPackets
jsonObject["output_packets"] = numOutputPackets
jsonObject["input_bytes"] = numInputBytes
jsonObject["discarded_packets"] = numDiscardedPackets
jsonObject["max_processing_duration_ms"] = maxProcessingDurationMs
jsonObject["average_processing_time_ns"] = averageProcessingTimePerPacketNs
jsonObject["processing_throughput_mbps"] = processingThroughputMbps
jsonObject["num_aggregates"] = numAggregates
return jsonObject
}

fun aggregate(other: NodeStats) {
if (other.numInputPackets > 0) {
numAggregates++
totalProcessingDurationNs += other.totalProcessingDurationNs
numInputPackets += other.numInputPackets
numOutputPackets += other.numOutputPackets
numInputBytes += other.numInputBytes
numDiscardedPackets += other.numDiscardedPackets
maxProcessingDurationNs = Math.max(maxProcessingDurationNs, other.maxProcessingDurationNs)
}
}
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/org/jitsi/nlj/util/PacketInfoQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PacketInfoQueue(
id: String,
executor: ExecutorService,
handler: (PacketInfo) -> Boolean
) : PacketQueue<PacketInfo>(100, false, false, id, handler, executor) {
) : PacketQueue<PacketInfo>(100, false, null, id, handler, executor) {
override fun getBuffer(packetInfo: PacketInfo): ByteArray {
TODO()
// return packetInfo.packet.getBuffer().array()
Expand Down
7 changes: 2 additions & 5 deletions src/main/kotlin/org/jitsi/nlj/util/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package org.jitsi.nlj.util

import java.math.BigDecimal
import java.time.Duration

class Util {
companion object {
fun getMbps(numBytes: Long, duration: Duration): String {
val numBits = BigDecimal(numBytes * 8)
val megaBits = (numBits / BigDecimal(1000000.0)).toFloat()
return "%.2f".format((megaBits / duration.toMillis()) * Duration.ofSeconds(1).toMillis())
fun getMbps(numBytes: Long, duration: Duration): Double {
return (numBytes * 8.0) / (duration.toMillis() * 1000)
}
}
}
Expand Down

0 comments on commit 990b359

Please sign in to comment.