diff --git a/src/lib.rs b/src/lib.rs index ded1e37..76625e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,8 @@ -use etcd_client::{Client, GetOptions, KeyValue}; -use futures::Stream; +use etcd_client::{Client, DeleteOptions, GetOptions, KeyValue, PutOptions}; use pgrx::pg_sys::panic::ErrorReport; -use pgrx::pg_sys::ErrorContextCallback; use pgrx::PgSqlErrorCode; use supabase_wrappers::prelude::*; use thiserror::Error; -use tokio::runtime::*; pgrx::pg_module_magic!(); @@ -19,23 +16,37 @@ pub(crate) struct EtcdFdw { rt: Runtime, prefix: String, fetch_results: Vec, + fetch_key: bool, + fetch_value: bool, } #[derive(Error, Debug)] pub enum EtcdFdwError { - #[error("Failed to fetch from etcd")] + #[error("Failed to fetch from etcd: {0}")] FetchError(String), - #[error("Failed to connect to client")] + #[error("Failed to send update to etcd: {0}")] + UpdateError(String), + + #[error("Failed to connect to client: {0}")] ClientConnectionError(String), #[error("No connection string option was specified. Specify it with connstr")] NoConnStr(()), + + #[error("Column {0} is not contained in the input dataset")] + MissingColumn(String), + + #[error("Key {0} already exists in etcd. No duplicates allowed")] + KeyAlreadyExists(String), + + #[error("Key {0} doesn't exist in etcd")] + KeyDoesntExist(String), } impl From for ErrorReport { - fn from(_value: EtcdFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: EtcdFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{}", value), "") } } @@ -44,7 +55,7 @@ type EtcdFdwResult = std::result::Result; impl ForeignDataWrapper for EtcdFdw { fn new(server: ForeignServer) -> EtcdFdwResult { // Open connection to etcd specified through the server parameter - let rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().expect("Tokio runtime should be initialized"); // Add parsing for the multi host connection string things here let server_name = match server.options.get("connstr") { @@ -67,13 +78,15 @@ impl ForeignDataWrapper for EtcdFdw { rt, prefix, fetch_results, + fetch_key: false, + fetch_value: false, }) } fn begin_scan( &mut self, _quals: &[Qual], - _columns: &[Column], + columns: &[Column], _sorts: &[Sort], limit: &Option, _options: &std::collections::HashMap, @@ -85,7 +98,10 @@ impl ForeignDataWrapper for EtcdFdw { Some(x) => get_options = get_options.with_limit(x.count), None => (), } - // Also do quals, columns and sorts. + // Check if columns contains key and value + let colnames: Vec = columns.iter().map(|x| x.name.clone()).collect(); + self.fetch_key = colnames.contains(&String::from("key")); + self.fetch_value = colnames.contains(&String::from("value")); let result = self .rt @@ -110,14 +126,20 @@ impl ForeignDataWrapper for EtcdFdw { let value = x .value_str() .expect("Expected a value, but the value was empty"); - row.push("key", Some(Cell::String(key.to_string()))); - row.push("value", Some(Cell::String(value.to_string()))); + if self.fetch_key { + row.push("key", Some(Cell::String(key.to_string()))); + } + if self.fetch_value { + row.push("value", Some(Cell::String(value.to_string()))); + } })) } } fn end_scan(&mut self) -> EtcdFdwResult<()> { self.fetch_results = vec![]; + self.fetch_key = false; + self.fetch_value = false; Ok(()) } @@ -125,19 +147,120 @@ impl ForeignDataWrapper for EtcdFdw { &mut self, _options: &std::collections::HashMap, ) -> Result<(), EtcdFdwError> { - todo!("Begin modify is not yet implemented") + // This currently does nothing + Ok(()) } - fn insert(&mut self, _row: &Row) -> Result<(), EtcdFdwError> { - todo!("Insert is not yet implemented") + fn insert(&mut self, row: &Row) -> Result<(), EtcdFdwError> { + let key_string = match row + .cols + .iter() + .zip(row.cells.clone()) + .filter(|(name, _cell)| *name == "key") + .last() + { + Some(x) => x.1.expect("The key column should be present").to_string(), + None => return Err(EtcdFdwError::MissingColumn("key".to_string())), + }; + let value_string = match row + .cols + .iter() + .zip(row.cells.clone()) + .filter(|(name, _cell)| *name == "value") + .last() + { + Some(x) => x.1.expect("The value column should be present").to_string(), + None => return Err(EtcdFdwError::MissingColumn("value".to_string())), + }; + let key = key_string.trim_matches(|x| x == '\''); + let value = value_string.trim_matches(|x| x == '\''); + + // See if key already exists. Error if it does + match self.rt.block_on(self.client.get(key, None)) { + Ok(x) => { + if let Some(y) = x.kvs().first() { + if y.key_str().expect("There should be a key string") == key { + return Err(EtcdFdwError::KeyAlreadyExists(format!("{}", key))); + } + } + } + Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())), + } + + match self + .rt + .block_on(self.client.put(key, value, Some(PutOptions::new()))) + { + Ok(_) => Ok(()), + Err(e) => return Err(EtcdFdwError::UpdateError(e.to_string())), + } } - fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), EtcdFdwError> { - todo!("Update is not yet implemented") + fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), EtcdFdwError> { + let key_string = rowid.to_string(); + let key = key_string.trim_matches(|x| x == '\''); + + match self.rt.block_on(self.client.get(key, None)) { + Ok(x) => { + if let Some(y) = x.kvs().first() { + if y.key_str().expect("There should be a key string") != key { + return Err(EtcdFdwError::KeyDoesntExist(format!("{}", key))); + } + } + } + Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())), + } + + let value_string = match new_row + .cols + .iter() + .zip(new_row.cells.clone()) + .filter(|(name, _cell)| *name == "value") + .last() + { + Some(x) => x.1.expect("The value column should be present").to_string(), + None => return Err(EtcdFdwError::MissingColumn("value".to_string())), + }; + let value = value_string.trim_matches(|x| x == '\''); + + match self.rt.block_on(self.client.put(key, value, None)) { + Ok(_) => Ok(()), + Err(e) => return Err(EtcdFdwError::UpdateError(e.to_string())), + } } - fn delete(&mut self, _rowid: &Cell) -> Result<(), EtcdFdwError> { - todo!("Delete is not yet implemented") + fn delete(&mut self, rowid: &Cell) -> Result<(), EtcdFdwError> { + let key_string = rowid.to_string(); + let key = key_string.trim_matches(|x| x == '\''); + + let delete_options = DeleteOptions::new(); + + match self.rt.block_on(self.client.get(key, None)) { + Ok(x) => { + if let Some(y) = x.kvs().first() { + if y.key_str().expect("There should be a key string") != key { + return Err(EtcdFdwError::KeyDoesntExist(format!("{}", key))); + } + } + } + Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())), + } + + match self + .rt + .block_on(self.client.delete(key, Some(delete_options))) + { + Ok(x) => { + if x.deleted() == 0 { + return Err(EtcdFdwError::UpdateError(format!( + "Deletion seemingly successful, but deleted count is {}", + x.deleted() + ))); + } + Ok(()) + } + Err(e) => Err(EtcdFdwError::UpdateError(e.to_string())), + } } // fn get_rel_size( @@ -152,6 +275,7 @@ impl ForeignDataWrapper for EtcdFdw { // } fn end_modify(&mut self) -> Result<(), EtcdFdwError> { - todo!() + // This currently also does nothing + Ok(()) } }