GCP: Add Iceberg Catalog for GCP BigLake Metastore #7412
Conversation
Sorry for posting here (asking from a user perspective): …
It provides the same functionality that an Iceberg custom catalog supports (https://iceberg.apache.org/docs/latest/custom-catalog/), like the existing HiveCatalog. It supports read/write with Spark and Flink. It does not work with Trino (it needs Trino integration here: https://github.com/trinodb/trino/tree/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog). The BigLake Metastore API is not the same as the Iceberg REST catalog API spec, but they should be compatible and convertible via a proxy. We are happy to explore how to make it work with the Iceberg REST client; please let me know if you have any use cases. BigLake Metastore works with PyIceberg; we have a Python client: https://cloud.google.com/python/docs/reference/biglake/latest. We need to contribute some code in PyIceberg for the integration. BTW, we already released this catalog as a JAR: gs://spark-lib/biglake/biglake-catalog-iceberg1.2.0-0.1.0-with-dependencies.jar, with a user guide here: https://cloud.google.com/bigquery/docs/iceberg-tables. Please give it a try : )
A few initial comments on a quick first pass.
// The endpoint of BigLake API. Optional, default to DEFAULT_BIGLAKE_SERVICE_ENDPOINT.
public static final String PROPERTIES_KEY_BIGLAKE_ENDPOINT = "blms_endpoint";
// The GCP project ID. Required.
public static final String PROPERTIES_KEY_GCP_PROJECT = "gcp_project";
These properties appear to be duplicates of values in GCPProperties. Please use that class instead for defining and accessing properties.
It makes sense. I added a TODO to use GCPProperties in a follow-up PR. I changed the config names to follow the existing style (e.g., biglake.project-id) in this PR. The issue with simply using GCPProperties in this PR is that "gcs.project-id" is GCS-specific, but the class field name is "projectId". I hope to rename it to gcsProjectId and add a bigLakeProjectId. That touches other classes, so I prefer to separate it out.
I actually feel "gcs.project-id" is not necessary: it would be great if customers could use buckets from any project (and from different projects) in the same catalog handler. Most GCP customers today use the GCS connector (https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage), an adapter between HDFS and GCS. A URL like "gs://bucket/folder" is handled by this connector, and the project ID is detected from the bucket. It is open source and pre-installed on Dataproc. This makes me believe specifying a project ID for GCS is not needed.
I don't think we should defer the property updates to a separate PR. It doesn't make sense to introduce them here just to move them later. They are also public fields, which means that if they end up in a release, they will need to go through a deprecation cycle. We can add the additional properties to GCPProperties and make sure they are namespaced appropriately to work with other properties.
If we can determine the project ID from the bucket itself, that would be great, but one of the major points of GCSFileIO is to remove dependencies on Hadoop/HDFS, so using that connector is not a solution for this.
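As a reference, a minimal sketch of what the reviewer's suggestion could look like: defining the new keys in GCPProperties, namespaced so they cannot clash with the existing gcs.* keys. The biglake.* key names, fields, and accessor below are assumptions for illustration, not the final API.

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

public class GCPProperties implements Serializable {
  // Proposed BigLake-scoped keys (names are placeholders for illustration).
  public static final String BIGLAKE_PROJECT_ID = "biglake.project-id";
  public static final String BIGLAKE_GCP_LOCATION = "biglake.location";
  public static final String BIGLAKE_CATALOG_ID = "biglake.catalog";

  private String bigLakeProjectId;

  public GCPProperties(Map<String, String> properties) {
    // Read the BigLake project ID from the catalog properties, if present.
    bigLakeProjectId = properties.get(BIGLAKE_PROJECT_ID);
  }

  public Optional<String> bigLakeProjectId() {
    return Optional.ofNullable(bigLakeProjectId);
  }
}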
@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
  if (namespace.levels().length == 0) {
I'm not sure this will map correctly to the Spark catalog namespacing. What is the namespacing for the catalog? Is it <spark-catalog>.<biglake-catalog>.<database/schema>.<table>?
Sorry for the confusion. There are two options: (1) link a <spark-catalog> to a physical <biglake-catalog>, so the full identifier of a table is just <spark-catalog>.<database/schema>.<table>. CREATE/DROP <spark-catalog> is supported: it creates/deletes the <biglake-catalog> via the API. (2) Use <spark-catalog>.<biglake-catalog>.<database/schema>.<table>.
We chose (1) to avoid the long table identifier in (2) (the linking is done by a config, biglake.catalog). The limitation is that customers can't use two <biglake-catalog>s in the same <spark-catalog>; they have to install two <spark-catalog>s instead. We think this is OK, because tables that are used together are usually in the same catalog.
My concern is that I am not sure whether (1) violates any design pattern of Spark namespaces. Please let me know if it does.
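To make option (1) concrete, here is a rough sketch of the Spark session configuration, assuming illustrative property names (gcp_project, gcp_location, blms_catalog, warehouse) and values that may not match the keys this PR finally settles on.

import org.apache.spark.sql.SparkSession;

// Option (1): one Spark catalog is linked to one BigLake catalog, so tables are
// addressed as <spark-catalog>.<database>.<table>. All property names and values
// below are placeholders for illustration.
SparkSession spark =
    SparkSession.builder()
        .appName("biglake-example")
        .config("spark.sql.catalog.blms", "org.apache.iceberg.spark.SparkCatalog")
        .config(
            "spark.sql.catalog.blms.catalog-impl",
            "org.apache.iceberg.gcp.biglake.BigLakeCatalog")
        .config("spark.sql.catalog.blms.gcp_project", "myproj")
        .config("spark.sql.catalog.blms.gcp_location", "us")
        .config("spark.sql.catalog.blms.blms_catalog", "mycat") // the linked <biglake-catalog>
        .config("spark.sql.catalog.blms.warehouse", "gs://mybucket/warehouse")
        .getOrCreate();

// Tables in the linked BigLake catalog are then addressed as blms.<database>.<table>:
spark.sql("CREATE NAMESPACE IF NOT EXISTS blms.db1");
spark.sql("SELECT * FROM blms.db1.table1").show();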
private Table makeNewTable(TableMetadata metadata, String metadataFileLocation) {
  Table.Builder builder = Table.newBuilder().setType(Table.Type.HIVE);
  builder
      .getHiveOptionsBuilder()
Is this necessary? These options don't apply to Iceberg tables.
Another way to use BigLake Metastore for Iceberg tables is to install an HMS proxy that exposes a local 9083 Thrift port. The proxy reads from the Metastore API and returns HMS tables. The benefit of this approach is that it works for all data engines that support HMS, without having to write a catalog plugin. We populate these fields because the Hive Iceberg catalog populates them; storing them in BigLake Metastore lets the HMS proxy path return exactly the same table that HMS would.
We have received requests to develop more Iceberg catalog plugins (e.g., for Trino) for BigLake Metastore. Once those plugins exist, we may no longer need this HMS path. If it is OK, I'd like to keep this code for now and remove it in the future.
@coufon can you help explain and document how the atomic update works? I assume this is somehow related to the …
Thank you so much for the review. I added a comment in the code to explain the atomic update.
import java.util.Map;

/** A client interface of Google BigLake service. */
interface BigLakeClient {
What is the value of this interface? Do you expect someone to swap out the implementation for some reason? And if so, why not just swap out the entire catalog instead? That would make more sense to me. Then you wouldn't need a second interface that basically duplicates the public Catalog API.
The interface is for creating a fake client implementation. In the previous commit I used mocked clients, so this interface was not used. Now that I have switched to CatalogTests as you suggested, mocking the client per test case no longer works, so I added a FakeBigLakeClient implementing this interface for tests.
I'm still struggling to see how this interface makes sense. The implementation only wraps MetastoreServiceClient, so it seems like you should be able to mock or spy that instead of creating one here?
This approach also doesn't test any of the code in BigLakeClientImpl, like convertException. I think this should be removed and replaced by testing against a different MetastoreServiceClient.
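As a reference, a minimal sketch of what the reviewer suggests: stubbing the generated MetastoreServiceClient directly in a test instead of going through a BigLakeClient interface. The package names are assumed from the java-biglake client linked in the description, and the test wiring is illustrative only.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.cloud.bigquery.biglake.v1.Catalog;
import com.google.cloud.bigquery.biglake.v1.GetCatalogRequest;
import com.google.cloud.bigquery.biglake.v1.MetastoreServiceClient;
import org.mockito.ArgumentMatchers;

// Inside a test method: stub the generated client so BigLakeClientImpl (including
// convertException) can be exercised without calling the real service. The resource
// name below is a placeholder.
MetastoreServiceClient stub = mock(MetastoreServiceClient.class);
when(stub.getCatalog(ArgumentMatchers.any(GetCatalogRequest.class)))
    .thenReturn(
        Catalog.newBuilder()
            .setName("projects/myproj/locations/us/catalogs/mycat")
            .build());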
    return false;
  }
} catch (NoSuchNamespaceException e) {
  LOG.warn("Failed to drop namespace", e);
This log message is inaccurate. If the database didn't exist, then there was no failure. This method is idempotent.
But we can't tell whether it is not-found or permission-denied. For permission denied, we do want to tell the user to double-check. Maybe the BigLake client should not convert 403 to 404, and should instead convert 403 to Iceberg's NotAuthorizedException? Then downstream code would never treat these errors as not-found.
I tried returning NotAuthorizedException from the client instead of 403. The problem is that the tests in CatalogTests expect NoSuchTableException and NoSuchNamespaceException, so more refactoring would be required. I feel that converting 403 to 404 and making it explicit to users is the best option for now.
} else if (namespace.levels().length == 1) {
  String dbId = databaseId(namespace);
  validateDatabaseId(dbId, namespace);
  return loadDatabase(dbId).getHiveOptions().getParametersMap();
Is this parameter map immutable or unmodifiable?
It is unmodifiable: https://protobuf.dev/reference/java/java-generated/#map-fields
public Catalog catalog(CatalogName name) {
  try {
    return stub.getCatalog(GetCatalogRequest.newBuilder().setName(name.toString()).build());
  } catch (PermissionDeniedException e) {
I think it would make sense if everything returned 404, but returning 401/403 and translating that to 404 doesn't make any sense to me.
When we last talked, the argument for this behavior was that returning 401/403 leaks the fact that the object exists. But this is actually doing the opposite and hiding the fact that the object doesn't exist by throwing NoSuchNamespaceException. I guess that in the end, the service is returning a single response for all 401/403/404 cases, but it's strange to use a permission error rather than a not-exists error.
Can you confirm that the service will never return a 404?
Also, if the service guarantees that 401, 403, and 404 will all result in PermissionDeniedException, then that should be documented in this class somewhere, probably in class-level Javadoc. Then maybe we don't need to add the confusing "(or permission denied)" to all of the messages?
Yes, the service never returns 404; it is always 403. The response looks like this:
{
  "error": {
    "code": 403,
    "message": "Permission 'biglake.databases.get' denied on resource '//biglake.googleapis.com/projects/myproj/locations/us/catalogs/mycat/databases/notexist' (or it may not exist).",
    "status": "PERMISSION_DENIED"
  }
}
This is the unified response format in newer GCP APIs. The error message itself says it could be "permission denied" or "may not exist". Here is another example: https://stackoverflow.com/questions/75357894/permission-logging-logentries-create-denied-on-resource-or-it-may-not-exist
Added a description to the class-level Javadoc.
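For context, a rough sketch of the translation being discussed, with the "(or permission denied)" wording and a class-level note. The catch body and message are illustrative, not the exact code in this PR.

import com.google.api.gax.rpc.PermissionDeniedException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;

/**
 * Sketch only: BigLake returns PERMISSION_DENIED (403) both when the caller lacks
 * permission and when the resource does not exist; it never returns 404. Read paths
 * therefore map 403 to the not-found exceptions that the Iceberg catalog API expects.
 */
public Catalog catalog(CatalogName name) {
  try {
    return stub.getCatalog(GetCatalogRequest.newBuilder().setName(name.toString()).build());
  } catch (PermissionDeniedException e) {
    throw new NoSuchNamespaceException(
        e, "Catalog %s does not exist (or permission denied)", name);
  }
}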
        .setDatabaseId(name.getDatabase())
        .setDatabase(db)
        .build());
} catch (com.google.api.gax.rpc.AlreadyExistsException e) {
Shouldn't this also handle PermissionDeniedException, since that may be thrown by any of the calls? Here it should probably be translated to Iceberg's ForbiddenException or NotAuthorizedException. Or is this an appropriate place to return NoSuchNamespaceException to indicate that the catalog doesn't exist?
The wrapper convertException will handle PermissionDeniedException and convert it to Iceberg's NotAuthorizedException. It is extracted into a wrapper to reduce boilerplate code.
Yes, that is a good point. We should check whether the call failed because the parent was not found or because permission was denied, which requires parsing the error message. I added a TODO to do it in a follow-up PR, to avoid adding more new code to this PR, which has been open for a long time.
The error message looks like this (this example creates a database): "Permission 'biglake.databases.create' denied on resource '//biglake.googleapis.com/projects/myproj/locations/us/catalogs/mycat' (or it may not exist).". We need to determine whether the error is on this resource (the table) or on its parent (the database or catalog).
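A rough sketch of what that follow-up check could look like, based only on the message format quoted above; the helper name and the string matching are assumptions, not code from this PR.

import com.google.api.gax.rpc.PermissionDeniedException;

// Hypothetical helper for the follow-up TODO: given a message of the form
// "Permission '...' denied on resource '//biglake.googleapis.com/<resource>' (or it may not exist).",
// check whether the denied resource is the parent (e.g. the database or catalog) rather
// than the resource being created, so the failure can be reported as a not-found error
// on the parent instead of a generic permission error.
static boolean deniedOnParent(PermissionDeniedException e, String parentResourceName) {
  String message = e.getMessage() == null ? "" : e.getMessage();
  return message.contains(
      "denied on resource '//biglake.googleapis.com/" + parentResourceName + "'");
}

For example, parentResourceName could be "projects/myproj/locations/us/catalogs/mycat" when creating a database in that catalog.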
We released this code here (https://cloud.google.com/bigquery/docs/manage-open-source-metadata#connect-dataproc-vm): Iceberg 1.2.0: gs://spark-lib/biglake/biglake-catalog-iceberg1.2.0-0.1.1-with-dependencies.jar. Feel free to try it before this PR is merged.
@rdblue just wanted to check if you had any remaining concerns here?
@coufon @emkornfield Are there instructions on how to build the equivalent artifacts using this pull request, including dependencies (…)?
Hi @dchristle @coufon, please guide me. Thanks.
I just came across this PR -- it looks like it would be a hugely valuable addition to Iceberg! My company would certainly benefit from this. It seems to have gotten stuck, though. Is there anything I can do to help get this over the finish line? I'd be willing to help out if there's any chance this could get released.
+1, would love to see this move forward :)
@coufon sorry for the delay here; this fell off the radar, unfortunately. Could you please rebase the PR?
@coufon I think we can close this for now, as we will be investing effort in the BigQuery Metastore catalog we announced at Next (https://www.youtube.com/watch?v=LIMnhzJWmLQ&t=1s).
Add the basic implementation of a new Iceberg catalog for GCP BigLake Metastore.
BigLake Metastore (BLMS) is a serverless metastore for Dataproc and BigQuery on GCP. BLMS provides an HMS-style API for Iceberg tables. Iceberg tables stored in BLMS are queryable in BigQuery (https://cloud.google.com/bigquery/docs/iceberg-tables).
BLMS API reference: https://cloud.google.com/bigquery/docs/reference/biglake/rest
BLMS API clients: https://github.com/googleapis/google-cloud-java/tree/main/java-biglake