akka.remote.artery.advanced.maximum-large-frame-size does not accept messages bigger than the default value of 2MiB #22257

Closed
toaditoad opened this issue Feb 3, 2017 · 7 comments
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted help wanted Issues that the core team will likely not have time to work on t:docs t:remoting:artery
Comments

@toaditoad
Contributor

toaditoad commented Feb 3, 2017

I have a project that reads frames from a video file using OpenCV. Since it is a 4K video recording, a single Mat object, sent as a message of case class Frame(videoPos: Double, frame: Mat), can exceed 7 MB. I'm trying to use Artery to send these frames via the dedicated channel for large messages. However, it seems that akka.remote.artery.advanced.maximum-large-frame-size does not accept values higher than 7 megabytes.

Consider the following example when sending a message of ~7.5 MB:

Expected console output with akka.remote.artery.advanced.maximum-large-frame-size = 7 megabytes because the message is too big:

[ERROR] [02/03/2017 13:15:35.718] [MainActorSystem-akka.remote.default-remote-dispatcher-7] [Encoder(akka://MainActorSystem)] Failed to serialize oversized message [de.itd.actor.common.Message$Frame].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://RemoteActorSystem@192.168.178.35:2553/remote/akka/MainActorSystem@192.168.178.42:2552/user/MainActor/$a/DetectionActor-0#384816253]): max allowed size 7000000 bytes. Message type [de.itd.actor.common.Message$Frame].

Strange console output with akka.remote.artery.advanced.maximum-large-frame-size = 8 megabytes: it reports a maxMessageLength of 2097152, which matches the default value of 2 mebibytes (MiB):

[ERROR] [02/03/2017 13:17:00.382] [MainActorSystem-akka.remote.default-remote-dispatcher-5] [akka://MainActorSystem/system/StreamSupervisor-0/remote-5-0-unknown-operation] Error in stage [akka.remote.artery.AeronSink@2374b707]: Encoded message exceeds maxMessageLength of 2097152, length=7506300
java.lang.IllegalArgumentException: Encoded message exceeds maxMessageLength of 2097152, length=7506300
	at io.aeron.Publication.checkForMaxMessageLength(Publication.java:467)
	at io.aeron.Publication.offer(Publication.java:333)
	at io.aeron.Publication.offer(Publication.java:298)
	at akka.remote.artery.AeronSink$$anon$1.publish(AeronSink.scala:150)
	at akka.remote.artery.AeronSink$$anon$1.onPush(AeronSink.scala:146)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:649)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
	at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
	at akka.actor.Actor.aroundReceive(Actor.scala:496)
	at akka.actor.Actor.aroundReceive$(Actor.scala:494)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
@toaditoad
Contributor Author

I created an MWE without OpenCV or any custom serializer. Hopefully, somebody can shed some light on this issue. As seen in the Coordinator actor, two messages are sent to the Detector actor. The first is ~2 MB and is transferred as expected, but the second one, at ~3 MB, fails even though akka.remote.artery.advanced.maximum-large-frame-size = 4MiB. What am I missing?

[INFO] [02/07/2017 09:33:49.438] [main] [akka.remote.artery.ArteryTransport(akka://ClusterSystem)] Started embedded media driver in directory [/var/folders/xk/1c8p5q211vn22xmn__shzp380000gn/T/aeron-Thorsten-ClusterSystem-601ea032-8832-476a-aeb0-c60fa8e6cc1f]
[INFO] [02/07/2017 09:33:49.727] [main] [akka.remote.artery.ArteryTransport(akka://ClusterSystem)] Remoting started; listening on address: [akka://ClusterSystem@10.150.80.177:2551] with UID [4926614557307924452]
[INFO] [02/07/2017 09:33:49.757] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Starting up...
[INFO] [02/07/2017 09:33:49.896] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [02/07/2017 09:33:49.896] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Started up successfully
[INFO] [02/07/2017 09:33:49.903] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [02/07/2017 09:33:49.906] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Metrics collection has started successfully
[INFO] [02/07/2017 09:33:49.930] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Node [akka://ClusterSystem@10.150.80.177:2551] is JOINING, roles [main]
[INFO] [02/07/2017 09:33:49.964] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Leader is moving node [akka://ClusterSystem@10.150.80.177:2551] to [Up]
[INFO] [02/07/2017 09:33:55.070] [ClusterSystem-akka.actor.default-dispatcher-8] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@10.150.80.177:2551] - Node [akka://ClusterSystem@10.150.20.159:2552] is JOINING, roles [detector]
[ERROR] [02/07/2017 09:33:55.320] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem/system/StreamSupervisor-0/remote-5-0-unknown-operation] Error in stage [akka.remote.artery.AeronSink@387cc4d7]: Encoded message exceeds maxMessageLength of 2097152, length=3000163
java.lang.IllegalArgumentException: Encoded message exceeds maxMessageLength of 2097152, length=3000163
	at io.aeron.Publication.checkForMaxMessageLength(Publication.java:467)
	at io.aeron.Publication.offer(Publication.java:333)
	at io.aeron.Publication.offer(Publication.java:298)
	at akka.remote.artery.AeronSink$$anon$1.publish(AeronSink.scala:150)
	at akka.remote.artery.AeronSink$$anon$1.onPush(AeronSink.scala:146)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:649)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
	at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
	at akka.actor.Actor.aroundReceive(Actor.scala:496)
	at akka.actor.Actor.aroundReceive$(Actor.scala:494)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

build.sbt

name := "thesis-mwe-artery-main" // for the second project "thesis-mwe-artery-detector"
version := "1.0"
scalaVersion := "2.12.1"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.16",
  "com.typesafe.akka" %% "akka-cluster" % "2.4.16",
  "com.typesafe.akka" %% "akka-slf4j" % "2.4.16",
  "org.slf4j" % "slf4j-api" % "1.7.21",
  "ch.qos.logback" % "logback-classic" % "1.1.8"
)

main.conf

akka {
  actor {
    provider = cluster
    warn-about-java-serializer-usage = off
  }

  remote {
    artery {
      enabled = on
      canonical {
        hostname = "10.150.80.177"
        port = 2551
      }
      large-message-destinations = ["/user/*"]
      advanced {
        maximum-frame-size = 256KiB
        buffer-pool-size = 128
        maximum-large-frame-size = 4MiB
        large-buffer-pool-size = 32
      }
    }

    log-remote-lifecycle-events = off
  }

  cluster {
    seed-nodes = ["akka://ClusterSystem@10.150.80.177:2551"]
    roles = ["main"]
  }
}

Main1.scala

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main1 extends App {
  val mainConfig = ConfigFactory.load("main")
  val mainActorSystem = ActorSystem("ClusterSystem", mainConfig)
  mainActorSystem.actorOf(Props[Coordinator], "Coordinator")
}

Coordinator.scala

import akka.actor.Actor

class Coordinator extends Actor {
  override def receive: Receive = {
    case "Start" =>
      sender() ! new Array[Byte](2000000)
      sender() ! new Array[Byte](3000000)
  }
}

detector.conf

akka {
  actor {
    provider = cluster
    warn-about-java-serializer-usage = off
  }

  remote {
    artery {
      enabled = on
      canonical {
        hostname = "10.150.20.159"
        port = 2552
      }
      large-message-destinations = ["/user/*"]
      advanced {
        maximum-frame-size = 256KiB
        buffer-pool-size = 128
        maximum-large-frame-size = 4MiB
        large-buffer-pool-size = 32
      }
    }
    log-remote-lifecycle-events = off
  }

  cluster {
    seed-nodes = ["akka://ClusterSystem@10.150.80.177:2551", "akka://ClusterSystem@10.150.20.159:2552"]
    roles = ["detector"]
  }
}

Main2.scala

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main2 extends App {
  val detectorConfig = ConfigFactory.load("detector")
  val detectorActorSystem = ActorSystem("ClusterSystem", detectorConfig)
  detectorActorSystem.actorOf(Props[Detector], "Detector")
}

Detector.scala

import akka.actor.{Actor, ActorLogging, RootActorPath}
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.cluster.ClusterEvent.{CurrentClusterState, MemberUp}

class Detector extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])

  override def postStop(): Unit = cluster.unsubscribe(self)

  override def receive: Receive = {
    case a: Array[Byte]             => log.info(s"size of msg: ${a.length}")
    case state: CurrentClusterState => state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m)                => register(m)
  }

  def register(member: Member): Unit = {
    if (member.hasRole("main")) {
      val coordinator = context.actorSelection(RootActorPath(member.address) / "user" / "Coordinator")
      coordinator ! "Start"
    }
  }
}

@toaditoad changed the title from "akka.actor.remote.artery.advanced.maximum-large-frame-size does not accept messages bigger than 7 MB" to "akka.remote.artery.advanced.maximum-large-frame-size does not accept messages bigger than the default value of 2MiB" on Feb 7, 2017
@2m
Member

2m commented Feb 7, 2017

This could be related to real-logic/aeron#41

@patriknw
Member

@toaditoad When looking at the Aeron source code I can confirm what @2m mentioned. You can probably increase the aeron.term.buffer.size to be able to send larger messages. https://github.com/real-logic/Aeron/wiki/Configuration-Options

However, I would not recommend sending such large messages, especially not using
large-message-destinations = ["/user/*"] for all actors, but I guess you were testing it out.

The result of this ticket should be to mention something about this 1/8 of term size in the documentation for large messages.
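As a rough illustration of the 1/8 rule mentioned above, the following sketch computes the largest message Aeron would accept for a given term buffer length, and the smallest term buffer length that would fit a given frame size. The object and method names are hypothetical and not part of Akka or Aeron. It assumes the default term buffer length is 16 MiB (inferred from the 2097152 limit in the stack traces) and that the term buffer length has to be a power of two.

```scala
// Hypothetical helper, not part of Akka or Aeron.
object TermLengthSketch {
  // Ratio discussed in this thread: a message may use at most 1/8 of a term buffer.
  val TermToMessageRatio: Long = 8L

  /** Largest message length (bytes) allowed for the given term buffer length. */
  def maxMessageLength(termLength: Long): Long =
    termLength / TermToMessageRatio

  /** Smallest power-of-two term buffer length whose 1/8 share fits `frameSize` bytes.
    * The power-of-two requirement is an assumption about Aeron's validation. */
  def requiredTermLength(frameSize: Long): Long = {
    require(frameSize > 0, "frameSize must be positive")
    val needed  = frameSize * TermToMessageRatio
    val highest = java.lang.Long.highestOneBit(needed)
    if (highest == needed) needed else highest << 1
  }

  def main(args: Array[String]): Unit = {
    // Assumed default term length of 16 MiB gives the 2 MiB limit seen in the logs.
    println(maxMessageLength(16L * 1024 * 1024))   // 2097152
    // maximum-large-frame-size = 4MiB from the MWE.
    println(requiredTermLength(4L * 1024 * 1024))  // 33554432
    // The 7506300-byte frame from the first stack trace.
    println(requiredTermLength(7506300L))          // 67108864
  }
}
```

Under these assumptions, the 4MiB setting from the MWE would need a 32 MiB term buffer, and the ~7.5 MB video frame would need 64 MiB.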

@patriknw patriknw added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:remoting:artery help wanted Issues that the core team will likely not have time to work on labels Feb 13, 2017
@toaditoad
Contributor Author

toaditoad commented Feb 15, 2017

Sorry for my late reply. Are you sure it is aeron.term.buffer.size? I can only find aeron.term.buffer.length, which seems to work. However, if aeron.term.buffer.length has to be 8 times as big as maximum-large-frame-size, it creates ridiculously large logs, which my system cannot handle properly. If that is the case, I accept that it is not recommended, and probably not even possible, to send messages of 8 MB and more. My thesis investigates different approaches to tracking objects in video recordings using Akka and OpenCV, so this finding is totally fine.
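For readers who want to try the same workaround, here is a minimal sketch of one way the property could be set. It assumes that the embedded media driver picks up aeron.term.buffer.length from JVM system properties, that the value must be a power of two, and that it has to be set before the ActorSystem is created (and on both nodes). The object and method names are hypothetical.

```scala
// Hypothetical helper, not part of Akka or Aeron.
object AeronTermLengthSetup {
  // Property name reported to work in this thread.
  val TermLengthProperty = "aeron.term.buffer.length"

  /** Sets the Aeron term buffer length as a JVM system property.
    * Rejects values that are not a positive power of two (assumed Aeron requirement). */
  def setTermLength(bytes: Int): Unit = {
    require(bytes > 0 && (bytes & (bytes - 1)) == 0,
      s"term length must be a power of two, got $bytes")
    System.setProperty(TermLengthProperty, bytes.toString)
  }
}

// Intended usage (assumption: must run before the ActorSystem starts the media driver):
//   AeronTermLengthSetup.setTermLength(32 * 1024 * 1024) // 8 x maximum-large-frame-size of 4MiB
//   val system = ActorSystem("ClusterSystem", ConfigFactory.load("main"))
```

The same value could presumably be passed on the command line as -Daeron.term.buffer.length=33554432 instead. Either way, the log buffers grow with the term length, which is the cost described above.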

@patriknw, and yes! ["/user/*"] was the very lazy way to have a valid pattern. ;-) At that moment, only these large video frames were sent anyway. Now, it is "/remote/akka/*/user/MainActor/DetectionGroupActor/*": the first star matches any remote detection system and the second any DetectionActor instance. Next, I will dive into akka-cluster...

@patriknw
Member

I don't know the exact property name, but that should be possible to find in the Aeron documentation.

Anyway, it sounds like a good idea not to use Akka Remoting for this, but to use a separate side channel instead. I'd suggest that you take a look at Akka Streams and its TCP support. You can still use Actor messaging as the control plane (e.g. for setting up these streams).

@toaditoad
Contributor Author

Then, it seems to be aeron.term.buffer.length.

Thank you for this suggestion. I have developed some designs with only actors, only streams, and also streams that integrate actors. I reached some conclusions about Akka Remoting regarding this special use case for my thesis. I haven't decided how to publish my paper yet, but I will probably pass you guys a copy. Your work is terrific. Thanks a lot. I'm closing the issue.

@patriknw
Member

Thanks. I'm reopening because I think this should be documented.
We should mention something about this 1/8 of term size in the documentation for large messages.
