New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misconfiguration of blocking IO dispatcher for akka streams #24357

Closed
nick-nachos opened this Issue Jan 19, 2018 · 2 comments

Comments

Projects
None yet
4 participants
@nick-nachos
Contributor

nick-nachos commented Jan 19, 2018

While working with akka streams API, I came across an issue when I used the following statement:

someSink.withAttributes(ActorAttributes.dispatcher("akka.stream.blocking-io-dispatcher"))

That call resulted to the following exception:

Caused by: com.typesafe.config.ConfigException$WrongType: reference.conf @ jar:file:/home/nnakas/.m2/repository/com/typesafe/akka/akka-stream_2.11/2.5.6/akka-stream_2.11-2.5.6.jar!/reference.conf: 80: akka.stream.blocking-io-dispatcher has type STRING rather than OBJECT
	at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:159)
	at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
	at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
	at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
	at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:264)
	at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:270)
	at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37)
	at akka.dispatch.Dispatchers.config(Dispatchers.scala:127)
	at akka.dispatch.Dispatchers.lookupConfigurator(Dispatchers.scala:96)
	at akka.dispatch.Dispatchers.lookup(Dispatchers.scala:79)
	at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:746)
	... 67 more

I went through the akka sources, and the configuration at reference.conf is as follows:

# Fully qualified config path which holds the dispatcher configuration
# to be used by ActorMaterializer when creating Actors for IO operations,
# such as FileSource, FileSink and others.
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"

default-blocking-io-dispatcher {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1

  thread-pool-executor {
    fixed-pool-size = 16
  }
}

So it would seem that the configuration should probably be like this:

blocking-io-dispatcher = ${akka.stream.default-blocking-io-dispatcher}

This finding however made me wonder why this error doesn't occur in the akka-stream library in general. Theoretically, according to the akka docs:

If you want to configure a custom dispatcher for file IO operations globally, you can do so by changing the akka.stream.blocking-io-dispatcher

so I went a little bit further with the research, and came across this inside akka.stream.ActorAttributes:

object ActorAttributes {
  import Attributes._
  final case class Dispatcher(dispatcher: String) extends Attribute
  final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute

  val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
  // ...

This effectively means that the stream IO dispatcher is fixed to the default one (i.e. akka.stream.default-blocking-io-dispatcher) and cannot be overridden via akka.stream.blocking-io-dispatcher setting, which is the reason however that the dispatcher resolution works in the first place (default-blocking-io-dispatcher is a valid Config object as demonstrated above)

To sum up, the issues that need to be tackled are as follows:

  1. Fix the resolution of the akka.stream.blocking-io-dispatcher property
  2. Use akka.stream.blocking-io-dispatcher instead of akka.stream.default-blocking-io-dispatcher within the akka-stream library.

If you approve of the issue and the suggested approach for fixing this, I'll be happy to PR the fix.

@johanandren

This comment has been minimized.

Show comment
Hide comment
@johanandren

johanandren Jan 19, 2018

Member

Good catch!

The reason why it was not noticed that the default-blocking-io-dispatcher roundtrip does not work is that all code use the akka.stream.ActorAttributes.IODispatcher constant, which points to akka.stream.default-blocking-io-dispatcher (in addition to missing test coverage for the feature).

The correct fix for this would be to leave the config as is, change that IODispatcher constant to point to the akka.stream.blocking-io-dispatcher and add translation logic that matches against that and picks up the right dispatcher name from config somewhere in the materializer.

Member

johanandren commented Jan 19, 2018

Good catch!

The reason why it was not noticed that the default-blocking-io-dispatcher roundtrip does not work is that all code use the akka.stream.ActorAttributes.IODispatcher constant, which points to akka.stream.default-blocking-io-dispatcher (in addition to missing test coverage for the feature).

The correct fix for this would be to leave the config as is, change that IODispatcher constant to point to the akka.stream.blocking-io-dispatcher and add translation logic that matches against that and picks up the right dispatcher name from config somewhere in the materializer.

@nick-nachos

This comment has been minimized.

Show comment
Hide comment
@nick-nachos

nick-nachos Jan 22, 2018

Contributor

This is starting to look like one of those cases where you come across a seemingly simple bug, only to find yourself stumbling upon a wall.

I went through the akka configuration (akka-actor/reference.conf) to see how other cases similar to this one are dealt with. Indeed, properties that provide users with the ability to change the default dispatchers are NOT config objects as I originally thought, but config object paths. The way this typically works is that there is a configuration of the following form:

someconf {
  default-dispatcher-name = "someconf.default-dispatcher"

  default-dispatcher {
    type = "Dispatcher"
    // other dispatcher props...
  }
}

and the consuming code does something like this:

val dispatcherId = system.settings.config.getString("someconf.default-dispatcher-name")
val dispatcher = system.dispatchers.lookup(dispatcherId)

Thus, your suggestion @johanandren to leave the config intact is correct, as it is on par with what users would expect from such a configuration property (it's actually public API, so any change to it is a breaking one).

However, modifying how the akka.stream.ActorAttributes.IODispatcher attribute is used is also a no-go; that singleton is public API, and is also an instance of the akka.stream.ActorAttributes.Dispatcher class, which is public API as well:

object ActorAttributes {
  import Attributes._
  final case class Dispatcher(dispatcher: String) extends Attribute
  final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute

  val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")

  /**
   * Specifies the name of the dispatcher. This also adds an async boundary.
   */
  def dispatcher(dispatcher: String): Attributes = Attributes(Dispatcher(dispatcher))
  // ...

Consumers of this class (i.e. akka.stream.ActorAttributes.Dispatcher) would typically use the ActorAttributes.dispatcher method found above as such:

someSink.withAttributes(ActorAttributes.dispatcher("my.conf.dispatcher"))

with a configuration that would look like this:

my {
  conf {
    dispatcher {
      type = "Dispatcher"
      executor = "thread-pool-executor"
      throughput = 1

      thread-pool-executor {
        fixed-pool-size = 16
      }
    }
  }
}

Changing the akka internals to treat the akka.stream.ActorAttributes.Dispatcher.dispatcher field as the config path of a variable that contains the config path of a dispatcher (inception 😛) - instead of the config path of the dispatcher directly - would break all these consumers, as they would need to change their configuration as follows:

my {
  conf {
    dispatcher = "my.conf.dispatcher-def"

    dispatcher-def {
      type = "Dispatcher"
      executor = "thread-pool-executor"
      throughput = 1

      thread-pool-executor {
        fixed-pool-size = 16
      }
    }
  }
}

in order for the Scala snippet above to work with the "new" underlying resolution strategy.

Besides breaking the public API (which is a big deal by itself), such a change would also make the API counter-intuitive (due to the "inception" stuff mentioned above), and all that hassle would be only to provide a small configuration benefit which probably nobody has ever attempted to make use of (or at least no-one has ever complained about it not working...). As an added bonus, the dispatcher resolution does not occur only in one place; there's actually three places that make use of the akka.stream.ActorAttributes.Dispatcher.dispatcher field:

  1. akka.stream.impl.PhasedFusingActorMaterializer (effectiveSettings method)
  2. akka.stream.impl.io.OutputStreamSourceStage (createLogicAndMaterializedValue method)
  3. akka.stream.impl.io.FileSink (create method)

which effectively means that any new resolution logic would have to be used on all three of them, as well as any new point in the future.

The only sound approach to solving this issue would have been to some-how manage to resolve the value of the akka.stream.blocking-io-dispatcher, and use that value to create the akka.stream.ActorAttributes.IODispatcher, i.e. doing something like this:

object ActorAttributes {
  import Attributes._
  final case class Dispatcher(dispatcher: String) extends Attribute
  final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute

  val IODispatcher: Dispatcher = ActorAttributes.Dispatcher(magic-resolution("akka.stream.blocking-io-dispatcher"))

However, resolving the property value at that point does not seem possible, as akka.stream.ActorAttributes does not have the configuration of any actor system in context. I actually assume that this may be the reason that the default-blocking-io-dispatcher property was used there in the first place (instead of blocking-io-dispatcher).

Given all the above I cannot think of any reasonable modification to the current implementation. Perhaps it should be that the documentation (and corresponding comments in the source code) should be updated by removing the claim that the API supports parameterization of the default blocking-io-dispatcher via configuration.

I would be interested to hear your thoughts on this.

Contributor

nick-nachos commented Jan 22, 2018

This is starting to look like one of those cases where you come across a seemingly simple bug, only to find yourself stumbling upon a wall.

I went through the akka configuration (akka-actor/reference.conf) to see how other cases similar to this one are dealt with. Indeed, properties that provide users with the ability to change the default dispatchers are NOT config objects as I originally thought, but config object paths. The way this typically works is that there is a configuration of the following form:

someconf {
  default-dispatcher-name = "someconf.default-dispatcher"

  default-dispatcher {
    type = "Dispatcher"
    // other dispatcher props...
  }
}

and the consuming code does something like this:

val dispatcherId = system.settings.config.getString("someconf.default-dispatcher-name")
val dispatcher = system.dispatchers.lookup(dispatcherId)

Thus, your suggestion @johanandren to leave the config intact is correct, as it is on par with what users would expect from such a configuration property (it's actually public API, so any change to it is a breaking one).

However, modifying how the akka.stream.ActorAttributes.IODispatcher attribute is used is also a no-go; that singleton is public API, and is also an instance of the akka.stream.ActorAttributes.Dispatcher class, which is public API as well:

object ActorAttributes {
  import Attributes._
  final case class Dispatcher(dispatcher: String) extends Attribute
  final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute

  val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")

  /**
   * Specifies the name of the dispatcher. This also adds an async boundary.
   */
  def dispatcher(dispatcher: String): Attributes = Attributes(Dispatcher(dispatcher))
  // ...

Consumers of this class (i.e. akka.stream.ActorAttributes.Dispatcher) would typically use the ActorAttributes.dispatcher method found above as such:

someSink.withAttributes(ActorAttributes.dispatcher("my.conf.dispatcher"))

with a configuration that would look like this:

my {
  conf {
    dispatcher {
      type = "Dispatcher"
      executor = "thread-pool-executor"
      throughput = 1

      thread-pool-executor {
        fixed-pool-size = 16
      }
    }
  }
}

Changing the akka internals to treat the akka.stream.ActorAttributes.Dispatcher.dispatcher field as the config path of a variable that contains the config path of a dispatcher (inception 😛) - instead of the config path of the dispatcher directly - would break all these consumers, as they would need to change their configuration as follows:

my {
  conf {
    dispatcher = "my.conf.dispatcher-def"

    dispatcher-def {
      type = "Dispatcher"
      executor = "thread-pool-executor"
      throughput = 1

      thread-pool-executor {
        fixed-pool-size = 16
      }
    }
  }
}

in order for the Scala snippet above to work with the "new" underlying resolution strategy.

Besides breaking the public API (which is a big deal by itself), such a change would also make the API counter-intuitive (due to the "inception" stuff mentioned above), and all that hassle would be only to provide a small configuration benefit which probably nobody has ever attempted to make use of (or at least no-one has ever complained about it not working...). As an added bonus, the dispatcher resolution does not occur only in one place; there's actually three places that make use of the akka.stream.ActorAttributes.Dispatcher.dispatcher field:

  1. akka.stream.impl.PhasedFusingActorMaterializer (effectiveSettings method)
  2. akka.stream.impl.io.OutputStreamSourceStage (createLogicAndMaterializedValue method)
  3. akka.stream.impl.io.FileSink (create method)

which effectively means that any new resolution logic would have to be used on all three of them, as well as any new point in the future.

The only sound approach to solving this issue would have been to some-how manage to resolve the value of the akka.stream.blocking-io-dispatcher, and use that value to create the akka.stream.ActorAttributes.IODispatcher, i.e. doing something like this:

object ActorAttributes {
  import Attributes._
  final case class Dispatcher(dispatcher: String) extends Attribute
  final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute

  val IODispatcher: Dispatcher = ActorAttributes.Dispatcher(magic-resolution("akka.stream.blocking-io-dispatcher"))

However, resolving the property value at that point does not seem possible, as akka.stream.ActorAttributes does not have the configuration of any actor system in context. I actually assume that this may be the reason that the default-blocking-io-dispatcher property was used there in the first place (instead of blocking-io-dispatcher).

Given all the above I cannot think of any reasonable modification to the current implementation. Perhaps it should be that the documentation (and corresponding comments in the source code) should be updated by removing the claim that the API supports parameterization of the default blocking-io-dispatcher via configuration.

I would be interested to hear your thoughts on this.

@johanandren johanandren self-assigned this Jan 29, 2018

patriknw added a commit that referenced this issue Feb 22, 2018

Actually use the relative blocking io dispatcher setting #24357
* Remove docs and have only deprecation comment on old setting
* ConfigFactory.load fixed

@patriknw patriknw added this to the 2.5.10 milestone Feb 22, 2018

patriknw added a commit that referenced this issue Feb 22, 2018

fix blocking-io-dispatcher setting, #24357
It must still have a valid value becuase used from Akka HTTP.

patriknw added a commit that referenced this issue Feb 22, 2018

Merge pull request #24606 from akka/wip-deprecated-blocking-io-dispat…
…cher-patriknw

 fix blocking-io-dispatcher setting, #24357

@ktoso ktoso removed the 3 - in progress label Feb 23, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment