diff --git a/crates/hts/Cargo.toml b/crates/hts/Cargo.toml index 372377e2f..b9fd0cbbd 100644 --- a/crates/hts/Cargo.toml +++ b/crates/hts/Cargo.toml @@ -66,6 +66,13 @@ deadpool-postgres = { version = "0.14", optional = true } tokio = { version = "1", features = ["full", "test-util"] } tempfile = "3" zip = { version = "0.6", default-features = false, features = ["deflate"] } -testcontainers = { version = "0.27", features = ["watchdog"] } testcontainers-modules = { version = "0.15", features = ["postgres"] } ctor = "0.2" + +# `watchdog` pulls in `signal_hook::iterator`/`SIGQUIT`, which only compile on Unix. +# Windows test runs don't need SIGTERM cleanup, so drop the feature there. +[target.'cfg(not(windows))'.dev-dependencies] +testcontainers = { version = "0.27", features = ["watchdog"] } + +[target.'cfg(windows)'.dev-dependencies] +testcontainers = { version = "0.27" } diff --git a/crates/persistence/src/backends/postgres/search/chain_builder.rs b/crates/persistence/src/backends/postgres/search/chain_builder.rs new file mode 100644 index 000000000..2f21c3914 --- /dev/null +++ b/crates/persistence/src/backends/postgres/search/chain_builder.rs @@ -0,0 +1,883 @@ +//! Chain Query Builder for FHIR Search (PostgreSQL backend). +//! +//! Generates efficient SQL subqueries for: +//! - Forward chained parameters (e.g., `Observation?subject.organization.name=Hospital`) +//! - Reverse chained parameters (`_has`) (e.g., `Patient?_has:Observation:subject:code=1234-5`) +//! +//! Uses the `search_index` table to resolve chains via SQL subqueries instead +//! of in-memory iteration. Mirrors the SQLite implementation in +//! `crates/persistence/src/backends/sqlite/search/chain_builder.rs` with +//! Postgres syntax adaptations: `$N` placeholders, `ILIKE`, `POSITION(... in ...)` +//! for substring index, and `LIKE ESCAPE '\'`. + +#![allow(missing_docs)] + +use std::sync::Arc; + +use parking_lot::RwLock; + +use crate::error::{BackendError, StorageResult}; +use crate::search::SearchParameterRegistry; +use crate::types::{ChainConfig, ReverseChainedParameter, SearchParamType, SearchValue}; + +use super::query_builder::{SqlFragment, SqlParam}; + +/// A single link in a forward chain. +#[derive(Debug, Clone)] +pub struct ChainLink { + pub reference_param: String, + pub target_type: String, +} + +/// A parsed forward chain with resolved types. +#[derive(Debug, Clone)] +pub struct ParsedChain { + pub links: Vec, + pub terminal_param: String, + pub terminal_type: SearchParamType, +} + +/// Errors specific to chain parsing. +#[derive(Debug, Clone)] +pub enum ChainError { + MaxDepthExceeded { + depth: usize, + max: usize, + }, + UnknownReferenceParam { + resource_type: String, + param: String, + }, + UnknownTerminalParam { + resource_type: String, + param: String, + }, + EmptyChain, + InvalidSyntax { + message: String, + }, +} + +impl std::fmt::Display for ChainError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ChainError::MaxDepthExceeded { depth, max } => { + write!( + f, + "Chain depth {} exceeds maximum allowed depth {}", + depth, max + ) + } + ChainError::UnknownReferenceParam { + resource_type, + param, + } => write!( + f, + "Unknown reference parameter '{}' for resource type '{}'", + param, resource_type + ), + ChainError::UnknownTerminalParam { + resource_type, + param, + } => write!( + f, + "Unknown terminal parameter '{}' for resource type '{}'", + param, resource_type + ), + ChainError::EmptyChain => write!(f, "Empty chain"), + ChainError::InvalidSyntax { message } => write!(f, "Invalid chain syntax: {}", message), + } + } +} + +impl From for BackendError { + fn from(e: ChainError) -> Self { + BackendError::Internal { + backend_name: "postgres".to_string(), + message: e.to_string(), + source: None, + } + } +} + +/// Builder for chain SQL queries. +pub struct ChainQueryBuilder { + #[allow(dead_code)] + tenant_id: String, + base_type: String, + registry: Arc>, + config: ChainConfig, + /// Parameter offset for `$N` placeholders. + /// + /// Callers typically reserve `$1` for `tenant_id`, so the default offset + /// of `1` makes the first chain-supplied param `$2`. + param_offset: usize, +} + +impl ChainQueryBuilder { + pub fn new( + tenant_id: impl Into, + base_type: impl Into, + registry: Arc>, + ) -> Self { + Self { + tenant_id: tenant_id.into(), + base_type: base_type.into(), + registry, + config: ChainConfig::default(), + param_offset: 1, + } + } + + pub fn with_config(mut self, config: ChainConfig) -> Self { + self.config = config; + self + } + + pub fn with_param_offset(mut self, offset: usize) -> Self { + self.param_offset = offset; + self + } + + /// Parses a chain string (e.g., `"subject.organization.name"`) into + /// resolved `ChainLink`s plus the terminal parameter. + pub fn parse_chain(&self, chain_str: &str) -> Result { + if chain_str.is_empty() { + return Err(ChainError::EmptyChain); + } + + let parts: Vec<&str> = chain_str.split('.').collect(); + if parts.len() < 2 { + return Err(ChainError::InvalidSyntax { + message: "Chain must have at least two parts (reference.param)".to_string(), + }); + } + + let chain_depth = parts.len() - 1; + if !self.config.validate_forward_depth(chain_depth) { + return Err(ChainError::MaxDepthExceeded { + depth: chain_depth, + max: self.config.max_forward_depth, + }); + } + + let mut links = Vec::new(); + let mut current_type = self.base_type.clone(); + + for part in parts.iter().take(parts.len() - 1) { + let (ref_param, explicit_type) = parse_chain_part(part); + let target_type = self.resolve_target_type(¤t_type, &ref_param, explicit_type)?; + links.push(ChainLink { + reference_param: ref_param, + target_type: target_type.clone(), + }); + current_type = target_type; + } + + let terminal_param = parts[parts.len() - 1].to_string(); + let terminal_type = self.resolve_terminal_type(¤t_type, &terminal_param)?; + + Ok(ParsedChain { + links, + terminal_param, + terminal_type, + }) + } + + fn resolve_target_type( + &self, + resource_type: &str, + ref_param: &str, + explicit_type: Option, + ) -> Result { + if let Some(t) = explicit_type { + return Ok(t); + } + + let registry = self.registry.read(); + if let Some(param_def) = registry.get_param(resource_type, ref_param) { + if param_def.param_type != SearchParamType::Reference { + return Err(ChainError::UnknownReferenceParam { + resource_type: resource_type.to_string(), + param: ref_param.to_string(), + }); + } + if let Some(ref targets) = param_def.target { + if targets.len() == 1 { + return Ok(targets[0].clone()); + } + // Empty or multiple targets — fall through to inference, + // matching SQLite's behavior so chained queries against + // ambiguous references (e.g. `subject` -> Patient|Group|...) + // pick the same default both backends agree on. + } + } + + Ok(infer_target_type(ref_param)) + } + + fn resolve_terminal_type( + &self, + resource_type: &str, + param_name: &str, + ) -> Result { + let registry = self.registry.read(); + if let Some(param_def) = registry.get_param(resource_type, param_name) { + return Ok(param_def.param_type); + } + // Last-resort heuristic for params not in the registry, matching SQLite. + match param_name { + "_id" | "id" => Ok(SearchParamType::Token), + "name" | "family" | "given" | "text" | "display" => Ok(SearchParamType::String), + "identifier" | "code" | "status" | "type" | "category" => Ok(SearchParamType::Token), + _ => Err(ChainError::UnknownTerminalParam { + resource_type: resource_type.to_string(), + param: param_name.to_string(), + }), + } + } + + /// Builds SQL for a forward chain query as nested subqueries. + /// + /// For `Observation?subject.organization.name=Hospital` (assuming + /// `param_offset = 1`, so `$1` is `tenant_id`): + /// + /// ```sql + /// r.id IN ( + /// SELECT si1.resource_id FROM search_index si1 + /// WHERE si1.tenant_id = $1 AND si1.resource_type = 'Observation' + /// AND si1.param_name = 'subject' + /// AND si1.value_reference IN ( + /// SELECT 'Patient/' || si2.resource_id FROM search_index si2 + /// WHERE si2.tenant_id = $1 AND si2.resource_type = 'Patient' + /// AND si2.param_name = 'organization' + /// AND si2.value_reference IN ( + /// SELECT 'Organization/' || si3.resource_id FROM search_index si3 + /// WHERE si3.tenant_id = $1 AND si3.resource_type = 'Organization' + /// AND si3.param_name = 'name' + /// AND si3.value_string ILIKE $2 ESCAPE '\' + /// ) + /// ) + /// ) + /// ``` + pub fn build_forward_chain_sql( + &self, + chain: &ParsedChain, + value: &SearchValue, + ) -> StorageResult { + if chain.links.is_empty() { + return Err(BackendError::Internal { + backend_name: "postgres".to_string(), + message: "Empty chain".to_string(), + source: None, + } + .into()); + } + + let param_num = self.param_offset + 1; + let (terminal_sql, terminal_param) = + self.build_terminal_condition(chain, value, param_num)?; + let terminal_type = &chain.links[chain.links.len() - 1].target_type; + + // Innermost (terminal) query. + let mut current_sql = format!( + "SELECT '{tt}/' || si{n}.resource_id FROM search_index si{n} \ + WHERE si{n}.tenant_id = $1 AND si{n}.resource_type = '{tt}' \ + AND si{n}.param_name = '{tp}' AND {cond}", + tt = terminal_type, + n = chain.links.len(), + tp = chain.terminal_param, + cond = terminal_sql, + ); + + // Wrap with each chain link from innermost to outermost. + for (i, link) in chain.links.iter().enumerate().rev() { + let link_num = i + 1; + let current_type = if i == 0 { + &self.base_type + } else { + &chain.links[i - 1].target_type + }; + + current_sql = if i == 0 { + // Outermost link: return resource_id for `r.id IN (...)`. + format!( + "SELECT si{ln}.resource_id FROM search_index si{ln} \ + WHERE si{ln}.tenant_id = $1 AND si{ln}.resource_type = '{ct}' \ + AND si{ln}.param_name = '{rp}' \ + AND si{ln}.value_reference IN ({inner})", + ln = link_num, + ct = current_type, + rp = link.reference_param, + inner = current_sql, + ) + } else { + // Intermediate link: return '{type}/' || resource_id for value_reference matching. + format!( + "SELECT '{ct}/' || si{ln}.resource_id FROM search_index si{ln} \ + WHERE si{ln}.tenant_id = $1 AND si{ln}.resource_type = '{ct}' \ + AND si{ln}.param_name = '{rp}' \ + AND si{ln}.value_reference IN ({inner})", + ct = current_type, + ln = link_num, + rp = link.reference_param, + inner = current_sql, + ) + }; + } + + Ok(SqlFragment::with_params( + format!("r.id IN ({})", current_sql), + vec![terminal_param], + )) + } + + fn build_terminal_condition( + &self, + chain: &ParsedChain, + value: &SearchValue, + param_num: usize, + ) -> StorageResult<(String, SqlParam)> { + let alias = format!("si{}", chain.links.len()); + + let (condition, param) = match chain.terminal_type { + SearchParamType::String => { + let escaped = value.value.replace('%', "\\%").replace('_', "\\_"); + ( + format!("{}.value_string ILIKE ${} ESCAPE '\\'", alias, param_num), + SqlParam::Text(format!("%{}%", escaped)), + ) + } + SearchParamType::Token => { + if let Some((system, code)) = value.value.split_once('|') { + if system.is_empty() { + ( + format!( + "({alias}.value_token_system IS NULL OR {alias}.value_token_system = '') \ + AND {alias}.value_token_code = ${pn}", + alias = alias, + pn = param_num, + ), + SqlParam::Text(code.to_string()), + ) + } else { + ( + format!( + "{alias}.value_token_system = '{sys}' AND {alias}.value_token_code = ${pn}", + alias = alias, + sys = system.replace('\'', "''"), + pn = param_num, + ), + SqlParam::Text(code.to_string()), + ) + } + } else { + ( + format!("{}.value_token_code = ${}", alias, param_num), + SqlParam::Text(value.value.clone()), + ) + } + } + SearchParamType::Reference => ( + format!("{}.value_reference ILIKE ${}", alias, param_num), + SqlParam::Text(format!("%{}%", value.value)), + ), + SearchParamType::Date => { + let date_col = format!("{}.value_date", alias); + build_date_condition(&date_col, value, param_num) + } + SearchParamType::Number => { + let num_col = format!("{}.value_number", alias); + build_number_condition(&num_col, value, param_num) + } + SearchParamType::Quantity => { + let qty_col = format!("{}.value_quantity_value", alias); + build_number_condition(&qty_col, value, param_num) + } + SearchParamType::Uri => ( + format!("{}.value_uri = ${}", alias, param_num), + SqlParam::Text(value.value.clone()), + ), + _ => ( + format!("{}.value_string ILIKE ${}", alias, param_num), + SqlParam::Text(format!("%{}%", value.value)), + ), + }; + + Ok((condition, param)) + } + + /// Builds SQL for a reverse chain (`_has`) query. + /// + /// For `Patient?_has:Observation:subject:code=1234-5`: + /// + /// ```sql + /// r.id IN ( + /// SELECT SUBSTRING(si1.value_reference FROM POSITION('/' IN si1.value_reference) + 1) + /// FROM search_index si1 + /// WHERE si1.tenant_id = $1 AND si1.resource_type = 'Observation' + /// AND si1.param_name = 'subject' + /// AND si1.value_reference LIKE 'Patient/%' + /// AND si1.resource_id IN ( + /// SELECT si2.resource_id FROM search_index si2 + /// WHERE si2.tenant_id = $1 AND si2.resource_type = 'Observation' + /// AND si2.param_name = 'code' + /// AND si2.value_token_code = $2 + /// ) + /// ) + /// ``` + pub fn build_reverse_chain_sql( + &self, + reverse_chain: &ReverseChainedParameter, + ) -> StorageResult { + let depth = reverse_chain.depth(); + if !self.config.validate_reverse_depth(depth) { + return Err(BackendError::Internal { + backend_name: "postgres".to_string(), + message: format!( + "Reverse chain depth {} exceeds maximum {}", + depth, self.config.max_reverse_depth + ), + source: None, + } + .into()); + } + + let param_num = self.param_offset + 1; + let (sql, params) = self.build_reverse_chain_recursive(reverse_chain, 1, param_num)?; + + Ok(SqlFragment::with_params( + format!("r.id IN ({})", sql), + params, + )) + } + + fn build_reverse_chain_recursive( + &self, + rc: &ReverseChainedParameter, + depth: usize, + param_num: usize, + ) -> StorageResult<(String, Vec)> { + let alias = format!("si{}", depth); + + if rc.is_terminal() { + let value = rc.value.as_ref().ok_or_else(|| BackendError::Internal { + backend_name: "postgres".to_string(), + message: "Terminal reverse chain must have a value".to_string(), + source: None, + })?; + + let (search_condition, search_param) = self.build_reverse_terminal_condition( + &rc.source_type, + &rc.search_param, + value, + depth + 1, + param_num, + )?; + + let depth2 = depth + 1; + let sql = format!( + "SELECT SUBSTRING({alias}.value_reference FROM POSITION('/' IN {alias}.value_reference) + 1) \ + FROM search_index {alias} \ + WHERE {alias}.tenant_id = $1 AND {alias}.resource_type = '{src_type}' \ + AND {alias}.param_name = '{ref_param}' \ + AND {alias}.value_reference LIKE '{base_type}/%' \ + AND {alias}.resource_id IN (\ + SELECT si{depth2}.resource_id FROM search_index si{depth2} \ + WHERE si{depth2}.tenant_id = $1 AND si{depth2}.resource_type = '{src_type}' \ + AND si{depth2}.param_name = '{search_param_name}' AND {search_condition}\ + )", + alias = alias, + src_type = rc.source_type, + ref_param = rc.reference_param, + base_type = self.base_type, + depth2 = depth2, + search_param_name = rc.search_param, + search_condition = search_condition, + ); + + Ok((sql, vec![search_param])) + } else { + let inner = rc.nested.as_ref().ok_or_else(|| BackendError::Internal { + backend_name: "postgres".to_string(), + message: "Non-terminal reverse chain must have nested chain".to_string(), + source: None, + })?; + + let inner_builder = ChainQueryBuilder::new( + &self.tenant_id, + &rc.source_type, + Arc::clone(&self.registry), + ) + .with_config(self.config.clone()) + .with_param_offset(param_num - 1); + + let (inner_sql, inner_params) = + inner_builder.build_reverse_chain_recursive(inner, depth + 1, param_num)?; + + let sql = format!( + "SELECT SUBSTRING({alias}.value_reference FROM POSITION('/' IN {alias}.value_reference) + 1) \ + FROM search_index {alias} \ + WHERE {alias}.tenant_id = $1 AND {alias}.resource_type = '{}' \ + AND {alias}.param_name = '{}' \ + AND {alias}.value_reference LIKE '{}/%' \ + AND {alias}.resource_id IN ({inner_sql})", + rc.source_type, + rc.reference_param, + self.base_type, + alias = alias, + ); + + Ok((sql, inner_params)) + } + } + + fn build_reverse_terminal_condition( + &self, + resource_type: &str, + param_name: &str, + value: &SearchValue, + depth: usize, + param_num: usize, + ) -> StorageResult<(String, SqlParam)> { + let param_type = { + let registry = self.registry.read(); + crate::search::resolve_param_type( + ®istry, + resource_type, + param_name, + std::slice::from_ref(value), + ) + }; + + let alias = format!("si{}", depth); + + let (condition, param) = match param_type { + SearchParamType::String => { + let escaped = value.value.replace('%', "\\%").replace('_', "\\_"); + ( + format!("{}.value_string ILIKE ${} ESCAPE '\\'", alias, param_num), + SqlParam::Text(format!("%{}%", escaped)), + ) + } + SearchParamType::Token => { + if let Some((system, code)) = value.value.split_once('|') { + if system.is_empty() { + ( + format!( + "({alias}.value_token_system IS NULL OR {alias}.value_token_system = '') \ + AND {alias}.value_token_code = ${pn}", + alias = alias, + pn = param_num, + ), + SqlParam::Text(code.to_string()), + ) + } else { + ( + format!( + "{alias}.value_token_system = '{sys}' AND {alias}.value_token_code = ${pn}", + alias = alias, + sys = system.replace('\'', "''"), + pn = param_num, + ), + SqlParam::Text(code.to_string()), + ) + } + } else { + ( + format!("{}.value_token_code = ${}", alias, param_num), + SqlParam::Text(value.value.clone()), + ) + } + } + SearchParamType::Reference => ( + format!("{}.value_reference ILIKE ${}", alias, param_num), + SqlParam::Text(format!("%{}%", value.value)), + ), + SearchParamType::Date => { + let date_col = format!("{}.value_date", alias); + build_date_condition(&date_col, value, param_num) + } + SearchParamType::Number => { + let num_col = format!("{}.value_number", alias); + build_number_condition(&num_col, value, param_num) + } + SearchParamType::Quantity => { + let qty_col = format!("{}.value_quantity_value", alias); + build_number_condition(&qty_col, value, param_num) + } + SearchParamType::Uri => ( + format!("{}.value_uri = ${}", alias, param_num), + SqlParam::Text(value.value.clone()), + ), + _ => ( + format!("{}.value_string ILIKE ${}", alias, param_num), + SqlParam::Text(format!("%{}%", value.value)), + ), + }; + + Ok((condition, param)) + } +} + +fn parse_chain_part(part: &str) -> (String, Option) { + if let Some((param, type_mod)) = part.split_once(':') { + (param.to_string(), Some(type_mod.to_string())) + } else { + (part.to_string(), None) + } +} + +/// Hardcoded fallback for ambiguous reference targets, matching SQLite's +/// `chain_builder::infer_target_type` so chained queries pick the same +/// default on both backends. See the open-question note in the plan about +/// migrating this to a registry-driven first-target pick later. +fn infer_target_type(ref_param: &str) -> String { + match ref_param { + "patient" | "subject" => "Patient".to_string(), + "practitioner" | "performer" | "requester" | "author" => "Practitioner".to_string(), + "organization" | "managingOrganization" | "custodian" => "Organization".to_string(), + "encounter" | "context" => "Encounter".to_string(), + "location" => "Location".to_string(), + "device" => "Device".to_string(), + "specimen" => "Specimen".to_string(), + "medication" => "Medication".to_string(), + "condition" => "Condition".to_string(), + _ => { + let mut chars = ref_param.chars(); + match chars.next() { + Some(c) => c.to_uppercase().chain(chars).collect(), + None => ref_param.to_string(), + } + } + } +} + +fn build_date_condition(column: &str, value: &SearchValue, param_num: usize) -> (String, SqlParam) { + use crate::types::SearchPrefix; + + let (op, val) = match value.prefix { + SearchPrefix::Eq => ("=", &value.value), + SearchPrefix::Ne => ("!=", &value.value), + SearchPrefix::Gt => (">", &value.value), + SearchPrefix::Lt => ("<", &value.value), + SearchPrefix::Ge => (">=", &value.value), + SearchPrefix::Le => ("<=", &value.value), + SearchPrefix::Sa => (">", &value.value), + SearchPrefix::Eb => ("<", &value.value), + SearchPrefix::Ap => { + return ( + format!("DATE({}) = DATE(${})", column, param_num), + SqlParam::Text(value.value.clone()), + ); + } + }; + + ( + format!("{} {} ${}", column, op, param_num), + SqlParam::Text(val.clone()), + ) +} + +fn build_number_condition( + column: &str, + value: &SearchValue, + param_num: usize, +) -> (String, SqlParam) { + use crate::types::SearchPrefix; + + let num_value = value.value.parse::().unwrap_or(0.0); + + let (op, val) = match value.prefix { + SearchPrefix::Eq => ("=", num_value), + SearchPrefix::Ne => ("!=", num_value), + SearchPrefix::Gt => (">", num_value), + SearchPrefix::Lt => ("<", num_value), + SearchPrefix::Ge => (">=", num_value), + SearchPrefix::Le => ("<=", num_value), + SearchPrefix::Sa => (">", num_value), + SearchPrefix::Eb => ("<", num_value), + SearchPrefix::Ap => { + let lower = num_value * 0.9; + let upper = num_value * 1.1; + return ( + format!("{} BETWEEN {} AND {}", column, lower, upper), + SqlParam::Float(num_value), + ); + } + }; + + ( + format!("{} {} ${}", column, op, param_num), + SqlParam::Float(val), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::search::SearchParameterDefinition; + + fn registry_with(defs: Vec) -> Arc> { + let mut r = SearchParameterRegistry::new(); + for d in defs { + r.register(d).unwrap(); + } + Arc::new(RwLock::new(r)) + } + + fn obs_subject_patient_org_name() -> Arc> { + registry_with(vec![ + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Observation-subject", + "subject", + SearchParamType::Reference, + "Observation.subject", + ) + .with_base(vec!["Observation"]) + .with_targets(vec!["Patient"]), + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Patient-organization", + "organization", + SearchParamType::Reference, + "Patient.managingOrganization", + ) + .with_base(vec!["Patient"]) + .with_targets(vec!["Organization"]), + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Organization-name", + "name", + SearchParamType::String, + "Organization.name", + ) + .with_base(vec!["Organization"]), + ]) + } + + #[test] + fn parses_three_link_chain() { + let registry = obs_subject_patient_org_name(); + let builder = ChainQueryBuilder::new("t", "Observation", registry); + let parsed = builder.parse_chain("subject.organization.name").unwrap(); + assert_eq!(parsed.links.len(), 2); + assert_eq!(parsed.links[0].reference_param, "subject"); + assert_eq!(parsed.links[0].target_type, "Patient"); + assert_eq!(parsed.links[1].reference_param, "organization"); + assert_eq!(parsed.links[1].target_type, "Organization"); + assert_eq!(parsed.terminal_param, "name"); + assert_eq!(parsed.terminal_type, SearchParamType::String); + } + + #[test] + fn builds_three_link_chain_sql() { + let registry = obs_subject_patient_org_name(); + let builder = ChainQueryBuilder::new("t", "Observation", registry); + let parsed = builder.parse_chain("subject.organization.name").unwrap(); + let value = SearchValue::eq("Hospital"); + let frag = builder.build_forward_chain_sql(&parsed, &value).unwrap(); + + assert!(frag.sql.contains("r.id IN (")); + // Three nested SELECTs (outermost link, intermediate link, terminal). + // Aliases are si{i+1} per link plus si{links.len()} for the terminal — + // for a 2-link chain that is si1 (subject), si2 (organization), si2 + // (terminal name); the inner si2 lexically shadows the outer si2. + assert_eq!(frag.sql.matches("FROM search_index").count(), 3); + assert!(frag.sql.contains("SELECT si1.resource_id")); + assert!(frag.sql.contains("'Patient/' || si2.resource_id")); + assert!(frag.sql.contains("'Organization/' || si2.resource_id")); + assert!(frag.sql.contains("ILIKE $2 ESCAPE '\\'")); + assert_eq!(frag.params.len(), 1); + assert!(matches!(&frag.params[0], SqlParam::Text(s) if s == "%Hospital%")); + } + + #[test] + fn explicit_type_modifier_is_honored() { + // subject:Patient.name picks Patient even if registry has multiple targets. + let registry = registry_with(vec![ + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Observation-subject", + "subject", + SearchParamType::Reference, + "Observation.subject", + ) + .with_base(vec!["Observation"]) + .with_targets(vec!["Patient", "Group", "Device", "Location"]), + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Patient-name", + "name", + SearchParamType::String, + "Patient.name", + ) + .with_base(vec!["Patient"]), + ]); + let builder = ChainQueryBuilder::new("t", "Observation", registry); + let parsed = builder.parse_chain("subject:Patient.name").unwrap(); + assert_eq!(parsed.links[0].target_type, "Patient"); + } + + #[test] + fn ambiguous_target_falls_back_to_inference() { + let registry = registry_with(vec![ + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Observation-subject", + "subject", + SearchParamType::Reference, + "Observation.subject", + ) + .with_base(vec!["Observation"]) + .with_targets(vec!["Patient", "Group", "Device", "Location"]), + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Patient-name", + "name", + SearchParamType::String, + "Patient.name", + ) + .with_base(vec!["Patient"]), + ]); + let builder = ChainQueryBuilder::new("t", "Observation", registry); + let parsed = builder.parse_chain("subject.name").unwrap(); + assert_eq!(parsed.links[0].target_type, "Patient"); // inferred default + } + + #[test] + fn empty_chain_errors() { + let registry = obs_subject_patient_org_name(); + let builder = ChainQueryBuilder::new("t", "Observation", registry); + assert!(matches!( + builder.parse_chain(""), + Err(ChainError::EmptyChain) + )); + assert!(matches!( + builder.parse_chain("just_one_part"), + Err(ChainError::InvalidSyntax { .. }) + )); + } + + #[test] + fn reverse_chain_terminal_sql_uses_substring_position() { + // Patient?_has:Observation:subject:code=1234-5 + let rc = ReverseChainedParameter { + source_type: "Observation".to_string(), + reference_param: "subject".to_string(), + search_param: "code".to_string(), + value: Some(SearchValue::eq("1234-5")), + nested: None, + }; + let registry = registry_with(vec![ + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Observation-code", + "code", + SearchParamType::Token, + "Observation.code", + ) + .with_base(vec!["Observation"]), + ]); + let builder = ChainQueryBuilder::new("t", "Patient", registry); + let frag = builder.build_reverse_chain_sql(&rc).unwrap(); + assert!(frag.sql.contains( + "SUBSTRING(si1.value_reference FROM POSITION('/' IN si1.value_reference) + 1)" + )); + assert!(frag.sql.contains("LIKE 'Patient/%'")); + assert!(frag.sql.contains("value_token_code = $2")); + } +} diff --git a/crates/persistence/src/backends/postgres/search/mod.rs b/crates/persistence/src/backends/postgres/search/mod.rs index f007ecab6..34d463a57 100644 --- a/crates/persistence/src/backends/postgres/search/mod.rs +++ b/crates/persistence/src/backends/postgres/search/mod.rs @@ -4,5 +4,6 @@ //! for the PostgreSQL backend, using $N parameter placeholders, //! ILIKE for case-insensitive matching, and native TIMESTAMPTZ comparisons. +pub mod chain_builder; pub mod query_builder; pub mod writer; diff --git a/crates/persistence/src/backends/postgres/search_impl.rs b/crates/persistence/src/backends/postgres/search_impl.rs index 548b57065..9bb5bdd0f 100644 --- a/crates/persistence/src/backends/postgres/search_impl.rs +++ b/crates/persistence/src/backends/postgres/search_impl.rs @@ -25,6 +25,7 @@ use crate::types::{ }; use super::PostgresBackend; +use super::search::chain_builder::ChainQueryBuilder; use super::search::query_builder::{PostgresQueryBuilder, SqlParam}; fn internal_error(message: String) -> StorageError { @@ -646,55 +647,55 @@ impl ChainedSearchProvider for PostgresBackend { chain: &str, value: &str, ) -> StorageResult> { - let client = self.get_client().await?; - let tenant_id = tenant.tenant_id().as_str(); - if chain.is_empty() { return Ok(Vec::new()); } - // Parse the chain path (e.g., "patient.organization.name") - let parts: Vec<&str> = chain.split('.').collect(); - if parts.is_empty() { - return Ok(Vec::new()); - } - - // Simple single-step chain: param_name.target_param = value - // For multi-step chains, build nested subqueries - if parts.len() == 2 { - // Single step: e.g., patient.name=Smith - // Find resources of the target type matching the value, - // then find base resources referencing them - let ref_param = parts[0]; - let target_param = parts[1]; + let client = self.get_client().await?; + let tenant_id = tenant.tenant_id().as_str(); - let sql = format!( - "SELECT DISTINCT si_ref.resource_id - FROM search_index si_ref - WHERE si_ref.tenant_id = $1 - AND si_ref.resource_type = $2 - AND si_ref.param_name = '{}' - AND si_ref.value_reference IN ( - SELECT resource_type || '/' || resource_id - FROM search_index si_target - WHERE si_target.tenant_id = $1 - AND si_target.param_name = '{}' - AND si_target.value_string ILIKE $3 - )", - ref_param, target_param - ); + // Build a multi-step chain query via the registry-driven builder. + // The builder produces a `r.id IN (... nested SELECTs ...)` fragment + // that handles arbitrary chain depth (was previously stubbed for >2 + // segments). + let builder = ChainQueryBuilder::new(tenant_id, base_type, self.search_registry().clone()) + .with_param_offset(1); + let parsed = builder + .parse_chain(chain) + .map_err(|e| internal_error(format!("Failed to parse chain: {}", e)))?; + let parsed_value = crate::types::SearchValue::parse(value); + let fragment = builder.build_forward_chain_sql(&parsed, &parsed_value)?; - let rows = client - .query(&sql, &[&tenant_id, &base_type, &format!("{}%", value)]) - .await - .map_err(|e| internal_error(format!("Failed to execute chain query: {}", e)))?; + let sql = format!( + "SELECT r.id FROM resources r WHERE r.tenant_id = $1 \ + AND r.resource_type = '{base}' AND r.is_deleted = FALSE AND {clause}", + base = base_type, + clause = fragment.sql, + ); - let ids: Vec = rows.iter().map(|r| r.get(0)).collect(); - Ok(ids) - } else { - // Multi-step or single parameter chain - simplified implementation - Ok(Vec::new()) + let mut params: Vec> = + vec![Box::new(tenant_id.to_string())]; + for p in &fragment.params { + match p { + SqlParam::Text(s) => params.push(Box::new(s.clone())), + SqlParam::Float(f) => params.push(Box::new(*f)), + SqlParam::Integer(i) => params.push(Box::new(*i)), + SqlParam::Bool(b) => params.push(Box::new(*b)), + SqlParam::Timestamp(dt) => params.push(Box::new(*dt)), + SqlParam::Null => params.push(Box::new(Option::::None)), + } } + let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params + .iter() + .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync)) + .collect(); + + let rows = client + .query(&sql, ¶m_refs) + .await + .map_err(|e| internal_error(format!("Failed to execute chain query: {}", e)))?; + + Ok(rows.iter().map(|r| r.get(0)).collect()) } async fn resolve_reverse_chain( @@ -706,50 +707,43 @@ impl ChainedSearchProvider for PostgresBackend { let client = self.get_client().await?; let tenant_id = tenant.tenant_id().as_str(); - // _has:Observation:patient:code=1234-5 - // Find Observations with code=1234-5, then find the Patient IDs they reference - let value_str = reverse_chain - .value - .as_ref() - .map(|v| v.value.clone()) - .unwrap_or_default(); + // Use the registry-driven builder so we handle nested `_has` chains + // and any param type (was previously single-level only with hardcoded + // token-or-string-or-empty fallback). + let builder = ChainQueryBuilder::new(tenant_id, base_type, self.search_registry().clone()) + .with_param_offset(1); + let fragment = builder.build_reverse_chain_sql(reverse_chain)?; let sql = format!( - "SELECT DISTINCT si_ref.value_reference - FROM search_index si_ref - INNER JOIN search_index si_val - ON si_ref.tenant_id = si_val.tenant_id - AND si_ref.resource_type = si_val.resource_type - AND si_ref.resource_id = si_val.resource_id - WHERE si_ref.tenant_id = $1 - AND si_ref.resource_type = '{}' - AND si_ref.param_name = '{}' - AND si_val.param_name = '{}' - AND (si_val.value_token_code = $2 - OR si_val.value_string ILIKE $3)", - reverse_chain.source_type, reverse_chain.reference_param, reverse_chain.search_param + "SELECT r.id FROM resources r WHERE r.tenant_id = $1 \ + AND r.resource_type = '{base}' AND r.is_deleted = FALSE AND {clause}", + base = base_type, + clause = fragment.sql, ); - let like_value = format!("{}%", value_str); + let mut params: Vec> = + vec![Box::new(tenant_id.to_string())]; + for p in &fragment.params { + match p { + SqlParam::Text(s) => params.push(Box::new(s.clone())), + SqlParam::Float(f) => params.push(Box::new(*f)), + SqlParam::Integer(i) => params.push(Box::new(*i)), + SqlParam::Bool(b) => params.push(Box::new(*b)), + SqlParam::Timestamp(dt) => params.push(Box::new(*dt)), + SqlParam::Null => params.push(Box::new(Option::::None)), + } + } + let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params + .iter() + .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync)) + .collect(); + let rows = client - .query( - &sql, - &[&tenant_id, &value_str.as_str(), &like_value.as_str()], - ) + .query(&sql, ¶m_refs) .await .map_err(|e| internal_error(format!("Failed to execute reverse chain query: {}", e)))?; - let mut ids = Vec::new(); - for row in &rows { - let reference: String = row.get(0); - // Extract ID from "ResourceType/ID" reference - let expected_prefix = format!("{}/", base_type); - if let Some(id) = reference.strip_prefix(&expected_prefix) { - ids.push(id.to_string()); - } - } - - Ok(ids) + Ok(rows.iter().map(|r| r.get(0)).collect()) } } diff --git a/crates/persistence/src/composite/storage.rs b/crates/persistence/src/composite/storage.rs index 3960472f8..8f2bd87f2 100644 --- a/crates/persistence/src/composite/storage.rs +++ b/crates/persistence/src/composite/storage.rs @@ -1507,30 +1507,65 @@ impl CompositeStorage { } /// Extracts references from a resource for a given search parameter. + /// + /// Resolution order: + /// 1. Look up the search parameter in the registry and evaluate its + /// FHIRPath `expression` via `SearchParameterExtractor` — the canonical + /// FHIR source of truth for every standard parameter and any custom + /// parameter the user has registered. + /// 2. Fall back to the prior heuristic (look for the search-param name as + /// a JSON field, plus a small alias map for `patient`/`subject`/etc.) + /// only when the registry doesn't know about the parameter at all. + /// That keeps unregistered custom parameters working as they did + /// before this change. fn extract_references(&self, resource: &StoredResource, search_param: &str) -> Vec { let content = resource.content(); - let mut refs = Vec::new(); + let resource_type = resource.resource_type(); + + let registered = { + let registry = self.search_param_registry().read(); + registry + .get_param(resource_type, search_param) + .or_else(|| registry.get_param("Resource", search_param)) + }; + + if let Some(param_def) = registered { + let extractor = crate::search::SearchParameterExtractor::new(Arc::clone( + self.search_param_registry(), + )); + if let Ok(values) = extractor.extract_for_param(content, ¶m_def) { + // Trust the registry: if the param is registered, return what + // the FHIRPath expression yields (even if empty) rather than + // also running the heuristic — otherwise an _include against + // a resource that genuinely has no matching reference would + // accidentally match an unrelated JSON field with the same + // name. + return values + .into_iter() + .filter_map(|v| match v.value { + crate::search::IndexValue::Reference { reference, .. } => Some(reference), + _ => None, + }) + .collect(); + } + } - // Simple extraction - looks for the search param as a field - // A real implementation would use FHIRPath or search parameter definitions + // Heuristic fallback for unregistered custom parameters. + let mut refs = Vec::new(); if let Some(value) = content.get(search_param) { Self::extract_reference_values(value, &mut refs); } - - // Also check common reference field names - let field_name = match search_param { + let alias = match search_param { "patient" | "subject" => Some("subject"), "encounter" => Some("encounter"), "performer" => Some("performer"), _ => None, }; - - if let Some(field) = field_name { + if let Some(field) = alias { if let Some(value) = content.get(field) { Self::extract_reference_values(value, &mut refs); } } - refs } @@ -2783,6 +2818,152 @@ mod tests { assert!(refs.is_empty()); } + /// `extract_references` should consult the registry first and use the + /// FHIRPath `expression` from the registered SearchParameter, not the + /// hardcoded JSON-field-name heuristic. This exercises the new wiring + /// from PR #2 of the load-bearing-stub-fixes plan. + #[test] + fn test_extract_references_uses_registry_expression() { + use crate::search::{SearchParameterDefinition, SearchParameterRegistry}; + use crate::tenant::TenantId; + use crate::types::SearchParamType; + + // Composite wrapped around a backend whose registry has been seeded + // with an Encounter.subject param whose FHIRPath expression points at + // a JSON field name that is *different* from the search-param name — + // proving the registry's expression is what's evaluated, not the + // hardcoded "patient" -> "subject" alias. + let registry = Arc::new(parking_lot::RwLock::new(SearchParameterRegistry::new())); + registry + .write() + .register( + SearchParameterDefinition::new( + "http://hl7.org/fhir/SearchParameter/Encounter-subject", + "subject", + SearchParamType::Reference, + "Encounter.subject", + ) + .with_base(vec!["Encounter"]) + .with_targets(vec!["Patient", "Group"]), + ) + .unwrap(); + + struct MockWithRegistry { + registry: Arc>, + } + + #[async_trait::async_trait] + impl ResourceStorage for MockWithRegistry { + fn backend_name(&self) -> &'static str { + "mock-with-registry" + } + async fn create( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _resource: serde_json::Value, + _fhir_version: FhirVersion, + ) -> StorageResult { + unimplemented!() + } + async fn create_or_update( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + _resource: serde_json::Value, + _fhir_version: FhirVersion, + ) -> StorageResult<(crate::types::StoredResource, bool)> { + unimplemented!() + } + async fn read( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + ) -> StorageResult> { + Ok(None) + } + async fn update( + &self, + _tenant: &TenantContext, + _current: &crate::types::StoredResource, + _resource: serde_json::Value, + ) -> StorageResult { + unimplemented!() + } + async fn delete( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + ) -> StorageResult<()> { + Ok(()) + } + async fn count( + &self, + _tenant: &TenantContext, + _resource_type: Option<&str>, + ) -> StorageResult { + Ok(0) + } + } + + #[async_trait::async_trait] + impl SearchProvider for MockWithRegistry { + async fn search( + &self, + _tenant: &TenantContext, + _query: &crate::types::SearchQuery, + ) -> StorageResult { + use crate::types::Page; + Ok(SearchResult::new(Page::empty())) + } + async fn search_count( + &self, + _tenant: &TenantContext, + _query: &crate::types::SearchQuery, + ) -> StorageResult { + Ok(0) + } + fn search_param_registry(&self) -> &Arc> { + &self.registry + } + } + + let config = CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .build() + .unwrap(); + let backend = Arc::new(MockWithRegistry { + registry: Arc::clone(®istry), + }); + let mut backends = HashMap::new(); + backends.insert("primary".to_string(), backend.clone() as DynStorage); + let mut providers = HashMap::new(); + providers.insert("primary".to_string(), backend.clone() as DynSearchProvider); + let composite = CompositeStorage::new(config, backends) + .unwrap() + .with_search_providers(providers); + + // Encounter resource referencing Patient/p1 via subject. + let content = serde_json::json!({ + "resourceType": "Encounter", + "id": "e1", + "subject": {"reference": "Patient/p1"}, + }); + let resource = crate::types::StoredResource::new( + "Encounter", + "e1", + TenantId::new("t"), + content, + FhirVersion::default(), + ); + + let refs = composite.extract_references(&resource, "subject"); + assert_eq!(refs, vec!["Patient/p1".to_string()]); + } + #[test] fn test_extract_reference_values_null_ignored() { let val = serde_json::Value::Null; diff --git a/crates/persistence/tests/postgres_tests.rs b/crates/persistence/tests/postgres_tests.rs index 7dfb8e5db..35c76965c 100644 --- a/crates/persistence/tests/postgres_tests.rs +++ b/crates/persistence/tests/postgres_tests.rs @@ -2604,4 +2604,225 @@ mod postgres_integration { .unwrap(); assert!(page3.resources.is_empty() || page3.next_cursor.is_none()); } + + /// Inserts a row directly into the search_index table. Mirrors what the + /// SQLite chain tests do for the same purpose — exercises the chain SQL + /// without depending on the FHIRPath extractor's full coverage. Connects + /// to the shared testcontainer with its own tokio-postgres client because + /// `PostgresBackend::get_client` is crate-private. + async fn insert_search_index( + tenant_id: &str, + resource_type: &str, + resource_id: &str, + param_name: &str, + column: &str, + value: &str, + ) { + let pg = shared_pg().await; + let conn_str = format!( + "host={} port={} user=postgres password=postgres dbname=postgres", + pg.host, pg.port, + ); + let (client, connection) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls) + .await + .expect("connect to shared pg"); + tokio::spawn(async move { + let _ = connection.await; + }); + let sql = format!( + "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, {col}) \ + VALUES ($1, $2, $3, $4, $5)", + col = column, + ); + client + .execute( + &sql, + &[ + &tenant_id, + &resource_type, + &resource_id, + ¶m_name, + &value, + ], + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn postgres_integration_resolve_chain_multi_level() { + use helios_persistence::core::ChainedSearchProvider; + + // Mirror sqlite/search_impl.rs::test_resolve_chain_multi_level for + // Postgres. Three-level chain: Observation?subject.organization.name=Hospital. + let backend = create_backend().await; + let tenant = create_tenant("chain-multi"); + let tenant_id = tenant.tenant_id().as_str(); + + backend + .create( + &tenant, + "Organization", + json!({"id": "org1", "name": "General Hospital"}), + FhirVersion::default(), + ) + .await + .unwrap(); + backend + .create( + &tenant, + "Patient", + json!({"id": "p1", "managingOrganization": {"reference": "Organization/org1"}}), + FhirVersion::default(), + ) + .await + .unwrap(); + backend + .create( + &tenant, + "Observation", + json!({"id": "o1", "subject": {"reference": "Patient/p1"}}), + FhirVersion::default(), + ) + .await + .unwrap(); + + insert_search_index( + tenant_id, + "Organization", + "org1", + "name", + "value_string", + "General Hospital", + ) + .await; + insert_search_index( + tenant_id, + "Patient", + "p1", + "organization", + "value_reference", + "Organization/org1", + ) + .await; + insert_search_index( + tenant_id, + "Observation", + "o1", + "subject", + "value_reference", + "Patient/p1", + ) + .await; + + let ids = backend + .resolve_chain( + &tenant, + "Observation", + "subject.organization.name", + "Hospital", + ) + .await + .unwrap(); + + assert_eq!(ids, vec!["o1".to_string()]); + } + + #[tokio::test] + async fn postgres_integration_resolve_reverse_chain_terminal() { + use helios_persistence::core::ChainedSearchProvider; + use helios_persistence::types::{ReverseChainedParameter, SearchValue}; + + // _has:Observation:subject:code=8867-4 — find patients referenced by + // Observations whose code matches. + let backend = create_backend().await; + let tenant = create_tenant("reverse-chain"); + let tenant_id = tenant.tenant_id().as_str(); + + backend + .create( + &tenant, + "Patient", + json!({"id": "p1"}), + FhirVersion::default(), + ) + .await + .unwrap(); + backend + .create( + &tenant, + "Patient", + json!({"id": "p2"}), + FhirVersion::default(), + ) + .await + .unwrap(); + backend + .create( + &tenant, + "Observation", + json!({"id": "o1", "subject": {"reference": "Patient/p1"}}), + FhirVersion::default(), + ) + .await + .unwrap(); + backend + .create( + &tenant, + "Observation", + json!({"id": "o2", "subject": {"reference": "Patient/p2"}}), + FhirVersion::default(), + ) + .await + .unwrap(); + + insert_search_index( + tenant_id, + "Observation", + "o1", + "subject", + "value_reference", + "Patient/p1", + ) + .await; + insert_search_index( + tenant_id, + "Observation", + "o2", + "subject", + "value_reference", + "Patient/p2", + ) + .await; + insert_search_index( + tenant_id, + "Observation", + "o1", + "code", + "value_token_code", + "8867-4", + ) + .await; + insert_search_index( + tenant_id, + "Observation", + "o2", + "code", + "value_token_code", + "other", + ) + .await; + + let rc = ReverseChainedParameter::terminal( + "Observation", + "subject", + "code", + SearchValue::eq("8867-4"), + ); + let ids = backend + .resolve_reverse_chain(&tenant, "Patient", &rc) + .await + .unwrap(); + assert_eq!(ids, vec!["p1".to_string()]); + } } diff --git a/crates/subscriptions/Cargo.toml b/crates/subscriptions/Cargo.toml index 1a8f0dda7..ec444873a 100644 --- a/crates/subscriptions/Cargo.toml +++ b/crates/subscriptions/Cargo.toml @@ -54,5 +54,12 @@ dashmap = "6" [dev-dependencies] tokio = { version = "1", features = ["full", "test-util"] } wiremock = "0.6" -testcontainers = { version = "0.27", features = ["watchdog"] } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } + +# `watchdog` pulls in `signal_hook::iterator`/`SIGQUIT`, which only compile on Unix. +# Windows test runs don't need SIGTERM cleanup, so drop the feature there. +[target.'cfg(not(windows))'.dev-dependencies] +testcontainers = { version = "0.27", features = ["watchdog"] } + +[target.'cfg(windows)'.dev-dependencies] +testcontainers = { version = "0.27" }