From 818d432890817b49809fb23451ce88dc0e7ecf10 Mon Sep 17 00:00:00 2001 From: Grahame Grieve Date: Tue, 14 Apr 2026 23:15:09 +0800 Subject: [PATCH 1/3] Fix SCT import to handle SCT DK --- tx/importers/import-sct.module.js | 246 ++++++++++++++++++++---------- 1 file changed, 167 insertions(+), 79 deletions(-) diff --git a/tx/importers/import-sct.module.js b/tx/importers/import-sct.module.js index 2e0333ac..a1a52eec 100644 --- a/tx/importers/import-sct.module.js +++ b/tx/importers/import-sct.module.js @@ -48,37 +48,37 @@ class SnomedModule extends BaseTerminologyModule { registerCommands(terminologyCommand, globalOptions) { // Import command terminologyCommand - .command('import') - .description('Import SNOMED CT data from RF2 source directory') - .option('-s, --source ', 'Source directory containing RF2 files') - .option('-b, --base ', 'Base edition directory (for extensions)') - .option('-d, --dest ', 'Destination cache file') - .option('-e, --edition ', 'Edition code (e.g., 900000000000207008 for International)') - .option('-v, --version ', 'Version in YYYYMMDD format (e.g., 20250801)') - .option('-u, --uri ', 'Version URI (overrides edition/version if provided)') - .option('-l, --language ', 'Default language code (overrides edition default if provided)') - .option('-y, --yes', 'Skip confirmations') - .action(async (options) => { - await this.handleImportCommand({...globalOptions, ...options}); - }); + .command('import') + .description('Import SNOMED CT data from RF2 source directory') + .option('-s, --source ', 'Source directory containing RF2 files') + .option('-b, --base ', 'Base edition directory (for extensions)') + .option('-d, --dest ', 'Destination cache file') + .option('-e, --edition ', 'Edition code (e.g., 900000000000207008 for International)') + .option('-v, --version ', 'Version in YYYYMMDD format (e.g., 20250801)') + .option('-u, --uri ', 'Version URI (overrides edition/version if provided)') + .option('-l, --language ', 'Default language code (overrides edition default if provided)') + .option('-y, --yes', 'Skip confirmations') + .action(async (options) => { + await this.handleImportCommand({...globalOptions, ...options}); + }); // Validate command terminologyCommand - .command('validate') - .description('Validate SNOMED CT RF2 directory structure') - .option('-s, --source ', 'Source directory to validate') - .action(async (options) => { - await this.handleValidateCommand({...globalOptions, ...options}); - }); + .command('validate') + .description('Validate SNOMED CT RF2 directory structure') + .option('-s, --source ', 'Source directory to validate') + .action(async (options) => { + await this.handleValidateCommand({...globalOptions, ...options}); + }); // Status command terminologyCommand - .command('status') - .description('Show status of SNOMED CT cache') - .option('-d, --dest ', 'Cache file to check') - .action(async (options) => { - await this.handleStatusCommand({...globalOptions, ...options}); - }); + .command('status') + .description('Show status of SNOMED CT cache') + .option('-d, --dest ', 'Cache file to check') + .action(async (options) => { + await this.handleStatusCommand({...globalOptions, ...options}); + }); } async handleImportCommand(options) { @@ -633,7 +633,7 @@ class SnomedModule extends BaseTerminologyModule { } const additionalAnswers = additionalQuestions.length > 0 ? - await inquirer.prompt(additionalQuestions) : {}; + await inquirer.prompt(additionalQuestions) : {}; // Build the final configuration const config = { @@ -774,7 +774,7 @@ class SnomedModule extends BaseTerminologyModule { } else if (firstLine.startsWith('id\teffectiveTime\tactive\tmoduleId\tconceptId\tlanguageCode\ttypeId\tterm\tcaseSignificanceId')) { files.descriptions.push(filePath); } else if (firstLine.startsWith('id\teffectiveTime\tactive\tmoduleId\tsourceId\tdestinationId\trelationshipGroup\ttypeId\tcharacteristicTypeId\tmodifierId') && - !filePath.includes('StatedRelationship')) { + !filePath.includes('StatedRelationship')) { files.relationships.push(filePath); } } catch (error) { @@ -1165,6 +1165,19 @@ class SnomedImporter { refsetDirectories: [] }; + // For extensions: load base edition files first so that all International + // Edition concepts, descriptions, and relationships are present before the + // extension content is layered on top. + if (this.config.base) { + if (this.config.verbose) { + console.log(`Loading base edition from: ${this.config.base}`); + } + this._scanDirectory(this.config.base, files); + } + + // Then load the extension (or standalone edition) source files. + // For extensions these come second so that extension rows can override + // base rows where the same component has been updated. this._scanDirectory(this.config.source, files); return files; } @@ -1200,7 +1213,7 @@ class SnomedImporter { } else if (firstLine.startsWith('id\teffectiveTime\tactive\tmoduleId\tconceptId\tlanguageCode\ttypeId\tterm\tcaseSignificanceId')) { files.descriptions.push(filePath); } else if (firstLine.startsWith('id\teffectiveTime\tactive\tmoduleId\tsourceId\tdestinationId\trelationshipGroup\ttypeId\tcharacteristicTypeId\tmodifierId') && - !filePath.includes('StatedRelationship')) { + !filePath.includes('StatedRelationship')) { files.relationships.push(filePath); } } catch (error) { @@ -1250,6 +1263,9 @@ class SnomedImporter { this.conceptList = []; let processedLines = 0; + // When loading base + extension, track list indices for fast replacement + const conceptIdToListIndex = this.config.base ? new Map() : null; + for (let i = 0; i < this.files.concepts.length; i++) { const file = this.files.concepts[i]; const rl = readline.createInterface({ @@ -1275,8 +1291,23 @@ class SnomedImporter { }; if (this.conceptMap.has(concept.id)) { - throw new Error(`Duplicate Concept Id at line ${lineCount}: ${concept.id} - check you are processing the snapshot not the full edition`); + // When loading base + extension, the same concept may appear in both. + // The extension snapshot row takes precedence (it is loaded second). + // If there is no base directory this is a genuine duplicate in a single + // snapshot and we should still raise an error. + if (!this.config.base) { + throw new Error(`Duplicate Concept Id at line ${lineCount}: ${concept.id} - check you are processing the snapshot not the full edition`); + } + // Replace the base edition row with the extension row + const idx = conceptIdToListIndex.get(concept.id); + if (idx !== undefined) { + this.conceptList[idx] = concept; + } + this.conceptMap.set(concept.id, concept); } else { + if (conceptIdToListIndex) { + conceptIdToListIndex.set(concept.id, this.conceptList.length); + } this.conceptList.push(concept); this.conceptMap.set(concept.id, concept); } @@ -1347,6 +1378,12 @@ class SnomedImporter { const descriptionList = []; let processedLines = 0; + // Build a lookup from description id -> index in descriptionList so that + // extension rows can replace base rows for the same description. + if (this.config.base) { + this._descriptionIdSet = new Map(); + } + for (const file of this.files.descriptions) { const rl = readline.createInterface({ input: fs.createReadStream(file), @@ -1372,7 +1409,19 @@ class SnomedImporter { caseSignificanceId: BigInt(parts[8]) }; - descriptionList.push(desc); + // When loading base + extension, the same description may appear in + // both. The extension row (loaded second) takes precedence. + if (this.config.base && this._descriptionIdSet) { + const existingIdx = this._descriptionIdSet.get(desc.id); + if (existingIdx !== undefined) { + descriptionList[existingIdx] = desc; + } else { + this._descriptionIdSet.set(desc.id, descriptionList.length); + descriptionList.push(desc); + } + } else { + descriptionList.push(desc); + } } processedLines++; @@ -1417,8 +1466,8 @@ class SnomedImporter { const caps = this.conceptMap.get(desc.caseSignificanceId); const descOffset = this.descriptions.addDescription( - termOffset, desc.id, effectiveTime, concept.index, - module.index, kind.index, caps.index, desc.active, lang + termOffset, desc.id, effectiveTime, concept.index, + module.index, kind.index, caps.index, desc.active, lang ); // Track description on concept @@ -1692,6 +1741,11 @@ class SnomedImporter { } this.isAIndex = isAConcept.index; + // Pass 1: collect all relationship rows, deduplicating so that extension + // rows (loaded second) override base rows with the same relationship id. + const relationshipRows = []; + const relationshipIdMap = this.config.base ? new Map() : null; // id -> index in relationshipRows + for (const file of this.files.relationships) { const rl = readline.createInterface({ input: fs.createReadStream(file), @@ -1718,40 +1772,16 @@ class SnomedImporter { modifierId: BigInt(parts[9]) }; - const source = this.conceptMap.get(rel.sourceId); - const destination = this.conceptMap.get(rel.destinationId); - const type = this.conceptMap.get(rel.typeId); - - if (source && destination && type) { - const effectiveTime = this.convertDateToSnomedDate(rel.effectiveTime); - - // Check if this is a defining relationship - const defining = rel.characteristicTypeId === RF2_MAGIC_RELN_DEFINING || - rel.characteristicTypeId === RF2_MAGIC_RELN_STATED || - rel.characteristicTypeId === RF2_MAGIC_RELN_INFERRED; - - const relationshipIndex = this.relationships.addRelationship( - rel.id, source.index, destination.index, type.index, - 0, 0, 0, effectiveTime, rel.active, defining, rel.relationshipGroup - ); - - // Track parent/child relationships for is-a relationships - if (type.index === this.isAIndex && defining) { - const sourceTracker = this.getOrCreateConceptTracker(source.index); - if (rel.active) { - sourceTracker.addActiveParent(destination.index); - } else { - sourceTracker.addInactiveParent(destination.index); - } + if (relationshipIdMap) { + const existingIdx = relationshipIdMap.get(rel.id); + if (existingIdx !== undefined) { + relationshipRows[existingIdx] = rel; + } else { + relationshipIdMap.set(rel.id, relationshipRows.length); + relationshipRows.push(rel); } - - // Track inbound/outbound relationships - const sourceTracker = this.getOrCreateConceptTracker(source.index); - const destTracker = this.getOrCreateConceptTracker(destination.index); - - sourceTracker.addOutbound(relationshipIndex); - destTracker.addInbound(relationshipIndex); - + } else { + relationshipRows.push(rel); } } @@ -1762,10 +1792,62 @@ class SnomedImporter { } } + if (this.progressReporter) { + this.progressReporter.completeTask('Reading Relationships', processedLines, totalLines); + } + + // Pass 2: process the deduplicated relationship rows into the binary + // structures and concept trackers. + const buildProgressBar = this.progressReporter?.createTaskProgressBar('Building Relationships'); + buildProgressBar?.start(relationshipRows.length, 0); + + for (let i = 0; i < relationshipRows.length; i++) { + const rel = relationshipRows[i]; + + const source = this.conceptMap.get(rel.sourceId); + const destination = this.conceptMap.get(rel.destinationId); + const type = this.conceptMap.get(rel.typeId); + + if (source && destination && type) { + const effectiveTime = this.convertDateToSnomedDate(rel.effectiveTime); + + // Check if this is a defining relationship + const defining = rel.characteristicTypeId === RF2_MAGIC_RELN_DEFINING || + rel.characteristicTypeId === RF2_MAGIC_RELN_STATED || + rel.characteristicTypeId === RF2_MAGIC_RELN_INFERRED; + + const relationshipIndex = this.relationships.addRelationship( + rel.id, source.index, destination.index, type.index, + 0, 0, 0, effectiveTime, rel.active, defining, rel.relationshipGroup + ); + + // Track parent/child relationships for is-a relationships + if (type.index === this.isAIndex && defining) { + const sourceTracker = this.getOrCreateConceptTracker(source.index); + if (rel.active) { + sourceTracker.addActiveParent(destination.index); + } else { + sourceTracker.addInactiveParent(destination.index); + } + } + + // Track inbound/outbound relationships + const sourceTracker = this.getOrCreateConceptTracker(source.index); + const destTracker = this.getOrCreateConceptTracker(destination.index); + + sourceTracker.addOutbound(relationshipIndex); + destTracker.addInbound(relationshipIndex); + } + + if (i % 1000 === 0) { + buildProgressBar?.update(i); + } + } + this.relationships.doneBuild(); if (this.progressReporter) { - this.progressReporter.completeTask('Reading Relationships', processedLines, totalLines); + this.progressReporter.completeTask('Building Relationships', relationshipRows.length, relationshipRows.length); } } @@ -1800,9 +1882,9 @@ class SnomedImporter { // Set parents if concept has any if (tracker.activeParents.length > 0 || tracker.inactiveParents.length > 0) { const activeParentsRef = tracker.activeParents.length > 0 ? - this.refs.addReferences(tracker.activeParents) : 0; + this.refs.addReferences(tracker.activeParents) : 0; const inactiveParentsRef = tracker.inactiveParents.length > 0 ? - this.refs.addReferences(tracker.inactiveParents) : 0; + this.refs.addReferences(tracker.inactiveParents) : 0; this.concepts.setParents(concept.index, activeParentsRef, inactiveParentsRef); } else { @@ -2104,14 +2186,14 @@ class SnomedImporter { // NOTE: This calls addString() so it must happen AFTER strings.reopen() for (const refSet of refSetsArray) { this.refsetIndex.addReferenceSet( - this.addString(refSet.title), // This needs strings builder to be active - refSet.filename, - refSet.index, - refSet.membersByRef, - refSet.membersByName, - refSet.fieldTypes, - refSet.fieldNames, - refSet.langs + this.addString(refSet.title), // This needs strings builder to be active + refSet.filename, + refSet.index, + refSet.membersByRef, + refSet.membersByName, + refSet.fieldTypes, + refSet.fieldNames, + refSet.langs ); } } @@ -2216,7 +2298,13 @@ class SnomedImporter { if (!refSet || currentRefSetId !== refSetId) { currentRefSetId = refSetId; refSet = this.getOrCreateRefSet(refSetId, displayName, isLangRefset); - refSet.filename = this.addString(path.relative(this.config.source, filePath)); + // Compute relative path — the file may live under the base directory + // rather than the extension source directory. + let relPath = path.relative(this.config.source, filePath); + if (this.config.base && relPath.startsWith('..')) { + relPath = path.relative(this.config.base, filePath); + } + refSet.filename = this.addString(relPath); refSet.fieldTypes = this.getOrCreateFieldTypes(fieldTypes); refSet.fieldNames = this.getOrCreateFieldNames(headers.slice(6), fieldTypes); // Additional fields beyond standard 6 } @@ -2577,8 +2665,8 @@ class SnomedImporter { }; const services = new SnomedExpressionServices( - snomedStructures, - this.isAIndex + snomedStructures, + this.isAIndex ); // Set building flag to true so services will generate normal forms dynamically From f7017d3a316de9c2e91cb3139651a6db79d3a473 Mon Sep 17 00:00:00 2001 From: Grahame Grieve Date: Tue, 14 Apr 2026 23:15:34 +0800 Subject: [PATCH 2/3] better error handling of start up errors --- tx/library.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tx/library.js b/tx/library.js index 4bea49f9..9e32e400 100644 --- a/tx/library.js +++ b/tx/library.js @@ -35,6 +35,7 @@ const { OCLCodeSystemProvider, OCLSourceCodeSystemFactory } = require('./ocl/cs- const { OCLValueSetProvider } = require('./ocl/vs-ocl'); const { OCLConceptMapProvider } = require('./ocl/cm-ocl'); const {UriServicesFactory} = require("./cs/cs-uri"); +const {debugLog} = require("./operation-context"); /** * This class holds all the loaded content ready for processing @@ -185,6 +186,7 @@ class Library { try { await this.processSource(source, this.packageManager, "cs"); } catch (error) { + debugLog(error); console.error(`Failed to load code systems from '${source}': ${error.message}`); throw error; } @@ -196,6 +198,7 @@ class Library { try { await this.processSource(source, this.packageManager, "npm"); } catch (error) { + debugLog(error); console.error(`Failed to load package '${source}': ${error.message}`); throw error; } From dadeb6594413bd64b150f195095609cf0c73dd4f Mon Sep 17 00:00:00 2001 From: Grahame Grieve Date: Tue, 14 Apr 2026 23:15:43 +0800 Subject: [PATCH 3/3] better logging of vsac updates --- tx/vs/vs-database.js | 305 ++++++++++++++++++++++++++++++------------- tx/vs/vs-vsac.js | 166 ++++++++++++++++------- 2 files changed, 330 insertions(+), 141 deletions(-) diff --git a/tx/vs/vs-database.js b/tx/vs/vs-database.js index 08087ac6..634a073d 100644 --- a/tx/vs/vs-database.js +++ b/tx/vs/vs-database.js @@ -1,4 +1,5 @@ const fs = require('fs').promises; +const crypto = require('crypto'); const sqlite3 = require('sqlite3').verbose(); const { VersionUtilities } = require('../../library/version-utilities'); const ValueSet = require("../library/valueset"); @@ -18,8 +19,13 @@ class ValueSetDatabase { */ constructor(dbPath) { this.dbPath = dbPath; - this._db = null; // Shared read-only connection - this._writeDb = null; // Write connection (opened only when needed) + // Single read-write connection used for everything. Using a separate + // OPEN_READONLY connection for reads can miss WAL-based schema changes + // made through the write connection (because read-only opens can't fully + // participate in the shared-memory protocol), so queries issued right + // after a migration ALTER TABLE can fail with a stale schema cache. + this._writeDb = null; + this._migrationPromise = null; } /** @@ -29,46 +35,104 @@ class ValueSetDatabase { * @private */ _migrateIfNeeded(db) { - return new Promise((resolve, reject) => { - db.all("PRAGMA table_info(valuesets)", [], (err, cols) => { - if (err) { reject(err); return; } - const hasCol = cols.some(c => c.name === 'date_first_seen'); - const migrations = []; - if (!hasCol) { - migrations.push(new Promise((res, rej) => { + // Run migrations SEQUENTIALLY. node-sqlite3 does not guarantee that + // separately-submitted statements run in submission order on the same + // connection — `db.serialize()` is opt-in. Without sequencing, a + // `CREATE INDEX` can race ahead of its `CREATE TABLE`, or a `PRAGMA + // table_info` can race ahead of a `CREATE TABLE IF NOT EXISTS`, and + // you get "no such table" errors on DDL that should have been fine. + const run = (sql) => new Promise((res, rej) => { + db.run(sql, [], (err) => err ? rej(err) : res()); + }); + const all = (sql) => new Promise((res, rej) => { + db.all(sql, [], (err, rows) => err ? rej(err) : res(rows)); + }); + + return (async () => { + const cols = await all("PRAGMA table_info(valuesets)"); + const hasDateFirstSeen = cols.some(c => c.name === 'date_first_seen'); + const hasContentHash = cols.some(c => c.name === 'content_hash'); + + if (!hasDateFirstSeen) { + await run("ALTER TABLE valuesets ADD COLUMN date_first_seen INTEGER DEFAULT 0"); + } + if (!hasContentHash) { + await run("ALTER TABLE valuesets ADD COLUMN content_hash TEXT"); + } + + // Ensure vsac_runs table exists (with total_updated for fresh installs) + await run(` + CREATE TABLE IF NOT EXISTS vsac_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at INTEGER NOT NULL, + finished_at INTEGER, + status TEXT NOT NULL DEFAULT 'running', + error_message TEXT, + total_fetched INTEGER, + total_new INTEGER, + total_updated INTEGER + ) + `); + + // If vsac_runs already existed (older schema), add total_updated column + const runCols = await all("PRAGMA table_info(vsac_runs)"); + const hasTotalUpdated = runCols.some(c => c.name === 'total_updated'); + if (!hasTotalUpdated && runCols.length > 0) { + await run("ALTER TABLE vsac_runs ADD COLUMN total_updated INTEGER"); + } + + // Ensure vsac_settings table exists (for _lastUpdated tracking etc.) + await run(` + CREATE TABLE IF NOT EXISTS vsac_settings ( + key TEXT PRIMARY KEY, + value TEXT + ) + `); + + // Ensure vsac_events table exists (audit log of new/updated/deleted value sets) + await run(` + CREATE TABLE IF NOT EXISTS vsac_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + event_type TEXT NOT NULL, + url TEXT NOT NULL, + version TEXT, + detail TEXT + ) + `); + await run("CREATE INDEX IF NOT EXISTS idx_events_timestamp ON vsac_events(timestamp)"); + + // Backfill content_hash for any existing rows that don't have one. + // This establishes a baseline so the NEXT sync can detect real content + // changes immediately (otherwise the first sync just silently populates + // hashes and can never flag anything as 'updated'). + const needHash = await all( + "SELECT COUNT(*) AS n FROM valuesets WHERE content_hash IS NULL" + ); + const missing = (needHash[0] && needHash[0].n) || 0; + if (missing > 0) { + console.log(`Backfilling content_hash for ${missing} existing value sets...`); + const rows = await all( + "SELECT id, content FROM valuesets WHERE content_hash IS NULL" + ); + let done = 0; + for (const row of rows) { + const hash = crypto.createHash('sha256').update(row.content).digest('hex'); + await new Promise((res, rej) => { db.run( - "ALTER TABLE valuesets ADD COLUMN date_first_seen INTEGER DEFAULT 0", - [], + 'UPDATE valuesets SET content_hash = ? WHERE id = ?', + [hash, row.id], (err) => err ? rej(err) : res() ); - })); + }); + done++; + if (done % 1000 === 0) { + console.log(` ...${done}/${missing}`); + } } - // Ensure vsac_runs table exists - migrations.push(new Promise((res, rej) => { - db.run(` - CREATE TABLE IF NOT EXISTS vsac_runs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - started_at INTEGER NOT NULL, - finished_at INTEGER, - status TEXT NOT NULL DEFAULT 'running', - error_message TEXT, - total_fetched INTEGER, - total_new INTEGER - ) - `, [], (err) => err ? rej(err) : res()); - })); - // Ensure vsac_settings table exists (for _lastUpdated tracking etc.) - migrations.push(new Promise((res, rej) => { - db.run(` - CREATE TABLE IF NOT EXISTS vsac_settings ( - key TEXT PRIMARY KEY, - value TEXT - ) - `, [], (err) => err ? rej(err) : res()); - })); - Promise.all(migrations).then(() => resolve()).catch(reject); - }); - }); + console.log(`Backfilled ${done} hashes.`); + } + })(); } /** @@ -77,21 +141,9 @@ class ValueSetDatabase { * @private */ _getReadConnection() { - return new Promise((resolve, reject) => { - if (this._db) { - resolve(this._db); - return; - } - - this._db = new sqlite3.Database(this.dbPath, sqlite3.OPEN_READONLY, (err) => { - if (err) { - this._db = null; - reject(new Error(`Failed to open database ${this.dbPath}: ${err.message}`)); - } else { - resolve(this._db); - } - }); - }); + // Reads go through the same connection as writes. See the constructor + // comment for why we don't use a separate OPEN_READONLY connection. + return this._ensureMigrated().then(() => this._writeDb); } /** @@ -100,21 +152,38 @@ class ValueSetDatabase { * @private */ _getWriteConnection() { - return new Promise((resolve, reject) => { + return this._ensureMigrated().then(() => this._writeDb); + } + + /** + * Ensure the database schema is migrated. Idempotent: subsequent calls + * return the cached promise. Opens a write connection (which is required + * for ALTER TABLE) if one is not already open. The write connection is + * kept open for reuse by later _getWriteConnection calls. + * @returns {Promise} + * @private + */ + _ensureMigrated() { + if (this._migrationPromise) { + return this._migrationPromise; + } + this._migrationPromise = new Promise((resolve, reject) => { if (this._writeDb) { - resolve(this._writeDb); + this._migrateIfNeeded(this._writeDb).then(resolve).catch(reject); return; } - this._writeDb = new sqlite3.Database(this.dbPath, (err) => { if (err) { this._writeDb = null; reject(new Error(`Failed to open database for writing: ${err.message}`)); - } else { - this._migrateIfNeeded(this._writeDb).then(() => resolve(this._writeDb)).catch(reject); + return; } + this._migrateIfNeeded(this._writeDb).then(resolve).catch(reject); }); }); + // If migration fails, clear the cached promise so a retry can attempt again + this._migrationPromise.catch(() => { this._migrationPromise = null; }); + return this._migrationPromise; } /** @@ -122,29 +191,20 @@ class ValueSetDatabase { * @returns {Promise} */ async close() { - const closePromises = []; - - if (this._db) { - closePromises.push(new Promise((resolve) => { - this._db.close((err) => { - if (err) console.warn(`Warning closing read connection: ${err.message}`); - this._db = null; - resolve(); - }); - })); - } + // Clear the cached migration promise so a subsequent open re-migrates + this._migrationPromise = null; - if (this._writeDb) { - closePromises.push(new Promise((resolve) => { - this._writeDb.close((err) => { - if (err) console.warn(`Warning closing write connection: ${err.message}`); - this._writeDb = null; - resolve(); - }); - })); + if (!this._writeDb) { + return; } - await Promise.all(closePromises); + await new Promise((resolve) => { + this._writeDb.close((err) => { + if (err) console.warn(`Warning closing database connection: ${err.message}`); + this._writeDb = null; + resolve(); + }); + }); } /** @@ -193,6 +253,7 @@ class ValueSetDatabase { status TEXT, title TEXT, content TEXT NOT NULL, + content_hash TEXT, last_seen INTEGER DEFAULT (strftime('%s', 'now')), date_first_seen INTEGER DEFAULT (strftime('%s', 'now')) ) @@ -241,17 +302,31 @@ class ValueSetDatabase { status TEXT NOT NULL DEFAULT 'running', error_message TEXT, total_fetched INTEGER, - total_new INTEGER + total_new INTEGER, + total_updated INTEGER ) `); // Settings table (key-value store for _lastUpdated tracking etc.) db.run(` - CREATE TABLE IF NOT EXISTS vsac_settings ( - key TEXT PRIMARY KEY, - value TEXT - ) + CREATE TABLE IF NOT EXISTS vsac_settings ( + key TEXT PRIMARY KEY, + value TEXT + ) + `); + + // Event log table (new/updated/deleted value sets) + db.run(` + CREATE TABLE IF NOT EXISTS vsac_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + event_type TEXT NOT NULL, + url TEXT NOT NULL, + version TEXT, + detail TEXT + ) `); + db.run('CREATE INDEX idx_events_timestamp ON vsac_events(timestamp)'); // Create indexes for better search performance db.run('CREATE INDEX idx_valuesets_url ON valuesets(url, version)'); @@ -300,15 +375,36 @@ class ValueSetDatabase { * @param {number} id - The run ID from startRun() * @param {number} totalFetched - Total value sets fetched * @param {number} totalNew - Number of new value sets found + * @param {number} [totalUpdated=0] - Number of existing value sets whose content changed * @returns {Promise} */ - async finishRun(id, totalFetched, totalNew) { + async finishRun(id, totalFetched, totalNew, totalUpdated = 0) { const db = await this._getWriteConnection(); return new Promise((resolve, reject) => { db.run( `UPDATE vsac_runs SET finished_at = strftime('%s','now'), status = 'ok', - total_fetched = ?, total_new = ? WHERE id = ?`, - [totalFetched, totalNew, id], + total_fetched = ?, total_new = ?, total_updated = ? WHERE id = ?`, + [totalFetched, totalNew, totalUpdated, id], + err => err ? reject(err) : resolve() + ); + }); + } + + /** + * Record a VSAC event in the audit log + * @param {string} eventType - 'new', 'updated', or 'deleted' + * @param {string} url - The value set URL + * @param {string|null} version - The version, or null + * @param {string|null} [detail] - Optional detail string + * @returns {Promise} + */ + async recordEvent(eventType, url, version, detail = null) { + const db = await this._getWriteConnection(); + return new Promise((resolve, reject) => { + db.run( + `INSERT INTO vsac_events (timestamp, event_type, url, version, detail) + VALUES (strftime('%s','now'), ?, ?, ?, ?)`, + [eventType, url, version || null, detail], err => err ? reject(err) : resolve() ); }); @@ -358,7 +454,7 @@ class ValueSetDatabase { return new Promise((resolve, reject) => { db.run( `INSERT INTO vsac_settings (key, value) VALUES (?, ?) - ON CONFLICT(key) DO UPDATE SET value = excluded.value`, + ON CONFLICT(key) DO UPDATE SET value = excluded.value`, [key, value], err => err ? reject(err) : resolve() ); @@ -368,9 +464,10 @@ class ValueSetDatabase { /** * Insert or update a single ValueSet in the database * @param {Object} valueSet - The ValueSet resource + * @param {string} [contentHash] - Optional pre-computed content hash to store * @returns {Promise} */ - async upsertValueSet(valueSet) { + async upsertValueSet(valueSet, contentHash = null) { if (!valueSet.url) { throw new Error('ValueSet must have a url property'); } @@ -405,8 +502,9 @@ class ValueSetDatabase { db.run(` INSERT INTO valuesets ( id, url, version, date, description, effectivePeriod_start, effectivePeriod_end, - expansion_identifier, name, publisher, status, title, content, last_seen, date_first_seen - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'), strftime('%s', 'now')) + expansion_identifier, name, publisher, status, title, content, content_hash, + last_seen, date_first_seen + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'), strftime('%s', 'now')) ON CONFLICT(id) DO UPDATE SET url=excluded.url, version=excluded.version, @@ -420,6 +518,7 @@ class ValueSetDatabase { status=excluded.status, title=excluded.title, content=excluded.content, + content_hash=excluded.content_hash, last_seen=strftime('%s', 'now') `, [ valueSet.id, @@ -434,7 +533,8 @@ class ValueSetDatabase { valueSet.publisher || null, valueSet.status || null, valueSet.title || null, - JSON.stringify(valueSet) + JSON.stringify(valueSet), + contentHash ], (err) => { if (err) { reject(new Error(`Failed to insert main record: ${err.message}`)); @@ -450,6 +550,24 @@ class ValueSetDatabase { }); } + /** + * Backfill the content_hash column for a row without rewriting content or + * emitting an event. Used for legacy rows from before content_hash existed. + * @param {string} id - The ValueSet id + * @param {string} hash - The SHA-256 hex hash to store + * @returns {Promise} + */ + async setContentHash(id, hash) { + const db = await this._getWriteConnection(); + return new Promise((resolve, reject) => { + db.run( + 'UPDATE valuesets SET content_hash = ? WHERE id = ?', + [hash, id], + err => err ? reject(err) : resolve() + ); + }); + } + /** * Just update the timestamp on the valueset * @param {Object} valueSet - The ValueSet resource @@ -590,7 +708,7 @@ class ValueSetDatabase { const db = await this._getReadConnection(); return new Promise((resolve, reject) => { - db.all('SELECT id, url, version, content FROM valuesets', [], (err, rows) => { + db.all('SELECT id, url, version, content, content_hash FROM valuesets', [], (err, rows) => { if (err) { reject(new Error(`Failed to load value sets: ${err.message}`)); return; @@ -603,6 +721,9 @@ class ValueSetDatabase { for (const row of rows) { const valueSet = new ValueSet(JSON.parse(row.content)); valueSet.sourcePackage = source; + // Attach the stored content hash so callers can detect changes + // without recomputing over the full JSON. + valueSet.contentHash = row.content_hash || null; // Store by URL and id alone this.addToMap(valueSetMap, row.id, row.url, row.version, valueSet); } diff --git a/tx/vs/vs-vsac.js b/tx/vs/vs-vsac.js index f67ea601..58d038f0 100644 --- a/tx/vs/vs-vsac.js +++ b/tx/vs/vs-vsac.js @@ -1,4 +1,5 @@ const path = require('path'); +const crypto = require('crypto'); const axios = require('axios'); const { AbstractValueSetProvider } = require('./vs-api'); const { ValueSetDatabase } = require('./vs-database'); @@ -11,6 +12,8 @@ const {debugLog} = require("../operation-context"); * Fetches and caches ValueSets from the NLM VSAC FHIR server */ class VSACValueSetProvider extends AbstractValueSetProvider { + SYNC_AT_START_UP = true; + /** * @param {Object} config - Configuration object * @param {string} config.apiKey - API key for VSAC authentication @@ -71,12 +74,11 @@ class VSACValueSetProvider extends AbstractValueSetProvider { if (!(await this.database.exists())) { await this.database.create(); } else { - // Ensure schema is up to date (e.g. date_first_seen column added after initial deploy) - await this.database._migrateIfNeeded(await this.database._getWriteConnection()); - // Load existing data + // Schema migrations are applied lazily by the database layer on first + // connection. Just load existing data. await this._reloadMap(); } - if (this.valueSetMap.size == 0) { + if (this.SYNC_AT_START_UP || this.valueSetMap.size == 0) { await this.refreshValueSets(); } // Start periodic refresh @@ -168,6 +170,7 @@ class VSACValueSetProvider extends AbstractValueSetProvider { console.log(`Reached total count (${total}), stopping`); break; } + break; } this.lastRefresh = new Date(); @@ -182,11 +185,11 @@ class VSACValueSetProvider extends AbstractValueSetProvider { // deduplicate the queue this.queue = [...new Set(this.queue)]; - let tracking = { totalFetched: 0, totalNew: 0, count: 0, newCount : 0 }; + let tracking = { totalFetched: 0, totalNew: 0, totalUpdated: 0, count: 0, newCount : 0 }; // phase 2: query for history & content this.requeue = []; for (let q of this.queue) { - this.stats.task('VSAC History for '+q, `running (${tracking.totalFetched} fetched, ${tracking.totalNew} new)`); + this.stats.task('VSAC History for '+q, `running (${tracking.totalFetched} fetched, ${tracking.totalNew} new, ${tracking.totalUpdated} updated)`); try { await this.processContentAndHistory(q, tracking, this.queue.length); } catch (error) { @@ -194,29 +197,27 @@ class VSACValueSetProvider extends AbstractValueSetProvider { debugLog(error); this.stats.task('VSAC Sync', error.message); } - // `running (${totalFetched} fetched, ${totalNew} new)`) tracking.count++; } console.log("Requeue"); for (let q of this.requeue) { - this.stats.task('VSAC History for '+q, `running (${tracking.totalFetched} fetched, ${tracking.totalNew} new)`); + this.stats.task('VSAC History for '+q, `running (${tracking.totalFetched} fetched, ${tracking.totalNew} new, ${tracking.totalUpdated} updated)`); try { await this.processContentAndHistory(q, tracking, this.requeue.length); } catch (error) { debugLog(error); this.stats.task('VSAC Sync', error.message); } - // `running (${totalFetched} fetched, ${totalNew} new)`) tracking.count++; } // Reload map with fresh data await this._reloadMap(); - let msg = `VSAC refresh completed. Total: ${tracking.totalFetched} ValueSets, Deleted: ${tracking.deletedCount}`; + let msg = `VSAC refresh completed. Total: ${tracking.totalFetched} ValueSets, New: ${tracking.totalNew}, Updated: ${tracking.totalUpdated}`; this.stats.taskDone('VSAC Sync', msg); console.log(msg); - await this.database.finishRun(runId, tracking.totalFetched, tracking.totalNew); + await this.database.finishRun(runId, tracking.totalFetched, tracking.totalNew, tracking.totalUpdated); } catch (error) { debugLog(error, 'Error during VSAC refresh:'); this.stats.taskError('VSAC Sync', `Error (${error.message})`); @@ -228,30 +229,71 @@ class VSACValueSetProvider extends AbstractValueSetProvider { } /** - * Insert multiple ValueSets in a batch operation + * Compute a SHA-256 hash of the ValueSet content for change detection. + * @param {Object} vs - The ValueSet resource (plain JSON object) + * @returns {string} hex-encoded SHA-256 + * @private + */ + _hashValueSet(vs) { + return crypto.createHash('sha256').update(JSON.stringify(vs)).digest('hex'); + } + + /** + * Insert multiple ValueSets in a batch operation. + * For each value set: if url|version is already known, compare content hashes. + * - hash unchanged -> touch last_seen only (seeValueSet) + * - hash changed -> upsert and record an 'updated' event + * - not seen before -> upsert and record a 'new' event * @param {Array} valueSets - Array of ValueSet resources - * @returns {Promise} + * @returns {Promise<{newCount: number, updatedCount: number}>} */ async batchUpsertValueSets(valueSets) { if (valueSets.length === 0) { - return; + return { newCount: 0, updatedCount: 0 }; } - let count = 0; + let newCount = 0; + let updatedCount = 0; + // Process sequentially to avoid database locking for (const valueSet of valueSets) { - let key = valueSet.url+"|"+valueSet.version; - let vs = this.valueSetMap.get(key); - if (vs) { - // we've seen this before, and maybe fetched it's history, so just update - // the timestamp - await this.database.seeValueSet(valueSet); + const key = valueSet.url+"|"+valueSet.version; + const existing = this.valueSetMap.get(key); + const newHash = this._hashValueSet(valueSet); + + if (existing) { + // We've seen this url|version before. Decide whether the content + // has actually changed by comparing hashes. + // + // Note: _reloadMap() mutates the in-memory jsonObj (strips inc.version + // from compose.include/exclude), so we cannot reliably recompute a + // hash from existing.jsonObj — it would not match the hash of the + // original unmutated JSON we stored. For rows predating this feature + // (content_hash NULL), we defer update detection until the next cycle: + // the upsert below runs only when hashes differ, so on the *next* + // sync after migration we'll have a proper baseline. + if (existing.contentHash && existing.contentHash === newHash) { + // No change - just touch last_seen + await this.database.seeValueSet(valueSet); + } else if (!existing.contentHash) { + // Legacy row without a stored hash - backfill the hash silently + // without emitting a spurious 'updated' event. We do a lightweight + // touch + hash update rather than a full upsert+event. + await this.database.seeValueSet(valueSet); + await this.database.setContentHash(valueSet.id, newHash); + } else { + // Content has changed - treat as update + await this.database.upsertValueSet(valueSet, newHash); + await this.database.recordEvent('updated', valueSet.url, valueSet.version); + updatedCount++; + } } else { - await this.database.upsertValueSet(valueSet); - count++; + await this.database.upsertValueSet(valueSet, newHash); + await this.database.recordEvent('new', valueSet.url, valueSet.version); + newCount++; } } - return count; + return { newCount, updatedCount }; } /** @@ -511,18 +553,21 @@ class VSACValueSetProvider extends AbstractValueSetProvider { const bundle = await this._fetchBundle(url); let vcount = 0; + let perRun = { newCount: 0, updatedCount: 0 }; if (bundle.entry && bundle.entry.length > 0) { // Extract ValueSets from bundle entries const valueSets = bundle.entry .filter(entry => entry.resource && entry.resource.resourceType === 'ValueSet') .map(entry => entry.resource); if (valueSets.length > 0) { - tracking.totalNew = tracking.totalNew + await this.batchUpsertValueSets(valueSets); + perRun = await this.batchUpsertValueSets(valueSets); + tracking.totalNew += perRun.newCount; + tracking.totalUpdated += perRun.updatedCount; tracking.totalFetched += valueSets.length; vcount = valueSets.length; } } - let logMsg = `VSAC (${tracking.count} of ${length}) ${q}: ${vcount} versions`; + let logMsg = `VSAC (${tracking.count} of ${length}) ${q}: ${vcount} versions (${perRun.newCount} new, ${perRun.updatedCount} updated)`; console.log(logMsg); this.stats.task('VSAC Sync', logMsg); } @@ -593,30 +638,33 @@ class VSACValueSetProvider extends AbstractValueSetProvider { const rows = await new Promise((resolve, reject) => { db.all( - `SELECT 'vs' AS kind, + `SELECT 'event' AS kind, url, version, - date_first_seen AS ts, - NULL AS status, - NULL AS error_message, - NULL AS finished_at, - NULL AS total_fetched, - NULL AS total_new - FROM valuesets - WHERE date_first_seen > 0 + timestamp AS ts, + event_type, + NULL AS status, + NULL AS error_message, + NULL AS finished_at, + NULL AS total_fetched, + NULL AS total_new, + NULL AS total_updated + FROM vsac_events UNION ALL - SELECT 'run' AS kind, - NULL, - NULL, - started_at AS ts, - status, - error_message, - finished_at, - total_fetched, - total_new - FROM vsac_runs - ORDER BY ts DESC - LIMIT 200`, + SELECT 'run' AS kind, + NULL, + NULL, + started_at AS ts, + NULL AS event_type, + status, + error_message, + finished_at, + total_fetched, + total_new, + total_updated + FROM vsac_runs + ORDER BY ts DESC + LIMIT 200`, [], (err, rows) => err ? reject(err) : resolve(rows) ); @@ -636,7 +684,8 @@ class VSACValueSetProvider extends AbstractValueSetProvider { const duration = row.finished_at ? `${row.finished_at - row.ts}s` : 'in progress'; let detail, colour; if (row.status === 'ok') { - detail = `${row.total_fetched} fetched, ${row.total_new} new, ${duration}`; + const updated = row.total_updated != null ? `, ${row.total_updated} updated` : ''; + detail = `${row.total_fetched} fetched, ${row.total_new} new${updated}, ${duration}`; colour = 'green'; } else if (row.status === 'error') { detail = `Failed: ${escape(row.error_message || '')} (${duration})`; @@ -651,9 +700,28 @@ class VSACValueSetProvider extends AbstractValueSetProvider { html += `${detail}`; html += ``; } else { + // Event row: 'new', 'updated', or 'deleted' + let label, colour; + switch (row.event_type) { + case 'new': + label = 'New value set'; + colour = 'green'; + break; + case 'updated': + label = 'Updated value set'; + colour = 'blue'; + break; + case 'deleted': + label = 'Deleted value set'; + colour = 'red'; + break; + default: + label = escape(row.event_type || 'Event'); + colour = 'black'; + } html += ``; html += `${escape(fmt(row.ts))}`; - html += `New value set`; + html += `${label}`; html += `${escape(row.url || '')}#${escape(row.version || '')}`; html += ``; }