diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index 59744e50924f..e7e07480b9be 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -173,4 +173,24 @@ private CatalogProperties() {}
"org.apache.iceberg.azure.keymanagement.AzureKeyManagementClient";
public static final String ENCRYPTION_KMS_IMPL_GCP =
"org.apache.iceberg.gcp.GcpKeyManagementClient";
+
+ /**
+ * Controls whether identifier resolution (namespace, table, view) should be case-insensitive.
+ *
+ *
When enabled, all identifiers are normalized to a consistent case before being sent to the
+ * catalog backend. This is useful for catalogs that preserve identifier case but may require
+ * case-insensitive lookups.
+ */
+ public static final String CASE_INSENSITIVE = "case-insensitive";
+
+ public static final boolean CASE_INSENSITIVE_DEFAULT = false;
+
+ /**
+ * Controls the case conversion type when {@link #CASE_INSENSITIVE} is enabled.
+ *
+ *
Supported values are {@code lower_case} (default) and {@code upper_case}.
+ */
+ public static final String CASE_INSENSITIVE_TYPE = "case-insensitive-type";
+
+ public static final String CASE_INSENSITIVE_TYPE_DEFAULT = "lower_case";
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index a3c66ed09def..7dbde7d2ea1c 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -39,8 +40,10 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;
@@ -52,6 +55,9 @@ public class RESTCatalog
private final SessionCatalog.SessionContext context;
private final ViewCatalog viewSessionCatalog;
+ private boolean caseInsensitive = CatalogProperties.CASE_INSENSITIVE_DEFAULT;
+ private String caseType = CatalogProperties.CASE_INSENSITIVE_TYPE_DEFAULT;
+
public RESTCatalog() {
this(
SessionCatalog.SessionContext.createEmpty(),
@@ -93,9 +99,56 @@ protected RESTSessionCatalog newSessionCatalog(
@Override
public void initialize(String name, Map props) {
Preconditions.checkArgument(props != null, "Invalid configuration: null");
+
+ this.caseInsensitive =
+ PropertyUtil.propertyAsBoolean(
+ props, CatalogProperties.CASE_INSENSITIVE, CatalogProperties.CASE_INSENSITIVE_DEFAULT);
+ this.caseType =
+ PropertyUtil.propertyAsString(
+ props,
+ CatalogProperties.CASE_INSENSITIVE_TYPE,
+ CatalogProperties.CASE_INSENSITIVE_TYPE_DEFAULT);
+ Preconditions.checkArgument(
+ "lower_case".equals(caseType) || "upper_case".equals(caseType),
+ "Invalid value for '%s': %s. Allowed values are: lower_case, upper_case",
+ CatalogProperties.CASE_INSENSITIVE_TYPE,
+ caseType);
+
sessionCatalog.initialize(name, props);
}
+ @VisibleForTesting
+ Namespace convertCase(Namespace ns) {
+ if (!caseInsensitive) {
+ return ns;
+ }
+
+ String[] levels = ns.levels();
+ String[] converted = new String[levels.length];
+ for (int i = 0; i < levels.length; i++) {
+ converted[i] =
+ "upper_case".equals(caseType)
+ ? levels[i].toUpperCase(Locale.ROOT)
+ : levels[i].toLowerCase(Locale.ROOT);
+ }
+
+ return Namespace.of(converted);
+ }
+
+ @VisibleForTesting
+ TableIdentifier convertCase(TableIdentifier ident) {
+ if (!caseInsensitive) {
+ return ident;
+ }
+
+ Namespace convertedNs = convertCase(ident.namespace());
+ String convertedName =
+ "upper_case".equals(caseType)
+ ? ident.name().toUpperCase(Locale.ROOT)
+ : ident.name().toLowerCase(Locale.ROOT);
+ return TableIdentifier.of(convertedNs, convertedName);
+ }
+
protected RESTSessionCatalog sessionCatalog() {
return sessionCatalog;
}
@@ -111,27 +164,27 @@ public Map properties() {
@Override
public List listTables(Namespace ns) {
- return delegate.listTables(ns);
+ return delegate.listTables(convertCase(ns));
}
@Override
public boolean tableExists(TableIdentifier ident) {
- return delegate.tableExists(ident);
+ return delegate.tableExists(convertCase(ident));
}
@Override
public Table loadTable(TableIdentifier ident) {
- return delegate.loadTable(ident);
+ return delegate.loadTable(convertCase(ident));
}
@Override
public void invalidateTable(TableIdentifier ident) {
- delegate.invalidateTable(ident);
+ delegate.invalidateTable(convertCase(ident));
}
@Override
public TableBuilder buildTable(TableIdentifier ident, Schema schema) {
- return delegate.buildTable(ident, schema);
+ return delegate.buildTable(convertCase(ident), schema);
}
@Override
@@ -141,23 +194,23 @@ public Table createTable(
PartitionSpec spec,
String location,
Map props) {
- return delegate.createTable(ident, schema, spec, location, props);
+ return delegate.createTable(convertCase(ident), schema, spec, location, props);
}
@Override
public Table createTable(
TableIdentifier ident, Schema schema, PartitionSpec spec, Map props) {
- return delegate.createTable(ident, schema, spec, props);
+ return delegate.createTable(convertCase(ident), schema, spec, props);
}
@Override
public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
- return delegate.createTable(ident, schema, spec);
+ return delegate.createTable(convertCase(ident), schema, spec);
}
@Override
public Table createTable(TableIdentifier identifier, Schema schema) {
- return delegate.createTable(identifier, schema);
+ return delegate.createTable(convertCase(identifier), schema);
}
@Override
@@ -167,24 +220,24 @@ public Transaction newCreateTableTransaction(
PartitionSpec spec,
String location,
Map props) {
- return delegate.newCreateTableTransaction(ident, schema, spec, location, props);
+ return delegate.newCreateTableTransaction(convertCase(ident), schema, spec, location, props);
}
@Override
public Transaction newCreateTableTransaction(
TableIdentifier ident, Schema schema, PartitionSpec spec, Map props) {
- return delegate.newCreateTableTransaction(ident, schema, spec, props);
+ return delegate.newCreateTableTransaction(convertCase(ident), schema, spec, props);
}
@Override
public Transaction newCreateTableTransaction(
TableIdentifier ident, Schema schema, PartitionSpec spec) {
- return delegate.newCreateTableTransaction(ident, schema, spec);
+ return delegate.newCreateTableTransaction(convertCase(ident), schema, spec);
}
@Override
public Transaction newCreateTableTransaction(TableIdentifier identifier, Schema schema) {
- return delegate.newCreateTableTransaction(identifier, schema);
+ return delegate.newCreateTableTransaction(convertCase(identifier), schema);
}
@Override
@@ -195,7 +248,8 @@ public Transaction newReplaceTableTransaction(
String location,
Map props,
boolean orCreate) {
- return delegate.newReplaceTableTransaction(ident, schema, spec, location, props, orCreate);
+ return delegate.newReplaceTableTransaction(
+ convertCase(ident), schema, spec, location, props, orCreate);
}
@Override
@@ -205,81 +259,81 @@ public Transaction newReplaceTableTransaction(
PartitionSpec spec,
Map props,
boolean orCreate) {
- return delegate.newReplaceTableTransaction(ident, schema, spec, props, orCreate);
+ return delegate.newReplaceTableTransaction(convertCase(ident), schema, spec, props, orCreate);
}
@Override
public Transaction newReplaceTableTransaction(
TableIdentifier ident, Schema schema, PartitionSpec spec, boolean orCreate) {
- return delegate.newReplaceTableTransaction(ident, schema, spec, orCreate);
+ return delegate.newReplaceTableTransaction(convertCase(ident), schema, spec, orCreate);
}
@Override
public Transaction newReplaceTableTransaction(
TableIdentifier ident, Schema schema, boolean orCreate) {
- return delegate.newReplaceTableTransaction(ident, schema, orCreate);
+ return delegate.newReplaceTableTransaction(convertCase(ident), schema, orCreate);
}
@Override
public boolean dropTable(TableIdentifier ident) {
- return delegate.dropTable(ident);
+ return delegate.dropTable(convertCase(ident));
}
@Override
public boolean dropTable(TableIdentifier ident, boolean purge) {
- return delegate.dropTable(ident, purge);
+ return delegate.dropTable(convertCase(ident), purge);
}
@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
- delegate.renameTable(from, to);
+ delegate.renameTable(convertCase(from), convertCase(to));
}
@Override
public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
- return delegate.registerTable(ident, metadataFileLocation);
+ return delegate.registerTable(convertCase(ident), metadataFileLocation);
}
@Override
public Table registerTable(
TableIdentifier ident, String metadataFileLocation, boolean overwrite) {
- return delegate.registerTable(ident, metadataFileLocation, overwrite);
+ return delegate.registerTable(convertCase(ident), metadataFileLocation, overwrite);
}
@Override
public void createNamespace(Namespace ns, Map props) {
- nsDelegate.createNamespace(ns, props);
+ nsDelegate.createNamespace(convertCase(ns), props);
}
@Override
public List listNamespaces(Namespace ns) throws NoSuchNamespaceException {
- return nsDelegate.listNamespaces(ns);
+ return nsDelegate.listNamespaces(convertCase(ns));
}
@Override
public boolean namespaceExists(Namespace namespace) {
- return nsDelegate.namespaceExists(namespace);
+ return nsDelegate.namespaceExists(convertCase(namespace));
}
@Override
public Map loadNamespaceMetadata(Namespace ns) throws NoSuchNamespaceException {
- return nsDelegate.loadNamespaceMetadata(ns);
+ return nsDelegate.loadNamespaceMetadata(convertCase(ns));
}
@Override
public boolean dropNamespace(Namespace ns) throws NamespaceNotEmptyException {
- return nsDelegate.dropNamespace(ns);
+ return nsDelegate.dropNamespace(convertCase(ns));
}
@Override
public boolean setProperties(Namespace ns, Map props)
throws NoSuchNamespaceException {
- return nsDelegate.setProperties(ns, props);
+ return nsDelegate.setProperties(convertCase(ns), props);
}
@Override
public boolean removeProperties(Namespace ns, Set props) throws NoSuchNamespaceException {
- return nsDelegate.removeProperties(ns, props);
+ return nsDelegate.removeProperties(convertCase(ns), props);
}
@Override
@@ -303,41 +357,41 @@ public void commitTransaction(TableCommit... commits) {
@Override
public List listViews(Namespace namespace) {
- return viewSessionCatalog.listViews(namespace);
+ return viewSessionCatalog.listViews(convertCase(namespace));
}
@Override
public View loadView(TableIdentifier identifier) {
- return viewSessionCatalog.loadView(identifier);
+ return viewSessionCatalog.loadView(convertCase(identifier));
}
@Override
public ViewBuilder buildView(TableIdentifier identifier) {
- return viewSessionCatalog.buildView(identifier);
+ return viewSessionCatalog.buildView(convertCase(identifier));
}
@Override
public boolean dropView(TableIdentifier identifier) {
- return viewSessionCatalog.dropView(identifier);
+ return viewSessionCatalog.dropView(convertCase(identifier));
}
@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
- viewSessionCatalog.renameView(from, to);
+ viewSessionCatalog.renameView(convertCase(from), convertCase(to));
}
@Override
public boolean viewExists(TableIdentifier identifier) {
- return viewSessionCatalog.viewExists(identifier);
+ return viewSessionCatalog.viewExists(convertCase(identifier));
}
@Override
public void invalidateView(TableIdentifier identifier) {
- viewSessionCatalog.invalidateView(identifier);
+ viewSessionCatalog.invalidateView(convertCase(identifier));
}
@Override
public View registerView(TableIdentifier identifier, String metadataFileLocation) {
- return viewSessionCatalog.registerView(identifier, metadataFileLocation);
+ return viewSessionCatalog.registerView(convertCase(identifier), metadataFileLocation);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogCaseInsensitive.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogCaseInsensitive.java
new file mode 100644
index 000000000000..36379f934501
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogCaseInsensitive.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.junit.jupiter.api.Test;
+
+public class TestRESTCatalogCaseInsensitive {
+
+ private static RESTCatalogAdapter newAdapter() {
+ InMemoryCatalog backendCatalog = new InMemoryCatalog();
+ backendCatalog.initialize("in-memory", Map.of());
+ return new RESTCatalogAdapter(backendCatalog);
+ }
+
+ private RESTCatalog createCatalogWithCaseInsensitive(String type) {
+ RESTCatalog catalog = new RESTCatalog(config -> newAdapter());
+ catalog.initialize(
+ "test",
+ Map.of(
+ "uri", "http://localhost:8080",
+ "case-insensitive", "true",
+ "case-insensitive-type", type));
+ return catalog;
+ }
+
+ @Test
+ public void testConvertCaseDisabledByDefault() {
+ RESTCatalog catalog = new RESTCatalog(config -> newAdapter());
+ catalog.initialize("test", Map.of("uri", "http://localhost:8080"));
+
+ Namespace ns = Namespace.of("CustomerDB", "Schema");
+ TableIdentifier ident = TableIdentifier.of(ns, "Orders");
+
+ assertThat(catalog.convertCase(ns)).isEqualTo(ns);
+ assertThat(catalog.convertCase(ident)).isEqualTo(ident);
+ }
+
+ @Test
+ public void testConvertCaseLowerCase() {
+ RESTCatalog catalog = createCatalogWithCaseInsensitive("lower_case");
+
+ Namespace ns = Namespace.of("CustomerDB", "Schema");
+ Namespace expected = Namespace.of("customerdb", "schema");
+ assertThat(catalog.convertCase(ns)).isEqualTo(expected);
+
+ TableIdentifier ident = TableIdentifier.of(ns, "Orders");
+ TableIdentifier expectedIdent = TableIdentifier.of(expected, "orders");
+ assertThat(catalog.convertCase(ident)).isEqualTo(expectedIdent);
+ }
+
+ @Test
+ public void testConvertCaseUpperCase() {
+ RESTCatalog catalog = createCatalogWithCaseInsensitive("upper_case");
+
+ Namespace ns = Namespace.of("customerdb", "schema");
+ Namespace expected = Namespace.of("CUSTOMERDB", "SCHEMA");
+ assertThat(catalog.convertCase(ns)).isEqualTo(expected);
+
+ TableIdentifier ident = TableIdentifier.of(ns, "orders");
+ TableIdentifier expectedIdent = TableIdentifier.of(expected, "ORDERS");
+ assertThat(catalog.convertCase(ident)).isEqualTo(expectedIdent);
+ }
+
+ @Test
+ public void testConvertCaseMixedInput() {
+ RESTCatalog catalog = createCatalogWithCaseInsensitive("lower_case");
+
+ Namespace ns = Namespace.of("MyDB", "MySchema");
+ assertThat(catalog.convertCase(ns)).isEqualTo(Namespace.of("mydb", "myschema"));
+
+ TableIdentifier ident = TableIdentifier.of("MyDB", "MyTable");
+ assertThat(catalog.convertCase(ident)).isEqualTo(TableIdentifier.of("mydb", "mytable"));
+ }
+
+ @Test
+ public void testConvertCaseEmptyNamespace() {
+ RESTCatalog catalog = createCatalogWithCaseInsensitive("lower_case");
+
+ Namespace empty = Namespace.empty();
+ assertThat(catalog.convertCase(empty)).isEqualTo(empty);
+ }
+
+ @Test
+ public void testConvertCaseMultiLevelNamespace() {
+ RESTCatalog catalog = createCatalogWithCaseInsensitive("lower_case");
+
+ Namespace ns = Namespace.of("Finance", "Q1", "Reports");
+ assertThat(catalog.convertCase(ns)).isEqualTo(Namespace.of("finance", "q1", "reports"));
+ }
+
+ @Test
+ public void testCaseVariationsAllNormalizeToSame() {
+ RESTCatalog catalog = createCatalogWithCaseInsensitive("lower_case");
+
+ TableIdentifier lower = catalog.convertCase(TableIdentifier.of("customerdb", "orders"));
+ TableIdentifier upper = catalog.convertCase(TableIdentifier.of("CUSTOMERDB", "ORDERS"));
+ TableIdentifier mixed = catalog.convertCase(TableIdentifier.of("CustomerDB", "Orders"));
+ TableIdentifier random = catalog.convertCase(TableIdentifier.of("cUsToMeRdB", "oRdErS"));
+
+ assertThat(lower).isEqualTo(upper).isEqualTo(mixed).isEqualTo(random);
+ }
+
+ @Test
+ public void testInvalidCaseType() {
+ RESTCatalog catalog = new RESTCatalog(config -> newAdapter());
+
+ assertThatThrownBy(
+ () ->
+ catalog.initialize(
+ "test",
+ Map.of(
+ "uri", "http://localhost:8080",
+ "case-insensitive", "true",
+ "case-insensitive-type", "invalid")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("lower_case, upper_case");
+ }
+}