
Conversation

Contributor

@yilmaztayfun yilmaztayfun commented Dec 14, 2025

Summary by Sourcery

Refine background job dispatching and Dapr bridge behavior to better support multi-schema payloads, scoped dependency resolution, and reliable status updates.

New Features:

  • Introduce a shared CloudEventEnvelopeHelper to parse CloudEvent envelopes and extract inner data payloads for event and job processing.
  • Support schema-based multi-tenant context resolution during job dispatch and Dapr job execution using CloudEvent envelope metadata.

Bug Fixes:

  • Ensure job status updates for cancellation and failure are persisted using separate unit-of-work scopes, even when the main transaction fails.
  • Avoid reprocessing already completed or cancelled jobs by performing idempotency checks inside a dedicated transactional scope.

Enhancements:

  • Rework JobDispatcher to resolve IJobStore and IUnitOfWorkManager from a scoped service provider and to clearly separate status updates from handler execution within distinct unit-of-work scopes.
  • Simplify DaprJobExecutionBridge to rely on scoped dependency resolution, remove explicit unit-of-work management, and handle missing jobs gracefully without throwing.
  • Adjust background job invocation to use an existing scoped IServiceProvider instead of creating new scopes per invocation for better alignment with surrounding UoW and schema context.
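
Taken together, the dispatcher rework described in these bullets could be sketched roughly as follows in C#. Member names (`BeginRequiresNew`, `UpdateStatusAsync`, `IsJobAlreadyProcessedAsync`, `MarkJobStatusAsync`) follow the reviewer's guide in this thread; the exact signatures, field names (`scopeFactory`, `clock`), and disposal details are assumptions, not the actual implementation.

```csharp
// Hypothetical sketch of the two-phase dispatch flow (not the actual code).
public async Task DispatchAsync(Guid jobId, string handlerName,
    ReadOnlyMemory<byte> jobPayload, CancellationToken ct)
{
    await using var scope = scopeFactory.CreateAsyncScope();
    var sp = scope.ServiceProvider;
    var uowManager = sp.GetRequiredService<IUnitOfWorkManager>();
    var jobStore = sp.GetRequiredService<IJobStore>();

    // Phase 1: idempotency check and marking Running, in its own transaction.
    var uow1 = await uowManager.BeginRequiresNew(ct);
    if (await IsJobAlreadyProcessedAsync(jobStore, jobId, handlerName, ct))
    {
        await uow1.CommitAsync(ct);
        return; // already Completed or Cancelled; skip reprocessing
    }
    await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, null, null, ct);
    await uow1.CommitAsync(ct);

    // Phase 2: handler execution and marking Completed, in a second transaction.
    try
    {
        var uow2 = await uowManager.BeginRequiresNew(ct);
        await InvokeHandlerAsync(sp, handlerName, jobPayload, ct);
        await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, clock.UtcNow, null, ct);
        await uow2.CommitAsync(ct);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        await MarkJobStatusAsync(uowManager, jobStore, jobId,
            BackgroundJobStatus.Cancelled, "Job was cancelled", ct);
    }
    catch (Exception ex)
    {
        await MarkJobStatusAsync(uowManager, jobStore, jobId,
            BackgroundJobStatus.Failed, ex.Message, ct);
    }
}
```

Because each phase commits independently, a crash during handler execution leaves the job in Running rather than rolling back the idempotency bookkeeping.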

Summary by CodeRabbit

  • Refactor
    • Improved internal background job processing architecture for enhanced reliability and better dependency injection scope management.
    • Streamlined error handling and logging in job execution workflows.
    • Optimized multi-tenant envelope handling for background job operations.


Refactored background job dispatching to resolve dependencies from IServiceProvider instead of IServiceScopeFactory, and centralized CloudEventEnvelope parsing and data extraction into a new CloudEventEnvelopeHelper. Updated DaprJobExecutionBridge and JobDispatcher to use the helper for schema context and payload extraction, improving multi-tenant support and code reuse. Simplified unit of work handling and error management in job dispatching.
Refactor job dispatching and envelope handling logic
@yilmaztayfun yilmaztayfun requested review from a team as code owners December 14, 2025 19:22
@yilmaztayfun yilmaztayfun requested review from ukaratas and removed request for a team December 14, 2025 19:22

sourcery-ai bot commented Dec 14, 2025

Reviewer's Guide

Refactors background job dispatching and Dapr execution to be fully scope- and schema-aware, ensures idempotent status transitions using explicit unit-of-work boundaries, centralizes CloudEvent envelope handling, and simplifies handler invocation to use an existing scoped service provider.

Sequence diagram for Dapr-triggered background job execution with schema and UoW handling

sequenceDiagram
    participant Dapr as DaprRuntime
    participant Bridge as DaprJobExecutionBridge
    participant ScopeFactory as IServiceScopeFactory
    participant Scope as IServiceScope
    participant SP as IServiceProvider
    participant Schema as ICurrentSchema
    participant JobStore as IJobStore
    participant Dispatcher as JobDispatcher
    participant UowMgr as IUnitOfWorkManager
    participant Uow1 as IUnitOfWork
    participant Uow2 as IUnitOfWork
    participant Invoker as IBackgroundJobInvoker
    participant EnvHelper as CloudEventEnvelopeHelper

    Dapr->>Bridge: ExecuteAsync(jobName, payload, cancellationToken)
    Bridge->>ScopeFactory: CreateAsyncScope()
    ScopeFactory-->>Bridge: Scope
    Bridge->>Scope: ServiceProvider
    Scope-->>Bridge: SP

    Bridge->>EnvHelper: ExtractDataPayload(eventSerializer, payload, out envelope)
    EnvHelper-->>Bridge: dataPayload, envelope

    alt envelope has schema
        Bridge->>SP: GetRequiredService~ICurrentSchema~()
        SP-->>Bridge: Schema
        Bridge->>Schema: Set(envelope.Schema)
    end

    Bridge->>SP: GetRequiredService~IJobStore~()
    SP-->>Bridge: JobStore
    Bridge->>JobStore: GetByJobNameAsync(jobName, cancellationToken)
    JobStore-->>Bridge: jobInfo or null

    alt jobInfo is null
        Bridge->>Bridge: Log error and return
        Bridge-->>Dapr: throw or complete
    else jobInfo found
        Bridge->>Dispatcher: DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken)

        activate Dispatcher
        Dispatcher->>ScopeFactory: CreateAsyncScope()
        ScopeFactory-->>Dispatcher: Scope
        Dispatcher->>Scope: ServiceProvider
        Scope-->>Dispatcher: SP

        Dispatcher->>EnvHelper: ExtractDataPayload(eventSerializer, jobPayload, out envelope)
        EnvHelper-->>Dispatcher: argsPayload, envelope

        alt envelope has schema
            Dispatcher->>SP: GetRequiredService~ICurrentSchema~()
            SP-->>Dispatcher: Schema
            Dispatcher->>Schema: Set(envelope.Schema)
        end

        Dispatcher->>SP: GetRequiredService~IUnitOfWorkManager~()
        SP-->>Dispatcher: UowMgr
        Dispatcher->>SP: GetRequiredService~IJobStore~()
        SP-->>Dispatcher: JobStore

        Note over Dispatcher,Uow1: First UoW: idempotency and set Running
        Dispatcher->>UowMgr: BeginRequiresNew(cancellationToken)
        UowMgr-->>Dispatcher: Uow1
        Dispatcher->>JobStore: GetAsync(jobId, cancellationToken)
        JobStore-->>Dispatcher: jobInfo
        Dispatcher->>Dispatcher: IsJobAlreadyProcessedAsync(JobStore, jobId, handlerName, cancellationToken)
        alt already processed
            Dispatcher->>Uow1: CommitAsync(cancellationToken)
            Dispatcher-->>Bridge: return
        else not processed
            alt handlerName not found
                Dispatcher->>Dispatcher: Log warning
                Dispatcher->>JobStore: UpdateStatusAsync(jobId, Failed, clock.UtcNow, error, cancellationToken)
                Dispatcher->>Uow1: CommitAsync(cancellationToken)
                Dispatcher-->>Bridge: return
            else handlerName found
                Dispatcher->>JobStore: UpdateStatusAsync(jobId, Running, null, null, cancellationToken)
                Dispatcher->>Uow1: CommitAsync(cancellationToken)
            end
        end

        Note over Dispatcher,Uow2: Second UoW: handler execution and completion
        Dispatcher->>UowMgr: BeginRequiresNew(cancellationToken)
        UowMgr-->>Dispatcher: Uow2

        Dispatcher->>BackgroundJobOptions: Resolve invoker by handlerName
        BackgroundJobOptions-->>Dispatcher: Invoker
        Dispatcher->>Invoker: InvokeAsync(SP, eventSerializer, argsPayload, cancellationToken)
        Invoker->>SP: GetRequiredService~IBackgroundJobHandler~TArgs~~()
        SP-->>Invoker: handler
        Invoker->>handler: HandleAsync(args, cancellationToken)
        handler-->>Invoker: complete

        Dispatcher->>JobStore: UpdateStatusAsync(jobId, Completed, clock.UtcNow, null, cancellationToken)
        Dispatcher->>Uow2: CommitAsync(cancellationToken)
        deactivate Dispatcher
    end

    Bridge-->>Dapr: return

    par cancellation or failure
        alt OperationCanceledException
            Dispatcher->>Dispatcher: Log cancellation
            Dispatcher->>JobStore: MarkJobStatusAsync(UowMgr, JobStore, jobId, Cancelled, message, cancellationToken)
        else other Exception
            Dispatcher->>Dispatcher: Log failure
            Dispatcher->>JobStore: MarkJobStatusAsync(UowMgr, JobStore, jobId, Failed, errorMessage, cancellationToken)
        end
    end

Class diagram for updated background job dispatching and envelope handling

classDiagram
    direction LR

    class JobDispatcher {
        +JobDispatcher(IServiceScopeFactory scopeFactory, BackgroundJobOptions options, IClock clock, IEventSerializer eventSerializer, ILogger~JobDispatcher~ logger)
        +Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
        -Task~bool~ IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName, CancellationToken cancellationToken)
        -Task MarkJobStatusAsync(IUnitOfWorkManager uowManager, IJobStore jobStore, Guid jobId, BackgroundJobStatus status, string errorMessage, CancellationToken cancellationToken)
        -Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
    }

    class DaprJobExecutionBridge {
        +DaprJobExecutionBridge(IServiceScopeFactory scopeFactory, IJobDispatcher jobDispatcher, IEventSerializer eventSerializer, ILogger~DaprJobExecutionBridge~ logger)
        +Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }

    class BackgroundJobInvoker~TArgs~ {
        +Task InvokeAsync(IServiceProvider serviceProvider, IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }

    class IBackgroundJobInvoker {
        <<interface>>
        +Task InvokeAsync(IServiceProvider scopeFactory, IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }

    class CloudEventEnvelopeHelper {
        <<static>>
        +CloudEventEnvelope TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload)
        +ReadOnlyMemory~byte~ ExtractDataPayload(IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, out CloudEventEnvelope envelope)
    }

    class IBackgroundJobHandler~TArgs~ {
        <<interface>>
        +Task HandleAsync(TArgs args, CancellationToken cancellationToken)
    }

    class IJobDispatcher {
        <<interface>>
        +Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
    }

    class IJobStore {
        <<interface>>
        +Task~JobInfo~ GetAsync(Guid jobId, CancellationToken cancellationToken)
        +Task~JobInfo~ GetByJobNameAsync(string jobName, CancellationToken cancellationToken)
        +Task UpdateStatusAsync(Guid jobId, BackgroundJobStatus status, DateTime? timestamp, string error, CancellationToken cancellationToken)
    }

    class IUnitOfWorkManager {
        <<interface>>
        +Task~IUnitOfWork~ BeginRequiresNew(CancellationToken cancellationToken)
        +Task~IUnitOfWork~ BeginAsync(CancellationToken cancellationToken)
    }

    class IUnitOfWork {
        <<interface>>
        +Task CommitAsync(CancellationToken cancellationToken)
        +Task RollbackAsync(CancellationToken cancellationToken)
    }

    class IEventSerializer {
        <<interface>>
        +T Deserialize~T~(ReadOnlySpan~byte~ payload)
        +byte[] Serialize(object data)
    }

    class ICurrentSchema {
        <<interface>>
        +void Set(string schema)
    }

    class CloudEventEnvelope {
        +string Type
        +string Schema
        +object Data
    }

    class BackgroundJobOptions {
        +Dictionary~string, IBackgroundJobInvoker~ Invokers
    }

    class IServiceScopeFactory {
        <<interface>>
        +IServiceScope CreateScope()
        +IAsyncDisposable CreateAsyncScope()
    }

    class IServiceProvider {
        <<interface>>
        +T GetRequiredService~T~()
    }

    class IJobExecutionBridge {
        <<interface>>
        +Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }

    JobDispatcher ..> BackgroundJobOptions
    JobDispatcher ..> IClock
    JobDispatcher ..> IEventSerializer
    JobDispatcher ..> IServiceScopeFactory
    JobDispatcher ..> IUnitOfWorkManager
    JobDispatcher ..> IJobStore
    JobDispatcher ..> IBackgroundJobInvoker
    JobDispatcher ..> CloudEventEnvelopeHelper

    DaprJobExecutionBridge ..|> IJobExecutionBridge
    DaprJobExecutionBridge ..> IServiceScopeFactory
    DaprJobExecutionBridge ..> IJobDispatcher
    DaprJobExecutionBridge ..> IJobStore
    DaprJobExecutionBridge ..> ICurrentSchema
    DaprJobExecutionBridge ..> IEventSerializer
    DaprJobExecutionBridge ..> CloudEventEnvelopeHelper

    BackgroundJobInvoker~TArgs~ ..|> IBackgroundJobInvoker
    BackgroundJobInvoker~TArgs~ ..> IBackgroundJobHandler~TArgs~
    BackgroundJobInvoker~TArgs~ ..> IEventSerializer

    CloudEventEnvelopeHelper ..> CloudEventEnvelope
    CloudEventEnvelopeHelper ..> IEventSerializer

    IBackgroundJobInvoker <.. BackgroundJobOptions
    IBackgroundJobHandler~TArgs~ <.. BackgroundJobInvoker~TArgs~

File-Level Changes

Change | Details | Files
Refactor JobDispatcher to be schema-aware, use scoped services, and enforce idempotent status transitions via explicit unit-of-work boundaries.
  • Remove constructor injection of IJobStore and IUnitOfWorkManager and instead resolve them from a newly created async scope per dispatch.
  • Extract CloudEvent envelope and schema from the job payload, set ICurrentSchema when present, and pass only the inner data payload to handlers.
  • Split job execution into two separate unit-of-work scopes: one for idempotency check and marking status as Running, and another for handler invocation and marking as Completed.
  • Replace UpdateStatusWithinUowAsync/HandleJobCancellationAsync/HandleJobFailureAsync with a generic MarkJobStatusAsync that uses BeginRequiresNew to persist status changes even when the main transaction fails.
  • Change InvokeHandlerAsync to take an IServiceProvider for the current scope and pass it to IBackgroundJobInvoker.InvokeAsync.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
Simplify DaprJobExecutionBridge to be scope-based and reuse common CloudEvent envelope parsing while delegating transactional concerns to JobDispatcher.
  • Introduce IServiceScopeFactory and resolve IJobStore and ICurrentSchema from a new async scope per execution.
  • Use CloudEventEnvelopeHelper.ExtractDataPayload to parse envelope, set schema before jobStore access, and obtain the inner data payload.
  • Remove its own unit-of-work management and envelope parsing, and delegate to IJobDispatcher.DispatchAsync using the resolved job metadata and data payload.
  • Handle missing jobs by logging an error and returning instead of throwing.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
Adjust background job invocation to work with an existing scoped IServiceProvider rather than creating a new scope internally.
  • Change IBackgroundJobInvoker.InvokeAsync signature to accept IServiceProvider instead of IServiceScopeFactory.
  • Update BackgroundJobInvoker to resolve IBackgroundJobHandler directly from the provided IServiceProvider and stop creating its own scope.
  • Align InboxProcessor usage to pass the scoped service provider when invoking handlers.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs
framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
Introduce a shared CloudEventEnvelopeHelper to centralize envelope parsing and data extraction logic.
  • Add CloudEventEnvelopeHelper with TryParseEnvelope to safely deserialize CloudEventEnvelope instances and validate them.
  • Add ExtractDataPayload helper to return either the serialized envelope.Data or the original payload while exposing the parsed envelope via an out parameter.
  • Adopt the helper in JobDispatcher and DaprJobExecutionBridge for consistent envelope handling.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
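
The shared helper described in the last change group could look roughly like this sketch. The validation rule (treating an envelope without a `Type` as invalid) and the re-serialization of `envelope.Data` are assumptions based on the diagrams in this thread, not the actual implementation.

```csharp
// Hypothetical sketch of the shared envelope helper (assumed shapes).
internal static class CloudEventEnvelopeHelper
{
    // Try to deserialize a CloudEventEnvelope; return null when the payload
    // is not a valid envelope so callers can fall back to the raw bytes.
    public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer serializer, byte[] payload)
    {
        try
        {
            var envelope = serializer.Deserialize<CloudEventEnvelope>(payload);
            return string.IsNullOrEmpty(envelope?.Type) ? null : envelope;
        }
        catch
        {
            return null; // not a CloudEvent envelope
        }
    }

    // Return the serialized inner Data when an envelope is present,
    // otherwise the original payload; expose the parsed envelope to the caller.
    public static ReadOnlyMemory<byte> ExtractDataPayload(IEventSerializer serializer,
        ReadOnlyMemory<byte> payload, out CloudEventEnvelope? envelope)
    {
        envelope = TryParseEnvelope(serializer, payload.ToArray());
        if (envelope?.Data is null)
        {
            return payload;
        }
        return serializer.Serialize(envelope.Data);
    }
}
```

Centralizing this logic keeps JobDispatcher and DaprJobExecutionBridge consistent: both see the same schema metadata and the same inner payload for a given envelope.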


@yilmaztayfun yilmaztayfun merged commit 26117a2 into master Dec 14, 2025
3 of 6 checks passed

coderabbitai bot commented Dec 14, 2025

Caution

Review failed

The pull request is closed.

Note

.coderabbit.yaml has unrecognized properties

CodeRabbit is using all valid settings from your configuration. Unrecognized properties (listed below) have been ignored and may indicate typos or deprecated fields that can be removed.

⚠️ Parsing warnings (1)
Validation error: Unrecognized key(s) in object: 'review'
⚙️ Configuration instructions
  • Please see the configuration documentation for more information.
  • You can also validate your configuration using the online YAML validator.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Walkthrough

This PR refactors background job execution scope management by shifting from per-handler invocation scopes to per-dispatch and per-bridge operation scopes. The IBackgroundJobInvoker interface now accepts IServiceProvider instead of IServiceScopeFactory. DaprJobExecutionBridge and JobDispatcher are restructured to manage scopes internally and handle CloudEvent envelopes for multi-tenant schema resolution.

Changes

Cohort / File(s) Summary
Interface Updates
framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs
Changed InvokeAsync parameter type from IServiceScopeFactory scopeFactory to IServiceProvider scopeFactory (parameter name unchanged).
Core Invoker Implementation
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs
Updated method signature to match interface; handler now resolved directly from provided IServiceProvider instead of creating a scoped instance.
Envelope Handling Utility
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs
New internal static helper class with TryParseEnvelope and ExtractDataPayload methods for safe deserialization and data extraction from CloudEventEnvelope payloads.
Dapr Bridge Refactoring
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
Constructor now accepts IServiceScopeFactory; removed direct IJobStore, ICurrentSchema, and IUnitOfWorkManager parameters. Creates async scope per invocation; uses CloudEventEnvelopeHelper to extract payload and handle schema from envelope; simplifies error handling.
Dispatcher Refactoring
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
Constructor simplified (removed IJobStore and IUnitOfWorkManager); introduces per-dispatch async scope. Implements two-phase unit-of-work: Phase 1 (idempotency check and status update to Running), Phase 2 (handler execution and status update to Completed). New MarkJobStatusAsync helper for status persistence. Envelope schema handled via scoped ICurrentSchema. InvokeHandlerAsync now accepts scoped IServiceProvider.
Minor Formatting
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
Blank line added between UoW creation and handler invocation (no functional change).
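
The interface change summarized in the first rows above can be sketched as follows. The generic invoker shape and the `Deserialize` call follow the class diagram from the reviewer's guide; exact signatures are assumptions, and note the review feedback that the actual parameter kept its old `scopeFactory` name despite now being an `IServiceProvider`.

```csharp
// Sketch of the updated invoker (assumed shape, not the actual implementation).
internal sealed class BackgroundJobInvoker<TArgs> : IBackgroundJobInvoker
{
    public async Task InvokeAsync(
        IServiceProvider serviceProvider,   // was IServiceScopeFactory; the caller now owns the scope
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        CancellationToken cancellationToken)
    {
        // Resolve the handler from the caller's scope so it shares the ambient
        // unit of work and schema context instead of creating a fresh scope here.
        var handler = serviceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();
        var args = eventSerializer.Deserialize<TArgs>(payload.Span);
        await handler.HandleAsync(args, cancellationToken);
    }
}
```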

Sequence Diagram

sequenceDiagram
    participant DaprBridge as DaprJobExecutionBridge
    participant Helper as CloudEventEnvelopeHelper
    participant Dispatcher as JobDispatcher
    participant JobStore as IJobStore
    participant CurrentSchema as ICurrentSchema
    participant UoW as IUnitOfWorkManager
    participant Invoker as IBackgroundJobInvoker
    
    DaprBridge->>Helper: ExtractDataPayload(payload)
    Helper-->>DaprBridge: dataPayload, envelope
    
    DaprBridge->>JobStore: CreateAsyncScope → GetByJobNameAsync()
    JobStore-->>DaprBridge: jobInfo
    
    alt Envelope has Schema
        DaprBridge->>CurrentSchema: Set(envelope.Schema)
    end
    
    DaprBridge->>Dispatcher: DispatchAsync(jobInfo, dataPayload)
    
    Dispatcher->>Dispatcher: Phase 1: Create Scope
    Dispatcher->>UoW: IsJobAlreadyProcessedAsync()
    Dispatcher->>UoW: UpdateStatusToRunning(jobId)
    Dispatcher->>UoW: Commit()
    
    Dispatcher->>Dispatcher: Phase 2: Create New Scope
    Dispatcher->>Invoker: InvokeHandlerAsync(scopedProvider, handlerName, payload)
    Invoker-->>Dispatcher: (success or exception)
    
    alt Success
        Dispatcher->>UoW: UpdateStatusToCompleted(jobId)
        Dispatcher->>UoW: Commit()
    else Failure/Cancellation
        Dispatcher->>Dispatcher: MarkJobStatusAsync(status, errorMessage)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • JobDispatcher: Two-phase unit-of-work flow with new scope creation per phase; removal of direct DI dependencies from constructor; revised error handling and status marking logic requires careful verification of atomicity and consistency.
  • DaprJobExecutionBridge: Significant refactoring of scope management and envelope handling; removal of legacy envelope logic and schema setting flow.
  • CloudEventEnvelopeHelper: New envelope parsing and deserialization logic with safe fallback; verify re-serialization correctness.
  • Scope lifecycle changes: Verify that scoped resolution and DI lifetime expectations are met across all affected components.

Possibly related PRs

  • PR #28: Modifies DaprJobExecutionBridge, JobDispatcher, and scope/UoW handling patterns in background-job execution.
  • PR #29: Changes BackgroundJobInvoker, JobDispatcher, and DaprJobExecutionBridge with related constructor and method signature updates for DI scope management.

Suggested reviewers

  • middt

🐇 Scopes now dance per dispatch, not per call,
Envelopes unwrapped with helper's careful hall.
Two phases bloom where one did stand,
UoW commits held steady in our hand.
Refactored, resilient—the job flows free! 🌿✨


📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aadff7b and 504173e.

📒 Files selected for processing (6)
  • framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs (2 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs (6 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs (1 hunks)


@gemini-code-assist

Summary of Changes

Hello @yilmaztayfun, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a substantial refactoring of the background job processing and Dapr job execution mechanisms. The primary goal is to enhance the system's robustness, modularity, and transactional integrity by strategically managing service lifetimes and Unit of Work boundaries. These changes centralize common logic for CloudEvent handling and ensure that job status updates are reliably persisted, contributing to a more resilient background processing infrastructure.

Highlights

  • Refactored Dependency Resolution: IServiceScopeFactory is now strategically used to create dedicated service scopes for background job and Dapr job executions. This allows dependencies like IJobStore and IUnitOfWorkManager to be resolved within these specific scopes, reducing direct constructor dependencies and enhancing modularity.
  • Enhanced Unit of Work Management: The JobDispatcher now employs a more robust Unit of Work strategy, utilizing BeginRequiresNew for job status updates. This ensures that job status (Running, Completed, Failed, Cancelled) is persisted transactionally and independently, even if the main job execution encounters an error, significantly improving reliability.
  • Centralized CloudEvent Envelope Handling: A new CloudEventEnvelopeHelper class has been introduced to encapsulate the logic for parsing and extracting data from CloudEvent envelopes. This promotes code reusability and simplifies the handling of event payloads in both DaprJobExecutionBridge and JobDispatcher components.
  • Simplified IBackgroundJobInvoker Interface: The IBackgroundJobInvoker interface and its implementation now directly accept IServiceProvider instead of IServiceScopeFactory. This change streamlines the dependency injection pattern for job invocation, making it more direct and potentially easier to manage.
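
The "Enhanced Unit of Work Management" point above could be sketched like this. The use of `CancellationToken.None` for the status write is an illustrative assumption (so an already-cancelled token cannot also abort the status update), not necessarily what the PR does; member names follow the diagrams in this thread.

```csharp
// Sketch: persist a terminal status in its own transaction so it survives
// even when the main unit of work has already failed or rolled back.
private async Task MarkJobStatusAsync(IUnitOfWorkManager uowManager, IJobStore jobStore,
    Guid jobId, BackgroundJobStatus status, string? errorMessage, CancellationToken ct)
{
    try
    {
        // BeginRequiresNew opens an independent transaction, detached from
        // whatever outer unit of work just failed.
        var uow = await uowManager.BeginRequiresNew(CancellationToken.None);
        await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, errorMessage, CancellationToken.None);
        await uow.CommitAsync(CancellationToken.None);
    }
    catch (Exception ex)
    {
        // Last resort: the status write itself failed; surface it in logs.
        logger.LogError(ex, "Failed to persist status '{Status}' for job '{JobId}'", status, jobId);
    }
}
```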

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • In IBackgroundJobInvoker.InvokeAsync and its implementations, the parameter is now an IServiceProvider but still named scopeFactory; consider renaming it (and updating XML docs) to avoid confusion about what is being passed in.
  • In DaprJobExecutionBridge.ExecuteAsync, the behavior for a missing job has changed from throwing an exception to logging and returning; if callers rely on failure semantics, consider whether this should still surface as an error (e.g., via an exception) or at least be logged at a more prominent level.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `IBackgroundJobInvoker.InvokeAsync` and its implementations, the parameter is now an `IServiceProvider` but still named `scopeFactory`; consider renaming it (and updating XML docs) to avoid confusion about what is being passed in.
- In `DaprJobExecutionBridge.ExecuteAsync`, the behavior for a missing job has changed from throwing an exception to logging and returning; if callers rely on failure semantics, consider whether this should still surface as an error (e.g., via an exception) or at least be logged at a more prominent level.

## Individual Comments

### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs:90-94` </location>
<code_context>
-            logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
+            await handlerUow.CommitAsync(cancellationToken);
         }
         catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
         {
             logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
-            await HandleJobCancellationAsync(jobId, cancellationToken);
-            throw;
+            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
+                "Job was cancelled", cancellationToken);
         }
         catch (Exception ex)
</code_context>

<issue_to_address>
**issue (bug_risk):** Cancellation/failure paths no longer rethrow, which changes the observable behavior for callers of DispatchAsync.

Previously, both cancellation and generic exceptions were rethrown after updating status, allowing callers to detect cancellations/failures via exceptions. Now both OperationCanceledException and other exceptions are swallowed after marking status, so DispatchAsync will appear to succeed even when the job was cancelled or failed, which can break consumers that rely on exception-based flow control or retries. If this behavior change is intended, consider at least rethrowing OperationCanceledException to preserve cancellation semantics, and evaluate whether other exceptions should also continue to propagate.
</issue_to_address>

### Comment 2
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs:43-49` </location>
<code_context>
-                    logger.LogError("Job with name '{JobName}' not found in store", jobName);
-                    throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
-                }
+            var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>();
+            var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);

-                // Dispatch to handler using the handler name from job entity
-                await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
-
-                await uow.CommitAsync(cancellationToken);
-            }
-            catch (Exception)
+            if (jobInfo == null)
             {
-                await uow.RollbackAsync(cancellationToken);
-                throw;
+                logger.LogError("Job with name '{JobName}' not found in store", jobName);
+                return;
             }
+
</code_context>

<issue_to_address>
**question (bug_risk):** Changing missing-job handling from throwing to just logging-and-returning may hide problems in upstream callers.

Previously, a missing job caused an InvalidOperationException, clearly signaling failure to Dapr or any upstream caller. Now we only log and return, so ExecuteAsync will look successful even when the job is missing, which may lead to silent failures (no retries, no visibility beyond logs). If the goal is to suppress retries for missing jobs, this behavior is fine; otherwise, consider still throwing (ideally a specific exception) so upstream logic can detect and react to this condition.
</issue_to_address>
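If the condition should still surface, one option is a dedicated exception type so upstream logic can distinguish "job missing" from other failures. A minimal sketch (`JobNotFoundException` is a hypothetical name, not an existing framework type):

```csharp
// Hypothetical exception type, named here for illustration only.
public sealed class JobNotFoundException : InvalidOperationException
{
    public JobNotFoundException(string jobName)
        : base($"Job with name '{jobName}' not found in store.")
    {
        JobName = jobName;
    }

    public string JobName { get; }
}
```

`ExecuteAsync` could then log and `throw new JobNotFoundException(jobName)`, letting the caller's retry/error policy decide whether a missing job is retryable instead of failing silently.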

### Comment 3
<location> `framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs:25` </location>
<code_context>
     /// <param name="cancellationToken">Cancellation token</param>
     /// <returns>A task representing the asynchronous operation</returns>
     Task InvokeAsync(
-        IServiceScopeFactory scopeFactory,
+        IServiceProvider scopeFactory,
         IEventSerializer eventSerializer,
         ReadOnlyMemory<byte> payload,
</code_context>

<issue_to_address>
**nitpick:** The parameter name `scopeFactory` no longer matches its type `IServiceProvider`, which can be confusing.

This mismatch makes the parameter easy to misinterpret (e.g., thinking it can create scopes). Please rename it to something like `serviceProvider` and update implementations accordingly so the name reflects the actual type and usage.
</issue_to_address>

### Comment 4
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs:51` </location>
<code_context>
+        ReadOnlyMemory<byte> payload,
+        out CloudEventEnvelope? envelope)
+    {
+        envelope = TryParseEnvelope(eventSerializer, payload.ToArray());
+
+        if (envelope != null)
</code_context>

<issue_to_address>
**suggestion (performance):** Converting ReadOnlyMemory<byte> to a new byte[] for every call may be unnecessarily expensive.

`ExtractDataPayload` currently calls `payload.ToArray()` and passes that to `TryParseEnvelope`, allocating on every call. Since this helper may be used on hot paths, consider adding a `TryParseEnvelope` overload that takes `ReadOnlySpan<byte>`/`ReadOnlyMemory<byte>` (or updating the serializer to support span-based deserialization) so you can avoid the extra copy.

Suggested implementation:

```csharp
    /// <param name="eventSerializer">The event serializer to use.</param>
    /// <param name="payload">The raw payload bytes.</param>
    /// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
    /// <returns>The parsed envelope if successful, null otherwise.</returns>
    public static CloudEventEnvelope? TryParseEnvelope(
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload)
    {
        // Fast path: payload is backed by a byte[] with no offset – reuse the array without copying.
        if (MemoryMarshal.TryGetArray(payload, out ArraySegment<byte> segment) &&
            segment.Array is not null &&
            segment.Offset == 0 &&
            segment.Count == segment.Array.Length)
        {
            return TryParseEnvelope(eventSerializer, segment.Array);
        }

        // Fallback: either we have an offset into the array or a non-array-backed memory;
        // copy into a new array so we can reuse the existing byte[]-based implementation.
        var buffer = payload.ToArray();
        return TryParseEnvelope(eventSerializer, buffer);
    }

    /// <param name="eventSerializer">The event serializer to use.</param>
    /// <param name="payload">The raw payload bytes.</param>
    /// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
    /// <returns>The data payload bytes (either from envelope.Data or original payload).</returns>
    public static ReadOnlyMemory<byte> ExtractDataPayload(
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        out CloudEventEnvelope? envelope)

```

```csharp
    {
        envelope = TryParseEnvelope(eventSerializer, payload);

        if (envelope != null)
        {
            var argsBytes = eventSerializer.Serialize(envelope.Data);
            return new ReadOnlyMemory<byte>(argsBytes);
        }

        return payload;
    }
}

```

1. Ensure `using System;` and `using System.Runtime.InteropServices;` are present at the top of `CloudEventEnvelopeHelper.cs` so that `ArraySegment<T>` and `MemoryMarshal` are available.
2. The new `TryParseEnvelope(IEventSerializer, ReadOnlyMemory<byte>)` overload assumes an existing `TryParseEnvelope(IEventSerializer, byte[])` method in the same class. If the name or signature differs, adjust the calls inside the new overload accordingly.
3. If the serializer can be updated to support `ReadOnlySpan<byte>` directly, you can further optimize the new overload by calling that span-based API instead of materializing a `byte[]` with `payload.ToArray()` in the fallback path.
</issue_to_address>

### Comment 5
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs:24-25` </location>
<code_context>
         // Resolve dependencies from DI
-        await using var scope = scopeFactory.CreateAsyncScope();
-        var handler = scope.ServiceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();
+        // await using var scope = scopeFactory.CreateAsyncScope();
+        var handler = serviceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();

         var args = eventSerializer.Deserialize<TArgs>(payload.Span);
</code_context>

<issue_to_address>
**suggestion (bug_risk):** The commented-out scope creation and direct use of IServiceProvider could be clarified to avoid lifetime/usage confusion.

This now resolves the handler directly from the passed IServiceProvider while leaving the scope creation commented out, which makes the intended lifetime unclear. Since callers seem to pass a scoped provider, this is likely correct, but to avoid confusion or accidental root-provider use, either remove the commented scope line and rename the parameter to `scopedServiceProvider`, or instead take an IServiceScopeFactory and create a scope here explicitly.
</issue_to_address>
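A minimal sketch of the second option, with the invoker owning its own scope (this assumes the surrounding generic invoker class and an `IBackgroundJobHandler<TArgs>.HandleAsync(TArgs, CancellationToken)` signature, which may differ from the actual code):

```csharp
// Illustrative only: assumes this lives inside the generic invoker class
// and that IBackgroundJobHandler<TArgs> exposes HandleAsync(TArgs, CancellationToken).
public async Task InvokeAsync(
    IServiceScopeFactory scopeFactory,
    IEventSerializer eventSerializer,
    ReadOnlyMemory<byte> payload,
    CancellationToken cancellationToken)
{
    // Owning the scope here makes the handler's lifetime explicit
    // and rules out accidental root-provider resolution.
    await using var scope = scopeFactory.CreateAsyncScope();
    var handler = scope.ServiceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();

    var args = eventSerializer.Deserialize<TArgs>(payload.Span);
    await handler.HandleAsync(args, cancellationToken);
}
```

Note that this trades away the PR's goal of sharing the caller's scoped unit of work and schema context, so the first option (removing the commented line and renaming the parameter to `scopedServiceProvider`) is likely the better fit here.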



@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant and well-structured refactoring of the background job processing logic. The changes enhance robustness by using separate unit-of-work scopes for status updates, improve multi-tenancy support with schema resolution from CloudEvents, and streamline dependency management by leveraging scoped service providers. The introduction of CloudEventEnvelopeHelper is a good step towards centralizing payload parsing. My review includes a critical point about a change in exception handling that could affect scheduler behavior, a suggestion for a performance improvement in payload processing, and a minor naming clarification in an interface.

Comment on lines 90 to 102

```diff
         catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
         {
             logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
-            await HandleJobCancellationAsync(jobId, cancellationToken);
-            throw;
+            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
+                "Job was cancelled", cancellationToken);
         }
         catch (Exception ex)
         {
             logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
-            await HandleJobFailureAsync(jobId, ex, cancellationToken);
-            throw;
+            var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
+            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
+                errorMessage, cancellationToken);
         }
```

high

The try-catch block for handler execution now catches and logs exceptions without re-throwing them. This is a significant behavioral change from the previous implementation which re-threw exceptions. By swallowing the exception, the DispatchAsync method will complete successfully from the caller's perspective (e.g., DaprJobExecutionBridge), which may prevent job schedulers like Dapr from performing configured retries on job failure. This could lead to failed jobs being reported as successful to the scheduler. Consider re-throwing the exceptions after marking the job status to ensure failures are propagated to the caller.

```csharp
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
                "Job was cancelled", cancellationToken);
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
            var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
                errorMessage, cancellationToken);
            throw;
        }
```

```diff
     /// <returns>A task representing the asynchronous operation</returns>
     Task InvokeAsync(
-        IServiceScopeFactory scopeFactory,
+        IServiceProvider scopeFactory,
```

medium

The parameter name scopeFactory is misleading for a parameter of type IServiceProvider. It should be renamed to serviceProvider to accurately reflect its purpose. Additionally, the XML documentation for this parameter on line 19 is now incorrect and should be updated to describe an IServiceProvider.

```csharp
        IServiceProvider serviceProvider,
```

Comment on lines +9 to +61

```csharp
internal static class CloudEventEnvelopeHelper
{
    /// <summary>
    /// Attempts to parse the payload as a CloudEventEnvelope.
    /// Returns null if the payload is not in envelope format (legacy format).
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use for deserialization.</param>
    /// <param name="payload">The raw payload bytes to parse.</param>
    /// <returns>The parsed envelope or null if parsing fails or payload is not in envelope format.</returns>
    public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload)
    {
        try
        {
            var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

            // Validate it's actually an envelope by checking required properties
            if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
            {
                return envelope;
            }

            return null;
        }
        catch
        {
            return null;
        }
    }

    /// <summary>
    /// Extracts the data payload from a CloudEventEnvelope, serializing it back to bytes.
    /// If payload is not in envelope format, returns the original payload.
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use.</param>
    /// <param name="payload">The raw payload bytes.</param>
    /// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
    /// <returns>The data payload bytes (either from envelope.Data or original payload).</returns>
    public static ReadOnlyMemory<byte> ExtractDataPayload(
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        out CloudEventEnvelope? envelope)
    {
        envelope = TryParseEnvelope(eventSerializer, payload.ToArray());

        if (envelope != null)
        {
            var argsBytes = eventSerializer.Serialize(envelope.Data);
            return new ReadOnlyMemory<byte>(argsBytes);
        }

        return payload;
    }
}
```

medium

The current implementation of ExtractDataPayload calls payload.ToArray(), which creates an unnecessary memory allocation and copy for every job. To improve performance, TryParseEnvelope can be updated to accept a ReadOnlySpan<byte>, and ExtractDataPayload can then pass payload.Span to it, avoiding the overhead. I've also updated the doc comments to reflect this change.

```csharp
internal static class CloudEventEnvelopeHelper
{
    /// <summary>
    /// Attempts to parse the payload as a CloudEventEnvelope.
    /// Returns null if the payload is not in envelope format (legacy format).
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use for deserialization.</param>
    /// <param name="payload">The raw payload span to parse.</param>
    /// <returns>The parsed envelope or null if parsing fails or payload is not in envelope format.</returns>
    public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, ReadOnlySpan<byte> payload)
    {
        try
        {
            var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

            // Validate it's actually an envelope by checking required properties
            if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
            {
                return envelope;
            }

            return null;
        }
        catch
        {
            return null;
        }
    }

    /// <summary>
    /// Extracts the data payload from a CloudEventEnvelope, serializing it back to bytes.
    /// If payload is not in envelope format, returns the original payload.
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use.</param>
    /// <param name="payload">The raw payload bytes.</param>
    /// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
    /// <returns>The data payload bytes (either from envelope.Data or original payload).</returns>
    public static ReadOnlyMemory<byte> ExtractDataPayload(
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        out CloudEventEnvelope? envelope)
    {
        envelope = TryParseEnvelope(eventSerializer, payload.Span);

        if (envelope != null)
        {
            var argsBytes = eventSerializer.Serialize(envelope.Data);
            return new ReadOnlyMemory<byte>(argsBytes);
        }

        return payload;
    }
}
```
