feat: accept worker for custodial inbound transfers#268
Conversation
…tstrap Switch from CIP56TransferFactory (atomic, local-only) to DA's AllocationFactory from utility_registry_app_v0. This factory creates TransferOffer contracts on TransferFactory_Transfer, matching devnet behaviour where the receiver must exercise TransferInstruction_Accept to complete the transfer. Also bootstraps TransferRule and InstrumentConfiguration (with empty holderRequirements) which are required as choiceContextData when the receiver accepts the transfer offer. Closes #258
Adds the receiver-side accept context endpoint used by the two-step AllocationFactory transfer flow. The endpoint returns TransferRule and InstrumentConfiguration contract IDs as AV_ContractId AnyValues plus their createdEventBlobs as disclosedContracts, matching DA's registrar protocol. Also migrates the sender-side factory lookup from CIP56TransferFactory (cip56PkgID) to AllocationFactory (utility_registry_app_v0 package), and adds a shared fetchContractFromACS helper used by both paths. Closes #259
Adds receiver-side accept support to the Canton token SDK:
- types.go: AnyValue, AcceptChoiceContext, AcceptContextResponse
- encode.go: encodeAnyValue (full AV_* ADT encoder), encodeChoiceContextRecord
(builds ChoiceContext { values: TextMap AnyValue } for TransferInstruction_Accept)
- registry_client.go: GetAcceptChoiceContext (POST to registrar accept endpoint),
parseTemplateID (handles both "pkg:module:entity" string and object forms),
convertDisclosedContractSlice (sets TemplateId on proto, shared by both paths);
registryDisclosedContract.TemplateID changed to json.RawMessage
- config.go: UtilityRegistryAppPackageID for TransferOffer ACS queries
- client.go: FindPendingInboundTransferInstructions, AcceptTransferInstruction
(calls registrar, encodes AnyValue context, submits via SubmitAndWait)
- encode_test.go: unit tests for all AV_* variants and encodeChoiceContextRecord
- mocks: update both Token mocks with the two new interface methods
Closes #260
…adow, wrap long sig
…ng TransferOffer tracking - Generalise Stream[*ParsedEvent] → Stream[any] with type-switched batch processing - Fetcher now takes []TemplateID (was single) + any-typed decode function - NewMultiDecoder wraps TokenTransfer + TransferOffer decoders into single any decoder - Add PendingOffer type + IndexerPendingOffers table (migration #5) - InsertPendingOffer on CREATED, DeletePendingOffer on ARCHIVED event - GET /indexer/v1/admin/parties/{partyID}/pending-offers?after_offset=N endpoint - Add GetPendingOffersForParty to service, store, and HTTP client interfaces - All mocks regenerated to reflect interface changes Closes #264
… pagination - Replace DELETE on archive with status='ACCEPTED' UPDATE (historical audit trail) - Add OfferStatus type (PENDING/ACCEPTED) to types.go and store model - GetPendingOffersForParty now returns *Page[PendingOffer] matching all other list APIs - engine.Store: rename DeletePendingOffer → MarkOfferAccepted; remove ListPendingOffersForParty - service.Store: ListPendingOffersForParty uses Pagination (page/limit) not afterOffset - client.Client: GetPendingOffersForParty uses Pagination and returns *Page[PendingOffer] - Regenerate all affected mocks
- decoder: add logger to NewOfferDecoder; warn when receiver is empty on CREATED event - processor: add default case in type switch to log unknown item types instead of silently dropping - store: fix InsertPendingOffer to not mutate caller input (set status on DAO copy) - migration: use composite index (receiver_party_id, status) for ListPendingOffersForParty query - tests: add PendingOffer test cases (CREATED insert, ARCHIVED mark-accepted, mixed batch)
Codecov Report❌ Patch coverage is ❌ Your patch status has failed because the patch coverage (46.03%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #268 +/- ##
=======================================
Coverage ? 31.96%
=======================================
Files ? 129
Lines ? 9485
Branches ? 0
=======================================
Hits ? 3032
Misses ? 6189
Partials ? 264
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Code Review
This pull request introduces a background worker, AcceptWorker, designed to automatically accept inbound USDCx TransferOffers for custodial parties by polling an indexer service. The implementation includes new configuration structures, integration into the API server startup, and comprehensive unit tests with mocks. Feedback focuses on improving responsiveness by performing an initial poll immediately upon startup and optimizing performance by filtering for custodial users at the database level rather than in memory.
- Adds GetAllPendingOffers to indexer (store/service/http/client) so the worker can fetch all PENDING offers in one paginated stream instead of one query per custodial user - Inverts loop: ListUsers once → build custodialParties map → paginate all offers → skip non-custodial receivers; O(1 DB + P pages) per cycle - Moves AcceptWorker from pkg/transfer/ to new pkg/custodial/ package
|
pkg/token/mocks/mock_canton_token.go looks like several copies of Token mock. just flagging, cleaning it up could be its own PR later, ill make issue |
Replace in-memory KeyMode filter with a DB-level WHERE clause. UserLister.ListUsers renamed to ListCustodialUsers; pgStore implements it with WHERE key_mode = 'custodial'.
Summary
AcceptWorkerthat polls the indexer for pending inboundTransferOffers and auto-accepts them on behalf of custodial partiesAcceptWorkerConfigtoAPIServerconfig; omitting the block disables the workerTest plan
TestAcceptWorker_SkipsExternalUsers— external-key users are skippedTestAcceptWorker_AcceptsSingleOffer— happy path, single offer acceptedTestAcceptWorker_LogsAndContinuesOnAcceptError— first offer fails, second still processedTestAcceptWorker_LogsAndContinuesOnIndexerError— indexer down, no panicTestAcceptWorker_StopsOnContextCancel— goroutine exits cleanly on cancelTestAcceptWorker_PaginatesAllOffers— 2-page scenario, both pages fetched and acceptedTestAcceptWorker_ListUsersError— DB error on ListUsers, no panic