From 230277ef7c8c0b55baaa630b18fdc163eda7b7d3 Mon Sep 17 00:00:00 2001 From: Kamil Listopad Date: Mon, 23 Mar 2026 14:35:50 +0100 Subject: [PATCH 1/5] fix: prevent concurrent publish from producing empty files Two concurrent `mops publish` sessions for the same package@version could interleave uploads on the storage canister, causing some files to be stored with empty chunk data. This happened with core@2.3.0 where 3 files ended up empty (src/Queue.mo, src/Result.mo, src/Runtime.mo). Root cause: the storage canister's `startUpload` overwrites active uploads (via `put`), so a second session's `startUpload` would reset chunks that the first session had already filled. When the first session then called `finishUploads`, it committed the now-empty chunks. Fix (defense in depth): - Storage canister: validate all chunks are non-empty in `finishUploads` before committing to permanent storage - CLI parallel.ts: propagate errors from concurrent upload tasks instead of silently swallowing rejections Made-with: Cursor --- backend/storage/storage-canister.mo | 8 +- cli/parallel.ts | 20 ++- test/storage-actor.test.mo | 7 + test/storage-publish-bug.test.mo | 224 ++++++++++++++++++++++++++++ test/storage.test.mo | 7 + 5 files changed, 259 insertions(+), 7 deletions(-) create mode 100644 test/storage-publish-bug.test.mo diff --git a/backend/storage/storage-canister.mo b/backend/storage/storage-canister.mo index 39ad241f..7ec4b36c 100644 --- a/backend/storage/storage-canister.mo +++ b/backend/storage/storage-canister.mo @@ -94,8 +94,12 @@ shared ({ caller = parent }) persistent actor class Storage() { if (Option.isNull(activeUploadsMeta.get(fileId))) { return #err("File '" # fileId # "' is not uploading"); }; - if (Option.isNull(activeUploadsChunks.get(fileId))) { - return #err("File '" # fileId # "' is not uploading"); + let ?chunks = activeUploadsChunks.get(fileId) else return #err("File '" # fileId # "' is not uploading"); + + for (i in chunks.keys()) { + if (chunks[i].size() == 0) { + return #err("File '" # fileId # "' has empty chunk at index " # Nat.toText(i)); + }; }; }; diff --git a/cli/parallel.ts b/cli/parallel.ts index 265cfaf2..09c20b49 100644 --- a/cli/parallel.ts +++ b/cli/parallel.ts @@ -3,11 +3,15 @@ export async function parallel( items: T[], fn: (item: T) => Promise, ) { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { let busyThreads = 0; + let failed = false; items = items.slice(); let loop = () => { + if (failed) { + return; + } if (!items.length) { if (busyThreads === 0) { resolve(); @@ -18,10 +22,16 @@ export async function parallel( return; } busyThreads++; - fn(items.shift() as T).then(() => { - busyThreads--; - loop(); - }); + fn(items.shift() as T).then( + () => { + busyThreads--; + loop(); + }, + (err) => { + failed = true; + reject(err); + }, + ); loop(); }; loop(); diff --git a/test/storage-actor.test.mo b/test/storage-actor.test.mo index cedc279d..a749fef2 100644 --- a/test/storage-actor.test.mo +++ b/test/storage-actor.test.mo @@ -41,6 +41,13 @@ actor { }, ); + await test( + "upload chunk", + func() : async () { + assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([1, 2, 3]))); + }, + ); + await test( "try to finish upload with unknown file id", func() : async () { diff --git a/test/storage-publish-bug.test.mo b/test/storage-publish-bug.test.mo new file mode 100644 index 00000000..9e4d8ba7 --- /dev/null +++ b/test/storage-publish-bug.test.mo @@ -0,0 +1,224 @@ +import Result "mo:base/Result"; +import Blob "mo:base/Blob"; +import { test; suite } "mo:test/async"; + +import Storage "../backend/storage/storage-canister"; + +var storage = await Storage.Storage(); + +// PRECONDITION: startUpload is idempotent — resets active upload (prevents stale data) +await suite( + "FIX: startUpload resets active upload, finishUploads catches empty chunks", + func() : async () { + let fileId = "core@2.3.0/src/Runtime.mo"; + let realData = Blob.fromArray([1, 2, 3, 4, 5, 6, 7, 8]); + + await test( + "start upload and fill chunk", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Runtime.mo"; + chunkCount = 1; + owners = []; + })); + assert Result.isOk(await storage.uploadChunk(fileId, 0, realData)); + }, + ); + + await test( + "second startUpload resets the active upload (idempotent)", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Runtime.mo"; + chunkCount = 1; + owners = []; + })); + }, + ); + + await test( + "finishUploads rejects because reset cleared the chunk data", + func() : async () { + assert Result.isErr(await storage.finishUploads([fileId])); + }, + ); + }, +); + +// PRECONDITION: idempotent startUpload allows a clean retry to succeed +await suite( + "FIX: retry after reset succeeds with fresh data", + func() : async () { + storage := await Storage.Storage(); + let fileId = "core@2.3.0/src/Runtime.mo"; + let staleData = Blob.fromArray([1, 2, 3]); + let freshData = Blob.fromArray([10, 20, 30, 40, 50]); + + await test( + "stale session: start and upload", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Runtime.mo"; + chunkCount = 1; + owners = []; + })); + assert Result.isOk(await storage.uploadChunk(fileId, 0, staleData)); + }, + ); + + await test( + "retry session: startUpload resets, then upload fresh data", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Runtime.mo"; + chunkCount = 1; + owners = []; + })); + assert Result.isOk(await storage.uploadChunk(fileId, 0, freshData)); + }, + ); + + await test( + "finishUploads succeeds with fresh data", + func() : async () { + assert Result.isOk(await storage.finishUploads([fileId])); + }, + ); + + await test( + "downloaded file has the fresh data", + func() : async () { + let chunkRes = await storage.downloadChunk(fileId, 0); + switch (chunkRes) { + case (#ok(chunk)) { + assert chunk == freshData; + }; + case (#err(_)) { + assert false; + }; + }; + }, + ); + }, +); + +// FIX VERIFICATION: finishUploads rejects files with empty chunks +await suite( + "FIX: finishUploads rejects empty chunks", + func() : async () { + storage := await Storage.Storage(); + let fileId = "pkg@1.0.0/src/Lib.mo"; + + await test( + "start upload with chunkCount=2 but upload only chunk 0", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Lib.mo"; + chunkCount = 2; + owners = []; + })); + assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([10, 20, 30]))); + }, + ); + + await test( + "finishUploads is rejected because chunk 1 was never uploaded", + func() : async () { + let res = await storage.finishUploads([fileId]); + assert Result.isErr(res); + }, + ); + }, +); + +// REGRESSION: normal upload flow still works +await suite( + "REGRESSION: normal upload flow", + func() : async () { + storage := await Storage.Storage(); + let fileId = "pkg@2.0.0/src/Main.mo"; + let data1 = Blob.fromArray([10, 20, 30, 40, 50]); + let data2 = Blob.fromArray([60, 70, 80]); + + await test( + "upload file with 2 chunks", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Main.mo"; + chunkCount = 2; + owners = []; + })); + assert Result.isOk(await storage.uploadChunk(fileId, 0, data1)); + assert Result.isOk(await storage.uploadChunk(fileId, 1, data2)); + }, + ); + + await test( + "finishUploads succeeds when all chunks are present", + func() : async () { + assert Result.isOk(await storage.finishUploads([fileId])); + }, + ); + + await test( + "downloaded chunks have correct data", + func() : async () { + let c0 = await storage.downloadChunk(fileId, 0); + switch (c0) { + case (#ok(chunk)) { assert chunk == data1; }; + case (#err(_)) { assert false; }; + }; + let c1 = await storage.downloadChunk(fileId, 1); + switch (c1) { + case (#ok(chunk)) { assert chunk == data2; }; + case (#err(_)) { assert false; }; + }; + }, + ); + }, +); + +// REGRESSION: empty files (chunkCount=0) still work +await suite( + "REGRESSION: empty files (chunkCount=0) are allowed", + func() : async () { + storage := await Storage.Storage(); + let fileId = "pkg@3.0.0/src/Empty.mo"; + + await test( + "start upload with chunkCount=0", + func() : async () { + assert Result.isOk(await storage.startUpload({ + id = fileId; + path = "src/Empty.mo"; + chunkCount = 0; + owners = []; + })); + }, + ); + + await test( + "finishUploads succeeds for empty files", + func() : async () { + assert Result.isOk(await storage.finishUploads([fileId])); + }, + ); + + await test( + "file meta reports 0 chunks", + func() : async () { + let res = await storage.getFileMeta(fileId); + switch (res) { + case (#ok(meta)) { assert meta.chunkCount == 0; }; + case (#err(_)) { assert false; }; + }; + }, + ); + }, +); diff --git a/test/storage.test.mo b/test/storage.test.mo index d7e5d2a5..a2850968 100644 --- a/test/storage.test.mo +++ b/test/storage.test.mo @@ -33,6 +33,13 @@ await suite( }, ); + await test( + "upload chunk", + func() : async () { + assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([1, 2, 3]))); + }, + ); + await test( "try to finish upload with unknown file id", func() : async () { From 165a1181f2fe3e573f594ab5854b85140f1596be Mon Sep 17 00:00:00 2001 From: Kamil Listopad Date: Mon, 23 Mar 2026 14:49:51 +0100 Subject: [PATCH 2/5] style: format storage-publish-bug test (prettier) Made-with: Cursor --- test/storage-publish-bug.test.mo | 61 +++++++------------------------- 1 file changed, 13 insertions(+), 48 deletions(-) diff --git a/test/storage-publish-bug.test.mo b/test/storage-publish-bug.test.mo index 9e4d8ba7..8e4991a0 100644 --- a/test/storage-publish-bug.test.mo +++ b/test/storage-publish-bug.test.mo @@ -16,12 +16,7 @@ await suite( await test( "start upload and fill chunk", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Runtime.mo"; - chunkCount = 1; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] })); assert Result.isOk(await storage.uploadChunk(fileId, 0, realData)); }, ); @@ -29,12 +24,7 @@ await suite( await test( "second startUpload resets the active upload (idempotent)", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Runtime.mo"; - chunkCount = 1; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] })); }, ); @@ -59,12 +49,7 @@ await suite( await test( "stale session: start and upload", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Runtime.mo"; - chunkCount = 1; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] })); assert Result.isOk(await storage.uploadChunk(fileId, 0, staleData)); }, ); @@ -72,12 +57,7 @@ await suite( await test( "retry session: startUpload resets, then upload fresh data", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Runtime.mo"; - chunkCount = 1; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] })); assert Result.isOk(await storage.uploadChunk(fileId, 0, freshData)); }, ); @@ -116,12 +96,7 @@ await suite( await test( "start upload with chunkCount=2 but upload only chunk 0", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Lib.mo"; - chunkCount = 2; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Lib.mo"; chunkCount = 2; owners = [] })); assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([10, 20, 30]))); }, ); @@ -148,12 +123,7 @@ await suite( await test( "upload file with 2 chunks", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Main.mo"; - chunkCount = 2; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Main.mo"; chunkCount = 2; owners = [] })); assert Result.isOk(await storage.uploadChunk(fileId, 0, data1)); assert Result.isOk(await storage.uploadChunk(fileId, 1, data2)); }, @@ -171,13 +141,13 @@ await suite( func() : async () { let c0 = await storage.downloadChunk(fileId, 0); switch (c0) { - case (#ok(chunk)) { assert chunk == data1; }; - case (#err(_)) { assert false; }; + case (#ok(chunk)) { assert chunk == data1 }; + case (#err(_)) { assert false }; }; let c1 = await storage.downloadChunk(fileId, 1); switch (c1) { - case (#ok(chunk)) { assert chunk == data2; }; - case (#err(_)) { assert false; }; + case (#ok(chunk)) { assert chunk == data2 }; + case (#err(_)) { assert false }; }; }, ); @@ -194,12 +164,7 @@ await suite( await test( "start upload with chunkCount=0", func() : async () { - assert Result.isOk(await storage.startUpload({ - id = fileId; - path = "src/Empty.mo"; - chunkCount = 0; - owners = []; - })); + assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Empty.mo"; chunkCount = 0; owners = [] })); }, ); @@ -215,8 +180,8 @@ await suite( func() : async () { let res = await storage.getFileMeta(fileId); switch (res) { - case (#ok(meta)) { assert meta.chunkCount == 0; }; - case (#err(_)) { assert false; }; + case (#ok(meta)) { assert meta.chunkCount == 0 }; + case (#err(_)) { assert false }; }; }, ); From 54890fa56c06403cb4556269ee75768420c3cde6 Mon Sep 17 00:00:00 2001 From: Kamil Listopad Date: Mon, 23 Mar 2026 14:52:34 +0100 Subject: [PATCH 3/5] fix(cli): decrement busyThreads on parallel task failure Also document the concurrent upload error propagation fix in CHANGELOG. Made-with: Cursor --- cli/CHANGELOG.md | 1 + cli/parallel.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/cli/CHANGELOG.md b/cli/CHANGELOG.md index 18f02028..08639d37 100644 --- a/cli/CHANGELOG.md +++ b/cli/CHANGELOG.md @@ -2,6 +2,7 @@ ## Next - Support `MOPS_REGISTRY_HOST` and `MOPS_REGISTRY_CANISTER_ID` environment variables for custom registry endpoints +- Fix `parallel()` swallowing errors from concurrent tasks (e.g. `mops publish` uploads), which could hang or leave failures unreported ## 2.4.0 - Support `[build].outputDir` config in `mops.toml` for custom build output directory diff --git a/cli/parallel.ts b/cli/parallel.ts index 09c20b49..a3c08d97 100644 --- a/cli/parallel.ts +++ b/cli/parallel.ts @@ -28,6 +28,7 @@ export async function parallel( loop(); }, (err) => { + busyThreads--; failed = true; reject(err); }, From ea2b5fd7ede20d1dd811db2a17129e5db4ff5b74 Mon Sep 17 00:00:00 2001 From: Kamil Listopad Date: Mon, 23 Mar 2026 15:04:46 +0100 Subject: [PATCH 4/5] ci: raise Jest timeout for build integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The build suite runs multiple dfx/pocket-ic builds per test; the default 60s limit was flaky on the Node 20 CI matrix (build › ok timed out). Made-with: Cursor --- cli/tests/build.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cli/tests/build.test.ts b/cli/tests/build.test.ts index b2e884a0..69f13ada 100644 --- a/cli/tests/build.test.ts +++ b/cli/tests/build.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test } from "@jest/globals"; +import { describe, expect, jest, test } from "@jest/globals"; import { execa } from "execa"; import { existsSync, rmSync } from "node:fs"; import path from "path"; @@ -14,6 +14,9 @@ function cleanFixture(cwd: string, ...extras: string[]) { } describe("build", () => { + // Several dfx/pocket-ic builds per test; slow CI (e.g. node 20 matrix) can exceed 60s default. + jest.setTimeout(120_000); + test("ok", async () => { const cwd = path.join(import.meta.dirname, "build/success"); try { From 8f995eb760dce0701afc11ae0fc03f65dfa0fe9e Mon Sep 17 00:00:00 2001 From: Kamil Listopad Date: Mon, 23 Mar 2026 15:05:38 +0100 Subject: [PATCH 5/5] test: align storage-publish-bug suite titles with PRECONDITION comments Made-with: Cursor --- test/storage-publish-bug.test.mo | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/storage-publish-bug.test.mo b/test/storage-publish-bug.test.mo index 8e4991a0..4af7f9b3 100644 --- a/test/storage-publish-bug.test.mo +++ b/test/storage-publish-bug.test.mo @@ -8,7 +8,7 @@ var storage = await Storage.Storage(); // PRECONDITION: startUpload is idempotent — resets active upload (prevents stale data) await suite( - "FIX: startUpload resets active upload, finishUploads catches empty chunks", + "PRECONDITION: startUpload reset + finishUploads rejects empty chunks", func() : async () { let fileId = "core@2.3.0/src/Runtime.mo"; let realData = Blob.fromArray([1, 2, 3, 4, 5, 6, 7, 8]); @@ -39,7 +39,7 @@ await suite( // PRECONDITION: idempotent startUpload allows a clean retry to succeed await suite( - "FIX: retry after reset succeeds with fresh data", + "PRECONDITION: retry after reset succeeds with fresh data", func() : async () { storage := await Storage.Storage(); let fileId = "core@2.3.0/src/Runtime.mo";