Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Commit

Permalink
Merge 9ffe542 into 0c3cec7
Browse files Browse the repository at this point in the history
  • Loading branch information
ahjohannessen committed Mar 27, 2019
2 parents 0c3cec7 + 9ffe542 commit 19ad057
Show file tree
Hide file tree
Showing 178 changed files with 1,896 additions and 1,167 deletions.
201 changes: 102 additions & 99 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,19 @@ In order to make Java devs happy and not reinvent a wheel, we propose to use too

```java
final EsConnection connection = EsConnectionFactory.create(system);
final Future<Event> future = connection.readEvent("my-stream", new EventNumber.Exact(0), false, null);
final Future<Event> future = connection.readEvent("my-stream", new EventNumber.Exact(0), false, null);
```

```scala
import system.dispatcher
val connection = EsConnection(system)
val future = connection apply ReadEvent(EventStream.Id("my-stream"), EventNumber.First)
val future = connection(ReadEvent(EventStream.Id("my-stream"), EventNumber.First))
```

* Sending messages to `eventstore.ConnectionActor`

```java
final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ReadEvent readEvent = new ReadEventBuilder("my-stream")
.first()
.build();
final ReadEvent readEvent = new ReadEventBuilder("my-stream").first().build();
connection.tell(readEvent, null);
```

Expand Down Expand Up @@ -71,15 +68,14 @@ libraryDependencies += "com.geteventstore" %% "eventstore-client" % "6.0.0"
### Read event

```java
import java.net.InetSocketAddress;
import akka.actor.*;
import akka.actor.Status.Failure;
import akka.event.*;
import eventstore.*;
import eventstore.j.*;
import eventstore.tcp.ConnectionActor;

import java.net.InetSocketAddress;

import eventstore.core.*;
import eventstore.akka.Settings;
import eventstore.akka.tcp.ConnectionActor;

public class ReadEventExample {

Expand All @@ -102,22 +98,23 @@ public class ReadEventExample {
}


public static class ReadResult extends UntypedActor {
public static class ReadResult extends AbstractActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

public void onReceive(Object message) throws Exception {
if (message instanceof ReadEventCompleted) {
final ReadEventCompleted completed = (ReadEventCompleted) message;
final Event event = completed.event();
log.info("event: {}", event);
} else if (message instanceof Failure) {
final Failure failure = ((Failure) message);
final EsException exception = (EsException) failure.cause();
log.error(exception, exception.toString());
} else
unhandled(message);

context().system().shutdown();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ReadEventCompleted.class, m -> {
final Event event = m.event();
log.info("event: {}", event);
context().system().terminate();
})
.match(Failure.class, f -> {
final EsException exception = (EsException) f.cause();
log.error(exception, exception.toString());
context().system().terminate();
})
.build();
}
}
}
Expand All @@ -126,20 +123,19 @@ public class ReadEventExample {
### Write event

```java
import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import eventstore.*;
import eventstore.j.EventDataBuilder;
import eventstore.j.WriteEventsBuilder;
import eventstore.tcp.ConnectionActor;

import java.util.UUID;
import akka.actor.*;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.tcp.ConnectionActor;

public class WriteEventExample {

public static void main(String[] args) {
final ActorSystem system = ActorSystem.create();
final ActorRef connection = system.actorOf(ConnectionActor.getProps());

final ActorSystem system = ActorSystem.create();
final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ActorRef writeResult = system.actorOf(Props.create(WriteResult.class));

final EventData event = new EventDataBuilder("my-event")
Expand All @@ -156,37 +152,36 @@ public class WriteEventExample {
connection.tell(writeEvents, writeResult);
}

public static class WriteResult extends AbstractActor {

public static class WriteResult extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

public void onReceive(Object message) throws Exception {
if (message instanceof WriteEventsCompleted) {
final WriteEventsCompleted completed = (WriteEventsCompleted) message;
log.info("range: {}, position: {}", completed.numbersRange(), completed.position());
} else if (message instanceof Status.Failure) {
final Status.Failure failure = ((Status.Failure) message);
final EsException exception = (EsException) failure.cause();
log.error(exception, exception.toString());
} else
unhandled(message);

context().system().shutdown();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(WriteEventsCompleted.class, m -> {
log.info("range: {}, position: {}", m.numbersRange(), m.position());
context().system().terminate();
})
.match(Status.Failure.class, f -> {
final EsException exception = (EsException) f.cause();
log.error(exception, exception.toString());
})
.build();
}

}
}
```

### Subscribe to All

```java
import akka.actor.ActorSystem;
import eventstore.IndexedEvent;
import eventstore.SubscriptionObserver;
import eventstore.j.EsConnection;
import eventstore.j.EsConnectionFactory;

import java.io.Closeable;
import akka.actor.ActorSystem;
import eventstore.j.*;
import eventstore.core.IndexedEvent;
import eventstore.akka.SubscriptionObserver;

public class SubscribeToAllExample {
public static void main(String[] args) {
Expand Down Expand Up @@ -220,7 +215,13 @@ public class SubscribeToAllExample {
### Build event

```java
final EventData empty = new EventDataBuilder("empty").build();
import java.util.UUID;
import eventstore.core.EventData;
import eventstore.j.EventDataBuilder;

public class EventDataBuilderExample {

final EventData empty = new EventDataBuilder("eventType").build();

final EventData binary = new EventDataBuilder("binary")
.eventId(UUID.randomUUID())
Expand All @@ -239,25 +240,26 @@ public class SubscribeToAllExample {
.jsonData("{\"data\":\"data\"}")
.jsonMetadata("{\"metadata\":\"metadata\"}")
.build();
}
```

## Scala examples

### Read event

```scala
import akka.actor.Status.Failure
import akka.actor._
import eventstore._
import eventstore.tcp.ConnectionActor
import java.net.InetSocketAddress
import _root_.akka.actor._
import _root_.akka.actor.Status.Failure
import eventstore.akka.tcp.ConnectionActor

object ReadEventExample extends App {
val system = ActorSystem()

val settings = Settings(
address = new InetSocketAddress("127.0.0.1", 1113),
defaultCredentials = Some(UserCredentials("admin", "changeit")))
defaultCredentials = Some(UserCredentials("admin", "changeit"))
)

val connection = system.actorOf(ConnectionActor.props(settings))
implicit val readResult = system.actorOf(Props[ReadResult])
Expand All @@ -281,21 +283,23 @@ object ReadEventExample extends App {
### Write event

```scala
import akka.actor.Status.Failure
import akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import eventstore._
import eventstore.tcp.ConnectionActor
import _root_.akka.actor.Status.Failure
import _root_.akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.tcp.ConnectionActor

object WriteEventExample extends App {
val system = ActorSystem()
val connection = system.actorOf(ConnectionActor.props())
implicit val writeResult = system.actorOf(Props[WriteResult])

val event = EventData("my-event", data = Content("my event data"), metadata = Content("my first event"))
val system = ActorSystem()
val connection = system.actorOf(ConnectionActor.props())
val event = EventData("my-event", eventId = randomUuid, data = Content("my event data"), metadata = Content("my first event"))

implicit val writeResult = system.actorOf(Props(WriteResult))

connection ! WriteEvents(EventStream.Id("my-stream"), List(event))

class WriteResult extends Actor with ActorLogging {
case object WriteResult extends Actor with ActorLogging {

def receive = {
case WriteEventsCompleted(range, position) =>
log.info("range: {}, position: {}", range, position)
Expand All @@ -312,34 +316,35 @@ object WriteEventExample extends App {
### Start transaction

```scala
import akka.actor.ActorSystem
import eventstore.TransactionActor._
import eventstore.tcp.ConnectionActor
import eventstore.{ EventData, TransactionActor, EventStream, TransactionStart }
import _root_.akka.actor.{ActorSystem, Props}
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.tcp.ConnectionActor
import eventstore.akka.TransactionActor._

object StartTransactionExample extends App {
val system = ActorSystem()
val connection = system.actorOf(ConnectionActor.props())
val connection = system.actorOf(ConnectionActor.props(), "connection")

val kickoff = Start(TransactionStart(EventStream.Id("my-stream")))
val transaction = system.actorOf(TransactionActor.props(connection, kickoff))
val transaction = system.actorOf(TransactionActor.props(connection, kickoff), "transaction")
implicit val transactionResult = system.actorOf(Props[TransactionResult], "result")

val data = EventData("transaction-event", eventId = randomUuid)

transaction ! GetTransactionId // replies with `TransactionId(transactionId)`
transaction ! Write(EventData("transaction-event")) // replies with `WriteCompleted`
transaction ! Write(EventData("transaction-event")) // replies with `WriteCompleted`
transaction ! Write(EventData("transaction-event")) // replies with `WriteCompleted`
transaction ! Write(data) // replies with `WriteCompleted`
transaction ! Write(data) // replies with `WriteCompleted`
transaction ! Write(data) // replies with `WriteCompleted`
transaction ! Commit // replies with `CommitCompleted`
}
```

### Count all events

```scala
import akka.actor._
import eventstore.LiveProcessingStarted
import eventstore.tcp.ConnectionActor
import eventstore.{ IndexedEvent, Settings, SubscriptionActor }
import _root_.akka.actor._
import scala.concurrent.duration._
import eventstore.akka.tcp.ConnectionActor

object CountAll extends App {
val system = ActorSystem()
Expand All @@ -354,7 +359,7 @@ class CountAll extends Actor with ActorLogging {
def receive = count(0)

def count(n: Long, printed: Boolean = false): Receive = {
case x: IndexedEvent => context become count(n + 1)
case _: IndexedEvent => context become count(n + 1)
case LiveProcessingStarted => log.info("live processing started")
case ReceiveTimeout if !printed =>
log.info("count {}", n)
Expand All @@ -366,9 +371,9 @@ class CountAll extends Actor with ActorLogging {
### Future-like api

```scala
import akka.actor.ActorSystem
import _root_.akka.actor.ActorSystem
import scala.concurrent.Future
import eventstore._
import eventstore.core.util.uuid.randomUuid

object EsConnectionExample extends App {
val system = ActorSystem()
Expand All @@ -380,22 +385,22 @@ object EsConnectionExample extends App {

val stream = EventStream.Id("my-stream")

val readEvent: Future[ReadEventCompleted] = connection.apply(ReadEvent(stream))
val readEvent: Future[ReadEventCompleted] = connection(ReadEvent(stream))
readEvent foreach { x =>
log.info(x.event.toString)
}

val readStreamEvents: Future[ReadStreamEventsCompleted] = connection.apply(ReadStreamEvents(stream))
val readStreamEvents: Future[ReadStreamEventsCompleted] = connection(ReadStreamEvents(stream))
readStreamEvents foreach { x =>
log.info(x.events.toString())
}

val readAllEvents: Future[ReadAllEventsCompleted] = connection.apply(ReadAllEvents(maxCount = 5))
val readAllEvents: Future[ReadAllEventsCompleted] = connection(ReadAllEvents(maxCount = 5))
readAllEvents foreach { x =>
log.info(x.events.toString())
}

val writeEvents: Future[WriteEventsCompleted] = connection.apply(WriteEvents(stream, List(EventData("my-event"))))
val writeEvents: Future[WriteEventsCompleted] = connection(WriteEvents(stream, List(EventData("my-event", eventId = randomUuid))))
writeEvents foreach { x =>
log.info(x.numbersRange.toString)
}
Expand All @@ -422,9 +427,8 @@ Here is a short example on how to use it:
### List all streams

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import eventstore.{ EventStoreExtension, EventStream }
import _root_.akka.actor.ActorSystem
import _root_.akka.stream.ActorMaterializer

object ListAllStreamsExample extends App {
implicit val system = ActorSystem()
Expand All @@ -447,10 +451,9 @@ by converting an Akka Stream to Publisher. See: [Integrating Akka Streams with R
Here is a short example on how to accomplish that:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import eventstore.EventStoreExtension
import _root_.akka.actor.ActorSystem
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl._
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._

Expand All @@ -474,12 +477,12 @@ object MessagesPerSecondReactiveStreams extends App {

### Configuration

Default client settings defined in [`reference.conf`](src/main/resources/reference.conf).
Default client settings defined in [`core reference.conf`](core/src/main/resources/reference.conf) and [`client reference.conf`](client/src/main/resources/reference.conf).
You can override them via own `application.conf` put in the `src/main/resources`, the same way you might already do for akka.
We are using the same approach - [config](https://github.com/typesafehub/config).
We are using the same approach using the same configuration [library](https://github.com/lightbend/config).

### Cluster

It is possible to use client against cluster of Event Store.
For this you need to configure client via `eventstore.cluster` section in [`reference.conf`](src/main/resources/reference.conf) or [`ClusterSettings`](src/main/scala/eventstore/cluster/ClusterSettings.scala).
For this you need to configure client via `eventstore.cluster` section in [`core reference.conf`](core/src/main/resources/reference.conf) or [`ClusterSettings`](core/src/main/scala/eventstore/core/settings/ClusterSettings.scala).
Using `application.conf` for configuration is more preferable option.
Loading

0 comments on commit 19ad057

Please sign in to comment.