Skip to content

Commit

Permalink
use ported pekko-persistence-inmemory
Browse files Browse the repository at this point in the history
Signed-off-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.com>
  • Loading branch information
alstanchev authored and Aleksandar Stanchev committed Aug 28, 2023
1 parent d34f982 commit 265ba91
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 45 deletions.
22 changes: 15 additions & 7 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@
<pekko-bom.version>1.0.1</pekko-bom.version>
<pekko-http-bom.version>1.0.0</pekko-http-bom.version>
<pekko-persistence-mongodb.version>1.0.0-SNAPSHOT</pekko-persistence-mongodb.version>
<pekko-persistence-inmemory.version>0.0.0-SNAPSHOT</pekko-persistence-inmemory.version>
<pekko-management.version>1.0.0-RC2</pekko-management.version>
<pekko-connector-kafka.version>1.0.0</pekko-connector-kafka.version>
<parboiled.version>2.5.0</parboiled.version>

<!-- Necessary for pekko-persistence-mongodb -->
<metrics4-scala.version>4.2.9</metrics4-scala.version>

<!-- Keep these version consistent with pekko-persistence-mongodb.version's build.sbt -->
<mongo-java-driver.version>4.3.4</mongo-java-driver.version>

Expand Down Expand Up @@ -746,7 +750,11 @@
<artifactId>ditto-thingsearch-service</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>nl.grons</groupId>
<artifactId>metrics4-scala_${scala.version}</artifactId>
<version>${metrics4-scala.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down Expand Up @@ -885,12 +893,12 @@
<version>${scalatest.version}</version> <!-- version used by pekko-http-testkit @ pekko-http-bom.version -->
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.dnvriend</groupId>-->
<!-- <artifactId>akka-persistence-inmemory_${scala.version}</artifactId>-->
<!-- <version>${akka-persistence-inmemory.version}</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>com.github.alstanchev</groupId>
<artifactId>pekko-persistence-inmemory_${scala.version}</artifactId>
<version>${pekko-persistence-inmemory.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
Expand Down
10 changes: 5 additions & 5 deletions connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ jmh-generator-annprocess). jmh-generator-annprocess overwrites the whole META-IN
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.dnvriend</groupId>-->
<!-- <artifactId>akka-persistence-inmemory_${scala.version}</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>com.github.alstanchev</groupId>
<artifactId>pekko-persistence-inmemory_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public final class ConnectionPersistenceActorRecoveryTest extends WithMockServer
DittoTracingInitResource.disableDittoTracing();

private static final String PERSISTENCE_ID_PREFIX = "connection:";
private static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-journal";
private static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-snapshots";
private static final String JOURNAL_PLUGIN_ID = "pekko-contrib-mongodb-persistence-connection-journal";
private static final String SNAPSHOT_PLUGIN_ID = "pekko-contrib-mongodb-persistence-connection-snapshots";

private static ActorSystem actorSystem;
private static ActorRef pubSubMediator;
Expand Down
20 changes: 8 additions & 12 deletions connectivity/service/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,8 @@ pekko.persistence {
]
}

//TODO inMemory
akka-contrib-mongodb-persistence-connection-journal {
class = "akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
pekko-contrib-mongodb-persistence-connection-journal {
class = "org.apache.pekko.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
plugin-dispatcher = "connection-persistence-dispatcher"

ask-timeout = 10s
Expand All @@ -485,17 +484,15 @@ akka-contrib-mongodb-persistence-connection-journal {
}
}

//TODO inMemory
akka-contrib-mongodb-persistence-connection-snapshots {
class = "akka.persistence.inmemory.snapshot.InMemorySnapshotStore"
pekko-contrib-mongodb-persistence-connection-snapshots {
class = "org.apache.pekko.persistence.inmemory.snapshot.InMemorySnapshotStore"
plugin-dispatcher = "connection-persistence-dispatcher"

ask-timeout = 10s
}

//TODO inMemory
akka-contrib-mongodb-persistence-reconnect-journal {
class = "akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
pekko-contrib-mongodb-persistence-reconnect-journal {
class = "org.apache.pekko.persistence.inmemory.journal.InMemoryAsyncWriteJournal"
plugin-dispatcher = "reconnect-persistence-dispatcher"

ask-timeout = 10s
Expand All @@ -510,9 +507,8 @@ akka-contrib-mongodb-persistence-reconnect-journal {
}
}

//TODO inMemory
akka-contrib-mongodb-persistence-reconnect-snapshots {
class = "akka.persistence.inmemory.snapshot.InMemorySnapshotStore"
pekko-contrib-mongodb-persistence-reconnect-snapshots {
class = "org.apache.pekko.persistence.inmemory.snapshot.InMemorySnapshotStore"
plugin-dispatcher = "reconnect-persistence-dispatcher"

ask-timeout = 10s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private Route increaseHttpRequestTimeout(final java.util.function.Function<Durat

private Route increaseHttpRequestTimeout(final java.util.function.Function<Duration, Route> inner,
final scala.concurrent.duration.Duration requestTimeout) {
return withRequestTimeout( PEKKO__HTTP_TIMEOUT,
return withRequestTimeout(PEKKO_HTTP_TIMEOUT,
() -> inner.apply(Duration.ofMillis(requestTimeout.toMillis())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pekko {
}

extensions = [
"pekko.cluster.pubsub.DistributedPubSub"
"org.apache.pekko.cluster.pubsub.DistributedPubSub"
]

remote {
Expand Down
6 changes: 6 additions & 0 deletions internal/utils/metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
<artifactId>pekko-persistence-mongodb_${scala.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<!-- This dependency is required to build the ScalaDriverPersistenceExtension. Otherwise
java.lang.ClassNotFoundException: nl.grons.metrics4.scala.BaseBuilder is thrown -->
<groupId>nl.grons</groupId>
<artifactId>metrics4-scala_${scala.version}</artifactId>
</dependency>
</dependencies>

</project>
10 changes: 5 additions & 5 deletions policies/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.dnvriend</groupId>-->
<!-- <artifactId>akka-persistence-inmemory_${scala.version}</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>com.github.alstanchev</groupId>
<artifactId>pekko-persistence-inmemory_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import org.bson.BsonDocument;
import org.eclipse.ditto.policies.model.PolicyId;

//TODO InMemoryReadJournal
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
//import org.apache.pekko.persistence.inmemory.query.javadsl.InMemoryReadJournal;
import org.apache.pekko.persistence.inmemory.query.javadsl.InMemoryReadJournal;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.PersistenceQuery;
import org.apache.pekko.stream.javadsl.Sink;
Expand All @@ -47,7 +46,7 @@ public final class PoliciesJournalTestHelper<J> {
private static final int WAIT_TIMEOUT = 3;
private final Function<PolicyId, String> domainIdToPersistenceId;
private final BiFunction<BsonDocument, Long, J> journalEntryToDomainObject;
// private final InMemoryReadJournal readJournal;
private final InMemoryReadJournal readJournal;
private final ActorSystem actorSystem;

/**
Expand All @@ -66,8 +65,8 @@ public PoliciesJournalTestHelper(final ActorSystem actorSystem,
this.domainIdToPersistenceId = requireNonNull(domainIdToPersistenceId);
this.actorSystem = actorSystem;

// readJournal = PersistenceQuery.get(actorSystem).
// getReadJournalFor(InMemoryReadJournal.class, InMemoryReadJournal.Identifier());
readJournal = PersistenceQuery.get(actorSystem).
getReadJournalFor(InMemoryReadJournal.class, InMemoryReadJournal.Identifier());
}

/**
Expand All @@ -84,8 +83,7 @@ public List<J> getAllEvents(final PolicyId domainId) {
}

private List<EventEnvelope> getAllEventEnvelopes(final String persistenceId) {
return null;
// return runBlockingWithReturn(readJournal.currentEventsByPersistenceId(persistenceId, 0, Long.MAX_VALUE));
return runBlockingWithReturn(readJournal.currentEventsByPersistenceId(persistenceId, 0, Long.MAX_VALUE));
}

private J convertEventEnvelopeToDomainObject(final EventEnvelope eventEnvelope) {
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@
</properties>

<repositories>
<repository>
<id>github</id>
<url>https://maven.pkg.github.com/alstanchev/pekko-persistence-inmemory/</url>
</repository>
<repository>
<id>apache-releases</id>
<name>apache-releases</name>
Expand Down
10 changes: 5 additions & 5 deletions things/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.dnvriend</groupId>-->
<!-- <artifactId>akka-persistence-inmemory_${scala.version}</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>com.github.alstanchev</groupId>
<artifactId>pekko-persistence-inmemory_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down

0 comments on commit 265ba91

Please sign in to comment.