diff --git a/Source/Runtime/Events.Processing/EventHandlers.proto b/Source/Runtime/Events.Processing/EventHandlers.proto index 85424b6..913bebd 100644 --- a/Source/Runtime/Events.Processing/EventHandlers.proto +++ b/Source/Runtime/Events.Processing/EventHandlers.proto @@ -13,49 +13,73 @@ import "Runtime/Events.Processing/Processors.proto"; package dolittle.runtime.events.processing; +import "google/protobuf/timestamp.proto"; + + option csharp_namespace = "Dolittle.Runtime.Events.Processing.Contracts"; option go_package = "go.dolittle.io/contracts/runtime/events/processing"; +// Determines where to start processing from if the event handler has no state +message StartFrom{ + oneof Selected { + Position position = 1; + google.protobuf.Timestamp timestamp = 2; + uint64 eventLogSequenceNumber = 3; + } + + enum Position{ + Start = 0; + Latest = 1; + } +} + message EventHandlerRegistrationRequest { - services.ReverseCallArgumentsContext callContext = 1; - protobuf.Uuid scopeId = 2; - protobuf.Uuid eventHandlerId = 3; - repeated artifacts.Artifact eventTypes = 4; - bool partitioned = 5; - optional string alias = 6; + services.ReverseCallArgumentsContext callContext = 1; + protobuf.Uuid scopeId = 2; + protobuf.Uuid eventHandlerId = 3; + repeated artifacts.Artifact eventTypes = 4; + bool partitioned = 5; + optional string alias = 6; + // Optionally, the event handler can be specified to start at a specific point in time, a specific offset or start from the latest events + // Default, the handler will start from the beginning + optional StartFrom startFrom = 7; + // If this handler should not process events produced after a given time, populate this field + optional google.protobuf.Timestamp stopAt = 8; + // How many events to allow to be processed in-flight simultaneously. Defaults to 1 + int32 concurrency = 9; } message EventHandlerResponse { - services.ReverseCallResponseContext callContext = 1; - ProcessorFailure failure = 2; // If not set/empty - no failure + services.ReverseCallResponseContext callContext = 1; + ProcessorFailure failure = 2; // If not set/empty - no failure } message EventHandlerClientToRuntimeMessage { - oneof Message { - EventHandlerRegistrationRequest registrationRequest = 1; - EventHandlerResponse handleResult = 2; - services.Pong pong = 3; - } + oneof Message { + EventHandlerRegistrationRequest registrationRequest = 1; + EventHandlerResponse handleResult = 2; + services.Pong pong = 3; + } } message EventHandlerRegistrationResponse { - protobuf.Failure failure = 1; + protobuf.Failure failure = 1; } message HandleEventRequest { - services.ReverseCallRequestContext callContext = 1; - StreamEvent event = 2; - RetryProcessingState retryProcessingState = 3; + services.ReverseCallRequestContext callContext = 1; + StreamEvent event = 2; + RetryProcessingState retryProcessingState = 3; } message EventHandlerRuntimeToClientMessage { - oneof Message { - EventHandlerRegistrationResponse registrationResponse = 1; - HandleEventRequest handleRequest = 2; - services.Ping ping = 3; - } + oneof Message { + EventHandlerRegistrationResponse registrationResponse = 1; + HandleEventRequest handleRequest = 2; + services.Ping ping = 3; + } } service EventHandlers { - rpc Connect(stream EventHandlerClientToRuntimeMessage) returns(stream EventHandlerRuntimeToClientMessage); + rpc Connect(stream EventHandlerClientToRuntimeMessage) returns(stream EventHandlerRuntimeToClientMessage); } diff --git a/Source/Runtime/Management/Events.Processing/EventHandlers.proto b/Source/Runtime/Management/Events.Processing/EventHandlers.proto index e97aee0..59c2257 100644 --- a/Source/Runtime/Management/Events.Processing/EventHandlers.proto +++ b/Source/Runtime/Management/Events.Processing/EventHandlers.proto @@ -7,6 +7,7 @@ import "Artifacts/Artifact.proto"; import "Protobuf/Uuid.proto"; import "Protobuf/Failure.proto"; import "Runtime/Management/Events.Processing/StreamProcessors.proto"; +import "google/protobuf/timestamp.proto"; package dolittle.runtime.events.processing.management; @@ -25,6 +26,18 @@ message ReprocessEventsFromResponse { protobuf.Failure failure = 1; // not set if not failed } +message ReprocessEventsFromTimestampRequest { + // Tenant is optional here, allowing to update multiple tenants at once if not specified + optional protobuf.Uuid tenantId = 1; + protobuf.Uuid scopeId = 2; + protobuf.Uuid eventHandlerId = 3; + uint64 streamPosition = 4; +} + +message ReprocessEventsFromTimestampResponse { + protobuf.Failure failure = 1; // not set if not failed +} + message ReprocessAllEventsRequest { // TODO: Do we want another kind of execution context here? protobuf.Uuid scopeId = 1; @@ -68,6 +81,7 @@ message GetOneResponse { service EventHandlers { rpc ReprocessEventsFrom(ReprocessEventsFromRequest) returns(ReprocessEventsFromResponse); + rpc ReprocessEventsFromTimestamp(ReprocessEventsFromTimestampRequest) returns(ReprocessEventsFromTimestampResponse); rpc ReprocessAllEvents(ReprocessAllEventsRequest) returns(ReprocessAllEventsResponse); rpc GetAll(GetAllRequest) returns(GetAllResponse); rpc GetOne(GetOneRequest) returns(GetOneResponse);