Partition memory footprint improvements #24

Merged: 16 commits, Feb 3, 2020
8 changes: 7 additions & 1 deletion README.md
@@ -22,12 +22,18 @@ Running DISTOD with a custom logging configuration:
java -Dlogback.configurationFile=deployment/logback.xml -jar distod.jar
```

> **Attention!**
>
> Please make sure to always run DISTOD with the G1 GC (`-XX:+UseG1GC`).
> If you do not explicitly disable system monitoring, you must set both heap memory limits (`-Xms` and `-Xmx`) to the same value when starting the application.
> This allows the system monitoring component to make more accurate decisions regarding memory usage.
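
For example, a start command that follows both recommendations could look like this (the 16 GB heap size is only a placeholder; pick a value that fits your machine):

```
java -XX:+UseG1GC -Xms16g -Xmx16g -jar distod.jar
```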

## Profiling with JMC

[JMC](https://www.oracle.com/technetwork/java/javaseproducts/mission-control/index.html) is a tool to analyze metric recordings with the Java Flight Recorder (JFR).
JFR is built into the Oracle JDK and provides a very lightweight way to collect low-level, detailed runtime information.
The application (JAR) must therefore be run with an Oracle JVM.
You can download it (Java SE JDK 11) from [their wesite](https://www.oracle.com/technetwork/java/javase/downloads/jdk11-downloads-5066655.html).
You can download it (Java SE JDK 11) from [their website](https://www.oracle.com/technetwork/java/javase/downloads/jdk11-downloads-5066655.html).
An Oracle account is required (free).

To profile the application, build the DISTOD assembly with `sbt assembly` and run it with an Oracle JVM using the following parameters:
32 changes: 26 additions & 6 deletions distod/src/main/resources/application.conf
@@ -7,7 +7,7 @@ distod {

input {
// filepath pointing to a dataset that is read and analyzed for ODs
path = "data/flight_500_28c.csv"
path = "data/flight/flight_500_28c.csv"
// path = "data/flights_20_500k.csv"
// if the input file has a header
has-header = yes
@@ -39,9 +39,29 @@ distod {
// this does also limit the max number of threads for the cpu-bound-task-dispatcher
max-workers = 64

partition-manager {
// Partition compaction removes old partitions from the partition manager to free up memory and increase lookup speed.
// It can safely be enabled because singleton partitions are never removed from the partition manager, so all other
// partitions can be regenerated from them. It may impact performance if it is triggered too often, because a
// partition may then have to be generated multiple times.
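// Example: with enabled = yes and interval = 1m (below), old non-singleton partitions are removed once per minute;
// if they are needed again, they are regenerated from the singleton partitions.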
partition-compaction {
enabled = yes
// interval (in seconds or minutes) for removing old partitions from the partition manager
cleanup-interval = 5s
interval = 1m
}

// DISTOD monitors the JVM to adapt its algorithms to changing conditions. Currently only the heap is monitored.
monitoring {
// sampling rate
interval = 1s

// If memory usage reaches the 'heap-eviction-threshold', DISTOD tries to free up temporary data structures (which
// increase speed but are not strictly necessary) so that the algorithm can also run within lower memory limits.
// Specify the threshold in percent. Be careful: a value of 100 disables heap eviction.
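// Example: with a 16 GB heap and the value of 85 below, eviction is triggered once more than ~13.6 GB
// (85 % of 16 GB) of the heap is in use.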
heap-eviction-threshold = 85

statistics-log-interval = 5s

statistics-log-level = "DEBUG"
}

// Dispatcher used for CPU-bound task processing to free up default dispatcher from this heavy load.
@@ -192,8 +212,8 @@ akka {
watch-failure-detector {
implementation-class = "akka.remote.PhiAccrualFailureDetector"
heartbeat-interval = 1s
threshold = 10.0
max-sample-size = 200
threshold = 8.0
max-sample-size = 1000
min-std-deviation = 100ms
acceptable-heartbeat-pause = 10s
expected-response-after = 1s
@@ -240,7 +260,7 @@ akka {
threshold = 8.0
max-sample-size = 1000
min-std-deviation = 100ms
acceptable-heartbeat-pause = 3s
acceptable-heartbeat-pause = 10s
monitored-by-nr-of-members = 5 // default: 9
expected-response-after = 1s
}
67 changes: 60 additions & 7 deletions distod/src/main/scala/com/github/codelionx/distod/Settings.scala
@@ -4,7 +4,7 @@ import java.util.concurrent.TimeUnit

import akka.actor.typed.{ActorSystem, DispatcherSelector, Extension, ExtensionId}
import com.github.codelionx.distod.ActorSystem.{FOLLOWER, LEADER, Role}
import com.github.codelionx.distod.Settings.InputParsingSettings
import com.github.codelionx.distod.Settings.{InputParsingSettings, MonitoringSettings, PartitionCompactionSettings}
import com.typesafe.config.{Config, ConfigException}

import scala.concurrent.duration.FiniteDuration
@@ -30,6 +30,23 @@ object Settings extends ExtensionId[Settings] {
def maxRows: Option[Int]
}

trait PartitionCompactionSettings {

def enabled: Boolean

def interval: FiniteDuration
}

trait MonitoringSettings {

def interval: FiniteDuration

def heapEvictionThreshold: Double

def statisticsLogInterval: FiniteDuration

def statisticsLogLevel: String
}
}
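
These accessors are exposed through the existing `Settings` Akka extension. As a minimal usage sketch (not part of this diff, and assuming an `ActorContext` named `context` is in scope), an actor could read the new settings like this:

```scala
// Usage sketch only: read the new settings via the Settings extension
val settings = Settings(context.system)
if (settings.partitionCompactionSettings.enabled)
  context.log.info("Partition compaction runs every {}", settings.partitionCompactionSettings.interval)
context.log.info("Heap is sampled every {}", settings.monitoringSettings.interval)
```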


@@ -80,12 +97,19 @@ class Settings private(config: Config) extends Extension {
val cpuBoundTaskDispatcher: DispatcherSelector =
DispatcherSelector.fromConfig(s"$namespace.cpu-bound-tasks-dispatcher")

// cuts off nanosecond part of durations (we dont care about this, because duration should be in
// seconds or greater anyway
val partitionManagerCleanupInterval: FiniteDuration = FiniteDuration.apply(
config.getDuration(s"$namespace.partition-manager.cleanup-interval").getSeconds,
TimeUnit.SECONDS
)
val partitionCompactionSettings: PartitionCompactionSettings = new PartitionCompactionSettings {

private val subnamespace = s"$namespace.partition-compaction"

override def enabled: Boolean = config.getBoolean(s"$subnamespace.enabled")

// cuts off the nanosecond part of durations (we don't care about this, because the duration should be in
// seconds or greater anyway)
override def interval: FiniteDuration = FiniteDuration.apply(
config.getDuration(s"$subnamespace.interval").getSeconds,
TimeUnit.SECONDS
)
}

val inputParsingSettings: InputParsingSettings = new InputParsingSettings {

@@ -105,4 +129,33 @@
else
None
}

val monitoringSettings: MonitoringSettings = new MonitoringSettings {

private val subnamespace = s"$namespace.monitoring"

override def interval: FiniteDuration = {
val duration = config.getDuration(s"$subnamespace.interval")
val finiteDurationOnlySeconds = FiniteDuration(duration.getSeconds, TimeUnit.SECONDS)
val finiteDurationOnlyNanos = FiniteDuration(duration.getNano, TimeUnit.NANOSECONDS)
finiteDurationOnlySeconds + finiteDurationOnlyNanos
}

override def heapEvictionThreshold: Double = config.getInt(s"$subnamespace.heap-eviction-threshold") match {
case i if i <= 0 || i > 100 => throw new ConfigException.BadValue(
s"$subnamespace.heap-eviction-threshold",
s"threshold must be between [excluding] 0 and [including] 100 (percent value)"
)
case i => i / 100.0
}

override def statisticsLogInterval: FiniteDuration = {
val duration = config.getDuration(s"$subnamespace.statistics-log-interval")
val finiteDurationOnlySeconds = FiniteDuration(duration.getSeconds, TimeUnit.SECONDS)
val finiteDurationOnlyNanos = FiniteDuration(duration.getNano, TimeUnit.NANOSECONDS)
finiteDurationOnlySeconds + finiteDurationOnlyNanos
}

override def statisticsLogLevel: String = config.getString(s"$subnamespace.statistics-log-level")
}
}
@@ -44,8 +44,11 @@ object FollowerGuardian {
// val clusterTester = context.spawn[Nothing](ClusterTester(), ClusterTester.name)
// context.watch(clusterTester)

// system monitor
val monitor = context.spawn(SystemMonitor(), SystemMonitor.name)

// local partition manager
val partitionManager = context.spawn(PartitionManager(), PartitionManager.name)
val partitionManager = context.spawn(PartitionManager(monitor), PartitionManager.name)
context.watch(partitionManager)

// local result collector proxy
@@ -0,0 +1,105 @@
package com.github.codelionx.distod.actors

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import com.github.codelionx.distod.actors.SystemMonitor._
import com.github.codelionx.distod.Settings
import com.github.codelionx.util.GenericLogLevelLogger._


object SystemMonitor {

sealed trait Command
final case class Register(ref: ActorRef[SystemEvent]) extends Command
private case object Tick extends Command
private case object StatisticsTick extends Command

sealed trait SystemEvent
final case object CriticalHeapUsage extends SystemEvent

val name = "system-monitor"

def apply(): Behavior[Command] = Behaviors.setup(context =>
Behaviors.withTimers(timer =>
new SystemMonitor(context, timer).start()
)
)
}


class SystemMonitor(context: ActorContext[Command], timer: TimerScheduler[Command]) {

private final val megabyte: Int = 1024*1024

private val settings = Settings(context.system).monitoringSettings
private val runtime = Runtime.getRuntime

if(context.log.isEnabled(settings.statisticsLogLevel)) {
timer.startTimerWithFixedDelay("statistics-tick", StatisticsTick, settings.statisticsLogInterval)
}
timer.startTimerWithFixedDelay("tick", Tick, settings.interval)

def start(): Behavior[Command] = behavior(Set.empty)

var free: Long = 0
var total: Long = 0
var usageP: Double = .0
var max: Long = 0

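// Normal monitoring: on every Tick the heap is sampled; once usage exceeds the configured
// heap-eviction-threshold, all registered listeners are notified with a CriticalHeapUsage event.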
def behavior(listeners: Set[ActorRef[SystemEvent]]): Behavior[Command] = Behaviors.receiveMessage {
case Register(ref) =>
behavior(listeners + ref)

case Tick =>
updateStatistics()
if (usageP > settings.heapEvictionThreshold) {
logStatistics("CriticalHeapUsage event triggered!")
listeners.foreach(_ ! CriticalHeapUsage)
waitForGC(listeners)
} else {
Behaviors.same
}

case StatisticsTick =>
logStatistics()
Behaviors.same
}

def waitForGC(listeners: Set[ActorRef[SystemEvent]], waitingTicks: Int = 3): Behavior[Command] = Behaviors.receiveMessage{
case Register(ref) =>
waitForGC(listeners + ref)

case Tick =>
updateStatistics()
val newWaitingTicks = waitingTicks - 1
if (usageP > settings.heapEvictionThreshold || newWaitingTicks > 0) {
waitForGC(listeners, newWaitingTicks)
} else {
max = 0
behavior(listeners)
}

case StatisticsTick =>
logStatistics()
Behaviors.same
}

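// Sample the current heap state from the JVM runtime: free and total memory, the usage ratio,
// and the maximum usage observed since the last reset.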
private def updateStatistics(): Unit = {
free = runtime.freeMemory()
total = runtime.totalMemory()
val usage = total - free
usageP = usage.toDouble / total.toDouble
max = scala.math.max(max, usage)
}

private def logStatistics(prefixMessage: String = ""): Unit = {
context.log.log(
settings.statisticsLogLevel,
s"Heap usage: {} % [free={} mb, total={} mb, max={} mb] ${prefixMessage}",
scala.math.ceil(usageP * 100),
free / megabyte,
total / megabyte,
max / megabyte
)
}
}
@@ -1,19 +1,19 @@
package com.github.codelionx.distod.actors.master

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior}
import com.github.codelionx.distod.Settings
import com.github.codelionx.distod.actors.LeaderGuardian
import com.github.codelionx.distod.actors.master.Master.{Command, LocalPeers}
import com.github.codelionx.distod.actors.master.MasterHelper.{GenerateCandidates, NextJob}
import com.github.codelionx.distod.actors.worker.Worker
import com.github.codelionx.distod.discovery.CandidateGeneration
import com.github.codelionx.distod.partitions.StrippedPartition
import com.github.codelionx.distod.protocols.{PartitionManagementProtocol, ResultCollectionProtocol}
import com.github.codelionx.distod.protocols.DataLoadingProtocol._
import com.github.codelionx.distod.protocols.PartitionManagementProtocol._
import com.github.codelionx.distod.protocols.ResultCollectionProtocol.ResultCommand
import com.github.codelionx.distod.protocols.{PartitionManagementProtocol, ResultCollectionProtocol}
import com.github.codelionx.distod.types.{CandidateSet, PartitionedTable}
import com.github.codelionx.util.Math
import com.github.codelionx.util.largeMap.mutable.FastutilState
@@ -115,7 +115,7 @@ class Master(context: ActorContext[Command], stash: StashBuffer[Command], localP
))

// L1: single attribute candidate nodes
val (l1candidates, l1candidateState) = generateLevel1(attributes, partitions)
val (l1candidates, l1candidateState) = generateLevel1(attributes)
l1candidates.zipWithIndex.foreach { case (candidate, index) =>
partitionManager ! InsertPartition(candidate, partitions(index))
}