From ec48f9bb133a503f9188d97b02af51b68b17d212 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Thu, 25 Sep 2025 20:18:12 +0500 Subject: [PATCH 1/5] Add more CREATE FOREIGN TABLE options Add new options `prefix,keys_only,revision,range` with `CREATE FOREIGN TABLE`. --- README.md | 34 ++++++++++++++++++++++++++++++++++ src/lib.rs | 53 ++++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index d0a5523..95faa0b 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,40 @@ Usage Timeout in seconds to each request after the connection has been established. + +## CREATE FOREIGN TABLE options + +`etcd_fdw` accepts the following table-level options via the +`CREATE FOREIGN TABLE` command. + +- **rowid** as *string*, mandatory, no default + + Specifies which column should be treated as the unique row identifier. + Usually set to key. + +- **prefix** as *string*, optional, default `/` + + Restrict the scan to keys beginning with this prefix. + If not provided, the FDW will fetch all keys from the etcd server + +- **keys_only** as *string*, optional, default `false` + + If set to true, only the keys are fetched, not the values. + Useful to reduce network overhead when values are not needed. + +- **revision** as *string*, optional, default `0` + + Read key-value data at a specific etcd revision. + If 0, the latest revision is used. + +- **range** as *string*, optional, no default + + Restricts the scan to the half-open interval `[key, range)`. + Example: with range `/gamma` and scan starting at `/`, the query will return keys strictly less than `/gamma`. + + + Note: Cannot be used together with `prefix`. + ## What doesn't work etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements. What does work is the following workflow: diff --git a/src/lib.rs b/src/lib.rs index c40f928..ac41521 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,6 @@ pgrx::pg_module_magic!(); pub(crate) struct EtcdFdw { client: Client, rt: Runtime, - prefix: String, fetch_results: Vec, fetch_key: bool, fetch_value: bool, @@ -75,6 +74,9 @@ pub enum EtcdFdwError { #[error("Key {0} already exists in etcd. No duplicates allowed")] KeyAlreadyExists(String), + #[error("Options 'prefix' and 'range' cannot be used together")] + ConflictingPrefixAndRange, + #[error("Key {0} doesn't exist in etcd")] KeyDoesntExist(String), @@ -213,16 +215,12 @@ impl ForeignDataWrapper for EtcdFdw { Ok(x) => x, Err(e) => return Err(EtcdFdwError::ClientConnectionError(e.to_string())), }; - let prefix = match server.options.get("prefix") { - Some(x) => x.clone(), - None => String::from(""), - }; + let fetch_results = vec![]; Ok(Self { client, rt, - prefix, fetch_results, fetch_key: false, fetch_value: false, @@ -237,21 +235,50 @@ impl ForeignDataWrapper for EtcdFdw { limit: &Option, _options: &std::collections::HashMap, ) -> Result<(), EtcdFdwError> { - // Select get all rows as a result into a field of the struct - // Build Query options from parameters - let mut get_options = GetOptions::new().with_all_keys(); - match limit { - Some(x) => get_options = get_options.with_limit(x.count), - None => (), + // parse the options defined when `CREATE FOREIGN TABLE` + let prefix = _options.get("prefix").cloned(); + let range = _options.get("range").cloned(); + let keys_only = _options.get("keys_only").map(|v| v == "true").unwrap_or(false); + let revision = _options.get("revision").and_then(|v| v.parse::().ok()).unwrap_or(0); + let mut get_options = GetOptions::new(); + + // prefix and range are mutually exclusive + match (prefix.as_ref(), range.as_ref()) { + (Some(_), Some(_)) => { + return Err(EtcdFdwError::ConflictingPrefixAndRange); + } + (Some(_), None) => { + get_options = get_options.with_prefix(); + } + (None, Some(r)) => { + get_options = get_options.with_range(r.clone()); + } + (None, None) => { + get_options = get_options.with_all_keys(); + } } + + if let Some(x) = limit { + get_options = get_options.with_limit(x.count); + } + + if keys_only { + get_options = get_options.with_keys_only(); + } + + if revision > 0 { + get_options = get_options.with_revision(revision); + } + // Check if columns contains key and value let colnames: Vec = columns.iter().map(|x| x.name.clone()).collect(); self.fetch_key = colnames.contains(&String::from("key")); self.fetch_value = colnames.contains(&String::from("value")); + let key = prefix.clone().unwrap_or_else(|| String::from("/")); let result = self .rt - .block_on(self.client.get(self.prefix.clone(), Some(get_options))); + .block_on(self.client.get(key, Some(get_options))); let mut result_unwrapped = match result { Ok(x) => x, Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())), From 9773e3179bee449cd5a37298059086ce9ce18218 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Thu, 25 Sep 2025 20:40:40 +0500 Subject: [PATCH 2/5] fix defautl key value for scans --- README.md | 2 +- src/lib.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 95faa0b..eb99c5b 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Usage Specifies which column should be treated as the unique row identifier. Usually set to key. -- **prefix** as *string*, optional, default `/` +- **prefix** as *string*, optional, default `` Restrict the scan to keys beginning with this prefix. If not provided, the FDW will fetch all keys from the etcd server diff --git a/src/lib.rs b/src/lib.rs index ac41521..7f3ab48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -275,7 +275,8 @@ impl ForeignDataWrapper for EtcdFdw { self.fetch_key = colnames.contains(&String::from("key")); self.fetch_value = colnames.contains(&String::from("value")); - let key = prefix.clone().unwrap_or_else(|| String::from("/")); + // samllest possible valid key '\0', empty string will not work with with_range + let key = prefix.clone().unwrap_or_else(|| String::from("\0")); let result = self .rt .block_on(self.client.get(key, Some(get_options))); From 239ab991afb20e906a435fa9dac93063a53aebd1 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Fri, 26 Sep 2025 07:05:41 +0500 Subject: [PATCH 3/5] docs: fix default for prefix options --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index eb99c5b..b151e3a 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Usage Specifies which column should be treated as the unique row identifier. Usually set to key. -- **prefix** as *string*, optional, default `` +- **prefix** as *string*, optional, no default Restrict the scan to keys beginning with this prefix. If not provided, the FDW will fetch all keys from the etcd server From aa5f81b96f684f8591a390f0e0bdb7c148214b16 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Mon, 29 Sep 2025 13:01:55 +0500 Subject: [PATCH 4/5] Improve options handling & validation --- README.md | 29 ++++++++++++++++----- src/lib.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 85 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index b151e3a..e5ed2c3 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127 ``` ```sql -CREATE foreign table test (key text, value text) server my_etcd_server options(rowid 'key'); +CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key'); ``` ```sql @@ -99,7 +99,7 @@ Usage `etcd_fdw` accepts the following table-level options via the `CREATE FOREIGN TABLE` command. -- **rowid** as *string*, mandatory, no default +- **rowid_column** as *string*, mandatory, no default Specifies which column should be treated as the unique row identifier. Usually set to key. @@ -119,13 +119,30 @@ Usage Read key-value data at a specific etcd revision. If 0, the latest revision is used. -- **range** as *string*, optional, no default +- **key** as *string*, optional, no default - Restricts the scan to the half-open interval `[key, range)`. - Example: with range `/gamma` and scan starting at `/`, the query will return keys strictly less than `/gamma`. + The starting key to fetch from etcd. + This option defines the beginning of the range. + If neither `prefix` nor `key` is specified, the FDW will default to `\0` (the lowest possible key). - Note: Cannot be used together with `prefix`. +- **range_end** as *string*, optional, no default + + The exclusive end of the key range. Restricts the scan to the half-open interval `[key, range_end)`. + + All keys between key (inclusive) and range_end (exclusive) will be returned. + If range_end is omitted, only the single key defined by key will be returned (unless prefix is used). + +- **consistency** as *string*, optional, default `l` + + Specifies the read consistency level for etcd queries. + + + Linearizable(`l`), Ensures the result reflects the latest consensus state of the cluster. + Linearizable reads have higher latency but guarantee fresh data. + + Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus. + Serializable reads are faster and lighter on the cluster, but may return stale data in some cases ## What doesn't work etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements. diff --git a/src/lib.rs b/src/lib.rs index 7f3ab48..b4e7294 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,14 +74,20 @@ pub enum EtcdFdwError { #[error("Key {0} already exists in etcd. No duplicates allowed")] KeyAlreadyExists(String), - #[error("Options 'prefix' and 'range' cannot be used together")] + #[error("Options 'prefix' and 'range_end' cannot be used together")] ConflictingPrefixAndRange, + #[error("Options 'prefix' and 'key' should not be used together")] + ConflictingPrefixAndKey, + #[error("Key {0} doesn't exist in etcd")] KeyDoesntExist(String), #[error("Invalid option '{0}' with value '{1}'")] InvalidOption(String, String), + + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { @@ -92,13 +98,13 @@ impl From for ErrorReport { /// Check whether dependent options exits /// i.e username & pass, cert & key -fn require_pair( - a: &Option, - b: &Option, +fn require_pair( + a: bool, + b: bool, err: EtcdFdwError, ) -> Result<(), EtcdFdwError> { match (a, b) { - (Some(_), None) | (None, Some(_)) => Err(err), + (true, false) | (false, true) => Err(err), _ => Ok(()), } } @@ -196,8 +202,8 @@ impl ForeignDataWrapper for EtcdFdw { // ssl_cert + ssl_key must be both present or both absent // username + password must be both present or both absent - require_pair(&cert_path, &key_path, EtcdFdwError::CertKeyMismatch(()))?; - require_pair(&username, &password, EtcdFdwError::UserPassMismatch(()))?; + require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?; + require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?; config = EtcdConfig { endpoints: vec![connstr], @@ -237,13 +243,15 @@ impl ForeignDataWrapper for EtcdFdw { ) -> Result<(), EtcdFdwError> { // parse the options defined when `CREATE FOREIGN TABLE` let prefix = _options.get("prefix").cloned(); - let range = _options.get("range").cloned(); + let range_end = _options.get("range_end").cloned(); + let key_start = _options.get("key").cloned(); let keys_only = _options.get("keys_only").map(|v| v == "true").unwrap_or(false); let revision = _options.get("revision").and_then(|v| v.parse::().ok()).unwrap_or(0); + let serializable = _options.get("consistency").map(|v| v == "s").unwrap_or(false); let mut get_options = GetOptions::new(); // prefix and range are mutually exclusive - match (prefix.as_ref(), range.as_ref()) { + match (prefix.as_ref(), range_end.as_ref()) { (Some(_), Some(_)) => { return Err(EtcdFdwError::ConflictingPrefixAndRange); } @@ -254,7 +262,9 @@ impl ForeignDataWrapper for EtcdFdw { get_options = get_options.with_range(r.clone()); } (None, None) => { - get_options = get_options.with_all_keys(); + if key_start.is_none() { + get_options = get_options.with_all_keys(); + } } } @@ -270,13 +280,21 @@ impl ForeignDataWrapper for EtcdFdw { get_options = get_options.with_revision(revision); } + if serializable { + get_options = get_options.with_serializable(); + } + + // preference order : prefix > key_start > default "\0" + // samllest possible valid key '\0' + let key = prefix.clone() + .or_else(|| key_start.clone()) + .unwrap_or_else(|| String::from("\0")); + // Check if columns contains key and value let colnames: Vec = columns.iter().map(|x| x.name.clone()).collect(); self.fetch_key = colnames.contains(&String::from("key")); self.fetch_value = colnames.contains(&String::from("value")); - // samllest possible valid key '\0', empty string will not work with with_range - let key = prefix.clone().unwrap_or_else(|| String::from("\0")); let result = self .rt .block_on(self.client.get(key, Some(get_options))); @@ -452,6 +470,38 @@ impl ForeignDataWrapper for EtcdFdw { // This currently also does nothing Ok(()) } + + fn validator(options: Vec>, catalog: Option) -> EtcdFdwResult<()> { + if let Some(oid) = catalog { + if oid == FOREIGN_SERVER_RELATION_ID { + check_options_contain(&options, "connstr")?; + + let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok(); + let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok(); + let username_exists = check_options_contain(&options, "username").is_ok(); + let password_exists = check_options_contain(&options, "password").is_ok(); + + require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?; + require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?; + } else if oid == FOREIGN_TABLE_RELATION_ID { + check_options_contain(&options, "rowid_column")?; + + let prefix_exists = check_options_contain(&options, "prefix").is_ok(); + let rannge_exists = check_options_contain(&options, "range_end").is_ok(); + let key_exists = check_options_contain(&options, "key").is_ok(); + + if prefix_exists && rannge_exists { + return Err(EtcdFdwError::ConflictingPrefixAndRange); + } + + if prefix_exists && key_exists { + return Err(EtcdFdwError::ConflictingPrefixAndKey); + } + } + } + + Ok(()) + } } #[cfg(test)] From d3c4ac7a4657c3fcba7ac099c9268ab48dddc4a1 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Mon, 29 Sep 2025 13:41:44 +0500 Subject: [PATCH 5/5] rename var `_options` to `options` --- src/lib.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b4e7294..a0130e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -239,15 +239,15 @@ impl ForeignDataWrapper for EtcdFdw { columns: &[Column], _sorts: &[Sort], limit: &Option, - _options: &std::collections::HashMap, + options: &std::collections::HashMap, ) -> Result<(), EtcdFdwError> { // parse the options defined when `CREATE FOREIGN TABLE` - let prefix = _options.get("prefix").cloned(); - let range_end = _options.get("range_end").cloned(); - let key_start = _options.get("key").cloned(); - let keys_only = _options.get("keys_only").map(|v| v == "true").unwrap_or(false); - let revision = _options.get("revision").and_then(|v| v.parse::().ok()).unwrap_or(0); - let serializable = _options.get("consistency").map(|v| v == "s").unwrap_or(false); + let prefix = options.get("prefix").cloned(); + let range_end = options.get("range_end").cloned(); + let key_start = options.get("key").cloned(); + let keys_only = options.get("keys_only").map(|v| v == "true").unwrap_or(false); + let revision = options.get("revision").and_then(|v| v.parse::().ok()).unwrap_or(0); + let serializable = options.get("consistency").map(|v| v == "s").unwrap_or(false); let mut get_options = GetOptions::new(); // prefix and range are mutually exclusive