Skip to content

Commit

Permalink
trigger worker after upload
Browse files Browse the repository at this point in the history
provide file details needed for merging to worker

prevent form submission if no files selected

WIP - File Type not being seeded; show File Type in Uploads history

corrected association; updated layout

validate field before next step

changed alert style

tested merging code

WIP - resume ability for failed uploads
  • Loading branch information
ri-pandey committed Nov 13, 2023
1 parent c8786fe commit 351e1a6
Show file tree
Hide file tree
Showing 20 changed files with 808 additions and 496 deletions.
23 changes: 22 additions & 1 deletion api/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"level": "debug"
},
"workflow_server": {
"base_url": "http://127.0.0.1:5000",
"base_url": "http://host.docker.internal:5000",
"auth_token": ""
},
"auth": {
Expand Down Expand Up @@ -82,12 +82,33 @@
"task": "delete_dataset"
}
]
},
"merge_files": {
"steps": [
{
"name": "merge_test",
"task": "create_dataset_files"
}
]
},
"create_files": {
"steps": [
{
"name": "merge_file_chunks",
"task": "create_dataset_files"
}
]
}
},
"dataset_types" : ["RAW_DATA", "DATA_PRODUCT", "UPLOAD"],
"upload_status": {
"PROCESSING": "PROCESSING",
"COMPLETE": "COMPLETE",
"FAILED": "FAILED"
},
"DONE_STATUSES": {
"REVOKED": "REVOKED",
"FAILURE": "FAILURE",
"SUCCESS": "SUCCESS"
}
}
13 changes: 13 additions & 0 deletions api/prisma/migrations/20231112200240_99_file_upload/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- CreateTable
-- One row per individual file within a dataset upload: the file's name,
-- its content hash, and (for chunked uploads) the number of chunks.
-- num_chunks is nullable — presumably unset for non-chunked uploads; confirm
-- against the upload client.
CREATE TABLE "file_upload" (
"id" SERIAL NOT NULL,
"name" TEXT NOT NULL,
"hash" TEXT NOT NULL,
"num_chunks" INTEGER,
"dataset_upload_id" INTEGER,

CONSTRAINT "file_upload_pkey" PRIMARY KEY ("id")
);

-- AddForeignKey
-- Tie each file to its parent dataset_upload. ON DELETE SET NULL keeps the
-- file row (orphaned, FK nulled) when the parent upload row is deleted.
ALTER TABLE "file_upload" ADD CONSTRAINT "file_upload_dataset_upload_id_fkey" FOREIGN KEY ("dataset_upload_id") REFERENCES "dataset_upload"("id") ON DELETE SET NULL ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Warnings:
- Added the required column `status` to the `file_upload` table without a default value. This is not possible if the table is not empty.
*/
-- AlterTable
-- NOTE(review): adds a NOT NULL column with no default — this migration will
-- fail if "file_upload" already contains rows (per the generated warning
-- above). Safe only while the table is empty; otherwise add a DEFAULT or
-- backfill first.
ALTER TABLE "file_upload" ADD COLUMN "status" TEXT NOT NULL;
11 changes: 11 additions & 0 deletions api/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,22 @@ model dataset_upload {
status upload_status
dataset_id Int? @unique
dataset dataset? @relation(fields: [dataset_id], references: [id])
files file_upload[]
created_date DateTime @default(now()) @db.Timestamp(6)
user_id Int
user user @relation(fields: [user_id], references: [id])
}

// A single file that is part of a dataset upload.
// - hash: content hash of the file (also used as the on-disk chunk folder name
//   by the upload route — see /file-chunk).
// - num_chunks: chunk count for chunked uploads; nullable — presumably unset
//   for single-part files, TODO confirm.
// - status: free-form TEXT here (not the upload_status enum used by
//   dataset_upload) — NOTE(review): consider an enum for consistency.
model file_upload {
id Int @id @default(autoincrement())
name String
hash String
num_chunks Int?
status String
dataset_upload dataset_upload? @relation(fields: [dataset_upload_id], references: [id])
dataset_upload_id Int?
}

model dataset_file_type {
id Int @id @default(autoincrement())
name String
Expand Down
2 changes: 2 additions & 0 deletions api/prisma/seed.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ async function main() {
await prisma.dataset_file_type.deleteMany();
await prisma.dataset_file_type.createMany({ data: data.dataset_file_types });

// create datasets
const datasetPromises = data.datasets.map((dataset) => {
const { workflows, ...dataset_obj } = dataset;
if (workflows) {
Expand Down Expand Up @@ -301,6 +302,7 @@ async function main() {
const operator = operators[Math.floor(Math.random() * operators.length)];
return { ...e, user_id: operator.id };
});
await prisma.dataset_upload.deleteMany();
await prisma.dataset_upload.createMany({
data: uploads,
});
Expand Down
37 changes: 29 additions & 8 deletions api/prisma/seed_data/data.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,22 @@ const datasets = [
},
{
id: 11,
name: 'Uploaded_data_product_4',
type: 'DATA_PRODUCT',
name: 'Uploaded data product 1',
type: 'UPLOAD',
dataset_file_type_id: 1,
},
{
id: 12,
name: 'Uploaded data product 2',
type: 'UPLOAD',
dataset_file_type_id: 2,
},
{
id: 13,
name: 'Uploaded data product 3',
type: 'DATA_PRODUCT',
dataset_file_type_id: 3,
},
];

const dataset_heirarchical_association = [{
Expand All @@ -208,6 +220,15 @@ const dataset_heirarchical_association = [{
}, {
source_id: 2,
derived_id: 8,
}, {
source_id: 9,
derived_id: 11,
}, {
source_id: 10,
derived_id: 12,
}, {
source_id: 10,
derived_id: 13,
}];

const metrics = [{
Expand Down Expand Up @@ -340,16 +361,16 @@ const dataset_file_types = [{

const data_product_uploads = [{
status: 'PROCESSING',
dataset_id: undefined,
}, {
status: 'PROCESSING',
dataset_id: undefined,
dataset_id: 11,
user_id: 60,
}, {
status: 'FAILED',
dataset_id: undefined,
dataset_id: 12,
user_id: 61,
}, {
status: 'COMPLETE',
dataset_id: 11,
dataset_id: 13,
user_id: 61,
}];

module.exports = {
Expand Down
113 changes: 98 additions & 15 deletions api/src/routes/datasets.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const config = require('config');
// const logger = require('../services/logger');
const path = require('path');
const fs = require('fs');
const wfService = require('../services/workflow');
const asyncHandler = require('../middleware/asyncHandler');
const { accessControl, getPermission } = require('../middleware/auth');
const { validate } = require('../middleware/validators');
Expand All @@ -22,8 +23,8 @@ const authService = require('../services/auth');
const isPermittedTo = accessControl('datasets');

const router = express.Router();
// const prisma = new PrismaClient({ log: ['query', 'info', 'warn', 'error'] });
const prisma = new PrismaClient();

// stats - UI
router.get(
'/stats',
Expand Down Expand Up @@ -263,10 +264,20 @@ router.get(
'/data-product-uploads',
isPermittedTo('read'),
asyncHandler(async (req, res) => {
const uploads = await prisma.data_upload.findMany({
const uploads = await prisma.dataset_upload.findMany({
include: {
source_dataset: true,
dataset: {
include: {
source_datasets: {
include: {
source_dataset: true,
},
},
file_type: true,
},
},
user: true,
files: true,
},
});
res.json(uploads);
Expand Down Expand Up @@ -724,7 +735,7 @@ router.get(
const DATA_PRODUCT_UPLOAD_PATH = '/tmp/dataProductUploads';
const getDataProductUploadPath = (req) => path.join(
DATA_PRODUCT_UPLOAD_PATH,
req.body.dataProduct,
`dataProduct_${req.body.dataProductId}`,
req.body.hash,
);
const getDataProductFileChunkName = (req) => `${req.body.hash}-${req.body.index}`;
Expand All @@ -743,6 +754,16 @@ const uploadFileStorage = multer.diskStorage({
},
});

/**
 * Ensure a directory (and all missing parents) exists.
 *
 * Replaces a hand-rolled recursive implementation with the standard library's
 * `fs.mkdirSync(dirname, { recursive: true })`, which is a no-op when the
 * directory already exists. Matches the original contract: returns `true`
 * whether the directory existed or was created, and propagates fs errors
 * (e.g. EACCES) as exceptions.
 *
 * @param {string} dirname - absolute or relative directory path to create
 * @returns {true} always, unless an fs error is thrown
 */
const mkdirsSync = (dirname) => {
  fs.mkdirSync(dirname, { recursive: true });
  return true;
};

router.post(
'/file-chunk',
// isPermittedTo('uploadFileChunk'),
Expand All @@ -756,6 +777,10 @@ router.post(
// eslint-disable-next-line no-console
console.log('Processing file piece...', name, total, index, size, hash);

if (name === 'failed_file.pdf') {
throw new Error('will fail for failed_file');
}

// Create a folder based on the file hash and move the uploaded chunk under the current hash
// folder.
const chunksPath = path.join(getDataProductUploadPath(req), 'chunks');
Expand All @@ -773,7 +798,7 @@ router.post(
}),
);

// Post a Dataset's upload log and the Dataset itself to the database
// Post a Dataset's upload log and the Dataset to the database
router.post(
'/upload-log',
validate([
Expand All @@ -795,7 +820,7 @@ router.post(
},
};

await prisma.dataset.create({
const dataset = await prisma.dataset.create({
data: {
source_datasets: {
create: [{
Expand All @@ -815,19 +840,77 @@ router.post(
},
},
},
include: {
dataset_upload: true,
},
});

res.json(dataset);
}),
);

// Initiate the creation of Data Product's files - worker.
// Builds a workflow body from the registered 'create_files' template and
// submits it to the workflow server.
router.post(
  '/create-files',
  isPermittedTo('update'),
  asyncHandler(async (req, res, next) => {
    const WORKFLOW_NAME = 'create_files';

    const { data_product_id, dataset_upload_id, files_attrs } = req.body;

    // Shallow copy of the registered workflow template (the config object is
    // immutable). NOTE(review): spread is shallow, not a deep copy as the
    // original comment claimed — it is still sufficient here because the
    // top-level fields are reassigned and `steps` is replaced with a brand
    // new array below; nested template objects are never mutated.
    const wf_body = { ...config.workflow_registry[WORKFLOW_NAME] };

    wf_body.name = WORKFLOW_NAME;
    wf_body.app_id = config.app_id;
    // Default each step onto this app's queue unless the step pins its own.
    wf_body.steps = wf_body.steps.map((step) => ({
      ...step,
      queue: step.queue || `${config.app_id}.q`,
    }));
    // Single-element args array: the payload consumed by the worker task.
    wf_body.args = [{
      data_product_id,
      dataset_upload_id,
      files_attrs,
    }];

    await wfService.create(wf_body);
    res.json('success');
  }),
);
// NOTE(review): this is a byte-for-byte duplicate of the `mkdirsSync`
// declared earlier in this file. Two `const` declarations of the same name
// in one module scope are a SyntaxError — this looks like a diff artifact
// (the function was likely moved and one copy should be deleted). Remove
// one of the two declarations.
const mkdirsSync = (dirname) => {
  if (fs.existsSync(dirname)) {
    return true;
  }
  if (mkdirsSync(path.dirname(dirname))) {
    fs.mkdirSync(dirname);
    return true;
  }
};

// Log the creation status of Data Product's files - worker.
// First call (no file_id) registers the file under its dataset_upload;
// subsequent calls update the existing row's status.
router.post(
  '/file-upload-log',
  isPermittedTo('update'),
  asyncHandler(async (req, res, next) => {
    const {
      dataset_upload_id, file_id, file_name, num_chunks, file_hash, status,
    } = req.body;

    const logged_upload = file_id
      ? await prisma.file_upload.update({
        where: { id: file_id },
        data: { status },
      })
      : await prisma.file_upload.create({
        data: {
          dataset_upload: { connect: { id: dataset_upload_id } },
          name: file_name,
          hash: file_hash,
          num_chunks,
          status,
        },
      });

    res.json(logged_upload);
  }),
);

module.exports = router;
9 changes: 6 additions & 3 deletions api/src/services/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ const INCLUDE_AUDIT_LOGS = {
},
};

const DONE_STATUSES = ['REVOKED', 'FAILURE', 'SUCCESS'];

function get_wf_body(wf_name) {
assert(config.workflow_registry.has(wf_name), `${wf_name} workflow is not registered`);

Expand All @@ -67,13 +65,18 @@ function get_wf_body(wf_name) {
async function create_workflow(dataset, wf_name) {
const wf_body = get_wf_body(wf_name);

// console.log('WF_BODY');
// console.log(wf_body);
// check if a workflow with the same name is not already running / pending on this dataset
const active_wfs_with_same_name = dataset.workflows
.filter((_wf) => _wf.name === wf_body.name)
.filter((_wf) => !DONE_STATUSES.includes(_wf.status));
.filter((_wf) => !Object.values(config.DONE_STATUSES).includes(_wf.status));

// console.log('active_wfs_with_same_name');
// console.log(active_wfs_with_same_name);
assert(active_wfs_with_same_name.length === 0, 'A workflow with the same name is either pending / running');

// console.log('creating POST request');
// create the workflow
const wf = (await wfService.create({
...wf_body,
Expand Down
2 changes: 2 additions & 0 deletions api/src/services/workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ function resume(id) {
}

/**
 * Submit a workflow definition to the workflow server.
 *
 * Removed leftover commented-out debug logging.
 *
 * @param {object} wf - workflow body (name, app_id, steps, args, ...)
 * @returns {Promise} axios-style promise resolving with the server response
 */
function create(wf) {
  return wfApi.post('/workflows', wf);
}

Expand Down
Loading

0 comments on commit 351e1a6

Please sign in to comment.