Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event source concept #62

Merged
merged 2 commits into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Source/artifacts/package.json
Expand Up @@ -36,7 +36,7 @@
"@dolittle/concepts": "5.0.1",
"@dolittle/rudiments": "5.0.1",
"@dolittle/types": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0"
"@dolittle/runtime.contracts": "6.0.0-eventsource.0"
},
"devDependencies": {
"@types/is-natural-number": "^4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion Source/embeddings/Internal/EmbeddingProcessor.ts
Expand Up @@ -197,7 +197,7 @@ export class EmbeddingProcessor<TReadModel> extends ClientProcessor<EmbeddingId,
const embeddingProjectContext = new EmbeddingProjectContext(
pbStateType === ProjectionCurrentStateType.CREATED_FROM_INITIAL_STATE,
Key.from(pbKey),
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
executionContext);

let event = JSON.parse(pbEvent.getContent());
Expand Down
4 changes: 2 additions & 2 deletions Source/embeddings/package.json
Expand Up @@ -34,9 +34,9 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/types": "5.0.1",
"luxon": "1.24.1",
"@types/luxon": "1.24.1"
Expand Down
2 changes: 1 addition & 1 deletion Source/eventHorizon/EventHorizons.ts
Expand Up @@ -131,7 +131,7 @@ export class EventHorizons extends IEventHorizons {

const pbSubscription = new PbSubscription();
pbSubscription.setCallcontext(callContext);
pbSubscription.setPartitionid(guids.toProtobuf(subscription.partition.value));
pbSubscription.setPartitionid(subscription.partition.value);
pbSubscription.setScopeid(guids.toProtobuf(subscription.scope.value));
pbSubscription.setStreamid(guids.toProtobuf(subscription.stream.value));
pbSubscription.setTenantid(guids.toProtobuf(subscription.tenant.value));
Expand Down
4 changes: 2 additions & 2 deletions Source/eventHorizon/SubscriptionBuilderForProducerStream.ts
Expand Up @@ -33,9 +33,9 @@ export class SubscriptionBuilderForProducerStream {

/**
* Sets the producer stream to subscribe to events from.
* @param {PartitionId | Guid | string} tenant Stream to subscribe to events from.
* @param {PartitionId | string} partitionId Stream partition to subscribe to events from.
*/
fromProducerPartition(partitionId: PartitionId | Guid | string): SubscriptionBuilderForProducerPartition {
fromProducerPartition(partitionId: PartitionId | string): SubscriptionBuilderForProducerPartition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guid here

this.throwIfProducerPartitionIsAlreadyDefined();
this._producerPartitionId = PartitionId.from(partitionId);
this._builder = new SubscriptionBuilderForProducerPartition(
Expand Down
2 changes: 1 addition & 1 deletion Source/eventHorizon/package.json
Expand Up @@ -34,7 +34,7 @@
},
"dependencies": {
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.events": "17.0.3",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.execution": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/events.filtering/Internal/FilterEventProcessor.ts
Expand Up @@ -58,7 +58,7 @@ export abstract class FilterEventProcessor<TRegisterArguments, TResponse> extend

const eventContext = new EventContext(
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContext);

Expand Down
Expand Up @@ -75,7 +75,7 @@ export class PartitionedEventFilterProcessor extends FilterEventProcessor<Partit

const response = new PartitionedFilterResponse();
response.setIsincluded(result.shouldInclude);
response.setPartitionid(guids.toProtobuf(result.partitionId.value));
response.setPartitionid(result.partitionId.value);
return response;
}
}
Expand Up @@ -74,7 +74,7 @@ export class PublicEventFilterProcessor extends FilterEventProcessor<PublicFilte

const response = new PartitionedFilterResponse();
response.setIsincluded(result.shouldInclude);
response.setPartitionid(guids.toProtobuf(result.partitionId.value));
response.setPartitionid(result.partitionId.value);
return response;
}
}
4 changes: 2 additions & 2 deletions Source/events.filtering/PartitionedFilterResult.ts
Expand Up @@ -13,9 +13,9 @@ export class PartitionedFilterResult {
/**
* Initializes a new instance of {@link PartitionedFilterResult}.
* @param {boolean} shouldInclude Tells whether or not the event should be included.
* @param {PartitionId | Guid | string} partitionId Tells which partition the event should be partitioned into.
* @param {PartitionId | string} partitionId Tells which partition the event should be partitioned into.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guid here

*/
constructor(readonly shouldInclude: boolean, partitionId: PartitionId | Guid | string) {
constructor(readonly shouldInclude: boolean, partitionId: PartitionId | string) {
this.partitionId = PartitionId.from(partitionId);
}
}
4 changes: 2 additions & 2 deletions Source/events.filtering/package.json
Expand Up @@ -32,9 +32,9 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.events": "17.0.3",
"@dolittle/sdk.events.handling": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/events.handling/Internal/EventHandlerProcessor.ts
Expand Up @@ -128,7 +128,7 @@ export class EventHandlerProcessor extends internal.EventProcessor<EventHandlerI

const eventContext = new EventContext(
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContext);

Expand Down
4 changes: 2 additions & 2 deletions Source/events.handling/package.json
Expand Up @@ -33,9 +33,9 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/types": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.common": "17.0.3",
"@dolittle/sdk.events": "17.0.3",
Expand Down
4 changes: 2 additions & 2 deletions Source/events.processing/package.json
Expand Up @@ -32,10 +32,10 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/concepts": "5.0.1",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.events": "17.0.3",
"@dolittle/sdk.execution": "17.0.3",
Expand Down
6 changes: 3 additions & 3 deletions Source/events/EventConverters.ts
Expand Up @@ -40,7 +40,7 @@ export class EventConverters {
static getUncommittedEventFrom(event: any, eventSourceId: EventSourceId, eventType: EventType, isPublic: boolean): PbUncommittedEvent {
const uncommittedEvent = new PbUncommittedEvent();
uncommittedEvent.setArtifact(eventTypes.toProtobuf(eventType));
uncommittedEvent.setEventsourceid(guids.toProtobuf(eventSourceId.value));
uncommittedEvent.setEventsourceid(eventSourceId.value);
uncommittedEvent.setPublic(isPublic);
uncommittedEvent.setContent(JSON.stringify(event));
return uncommittedEvent;
Expand Down Expand Up @@ -113,7 +113,7 @@ export class EventConverters {
const committedEvent = new SdkCommittedEvent(
EventLogSequenceNumber.from(input.getEventlogsequencenumber()),
DateTime.fromJSDate((input.getOccurred()?.toDate() || new Date())),
EventSourceId.from(guids.toSDK(input.getEventsourceid())),
EventSourceId.from(input.getEventsourceid()),
executionContexts.toSDK(executionContext),
eventTypes.toSDK(input.getType()),
JSON.parse(input.getContent()),
Expand All @@ -139,7 +139,7 @@ export class EventConverters {
const committedEvent = new PbCommittedEvent();
committedEvent.setEventlogsequencenumber(input.eventLogSequenceNumber.value);
committedEvent.setOccurred(occurred);
committedEvent.setEventsourceid(guids.toProtobuf(input.eventSourceId.value));
committedEvent.setEventsourceid(input.eventSourceId.value);
committedEvent.setExecutioncontext(executionContexts.toProtobuf(input.executionContext));
committedEvent.setType(eventTypes.toProtobuf(input.type));
committedEvent.setContent(JSON.stringify(input.content));
Expand Down
12 changes: 6 additions & 6 deletions Source/events/EventSourceId.ts
Expand Up @@ -9,10 +9,10 @@ import { Guid } from '@dolittle/rudiments';
*
* @export
* @class EventSourceId
* @extends {ConceptAs<Guid, '@dolittle/sdk.events.EventSourceId'>}
* @extends {ConceptAs<string, '@dolittle/sdk.events.EventSourceId'>}
*/
export class EventSourceId extends ConceptAs<Guid, '@dolittle/sdk.events.EventSourceId'> {
constructor(id: Guid) {
export class EventSourceId extends ConceptAs<string, '@dolittle/sdk.events.EventSourceId'> {
constructor(id: string) {
super(id, '@dolittle/sdk.events.EventSourceId');
}

Expand All @@ -21,7 +21,7 @@ export class EventSourceId extends ConceptAs<Guid, '@dolittle/sdk.events.EventSo
* @returns {EventSourceId}
*/
static new(): EventSourceId {
return new EventSourceId(Guid.create());
return new EventSourceId(Guid.create().toString());
}

/**
Expand All @@ -31,8 +31,8 @@ export class EventSourceId extends ConceptAs<Guid, '@dolittle/sdk.events.EventSo
* @param {EventSourceId | Guid | string)} id
* @returns {EventSourceId}
*/
static from(id: EventSourceId | Guid | string): EventSourceId {
static from(id: EventSourceId | string): EventSourceId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guid here

if (id instanceof EventSourceId) return id;
return new EventSourceId(Guid.as(id));
return new EventSourceId(id);
}
};
20 changes: 10 additions & 10 deletions Source/events/EventStore.ts
Expand Up @@ -58,31 +58,31 @@ export class EventStore extends IEventStore {
}

/** @inheritdoc */
commit(event: any, eventSourceId: EventSourceId | Guid | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult>;
commit(event: any, eventSourceId: EventSourceId | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult>;
commit(eventOrEvents: UncommittedEvent | UncommittedEvent[], cancellation?: Cancellation): Promise<CommitEventsResult>;
commit(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | Guid | string | Cancellation, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
commit(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | string | Cancellation, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
Comment on lines +61 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guid here

if (this.isUncommittedEventOrEvents(eventOrEvents)) {
return this.commitInternal(this.asArray(eventOrEvents), eventSourceIdOrCancellation as Cancellation);
}
const eventSourceId = eventSourceIdOrCancellation as Guid | string;
const eventSourceId = eventSourceIdOrCancellation as string;
return this.commitInternal([this.toUncommittedEvent(eventOrEvents, eventSourceId, eventType, false)], cancellation);
}

/** @inheritdoc */
commitPublic(event: any, eventSourceId: EventSourceId | Guid | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
commitPublic(event: any, eventSourceId: EventSourceId | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guid here

const events: UncommittedEvent[] = [this.toUncommittedEvent(event, eventSourceId, eventType, true)];
return this.commitInternal(events, cancellation);
}

/** @inheritdoc */
/** @inheritdoc */
commitForAggregate(event: any, eventSourceId: EventSourceId | Guid | string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult>;
commitForAggregate(event: any, eventSourceId: EventSourceId | string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult>;
commitForAggregate(events: UncommittedAggregateEvents, cancellation?: Cancellation): Promise<CommitAggregateEventsResult>;
commitForAggregate(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | Guid | string | Cancellation, aggregateRootId?: AggregateRootId, expectedAggregateRootVersion?: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult> {
commitForAggregate(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | string | Cancellation, aggregateRootId?: AggregateRootId, expectedAggregateRootVersion?: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult> {
Comment on lines +79 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guid here

if (this.isUncommittedAggregateEvents(eventOrEvents)) {
return this.commitAggregateInternal(eventOrEvents, eventSourceIdOrCancellation as Cancellation);
}
const eventSourceId = eventSourceIdOrCancellation as Guid | string;
const eventSourceId = eventSourceIdOrCancellation as string;
return this.commitAggregateInternal(
UncommittedAggregateEvents.from(
eventSourceId,
Expand All @@ -107,7 +107,7 @@ export class EventStore extends IEventStore {
request.setCallcontext(callContexts.toProtobuf(this._executionContext));
const aggregate = new Aggregate();
aggregate.setAggregaterootid(guids.toProtobuf(aggregateRootId.value));
aggregate.setEventsourceid(guids.toProtobuf(eventSourceId.value));
aggregate.setEventsourceid(eventSourceId.value);
request.setAggregate(aggregate);

return reactiveUnary(this._eventStoreClient, this._eventStoreClient.fetchForAggregate, request, cancellation)
Expand Down Expand Up @@ -161,7 +161,7 @@ export class EventStore extends IEventStore {
const pbEvents = new PbUncommittedAggregateEvents();
pbEvents.setEventsList(uncommittedAggregateEvents);
pbEvents.setAggregaterootid(guids.toProtobuf(aggregateRootId.value));
pbEvents.setEventsourceid(guids.toProtobuf(events.eventSourceId.value));
pbEvents.setEventsourceid(events.eventSourceId.value);
pbEvents.setExpectedaggregaterootversion(events.expectedAggregateRootVersion.value);
request.setCallcontext(callContexts.toProtobuf(this._executionContext));
request.setEvents(pbEvents);
Expand Down Expand Up @@ -193,7 +193,7 @@ export class EventStore extends IEventStore {
return new CommittedAggregateEvents(eventSourceId, aggregateRootId, ...convertedEvents);
}

private toUncommittedEvent(content: any, eventSourceId: EventSourceId | Guid | string, eventTypeOrId?: EventType | EventTypeId | Guid | string, isPublic = false): UncommittedEvent {
private toUncommittedEvent(content: any, eventSourceId: EventSourceId | string, eventTypeOrId?: EventType | EventTypeId | Guid | string, isPublic = false): UncommittedEvent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's bring the Guid conversions back until we get the poll data :)

let eventType: EventType | EventTypeId | undefined;
if (eventTypeOrId !== undefined) {
if (eventTypeOrId instanceof EventType) eventType = eventTypeOrId;
Expand Down
12 changes: 6 additions & 6 deletions Source/events/PartitionId.ts
Expand Up @@ -7,25 +7,25 @@ import { ConceptAs } from '@dolittle/concepts';
/**
* Represents the unique identifier of a partition.
*/
export class PartitionId extends ConceptAs<Guid, '@dolittle/sdk.events.PartitionId'> {
constructor(id: Guid) {
export class PartitionId extends ConceptAs<string, '@dolittle/sdk.events.PartitionId'> {
constructor(id: string) {
super(id, '@dolittle/sdk.events.PartitionId');
}

/**
* Gets the unspecified partition id
*/
static unspecified: PartitionId = PartitionId.from(Guid.empty);
static unspecified: PartitionId = PartitionId.from(Guid.empty.toString());

/**
* Creates a {PartitionId} from a guid.
*
* @static
* @param {PartitionId | Guid | string} id
* @param {PartitionId | string} id
* @returns {PartitionId}
*/
static from(id: PartitionId | Guid | string): PartitionId {
static from(id: PartitionId | string): PartitionId {
if (id instanceof PartitionId) return id;
return new PartitionId(Guid.as(id));
return new PartitionId(id);
}
};
2 changes: 1 addition & 1 deletion Source/events/UncommittedAggregateEvents.ts
Expand Up @@ -54,7 +54,7 @@ export class UncommittedAggregateEvents implements Iterable<UncommittedAggregate
}


static from(eventSourceId: Guid | string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, ...events: UncommittedAggregateEvent[]): UncommittedAggregateEvents {
static from(eventSourceId: string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, ...events: UncommittedAggregateEvent[]): UncommittedAggregateEvents {
return new UncommittedAggregateEvents(EventSourceId.from(eventSourceId), aggregateRootId, expectedAggregateRootVersion, ...events);
}

Expand Down
4 changes: 2 additions & 2 deletions Source/events/package.json
Expand Up @@ -34,9 +34,9 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.execution": "17.0.3",
"@dolittle/sdk.protobuf": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/projections/Internal/ProjectionProcessor.ts
Expand Up @@ -158,7 +158,7 @@ export class ProjectionProcessor<T> extends internal.EventProcessor<ProjectionId

const eventContext = new EventContext(
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContext);

Expand Down
4 changes: 2 additions & 2 deletions Source/projections/package.json
Expand Up @@ -34,9 +34,9 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/types": "5.0.1",
"luxon": "1.24.1",
"@types/luxon": "1.24.1"
Expand Down
2 changes: 1 addition & 1 deletion Source/protobuf/package.json
Expand Up @@ -35,7 +35,7 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.execution": "17.0.3"
Expand Down
2 changes: 1 addition & 1 deletion Source/sdk/package.json
Expand Up @@ -35,7 +35,7 @@
},
"dependencies": {
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.aggregates": "17.0.3",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.common": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/services/package.json
Expand Up @@ -33,7 +33,7 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/sdk.execution": "17.0.3",
"@dolittle/sdk.resilience": "17.0.3",
Expand Down