Skip to content

Commit

Permalink
Merge pull request #134 from dolittle/contract-updates
Browse files Browse the repository at this point in the history
Eventhandler contract additions
  • Loading branch information
mhelleborg committed Jun 7, 2023
2 parents f2d618b + f790291 commit c8e3fc0
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 23 deletions.
70 changes: 47 additions & 23 deletions Source/Runtime/Events.Processing/EventHandlers.proto
Expand Up @@ -13,49 +13,73 @@ import "Runtime/Events.Processing/Processors.proto";

package dolittle.runtime.events.processing;

import "google/protobuf/timestamp.proto";


option csharp_namespace = "Dolittle.Runtime.Events.Processing.Contracts";
option go_package = "go.dolittle.io/contracts/runtime/events/processing";

// Determines where to start processing from if the event handler has no state
message StartFrom{
oneof Selected {
Position position = 1;
google.protobuf.Timestamp timestamp = 2;
uint64 eventLogSequenceNumber = 3;
}

enum Position{
Start = 0;
Latest = 1;
}
}

message EventHandlerRegistrationRequest {
services.ReverseCallArgumentsContext callContext = 1;
protobuf.Uuid scopeId = 2;
protobuf.Uuid eventHandlerId = 3;
repeated artifacts.Artifact eventTypes = 4;
bool partitioned = 5;
optional string alias = 6;
services.ReverseCallArgumentsContext callContext = 1;
protobuf.Uuid scopeId = 2;
protobuf.Uuid eventHandlerId = 3;
repeated artifacts.Artifact eventTypes = 4;
bool partitioned = 5;
optional string alias = 6;
// Optionally, the event handler can be specified to start at a specific point in time, a specific offset or start from the latest events
// Default, the handler will start from the beginning
optional StartFrom startFrom = 7;
// If this handler should not process events produced after a given time, populate this field
optional google.protobuf.Timestamp stopAt = 8;
// How many events to allow to be processed in-flight simultaneously. Defaults to 1
int32 concurrency = 9;
}

message EventHandlerResponse {
services.ReverseCallResponseContext callContext = 1;
ProcessorFailure failure = 2; // If not set/empty - no failure
services.ReverseCallResponseContext callContext = 1;
ProcessorFailure failure = 2; // If not set/empty - no failure
}

message EventHandlerClientToRuntimeMessage {
oneof Message {
EventHandlerRegistrationRequest registrationRequest = 1;
EventHandlerResponse handleResult = 2;
services.Pong pong = 3;
}
oneof Message {
EventHandlerRegistrationRequest registrationRequest = 1;
EventHandlerResponse handleResult = 2;
services.Pong pong = 3;
}
}

message EventHandlerRegistrationResponse {
protobuf.Failure failure = 1;
protobuf.Failure failure = 1;
}

message HandleEventRequest {
services.ReverseCallRequestContext callContext = 1;
StreamEvent event = 2;
RetryProcessingState retryProcessingState = 3;
services.ReverseCallRequestContext callContext = 1;
StreamEvent event = 2;
RetryProcessingState retryProcessingState = 3;
}

message EventHandlerRuntimeToClientMessage {
oneof Message {
EventHandlerRegistrationResponse registrationResponse = 1;
HandleEventRequest handleRequest = 2;
services.Ping ping = 3;
}
oneof Message {
EventHandlerRegistrationResponse registrationResponse = 1;
HandleEventRequest handleRequest = 2;
services.Ping ping = 3;
}
}

service EventHandlers {
rpc Connect(stream EventHandlerClientToRuntimeMessage) returns(stream EventHandlerRuntimeToClientMessage);
rpc Connect(stream EventHandlerClientToRuntimeMessage) returns(stream EventHandlerRuntimeToClientMessage);
}
14 changes: 14 additions & 0 deletions Source/Runtime/Management/Events.Processing/EventHandlers.proto
Expand Up @@ -7,6 +7,7 @@ import "Artifacts/Artifact.proto";
import "Protobuf/Uuid.proto";
import "Protobuf/Failure.proto";
import "Runtime/Management/Events.Processing/StreamProcessors.proto";
import "google/protobuf/timestamp.proto";

package dolittle.runtime.events.processing.management;

Expand All @@ -25,6 +26,18 @@ message ReprocessEventsFromResponse {
protobuf.Failure failure = 1; // not set if not failed
}

message ReprocessEventsFromTimestampRequest {
// Tenant is optional here, allowing to update multiple tenants at once if not specified
optional protobuf.Uuid tenantId = 1;
protobuf.Uuid scopeId = 2;
protobuf.Uuid eventHandlerId = 3;
uint64 streamPosition = 4;
}

message ReprocessEventsFromTimestampResponse {
protobuf.Failure failure = 1; // not set if not failed
}

message ReprocessAllEventsRequest {
// TODO: Do we want another kind of execution context here?
protobuf.Uuid scopeId = 1;
Expand Down Expand Up @@ -68,6 +81,7 @@ message GetOneResponse {

service EventHandlers {
rpc ReprocessEventsFrom(ReprocessEventsFromRequest) returns(ReprocessEventsFromResponse);
rpc ReprocessEventsFromTimestamp(ReprocessEventsFromTimestampRequest) returns(ReprocessEventsFromTimestampResponse);
rpc ReprocessAllEvents(ReprocessAllEventsRequest) returns(ReprocessAllEventsResponse);
rpc GetAll(GetAllRequest) returns(GetAllResponse);
rpc GetOne(GetOneRequest) returns(GetOneResponse);
Expand Down

0 comments on commit c8e3fc0

Please sign in to comment.