Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 145 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use etcd_client::{Client, GetOptions, KeyValue};
use futures::Stream;
use etcd_client::{Client, DeleteOptions, GetOptions, KeyValue, PutOptions};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::pg_sys::ErrorContextCallback;
use pgrx::PgSqlErrorCode;
use supabase_wrappers::prelude::*;
use thiserror::Error;
use tokio::runtime::*;

pgrx::pg_module_magic!();

Expand All @@ -19,23 +16,37 @@ pub(crate) struct EtcdFdw {
rt: Runtime,
prefix: String,
fetch_results: Vec<KeyValue>,
fetch_key: bool,
fetch_value: bool,
}

#[derive(Error, Debug)]
pub enum EtcdFdwError {
#[error("Failed to fetch from etcd")]
#[error("Failed to fetch from etcd: {0}")]
FetchError(String),

#[error("Failed to connect to client")]
#[error("Failed to send update to etcd: {0}")]
UpdateError(String),

#[error("Failed to connect to client: {0}")]
ClientConnectionError(String),

#[error("No connection string option was specified. Specify it with connstr")]
NoConnStr(()),

#[error("Column {0} is not contained in the input dataset")]
MissingColumn(String),

#[error("Key {0} already exists in etcd. No duplicates allowed")]
KeyAlreadyExists(String),

#[error("Key {0} doesn't exist in etcd")]
KeyDoesntExist(String),
}

impl From<EtcdFdwError> for ErrorReport {
fn from(_value: EtcdFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
fn from(value: EtcdFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{}", value), "")
}
}

Expand All @@ -44,7 +55,7 @@ type EtcdFdwResult<T> = std::result::Result<T, EtcdFdwError>;
impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
fn new(server: ForeignServer) -> EtcdFdwResult<EtcdFdw> {
// Open connection to etcd specified through the server parameter
let rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().expect("Tokio runtime should be initialized");

// Add parsing for the multi host connection string things here
let server_name = match server.options.get("connstr") {
Expand All @@ -67,13 +78,15 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
rt,
prefix,
fetch_results,
fetch_key: false,
fetch_value: false,
})
}

fn begin_scan(
&mut self,
_quals: &[Qual],
_columns: &[Column],
columns: &[Column],
_sorts: &[Sort],
limit: &Option<Limit>,
_options: &std::collections::HashMap<String, String>,
Expand All @@ -85,7 +98,10 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
Some(x) => get_options = get_options.with_limit(x.count),
None => (),
}
// Also do quals, columns and sorts.
// Check if columns contains key and value
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
self.fetch_key = colnames.contains(&String::from("key"));
self.fetch_value = colnames.contains(&String::from("value"));

let result = self
.rt
Expand All @@ -110,34 +126,141 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
let value = x
.value_str()
.expect("Expected a value, but the value was empty");
row.push("key", Some(Cell::String(key.to_string())));
row.push("value", Some(Cell::String(value.to_string())));
if self.fetch_key {
row.push("key", Some(Cell::String(key.to_string())));
}
if self.fetch_value {
row.push("value", Some(Cell::String(value.to_string())));
}
}))
}
}

fn end_scan(&mut self) -> EtcdFdwResult<()> {
self.fetch_results = vec![];
self.fetch_key = false;
self.fetch_value = false;
Ok(())
}

fn begin_modify(
&mut self,
_options: &std::collections::HashMap<String, String>,
) -> Result<(), EtcdFdwError> {
todo!("Begin modify is not yet implemented")
// This currently does nothing
Ok(())
}

fn insert(&mut self, _row: &Row) -> Result<(), EtcdFdwError> {
todo!("Insert is not yet implemented")
fn insert(&mut self, row: &Row) -> Result<(), EtcdFdwError> {
let key_string = match row
.cols
.iter()
.zip(row.cells.clone())
.filter(|(name, _cell)| *name == "key")
.last()
{
Some(x) => x.1.expect("The key column should be present").to_string(),
None => return Err(EtcdFdwError::MissingColumn("key".to_string())),
};
let value_string = match row
.cols
.iter()
.zip(row.cells.clone())
.filter(|(name, _cell)| *name == "value")
.last()
{
Some(x) => x.1.expect("The value column should be present").to_string(),
None => return Err(EtcdFdwError::MissingColumn("value".to_string())),
};
let key = key_string.trim_matches(|x| x == '\'');
let value = value_string.trim_matches(|x| x == '\'');

// See if key already exists. Error if it does
match self.rt.block_on(self.client.get(key, None)) {
Ok(x) => {
if let Some(y) = x.kvs().first() {
if y.key_str().expect("There should be a key string") == key {
return Err(EtcdFdwError::KeyAlreadyExists(format!("{}", key)));
}
}
}
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
}

match self
.rt
.block_on(self.client.put(key, value, Some(PutOptions::new())))
{
Ok(_) => Ok(()),
Err(e) => return Err(EtcdFdwError::UpdateError(e.to_string())),
}
}

fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), EtcdFdwError> {
todo!("Update is not yet implemented")
fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), EtcdFdwError> {
let key_string = rowid.to_string();
let key = key_string.trim_matches(|x| x == '\'');

match self.rt.block_on(self.client.get(key, None)) {
Ok(x) => {
if let Some(y) = x.kvs().first() {
if y.key_str().expect("There should be a key string") != key {
return Err(EtcdFdwError::KeyDoesntExist(format!("{}", key)));
}
}
}
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
}

let value_string = match new_row
.cols
.iter()
.zip(new_row.cells.clone())
.filter(|(name, _cell)| *name == "value")
.last()
{
Some(x) => x.1.expect("The value column should be present").to_string(),
None => return Err(EtcdFdwError::MissingColumn("value".to_string())),
};
let value = value_string.trim_matches(|x| x == '\'');

match self.rt.block_on(self.client.put(key, value, None)) {
Ok(_) => Ok(()),
Err(e) => return Err(EtcdFdwError::UpdateError(e.to_string())),
}
}

fn delete(&mut self, _rowid: &Cell) -> Result<(), EtcdFdwError> {
todo!("Delete is not yet implemented")
fn delete(&mut self, rowid: &Cell) -> Result<(), EtcdFdwError> {
let key_string = rowid.to_string();
let key = key_string.trim_matches(|x| x == '\'');

let delete_options = DeleteOptions::new();

match self.rt.block_on(self.client.get(key, None)) {
Ok(x) => {
if let Some(y) = x.kvs().first() {
if y.key_str().expect("There should be a key string") != key {
return Err(EtcdFdwError::KeyDoesntExist(format!("{}", key)));
}
}
}
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
}

match self
.rt
.block_on(self.client.delete(key, Some(delete_options)))
{
Ok(x) => {
if x.deleted() == 0 {
return Err(EtcdFdwError::UpdateError(format!(
"Deletion seemingly successful, but deleted count is {}",
x.deleted()
)));
}
Ok(())
}
Err(e) => Err(EtcdFdwError::UpdateError(e.to_string())),
}
}

// fn get_rel_size(
Expand All @@ -152,6 +275,7 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
// }

fn end_modify(&mut self) -> Result<(), EtcdFdwError> {
todo!()
// This currently also does nothing
Ok(())
}
}