Skip to content

Commit

Permalink
+per #16541 allow using javadsl implemented journals as-if scaladsl
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrik Nordwall & Konrad Malawski authored and ktoso committed Aug 12, 2015
1 parent c5d377f commit eeec14e
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private FiniteDuration refreshInterval(Hint[] hints) {
private <I, M> akka.japi.function.Function<I, M> noMaterializedValue () {
return param -> (M) null;
}

}
//#my-read-journal
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,44 @@
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence.query.javadsl
package akka.persistence.query.javadsl;

import akka.persistence.query.{ Query, Hint }
import akka.stream.javadsl.Source

import scala.annotation.varargs
import akka.persistence.query.Query;
import akka.persistence.query.Hint;
import akka.stream.javadsl.Source;
import scala.annotation.varargs;

/**
* Java API
*
* <p>
* API for reading persistent events and information derived
* from stored persistent events.
*
* <p>
* The purpose of the API is not to enforce compatibility between different
* journal implementations, because the technical capabilities may be very different.
* The interface is very open so that different journals may implement specific queries.
*
* <p>
* Usage:
* {{{
* <pre><code>
* final ReadJournal journal =
* PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
*
* final Source&lt;EventEnvelope, ?&gt; events =
* journal.query(new EventsByTag("mytag", 0L));
* }}}
* </pre></code>
*/

trait ReadJournal {
public interface ReadJournal {

/**
* Java API
*
* <p>
* A query that returns a `Source` with output type `T` and materialized value `M`.
*
* <p>
* The `hints` are optional parameters that defines how to execute the
* query, typically specific to the journal implementation.
*
*/
@varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M]

<T, M> Source<T, M> query(Query<T, M> q, Hint... hints);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence.query.javadsl;

import akka.japi.Util;
import akka.persistence.query.Hint;
import akka.persistence.query.Query;
import akka.stream.javadsl.Source;

/**
* INTERNAL API
*
* Adapter from ScalaDSL {@link akka.persistence.query.scaladsl.ReadJournal}
* to JavaDSL {@link ReadJournal}.
*/
public final class ReadJournalAdapter implements ReadJournal {

private final akka.persistence.query.scaladsl.ReadJournal backing;

public ReadJournalAdapter(akka.persistence.query.scaladsl.ReadJournal backing) {
this.backing = backing;
}

@Override
public <T, M> Source<T, M> query(Query<T, M> q, Hint... hints) {
return backing.query(q, Util.immutableSeq(hints)).asJava();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import java.util.concurrent.atomic.AtomicReference

import akka.actor._
import akka.event.Logging
import akka.stream.javadsl.Source

import scala.annotation.{varargs, tailrec}
import scala.annotation.tailrec
import scala.util.Failure

/**
Expand Down Expand Up @@ -63,12 +62,7 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
* Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry.
*/
final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal =
new javadsl.ReadJournal {
val backing = readJournalFor(readJournalPluginId)
@varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] =
backing.query(q, hints: _*).asJava
}

new javadsl.ReadJournalAdapter(readJournalFor(readJournalPluginId))

private def createPlugin(configPath: String): scaladsl.ReadJournal = {
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
Expand All @@ -79,14 +73,24 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get

val plugin = system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil))
.recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}

// TODO possibly apply event adapters here
plugin.get
// TODO remove duplication
val scalaPlugin =
if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil))
.recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}
else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil))
.map(jj new scaladsl.ReadJournalAdapter(jj))
.recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}
else throw new IllegalArgumentException(s"Configured class ${pluginClass} does not extend")

scalaPlugin.get
}

/** Check for default or missing identity. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ abstract class ReadJournal {
def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M]

}

/** INTERNAL API */
private[akka] final class ReadJournalAdapter(backing: akka.persistence.query.javadsl.ReadJournal) extends ReadJournal {
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] =
backing.query(q, hints: _*).asScala
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence.query;

import akka.persistence.query.javadsl.ReadJournal;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.Iterator;

/**
* Use for tests only!
* Emits infinite stream of strings (representing queried for events).
*/
class MockJavaReadJournal implements ReadJournal {
public static final String Identifier = "akka.persistence.query.journal.mock-java";

public static final Config config = ConfigFactory.parseString(
Identifier + " { \n" +
" class = \"" + MockJavaReadJournal.class.getCanonicalName() + "\" \n" +
" }\n\n");

@Override
@SuppressWarnings("unchecked")
public <T, M> Source<T, M> query(Query<T, M> q, Hint... hints) {
return (Source<T, M>) Source.fromIterator(() -> new Iterator<String>() {
private int i = 0;
@Override public boolean hasNext() { return true; }

@Override public String next() {
return "" + (i++);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence.query;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import akka.actor.ActorSystem
import akka.persistence.journal.{ EventAdapter, EventSeq }
import com.typesafe.config.ConfigFactory
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import org.scalautils.ConversionCheckedTripleEquals

import scala.concurrent.Await
import scala.concurrent.duration._

class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ConversionCheckedTripleEquals {
class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll {

val anything: Query[String, _] = null

Expand All @@ -42,12 +41,24 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
}.getMessage should include("missing persistence read journal")
}
}

"expose scaladsl implemented journal as javadsl.ReadJournal" in {
withActorSystem() { system
val j: javadsl.ReadJournal = PersistenceQuery.get(system).getReadJournalFor(MockReadJournal.Identifier)
}
}
"expose javadsl implemented journal as scaladsl.ReadJournal" in {
withActorSystem() { system
val j: scaladsl.ReadJournal = PersistenceQuery.get(system).readJournalFor(MockJavaReadJournal.Identifier)
}
}
}

private val systemCounter = new AtomicInteger()
private def withActorSystem(conf: String = "")(block: ActorSystem Unit): Unit = {
val config =
MockReadJournal.config
.withFallback(MockJavaReadJournal.config)
.withFallback(ConfigFactory.parseString(conf))
.withFallback(ConfigFactory.parseString(eventAdaptersConfig))
.withFallback(ConfigFactory.load())
Expand Down

0 comments on commit eeec14e

Please sign in to comment.