Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions Samples/Basic/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,29 @@ const client = Client
.forMicroservice('7a6155dd-9109-4488-8f6f-c57fe4b65bfb', microservice => {
microservice.withVersion(Version.first);
})
.withEventStore(eventStore => {
eventStore.withFilters(filterBuilder => {
filterBuilder.createPrivateFilter('79e12ab3-2751-47e1-b959-d898dc4d6ee8', fb => {
fb
.handle((event: any, context: EventContext) => {
return new Promise((resolve, reject) => {
console.log('Filtering event', event);
});
.withFilters(filterBuilder =>
filterBuilder
.createPrivateFilter('79e12ab3-2751-47e1-b959-d898dc4d6ee8', fb =>
fb.handle((event: any, context: EventContext) => {
return new Promise((resolve, reject) => {
console.log('Filtering event', event);
});
});
filterBuilder.createPublicFilter('2c087657-b318-40b1-ae92-a400de44e507', fb => {
fb
.handle((event: any, context: EventContext) => {
return new PartitionedFilterResult(true, PartitionId.unspecified);
});
});
});
})
})
)
.createPublicFilter('2c087657-b318-40b1-ae92-a400de44e507', fb =>
fb.handle((event: any, context: EventContext) => {
return new PartitionedFilterResult(true, PartitionId.unspecified);
})
)
)
.build();


client.executionContextManager.forTenant('900893e7-c4cc-4873-8032-884e965e4b97');


const event = new MyEvent();
event.anInteger = 42;
event.aString = 'Forty two';

client.eventStore.commitPublic(event, 'd8cb7301-4bec-4451-a72b-2db53c6dc05d');
client
.eventStore
.forTenant('900893e7-c4cc-4873-8032-884e965e4b97')
.commitPublic(event, 'd8cb7301-4bec-4451-a72b-2db53c6dc05d');
7 changes: 4 additions & 3 deletions Samples/Container/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ const client = Client
.withLogging(_ => _.useWinston(_ => _.level = 'debug'))
.build();

client.executionContextManager.forTenant('900893e7-c4cc-4873-8032-884e965e4b97');

const event = new MyEvent();
event.anInteger = 42;
event.aString = 'Forty two';
client.eventStore.commit(event, 'd8cb7301-4bec-4451-a72b-2db53c6dc05d');
client
.eventStore
.forTenant('900893e7-c4cc-4873-8032-884e965e4b97')
.commit(event, 'd8cb7301-4bec-4451-a72b-2db53c6dc05d');
7 changes: 4 additions & 3 deletions Samples/EventHorizon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ const client = Client
})
.build();

client.executionContextManager.forTenant('900893e7-c4cc-4873-8032-884e965e4b97');

const event = new MyEvent();
event.anInteger = 42;
event.aString = 'Forty two';

client.eventStore.commitPublic(event, 'd8cb7301-4bec-4451-a72b-2db53c6dc05d');
client
.eventStore
.forTenant('900893e7-c4cc-4873-8032-884e965e4b97')
.commitPublic(event, 'd8cb7301-4bec-4451-a72b-2db53c6dc05d');
10 changes: 5 additions & 5 deletions Source/eventHorizon/EventHorizons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { Guid } from '@dolittle/rudiments';
import { Logger } from 'winston';
import * as grpc from 'grpc';
import { IExecutionContextManager } from '@dolittle/sdk.execution';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { callContexts, failures, guids } from '@dolittle/sdk.protobuf';
import { SubscriptionsClient } from '@dolittle/runtime.contracts/Runtime/EventHorizon/Subscriptions_grpc_pb';
import { Subscription as PbSubscription, SubscriptionResponse as PbSubscriptionResponse } from '@dolittle/runtime.contracts/Runtime/EventHorizon/Subscriptions_pb';
Expand All @@ -25,14 +25,14 @@ export class EventHorizons implements IEventHorizons {
/**
* Initializes a new instance of {@link EventHorizons}.
* @param {SubscriptionsClient} subscriptionsClient The runtime client for working with subscriptions.
* @param {IExecutionContextManager} executionContextManager For Managing execution context.
* @param {ExecutionContext} executionContext The execution context.
* @param {TenantWithSubscriptions[]} tenantSubscriptions Tenant subscriptions to connect.
* @param {SubscriptionCallbacks} callbacks Callbacks for handling responses of subscribing.
* @param {Logger} logger Logger for logging;
*/
constructor(
private _subscriptionsClient: SubscriptionsClient,
private _executionContextManager: IExecutionContextManager,
private _executionContext: ExecutionContext,
readonly subscriptions: TenantWithSubscriptions[],
readonly callbacks: SubscriptionCallbacks,
private _logger: Logger) {
Expand All @@ -56,11 +56,11 @@ export class EventHorizons implements IEventHorizons {
const consumerTenant = tenantWithSubscriptions.tenant;

for (const subscription of tenantWithSubscriptions.subscriptions) {
this._executionContextManager.forTenant(consumerTenant);
const executionContext = this._executionContext.forTenant(consumerTenant.value);

this._logger.debug(`Subscribing to events from ${subscription.partition} in ${subscription.stream} of ${subscription.tenant} in ${subscription.microservice} for ${consumerTenant} into ${subscription.scope}`);

const callContext = callContexts.toProtobuf(this._executionContextManager.current);
const callContext = callContexts.toProtobuf(executionContext);
callContext.setHeadid(guids.toProtobuf(Guid.create()));

const pbSubscription = new PbSubscription();
Expand Down
8 changes: 4 additions & 4 deletions Source/eventHorizon/EventHorizonsBuilder.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

import { TenantId, IExecutionContextManager } from '@dolittle/sdk.execution';
import { TenantId, ExecutionContext } from '@dolittle/sdk.execution';

import { SubscriptionsClient } from '@dolittle/runtime.contracts/Runtime/EventHorizon/Subscriptions_grpc_pb';
import { Logger } from 'winston';
Expand Down Expand Up @@ -70,15 +70,15 @@ export class EventHorizonsBuilder {
/**
* Build all configured {@link TenantSubscriptions}
* @param {SubscriptionsClient} subscriptionsClient The runtime client for working with subscriptions.
* @param {IExecutionContextManager} executionContextManager For Managing execution context.
* @param {ExecutionContext} executionContext The execution context.
* @param {Logger} logger Logger for logging;
* @returns {TenantSubscriptions[]}
*/
build(subscriptionsClient: SubscriptionsClient, executionContextManager: IExecutionContextManager, logger: Logger): IEventHorizons {
build(subscriptionsClient: SubscriptionsClient, executionContext: ExecutionContext, logger: Logger): IEventHorizons {
const tenantSubscriptions = this._tenantSubscriptionsBuilders.map(_ => _.build());
return new EventHorizons(
subscriptionsClient,
executionContextManager,
executionContext,
tenantSubscriptions,
this.callbacks,
logger);
Expand Down
11 changes: 6 additions & 5 deletions Source/events.filtering/EventFiltersBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import { Logger } from 'winston';

import { Guid } from '@dolittle/rudiments';

import { IArtifacts } from '@dolittle/sdk.artifacts';
import { IExecutionContextManager } from '@dolittle/sdk.execution';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { Cancellation } from '@dolittle/sdk.resilience';

import { FiltersClient } from '@dolittle/runtime.contracts/Runtime/Events.Processing/Filters_grpc_pb';
Expand All @@ -13,7 +15,6 @@ import { FilterId } from './FilterId';
import { Filters } from './Filters';
import { IFilters } from './IFilters';
import { PublicEventFilterBuilder } from './PublicEventFilterBuilder';
import { Guid } from '@dolittle/rudiments';
import { PrivateEventFilterBuilder } from './PrivateEventFilterBuilder';


Expand Down Expand Up @@ -63,18 +64,18 @@ export class EventFiltersBuilder {
*/
build(
client: FiltersClient,
executionContextManager: IExecutionContextManager,
executionContext: ExecutionContext,
artifacts: IArtifacts,
logger: Logger,
cancellation: Cancellation): IFilters {
const filters = new Filters(logger);

for (const privateFilterBuilder of this._privateFilterBuilders) {
const filterProcessor = privateFilterBuilder.build(client, executionContextManager, artifacts, logger);
const filterProcessor = privateFilterBuilder.build(client, executionContext, artifacts, logger);
filters.register(filterProcessor, cancellation);
}
for (const publicFilterBuilder of this._publicFilterBuilders) {
const filterProcessor = publicFilterBuilder.build(client, executionContextManager, artifacts, logger);
const filterProcessor = publicFilterBuilder.build(client, executionContext, artifacts, logger);
filters.register(filterProcessor, cancellation);
}

Expand Down
12 changes: 8 additions & 4 deletions Source/events.filtering/Internal/EventFilterProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from 'winston';
import { Guid } from '@dolittle/rudiments';
import { IArtifacts } from '@dolittle/sdk.artifacts';
import { EventContext, ScopeId } from '@dolittle/sdk.events';
import { IExecutionContextManager } from '@dolittle/sdk.execution';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { guids } from '@dolittle/sdk.protobuf';
import { Cancellation } from '@dolittle/sdk.resilience';
import { IReverseCallClient, ReverseCallClient, reactiveDuplex } from '@dolittle/sdk.services';
Expand All @@ -25,7 +25,7 @@ export class EventFilterProcessor extends FilterEventProcessor<FilterRegistratio
private _scopeId: ScopeId,
private _callback: FilterEventCallback,
private _client: FiltersClient,
private _executionContextManager: IExecutionContextManager,
private _executionContext: ExecutionContext,
artifacts: IArtifacts,
logger: Logger
) {
Expand All @@ -39,7 +39,11 @@ export class EventFilterProcessor extends FilterEventProcessor<FilterRegistratio
return registerArguments;
}

protected createClient(registerArguments: FilterRegistrationRequest, callback: (request: FilterEventRequest) => Promise<FilterResponse>, pingTimeout: number, cancellation: Cancellation): IReverseCallClient<FilterRegistrationResponse> {
protected createClient(
registerArguments: FilterRegistrationRequest,
callback: (request: FilterEventRequest, executionContext: ExecutionContext) => Promise<FilterResponse>,
pingTimeout: number,
cancellation: Cancellation): IReverseCallClient<FilterRegistrationResponse> {
return new ReverseCallClient<FilterClientToRuntimeMessage, FilterRuntimeToClientMessage, FilterRegistrationRequest, FilterRegistrationResponse, FilterEventRequest, FilterResponse> (
(requests, cancellation) => reactiveDuplex(this._client, this._client.connect, requests, cancellation),
FilterClientToRuntimeMessage,
Expand All @@ -52,7 +56,7 @@ export class EventFilterProcessor extends FilterEventProcessor<FilterRegistratio
(response, context) => response.setCallcontext(context),
(message) => message.getPing(),
(message, pong) => message.setPong(pong),
this._executionContextManager,
this._executionContext,
registerArguments,
pingTimeout,
callback,
Expand Down
6 changes: 3 additions & 3 deletions Source/events.filtering/Internal/FilterEventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { IArtifacts } from '@dolittle/sdk.artifacts';
import { EventContext, EventSourceId } from '@dolittle/sdk.events';
import { EventProcessor } from '@dolittle/sdk.events.processing';
import { MissingEventInformation } from '@dolittle/sdk.events.handling';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { guids, executionContexts, artifacts } from '@dolittle/sdk.protobuf';

import { Failure } from '@dolittle/runtime.contracts/Fundamentals/Protobuf/Failure_pb';
Expand Down Expand Up @@ -35,7 +36,7 @@ export abstract class FilterEventProcessor<TRegisterArguments, TResponse> extend
return request.getRetryprocessingstate();
}

protected async handle(request: FilterEventRequest): Promise<TResponse> {
protected async handle(request: FilterEventRequest, executionContext: ExecutionContext): Promise<TResponse> {
if (!request.getEvent()) {
throw new MissingEventInformation('no event in FilterEventRequest');
}
Expand All @@ -61,8 +62,7 @@ export abstract class FilterEventProcessor<TRegisterArguments, TResponse> extend
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContexts.toSDK(pbExecutionContext)
);
executionContext);

let event = JSON.parse(pbEvent.getContent());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from 'winston';
import { Guid } from '@dolittle/rudiments';
import { IArtifacts } from '@dolittle/sdk.artifacts';
import { EventContext, ScopeId } from '@dolittle/sdk.events';
import { IExecutionContextManager } from '@dolittle/sdk.execution';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { guids } from '@dolittle/sdk.protobuf';
import { Cancellation } from '@dolittle/sdk.resilience';
import { IReverseCallClient, ReverseCallClient, reactiveDuplex } from '@dolittle/sdk.services';
Expand All @@ -26,7 +26,7 @@ export class PartitionedEventFilterProcessor extends FilterEventProcessor<Partit
private _scopeId: ScopeId,
private _callback: PartitionedFilterEventCallback,
private _client: FiltersClient,
private _executionContextManager: IExecutionContextManager,
private _executionContext: ExecutionContext,
artifacts: IArtifacts,
logger: Logger
) {
Expand All @@ -40,7 +40,11 @@ export class PartitionedEventFilterProcessor extends FilterEventProcessor<Partit
return registerArguments;
}

protected createClient(registerArguments: PartitionedFilterRegistrationRequest, callback: (request: FilterEventRequest) => Promise<PartitionedFilterResponse>, pingTimeout: number, cancellation: Cancellation): IReverseCallClient<FilterRegistrationResponse> {
protected createClient(
registerArguments: PartitionedFilterRegistrationRequest,
callback: (request: FilterEventRequest, executionContext: ExecutionContext) => Promise<PartitionedFilterResponse>,
pingTimeout: number,
cancellation: Cancellation): IReverseCallClient<FilterRegistrationResponse> {
return new ReverseCallClient<PartitionedFilterClientToRuntimeMessage, FilterRuntimeToClientMessage, PartitionedFilterRegistrationRequest, FilterRegistrationResponse, FilterEventRequest, PartitionedFilterResponse> (
(requests, cancellation) => reactiveDuplex(this._client, this._client.connectPartitioned, requests, cancellation),
PartitionedFilterClientToRuntimeMessage,
Expand All @@ -53,7 +57,7 @@ export class PartitionedEventFilterProcessor extends FilterEventProcessor<Partit
(response, context) => response.setCallcontext(context),
(message) => message.getPing(),
(message, pong) => message.setPong(pong),
this._executionContextManager,
this._executionContext,
registerArguments,
pingTimeout,
callback,
Expand Down
12 changes: 8 additions & 4 deletions Source/events.filtering/Internal/PublicEventFilterProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from 'winston';
import { Guid } from '@dolittle/rudiments';
import { IArtifacts } from '@dolittle/sdk.artifacts';
import { EventContext } from '@dolittle/sdk.events';
import { IExecutionContextManager } from '@dolittle/sdk.execution';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { guids } from '@dolittle/sdk.protobuf';
import { Cancellation } from '@dolittle/sdk.resilience';
import { IReverseCallClient, ReverseCallClient, reactiveDuplex } from '@dolittle/sdk.services';
Expand All @@ -26,7 +26,7 @@ export class PublicEventFilterProcessor extends FilterEventProcessor<PublicFilte
filterId: FilterId,
private _callback: PartitionedFilterEventCallback,
private _client: FiltersClient,
private _executionContextManager: IExecutionContextManager,
private _executionContext: ExecutionContext,
artifacts: IArtifacts,
logger: Logger
) {
Expand All @@ -39,7 +39,11 @@ export class PublicEventFilterProcessor extends FilterEventProcessor<PublicFilte
return registerArguments;
}

protected createClient(registerArguments: PublicFilterRegistrationRequest, callback: (request: FilterEventRequest) => Promise<PartitionedFilterResponse>, pingTimeout: number, cancellation: Cancellation): IReverseCallClient<FilterRegistrationResponse> {
protected createClient(
registerArguments: PublicFilterRegistrationRequest,
callback: (request: FilterEventRequest, executionContext: ExecutionContext) => Promise<PartitionedFilterResponse>,
pingTimeout: number,
cancellation: Cancellation): IReverseCallClient<FilterRegistrationResponse> {
return new ReverseCallClient<PublicFilterClientToRuntimeMessage, FilterRuntimeToClientMessage, PublicFilterRegistrationRequest, FilterRegistrationResponse, FilterEventRequest, PartitionedFilterResponse> (
(requests, cancellation) => reactiveDuplex(this._client, this._client.connectPublic, requests, cancellation),
PublicFilterClientToRuntimeMessage,
Expand All @@ -52,7 +56,7 @@ export class PublicEventFilterProcessor extends FilterEventProcessor<PublicFilte
(response, context) => response.setCallcontext(context),
(message) => message.getPing(),
(message, pong) => message.setPong(pong),
this._executionContextManager,
this._executionContext,
registerArguments,
pingTimeout,
callback,
Expand Down
8 changes: 4 additions & 4 deletions Source/events.filtering/PartitionedEventFilterBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Logger } from 'winston';

import { IArtifacts } from '@dolittle/sdk.artifacts';
import { ScopeId } from '@dolittle/sdk.events';
import { IExecutionContextManager } from '@dolittle/sdk.execution';
import { ExecutionContext } from '@dolittle/sdk.execution';

import { FiltersClient } from '@dolittle/runtime.contracts/Runtime/Events.Processing/Filters_grpc_pb';

Expand Down Expand Up @@ -53,7 +53,7 @@ export class PartitionedEventFilterBuilder {
* @param {FilterId} filterId Unique identifier for the filter.
* @param {ScopeId} scopeId The identifier of the scope the filter runs on.
* @param {FiltersClient} client The client for working with the filters in the runtime.
* @param {IExecutionContextManager} executionContextManager Execution context manager for working with execution context.
* @param {ExecutionContext} executionContext Execution context.
* @param {IArtifacts} artifacts Artifacts for identifying artifacts.
* @param {Logger} logger Logger for logging.
* @returns {IFilterProcessor}
Expand All @@ -62,11 +62,11 @@ export class PartitionedEventFilterBuilder {
filterId: FilterId,
scopeId: ScopeId,
client: FiltersClient,
executionContextManager: IExecutionContextManager,
executionContext: ExecutionContext,
artifacts: IArtifacts,
logger: Logger): IFilterProcessor {
this.throwIfCallbackIsMissing(filterId, scopeId);
return new internal.PartitionedEventFilterProcessor(filterId, scopeId, this._callback!, client, executionContextManager, artifacts, logger);
return new internal.PartitionedEventFilterProcessor(filterId, scopeId, this._callback!, client, executionContext, artifacts, logger);
}

private throwIfCallbackIsMissing(filterId: FilterId, scopeId: ScopeId) {
Expand Down
Loading