Add automatic retries to data-sink-worker #1842
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Changes proposed ✍️
What
🤖 Generated by Copilot at 1ea4963
This pull request adds a feature to the data sink worker app and its related modules to retry and delay results that fail or take too long to process. It uses the
DataSinkWorkerEmitter
class from the@crowd/sqs
library to communicate with the data sink worker queue and theDataSinkRepository
class to update the result state and timestamps. It also adds new columns, types, and messages to support this feature.
🤖 Generated by Copilot at 1ea4963
Why
How
🤖 Generated by Copilot at 1ea4963
retries
anddelayedUntil
to theintegration.results
table inV1699626027__add-retries-to-results.sql
(link)retries
anddelayedUntil
properties to theIResultData
interface indataSink.data.ts
(link)delayedUntil
column to null when marking a result as success indataSink.repo.ts
(link)delayResult
andgetDelayedResults
to theDataSinkRepository
class indataSink.repo.ts
to update and select delayed results (link)retries
value and calculate thedelayedUntil
timestamp in thetriggerResultError
method of theDataSinkService
class indataSink.service.ts
(link)DELAYED
to theIntegrationResultState
enum inintegrations.ts
(link)CHECK_RESULTS
to theDataSinkWorkerQueueMessageType
enum and a new classCheckResultsQueueMessage
inindex.ts
(link, link)DataSinkWorkerEmitter
class from the@crowd/sqs
library (link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link, link)DataSinkWorkerEmitter
class inintegrationTickProcessor.ts
,serviceSQS.ts
,processOldResults.ts
,main.ts
,index.ts
,dataSink.service.ts
, anddataSinkWorker.ts
(link, link, link, link, link, link, link)getDataSinkWorkerEmitter
inserviceSQS.ts
to get an instance of theDataSinkWorkerEmitter
class (link)dataSinkWorkerEmitter
to theIntegrationTickProcessor
andWorkerQueueReceiver
classes inintegrationTickProcessor.ts
andindex.ts
(link, link)dataSinkWorkerEmitter
property in the constructors of theIntegrationTickProcessor
andWorkerQueueReceiver
classes using thegetDataSinkWorkerEmitter
function or the parameter (link, link)dataSinkWorkerEmitter
parameter or constant to theDataSinkService
constructor inintegrationTickProcessor.ts
,processOldResults.ts
, andindex.ts
(link, link, link)dataSinkWorkerEmitter
constant to theprocessOldResultsJob
function inmain.ts
(link)dataWorkerEmitter
that holds an instance of theDataSinkWorkerEmitter
class inmain.ts
(link)dataWorkerEmitter
constant to the list of parameters passed to theWorkerQueueReceiver
constructor inmain.ts
(link)init
method of thedataWorkerEmitter
constant inmain.ts
(link)checkResults
to theDataSinkWorkerEmitter
class indataSinkWorker.ts
to send a message to the data sink worker queue to check for delayed results (link)checkResults
method of thedataSinkWorkerEmitter
property inintegrationTickProcessor.ts
(link)CHECK_RESULTS
message type in thehandleQueueMessage
method of theWorkerQueueReceiver
class inindex.ts
to call thecheckResults
method of theDataSinkService
instance (link)CheckResultsQueueMessage
class indataSinkWorker.ts
(link)IResultData
interface, theaddSeconds
function, and theWORKER_SETTINGS
function indataSink.service.ts
(link)dataSinkWorkerEmitter
to theDataSinkService
class indataSink.service.ts
(link)resultId
parameter and argument of thetriggerResultError
method of theDataSinkService
class indataSink.service.ts
andindex.ts
toIResultData
(link, link)PlatformType
enum from the@crowd/types
library indataSink.repo.ts
to identify the platform of a result (link)Checklist ✅
Feature
,Improvement
, orBug
.