Skip to content

Commit

Permalink
Adding submission backlog, check for held submission following this one
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Jun 12, 2024
1 parent 8521f8d commit e82b5de
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 5 deletions.
30 changes: 30 additions & 0 deletions lib/model/migrations/20240607-02-add-submission-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const up = async (db) => {
await db.raw(`CREATE TABLE entity_submission_backlog (
"submissionId" INT4,
"submissionDefId" INT4,
"branchId" UUID,
"baseVersion" INT4,
"loggedAt" TIMESTAMPTZ(3),
CONSTRAINT fk_submission_defs
FOREIGN KEY("submissionDefId")
REFERENCES submission_defs(id)
ON DELETE CASCADE,
CONSTRAINT fk_submissions
FOREIGN KEY("submissionId")
REFERENCES submissions(id)
ON DELETE CASCADE
)`);
};

const down = (db) => db.raw('DROP TABLE entity_submission_backlog');

module.exports = { up, down };
32 changes: 28 additions & 4 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ SELECT actions
FROM dataset_form_defs
WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`);

const _holdSubmission = (run, submissionId, submissionDefId, branchId, baseVersion) => run(sql`
INSERT INTO entity_submission_backlog ("submissionId", "submissionDefId", "branchId", "baseVersion", "loggedAt")
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${baseVersion}, CLOCK_TIMESTAMP())
`);

const _checkHeldSubmission = (maybeOne, branchId, baseVersion) => maybeOne(sql`
SELECT * FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "baseVersion" = ${baseVersion}`);

const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
Expand All @@ -218,8 +227,10 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
});
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne }) => {
if (!(event.action === 'submission.create' || event.action === 'submission.update.version'))
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne, run }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
return null;

// Get client version of entity
Expand All @@ -246,6 +257,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
const previousInBranch = (await _getDef(maybeOne, new QueryOptions({ condition })));
if (!previousInBranch.isDefined()) {
// not ready to process this submission. eventually hold it for later.
await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
} else {
baseVersion = previousInBranch.get().version;
Expand Down Expand Up @@ -309,7 +321,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
};

// Entrypoint to where submissions (a specific version) become entities
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst }) => {
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, maybeOne, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;

const form = await Forms.getByActeeId(event.acteeId);
Expand Down Expand Up @@ -374,7 +386,19 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entit
}
}
else if (entityData.system.create === '1' || entityData.system.create === 'true')
return Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);

// Check for held submissions that follow this one in the same branch
if (entityData.system.branchId != null) {
// baseVersion could be '', meaning its a create
const currentBaseVersion = entityData.system.baseVersion === '' ? 1 : parseInt(entityData.system.baseVersion, 10);
const nextSub = await _checkHeldSubmission(maybeOne, entityData.system.branchId, currentBaseVersion + 1);
if (nextSub.isDefined()) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
}
}

return null;
};
Expand Down
1 change: 1 addition & 0 deletions lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const jobs = {
'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ],

'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],

'form.create': [ require('./form').create ],
'form.update.draft.set': [ require('./form').updateDraftSet ],
Expand Down
7 changes: 6 additions & 1 deletion test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const testData = require('../../data/xml');
const { getOrNotFound } = require('../../../lib/util/promise');
const uuid = require('uuid').v4;
const should = require('should');
const { sql } = require('slonik');

const { exhaust } = require(appRoot + '/lib/worker/worker');

Expand Down Expand Up @@ -290,7 +291,7 @@ describe('Offline Entities', () => {
entity.aux.currentVersion.data.should.eql({ age: '22', first_name: 'Johnny' });
}));

it.skip('should apply later run received earlier', testOfflineEntities(async (service, container) => {
it('should apply later run received earlier', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();
const dataset = await container.Datasets.get(1, 'people', true).then(getOrNotFound);
Expand All @@ -303,6 +304,7 @@ describe('Offline Entities', () => {
.set('Content-Type', 'application/xml')
.expect(200);

// have two updates within the run
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
Expand All @@ -315,6 +317,9 @@ describe('Offline Entities', () => {

await exhaust(container);

const backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
Expand Down

0 comments on commit e82b5de

Please sign in to comment.