Sql catalog #229

Open. Wants to merge 34 commits into base: main.

JanKaul
Collaborator

@JanKaul JanKaul commented Mar 4, 2024

This PR implements the basic operations for a SQL catalog. The implementation uses the sqlx crate, which supports Postgres, MySQL, and SQLite.

The update_table method is to be implemented later.

@JanKaul
Collaborator Author

JanKaul commented Mar 4, 2024

PTAL @liurenjie1024 @Xuanwo @ZENOTME @Fokko

let rows = connection.transaction(|txn|{
let name = self.name.clone();
Box::pin(async move {
sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await
Member

Should you care about SQL injection here? Or are the catalog / namespace / table names assumed to be safe?

Collaborator

Good point, it's better to use a prepared statement here.
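
To illustrate the point, here is a minimal std-only sketch contrasting the interpolated query from the diff with a parameterized one. The helper names and the `?` placeholder are assumptions for illustration (placeholder syntax differs between databases, e.g. `$1` on Postgres). With sqlx, the second form would roughly correspond to passing the fixed SQL text to `sqlx::query` and attaching the value with `.bind(...)`.

```rust
/// Unsafe variant from the diff: the catalog name becomes part of the SQL text.
fn interpolated_query(catalog_name: &str) -> String {
    format!(
        "select distinct table_namespace from iceberg_tables where catalog_name = '{}';",
        catalog_name
    )
}

/// Parameterized variant (hypothetical helper): the SQL text is fixed and the
/// value travels separately, so it can never change the statement's structure.
/// The `?` placeholder is an assumption; the right syntax depends on the database.
fn parameterized_query(catalog_name: &str) -> (&'static str, Vec<String>) {
    (
        "select distinct table_namespace from iceberg_tables where catalog_name = ?;",
        vec![catalog_name.to_string()],
    )
}
```

With a catalog name such as `x' or '1'='1`, the first function produces a statement whose `where` clause is rewritten by the input, while the second leaves the SQL text untouched and hands the raw string to the driver as data.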

Collaborator

There are several issues with this implementation:

  1. If parent is None, we should list all namespaces.
  2. We should also take into account namespaces in iceberg_namespace_properties.
  3. We should list only sub-namespaces.

See the Java implementation here.
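
The three points above could be sketched roughly as follows. This is a std-only illustration, not the catalog's actual code: the function name and signature are hypothetical, the two slices stand in for rows read from `iceberg_tables` and `iceberg_namespace_properties`, and "sub-namespaces" is interpreted here as namespaces one level below the parent, which is an assumption.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: merge namespace names from both tables and filter
/// them relative to an optional parent (dot-separated names assumed).
fn list_namespaces(
    from_tables: &[&str],
    from_properties: &[&str],
    parent: Option<&str>,
) -> Vec<Vec<String>> {
    // Point 2: namespaces may exist only in the properties table.
    let all: BTreeSet<&str> = from_tables
        .iter()
        .chain(from_properties.iter())
        .copied()
        .collect();

    let mut result: BTreeSet<Vec<String>> = BTreeSet::new();
    for ns in all {
        let parts: Vec<String> = ns.split('.').map(ToString::to_string).collect();
        match parent {
            // Point 1: no parent means every namespace is listed.
            None => {
                result.insert(parts);
            }
            // Point 3: keep only namespaces below the parent, truncated to
            // one level deeper than the parent (assumption).
            Some(p) => {
                let parent_parts: Vec<&str> = p.split('.').collect();
                let is_below = parts.len() > parent_parts.len()
                    && parts
                        .iter()
                        .zip(parent_parts.iter())
                        .all(|(a, b)| a.as_str() == *b);
                if is_below {
                    result.insert(parts[..parent_parts.len() + 1].to_vec());
                }
            }
        }
    }
    result.into_iter().collect()
}
```

Using a `BTreeSet` removes duplicates that appear in both tables and gives a stable order, which keeps the result easy to compare in tests.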

}

async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
let metadata_location = {
Contributor

Do we need to check the cache first? Given that it's inserted later.

Collaborator

Also, the insertion should not be blind; we need to check its version first. My suggestion is to remove the cache for now so that things don't get too complicated.

Collaborator Author

The cache is only intended for update_table. Optimistically assuming that the metadata_location hasn't been changed since loading the table, the metadata and metadata_location from the cache can directly be used to perform the update. This way the database has to be queried only once for the optimistic case.
If the metadata_location changed, the update method has to be more involved.

I would not use the cache for loading tables.
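
As a rough illustration of the optimistic path described above, the following std-only sketch uses an in-memory map as a stand-in for the `iceberg_tables` table. The type and method names are hypothetical. The idea is a compare-and-swap on `metadata_location`, which in SQL would look like an `update ... set metadata_location = <new> where ... and metadata_location = <expected>` statement that reports whether a row was changed.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the `iceberg_tables` table:
/// table name -> current metadata_location.
struct TableStore {
    rows: HashMap<String, String>,
}

impl TableStore {
    fn new() -> Self {
        TableStore { rows: HashMap::new() }
    }

    fn insert(&mut self, table: &str, metadata_location: &str) {
        self.rows
            .insert(table.to_string(), metadata_location.to_string());
    }

    fn location(&self, table: &str) -> Option<&str> {
        self.rows.get(table).map(|s| s.as_str())
    }

    /// Swap the metadata location only if it still equals `expected`.
    /// Returns true when the row was updated (the optimistic case) and
    /// false when someone else changed it or the table does not exist.
    fn compare_and_swap(&mut self, table: &str, expected: &str, new: &str) -> bool {
        match self.rows.get_mut(table) {
            Some(current) if current.as_str() == expected => {
                *current = new.to_string();
                true
            }
            _ => false,
        }
    }
}
```

When `compare_and_swap` returns false, the cached metadata is stale and the caller would need the more involved path: reload the table, reapply the changes, and retry.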

Collaborator

I'm thinking maybe we should have a standalone data structure for caching, just like CachingCatalog in Java.
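
A standalone caching layer could be sketched as a wrapper around any catalog. The trait below is a simplified, synchronous stand-in invented for this example (the real `Catalog` trait is async and returns `Table`), and `CountingCatalog` is a toy backend, so this shows the shape of the idea rather than a proposed API. It also ignores invalidation and the version check raised earlier in this thread.

```rust
use std::collections::HashMap;

/// Simplified, synchronous stand-in for a catalog trait (hypothetical).
trait Catalog {
    fn load_table(&mut self, name: &str) -> Option<String>;
}

/// Wrapper that caches `load_table` results for any inner catalog.
struct CachingCatalog<C: Catalog> {
    inner: C,
    cache: HashMap<String, String>,
}

impl<C: Catalog> CachingCatalog<C> {
    fn new(inner: C) -> Self {
        Self {
            inner,
            cache: HashMap::new(),
        }
    }
}

impl<C: Catalog> Catalog for CachingCatalog<C> {
    fn load_table(&mut self, name: &str) -> Option<String> {
        // Serve from the cache when possible.
        if let Some(hit) = self.cache.get(name) {
            return Some(hit.clone());
        }
        // Otherwise ask the wrapped catalog and remember the answer.
        let loaded = self.inner.load_table(name)?;
        self.cache.insert(name.to_string(), loaded.clone());
        Some(loaded)
    }
}

/// Toy backend that counts how often it is queried.
struct CountingCatalog {
    calls: usize,
}

impl Catalog for CountingCatalog {
    fn load_table(&mut self, name: &str) -> Option<String> {
        self.calls += 1;
        Some(format!("metadata for {name}"))
    }
}
```

Because the wrapper only depends on the trait, every catalog implementation would get caching without carrying its own cache field.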

Box::pin(async move {
sqlx::query(
"create table if not exists iceberg_namespace_properties (
catalog_name text not null,
Contributor

This should be VARCHAR(255). Maybe we can copy and paste the SQL from Java Iceberg?

          + CATALOG_NAME
          + " VARCHAR(255) NOT NULL,"
          + NAMESPACE_NAME
          + " VARCHAR(255) NOT NULL,"
          + NAMESPACE_PROPERTY_KEY
          + " VARCHAR(255),"
          + NAMESPACE_PROPERTY_VALUE
          + " VARCHAR(1000),"
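
Carried over to Rust, the column definitions quoted above might look like the sketch below. Only `catalog_name` appears in the diff; the other three column names are assumptions for illustration. The quoted Java snippet is cut off after the value column, so any key constraints are deliberately left out here.

```rust
// Column names other than `catalog_name` are assumptions for illustration.
const CATALOG_NAME: &str = "catalog_name";
const NAMESPACE_NAME: &str = "namespace";
const NAMESPACE_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_PROPERTY_VALUE: &str = "property_value";

/// Builds the DDL with the column types from the quoted Java snippet.
fn create_namespace_properties_sql() -> String {
    format!(
        "create table if not exists iceberg_namespace_properties (\
         {} varchar(255) not null, \
         {} varchar(255) not null, \
         {} varchar(255), \
         {} varchar(1000))",
        CATALOG_NAME, NAMESPACE_NAME, NAMESPACE_PROPERTY_KEY, NAMESPACE_PROPERTY_VALUE
    )
}
```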

Comment on lines 201 to 270
y.table_namespace
.split('.')
.map(ToString::to_string)
.collect::<Vec<_>>(),
Collaborator

How about extracting this into a common method on NamespaceIdent?
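
One possible shape for such a method, as a sketch only: the struct below is a minimal local stand-in for `NamespaceIdent`, and `from_dotted` / `to_dotted` are hypothetical names, not existing API.

```rust
/// Minimal stand-in for `NamespaceIdent` (the real type lives in the iceberg crate).
#[derive(Debug, PartialEq, Eq, Clone)]
struct NamespaceIdent(Vec<String>);

impl NamespaceIdent {
    /// Hypothetical constructor: parse the dot-separated form stored in the database.
    fn from_dotted(s: &str) -> Self {
        NamespaceIdent(s.split('.').map(ToString::to_string).collect())
    }

    /// Inverse of `from_dotted`, for writing the value back to the database.
    fn to_dotted(&self) -> String {
        self.0.join(".")
    }
}
```

Keeping both directions in one place means the separator is defined once instead of being repeated at every query site.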

@@ -44,21 +49,27 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

-#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
+#[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)]
Collaborator

We had a discussion in this PR about the table metadata builder. I have concerns about this derived builder since it's error-prone and not easy to review. TableMetadataBuilder will be heavily used by the transaction API, and we will need to do a lot of checks in it. I would suggest maintaining this struct manually, what do you think?
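
To make the suggestion concrete, a hand-written builder could look roughly like this. The struct is a reduced, hypothetical subset of the real `TableMetadata` fields, and the specific validation rules (required fields, accepted format versions, non-negative sequence number) are assumptions chosen for illustration.

```rust
/// Reduced, hypothetical subset of the real `TableMetadata` fields.
#[derive(Debug, PartialEq)]
struct TableMetadata {
    format_version: u8,
    location: String,
    last_sequence_number: i64,
}

/// Hand-written builder: every invariant is checked in one visible place.
#[derive(Default)]
struct TableMetadataBuilder {
    format_version: Option<u8>,
    location: Option<String>,
    last_sequence_number: i64,
}

impl TableMetadataBuilder {
    fn format_version(mut self, version: u8) -> Self {
        self.format_version = Some(version);
        self
    }

    fn location(mut self, location: impl Into<String>) -> Self {
        self.location = Some(location.into());
        self
    }

    fn last_sequence_number(mut self, n: i64) -> Self {
        self.last_sequence_number = n;
        self
    }

    /// Validates the collected values (illustrative rules only).
    fn build(self) -> Result<TableMetadata, String> {
        let format_version = self.format_version.ok_or("format version is required")?;
        if format_version != 1 && format_version != 2 {
            return Err(format!("unsupported format version {format_version}"));
        }
        let location = self
            .location
            .filter(|l| !l.is_empty())
            .ok_or("location is required")?;
        if self.last_sequence_number < 0 {
            return Err("sequence number must not be negative".to_string());
        }
        Ok(TableMetadata {
            format_version,
            location,
            last_sequence_number: self.last_sequence_number,
        })
    }
}
```

A derived builder would accept any combination of field values; here `build` is the single place where a reviewer can see which combinations are rejected.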

@liurenjie1024
Collaborator

cc @JanKaul Is this PR ready for review, or do you need to make more updates?

@JanKaul
Collaborator Author

JanKaul commented Apr 17, 2024

I have to add a couple more changes. I'll notify you when I'm finished.

@himadripal
Contributor

@JanKaul WDYT? I think this PR is ready for review, I can add the update and delete in a separate PR.

@Fokko Fokko added this to the 0.3.0 Release milestone Apr 24, 2024
@liurenjie1024
Collaborator

> @JanKaul WDYT? I think this PR is ready for review, I can add the update and delete in a separate PR.

Cool, I'll take a look first.

JanKaul and others added 2 commits April 25, 2024 09:40
Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
@JanKaul
Collaborator Author

JanKaul commented Apr 25, 2024

Thank you all for your helpful comments. I think the PR is ready for review again.

@liurenjie1024 @sdd @odysa @ZENOTME @martin-g

Collaborator

@liurenjie1024 liurenjie1024 left a comment

Thanks @JanKaul for this PR, we've moved a huge step forward! I think there are some places we can improve a little to make it more robust.

name: String,
connection: AnyPool,
storage: FileIO,
cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
Collaborator

I'm hesitant to add a cache here. Maybe we can add something like CachingCatalog in Java so that all catalog implementations could benefit from it?

Ok(table)
}

async fn create_table(
Collaborator

There are some things missing here:

  1. We should first check that the namespace exists.
  2. The location is optional; if it is not given, we should use a subdirectory of the warehouse as the location.

I would suggest referring to Python's implementation.
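
The two missing checks could be sketched as below. Both function names are hypothetical, the slice of existing namespaces stands in for a database lookup, and the default layout `<warehouse>/<namespace parts>/<table>` is an assumption for illustration; the Python implementation referenced above may lay out paths differently.

```rust
/// Hypothetical helper: pick the table location, falling back to a
/// sub-directory of the warehouse when none was given.
fn resolve_location(
    warehouse: &str,
    namespace: &[&str],
    table: &str,
    location: Option<&str>,
) -> String {
    match location {
        Some(l) => l.trim_end_matches('/').to_string(),
        // Assumed layout: <warehouse>/<namespace parts>/<table>
        None => format!(
            "{}/{}/{}",
            warehouse.trim_end_matches('/'),
            namespace.join("/"),
            table
        ),
    }
}

/// Hypothetical pre-flight step for `create_table`: reject unknown
/// namespaces first, then resolve the location to use.
fn prepare_create_table(
    existing_namespaces: &[&str],
    warehouse: &str,
    namespace: &[&str],
    table: &str,
    location: Option<&str>,
) -> Result<String, String> {
    let key = namespace.join(".");
    if !existing_namespaces.contains(&key.as_str()) {
        return Err(format!("namespace {key} does not exist"));
    }
    Ok(resolve_location(warehouse, namespace, table, location))
}
```

Checking the namespace before resolving the location means a missing namespace fails early, before anything is written to storage or the database.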


Successfully merging this pull request may close these issues.

Implement Sql Catalog.
8 participants