Expose materializer, attributes or actor system to Source/Flow/Sink #26192

Closed
2m opened this issue Jan 3, 2019 · 3 comments

4 participants
@2m
Member

commented Jan 3, 2019

There are quite a few connectors in Alpakka where the implementation needs access to a Materializer, an ActorSystem or Attributes. These are currently exposed to custom GraphStages but are not easily available to simple Sources, Flows or Sinks.

For example, the S3 connector uses akka-http and therefore needs access to the ActorSystem. It also runs some inner flows in its implementation, for which it needs access to a Materializer.

The current S3 connector takes these resources as parameters in the factory methods. However, that makes the created Sources, Flows and Sinks not freely sharable, because they are tied to a particular ActorSystem or Materializer.

We have a WIP Alpakka PR which exposes the Materializer to an arbitrary Source, Flow or Sink. Here is what a typical S3 factory looks like currently:

final class S3Client(val s3Settings: S3Settings)(implicit system: ActorSystem, mat: Materializer) {
  ...
  def listBucket(bucket: String, prefix: Option[String]): Source[ListBucketResultContents, NotUsed] = ...
  ...
}

First, the user needs to instantiate S3Client, passing the S3Settings, ActorSystem and Materializer as parameters. The S3Client instance can then be used to create Sources, Flows and Sinks, which are tied to these parameters.

After the refactoring in the aforementioned PR, the factories are converted to real static factories:

object S3 {
  ...
  def listBucket(bucket: String, prefix: Option[String]): Source[ListBucketResultContents, NotUsed] = ...
  ...
}

The implementation of listBucket after the refactoring uses a lazy Source (Source.lazily) to expose the Materializer:

object MaterializerAccess {
  def source[T, M](factory: ActorMaterializer => Source[T, M]): Source[T, Future[M]] =
    Source
      .lazily(() => {
        factory(extractMat(GraphInterpreter.currentInterpreterOrNull))
      })
}

This approach has two downsides:

  • The materialized value is wrapped in a Future.
  • A lazy Sink does not materialize the inner Sink if the stream does not emit an element. This is a problem for the S3 upload use case, which supports creating an empty file, represented by a Source.empty[ByteString].
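Both downsides can be reproduced outside of the S3 connector. The following is a minimal sketch, assuming Akka 2.5.x, and assuming that the lazy Sink in question behaves like Sink.lazyInitAsync; the object and value names are hypothetical and not taken from the Alpakka PR.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

// Sketch only: assumes Akka 2.5.x (Source.lazily, Sink.lazyInitAsync).
object LazyDownsides {
  implicit val system: ActorSystem = ActorSystem("lazy-downsides")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Downside 1: the inner materialized value (a String here) is only
  // reachable through a Future once the source is wrapped lazily.
  val inner: Source[Int, String] = Source(1 to 3).mapMaterializedValue(_ => "inner")
  val wrapped: Source[Int, Future[String]] = Source.lazily(() => inner)
  val innerMat: Future[String] = wrapped.to(Sink.ignore).run()

  // Downside 2: with an empty upstream the sink factory is never invoked,
  // so the inner Sink is not materialized and the result is an empty Option.
  val lazySink: Sink[ByteString, Future[Option[Future[Done]]]] =
    Sink.lazyInitAsync[ByteString, Future[Done]](() =>
      Future.successful(Sink.foreach[ByteString](_ => ())))
  val sinkMat: Future[Option[Future[Done]]] = Source.empty[ByteString].runWith(lazySink)
}
```

For an upload Sink, the second case means that the code which would have created the empty object never runs.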

This ticket is about discussing a supported way to expose the Materializer.

One approach would be to modify the traversal logic so that it can expose the Materializer, which would make it possible to materialize factories of the form Materializer => Source/Flow/Sink.

Another approach would be to implement a custom GraphStage that wraps the Source, Flow or Sink and provides the Materializer and Attributes, since these are already available to the implementation of custom GraphStages. However, with this approach the materialized value of the inner Source, Flow or Sink would still be wrapped in a Future.

Let me know what you think.

@hepin1989

Contributor

commented Jan 3, 2019

Great! How about adding a method like preMaterialize to GraphStageWithMaterializedValue? For example:

abstract class GraphStageWithMaterializedValue {
  def preMaterialize(materializer: Materializer): Unit
}

@2m

Member Author

commented Jan 4, 2019

@hepin1989, yes, but such a method would be useful only for side effects (its return type is Unit) and only when extending GraphStageWithMaterializedValue. I would like to come up with something that can be used with any Source, Flow or Sink.

@2m

Member Author

commented Jan 9, 2019

I have arrived at a clean solution to expose ActorMaterializer and Attributes by wrapping a Source, Flow or Sink in a GraphStage. Inside, the custom GraphStage uses Sub{SourceOutlet,SinkInlet} to wire and run the original graph from preStart. You can find the implementation in the aforementioned S3 refactoring PR.

The materialized value is still wrapped in a Future, but that was not a problem for the S3 connector, because all of its materialized values are wrapped in a Future anyway.
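To illustrate the wrapping idea for the Source case, here is a rough sketch, not the code from the PR. The class name MaterializerSource is hypothetical, and it passes the general Materializer rather than downcasting to ActorMaterializer. It relies only on the public GraphStageLogic API (materializer, subFusingMaterializer, SubSinkInlet) as found in Akka 2.5.x.

```scala
import akka.stream.{Attributes, Materializer, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical wrapper: builds the inner Source at materialization time,
// once the Materializer and inherited Attributes are known.
final class MaterializerSource[T, M](factory: Materializer => Attributes => Source[T, M])
    extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {

  private val out = Outlet[T]("MaterializerSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
    val matPromise = Promise[M]()
    val logic = new GraphStageLogic(shape) {
      private val subInlet = new SubSinkInlet[T]("MaterializerSource.sub")

      // Forward elements and termination signals from the inner Source downstream.
      subInlet.setHandler(new InHandler {
        override def onPush(): Unit = push(out, subInlet.grab())
        override def onUpstreamFinish(): Unit = complete(out)
        override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
      })

      // Forward demand and cancellation from downstream to the inner Source.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = subInlet.pull()
        override def onDownstreamFinish(): Unit = {
          subInlet.cancel()
          completeStage()
        }
      })

      // Run the inner graph from preStart; its materialized value is only
      // known at this point, hence the Future in the stage's signature.
      override def preStart(): Unit =
        try {
          val source = factory(materializer)(inheritedAttributes)
          matPromise.success(source.to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer))
        } catch {
          case NonFatal(ex) =>
            matPromise.failure(ex)
            throw ex
        }
    }
    (logic, matPromise.future)
  }
}
```

A stage like this could be used as Source.fromGraph(new MaterializerSource[Int, NotUsed](mat => attr => Source(1 to 3))). Because the inner graph is run unconditionally from preStart, it is materialized even when no element ever flows, unlike the lazy variant.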

Eventually we will want Setup.{source,flow,sink} to be moved to the akka-stream sub-project, as it will be used across many Alpakka connectors.
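As a hedged illustration of what such an operator looks like from the user's side, the sketch below assumes the Source.setup operator as it appeared in akka-stream 2.5.23, with a factory of type (ActorMaterializer, Attributes) => Source[T, M]. Treat that signature as an assumption and check it against the Akka version in use. The object and method names are hypothetical.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import scala.concurrent.Future

// Sketch only: assumes Source.setup from akka-stream 2.5.23.
object SetupExample {
  // A static factory: nothing is captured at construction time, so the
  // returned Source can be shared and run on any ActorSystem.
  def systemName: Source[String, Future[NotUsed]] =
    Source.setup { (mat, attributes) =>
      // The ActorSystem is looked up from the materializer at run time.
      Source.single(mat.system.name)
    }
}
```

The same Source value can then be materialized on different ActorSystems, and each run observes the system it is running on.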

@2m 2m referenced this issue Mar 6, 2019

Merged

Add setup operator #26477

johanandren added a commit that referenced this issue May 17, 2019

@johanandren johanandren added this to the 2.5.23 milestone May 17, 2019

johanandren added a commit to johanandren/akka that referenced this issue May 17, 2019

This was referenced May 17, 2019

@johanandren johanandren reopened this May 17, 2019

johanandren added a commit that referenced this issue May 17, 2019

@patriknw patriknw added 1 - triaged and removed discuss labels May 21, 2019
