Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 44 additions & 33 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::{self, Debug, Display, Formatter};

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -50,19 +50,13 @@ pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";

/// Builder for [`GlueCatalog`].
#[derive(Debug)]
pub struct GlueCatalogBuilder(GlueCatalogConfig);

impl Default for GlueCatalogBuilder {
fn default() -> Self {
Self(GlueCatalogConfig {
name: None,
uri: None,
catalog_id: None,
warehouse: "".to_string(),
props: HashMap::new(),
})
}
#[derive(Debug, Default)]
pub struct GlueCatalogBuilder {
name: Option<String>,
uri: Option<String>,
catalog_id: Option<String>,
warehouse: Option<String>,
props: HashMap<String, String>,
}

impl CatalogBuilder for GlueCatalogBuilder {
Expand All @@ -73,25 +67,22 @@ impl CatalogBuilder for GlueCatalogBuilder {
name: impl Into<String>,
props: HashMap<String, String>,
) -> impl Future<Output = Result<Self::C>> + Send {
self.0.name = Some(name.into());
self.name = Some(name.into());

if props.contains_key(GLUE_CATALOG_PROP_URI) {
self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
self.uri = props.get(GLUE_CATALOG_PROP_URI).cloned();
}

if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
self.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned();
}

if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
self.0.warehouse = props
.get(GLUE_CATALOG_PROP_WAREHOUSE)
.cloned()
.unwrap_or_default();
self.warehouse = props.get(GLUE_CATALOG_PROP_WAREHOUSE).cloned();
}

// Collect other remaining properties
self.0.props = props
self.props = props
.into_iter()
.filter(|(k, _)| {
k != GLUE_CATALOG_PROP_URI
Expand All @@ -101,34 +92,54 @@ impl CatalogBuilder for GlueCatalogBuilder {
.collect();

async move {
if self.0.name.is_none() {
// Catalog name and warehouse are required
let name = self
.name
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Catalog name is required"))?;
let warehouse = self.warehouse.ok_or_else(|| {
Error::new(ErrorKind::DataInvalid, "Catalog warehouse is required")
})?;

if warehouse.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog name is required",
));
}
if self.0.warehouse.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog warehouse is required",
"Catalog warehouse cannot be empty",
));
}

GlueCatalog::new(self.0).await
let config = GlueCatalogConfig {
name,
uri: self.uri,
catalog_id: self.catalog_id,
warehouse,
props: self.props,
};

GlueCatalog::new(config).await
}
}
}

#[derive(Debug)]
/// Glue Catalog configuration
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
name: Option<String>,
name: String,
uri: Option<String>,
catalog_id: Option<String>,
warehouse: String,
props: HashMap<String, String>,
}

impl Display for GlueCatalogConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"GlueCatalogConfig(name={}, warehouse={})",
self.name, self.warehouse
)
}
}

struct GlueClient(aws_sdk_glue::Client);

/// Glue Catalog
Expand Down
88 changes: 47 additions & 41 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,14 @@ pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
/// HMS Catalog warehouse location
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";

/// Builder for [`RestCatalog`].
#[derive(Debug)]
pub struct HmsCatalogBuilder(HmsCatalogConfig);

impl Default for HmsCatalogBuilder {
fn default() -> Self {
Self(HmsCatalogConfig {
name: None,
address: "".to_string(),
thrift_transport: HmsThriftTransport::default(),
warehouse: "".to_string(),
props: HashMap::new(),
})
}
/// Builder for [`HmsCatalog`].
#[derive(Debug, Default)]
pub struct HmsCatalogBuilder {
name: Option<String>,
address: Option<String>,
thrift_transport: HmsThriftTransport,
warehouse: Option<String>,
props: HashMap<String, String>,
}

impl CatalogBuilder for HmsCatalogBuilder {
Expand All @@ -74,28 +68,25 @@ impl CatalogBuilder for HmsCatalogBuilder {
name: impl Into<String>,
props: HashMap<String, String>,
) -> impl Future<Output = Result<Self::C>> + Send {
self.0.name = Some(name.into());
self.name = Some(name.into());

if props.contains_key(HMS_CATALOG_PROP_URI) {
self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
self.address = props.get(HMS_CATALOG_PROP_URI).cloned();
}

if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
self.0.thrift_transport = match tt.to_lowercase().as_str() {
self.thrift_transport = match tt.to_lowercase().as_str() {
THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
_ => HmsThriftTransport::default(),
};
}

if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
self.0.warehouse = props
.get(HMS_CATALOG_PROP_WAREHOUSE)
.cloned()
.unwrap_or_default();
self.warehouse = props.get(HMS_CATALOG_PROP_WAREHOUSE).cloned();
}

self.0.props = props
self.props = props
.into_iter()
.filter(|(k, _)| {
k != HMS_CATALOG_PROP_URI
Expand All @@ -104,28 +95,43 @@ impl CatalogBuilder for HmsCatalogBuilder {
})
.collect();

let result = {
if self.0.name.is_none() {
Err(Error::new(
ErrorKind::DataInvalid,
"Catalog name is required",
))
} else if self.0.address.is_empty() {
Err(Error::new(
async move {
let name = self
.name
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Catalog name is required"))?;

let address = self
.address
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Catalog address is required"))?;

let warehouse = self.warehouse.ok_or_else(|| {
Error::new(ErrorKind::DataInvalid, "Catalog warehouse is required")
})?;

if address.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog address is required",
))
} else if self.0.warehouse.is_empty() {
Err(Error::new(
"Catalog address cannot be empty",
));
}

if warehouse.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog warehouse is required",
))
} else {
HmsCatalog::new(self.0)
"Catalog warehouse cannot be empty",
));
}
};

std::future::ready(result)
let config = HmsCatalogConfig {
name,
address,
thrift_transport: self.thrift_transport,
warehouse,
props: self.props,
};

HmsCatalog::new(config)
}
}
}

Expand All @@ -143,7 +149,7 @@ pub enum HmsThriftTransport {
/// Hive metastore Catalog configuration.
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
name: Option<String>,
name: String,
address: String,
thrift_transport: HmsThriftTransport,
warehouse: String,
Expand Down
Loading
Loading