diff --git a/docs/docs/targets/postgres.md b/docs/docs/targets/postgres.md index 0a748681..9e8c6890 100644 --- a/docs/docs/targets/postgres.md +++ b/docs/docs/targets/postgres.md @@ -47,6 +47,8 @@ The spec takes the following fields: * `table_name` (`str`, optional): The name of the table to store to. If unspecified, will use the table name `[${AppNamespace}__]${FlowName}__${TargetName}`, e.g. `DemoFlow__doc_embeddings` or `Staging__DemoFlow__doc_embeddings`. +* `schema` (`str`, optional): The PostgreSQL schema to create the table in. If unspecified, the table will be created in the default schema (usually `public`). When specified, `table_name` must also be explicitly specified. CocoIndex will automatically create the schema if it doesn't exist. + ## Example >, table_name: Option, + schema: Option, } const BIND_LIMIT: usize = 65535; @@ -143,10 +144,12 @@ impl ExportContext { fn new( db_ref: Option>, db_pool: PgPool, - table_name: String, + table_id: &TableId, key_fields_schema: Box<[FieldSchema]>, value_fields_schema: Vec, ) -> Result { + let table_name = qualified_table_name(table_id); + let key_fields = key_fields_schema .iter() .map(|f| format!("\"{}\"", f.name)) @@ -255,12 +258,18 @@ pub struct Factory {} pub struct TableId { #[serde(skip_serializing_if = "Option::is_none")] database: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + schema: Option, table_name: String, } impl std::fmt::Display for TableId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.table_name)?; + if let Some(schema) = &self.schema { + write!(f, "{}.{}", schema, self.table_name)?; + } else { + write!(f, "{}", self.table_name)?; + } if let Some(database) = &self.database { write!(f, " (database: {database})")?; } @@ -345,6 +354,13 @@ fn to_column_type_sql(column_type: &ValueType) -> String { } } +fn qualified_table_name(table_id: &TableId) -> String { + match &table_id.schema { + Some(schema) => format!("\"{}\".{}", schema, table_id.table_name), + None => table_id.table_name.clone(), + } +} + impl<'a> From<&'a SetupState> for Cow<'a, TableColumnsSchema> { fn from(val: &'a SetupState) -> Self { Cow::Owned(TableColumnsSchema { @@ -554,7 +570,9 @@ impl setup::ResourceSetupChange for SetupChange { } impl SetupChange { - async fn apply_change(&self, db_pool: &PgPool, table_name: &str) -> Result<()> { + async fn apply_change(&self, db_pool: &PgPool, table_id: &TableId) -> Result<()> { + let table_name = qualified_table_name(table_id); + if self.actions.table_action.drop_existing { sqlx::query(&format!("DROP TABLE IF EXISTS {table_name}")) .execute(db_pool) @@ -572,6 +590,12 @@ impl SetupChange { if let Some(table_upsertion) = &self.actions.table_action.table_upsertion { match table_upsertion { TableUpsertionAction::Create { keys, values } => { + // Create schema if specified + if let Some(schema) = &table_id.schema { + let sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema); + sqlx::query(&sql).execute(db_pool).await?; + } + let mut fields = (keys .iter() .map(|(name, typ)| format!("\"{name}\" {typ} NOT NULL"))) @@ -638,8 +662,18 @@ impl TargetFactoryBase for Factory { let data_coll_output = data_collections .into_iter() .map(|d| { + // Validate: if schema is specified, table_name must be explicit + if d.spec.schema.is_some() && d.spec.table_name.is_none() { + bail!( + "Postgres target '{}': when 'schema' is specified, 'table_name' must also be explicitly provided. \ + Auto-generated table names are not supported with custom schemas", + d.name + ); + } + let table_id = TableId { database: d.spec.database.clone(), + schema: d.spec.schema.clone(), table_name: d.spec.table_name.unwrap_or_else(|| { utils::db::sanitize_identifier(&format!( "{}__{}", @@ -653,7 +687,7 @@ impl TargetFactoryBase for Factory { &d.value_fields_schema, &d.index_options, ); - let table_name = table_id.table_name.clone(); + let table_id_clone = table_id.clone(); let db_ref = d.spec.database; let auth_registry = context.auth_registry.clone(); let export_context = Box::pin(async move { @@ -661,7 +695,7 @@ impl TargetFactoryBase for Factory { let export_context = Arc::new(ExportContext::new( db_ref, db_pool.clone(), - table_name, + &table_id_clone, d.key_fields_schema, d.value_fields_schema, )?); @@ -699,7 +733,7 @@ impl TargetFactoryBase for Factory { } fn describe_resource(&self, key: &TableId) -> Result { - Ok(format!("Postgres table {}", key.table_name)) + Ok(format!("Postgres table {}", key)) } async fn apply_mutation( @@ -746,7 +780,7 @@ impl TargetFactoryBase for Factory { let db_pool = get_db_pool(change.key.database.as_ref(), &context.auth_registry).await?; change .setup_change - .apply_change(&db_pool, &change.key.table_name) + .apply_change(&db_pool, &change.key) .await?; } Ok(())