diff --git a/README.md b/README.md index d0a5523..e5ed2c3 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127 ``` ```sql -CREATE foreign table test (key text, value text) server my_etcd_server options(rowid 'key'); +CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key'); ``` ```sql @@ -93,6 +93,57 @@ Usage Timeout in seconds to each request after the connection has been established. + +## CREATE FOREIGN TABLE options + +`etcd_fdw` accepts the following table-level options via the +`CREATE FOREIGN TABLE` command. + +- **rowid_column** as *string*, mandatory, no default + + Specifies which column should be treated as the unique row identifier. + Usually set to key. + +- **prefix** as *string*, optional, no default + + Restrict the scan to keys beginning with this prefix. + If not provided, the FDW will fetch all keys from the etcd server + +- **keys_only** as *string*, optional, default `false` + + If set to true, only the keys are fetched, not the values. + Useful to reduce network overhead when values are not needed. + +- **revision** as *string*, optional, default `0` + + Read key-value data at a specific etcd revision. + If 0, the latest revision is used. + +- **key** as *string*, optional, no default + + The starting key to fetch from etcd. + + This option defines the beginning of the range. + If neither `prefix` nor `key` is specified, the FDW will default to `\0` (the lowest possible key). + +- **range_end** as *string*, optional, no default + + The exclusive end of the key range. Restricts the scan to the half-open interval `[key, range_end)`. + + All keys between key (inclusive) and range_end (exclusive) will be returned. + If range_end is omitted, only the single key defined by key will be returned (unless prefix is used). + +- **consistency** as *string*, optional, default `l` + + Specifies the read consistency level for etcd queries. + + + Linearizable(`l`), Ensures the result reflects the latest consensus state of the cluster. + Linearizable reads have higher latency but guarantee fresh data. + + Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus. + Serializable reads are faster and lighter on the cluster, but may return stale data in some cases + ## What doesn't work etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements. What does work is the following workflow: diff --git a/src/lib.rs b/src/lib.rs index c40f928..a0130e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,6 @@ pgrx::pg_module_magic!(); pub(crate) struct EtcdFdw { client: Client, rt: Runtime, - prefix: String, fetch_results: Vec, fetch_key: bool, fetch_value: bool, @@ -75,11 +74,20 @@ pub enum EtcdFdwError { #[error("Key {0} already exists in etcd. No duplicates allowed")] KeyAlreadyExists(String), + #[error("Options 'prefix' and 'range_end' cannot be used together")] + ConflictingPrefixAndRange, + + #[error("Options 'prefix' and 'key' should not be used together")] + ConflictingPrefixAndKey, + #[error("Key {0} doesn't exist in etcd")] KeyDoesntExist(String), #[error("Invalid option '{0}' with value '{1}'")] InvalidOption(String, String), + + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { @@ -90,13 +98,13 @@ impl From for ErrorReport { /// Check whether dependent options exits /// i.e username & pass, cert & key -fn require_pair( - a: &Option, - b: &Option, +fn require_pair( + a: bool, + b: bool, err: EtcdFdwError, ) -> Result<(), EtcdFdwError> { match (a, b) { - (Some(_), None) | (None, Some(_)) => Err(err), + (true, false) | (false, true) => Err(err), _ => Ok(()), } } @@ -194,8 +202,8 @@ impl ForeignDataWrapper for EtcdFdw { // ssl_cert + ssl_key must be both present or both absent // username + password must be both present or both absent - require_pair(&cert_path, &key_path, EtcdFdwError::CertKeyMismatch(()))?; - require_pair(&username, &password, EtcdFdwError::UserPassMismatch(()))?; + require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?; + require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?; config = EtcdConfig { endpoints: vec![connstr], @@ -213,16 +221,12 @@ impl ForeignDataWrapper for EtcdFdw { Ok(x) => x, Err(e) => return Err(EtcdFdwError::ClientConnectionError(e.to_string())), }; - let prefix = match server.options.get("prefix") { - Some(x) => x.clone(), - None => String::from(""), - }; + let fetch_results = vec![]; Ok(Self { client, rt, - prefix, fetch_results, fetch_key: false, fetch_value: false, @@ -235,15 +239,57 @@ impl ForeignDataWrapper for EtcdFdw { columns: &[Column], _sorts: &[Sort], limit: &Option, - _options: &std::collections::HashMap, + options: &std::collections::HashMap, ) -> Result<(), EtcdFdwError> { - // Select get all rows as a result into a field of the struct - // Build Query options from parameters - let mut get_options = GetOptions::new().with_all_keys(); - match limit { - Some(x) => get_options = get_options.with_limit(x.count), - None => (), + // parse the options defined when `CREATE FOREIGN TABLE` + let prefix = options.get("prefix").cloned(); + let range_end = options.get("range_end").cloned(); + let key_start = options.get("key").cloned(); + let keys_only = options.get("keys_only").map(|v| v == "true").unwrap_or(false); + let revision = options.get("revision").and_then(|v| v.parse::().ok()).unwrap_or(0); + let serializable = options.get("consistency").map(|v| v == "s").unwrap_or(false); + let mut get_options = GetOptions::new(); + + // prefix and range are mutually exclusive + match (prefix.as_ref(), range_end.as_ref()) { + (Some(_), Some(_)) => { + return Err(EtcdFdwError::ConflictingPrefixAndRange); + } + (Some(_), None) => { + get_options = get_options.with_prefix(); + } + (None, Some(r)) => { + get_options = get_options.with_range(r.clone()); + } + (None, None) => { + if key_start.is_none() { + get_options = get_options.with_all_keys(); + } + } + } + + if let Some(x) = limit { + get_options = get_options.with_limit(x.count); + } + + if keys_only { + get_options = get_options.with_keys_only(); + } + + if revision > 0 { + get_options = get_options.with_revision(revision); } + + if serializable { + get_options = get_options.with_serializable(); + } + + // preference order : prefix > key_start > default "\0" + // samllest possible valid key '\0' + let key = prefix.clone() + .or_else(|| key_start.clone()) + .unwrap_or_else(|| String::from("\0")); + // Check if columns contains key and value let colnames: Vec = columns.iter().map(|x| x.name.clone()).collect(); self.fetch_key = colnames.contains(&String::from("key")); @@ -251,7 +297,7 @@ impl ForeignDataWrapper for EtcdFdw { let result = self .rt - .block_on(self.client.get(self.prefix.clone(), Some(get_options))); + .block_on(self.client.get(key, Some(get_options))); let mut result_unwrapped = match result { Ok(x) => x, Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())), @@ -424,6 +470,38 @@ impl ForeignDataWrapper for EtcdFdw { // This currently also does nothing Ok(()) } + + fn validator(options: Vec>, catalog: Option) -> EtcdFdwResult<()> { + if let Some(oid) = catalog { + if oid == FOREIGN_SERVER_RELATION_ID { + check_options_contain(&options, "connstr")?; + + let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok(); + let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok(); + let username_exists = check_options_contain(&options, "username").is_ok(); + let password_exists = check_options_contain(&options, "password").is_ok(); + + require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?; + require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?; + } else if oid == FOREIGN_TABLE_RELATION_ID { + check_options_contain(&options, "rowid_column")?; + + let prefix_exists = check_options_contain(&options, "prefix").is_ok(); + let rannge_exists = check_options_contain(&options, "range_end").is_ok(); + let key_exists = check_options_contain(&options, "key").is_ok(); + + if prefix_exists && rannge_exists { + return Err(EtcdFdwError::ConflictingPrefixAndRange); + } + + if prefix_exists && key_exists { + return Err(EtcdFdwError::ConflictingPrefixAndKey); + } + } + } + + Ok(()) + } } #[cfg(test)]