Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions nx/blocks/media-library/core/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ export const IndexConfig = Object.freeze({
DISCOVERY_MAX_PATHS_PER_JOB: 250,
/* Larger batch to minimize UI update overhead - updates every ~100 seconds */
USAGE_MAP_PROGRESSIVE_BATCH_SIZE: 1000,
/* Index chunking configuration */
MEDIA_INDEX_CHUNK_SIZE: 20_000, /* Entries per chunk (~15-20MB per chunk) */
LOCK_HEARTBEAT_INTERVAL_MS: 60_000,
LOCK_STALE_THRESHOLD_MS: 10 * 60_000,
BUILD_MAX_DURATION_MS: 30 * 60 * 1000,
Expand Down
67 changes: 0 additions & 67 deletions nx/blocks/media-library/indexing/admin-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,6 @@ function getChunkFileName(chunkNum) {
return `${IndexFiles.MEDIA_INDEX_CHUNK_PREFIX}${String(chunkNum).padStart(3, '0')}.json`;
}

/**
* Split media sheet into chunks
* @param {Array} mediaData - Full media sheet data
* @param {number} chunkSize - Entries per chunk
* @returns {Array<Array>} Array of chunks
*/
function chunkMediaSheet(mediaData, chunkSize) {
const chunks = [];
for (let i = 0; i < mediaData.length; i += chunkSize) {
chunks.push(mediaData.slice(i, i + chunkSize));
}
return chunks;
}

const DEFAULT_TIMEFRAME_DAYS = 3650; /* 10 years */

export async function fetchWithAuth(url, opts = {}) {
Expand Down Expand Up @@ -321,59 +307,6 @@ export async function loadIndexChunks(basePath, chunkCount, sheetName, onProgres
return results.map((r) => r.data).flat();
}

/**
* Save index as chunks
* @param {string} basePath - Base path without filename
* @param {Array} mediaData - Media sheet data (must be pre-sorted)
* @param {Array} usageData - Usage sheet data
* @param {number} chunkSize - Entries per chunk
* @returns {Promise<number>} Number of chunks created
*/
export async function saveIndexChunks(basePath, mediaData, usageData, chunkSize) {
const mediaChunks = chunkMediaSheet(mediaData, chunkSize);

// Always save at least chunk 0, even if empty (for consistency)
const chunksToSave = mediaChunks.length > 0 ? mediaChunks : [[]];
const savePromises = [];

for (let i = 0; i < chunksToSave.length; i += 1) {
const chunkFileName = getChunkFileName(i);
const chunkPath = `${basePath}/${chunkFileName}`;

// Only include usage sheet in first chunk to avoid duplication
const sheets = {
media: chunksToSave[i],
usage: i === 0 ? usageData : [],
};

const formData = await createMultiSheet(sheets);
const savePromise = daFetch(`${DA_ORIGIN}/source${chunkPath}`, {
method: 'PUT',
body: formData,
});

savePromises.push(savePromise);
}

const responses = await Promise.all(savePromises);

// Validate all chunks saved successfully
const failedChunks = [];
responses.forEach((resp, i) => {
if (!resp.ok) {
failedChunks.push({ chunk: i, status: resp.status });
}
});

if (failedChunks.length > 0) {
const error = new Error(`Failed to save ${failedChunks.length}/${chunksToSave.length} chunks: ${failedChunks.map((f) => `chunk ${f.chunk} (${f.status})`).join(', ')}`);
error.failedChunks = failedChunks;
throw error;
}

return chunksToSave.length;
}

export async function saveSheet(data, path) {
const formData = await createSheet(data);
return daFetch(`${DA_ORIGIN}/source${path}`, {
Expand Down
39 changes: 24 additions & 15 deletions nx/blocks/media-library/indexing/medialog.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,27 @@ export function processPageMediaUpdates(
const pageData = pageMediaMap.get(normalizedPath);
const newEntries = pageData ? pageData.entries : [];

// Filter oldHashes to only include medialog-sourced entries for accurate comparison
// External media (extlinks-parsed, markdown-parsed) should not be compared against medialog
// because they're never in medialog - they come from markdown parsing
const oldMedialogHashes = new Set();
oldHashes.forEach((hash) => {
const entry = updatedIndex.find((e) => e.hash === hash && e.doc === normalizedPath);
if (entry) {
const op = entry.operation || entry.source;
const isFromMedialog = op !== 'extlinks-parsed' && op !== 'markdown-parsed' && op !== 'auditlog-parsed';
if (isFromMedialog) {
oldMedialogHashes.add(hash);
}
}
});

onLog(`--- Page: ${normalizedPath} ---`);
onLog(` Old (bypage): ${oldHashes.size}, New (page-based): ${newEntries.length}`);
onLog(` Old (bypage): ${oldMedialogHashes.size}, New (page-based): ${newEntries.length}`);

const newHashes = new Set(newEntries.map((e) => e.hash));
const toRemove = [...oldHashes].filter((h) => !newHashes.has(h));
const toAdd = [...newHashes].filter((h) => !oldHashes.has(h));
const toRemove = [...oldMedialogHashes].filter((h) => !newHashes.has(h));
const toAdd = [...newHashes].filter((h) => !oldMedialogHashes.has(h));

if (toRemove.length || toAdd.length) {
onLog(` Diff: remove ${toRemove.length}, add ${toAdd.length}`);
Expand All @@ -329,18 +344,12 @@ export function processPageMediaUpdates(
toRemove.forEach((hash) => {
const oldEntry = updatedIndex.find((e) => e.hash === hash && e.doc === normalizedPath);
if (oldEntry) {
// Don't remove external media (extlinks-parsed/markdown-parsed) or auditlog-parsed entries
// They come from markdown parsing, not medialog, so they're handled by processLinkedContent
const op = oldEntry.operation || oldEntry.source;
const isFromMarkdown = op === 'extlinks-parsed' || op === 'markdown-parsed' || op === 'auditlog-parsed';
if (!isFromMarkdown) {
removed += removeOrOrphanMedia(
updatedIndex,
oldEntry,
normalizedPath,
medialogEntries,
);
}
removed += removeOrOrphanMedia(
updatedIndex,
oldEntry,
normalizedPath,
medialogEntries,
);
}
});

Expand Down
122 changes: 81 additions & 41 deletions nx/blocks/media-library/indexing/worker/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -729,74 +729,114 @@ function chunkMediaSheet(mediaData, chunkSize) {
}

/**
* Worker-safe version of saveIndexChunks from admin-api.js
* Determine optimal chunk size based on total entry count
*
* @param {string} basePath - Base path for chunks (e.g., /org/repo/.da/media-insights)
* @param {Array} mediaData - Media sheet data
* @param {Array} usageData - Usage sheet data
* @param {number} chunkSize - Entries per chunk
* @param {string} daOrigin - DA origin (e.g., https://admin.da.live)
* Rationale:
* - Small sites (<10k entries): Single file (100k chunk size ensures no chunking)
* - No overhead from loading multiple chunks
* - Simpler debugging and inspection
*
* - Medium sites (10k-200k): 8k entries per chunk (~4-5MB files)
* - Prevents CF Worker 128MB memory limit errors during PUT
* - Balances file size vs chunk count overhead
* - Progressive loading: chunk 0 loads quickly for default Images view
*
* - Large sites (>200k entries): 6k entries per chunk (~3-4MB files)
* - Smaller files for better reliability on massive indexes
* - More chunks acceptable given already high chunk count
* - Further reduces memory pressure on uploads
*
* Chunk size targets file sizes ≤5MB to avoid DA Admin/S3 timeouts
* Average media entry size: ~550 bytes (URL + metadata + doc field)
*
* @param {number} totalEntries - Total number of media entries in index
* @returns {number} Optimal chunk size (entries per chunk)
*/
export function getAdaptiveChunkSize(totalEntries) {
if (totalEntries < 10_000) {
return 100_000;
}

if (totalEntries < 200_000) {
return 8_000;
}

return 6_000;
}

/**
* Save index as chunks with batched uploads to prevent rate limiting
* Uploads 3 chunks concurrently with 500ms delays between batches
*
* @param {string} basePath - Base path without filename (e.g., '/site/.da/media-insights')
* @param {Array} mediaData - Media sheet data (must be pre-sorted)
* @param {number} chunkSize - Entries per chunk (from getAdaptiveChunkSize)
* @param {string} daOrigin - DA origin (e.g., 'https://admin.da.live')
* @param {string} imsToken - IMS access token
* @param {string} indexFilesChunkPrefix - Chunk filename prefix (e.g., 'index-')
* @returns {Promise<number>} Number of chunks saved
* @returns {Promise<number>} Number of chunks created
*/
export async function saveIndexChunks(
basePath,
mediaData,
usageData,
chunkSize,
daOrigin,
imsToken,
indexFilesChunkPrefix,
) {
const mediaChunks = chunkMediaSheet(mediaData, chunkSize);

// Always save at least chunk 0, even if empty (for consistency)
const chunksToSave = mediaChunks.length > 0 ? mediaChunks : [[]];
const savePromises = [];

for (let i = 0; i < chunksToSave.length; i += 1) {
const chunkFileName = getChunkFileName(i, indexFilesChunkPrefix);
const chunkPath = `${basePath}/${chunkFileName}`;

// Only include usage sheet in first chunk to avoid duplication
const sheets = {
media: chunksToSave[i],
usage: i === 0 ? usageData : [],
};

const formData = await createMultiSheet(sheets);
const savePromise = workerDaFetch(`${daOrigin}/source${chunkPath}`, imsToken, {
method: 'PUT',
body: formData,
// Rate limiting to prevent DA Admin endpoint overload:
// - batchSize=3: Limit concurrent uploads (prevents 503 errors)
// - delayMs=500: 500ms delay between batches (~20 req/sec rate limit)
// - Prevents CF Worker 128MB memory errors from large concurrent PUTs
const batchSize = 3;
const delayMs = 500;

for (let i = 0; i < chunksToSave.length; i += batchSize) {
const batch = chunksToSave.slice(i, i + batchSize);
const batchPromises = batch.map(async (chunk, idx) => {
const chunkNum = i + idx;
const chunkFileName = getChunkFileName(chunkNum, indexFilesChunkPrefix);
const chunkPath = `${basePath}/${chunkFileName}`;
const sheets = { media: chunk };

const formData = await createMultiSheet(sheets);
return workerDaFetch(`${daOrigin}/source${chunkPath}`, imsToken, {
method: 'PUT',
body: formData,
});
});

savePromises.push(savePromise);
}

const responses = await Promise.all(savePromises);
const responses = await Promise.all(batchPromises);

// Validate all chunks saved successfully
const failedChunks = [];
responses.forEach((resp, i) => {
if (!resp.ok) {
failedChunks.push(i);
const failed = [];
responses.forEach((r, idx) => {
if (!r.ok) {
failed.push({ chunkNum: i + idx, status: r.status });
}
});
if (failed.length > 0) {
const chunkNums = failed.map((f) => `${f.chunkNum} (${f.status})`).join(', ');
throw new Error(`Failed to save ${failed.length} chunk(s): ${chunkNums}`);
}
});

if (failedChunks.length > 0) {
throw new Error(`Failed to save chunks: ${failedChunks.join(', ')}`);
if (i + batchSize < chunksToSave.length) {
await new Promise((resolve) => { setTimeout(resolve, delayMs); });
}
}

return chunksToSave.length;
}

/**
* Worker-safe version of saveIndexMeta from admin-api.js
* Save index metadata to DA storage
* Must be called AFTER saveIndexChunks to ensure chunkCount is accurate
*
* @param {object} meta - Metadata object
* @param {string} path - Full path to meta file
* @param {string} daOrigin - DA origin (e.g., https://admin.da.live)
* @param {object} meta - Metadata object containing indexType, timestamp, chunkCount, etc.
* @param {string} path - Full path to meta file (e.g., '/site/.da/media-insights/index-meta.json')
* @param {string} daOrigin - DA origin (e.g., 'https://admin.da.live')
* @param {string} imsToken - IMS access token
* @returns {Promise<Response>}
*/
Expand Down
Loading
Loading