Kafka SourceProvider implementation and Slick integration example #80
Conversation
@@ -15,6 +15,7 @@ object Dependencies {
   object Versions {
     val akka = "2.6.4"
     val alpakka = "2.0.0-RC2"
+    val alpakkaKafka = "2.0.2+21-0427b570"

To use the unreleased SendProducer API.
I said most of it in the meeting, but sending my comments anyway to get it registered.
looking promising
    }
  }
}
""")
config should be for the ActorSystem, and here accessed via context.system.settings.config
Do you mean move this to an application.conf and load it from the ActorSystem? How would we provide configuration for other examples in this project if it's in a file? Should we create a sub-project per example?
It can still be in code, but just defined above when the ActorSystem is created.
Ok, got it.
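The suggestion could look roughly like this sketch (assuming Akka Typed and the Typesafe Config library; the config keys shown are placeholders, not the project's real settings): the config stays in code, but is handed to the ActorSystem at creation, so it is later reachable via `system.settings.config`.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

// Define the test config in code, layered over the default config...
val config = ConfigFactory.parseString("""
    example.projection.db-url = "jdbc:h2:mem:test"
    """).withFallback(ConfigFactory.load())

// ...and pass it in when the ActorSystem is created.
val system = ActorSystem(Behaviors.empty, "Example", config)

// Anywhere with access to the system (or an ActorContext):
val dbUrl = system.settings.config.getString("example.projection.db-url")
```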
I think you meant that I should extend ScalaTestWithActorTestKit like the other integration tests, but I couldn't do this since I'm using Alpakka Kafka's testkit. I had to recreate some of the boilerplate, but the config is applied in the same way as SlickSpec. WDYT?
I left a few comments here and there.
I have concerns about the mergeable offsets and how they increase the complexity. I would like to avoid having one use case affect the APIs in a non-obvious way for users.
If we can keep it as an implementation detail that will be great.
looking good
class OffsetStoreTable(tag: Tag) extends Table[OffsetRow](tag, "AKKA_PROJECTION_OFFSET_STORE") {

  def projectionId = column[String]("PROJECTION_ID", O.Length(255, varying = false), O.PrimaryKey)
  def projectionName = column[String]("PROJECTION_NAME", O.Length(255, varying = false))
That can be useful for backoffice admin ui also. We should add it to the Cassandra table also even though we don't support the MergeableOffset there.
👍 Though I think we should support MergeableOffset in the CassandraOffsetStore too.
- offsetTable.filter(_.projectionId === projectionId.id).result.headOption.map { maybeRow =>
-   maybeRow.map(row => fromStorageRepresentation[Offset](row.offsetStr, row.manifest))
+ offsetTable.filter(_.projectionName === projectionId.name).result.map { maybeRow =>
+   maybeRow.map(row => SingleOffset(projectionId, row.manifest, row.offsetStr, row.mergeable))
  }
I think this is a good first implementation. Later we could optimize for the single-offset case by first reading one row for the projectionId.id, and if that has mergeable == false then it's known to be single-offset; otherwise (including when not found) we can read all rows for the projectionId.name.
Yes. I think the worst case would be loading all offsets for a Kafka consumer group with thousands of partitions, but even then the readOffset use case only happens at startup or a rebalance, so it's going to be a negligible performance impact in the majority of use cases.
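The two-step read described above could look roughly like this plain-Scala sketch; the OffsetRow fields and the helper are hypothetical stand-ins for the real Slick queries against AKKA_PROJECTION_OFFSET_STORE.

```scala
// Hypothetical in-memory model of one offset-store row.
final case class OffsetRow(id: String, name: String, offsetStr: String, mergeable: Boolean)

// First read the single row keyed by projectionId.id. If it exists and is not
// mergeable we know it's a single offset; otherwise (including when the row is
// missing) fall back to reading every row sharing projectionId.name.
def readOffsetRows(rows: Seq[OffsetRow], id: String, name: String): Seq[OffsetRow] =
  rows.find(_.id == id) match {
    case Some(row) if !row.mergeable => Seq(row)            // known single-offset
    case _                           => rows.filter(_.name == name) // mergeable or not found
  }
```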
@patriknw @renatocaval I think this is ready for another [final] review.
Thanks @seglo. It's looking really good.
I'm concerned with the idea of subscribing to more than one topic from inside one projection.
Apart from that, I left a few comments here and there.
There is one issue with the ActorSystem in KafkaSpecBase that we should address.
override def source(readOffsets: ReadOffsetsHandler): Future[Source[ConsumerRecord[K, V], _]] = {
  // get the total number of partitions to configure the `breadth` parameter, or we could just use a really large
  // number. i don't think using a large number would present a problem.
  val numPartitionsF = Future.sequence(topics.map(client.getPartitionsFor)).map(_.map(_.length).sum)
If we keep one topic per projection that issue disappears
No, it just removes one layer of nesting. We'll still query the topic for the number of partitions, the above is just summing the number of partitions of all topics passed in.
Yes, I got it, but I understood that the concern was the Future.sequence. Of course, this is only a real concern if the set of topics is very large.
I see. I think it's probably fine for now. I've never seen a client subscribe to that many topics.
looking good, a few comments...
  }

  val futSource = sourceProvider.source(readOffsets)
One funny thing is possible for the AtLeastOnce. One projection instance is consuming from a topic-partition and saving the offsets with AtLeastOnce. Then that topic-partition is re-assigned to another projection instance. Some elements from the first projection might still be in flight and not saved yet, but when they are saved the offset could be less than what the new projection instance has saved. Old is overwriting new.
Probably not a big problem since the new projection will soon overwrite again. It would still be at-least-once if 3rd projection instance takes over. We could track this in a separate issue.
Related thought, but for exactlyOnce. The old projection instance is processing offset 17, but before it has been committed to the database there is a hand-over of that topic-partition to another projection instance, reading offset 16, and processing 17. The old instance has successfully processed 17 and saves offset 17. The new instance will also process 17. It's not exactly-once.
Any way we can control the hand-over for such a scenario?
> Related thought, but for exactlyOnce. The old projection instance is processing offset 17, but before it has been committed to the database there is a hand-over of that topic-partition to another projection instance, reading offset 16, and processing 17. The old instance has successfully processed 17 and saves offset 17. The new instance will also process 17. It's not exactly-once.
> Any way we can control the hand-over for such a scenario?
That's a good point. If we were using Alpakka Kafka transactions then we could rely on logic within Kafka transactions and extra bookkeeping done in Alpakka Kafka to ensure inflight messages from a revoked partition aren't committed, or all committed before the partition is revoked.
The Source we're using right now doesn't have any of these guarantees and is really only suitable for guaranteeing at-least-once. We could implement an Alpakka Kafka PartitionAssignmentHandler that lets us perform blocking operations during a rebalance, such as waiting for the partitioned Source of a revoked partition to complete before letting the rebalance process continue (or cancelling it pre-emptively). This will require some not-so-fun concurrency locking code to implement, though. But since we control the streams implementation it might be an option in projections.
These weird partition rebalancing problems come up often with Alpakka Kafka and result in a lot of the project's complexity. I'll create an issue to consider solutions for this case.
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.projection.examples
Good if this is placed in a non-akka package, docs.projection.examples or such. Then we verify that we don't use any internal APIs here.
Ah, my example uses SlickOffsetStore to assert offset state. I was thinking this example could also serve as an integration test, but maybe we need both. Should we create an integration project that can remain in the akka namespace, and then clean up the example to not reference internal APIs?
Keep it as is. We'll see when we write the doc examples.
May I suggest the Alpakka standard of docs.scaladsl and docs.javadsl.
      case Some(userEventCount) => userEventCount.copy(count = userEventCount.count + 1)
      case _                    => UserEventCount(eventType, 1)
    }
    _ <- userEventCountTable.insertOrUpdate(count)
So the findByEventType and insertOrUpdate are atomic, in the same transaction?
Is the count read by findByEventType guaranteed not to be updated by another EventTypeCountRepository instance in between the read and the insertOrUpdate?
Is all of it translated to SQL running in the DB, or is it some DB lock, or how does that work?
Good point. I don't know how that works. I doubt reads and writes would be automatically handled in one transaction. I was following the same pattern used in SlickProjectionImplSpec, but this breaks down when more than one projection is updating some projection source with a different partition/shard/slice. /cc @renatocaval
I'll need to rethink this example.
What I would use for CRUD like this is optimistic locking: a version number column that is read in findByEventType and then used as a condition in the update, requiring it to be the same as what was read. The update also increments it by 1. If no row was updated it means that someone else updated in between. Then we could fail, and even use the retry mechanism from #95.
Would be pretty nice to include that pattern in this example.
An alternative would be to have one counter per projectionId, so that it's a single writer.
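A minimal in-memory sketch of that optimistic-locking pattern (all names are illustrative, not the repository's real API): the version read together with the row acts as the update condition, and the update bumps it by one.

```scala
import scala.collection.mutable

// Illustrative row type: the count plus a version column for optimistic locking.
final case class EventCount(eventType: String, count: Long, version: Long)

// Returns Some(updated) on success, or None when another writer changed the
// version in between the read and the write -- the caller would then retry.
def incrementIfUnchanged(store: mutable.Map[String, EventCount], read: EventCount): Option[EventCount] =
  store.get(read.eventType) match {
    case Some(current) if current.version == read.version =>
      // "UPDATE ... WHERE VERSION = ?" succeeded: bump count and version together.
      val updated = current.copy(count = current.count + 1, version = current.version + 1)
      store.update(read.eventType, updated)
      Some(updated)
    case _ => None // zero rows updated: concurrent writer detected
  }
```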
added issue #107 for this
Reads and writes will run in the same transaction, but the issue here is not at the Slick level; it's the configured transaction isolation level.
To be more precise about what I mean: this is a general transaction issue, and users need to configure the DB the way they want/need it and code accordingly.
To have full protection here we would need either some optimistic locking or a strict transaction isolation level (e.g. Serializable). I don't think it's worth doing that in our tests unless we want to use it as a documentation sample.
For the purpose of this example I was thinking of just querying to see if a count for an event type exists (instead of retrieving the count value itself) and then doing either of the following:
- If it exists, issue an update statement and increment the count in place, instead of first reading it and incrementing it in code, i.e. UPDATE EVENTS_TYPE_COUNT SET COUNT = COUNT + 1 WHERE EVENT_TYPE = ?
- If it does not exist, insert the record. If the insert fails because another projection created it already, then an update will happen on the retry. An alternative would be to seed the records with the projection table DDL, but as @patriknw mentioned it would be a nice way to showcase retry logic.
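An in-memory stand-in for that two-branch approach (names hypothetical; in the real repository the first branch would be the atomic UPDATE ... SET COUNT = COUNT + 1 statement, and the second an INSERT retried as an update on conflict):

```scala
import scala.collection.mutable

// Simulates "try the atomic in-place increment; if it touched no rows, insert".
def recordEvent(counts: mutable.Map[String, Long], eventType: String): Long = {
  // Stand-in for UPDATE ... SET COUNT = COUNT + 1 WHERE EVENT_TYPE = ?
  val rowsUpdated = counts.get(eventType) match {
    case Some(c) => counts.update(eventType, c + 1); 1
    case None    => 0
  }
  // Stand-in for the INSERT of the first count when the UPDATE matched nothing.
  if (rowsUpdated == 0) counts.update(eventType, 1L)
  counts(eventType)
}
```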
@@ -62,6 +63,8 @@ object Common extends AutoPlugin {
     "-skip-packages",
     "akka.pattern" // for some reason Scaladoc creates this
   ),
+  // FIXME enable "-Xfatal-warnings", but then we need the silencer
In Scala 2.13.2 @nowarn is built-in.
created issue #108
LGTM, ready for merge.
(Did a last push to remove .DS_Store files that were committed by accident.)
Force-pushed from b54886c to 0b10150.
Merged to avoid further merge conflicts. Some follow-up in separate PRs.
- A SourceProvider that allows for consumer group membership as well as seeking to externally provided offsets.
- MergeableOffset: an offset that translates into multiple rows in the offset store. Projections persist one or more rows with saveOffset. Projections read all rows with readOffset and merge them together into a MergeableOffset that can be used to store a Kafka partition-to-offset map.
- An exactlyOnce API.
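The MergeableOffset idea above can be sketched in a few lines of plain Scala (simplified types — the real class is generic and lives in akka-projection-core): each offset-store row carries a surrogate key such as "topic-partition" plus its offset, and readOffset merges all rows for the projection into one map-backed offset.

```scala
// Simplified stand-in for the map-backed offset type.
final case class MergeableOffset(entries: Map[String, Long])

// Merge the (surrogate key, offset) rows read from the offset store into one
// MergeableOffset -- e.g. one entry per Kafka topic-partition.
def mergeRows(rows: Seq[(String, Long)]): MergeableOffset =
  MergeableOffset(rows.toMap)
```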