Skip to content

Commit

Permalink
Use env settings for eventstore
Browse files Browse the repository at this point in the history
  • Loading branch information
morrislaptop committed May 18, 2020
1 parent e855070 commit 0c21178
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 36 deletions.
10 changes: 10 additions & 0 deletions README.md
Expand Up @@ -2,3 +2,13 @@

Or German for `read`.

TODO

* [ ] Refactor ResolvedEvent to StoredEvent
* [x] Use env connection settings
* [ ] Support reads larger than 4096 events (use yield?)
* [ ] Callbacks for received, processed, failed
* [ ] Callbacks for aggregate stream, aggregate snapshots
* [ ] Support reading from $all stream or a user stream
* [ ] Trait to help aggregates?
* [ ] persist to persistMany instead of other way round
29 changes: 12 additions & 17 deletions src/EventStoreStoredEventRepository.php
Expand Up @@ -23,17 +23,18 @@
use Prooph\EventStore\Internal\Consts;
use Illuminate\Support\Str;
use Spatie\EventSourcing\Exceptions\InvalidStoredEvent;
use Prooph\EventStore\EventStoreConnection;

class EventStoreStoredEventRepository implements StoredEventRepository
{
protected string $storedEventModel;
public static $all = '$ce-account';

protected $category;
protected EventStoreConnection $eventstore;

public static $all = '$ce-account';

public function __construct($category = null)
public function __construct(EventStoreConnection $eventstore, $category = null)
{
$this->eventstore = $eventstore;
$this->category = $category;
}

Expand All @@ -48,10 +49,11 @@ public function retrieveAll(string $uuid = null): LazyCollection
*/
public function retrieveAllStartingFrom(int $startingFrom, string $uuid = null): LazyCollection
{
throw_if(self::$all === '$all' && $startingFrom > 0, 'Starting from not valid for $all stream');

$startingFrom = max($startingFrom - 1, 0); // if we wanted to start from 1, we actually mean event 0
$connection = EventStoreConnectionFactory::create();

$slice = $connection->readStreamEventsForward(
$slice = $this->eventstore->readStreamEventsForward(
self::$all,
$startingFrom,
Consts::MAX_READ_SIZE,
Expand Down Expand Up @@ -83,9 +85,7 @@ public function retrieveAllStartingFrom(int $startingFrom, string $uuid = null):
*/
public function retrieveAllAfterVersion(int $aggregateVersion, string $aggregateUuid): LazyCollection
{
$connection = EventStoreConnectionFactory::create();

$slice = $connection->readStreamEventsForward(
$slice = $this->eventstore->readStreamEventsForward(
$this->category . '-' . $aggregateUuid,
$aggregateVersion,
Consts::MAX_READ_SIZE,
Expand Down Expand Up @@ -119,9 +119,8 @@ public function retrieveAllAfterVersion(int $aggregateVersion, string $aggregate
public function countAllStartingFrom(int $startingFrom, string $uuid = null): int
{
$startingFrom = max($startingFrom - 1, 0); // if we wanted to start from 1, we actually mean event 0
$connection = EventStoreConnectionFactory::create();

$slice = $connection->readStreamEventsBackward(
$slice = $this->eventstore->readStreamEventsBackward(
self::$all,
-1,
1,
Expand All @@ -140,13 +139,11 @@ public function countAllStartingFrom(int $startingFrom, string $uuid = null): in

public function persist(ShouldBeStored $event, string $uuid = null, int $aggregateVersion = null): StoredEvent
{
$connection = EventStoreConnectionFactory::create();

$json = app(EventSerializer::class)->serialize(clone $event);
$metadata = '{}';
$event = new EventData(EventId::generate(), get_class($event), true, $json, $metadata);

$write = $connection->appendToStream(
$write = $this->eventstore->appendToStream(
$this->category . '-' . $uuid,
ExpectedVersion::ANY,
[$event],
Expand Down Expand Up @@ -185,9 +182,7 @@ public function update(StoredEvent $storedEvent): StoredEvent

public function getLatestAggregateVersion(string $aggregateUuid): int
{
$connection = EventStoreConnectionFactory::create();

$slice = $connection->readStreamEventsBackward(
$slice = $this->eventstore->readStreamEventsBackward(
$this->category . '-' . $aggregateUuid,
-1,
1,
Expand Down
32 changes: 18 additions & 14 deletions src/EventStoreSubscribeCommand.php
Expand Up @@ -23,6 +23,7 @@
use Spatie\EventSourcing\Projectionist;
use Throwable;
use Illuminate\Support\Str;
use Prooph\EventStore\Async\EventStoreConnection;
use Spatie\EventSourcing\StoredEvent;
use Spatie\SchemalessAttributes\SchemalessAttributes;

Expand All @@ -32,32 +33,35 @@ class EventStoreSubscribeCommand extends Command

protected $description = 'Subscribe to a persistent subscription';

public function handle(): void
public function handle(EventStoreConnection $eventstore): void
{
Loop::run(function () {
$connection = EventStoreConnectionFactory::createFromEndPoint(
new EndPoint('localhost', 1113)
);

$connection->onConnected(function (): void {
Loop::run(function () use ($eventstore) {
$eventstore->onConnected(function (): void {
echo 'connected' . PHP_EOL;
});

$connection->onClosed(function (): void {
$eventstore->onClosed(function (): void {
echo 'connection closed' . PHP_EOL;
});

yield $connection->connectAsync();
$eventstore->onErrorOccurred(function () {
echo 'error';
});

$eventstore->onDisconnected(function () {
echo 'error';
});

yield $eventstore->connectAsync();

foreach (config('eventstore.subscription_streams') as $stream) {
yield $connection->connectToPersistentSubscriptionAsync(
foreach (config('lese.subscription_streams') as $stream) {
yield $eventstore->connectToPersistentSubscriptionAsync(
$stream,
config('eventstore.group'),
config('lese.group'),
new OnEvent(),
new OnDropped(),
10,
true,
new UserCredentials('admin', 'changeit')
false, // we ack
);
}
});
Expand Down
57 changes: 57 additions & 0 deletions src/ServiceProvider.php
Expand Up @@ -8,8 +8,18 @@
use DigitalRisks\LaravelEventStore\Contracts\ShouldBeStored;
use DigitalRisks\LaravelEventStore\EventStore;
use DigitalRisks\LaravelEventStore\Listeners\SendToEventStoreListener;
use DigitalRisks\Lese\Tests\TestClasses\AccountAggregateRoot;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\ServiceProvider as LaravelServiceProvider;
use Prooph\EventStore\Async\EventStoreConnection as TcpEventStoreConnection;
use Prooph\EventStore\EndPoint;
use Prooph\EventStore\UserCredentials;
use Prooph\EventStoreClient\ConnectionSettingsBuilder as TcpConnectionSettingsBuilder;
use Prooph\EventStoreClient\EventStoreConnectionFactory as TcpEventStoreConnectionFactory;
use Spatie\EventSourcing\StoredEventRepository;
use Prooph\EventStore\EventStoreConnection as HttpEventStoreConnection;
use Prooph\EventStoreHttpClient\ConnectionSettings as HttpConnectionSettings;
use Prooph\EventStoreHttpClient\EventStoreConnectionFactory as HttpEventStoreConnectionFactory;

class ServiceProvider extends LaravelServiceProvider
{
Expand Down Expand Up @@ -38,5 +48,52 @@ public function register()
EventStoreSubscribeCommand::class,
]);
}

$this->bindTcpEventStore();
$this->bindHttpEventStore();
$this->bindAggregates();
}

protected function bindTcpEventStore()
{
$tcp = parse_url(config('lese.tcp_url'));

$creds = new UserCredentials($tcp['user'], $tcp['pass']);

$builder = new TcpConnectionSettingsBuilder();
if ($tcp['scheme'] === 'tls') $builder->useSslConnection($tcp['host'], false);
$builder->setDefaultUserCredentials($creds);
$settings = $builder->build();

$connection = TcpEventStoreConnectionFactory::createFromEndPoint(
new EndPoint($tcp['host'], $tcp['port']),
$settings,
);

$this->app->instance(TcpEventStoreConnection::class, $connection);
}

protected function bindHttpEventStore()
{
$http = parse_url(config('lese.http_url'));

$creds = new UserCredentials($http['user'], $http['pass']);

$settings = new HttpConnectionSettings(
new EndPoint($http['host'], $http['port']),
$http['scheme'],
$creds
);

$connection = HttpEventStoreConnectionFactory::create($settings);

$this->app->instance(HttpEventStoreConnection::class, $connection);
}

protected function bindAggregates()
{
$this->app->when(AccountAggregateRoot::class)->needs(StoredEventRepository::class)->give(function () {
return resolve(EventStoreStoredEventRepository::class, ['category' => 'account']);
});
}
}
2 changes: 1 addition & 1 deletion tests/EventStoreStoredEventRepositoryTest.php
Expand Up @@ -15,7 +15,7 @@ class EventStoreStoredEventRepositoryTest extends TestCase
/** @test */
public function it_can_get_the_latest_version_id_for_a_given_aggregate_uuid()
{
$eloquentStoredEventRepository = new EventStoreStoredEventRepository('account');
$eloquentStoredEventRepository = resolve(EventStoreStoredEventRepository::class, ['category' => 'account']);

$this->assertEquals(0, $eloquentStoredEventRepository->getLatestAggregateVersion($this->faker->uuid));

Expand Down
2 changes: 1 addition & 1 deletion tests/ReplayCommandTest.php
Expand Up @@ -24,7 +24,7 @@ public function setUp(): void
{
parent::setUp();

AccountAggregateRoot::$category = $this->faker->domainWord;
AccountAggregateRoot::$category = $this->faker->md5;
EventStoreStoredEventRepository::$all = '$ce-' . AccountAggregateRoot::$category;

$account = AccountAggregateRoot::retrieve($this->faker->uuid);
Expand Down
4 changes: 3 additions & 1 deletion tests/TestCase.php
Expand Up @@ -4,6 +4,7 @@

use DigitalRisks\Lese\EventStoreSnapshotRepository;
use DigitalRisks\Lese\EventStoreStoredEventRepository;
use DigitalRisks\Lese\ServiceProvider;
use Illuminate\Foundation\Testing\WithFaker;
use Illuminate\Support\Facades\Schema;
use Spatie\EventSourcing\EventSourcingServiceProvider;
Expand All @@ -27,7 +28,8 @@ protected function setUp(): void
protected function getPackageProviders($app)
{
return [
EventSourcingServiceProvider::class
EventSourcingServiceProvider::class,
ServiceProvider::class,
];
}
}
4 changes: 2 additions & 2 deletions tests/TestClasses/AccountAggregateRoot.php
Expand Up @@ -20,12 +20,12 @@ class AccountAggregateRoot extends AggregateRoot

protected function getStoredEventRepository(): StoredEventRepository
{
return new EventStoreStoredEventRepository(self::$category);
return resolve(EventStoreStoredEventRepository::class, ['category' => self::$category]);
}

protected function getSnapshotRepository(): SnapshotRepository
{
return new EventStoreSnapshotRepository(self::$category);
return resolve(EventStoreSnapshotRepository::class, ['category' => self::$category]);
}

protected function getState(): array
Expand Down

0 comments on commit 0c21178

Please sign in to comment.