Skip to content

Commit

Permalink
Event source concept (#62)
Browse files Browse the repository at this point in the history
* Update references to contracts

* Change partition and event source id to be string instead of guid and fix all build errors
  • Loading branch information
woksin committed Oct 5, 2021
1 parent e980d30 commit 06ac206
Show file tree
Hide file tree
Showing 25 changed files with 54 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Source/artifacts/package.json
Expand Up @@ -36,7 +36,7 @@
"@dolittle/concepts": "5.0.1",
"@dolittle/rudiments": "5.0.1",
"@dolittle/types": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0"
"@dolittle/runtime.contracts": "6.0.0-eventsource.0"
},
"devDependencies": {
"@types/is-natural-number": "^4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion Source/embeddings/Internal/EmbeddingProcessor.ts
Expand Up @@ -197,7 +197,7 @@ export class EmbeddingProcessor<TReadModel> extends ClientProcessor<EmbeddingId,
const embeddingProjectContext = new EmbeddingProjectContext(
pbStateType === ProjectionCurrentStateType.CREATED_FROM_INITIAL_STATE,
Key.from(pbKey),
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
executionContext);

let event = JSON.parse(pbEvent.getContent());
Expand Down
4 changes: 2 additions & 2 deletions Source/embeddings/package.json
Expand Up @@ -34,9 +34,9 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/types": "5.0.1",
"luxon": "1.24.1",
"@types/luxon": "1.24.1"
Expand Down
2 changes: 1 addition & 1 deletion Source/eventHorizon/EventHorizons.ts
Expand Up @@ -131,7 +131,7 @@ export class EventHorizons extends IEventHorizons {

const pbSubscription = new PbSubscription();
pbSubscription.setCallcontext(callContext);
pbSubscription.setPartitionid(guids.toProtobuf(subscription.partition.value));
pbSubscription.setPartitionid(subscription.partition.value);
pbSubscription.setScopeid(guids.toProtobuf(subscription.scope.value));
pbSubscription.setStreamid(guids.toProtobuf(subscription.stream.value));
pbSubscription.setTenantid(guids.toProtobuf(subscription.tenant.value));
Expand Down
4 changes: 2 additions & 2 deletions Source/eventHorizon/SubscriptionBuilderForProducerStream.ts
Expand Up @@ -33,9 +33,9 @@ export class SubscriptionBuilderForProducerStream {

/**
* Sets the producer stream to subscribe to events from.
* @param {PartitionId | Guid | string} tenant Stream to subscribe to events from.
* @param {PartitionId | string} partitionId Stream partition to subscribe to events from.
*/
fromProducerPartition(partitionId: PartitionId | Guid | string): SubscriptionBuilderForProducerPartition {
fromProducerPartition(partitionId: PartitionId | string): SubscriptionBuilderForProducerPartition {
this.throwIfProducerPartitionIsAlreadyDefined();
this._producerPartitionId = PartitionId.from(partitionId);
this._builder = new SubscriptionBuilderForProducerPartition(
Expand Down
2 changes: 1 addition & 1 deletion Source/eventHorizon/package.json
Expand Up @@ -34,7 +34,7 @@
},
"dependencies": {
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.events": "17.0.3",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.execution": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/events.filtering/Internal/FilterEventProcessor.ts
Expand Up @@ -58,7 +58,7 @@ export abstract class FilterEventProcessor<TRegisterArguments, TResponse> extend

const eventContext = new EventContext(
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContext);

Expand Down
Expand Up @@ -75,7 +75,7 @@ export class PartitionedEventFilterProcessor extends FilterEventProcessor<Partit

const response = new PartitionedFilterResponse();
response.setIsincluded(result.shouldInclude);
response.setPartitionid(guids.toProtobuf(result.partitionId.value));
response.setPartitionid(result.partitionId.value);
return response;
}
}
Expand Up @@ -74,7 +74,7 @@ export class PublicEventFilterProcessor extends FilterEventProcessor<PublicFilte

const response = new PartitionedFilterResponse();
response.setIsincluded(result.shouldInclude);
response.setPartitionid(guids.toProtobuf(result.partitionId.value));
response.setPartitionid(result.partitionId.value);
return response;
}
}
4 changes: 2 additions & 2 deletions Source/events.filtering/PartitionedFilterResult.ts
Expand Up @@ -13,9 +13,9 @@ export class PartitionedFilterResult {
/**
* Initializes a new instance of {@link PartitionedFilterResult}.
* @param {boolean} shouldInclude Tells whether or not the event should be included.
* @param {PartitionId | Guid | string} partitionId Tells which partition the event should be partitioned into.
* @param {PartitionId | string} partitionId Tells which partition the event should be partitioned into.
*/
constructor(readonly shouldInclude: boolean, partitionId: PartitionId | Guid | string) {
constructor(readonly shouldInclude: boolean, partitionId: PartitionId | string) {
this.partitionId = PartitionId.from(partitionId);
}
}
4 changes: 2 additions & 2 deletions Source/events.filtering/package.json
Expand Up @@ -32,9 +32,9 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.events": "17.0.3",
"@dolittle/sdk.events.handling": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/events.handling/Internal/EventHandlerProcessor.ts
Expand Up @@ -128,7 +128,7 @@ export class EventHandlerProcessor extends internal.EventProcessor<EventHandlerI

const eventContext = new EventContext(
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContext);

Expand Down
4 changes: 2 additions & 2 deletions Source/events.handling/package.json
Expand Up @@ -33,9 +33,9 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/types": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.common": "17.0.3",
"@dolittle/sdk.events": "17.0.3",
Expand Down
4 changes: 2 additions & 2 deletions Source/events.processing/package.json
Expand Up @@ -32,10 +32,10 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/concepts": "5.0.1",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.events": "17.0.3",
"@dolittle/sdk.execution": "17.0.3",
Expand Down
6 changes: 3 additions & 3 deletions Source/events/EventConverters.ts
Expand Up @@ -40,7 +40,7 @@ export class EventConverters {
static getUncommittedEventFrom(event: any, eventSourceId: EventSourceId, eventType: EventType, isPublic: boolean): PbUncommittedEvent {
const uncommittedEvent = new PbUncommittedEvent();
uncommittedEvent.setArtifact(eventTypes.toProtobuf(eventType));
uncommittedEvent.setEventsourceid(guids.toProtobuf(eventSourceId.value));
uncommittedEvent.setEventsourceid(eventSourceId.value);
uncommittedEvent.setPublic(isPublic);
uncommittedEvent.setContent(JSON.stringify(event));
return uncommittedEvent;
Expand Down Expand Up @@ -113,7 +113,7 @@ export class EventConverters {
const committedEvent = new SdkCommittedEvent(
EventLogSequenceNumber.from(input.getEventlogsequencenumber()),
DateTime.fromJSDate((input.getOccurred()?.toDate() || new Date())),
EventSourceId.from(guids.toSDK(input.getEventsourceid())),
EventSourceId.from(input.getEventsourceid()),
executionContexts.toSDK(executionContext),
eventTypes.toSDK(input.getType()),
JSON.parse(input.getContent()),
Expand All @@ -139,7 +139,7 @@ export class EventConverters {
const committedEvent = new PbCommittedEvent();
committedEvent.setEventlogsequencenumber(input.eventLogSequenceNumber.value);
committedEvent.setOccurred(occurred);
committedEvent.setEventsourceid(guids.toProtobuf(input.eventSourceId.value));
committedEvent.setEventsourceid(input.eventSourceId.value);
committedEvent.setExecutioncontext(executionContexts.toProtobuf(input.executionContext));
committedEvent.setType(eventTypes.toProtobuf(input.type));
committedEvent.setContent(JSON.stringify(input.content));
Expand Down
12 changes: 6 additions & 6 deletions Source/events/EventSourceId.ts
Expand Up @@ -9,10 +9,10 @@ import { Guid } from '@dolittle/rudiments';
*
* @export
* @class EventSourceId
* @extends {ConceptAs<Guid, '@dolittle/sdk.events.EventSourceId'>}
* @extends {ConceptAs<string, '@dolittle/sdk.events.EventSourceId'>}
*/
export class EventSourceId extends ConceptAs<Guid, '@dolittle/sdk.events.EventSourceId'> {
constructor(id: Guid) {
export class EventSourceId extends ConceptAs<string, '@dolittle/sdk.events.EventSourceId'> {
constructor(id: string) {
super(id, '@dolittle/sdk.events.EventSourceId');
}

Expand All @@ -21,7 +21,7 @@ export class EventSourceId extends ConceptAs<Guid, '@dolittle/sdk.events.EventSo
* @returns {EventSourceId}
*/
static new(): EventSourceId {
return new EventSourceId(Guid.create());
return new EventSourceId(Guid.create().toString());
}

/**
Expand All @@ -31,8 +31,8 @@ export class EventSourceId extends ConceptAs<Guid, '@dolittle/sdk.events.EventSo
* @param {EventSourceId | Guid | string)} id
* @returns {EventSourceId}
*/
static from(id: EventSourceId | Guid | string): EventSourceId {
static from(id: EventSourceId | string): EventSourceId {
if (id instanceof EventSourceId) return id;
return new EventSourceId(Guid.as(id));
return new EventSourceId(id);
}
};
20 changes: 10 additions & 10 deletions Source/events/EventStore.ts
Expand Up @@ -58,31 +58,31 @@ export class EventStore extends IEventStore {
}

/** @inheritdoc */
commit(event: any, eventSourceId: EventSourceId | Guid | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult>;
commit(event: any, eventSourceId: EventSourceId | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult>;
commit(eventOrEvents: UncommittedEvent | UncommittedEvent[], cancellation?: Cancellation): Promise<CommitEventsResult>;
commit(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | Guid | string | Cancellation, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
commit(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | string | Cancellation, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
if (this.isUncommittedEventOrEvents(eventOrEvents)) {
return this.commitInternal(this.asArray(eventOrEvents), eventSourceIdOrCancellation as Cancellation);
}
const eventSourceId = eventSourceIdOrCancellation as Guid | string;
const eventSourceId = eventSourceIdOrCancellation as string;
return this.commitInternal([this.toUncommittedEvent(eventOrEvents, eventSourceId, eventType, false)], cancellation);
}

/** @inheritdoc */
commitPublic(event: any, eventSourceId: EventSourceId | Guid | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
commitPublic(event: any, eventSourceId: EventSourceId | string, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitEventsResult> {
const events: UncommittedEvent[] = [this.toUncommittedEvent(event, eventSourceId, eventType, true)];
return this.commitInternal(events, cancellation);
}

/** @inheritdoc */
/** @inheritdoc */
commitForAggregate(event: any, eventSourceId: EventSourceId | Guid | string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult>;
commitForAggregate(event: any, eventSourceId: EventSourceId | string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult>;
commitForAggregate(events: UncommittedAggregateEvents, cancellation?: Cancellation): Promise<CommitAggregateEventsResult>;
commitForAggregate(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | Guid | string | Cancellation, aggregateRootId?: AggregateRootId, expectedAggregateRootVersion?: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult> {
commitForAggregate(eventOrEvents: any, eventSourceIdOrCancellation?: EventSourceId | string | Cancellation, aggregateRootId?: AggregateRootId, expectedAggregateRootVersion?: AggregateRootVersion, eventType?: EventType | EventTypeId | Guid | string, cancellation?: Cancellation): Promise<CommitAggregateEventsResult> {
if (this.isUncommittedAggregateEvents(eventOrEvents)) {
return this.commitAggregateInternal(eventOrEvents, eventSourceIdOrCancellation as Cancellation);
}
const eventSourceId = eventSourceIdOrCancellation as Guid | string;
const eventSourceId = eventSourceIdOrCancellation as string;
return this.commitAggregateInternal(
UncommittedAggregateEvents.from(
eventSourceId,
Expand All @@ -107,7 +107,7 @@ export class EventStore extends IEventStore {
request.setCallcontext(callContexts.toProtobuf(this._executionContext));
const aggregate = new Aggregate();
aggregate.setAggregaterootid(guids.toProtobuf(aggregateRootId.value));
aggregate.setEventsourceid(guids.toProtobuf(eventSourceId.value));
aggregate.setEventsourceid(eventSourceId.value);
request.setAggregate(aggregate);

return reactiveUnary(this._eventStoreClient, this._eventStoreClient.fetchForAggregate, request, cancellation)
Expand Down Expand Up @@ -161,7 +161,7 @@ export class EventStore extends IEventStore {
const pbEvents = new PbUncommittedAggregateEvents();
pbEvents.setEventsList(uncommittedAggregateEvents);
pbEvents.setAggregaterootid(guids.toProtobuf(aggregateRootId.value));
pbEvents.setEventsourceid(guids.toProtobuf(events.eventSourceId.value));
pbEvents.setEventsourceid(events.eventSourceId.value);
pbEvents.setExpectedaggregaterootversion(events.expectedAggregateRootVersion.value);
request.setCallcontext(callContexts.toProtobuf(this._executionContext));
request.setEvents(pbEvents);
Expand Down Expand Up @@ -193,7 +193,7 @@ export class EventStore extends IEventStore {
return new CommittedAggregateEvents(eventSourceId, aggregateRootId, ...convertedEvents);
}

private toUncommittedEvent(content: any, eventSourceId: EventSourceId | Guid | string, eventTypeOrId?: EventType | EventTypeId | Guid | string, isPublic = false): UncommittedEvent {
private toUncommittedEvent(content: any, eventSourceId: EventSourceId | string, eventTypeOrId?: EventType | EventTypeId | Guid | string, isPublic = false): UncommittedEvent {
let eventType: EventType | EventTypeId | undefined;
if (eventTypeOrId !== undefined) {
if (eventTypeOrId instanceof EventType) eventType = eventTypeOrId;
Expand Down
12 changes: 6 additions & 6 deletions Source/events/PartitionId.ts
Expand Up @@ -7,25 +7,25 @@ import { ConceptAs } from '@dolittle/concepts';
/**
* Represents the unique identifier of a partition.
*/
export class PartitionId extends ConceptAs<Guid, '@dolittle/sdk.events.PartitionId'> {
constructor(id: Guid) {
export class PartitionId extends ConceptAs<string, '@dolittle/sdk.events.PartitionId'> {
constructor(id: string) {
super(id, '@dolittle/sdk.events.PartitionId');
}

/**
* Gets the unspecified partition id
*/
static unspecified: PartitionId = PartitionId.from(Guid.empty);
static unspecified: PartitionId = PartitionId.from(Guid.empty.toString());

/**
* Creates a {PartitionId} from a guid.
*
* @static
* @param {PartitionId | Guid | string} id
* @param {PartitionId | string} id
* @returns {PartitionId}
*/
static from(id: PartitionId | Guid | string): PartitionId {
static from(id: PartitionId | string): PartitionId {
if (id instanceof PartitionId) return id;
return new PartitionId(Guid.as(id));
return new PartitionId(id);
}
};
2 changes: 1 addition & 1 deletion Source/events/UncommittedAggregateEvents.ts
Expand Up @@ -54,7 +54,7 @@ export class UncommittedAggregateEvents implements Iterable<UncommittedAggregate
}


static from(eventSourceId: Guid | string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, ...events: UncommittedAggregateEvent[]): UncommittedAggregateEvents {
static from(eventSourceId: string, aggregateRootId: AggregateRootId, expectedAggregateRootVersion: AggregateRootVersion, ...events: UncommittedAggregateEvent[]): UncommittedAggregateEvents {
return new UncommittedAggregateEvents(EventSourceId.from(eventSourceId), aggregateRootId, expectedAggregateRootVersion, ...events);
}

Expand Down
4 changes: 2 additions & 2 deletions Source/events/package.json
Expand Up @@ -34,9 +34,9 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.execution": "17.0.3",
"@dolittle/sdk.protobuf": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/projections/Internal/ProjectionProcessor.ts
Expand Up @@ -158,7 +158,7 @@ export class ProjectionProcessor<T> extends internal.EventProcessor<ProjectionId

const eventContext = new EventContext(
pbSequenceNumber,
EventSourceId.from(guids.toSDK(pbEventSourceId)),
EventSourceId.from(pbEventSourceId),
DateTime.fromJSDate(pbOccurred.toDate()),
executionContext);

Expand Down
4 changes: 2 additions & 2 deletions Source/projections/package.json
Expand Up @@ -34,9 +34,9 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/types": "5.0.1",
"luxon": "1.24.1",
"@types/luxon": "1.24.1"
Expand Down
2 changes: 1 addition & 1 deletion Source/protobuf/package.json
Expand Up @@ -35,7 +35,7 @@
},
"dependencies": {
"@dolittle/concepts": "5.0.1",
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.execution": "17.0.3"
Expand Down
2 changes: 1 addition & 1 deletion Source/sdk/package.json
Expand Up @@ -35,7 +35,7 @@
},
"dependencies": {
"@dolittle/rudiments": "5.0.1",
"@dolittle/runtime.contracts": "5.4.0",
"@dolittle/runtime.contracts": "6.0.0-eventsource.0",
"@dolittle/sdk.aggregates": "17.0.3",
"@dolittle/sdk.artifacts": "17.0.3",
"@dolittle/sdk.common": "17.0.3",
Expand Down
2 changes: 1 addition & 1 deletion Source/services/package.json
Expand Up @@ -33,7 +33,7 @@
"lint:fix": "gulp lint-fix --gulpfile ../../node_modules/@dolittle/typescript.build/Gulpfile.js"
},
"dependencies": {
"@dolittle/contracts": "5.4.0",
"@dolittle/contracts": "6.0.0-eventsource.0",
"@dolittle/rudiments": "5.0.1",
"@dolittle/sdk.execution": "17.0.3",
"@dolittle/sdk.resilience": "17.0.3",
Expand Down

0 comments on commit 06ac206

Please sign in to comment.