Skip to content

[Enhancement] Refactor: Unify Runtime, Ingress/Egress Pipelines, and Protocol Processor Migration #5249

@qqeasonchen

Description

@qqeasonchen

Search before asking

  • I had searched in the issues and found no similar issues.

Enhancement Request

Background

EventMesh's current architecture has three separate runtime modules (eventmesh-runtime, eventmesh-runtime-v2, and various connector/function SDKs) that duplicate and fragment core processing logic. Each protocol processor (HTTP, gRPC, TCP) independently handles filtering, transformation, and routing, resulting in inconsistent behavior across protocols and high maintenance overhead. This PR consolidates these into a single unified runtime with standardized processing pipelines.

Describe the solution you'd like

Changes

Commit 1: Unify Connector, Function, and Core Runtime into eventmesh-runtime

  • Removed the standalone eventmesh-runtime-v2 module entirely (33 files, ~1700 lines deleted), including its separate runtime lifecycle, connector/function managers, health/monitor/status services, meta storage, and configuration
  • Moved core abstractions (Router, RouterBuilder, ConnectorEventPublisher, SourceWorker) into eventmesh-runtime and eventmesh-function modules
  • Added RouterEngine, FilterEngine, TransformerEngine as core processing engines in eventmesh-runtime
  • Added EventMeshConnectorBootstrap to manage connector lifecycle within the unified runtime
  • Refactored EventMeshServer to integrate engines and A2A service initialization
  • Refactored ClientGroupWrapper (TCP) to delegate to the new engines
  • Added comprehensive tests: ClientGroupWrapperTest

Commit 2: Unified Ingress/Egress Pipelines

  • Introduced IngressProcessor — a centralized pipeline implementing Filter -> Transformer -> Router for incoming messages
  • Introduced EgressProcessor — a centralized pipeline implementing Filter -> Transformer for outgoing messages
  • Refactored ClientGroupWrapper (TCP SDK) to use IngressProcessor and EgressProcessor
  • Refactored EventMeshConnectorBootstrap (Connectors) to use the new processors
  • Updated SinkWorker to support embedded mode for unified runtime execution
  • Updated documentation (unified-runtime-design.md)

Commit 3: Complete HTTP and gRPC Processor Migration

  • HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor, BatchSendMessageProcessor, BatchSendMessageV2Processor, and SendAsyncEventProcessor to use IngressProcessor
  • gRPC Processors (3): Migrated PublishCloudEventsProcessor, BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional pipeline support (Ingress for requests, Egress for responses)
  • Added BatchProcessResult — utility class for tracking success/filtered/failed counts with detailed per-message statistics
  • Added cross-module accessors (IngressProcessor, EgressProcessor getters in EventMeshServer and EventMeshGrpcServer)
  • Unified pipeline key format: {producerGroup}-{topic}
  • Standardized filter behavior: filtered messages return SUCCESS status (except request-reply which returns error)
  • Added comprehensive tests: IngressProcessorTest, EgressProcessorTest, BatchProcessResultTest, enhanced SendAsyncEventProcessorTest

Key Files

File Action
eventmesh-runtime-v2/ (entire module) Deleted
eventmesh-runtime/src/main/java/.../core/protocol/IngressProcessor.java Added
eventmesh-runtime/src/main/java/.../core/protocol/EgressProcessor.java Added
eventmesh-runtime/src/main/java/.../core/protocol/BatchProcessResult.java Added
eventmesh-runtime/src/main/java/.../boot/RouterEngine.java Added
eventmesh-runtime/src/main/java/.../boot/EventMeshConnectorBootstrap.java Added
eventmesh-runtime/src/main/java/.../boot/EventMeshServer.java Modified
eventmesh-runtime/src/main/java/.../boot/FilterEngine.java Modified
eventmesh-runtime/src/main/java/.../boot/TransformerEngine.java Modified
All HTTP gRPC processors in .../protocol/http/processor/ and .../protocol/grpc/processor/ Modified

Impact

  • No functional regression — all existing tests pass
  • Consistent Filter -> Transformer -> Router pipeline across all three protocols (TCP, HTTP, gRPC)
  • Simplified build and module structure with single eventmesh-runtime module
  • Centralized batch statistics and error handling

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions