Skip to content

Commit

Permalink
Added subscription documentation, aligned smaller bugs in other docum…
Browse files Browse the repository at this point in the history
…entation
  • Loading branch information
oskardudycz committed Dec 21, 2020
1 parent f3557df commit 56bf59f
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 45 deletions.
6 changes: 2 additions & 4 deletions docs/clients/grpc/getting-started/connecting.md
Expand Up @@ -100,7 +100,7 @@ const writeResult = await writeEventsToStream("testStream")
</xode-block>
</xode-group>

Here we are writing events without checking if the stream exists or if the stream version matches the expected event version.
Here we are writing events without checking if the stream exists or if the stream version matches the expected event version. See more advanced scenarios in [writing events documentation](../writing-events/README.md).

## Reading events

Expand All @@ -126,6 +126,4 @@ const events = await readEventsFromStream("testStream")
</xode-block>
</xode-group>

When you read events from the stream, you can a collection of `ResolvedEvent` structures. The event payload is returned as a byte array and needs to be deserialized.


When you read events from the stream, you can a collection of `ResolvedEvent` structures. The event payload is returned as a byte array and needs to be deserialized. See more advanced scenarios in [reading events documentation](../reading-events/README.md).
Expand Up @@ -43,7 +43,7 @@ When using projections to create new events you can set whether the generated ev
<xode-group>
<xode-block title="C#">

TODO
<!--TODO -->

</xode-block>
<xode-block title="NodeJS">
Expand Down
100 changes: 100 additions & 0 deletions docs/clients/grpc/subscribing-to-streams/README.md
Expand Up @@ -10,15 +10,33 @@ If events already exist, the handler will be called for each event one by one un

The simplest stream subscription looks like the following :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-stream
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-stream
</xode-block>
</xode-group>

The provided handler will be called for every event in the stream.

## Subscribing to `$all`

Subscribing to `$all` is much the same as subscribing to a single stream. The handler will be called for every event written after the starting position.

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-all
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-all
</xode-block>
</xode-group>

## Subscribing from a specific position

Expand All @@ -39,7 +57,16 @@ To subscribe to a stream from a specific position, you need to provide a `Stream

The following subscribes to the stream "some-stream" at position 20, this means that events 21 and onward will be handled:

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-stream-from-position
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-stream-from-position
</xode-block>
</xode-group>

### Subscribing to $all

Expand All @@ -49,17 +76,44 @@ The corresponding `$all` subscription will subscribe from the event after the on

Please note that this position will need to be a legitimate position in `$all`.

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-all-from-position
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-all-from-position
</xode-block>
</xode-group>

## Subscribing to a stream for live updates

You can subscribe to a stream to get live updates by subscribing to the end of the stream :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-stream-live
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-stream-live
</xode-block>
</xode-group>

And the same works with `$all` :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-all-live
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-all-live
</xode-block>
</xode-group>

This won't read through the history of the stream, but will rather notify the handler when a new event appears in the respective stream.

Expand All @@ -73,7 +127,16 @@ Filtered subscriptions make it easier and faster to subscribe to all events of a

When reading a stream you can specify whether to resolve link tos or not. By default, link-to events are not resolved. You can set this with the `resolveLinkTos` parameter :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-stream-resolving-linktos
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#subscribe-to-stream-resolving-linktos
</xode-block>
</xode-group>

## Dropped subscriptions

Expand All @@ -93,19 +156,46 @@ The possible reasons for a subscription dropping are :

You can start from where you left off by keeping a record of the last processed event and continuing from there :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-stream-subscription-dropped
</xode-block>
<xode-block title="NodeJS">

<!--TODO -->
</xode-block>
</xode-group>

When subscribed to `$all` you want to keep the position of the event in the `$all` stream :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#subscribe-to-all-subscription-dropped
</xode-block>
<xode-block title="NodeJS">

<!--TODO -->
</xode-block>
</xode-group>

## Filter options

Subscriptions to `$all` can include a filter option. This will only notify the event handler if the event matches the provided filter.

A simple stream prefix filter looks like this :

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#stream-prefix-filtered-subscription
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#stream-prefix-filtered-subscription
</xode-block>
</xode-group>

The filtering api is described more in-depth in the filtering section.

Expand All @@ -115,4 +205,14 @@ The user creating a subscription must have read access to the stream it's subscr

You can provide user credentials to be used by the subscription as follows. This will override the default credentials set on the connection.

<xode-group>
<xode-block title="C#">

<<< @/docs/clients/dotnet/generated/v20.6.1/samples/subscribing-to-streams/Program.cs#overriding-user-credentials
</xode-block>
<xode-block title="NodeJS">

<<< @/samples/grpc/nodejs/samples/subscribing-to-streams/index.js#overriding-user-credentials
</xode-block>
</xode-group>

30 changes: 15 additions & 15 deletions samples/grpc/nodejs/samples/reading-events/index.js
Expand Up @@ -9,8 +9,8 @@ import {
export async function readFromStream(client) {
// region read-from-stream
const events = await client.readEventsFromStream("some-stream", 10, {
fromRevision: START,
direction: FORWARD,
fromRevision: START,
});
// endregion read-from-stream

Expand All @@ -26,8 +26,8 @@ export async function readFromStream(client) {
export async function readFromStreamPosition(client) {
// region read-from-stream-position
const events = await client.readEventsFromStream("some-stream", 20, {
fromRevision: BigInt(10),
direction: FORWARD,
fromRevision: BigInt(10),
});
// endregion read-from-stream-position

Expand All @@ -45,8 +45,8 @@ export async function readFromStreamPositionCheck(client) {
let events = [];
try {
events = await client.readEventsFromStream("some-stream", 20, {
fromRevision: BigInt(10),
direction: FORWARD,
fromRevision: BigInt(10),
});
} catch (error) {
if (error.type == ErrorType.STREAM_NOT_FOUND) return;
Expand All @@ -63,10 +63,10 @@ export async function readFromStreamPositionCheck(client) {
}

export async function readFromStreamBackwards(client) {
// region read-from-stream
// region reading-backwards
const events = await client.readEventsFromStream("some-stream", 10, {
fromRevision: END,
direction: BACKWARD,
fromRevision: END,
});

for (var resolvedEvent of events) {
Expand All @@ -79,9 +79,9 @@ export async function readFromStreamBackwards(client) {

export async function readFromAllStream(client) {
// region read-from-all-stream
const events = await client.readEventsFromStream(10, {
fromPosition: START,
const events = await client.readAllEvents(10, {
direction: FORWARD,
fromPosition: START,
});
// endregion read-from-all-stream

Expand All @@ -96,9 +96,9 @@ export async function readFromAllStream(client) {

export async function ignoreSystemEvents(client) {
// region ignore-system-events
const events = await client.readEventsFromStream(10, {
fromPosition: START,
const events = await client.readAllEvents(10, {
direction: FORWARD,
fromPosition: START,
});

for (var resolvedEvent of events) {
Expand All @@ -114,9 +114,9 @@ export async function ignoreSystemEvents(client) {

export async function readFromAllStreamBackwards(client) {
// region read-from-all-stream-backwards
const events = await client.readEventsFromStream(10, {
fromPosition: END,
const events = await client.readAllEvents(10, {
direction: BACKWARD,
fromPosition: END,
});
// endregion read-from-all-stream-backwards

Expand All @@ -131,9 +131,9 @@ export async function readFromAllStreamBackwards(client) {

export async function filterOutSystemEvents(client) {
// region filter-out-system-events
const events = await client.readEventsFromStream(10, {
fromPosition: START,
const events = await client.readAllEvents(10, {
direction: FORWARD,
fromPosition: START,
});

for (var resolvedEvent of events) {
Expand All @@ -149,9 +149,9 @@ export async function filterOutSystemEvents(client) {

export async function readFromAllStreamResolvingLinkTos(client) {
// region read-from-all-stream-resolving-link-Tos
const events = await client.readEventsFromStream(10, {
fromPosition: END,
const events = await client.readAllEvents(10, {
direction: BACKWARD,
fromPosition: END,
resolveLinks: true,
});
// endregion read-from-all-stream-resolving-link-Tos
Expand Down
Expand Up @@ -24,27 +24,22 @@ export async function subscribeToStreamFromPosition(client) {
// region subscribe-to-stream-from-position
const subscription = client
.subscribeToStream("some-stream", { fromRevision: BigInt(20) })
.on("data", function (resolvedEvent) {
console.log(
`Received event ${resolvedEvent.event.revision}@${resolvedEvent.event.streamId}`
);
handleEvent(resolvedEvent);
});
.on("data", handleEvent);
// endregion subscribe-to-stream-from-position
}

export async function subscribeToStreamLive(client) {
// region subscribe-to-stream-from-position
// region subscribe-to-stream-live
const subscription = client
.subscribeToStream("some-stream", { fromRevision: END })
.on("data", handleEvent);
// endregion subscribe-to-stream-from-position
// endregion subscribe-to-stream-live
}

export async function subscribeToStreamResolvingLinkTos(client) {
// region subscribe-to-stream-resolving-linktos
const subscription = client
.subscribeToStream("some-stream", {
.subscribeToStream("$et-myEventType", {
fromRevision: START,
resolveLinks: true,
})
Expand All @@ -67,9 +62,6 @@ export async function subscribeToStreamSubscriptionDropped(client) {
// endregion subscribe-to-stream-resolving-linktos
}

function handleEvent(event) {
console.log(event);
}
// #region subscribe-to-stream-subscription-dropped
// var checkpoint = StreamPosition.Start;
// await client.SubscribeToStreamAsync(
Expand All @@ -91,21 +83,14 @@ function handleEvent(event) {

export async function subscribeToAll(client) {
// region subscribe-to-all
const subscription = client
.subscribeToAll()
.on("data", function (resolvedEvent) {
console.log(
`Received event ${resolvedEvent.event.revision}@${resolvedEvent.event.streamId}`
);
handleEvent(resolvedEvent);
});
const subscription = client.subscribeToAll().on("data", handleEvent);
// endregion subscribe-to-all
}

export async function subscribeToAllFromPosition(client) {
// region subscribe-to-all-from-position
const subscription = client
.subscribeToAll({ fromRevision: BigInt(20) })
.subscribeToAll({ fromRevision: BigInt(1056) })
.on("data", function (resolvedEvent) {
console.log(
`Received event ${resolvedEvent.event.revision}@${resolvedEvent.event.streamId}`
Expand All @@ -118,7 +103,7 @@ export async function subscribeToAllFromPosition(client) {
export async function subscribeToAllLive(client) {
// region subscribe-to-all-live
const subscription = client
.subscribeToStream({ fromRevision: END })
.subscribeToAll({ fromRevision: END })
.on("data", handleEvent);
// endregion subscribe-to-all-live
}
Expand Down Expand Up @@ -178,3 +163,7 @@ export async function subscribeToAllOverridingUserCredentials(client) {
.on("data", handleEvent);
// endregion overriding-user-credentials
}

function handleEvent(event) {
console.log(event);
}

0 comments on commit 56bf59f

Please sign in to comment.