Skip to content

Commit

Permalink
Issue x-cubed#3 - Catch-up subscriptions - update documentation and a…
Browse files Browse the repository at this point in the history
…lter code interfaces for conformity to existing ones (cosmetic changes only)
  • Loading branch information
Zach Blocker committed Jun 10, 2016
1 parent 824723c commit 932e61f
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 13 deletions.
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ Subscribes to a stream to receive notifications as soon as an event is written t

Returns a Buffer containing a GUID that identifies the subscription, for use with unsubscribeStream().

### Connection.subscribeToStreamFrom()
Executes a catch-up subscription on the given stream, reading events from a given event number, and continuing with a live subscription when all historical events have been read.

* streamId - The name of the stream in the Event Store (string)
* fromEventNumber - Which event number to start after (if null, then from the beginning of the stream.)
* credentials - The user name and password needed for permission to subscribe to the stream.
* onEventAppeared - Callback for each event received (historical or live)
* onLiveProcessingStarted - Callback when historical events have been read and live events are about to be read.
* onDropped - Callback when subscription drops or is dropped.
* settings - Settings for the catch-up subscription.

Returns an instance representing the catch-up subscription (EventStoreStreamCatchUpSubscription).

### Connection.readAllEventsBackward() / Connection.readAllEventsForward()
Reads events from across all streams, in order (backward = newest first, forward = oldest first).

Expand Down Expand Up @@ -129,4 +142,24 @@ Passed to the onConfirmed callback used by subscribeToStream() when a subscripti
## ISubscriptionDropped interface
Passed to the onDropped callback used by subscribeToStream() when a subscription terminates, or cannot be established.

* reason - The reason why the subscription was dropped (enumeration, 0 = Unsubscribed, 1 = Access Denied)
* reason - The reason why the subscription was dropped (enumeration, 0 = Unsubscribed, 1 = Access Denied)

## CatchUpSubscriptionSettings class
A property bag of settings passed when creating a new catch-up subscription.

* maxLiveQueueSize - The maximum amount to buffer when processing from the live subscription.
* readBatchSize - The number of events to read per batch when reading historical events.
* debug - True if in debug mode.
* resolveLinkTos - Whether or not to resolve link events.

## EventStoreStreamCatchUpSubscription class
Represents a catch-up subscription to a single stream.

### EventStoreStreamCatchUpSubscription.start()
Initiate start of the catch-up subscription.

### EventStoreStreamCatchUpSubscription.stop()
Request to stop the catch-up subscription.

### EventStoreStreamCatchUpSubscription.getCorrelationId()
Get the subscription ID of the underlying Event Store subscription, in order to pass it back to the Connection object, for example.
104 changes: 103 additions & 1 deletion event-store-client.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ declare module "event-store-client" {
*/
subscribeToStream(streamId: string, resolveLinkTos: boolean, onEventAppeared: (event: StoredEvent) => void, onConfirmed: (confirmation: ISubscriptionConfirmation) => void, onDropped: (dropped: ISubscriptionDropped) => void, credentials: ICredentials): Buffer;

/***
* Initiate catch-up subscription for one stream.
*
* @param streamId The stream ID (only if subscribing to a single stream)
* @param fromEventNumber Which event number to start after (if null, then from the beginning of the stream.)
* @param credentials User credentials for the operations.
* @param onEventAppeared Callback for each event received
* @param onLiveProcessingStarted Callback when read history phase finishes.
* @param onDropped Callback when subscription drops or is dropped.
* @param settings Settings for this subscription.
* @return The catch-up subscription instance.
*/
subscribeToStreamFrom(streamId: string, fromEventNumber: number, credentials: ICredentials, onEventAppeared: (event: StoredEvent) => void, onLiveProcessingStarted: () => void, onDropped: (EventStoreCatchUpSubscription, string, Error) => void, settings: CatchUpSubscriptionSettings): EventStoreStreamCatchUpSubscription;

/***
* Reads events from across all streams, in order from newest to oldest
* @param commitPosition The commit position to start from
Expand Down Expand Up @@ -234,5 +248,93 @@ declare module "event-store-client" {
* @param callback Invoked once the operation has been completed. Check the result to confirm it was successful.
*/
writeEvents(streamId: string, expectedVersion: number, requireMaster: boolean, events: Event[], credentials: ICredentials, callback: (completed: IWriteEventsCompleted) => void): void;
}
}

/***
* Configuration settings to pass when instantiating a catch-up subscription.
*/
export class CatchUpSubscriptionSettings {

/***
* Creates a new settings instance.
* @param maxLiveQueueSize The max amount to buffer when processing from live subscription.
* @param readBatchSize The number of events to read per batch when reading history
* @param debug True iff in debug mode
* @param resolveLinkTos Whether or not to resolve link events
*/
constructor(maxLiveQueueSize: number, readBatchSize: number, debug: boolean, resolveLinkTos: boolean);

/***
* The max amount to buffer when processing from live subscription.
*/
maxLiveQueueSize: number;

/***
* The number of events to read per batch when reading history
*/
readBatchSize: number;

/***
* True iff in debug mode
*/
debug: boolean;

/***
* Whether or not to resolve link events
*/
resolveLinkTos: boolean;
}

/**
* Abstract base class representing catch-up subscriptions.
*/
export class EventStoreCatchUpSubscription {

/***
* Creates a new EventStoreCatchUpSubscription instance.
* @param connection The connection to Event Store
* @param streamId The stream name (only if subscribing to a single stream)
* @param userCredentials User credentials for the operations.
* @param eventAppeared Callback for each event received
* @param liveProcessingStarted Callback when read history phase finishes.
* @param subscriptionDropped Callback when subscription drops or is dropped.
* @param settings Settings for this subscription.
*/
constructor(connection: Connection, streamId: string, userCredentials: ICredentials, eventAppeared: (event: StoredEvent) => void, liveProcessingStarted: () => void, subscriptionDropped: (EventStoreCatchUpSubscription, string, Error) => void, settings: CatchUpSubscriptionSettings);

/***
* Provides the correlation ID of the Event Store subscription underlying the catch-up subscription.
* @returns Correlation ID of the Event Store subscription
*/
getCorrelationId(): string;

/***
* Attempts to start the subscription.
*/
start(): void;

/***
* Attempts to stop the subscription.
*/
stop(): void;
}

/**
* Catch-up subscription for one stream.
*/
export class EventStoreStreamCatchUpSubscription extends EventStoreCatchUpSubscription {

/***
* Creates a new EventStoreStreamCatchUpSubscription instance.
* @param connection The connection to Event Store
* @param streamId The stream name (only if subscribing to a single stream)
* @param fromEventNumberExclusive Which event number to start after (if null, then from the beginning of the stream.)
* @param userCredentials User credentials for the operations.
* @param eventAppeared Callback for each event received
* @param liveProcessingStarted Callback when read history phase finishes.
* @param subscriptionDropped Callback when subscription drops or is dropped.
* @param settings Settings for this subscription.
*/
constructor(connection: Connection, streamId: string, fromEventNumberExclusive: number, userCredentials: ICredentials, eventAppeared: (event: StoredEvent) => void, liveProcessingStarted: () => void, subscriptionDropped: (EventStoreCatchUpSubscription, string, Error) => void, settings: CatchUpSubscriptionSettings);
}
}
6 changes: 3 additions & 3 deletions lib/catchUpSubscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
* @constructor
* @param {Connection} connection The connection to Event Store
* @param {string} streamId The stream name (only if subscribing to a single stream)
* @param userCredentials User credentials for the operations.
* @param {ICredentials} userCredentials User credentials for the operations.
* @param {function} eventAppeared Callback for each event received
* @param {function} liveProcessingStarted Callback when read history phase finishes.
* @param {function} subscriptionDropped Callback when subscription drops or is dropped.
Expand Down Expand Up @@ -307,8 +307,8 @@
* @constructor
* @param {Connection} connection The connection to Event Store
* @param {string} streamId The stream name (only if subscribing to a single stream)
* @param {number} fromEventNumberExclusive Which event number to start from (if null, then from the beginning of the stream.)
* @param userCredentials User credentials for the operations.
* @param {number} fromEventNumberExclusive Which event number to start after (if null, then from the beginning of the stream.)
* @param {ICredentials} userCredentials User credentials for the operations.
* @param {function} eventAppeared Callback for each event received
* @param {function} liveProcessingStarted Callback when read history phase finishes.
* @param {function} subscriptionDropped Callback when subscription drops or is dropped.
Expand Down
16 changes: 8 additions & 8 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,17 @@ Connection.prototype.unsubscribeFromStream = function(correlationId, credentials
/**
* Initiate catch-up subscription for one stream.
*
* @param {string} streamName The stream name (only if subscribing to a single stream)
* @param {number} fromEventNumber Which event number to start from (if null, then from the beginning of the stream.)
* @param userCredentials User credentials for the operations.
* @param {function} eventAppeared Callback for each event received
* @param {function} liveProcessingStarted Callback when read history phase finishes.
* @param {function} subscriptionDropped Callback when subscription drops or is dropped.
* @param {string} streamId The stream ID (only if subscribing to a single stream)
* @param {number} fromEventNumber Which event number to start after (if null, then from the beginning of the stream.)
* @param {ICredentials} credentials User credentials for the operations.
* @param {function} onEventAppeared Callback for each event received
* @param {function} onLiveProcessingStarted Callback when read history phase finishes.
* @param {function} onDropped Callback when subscription drops or is dropped.
* @param {CatchUpSubscriptionSettings} settings Settings for this subscription.
*/
Connection.prototype.subscribeToStreamFrom = function (streamName, fromEventNumber, userCredentials, eventAppeared, liveProcessingStarted, subscriptionDropped, settings) {
Connection.prototype.subscribeToStreamFrom = function (streamId, fromEventNumber, credentials, onEventAppeared, onLiveProcessingStarted, onDropped, settings) {
if (!settings) settings = new CatchUpSubscription.Settings();
var subscription = new CatchUpSubscription.Stream(this, streamName, fromEventNumber, userCredentials, eventAppeared, liveProcessingStarted, subscriptionDropped, settings);
var subscription = new CatchUpSubscription.Stream(this, streamId, fromEventNumber, credentials, onEventAppeared, onLiveProcessingStarted, onDropped, settings);
subscription.start();
return subscription;
};
Expand Down

0 comments on commit 932e61f

Please sign in to comment.