-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gRPC as Replicated ES transport #757
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like a good start
...c/main/scala/akka/projection/grpc/replication/scaladsl/ReplicatedEventSourcingOverGrpc.scala
Outdated
Show resolved
Hide resolved
destinationReplicaId.persistenceId.id, | ||
replicatedEventMetadata.version, | ||
envelope.event) | ||
val askResult = entityRef.ask[Done]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be tempting to increase the parallelism of the mapAsync, but then we must be careful. Messages can be lost from here to the destination entity and we accept higher sequence numbers and can't validate exact sequence number. Then the following is possible:
- send e1, e2, e3 to the entity
- e1 arrives all fine
- e2 is lost
- e3 arrives and is accepted since it has a higher sequence number
- there is a timeout of the ack for e2 and the stream is restarted
- e2 is sent again, arrives but is handled as s duplicate because it has a lower sequence number than already handled e3
Once again we would need something like the mapAsyncPartitioned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lightweight internal mapAsyncPartitioned operator added in ef8f50d
32fe923
to
5bf9e15
Compare
268b069
to
ef8f50d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reviewed this in more detail, looking good
|
||
public static void start(ActorSystem<?> system) { | ||
Set<Replica> otherReplicas = new HashSet<>(); | ||
otherReplicas.add(Replica.apply( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add javadsl Replica.create
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 9c64f9f
akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto
Outdated
Show resolved
Hide resolved
parallelUpdates, | ||
ReplicationProjectionProviderAdapter.toScala(replicationProjectionProvider))(classTag) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this contains rather essential api with mix of Java create
and Scala apply
. Would it be difficult to move to the javadsl and scaladsl packages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was initially fine but outgrew a shared class as more and more settings became obvious, I'll create separate APIs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Massive change to separate APIs: 9c64f9f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And now even more separate in 1e0cbba to support Entity => Entity transformations for entity config in both DSLs
replicatedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])( | ||
implicit system: ActorSystem[_]): Replication[Command] = { | ||
require( | ||
system.settings.config.getString("akka.actor.provider") == "cluster", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be better to check ExtendedActorSystem.provider
since it's still possible to configure cluster with full class name of the provider
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 971235e
akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
...ojection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala
Show resolved
Hide resolved
...ojection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala
Show resolved
Hide resolved
...ojection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala
Outdated
Show resolved
Hide resolved
5d4dd5f
to
34c361c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catching up on this with a read-through. Looks like it's coming together really nicely 👍🏼
...ojection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
* Important: Note that this does not publish the endpoint, additional steps are needed! | ||
*/ | ||
def grpcReplication[Command, Event, State](settings: ReplicationSettingsImpl[Command])( | ||
replicatedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One problem with this type of factory is that since it needs to return an event sourced behavior it can't be composed to access context and timers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Peter has a solution for that in #769
Latest commit depends on Akka changes from akka/akka#31817 and r2dbc changes in akka/akka-persistence-r2dbc#348 |
...-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala
Outdated
Show resolved
Hide resolved
...-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala
Outdated
Show resolved
Hide resolved
...ojection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
Outdated
Show resolved
Hide resolved
4f59a4b
to
929b6a6
Compare
…ndpoints in one server
Two missing parts are samples and ability to compose behavior with setup/timers, Peter has pretty much completed samples and took a shot at composing so let's add that on top after merging this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Didn't re-review everything from earlier in detail. But with all the testing that's been done, and follow-ups for the final touches—massaging APIs some more, composability, samples, and extending the docs—looks good to merge now.
...ojection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala
Outdated
Show resolved
Hide resolved
...ojection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala
Outdated
Show resolved
Hide resolved
...rojection-grpc/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala
Outdated
Show resolved
Hide resolved
Co-authored-by: Peter Vlugter <59895+pvlugter@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good, I'll also look at Peter's samples and the benchmark project
akka.persistence.r2dbc { | ||
query { | ||
refresh-interval = 500 millis | ||
# reducing this to have quicker test, triggers backtracking earlier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's also the backtracking.window that triggers the backtracking switch. When half of the backtracking.window has elapsed it will switch to backtracking
...-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/Transformation.scala
Outdated
Show resolved
Hide resolved
...ojection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
Outdated
Show resolved
Hide resolved
...ojection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Outdated
Show resolved
Hide resolved
...rojection-grpc/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala
Show resolved
Hide resolved
Also: * Re-renamed the JavaSourceProviderAdapter since it is used in Akka Persistence R2DBC * Some missing APIs added
…t still not working
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, when the small things have been fixed I think we can merge this and iterate on the remaining in separate PRs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to have the integration test using the Java DSL.
All looks fine and working in the samples, rebased on latest changes.
Looks like comments have been addressed, or follow-ups added, so seems good to merge.
|
||
/** | ||
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider | ||
*/ | ||
@InternalApi private[projection] class SourceProviderAdapter[Offset, Envelope]( | ||
@InternalStableApi private[projection] class SourceProviderAdapter[Offset, Envelope]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useful to have this back to SourceProviderAdapter
. I had added it back and deprecated this, with the rename to JavaSourceProviderAdapter
, so the java sample could run without a local r2dbc snapshot.
/** | ||
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider | ||
*/ | ||
@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏼 the other missing adaptation for Java DSL.
Triggered rerun for CI job and unrelated failure (#465) |
Will merge now, and retarget the samples PR. |
Depends on upstream Akka changes: akka/akka#31771