fix(background-jobs): normalize DateTime via EF Core ValueConverter and simplify job store API#56
Conversation
…rter and guard job re-dispatch by status Scope 1 — EF Core DateTime read normalization (closes #55 scope 1): - Add ClockDateTimeValueConverter and ClockDateTimeOffsetValueConverter, both backed by IClock.NormalizeToUtc, covering DateTime, DateTime?, DateTimeOffset, and DateTimeOffset? via EF Core null-lifting. - Inject optional IClock into AetherDbContext<TDbContext> primary constructor so the model-building pipeline can access the clock without breaking existing derived DbContextes. - Thread IClock through ConfigureByConvention → TryConfigureDateTimeUtc and apply the appropriate converter to every DateTime/DateTimeOffset property at model build time, ensuring consistent UTC Kind/Offset on both read and write paths for all entities. Scope 2 — DaprJobExecutionBridge status-aware dispatch guard (closes #55 scope 2): - Add IJobStore.GetByJobNameAsync(string, BackgroundJobStatus?, CancellationToken) overload; nulls preserves existing no-filter semantics. - Implement the overload in EfCoreJobStore with a conditional Where clause. - Update DaprJobExecutionBridge to query only Scheduled jobs; log a warning and return early when the job is not found in the expected state, preventing re-dispatch of already completed, failed, or cancelled jobs.
…ency - Remove overloaded and consolidate active-job filtering (Scheduled | Running) into the single method; introduce computed property on - Change to look up existing jobs by instead of uid=502(U0B006) gid=20(staff) groups=20(staff),501(awagent),12(everyone),61(localaccounts),79(_appserverusr),80(admin),81(_appserveradm),701(com.apple.sharepoint.group.1),702(com.apple.sharepoint.group.2),33(_appstore),98(_lpadmin),100(_lpoperator),204(_developer),250(_analyticsusers),395(com.apple.access_ftp),398(com.apple.access_screensharing),399(com.apple.access_ssh),400(com.apple.access_remote_ae), enabling upsert-by-name semantics - Add status to idempotency check in so previously-failed jobs are not reprocessed - Pass when scheduling Dapr jobs to allow rescheduling without conflicts - Comment out marker in to prevent automatic UoW wrapping on all application services
Reviewer's GuideNormalizes all EF Core DateTime/DateTimeOffset properties via clock-based value converters, adjusts the DbContext/model configuration to accept an IClock, simplifies and makes background job storage/idempotency semantics more robust (upsert-by-name, active-only lookups, skipping failed jobs), relaxes automatic UoW aspects, and hardens domain event and Dapr job handling/logging. Sequence diagram for job persistence upsert-by-name and Dapr scheduling overwritesequenceDiagram
actor Client
participant ApplicationService
participant IJobStore
participant EfCoreJobStore
participant AetherDbContext as DbContext
participant Database
participant DaprJobScheduler
participant DaprJobsClient
Client->>ApplicationService: Request to schedule job(jobName, handlerName, payload)
ApplicationService->>IJobStore: SaveAsync(jobInfo)
IJobStore->>EfCoreJobStore: SaveAsync(jobInfo)
EfCoreJobStore->>EfCoreJobStore: GetByJobNameAsync(jobInfo.JobName)
EfCoreJobStore->>DbContext: Query BackgroundJobs where JobName==jobInfo.JobName and Status in {Scheduled,Running}
DbContext->>Database: SELECT ...
Database-->>DbContext: Existing active job or null
DbContext-->>EfCoreJobStore: Materialized BackgroundJobInfo
alt Existing active job found
EfCoreJobStore->>EfCoreJobStore: Update existing job, set ModifiedAt=UtcNow
EfCoreJobStore->>DbContext: Apply current values
else No active job found
EfCoreJobStore->>EfCoreJobStore: Initialize new job, set Status=Scheduled
EfCoreJobStore->>DbContext: Add job entity
end
DbContext->>Database: SaveChanges (insert or update)
Database-->>DbContext: Acknowledged
DbContext-->>EfCoreJobStore: Save completed
EfCoreJobStore-->>IJobStore: SaveAsync completed
IJobStore-->>ApplicationService: Return
ApplicationService->>DaprJobScheduler: ScheduleJobAsync(jobName, schedule, payload)
DaprJobScheduler->>DaprJobsClient: ScheduleJobAsync(jobName, schedule, payload, overwrite=true)
DaprJobsClient-->>DaprJobScheduler: Scheduled or rescheduled
DaprJobScheduler-->>ApplicationService: Return
ApplicationService-->>Client: Job scheduled (upserted by name and Dapr job overwritten if existed)
Sequence diagram for domain event dispatch with outbox error handlingsequenceDiagram
participant DbContext
participant DomainEventDispatcher
participant EventBus
participant OutboxStore
participant Logger
DbContext->>DomainEventDispatcher: DispatchEventsAsync(eventEnvelopes)
loop For each eventEnvelope
DomainEventDispatcher->>DomainEventDispatcher: Extract metadata and event
alt useOutboxDirectly is true
DomainEventDispatcher->>Logger: LogDebug Writing domain event to outbox
DomainEventDispatcher->>EventBus: PublishAsync(event, metadata, subject, useOutbox=true)
EventBus->>OutboxStore: Persist outbox message within transaction
OutboxStore-->>EventBus: Persisted
EventBus-->>DomainEventDispatcher: Publish completed
DomainEventDispatcher->>Logger: LogDebug Successfully wrote domain event to outbox
else useOutboxDirectly is false
DomainEventDispatcher->>EventBus: PublishAsync(event, metadata, subject, useOutbox=false)
EventBus-->>DomainEventDispatcher: Publish completed
end
end
par On PublishAsync exception when useOutboxDirectly is true
EventBus-->>DomainEventDispatcher: throws Exception
DomainEventDispatcher->>Logger: LogError Failed to publish domain event to outbox, continue with remaining events
end
DomainEventDispatcher-->>DbContext: DispatchEventsAsync completed (remaining events processed despite failures)
Class diagram for background job store and idempotency changesclassDiagram
class BackgroundJobStatus {
<<enum>>
Scheduled
Running
Completed
Failed
Cancelled
}
class BackgroundJobInfo {
+Guid Id
+string HandlerName
+string JobName
+BackgroundJobStatus Status
+bool IsActive
+DateTime? CreationTime
+DateTime? ModifiedAt
+DateTime? HandledTime
+byte[] Payload
+string ExtraProperties
+BackgroundJobInfo(Guid id, string handlerName, string jobName)
}
class IJobStore {
<<interface>>
+Task SaveAsync(BackgroundJobInfo jobInfo, CancellationToken cancellationToken)
+Task<BackgroundJobInfo> GetAsync(Guid id, CancellationToken cancellationToken)
+Task<BackgroundJobInfo> GetByJobNameAsync(string jobName, CancellationToken cancellationToken)
+Task<IEnumerable<BackgroundJobInfo>> GetByHandlerNameAsync(string handlerName, CancellationToken cancellationToken)
+Task UpdateStatusAsync(Guid id, BackgroundJobStatus status, DateTime? handledTime, CancellationToken cancellationToken)
}
class EfCoreJobStore {
-AetherDbContext dbContext
+EfCoreJobStore(AetherDbContext dbContext)
+Task SaveAsync(BackgroundJobInfo jobInfo, CancellationToken cancellationToken)
+Task<BackgroundJobInfo> GetAsync(Guid id, CancellationToken cancellationToken)
+Task<BackgroundJobInfo> GetByJobNameAsync(string jobName, CancellationToken cancellationToken)
+Task<IEnumerable<BackgroundJobInfo>> GetByHandlerNameAsync(string handlerName, CancellationToken cancellationToken)
+Task UpdateStatusAsync(Guid id, BackgroundJobStatus status, DateTime? handledTime, CancellationToken cancellationToken)
}
class JobDispatcher {
-ILogger logger
+Task DispatchAsync(Guid jobId, string handlerName, CancellationToken cancellationToken)
-bool IsJobAlreadyProcessed(BackgroundJobInfo jobInfo, Guid jobId, string handlerName)
}
class DaprJobExecutionBridge {
-ILogger logger
-IJobStore jobStore
+Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
IJobStore <|.. EfCoreJobStore
BackgroundJobInfo --> BackgroundJobStatus
DaprJobExecutionBridge ..> IJobStore : uses
JobDispatcher ..> IJobStore : uses
note for BackgroundJobInfo "IsActive returns true when Status is Scheduled or Running"
note for EfCoreJobStore "SaveAsync performs upsert by JobName via GetByJobNameAsync filtering active jobs"
note for JobDispatcher "IsJobAlreadyProcessed returns true for Completed, Cancelled, or Failed statuses"
Class diagram for EF Core clock-based DateTime normalizationclassDiagram
class IClock {
<<interface>>
+DateTime NormalizeToUtc(DateTime value)
+DateTimeOffset NormalizeToUtc(DateTimeOffset value)
}
class ClockDateTimeValueConverter {
+ClockDateTimeValueConverter(IClock clock)
}
class ClockDateTimeOffsetValueConverter {
+ClockDateTimeOffsetValueConverter(IClock clock)
}
class EntityTypeBuilder {
+IMutableEntityType Metadata
+PropertyBuilder Property(string name)
}
class AetherEntityTypeBuilderExtensions {
<<static>>
+void ConfigureByConvention(EntityTypeBuilder b, IClock clock)
+void TryConfigureDateTimeUtc(EntityTypeBuilder b, IClock clock)
}
class AetherDbContext~TDbContext~ {
-IClock clock
-IDomainEventSink eventSink
+AetherDbContext(DbContextOptions~TDbContext~ options, IDomainEventSink eventSink, IClock clock)
+Task<int> SaveChangesAsync(CancellationToken cancellationToken)
+void OnModelCreating(ModelBuilder modelBuilder)
+void ConfigureBaseProperties~TEntity~(ModelBuilder modelBuilder)
}
IClock <.. ClockDateTimeValueConverter : uses
IClock <.. ClockDateTimeOffsetValueConverter : uses
EntityTypeBuilder <.. AetherEntityTypeBuilderExtensions : extends
IClock <.. AetherEntityTypeBuilderExtensions : uses
AetherDbContext~TDbContext~ o--> IClock : holds
AetherDbContext~TDbContext~ ..> AetherEntityTypeBuilderExtensions : calls ConfigureByConvention
note for AetherEntityTypeBuilderExtensions "ConfigureByConvention applies TryConfigureDateTimeUtc and other conventions using the injected IClock"
note for ClockDateTimeValueConverter "Normalizes DateTime via IClock.NormalizeToUtc on read and write"
note for ClockDateTimeOffsetValueConverter "Normalizes DateTimeOffset via IClock.NormalizeToUtc on read and write"
note for AetherDbContext~TDbContext~ "ConfigureBaseProperties calls ConfigureByConvention(clock) so all DateTime and DateTimeOffset properties use UTC normalization"
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 22 minutes and 58 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughChanges introduce active-state filtering for background jobs, add clock-based DateTime normalization for EF Core entities, enhance job idempotency checks, and improve error handling in domain event dispatching. Job lookup now filters by active status; job scheduling enables overwrite mode; domain events include retry logic on publish failures. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- In
DomainEventDispatcher, the new catch block foruseOutboxDirectlylogs and then swallowsPublishAsyncfailures; consider whether these errors should propagate (or be aggregated) to avoid silently losing outbox messages when a write fails. - The new
ClockDateTimeValueConverterandClockDateTimeOffsetValueConvertertypes live under the...ValueComparersnamespace even though they are converters; renaming the namespace (or folder) toValueConverterswould better reflect their purpose and make discovery easier. - The XML summary on
IJobStore.GetByJobNameAsyncreads "Retrieves active background job information by the job name to active"; consider rephrasing to something like "Retrieves active background job information by job name" to avoid confusion.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `DomainEventDispatcher`, the new catch block for `useOutboxDirectly` logs and then swallows `PublishAsync` failures; consider whether these errors should propagate (or be aggregated) to avoid silently losing outbox messages when a write fails.
- The new `ClockDateTimeValueConverter` and `ClockDateTimeOffsetValueConverter` types live under the `...ValueComparers` namespace even though they are converters; renaming the namespace (or folder) to `ValueConverters` would better reflect their purpose and make discovery easier.
- The XML summary on `IJobStore.GetByJobNameAsync` reads "Retrieves active background job information by the job name to active"; consider rephrasing to something like "Retrieves active background job information by job name" to avoid confusion.
## Individual Comments
### Comment 1
<location path="framework/src/BBT.Aether.Domain/BBT/Aether/Domain/Repositories/IJobStore.cs" line_range="39-41" />
<code_context>
/// <summary>
- /// Retrieves background job information by the job name (external scheduler identifier).
+ /// Retrieves active background job information by the job name to active (external scheduler identifier).
/// </summary>
/// <param name="jobName">The unique job name used by the external scheduler (e.g., "send-email-order-123").</param>
</code_context>
<issue_to_address>
**nitpick (typo):** Doc comment has a small wording issue that may confuse readers.
The wording "by the job name to active" is confusing. Consider something like: "Retrieves active background job information by the job name (external scheduler identifier)." to clearly convey that this method filters to active jobs.
```suggestion
/// <summary>
/// Retrieves active background job information by the job name (external scheduler identifier).
/// </summary>
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Code Review
This pull request implements UTC normalization for date-time properties using EF Core value converters and an IClock abstraction. It also enhances background job management by refining idempotency checks, updating job retrieval logic to focus on active states, and transitioning to name-based lookups in the job store. Feedback highlights a potential primary key conflict during job updates in EfCoreJobStore, the presence of commented-out code in the aspect provider, a documentation typo in the job store interface, and concerns regarding the suppression of exceptions during domain event publication which could lead to data inconsistency.
| // Update existing job | ||
| jobInfo.ModifiedAt = DateTime.UtcNow; | ||
| jobInfo.ModifiedAt = DateTime.UtcNow; | ||
| _dbContext.Entry(existingJob).CurrentValues.SetValues(jobInfo); |
There was a problem hiding this comment.
Using SetValues(jobInfo) will attempt to copy the Id property from jobInfo to existingJob. Since existingJob is a tracked entity and Id is its primary key, EF Core will throw an exception if the IDs differ (which is likely when performing an upsert by name with a newly instantiated jobInfo). You should ensure the primary key is not modified during the update.
_dbContext.Entry(existingJob).CurrentValues.SetValues(jobInfo);
_dbContext.Entry(existingJob).Property(x => x.Id).IsModified = false;| private readonly static HashSet<string> MarkerInterfaceNames = new() | ||
| { | ||
| "BBT.Aether.Application.IApplicationService" | ||
| //"BBT.Aether.Application.IApplicationService" |
|
|
||
| /// <summary> | ||
| /// Retrieves background job information by the job name (external scheduler identifier). | ||
| /// Retrieves active background job information by the job name to active (external scheduler identifier). |
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, | ||
| "Failed to publish domain event to outbox: {EventType}. Continuing with remaining events", | ||
| metadata.EventType.Name); | ||
| } |
There was a problem hiding this comment.
Swallowing all exceptions during domain event publication to the outbox can lead to silent data inconsistencies. If an event fails to be written to the outbox, it usually indicates a failure that should prevent the transaction from proceeding, or at least be handled more strictly than just logging and continuing with other events.
Up to standards ✅🟢 Issues
|
|
❌ The last analysis has failed. |
Summary
ClockDateTimeValueConverterandClockDateTimeOffsetValueConverterbacked byIClock.NormalizeToUtc, applied automatically to allDateTime/DateTimeOffsetproperties at model-build time viaConfigureByConvention.GetByJobNameAsync(name, status)and consolidated active-job filtering (Scheduled | Running) into the singleGetByJobNameAsync(name)method. IntroducedIsActivecomputed property onBackgroundJobInfo.SaveOrUpdateAsyncnow looks up existing jobs byJobNameinstead ofId, enabling correct upsert-by-name semantics.Failedstatus to the idempotency guard inJobDispatcherso previously-failed jobs are not reprocessed.overwrite: truewhen scheduling Dapr jobs to allow rescheduling without conflicts.IApplicationServicemarker inUnitOfWorkAspectProviderto prevent automatic UoW wrapping on all application services.Test plan
DateTime/DateTimeOffsetcolumns with correct UTC kind after normalizationFailedjobs are skipped on re-dispatchoverwrite: true)Made with Cursor
Summary by Sourcery
Normalize DateTime handling via clock-based EF Core converters and refine background job processing semantics, including upsert-by-name behavior, active-job filtering, and more robust dispatch and scheduling.
New Features:
Bug Fixes:
Enhancements:
Summary by CodeRabbit
New Features
IsActiveproperty to background job information.Bug Fixes
Documentation