Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka SourceProvider implementation and Slick integration example #80

Merged
merged 30 commits into master from seglo/kafka-impl
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c940210
WIP
seglo Apr 24, 2020
00aa567
WIP akka-projection-kafka
seglo Apr 29, 2020
616ebd1
example of Kafka offsets stored in Cassandra
patriknw Apr 29, 2020
cafdab1
MergeableOffsets
seglo Apr 29, 2020
d777606
Merge branch 'seglo/kafka-impl-2' into seglo/kafka-impl
seglo Apr 29, 2020
636f28c
Update example
seglo Apr 29, 2020
16ba475
cleanup
seglo Apr 30, 2020
4011dab
fix test regressions
seglo Apr 30, 2020
e78c5e1
MergeableOffset serialization tests
seglo Apr 30, 2020
a5cf833
cleanup
seglo Apr 30, 2020
a2c2442
Recursive offset type for offsets that make up MergeableOffset
seglo May 1, 2020
7a937ad
Integration spec
seglo May 4, 2020
8449293
DBIO.sequence
seglo May 4, 2020
02b623b
Use slick exactlyOnce projection in example
seglo May 4, 2020
0126e35
Test cleanup
seglo May 4, 2020
851ad1f
immutable.Seq
seglo May 4, 2020
2ef8ed8
KafkaSourceProviderSpec
seglo May 6, 2020
6510b9e
Merge branch 'master' into seglo/kafka-impl
seglo May 6, 2020
3339aad
awaitProduce for longer timeout
seglo May 6, 2020
69b956c
Remove nesting of MergeableOffsets.Offset
seglo May 7, 2020
48dc024
Update javadsl.SourceProvider and impls
seglo May 7, 2020
056832c
review feedback
seglo May 7, 2020
7c53a55
Merge branch 'master' into seglo/kafka-impl
seglo May 8, 2020
d7f03ee
Merge branch 'master' into seglo/kafka-impl
seglo May 8, 2020
088682e
One actorsystem per kafkaspec
seglo May 8, 2020
8d3d68e
No support for MergeableOffset in CassandraOffsetStore
seglo May 8, 2020
136012e
envelope->record
seglo May 8, 2020
2691b83
headers
patriknw May 11, 2020
13c2940
pending test
patriknw May 11, 2020
0b10150
Merge branch 'master' into seglo/kafka-impl
patriknw May 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.annotation.InternalApi
import akka.projection.ProjectionId
import akka.projection.internal.OffsetSerialization
import akka.projection.internal.{ MergeableOffset, OffsetSerialization }
import akka.projection.internal.OffsetSerialization.SingleOffset
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession

/**
Expand All @@ -39,15 +39,20 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
}
}

def saveOffset[Offset](projectionId: ProjectionId, offset: Offset): Future[Done] = {
val (offsetStr, manifest) = toStorageRepresentation(offset)
session.executeWrite(
s"INSERT INTO $keyspace.$table (projection_id, offset, manifest, last_updated) VALUES (?, ?, ?, ?)",
projectionId.id,
offsetStr,
manifest,
Instant.now(clock))
}
def saveOffset[Offset](projectionId: ProjectionId, offset: Offset): Future[Done] =
offset match {
case _: MergeableOffset[_] =>
throw new IllegalArgumentException("The CassandraOffsetStore does not currently support MergeableOffset")
case _ =>
val SingleOffset(_, manifest, offsetStr, _) =
toStorageRepresentation(projectionId, offset).asInstanceOf[SingleOffset]
session.executeWrite(
s"INSERT INTO $keyspace.$table (projection_id, offset, manifest, last_updated) VALUES (?, ?, ?, ?)",
projectionId.id,
offsetStr,
manifest,
Instant.now(clock))
}

// FIXME maybe we need to make this public for user's tests
def createKeyspaceAndTable(): Future[Done] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ import akka.stream.scaladsl.Source

val offsetStore = new CassandraOffsetStore(session)

val lastKnownOffset: Future[Option[Offset]] = offsetStore.readOffset(projectionId)
val readOffsets = () => offsetStore.readOffset(projectionId)

val source: Source[(Offset, Envelope), NotUsed] =
Source
.futureSource(lastKnownOffset.map(sourceProvider.source))
.futureSource(sourceProvider.source(readOffsets))
.via(killSwitch.flow)
.map(envelope => sourceProvider.extractOffset(envelope) -> envelope)
.mapMaterializedValue(_ => NotUsed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import akka.Done;
import akka.actor.testkit.typed.javadsl.LogCapturing;
Expand Down Expand Up @@ -88,11 +90,13 @@ static class TestSourceProvider implements SourceProvider<Long, Envelope> {
}

@Override
public Source<Envelope, ?> source(Optional<Long> offset) {
if (offset.isPresent())
return Source.from(envelopes).drop(offset.get().intValue());
else
return Source.from(envelopes);
public CompletionStage<Source<Envelope, ?>> source(Supplier<CompletionStage<Optional<Long>>> offsetF) {
return offsetF.get().toCompletableFuture().thenApplyAsync(offset -> {
if (offset.isPresent())
return Source.from(envelopes).drop(offset.get().intValue());
else
return Source.from(envelopes);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import scala.util.Try

import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
Expand All @@ -43,7 +44,7 @@ object CassandraProjectionSpec {

def offsetExtractor(env: Envelope): Long = env.offset

def sourceProvider(id: String): SourceProvider[Long, Envelope] = {
def sourceProvider(systemProvider: ClassicActorSystemProvider, id: String): SourceProvider[Long, Envelope] = {

val envelopes =
List(
Expand All @@ -54,17 +55,17 @@ object CassandraProjectionSpec {
Envelope(id, 5L, "mno"),
Envelope(id, 6L, "pqr"))

TestSourceProvider(Source(envelopes))
TestSourceProvider(systemProvider, Source(envelopes))
}

case class TestSourceProvider(src: Source[Envelope, _]) extends SourceProvider[Long, Envelope] {

override def source(offset: Option[Long]): Source[Envelope, _] = {
offset match {
case class TestSourceProvider(systemProvider: ClassicActorSystemProvider, src: Source[Envelope, _])
extends SourceProvider[Long, Envelope] {
implicit val dispatcher: ExecutionContext = systemProvider.classicSystem.dispatcher
override def source(offset: () => Future[Option[Long]]): Future[Source[Envelope, _]] =
offset().map {
case Some(o) => src.dropWhile(_.offset <= o)
case _ => src
}
}

override def extractOffset(env: Envelope): Long = env.offset
}
Expand Down Expand Up @@ -205,7 +206,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 1,
saveOffsetAfterDuration = Duration.Zero,
concatHandler())
Expand All @@ -230,7 +231,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 1,
saveOffsetAfterDuration = Duration.Zero,
concatHandlerFail4())
Expand Down Expand Up @@ -259,7 +260,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 1,
saveOffsetAfterDuration = Duration.Zero,
concatHandler())
Expand All @@ -285,7 +286,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 2,
saveOffsetAfterDuration = 1.minute,
concatHandlerFail4())
Expand Down Expand Up @@ -314,7 +315,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 2,
saveOffsetAfterDuration = 1.minute,
concatHandler())
Expand Down Expand Up @@ -347,7 +348,7 @@ class CassandraProjectionSpec
val projection =
CassandraProjection.atLeastOnce[Long, Envelope](
projectionId,
TestSourceProvider(source),
TestSourceProvider(system, source),
saveOffsetAfterEnvelopes = 10,
saveOffsetAfterDuration = 1.minute,
concatHandler())
Expand Down Expand Up @@ -391,7 +392,7 @@ class CassandraProjectionSpec
val projection =
CassandraProjection.atLeastOnce[Long, Envelope](
projectionId,
TestSourceProvider(source),
TestSourceProvider(system, source),
saveOffsetAfterEnvelopes = 10,
saveOffsetAfterDuration = 2.seconds,
concatHandler())
Expand Down Expand Up @@ -429,7 +430,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 2,
saveOffsetAfterDuration = 1.minute,
concatHandlerFail4(HandlerRecoveryStrategy.skip))
Expand Down Expand Up @@ -458,7 +459,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 2,
saveOffsetAfterDuration = 1.minute,
handler)
Expand Down Expand Up @@ -492,7 +493,7 @@ class CassandraProjectionSpec
CassandraProjection
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
sourceProvider(system, entityId),
saveOffsetAfterEnvelopes = 2,
saveOffsetAfterDuration = 1.minute,
handler)
Expand Down Expand Up @@ -525,7 +526,7 @@ class CassandraProjectionSpec
}

val projection =
CassandraProjection.atMostOnce[Long, Envelope](projectionId, sourceProvider(entityId), concatHandler())
CassandraProjection.atMostOnce[Long, Envelope](projectionId, sourceProvider(system, entityId), concatHandler())

projectionTestKit.run(projection) {
withClue("check - all values were concatenated") {
Expand All @@ -544,7 +545,8 @@ class CassandraProjectionSpec
val projectionId = genRandomProjectionId()

val failingProjection =
CassandraProjection.atMostOnce[Long, Envelope](projectionId, sourceProvider(entityId), concatHandlerFail4())
CassandraProjection
.atMostOnce[Long, Envelope](projectionId, sourceProvider(system, entityId), concatHandlerFail4())

withClue("check - offset is empty") {
val offsetOpt = offsetStore.readOffset[Long](projectionId).futureValue
Expand All @@ -567,7 +569,7 @@ class CassandraProjectionSpec

// re-run projection without failing function
val projection =
CassandraProjection.atMostOnce[Long, Envelope](projectionId, sourceProvider(entityId), concatHandler())
CassandraProjection.atMostOnce[Long, Envelope](projectionId, sourceProvider(system, entityId), concatHandler())

projectionTestKit.run(projection) {
withClue("checking: all values were concatenated") {
Expand All @@ -591,7 +593,7 @@ class CassandraProjectionSpec

val projection =
CassandraProjection
.atMostOnce[Long, Envelope](projectionId, sourceProvider(entityId), handler)
.atMostOnce[Long, Envelope](projectionId, sourceProvider(system, entityId), handler)

intercept[TestException] {
LoggingTestKit.warn("RetryAndFail not supported").expect {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.internal

import akka.annotation.InternalApi

/**
 * INTERNAL API
 *
 * A composite offset made up of multiple inner offsets, keyed by a String surrogate key
 * (presumably a Kafka topic-partition identifier given this PR's scope — confirm against
 * the Kafka SourceProvider). Each entry is persisted as its own row/offset and the map is
 * merged back into a single `MergeableOffset` when read
 * (see `OffsetSerialization.fromStorageRepresentation` / `toStorageRepresentation`).
 */
@InternalApi
final case class MergeableOffset[Offset](entries: Map[String, Offset])
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,79 @@ package akka.projection.internal

import java.util.UUID

import scala.collection.immutable

import akka.annotation.InternalApi
import akka.persistence.query
import akka.projection.ProjectionId

/**
* INTERNAL API
*/
@InternalApi private[akka] object OffsetSerialization {
sealed trait StorageRepresentation
final case class SingleOffset(id: ProjectionId, manifest: String, offsetStr: String, mergeable: Boolean = false)
extends StorageRepresentation
final case class MultipleOffsets(reps: immutable.Seq[SingleOffset]) extends StorageRepresentation

final val StringManifest = "STR"
final val LongManifest = "LNG"
final val IntManifest = "INT"
final val SequenceManifest = "SEQ"
final val TimeBasedUUIDManifest = "TBU"

/**
* Deserialize an offset from a storage representation of one or more offsets.
* The offset is converted from its string representation to its real type.
*/
def fromStorageRepresentation[Offset, Inner](rep: StorageRepresentation): Offset = {
val offset: Offset = rep match {
case SingleOffset(_, manifest, offsetStr, _) => fromStorageRepresentation[Offset](offsetStr, manifest)
case MultipleOffsets(reps) =>
val offsets: Map[String, Inner] = reps.map {
case SingleOffset(id, manifest, offsetStr, _) =>
id.key -> fromStorageRepresentation[Inner](offsetStr, manifest)
}.toMap
MergeableOffset[Inner](offsets).asInstanceOf[Offset]
}
offset
}

/**
* Deserialize an offset from a stored string representation and manifest.
* The offset is converted from its string representation to its real type.
*/
def fromStorageRepresentation[Offset](offsetStr: String, manifest: String): Offset = {
def fromStorageRepresentation[Offset](offsetStr: String, manifest: String): Offset =
(manifest match {
case StringManifest => offsetStr
case LongManifest => offsetStr.toLong
case IntManifest => offsetStr.toInt
case SequenceManifest => query.Offset.sequence(offsetStr.toLong)
case TimeBasedUUIDManifest => query.Offset.timeBasedUUID(UUID.fromString(offsetStr))
}).asInstanceOf[Offset]
}

/**
* Convert the offset to a tuple (String, String) where the first element is
* the String representation of the offset and the second its manifest
*/
def toStorageRepresentation[Offset](offset: Offset): (String, String) = {
offset match {
case s: String => s -> StringManifest
case l: Long => l.toString -> LongManifest
case i: Int => i.toString -> IntManifest
case seq: query.Sequence => seq.value.toString -> SequenceManifest
case tbu: query.TimeBasedUUID => tbu.value.toString -> TimeBasedUUIDManifest
case _ => throw new IllegalArgumentException(s"Unsupported offset type, found [${offset.getClass.getName}]")
def toStorageRepresentation[Offset](
id: ProjectionId,
offset: Offset,
mergeable: Boolean = false): StorageRepresentation = {
val reps = offset match {
case s: String => SingleOffset(id, StringManifest, s, mergeable)
case l: Long => SingleOffset(id, LongManifest, l.toString, mergeable)
case i: Int => SingleOffset(id, IntManifest, i.toString, mergeable)
case seq: query.Sequence => SingleOffset(id, SequenceManifest, seq.value.toString, mergeable)
case tbu: query.TimeBasedUUID => SingleOffset(id, TimeBasedUUIDManifest, tbu.value.toString, mergeable)
case mrg: MergeableOffset[_] =>
MultipleOffsets(mrg.entries.map {
case (surrogateKey, innerOffset) =>
toStorageRepresentation(ProjectionId(id.name, surrogateKey), innerOffset, mergeable = true)
.asInstanceOf[SingleOffset]
}.toSeq)
case _ => throw new IllegalArgumentException(s"Unsupported offset type, found [${offset.getClass.getName}]")
}
reps
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

package akka.projection.internal

import scala.compat.java8.OptionConverters._
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier

import scala.concurrent.Future
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters._

import akka.annotation.InternalApi
import akka.projection.javadsl
Expand All @@ -18,8 +24,15 @@ import akka.stream.scaladsl.Source
delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope] {

def source(offset: Option[Offset]): Source[Envelope, _] =
delegate.source(offset.asJava).asScala
def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, _]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.toJava)(ec).asJava
}
delegate.source(offsetAdapter).asScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset =
delegate.extractOffset(envelope)
Expand Down
Loading