Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 66 additions & 27 deletions crates/hts/src/backends/postgres/code_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,42 +268,81 @@ impl CodeSystemOperations for PostgresTerminologyBackend {
/// Resolve a code system by URL, optional version, and optional date.
///
/// Returns `(id, name_or_url, version)`.
///
/// Mirrors the SQLite implementation: an unspecified version defaults to the
/// most recent (textual COALESCE-DESC), an explicit version with `.x` segments
/// (or a bare numeric prefix like `"1"`) matches the highest version that
/// shares the literal segments, and an exact version requires an exact match.
async fn resolve_code_system(
client: &tokio_postgres::Client,
url: &str,
version: Option<&str>,
date: Option<&str>,
) -> Result<(String, String, Option<String>), HtsError> {
let rows = if let Some(ver) = version {
client
.query(
"SELECT id, COALESCE(name, url), version
FROM code_systems
WHERE url = $1 AND version = $2
AND ($3::text IS NULL OR (resource_json->>'date') <= $3)",
&[&url, &ver, &date],
)
.await
.map_err(|e| HtsError::StorageError(e.to_string()))?
} else {
client
.query(
"SELECT id, COALESCE(name, url), version
FROM code_systems
WHERE url = $1
AND ($2::text IS NULL OR (resource_json->>'date') <= $2)",
&[&url, &date],
)
.await
.map_err(|e| HtsError::StorageError(e.to_string()))?
};
let rows = client
.query(
"SELECT id, COALESCE(name, url), version
FROM code_systems
WHERE url = $1
AND ($2::text IS NULL OR (resource_json->>'date') <= $2)
ORDER BY COALESCE(version, '') DESC",
&[&url, &date],
)
.await
.map_err(|e| HtsError::StorageError(e.to_string()))?;

let row = rows
if rows.is_empty() {
return Err(HtsError::NotFound(format!("CodeSystem not found: {url}")));
}
let candidates: Vec<(String, String, Option<String>)> = rows
.into_iter()
.next()
.ok_or_else(|| HtsError::NotFound(format!("CodeSystem not found: {url}")))?;
.map(|r| (r.get(0), r.get(1), r.get(2)))
.collect();

match version {
Some(ver) if ver.contains(".x") || ver == "x" || is_short_version(ver) => {
select_best_version_match(&candidates, ver).ok_or_else(|| {
HtsError::NotFound(format!("CodeSystem not found: {url} (version {ver})"))
})
}
Some(ver) => candidates
.into_iter()
.find(|(_, _, v)| v.as_deref() == Some(ver))
.ok_or_else(|| {
HtsError::NotFound(format!("CodeSystem not found: {url} (version {ver})"))
}),
None => Ok(candidates.into_iter().next().expect("non-empty checked")),
}
}

Ok((row.get(0), row.get(1), row.get(2)))
/// Returns `true` when `ver` is a bare numeric prefix such as `"1"` or `"2024"`.
///
/// A short version consists solely of one or more ASCII digits. Anything
/// containing a `.` (for example `"1.0"` or `"1.x"`), a non-digit character,
/// or nothing at all is not a short version.
///
/// The explicit emptiness check matters: `Iterator::all` is vacuously true on
/// an empty iterator, so without it `""` would be classified as a numeric
/// prefix and sent down the pattern-matching path instead of being compared
/// exactly.
fn is_short_version(ver: &str) -> bool {
    // `.` is not an ASCII digit, so a separate `contains('.')` test is not needed.
    !ver.is_empty() && ver.bytes().all(|b| b.is_ascii_digit())
}

/// Picks the candidate with the greatest version among those matching `pattern`.
///
/// `candidates` holds `(id, name_or_url, version)` tuples. `pattern` is split on
/// `.` and each candidate's version is tested with `version_matches`; candidates
/// without a version never match. Returns a clone of the winning tuple, or
/// `None` when nothing matches.
///
/// NOTE(review): versions are ranked by plain string comparison, so `"1.10"`
/// sorts below `"1.9"`. Confirm this is the intended ordering before relying on
/// it for multi-digit segments.
fn select_best_version_match(
    candidates: &[(String, String, Option<String>)],
    pattern: &str,
) -> Option<(String, String, Option<String>)> {
    let wanted: Vec<&str> = pattern.split('.').collect();
    let mut best: Option<&(String, String, Option<String>)> = None;

    for candidate in candidates {
        let is_match = candidate
            .2
            .as_deref()
            .map_or(false, |actual| version_matches(actual, &wanted));
        if !is_match {
            continue;
        }
        // Keep the current winner only when it is strictly greater; on a tie the
        // later candidate wins, which is what `Iterator::max_by` does.
        match best {
            Some(current) if current.2 > candidate.2 => {}
            _ => best = Some(candidate),
        }
    }

    best.cloned()
}

/// Tests whether the dotted version `actual` satisfies `pattern_segments`.
///
/// `actual` is split on `.` and compared with the pattern segment by segment.
/// A pattern segment of `"x"` accepts any value; every other segment must be
/// equal to the corresponding segment of `actual`. The pattern may be shorter
/// than `actual` (it then acts as a prefix), but a pattern with more segments
/// than `actual` never matches.
fn version_matches(actual: &str, pattern_segments: &[&str]) -> bool {
    let mut actual_parts = actual.split('.');
    for wanted in pattern_segments {
        match actual_parts.next() {
            // Wildcard or literal hit: move on to the next segment.
            Some(found) if *wanted == "x" || *wanted == found => {}
            // Literal mismatch, or `actual` ran out of segments first.
            _ => return false,
        }
    }
    true
}

/// Look up a concept row by `(system_id, code)`.
Expand Down
6 changes: 5 additions & 1 deletion crates/hts/src/backends/postgres/concept_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ impl ConceptMapOperations for PostgresTerminologyBackend {

for (system_url, codes) in &by_system {
let id_rows = client
.query("SELECT id FROM code_systems WHERE url = $1", &[system_url])
.query(
"SELECT id FROM code_systems WHERE url = $1 \
ORDER BY COALESCE(version, '') DESC LIMIT 1",
&[system_url],
)
.await
.map_err(|e| HtsError::StorageError(format!("DB error: {e}")))?;

Expand Down
87 changes: 62 additions & 25 deletions crates/hts/src/backends/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,48 @@ impl TerminologyMetadata for PostgresTerminologyBackend {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let client = pool.get().await.ok()?;
let sql = match resource_type.as_str() {
"CodeSystem" => "SELECT url FROM code_systems WHERE id = $1",
"ValueSet" => "SELECT url FROM value_sets WHERE id = $1",
"ConceptMap" => "SELECT url FROM concept_maps WHERE id = $1",
_ => return None,
};
let rows = client.query(sql, &[&id]).await.ok()?;
rows.into_iter().next().map(|r| r.get::<_, String>(0))
match resource_type.as_str() {
"CodeSystem" => {
// Storage id may be the synthetic `<id>|<version>` form,
// so first try a direct hit, then fall back to matching
// the FHIR resource id captured in `resource_json` and
// pick the latest version.
if let Ok(rows) = client
.query("SELECT url FROM code_systems WHERE id = $1", &[&id])
.await
{
if let Some(row) = rows.into_iter().next() {
return Some(row.get::<_, String>(0));
}
}
let rows = client
.query(
"SELECT url FROM code_systems \
WHERE (resource_json->>'id') = $1 \
ORDER BY COALESCE(version, '') DESC \
LIMIT 1",
&[&id],
)
.await
.ok()?;
rows.into_iter().next().map(|r| r.get::<_, String>(0))
}
"ValueSet" => {
let rows = client
.query("SELECT url FROM value_sets WHERE id = $1", &[&id])
.await
.ok()?;
rows.into_iter().next().map(|r| r.get::<_, String>(0))
}
"ConceptMap" => {
let rows = client
.query("SELECT url FROM concept_maps WHERE id = $1", &[&id])
.await
.ok()?;
rows.into_iter().next().map(|r| r.get::<_, String>(0))
}
_ => None,
}
})
})
}
Expand Down Expand Up @@ -255,21 +289,25 @@ async fn write_code_system(
let resource_json = Some(cs.resource_json.clone());
let now = utc_now();

// Two-step upsert so we handle conflicts on *either* unique constraint
// (the `url` index and the PK on `id`). `ON CONFLICT (url) DO UPDATE`
// alone does not catch a PK collision that would happen when two
// concurrent importers use the same `cs.id` — the INSERT can fail on the
// PK arbiter index before the URL arbiter is consulted. `ON CONFLICT DO
// NOTHING` (no target) swallows any unique-constraint conflict, after
// which a plain UPDATE by URL refreshes the row and returns its id.
// Synthetic storage id encodes the version so multiple versions of the
// same canonical URL coexist even when they share the same FHIR `id`
// (tx-ecosystem `version/codesystem-version-1.json` + `-2.json` both ship
// `"id":"version"`). See `crate::import::fhir_bundle::storage_id_for`.
let storage_id = crate::import::fhir_bundle::storage_id_for(&cs.id, cs.version.as_deref());

// Upsert keyed on (url, version). The composite UNIQUE index
// `idx_code_systems_url_version` is the conflict arbiter; the legacy
// `ON CONFLICT (url)` is no longer applicable now that two rows can share
// a URL. `ON CONFLICT DO NOTHING` keeps a prior row in place; the UPDATE
// below then refreshes its mutable columns by (url, version).
client
.execute(
"INSERT INTO code_systems
(id, url, version, name, title, status, content, resource_json, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)
ON CONFLICT DO NOTHING",
&[
&cs.id,
&storage_id,
&cs.url,
&cs.version,
&cs.name,
Expand All @@ -286,24 +324,23 @@ async fn write_code_system(
let cs_rows = client
.query(
"UPDATE code_systems SET
version = $1,
name = $2,
title = $3,
status = $4,
content = $5,
resource_json = $6,
updated_at = $7
WHERE url = $8
name = $1,
title = $2,
status = $3,
content = $4,
resource_json = $5,
updated_at = $6
WHERE url = $7 AND COALESCE(version, '') = COALESCE($8, '')
RETURNING id",
&[
&cs.version,
&cs.name,
&cs.title,
&cs.status,
&cs.content,
&resource_json,
&now,
&cs.url,
&cs.version,
],
)
.await
Expand Down
30 changes: 29 additions & 1 deletion crates/hts/src/backends/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ pub const SCHEMA: &str = "
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- ── Code Systems ──────────────────────────────────────────────────────────────
-- Multi-version: a canonical URL may have multiple rows so long as each row has
-- a distinct `version`. Uniqueness is enforced via the composite expression
-- index `idx_code_systems_url_version` (the column-level UNIQUE on `url` was
-- dropped). Rows with NULL version are coalesced to the empty string by the
-- index so two un-versioned imports of the same URL still collide.
CREATE TABLE IF NOT EXISTS code_systems (
id TEXT PRIMARY KEY,
url TEXT NOT NULL UNIQUE,
url TEXT NOT NULL,
version TEXT,
name TEXT,
title TEXT,
Expand All @@ -27,6 +32,29 @@ CREATE TABLE IF NOT EXISTS code_systems (
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_code_systems_url_version
ON code_systems(url, COALESCE(version, ''));
CREATE INDEX IF NOT EXISTS idx_code_systems_url ON code_systems(url);

-- Legacy installs created code_systems with `url TEXT NOT NULL UNIQUE`, which
-- bakes uniqueness into a hidden constraint that blocks multi-version imports.
-- Drop both the standalone constraint name and the auto-named one so subsequent
-- (url, version) duplicates can coexist; both forms are silently ignored when
-- absent (legacy index is unnamed across PG versions).
DO $$
DECLARE
cons_name text;
BEGIN
FOR cons_name IN
SELECT conname FROM pg_constraint c
JOIN pg_class t ON t.oid = c.conrelid
WHERE t.relname = 'code_systems'
AND c.contype = 'u'
AND pg_get_constraintdef(c.oid) = 'UNIQUE (url)'
LOOP
EXECUTE format('ALTER TABLE code_systems DROP CONSTRAINT %I', cons_name);
END LOOP;
END $$;

-- ── Concepts ───────────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS concepts (
Expand Down
Loading