diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java
new file mode 100644
index 0000000000000..a4ad1f6994f93
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Case-insensitive map of string keys to string values.
+ * <p>
+ * This is used to pass options to v2 implementations to ensure consistent case insensitivity.
+ * <p>
+ * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
+ * keys converted to lower case.
+ */
+public class CaseInsensitiveStringMap implements Map<String, String> {
+
+  public static CaseInsensitiveStringMap empty() {
+    return new CaseInsensitiveStringMap();
+  }
+
+  private final Map<String, String> delegate;
+
+  private CaseInsensitiveStringMap() {
+    this.delegate = new HashMap<>();
+  }
+
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return delegate.containsValue(value);
+  }
+
+  @Override
+  public String get(Object key) {
+    return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public String put(String key, String value) {
+    return delegate.put(key.toLowerCase(Locale.ROOT), value);
+  }
+
+  @Override
+  public String remove(Object key) {
+    return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public void putAll(Map<? extends String, ? extends String> m) {
+    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
+      delegate.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear() {
+    delegate.clear();
+  }
+
+  @Override
+  public Set<String> keySet() {
+    return delegate.keySet();
+  }
+
+  @Override
+  public Collection<String> values() {
+    return delegate.values();
+  }
+
+  @Override
+  public Set<Map.Entry<String, String>> entrySet() {
+    return delegate.entrySet();
+  }
+}
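
As a minimal sketch of the behavior above: keys are lower-cased on write while values are stored unchanged, so any casing works on read. `empty()` is the only entry point because the constructor is private (the key and value below are made up):

```scala
import org.apache.spark.sql.catalog.v2.CaseInsensitiveStringMap

val options = CaseInsensitiveStringMap.empty()
options.put("PaTh", "/tmp/Data")

// Lookups succeed with any casing; the stored value keeps its case.
assert(options.get("path") == "/tmp/Data")
assert(options.get("PATH") == "/tmp/Data")
// Key-returning methods expose the lower-cased keys actually stored.
assert(options.keySet().contains("path"))
```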

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java
new file mode 100644
index 0000000000000..03831b7aa9155
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.internal.SQLConf;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ * <p>
+ * Implementations can provide catalog functions by implementing additional interfaces, like
+ * {@link TableCatalog} to expose table operations.
+ * <p>
+ * Catalog implementations must implement this marker interface to be loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
+ * required public no-arg constructor. After creating an instance, it will be configured by calling
+ * {@link #initialize(CaseInsensitiveStringMap)}.
+ * <p>
+ * Catalog implementations are registered to a name by adding a configuration option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration
+ * properties in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)}, will be passed in the case-insensitive
+ * string map of options during initialization, with the prefix removed. An additional property,
+ * {@code name}, is also added to the options and will contain the catalog's name; in this case,
+ * "catalog-name".
+ */
+public interface CatalogProvider {
+  /**
+   * Called to initialize configuration.
+   * <p>
+   * This method is called once, just after the provider is instantiated.
+   *
+   * @param options a case-insensitive string map of configuration
+   */
+  void initialize(CaseInsensitiveStringMap options);
+}
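
To illustrate the contract described above, a provider only needs the public no-arg constructor and `initialize`. This is a hypothetical sketch, not part of the patch; the class and member names are invented:

```scala
import org.apache.spark.sql.catalog.v2.{CaseInsensitiveStringMap, CatalogProvider}

// Hypothetical provider: records its options and exposes the injected name.
class ExampleCatalog extends CatalogProvider {
  private var options: CaseInsensitiveStringMap = _

  // Called exactly once, right after the required no-arg constructor runs.
  override def initialize(options: CaseInsensitiveStringMap): Unit = {
    this.options = options
  }

  // The loader adds "name" last, so it is always present after initialize().
  def name: String = options.get("name")
}
```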

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
new file mode 100644
index 0000000000000..71ab9f528dbe7
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.util.Utils;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+public class Catalogs {
+  private Catalogs() {
+  }
+
+  /**
+   * Load and configure a catalog by name.
+   * <p>
+   * This loads, instantiates, and initializes the catalog provider for each call; it does not
+   * cache or reuse instances.
+   *
+   * @param name a String catalog name
+   * @param conf a SQLConf
+   * @return an initialized CatalogProvider
+   * @throws SparkException If the provider class cannot be found or instantiated
+   */
+  public static CatalogProvider load(String name, SQLConf conf) throws SparkException {
+    String providerClassName = conf.getConfString("spark.sql.catalog." + name, null);
+    if (providerClassName == null) {
+      throw new SparkException(String.format(
+          "Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name));
+    }
+
+    ClassLoader loader = Utils.getContextOrSparkClassLoader();
+
+    try {
+      Class<?> providerClass = loader.loadClass(providerClassName);
+
+      if (!CatalogProvider.class.isAssignableFrom(providerClass)) {
+        throw new SparkException(String.format(
+            "Provider class for catalog '%s' does not implement CatalogProvider: %s",
+            name, providerClassName));
+      }
+
+      CatalogProvider provider = CatalogProvider.class.cast(providerClass.newInstance());
+
+      provider.initialize(catalogOptions(name, conf));
+
+      return provider;
+
+    } catch (ClassNotFoundException e) {
+      throw new SparkException(String.format(
+          "Cannot find catalog provider class for catalog '%s': %s", name, providerClassName));
+
+    } catch (IllegalAccessException e) {
+      throw new SparkException(String.format(
+          "Failed to call public no-arg constructor for catalog '%s': %s", name, providerClassName),
+          e);
+
+    } catch (InstantiationException e) {
+      throw new SparkException(String.format(
+          "Failed while instantiating provider for catalog '%s': %s", name, providerClassName),
+          e.getCause());
+    }
+  }
+
+  /**
+   * Extracts a named catalog's configuration from a SQLConf.
+   *
+   * @param name a catalog name
+   * @param conf a SQLConf
+   * @return a case-insensitive string map of options starting with spark.sql.catalog.(name).
+   */
+  private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
+    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
+    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
+
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
+      Matcher matcher = prefix.matcher(entry.getKey());
+      if (matcher.matches() && matcher.groupCount() > 0) {
+        options.put(matcher.group(1), entry.getValue());
+      }
+    }
+
+    // add name last to ensure it overwrites any conflicting options
+    options.put("name", name);
+
+    return options;
+  }
+}
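
A hedged sketch of the loading flow, reusing the hypothetical `ExampleCatalog` from the previous snippet; the config keys follow the registration scheme described in the CatalogProvider Javadoc, and each `load` call produces a fresh, initialized instance:

```scala
import org.apache.spark.sql.catalog.v2.Catalogs
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf()
// Register the hypothetical ExampleCatalog plus one catalog-scoped option.
conf.setConfString("spark.sql.catalog.example", classOf[ExampleCatalog].getName)
conf.setConfString("spark.sql.catalog.example.warehouse", "/tmp/warehouse")

// Throws SparkException if the class is missing, is not a CatalogProvider,
// or cannot be constructed through a public no-arg constructor.
val provider = Catalogs.load("example", conf)
// The prefix is stripped: options now hold "warehouse" plus the injected "name".
```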
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java
new file mode 100644
index 0000000000000..30a20f27b8c65
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table sources.
+ */
+public interface Table {
+  /**
+   * Return the table properties.
+   * @return this table's map of string properties
+   */
+  Map<String, String> properties();
+
+  /**
+   * Return the table schema.
+   * @return this table's schema as a struct type
+   */
+  StructType schema();
+
+  /**
+   * Return the table partitioning expressions.
+   * @return this table's partitioning expressions
+   */
+  List<Expression> partitionExpressions();
+}
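
Since `Table` is a plain metadata interface, an implementation can be a simple immutable holder. The sketch below is illustrative only; `SimpleTable` is not part of this patch:

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.spark.sql.catalog.v2.Table
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType

// Hypothetical immutable holder; enough to satisfy the Table contract.
class SimpleTable(
    tableSchema: StructType,
    tableProperties: JMap[String, String],
    partitions: JList[Expression]) extends Table {

  override def schema(): StructType = tableSchema

  override def properties(): JMap[String, String] = tableProperties

  override def partitionExpressions(): JList[Expression] = partitions
}
```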
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
new file mode 100644
index 0000000000000..539beb0c39c56
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog extends CatalogProvider {
+  /**
+   * Load table metadata by {@link TableIdentifier identifier} from the catalog.
+   *
+   * @param ident a table identifier
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist.
+   */
+  Table loadTable(TableIdentifier ident) throws NoSuchTableException;
+
+  /**
+   * Test whether a table exists using an {@link TableIdentifier identifier} from the catalog.
+   *
+   * @param ident a table identifier
+   * @return true if the table exists, false otherwise
+   */
+  default boolean tableExists(TableIdentifier ident) {
+    try {
+      return loadTable(ident) != null;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the identifier
+   */
+  default Table createTable(TableIdentifier ident,
+                            StructType schema) throws TableAlreadyExistsException {
+    return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap());
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the identifier
+   */
+  default Table createTable(TableIdentifier ident,
+                            StructType schema,
+                            Map<String, String> properties) throws TableAlreadyExistsException {
+    return createTable(ident, schema, Collections.emptyList(), properties);
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions a list of expressions to use for partitioning data in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the identifier
+   */
+  Table createTable(TableIdentifier ident,
+                    StructType schema,
+                    List<Expression> partitions,
+                    Map<String, String> properties) throws TableAlreadyExistsException;
+
+  /**
+   * Apply a list of {@link TableChange changes} to a table in the catalog.
+   * <p>
+   * Implementations may reject the requested changes. If any change is rejected, none of the
+   * changes should be applied to the table.
+   *
+   * @param ident a table identifier
+   * @param changes a list of changes to apply to the table
+   * @return updated metadata for the table
+   * @throws NoSuchTableException If the table doesn't exist.
+   * @throws IllegalArgumentException If any change is rejected by the implementation.
+   */
+  Table alterTable(TableIdentifier ident,
+                   List<TableChange> changes) throws NoSuchTableException;
+
+  /**
+   * Apply {@link TableChange changes} to a table in the catalog.
+   * <p>
+   * Implementations may reject the requested changes. If any change is rejected, none of the
+   * changes should be applied to the table.
+   *
+   * @param ident a table identifier
+   * @param changes a list of changes to apply to the table
+   * @return updated metadata for the table
+   * @throws NoSuchTableException If the table doesn't exist.
+   * @throws IllegalArgumentException If any change is rejected by the implementation.
+   */
+  default Table alterTable(TableIdentifier ident,
+                           TableChange... changes) throws NoSuchTableException {
+    return alterTable(ident, Arrays.asList(changes));
+  }
+
+  /**
+   * Drop a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @return true if a table was deleted, false if no table exists for the identifier
+   */
+  boolean dropTable(TableIdentifier ident);
+}
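
Putting the pieces together, a caller might exercise the create/alter/drop contract like this. A non-authoritative sketch: `catalog` is assumed to be an already-initialized implementation, and the identifier and column names are invented:

```scala
import org.apache.spark.sql.catalog.v2.{TableCatalog, TableChange}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

def roundTrip(catalog: TableCatalog): Unit = {
  val ident = TableIdentifier("events", Some("db"))

  // The two-argument overload defaults to no partitions and no properties.
  val table = catalog.createTable(ident, new StructType()
    .add("id", LongType)
    .add("data", StringType))

  // All changes are applied together, or none are.
  catalog.alterTable(ident,
    TableChange.addColumn("ts", TimestampType),
    TableChange.deleteColumn("data"))

  catalog.dropTable(ident)
}
```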

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
new file mode 100644
index 0000000000000..3a8ba5e00b397
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ * <pre>
+ *   import TableChange._
+ *   val catalog = source.asInstanceOf[TableSupport].catalog()
+ *   catalog.alterTable(ident,
+ *       addColumn("x", IntegerType),
+ *       renameColumn("a", "b"),
+ *       deleteColumn("c")
+ *     )
+ * </pre>
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for adding a top-level column to a table.
+   * <p>
+   * Because "." may be interpreted as a field path separator or may be used in field names, it is
+   * not allowed in names passed to this method. To add to nested types or to add fields with
+   * names that contain ".", use {@link #addColumn(String, String, DataType)}.
+   *
+   * @param name the new top-level column name
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String name, DataType dataType) {
+    return new AddColumn(null, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for adding a nested column to a table.
+   * <p>
+   * The parent name is used to find the parent struct type where the nested field will be added.
+   * If the parent name is null, the new column will be added to the root as a top-level column.
+   * If parent identifies a struct, a new column is added to that struct. If it identifies a list,
+   * the column is added to the list element struct, and if it identifies a map, the new column is
+   * added to the map's value struct.
+   * <p>
+   * The given name is used to name the new column and names containing "." are not handled
+   * differently.
+   *
+   * @param parent the new field's parent
+   * @param name the new field name
+   * @param dataType the new field's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String parent, String name, DataType dataType) {
+    return new AddColumn(parent, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * <p>
+   * The name is used to find the field to rename. The new name replaces only the leaf field name.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
+   *
+   * @param name the current field name
+   * @param newName the new name
+   * @return a TableChange for the rename
+   */
+  static TableChange renameColumn(String name, String newName) {
+    return new RenameColumn(name, newName);
+  }
+
+  /**
+   * Create a TableChange for updating the type of a field.
+   * <p>
+   * The name is used to find the field to update.
+   *
+   * @param name the field name
+   * @param newDataType the new data type
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumn(String name, DataType newDataType) {
+    return new UpdateColumn(name, newDataType);
+  }
+
+  /**
+   * Create a TableChange for deleting a field from a table.
+   *
+   * @param name the name of the field to delete
+   * @return a TableChange for the delete
+   */
+  static TableChange deleteColumn(String name) {
+    return new DeleteColumn(name);
+  }
+
+  final class AddColumn implements TableChange {
+    private final String parent;
+    private final String name;
+    private final DataType dataType;
+
+    private AddColumn(String parent, String name, DataType dataType) {
+      this.parent = parent;
+      this.name = name;
+      this.dataType = dataType;
+    }
+
+    public String parent() {
+      return parent;
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public DataType type() {
+      return dataType;
+    }
+  }
+
+  final class RenameColumn implements TableChange {
+    private final String name;
+    private final String newName;
+
+    private RenameColumn(String name, String newName) {
+      this.name = name;
+      this.newName = newName;
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public String newName() {
+      return newName;
+    }
+  }
+
+  final class UpdateColumn implements TableChange {
+    private final String name;
+    private final DataType newDataType;
+
+    private UpdateColumn(String name, DataType newDataType) {
+      this.name = name;
+      this.newDataType = newDataType;
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public DataType newDataType() {
+      return newDataType;
+    }
+  }
+
+  final class DeleteColumn implements TableChange {
+    private final String name;
+
+    private DeleteColumn(String name) {
+      this.name = name;
+    }
+
+    public String name() {
+      return name;
+    }
+  }
+
+}
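
To complement the Scala snippet in the Javadoc above, here is a short sketch of the nested-column and rename semantics; the column names are invented:

```scala
import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.types.IntegerType

// Adds point.z: the parent path may name a struct, a list element struct,
// or a map value struct, as described in the addColumn Javadoc.
val addNested = TableChange.addColumn("point", "z", IntegerType)
// Rename replaces only the leaf field name: a.b.c becomes a.b.x.
val rename = TableChange.renameColumn("a.b.c", "x")
```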
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java
new file mode 100644
index 0000000000000..0d869108fa7d3
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CaseInsensitiveStringMapSuite {
+  @Test
+  public void testPutAndGet() {
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    options.put("kEy", "valUE");
+
+    Assert.assertEquals("Should return correct value for lower-case key",
+        "valUE", options.get("key"));
+    Assert.assertEquals("Should return correct value for upper-case key",
+        "valUE", options.get("KEY"));
+  }
+
+  @Test
+  public void testKeySet() {
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    options.put("kEy", "valUE");
+
+    Set<String> expectedKeySet = new HashSet<>();
+    expectedKeySet.add("key");
+
+    Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet());
+  }
+}
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
new file mode 100644
index 0000000000000..62e26af7f0c60
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+
+public class CatalogLoadingSuite {
+  @Test
+  public void testLoad() throws SparkException {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.test-name", TestCatalogProvider.class.getCanonicalName());
+
+    CatalogProvider provider = Catalogs.load("test-name", conf);
+    Assert.assertNotNull("Should instantiate a non-null provider", provider);
+    Assert.assertEquals("Provider should have correct implementation",
+        TestCatalogProvider.class, provider.getClass());
+
+    TestCatalogProvider testProvider = (TestCatalogProvider) provider;
+    Assert.assertEquals("Options should contain only one key", 1, testProvider.options.size());
+    Assert.assertEquals("Options should contain correct catalog name",
+        "test-name", testProvider.options.get("name"));
+  }
+
+  @Test
+  public void testInitializationOptions() throws SparkException {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.test-name", TestCatalogProvider.class.getCanonicalName());
+    conf.setConfString("spark.sql.catalog.test-name.name", "overwritten");
+    conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE");
+
+    CatalogProvider provider = Catalogs.load("test-name", conf);
+    Assert.assertNotNull("Should instantiate a non-null provider", provider);
+    Assert.assertEquals("Provider should have correct implementation",
+        TestCatalogProvider.class, provider.getClass());
+
+    TestCatalogProvider testProvider = (TestCatalogProvider) provider;
+
+    Assert.assertEquals("Options should contain only two keys", 2, testProvider.options.size());
+    Assert.assertEquals("Options should contain correct catalog name",
+        "test-name", testProvider.options.get("name"));
+    Assert.assertEquals("Options should contain correct value for key",
+        "valUE", testProvider.options.get("key"));
+  }
+
+  @Test
+  public void testLoadWithoutConfig() {
+    SQLConf conf = new SQLConf();
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
+
+    Assert.assertTrue("Should complain that implementation is not configured",
+        exc.getMessage().contains("provider not found: spark.sql.catalog.missing is not defined"));
+    Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("missing"));
+  }
+
+  @Test
+  public void testLoadMissingClass() {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.missing", "com.example.NoSuchCatalogProvider");
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
+
+    Assert.assertTrue("Should complain that the class is not found",
+        exc.getMessage().contains("Cannot find catalog provider class"));
+    Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("missing"));
+    Assert.assertTrue("Should identify the missing class",
+        exc.getMessage().contains("com.example.NoSuchCatalogProvider"));
+  }
+
+  @Test
+  public void testLoadNonCatalogProvider() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = InvalidCatalogProvider.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should complain that class does not implement CatalogProvider",
+        exc.getMessage().contains("does not implement CatalogProvider"));
+    Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("invalid"));
+    Assert.assertTrue("Should identify the class", exc.getMessage().contains(invalidClassName));
+  }
+
+  @Test
+  public void testLoadConstructorFailureCatalogProvider() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = ConstructorFailureCatalogProvider.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    RuntimeException exc = intercept(RuntimeException.class, () -> Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should have expected error message",
+        exc.getMessage().contains("Expected failure"));
+  }
+
+  @Test
+  public void testLoadAccessErrorCatalogProvider() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = AccessErrorCatalogProvider.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should complain that no public constructor is provided",
+        exc.getMessage().contains("Failed to call public no-arg constructor for catalog"));
+    Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("invalid"));
+    Assert.assertTrue("Should identify the class", exc.getMessage().contains(invalidClassName));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <E extends Exception> E intercept(Class<E> expected, Callable<?> callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown, expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(expected, actual.getClass());
+        return (E) actual;
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+    // Compiler doesn't catch that Assert.fail will always throw an exception.
+    throw new UnsupportedOperationException("[BUG] Should not reach this statement");
+  }
+}
+
+class TestCatalogProvider implements CatalogProvider {
+  CaseInsensitiveStringMap options = null;
+
+  TestCatalogProvider() {
+  }
+
+  @Override
+  public void initialize(CaseInsensitiveStringMap options) {
+    this.options = options;
+  }
+}
+
+class ConstructorFailureCatalogProvider implements CatalogProvider { // fails in its constructor
+  ConstructorFailureCatalogProvider() {
+    throw new RuntimeException("Expected failure.");
+  }
+
+  @Override
+  public void initialize(CaseInsensitiveStringMap options) {
+  }
+}
+
+class AccessErrorCatalogProvider implements CatalogProvider { // no public constructor
+  private AccessErrorCatalogProvider() {
+  }
+
+  @Override
+  public void initialize(CaseInsensitiveStringMap options) {
+  }
+}
+
+class InvalidCatalogProvider { // doesn't implement CatalogProvider
+  public void initialize(CaseInsensitiveStringMap options) {
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d9278d8cd23d6..a4c8de6afceb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -21,6 +21,7 @@ import java.io.Closeable
 import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
@@ -31,6 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
+import org.apache.spark.sql.catalog.v2.{CatalogProvider, Catalogs}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
@@ -610,6 +612,12 @@ class SparkSession private(
    */
   @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
+  @transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]()
+
+  private[sql] def catalog(name: String): CatalogProvider = synchronized {
+    catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
+  }
+
   /**
    * Returns the specified table/view as a `DataFrame`.
    *
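
Unlike `Catalogs.load`, the new session method caches providers per name, so repeated lookups return the same instance. A sketch of the intended behavior, assuming `spark` is an active `SparkSession` with `spark.sql.catalog.example` set as above (callable only from code in the `org.apache.spark.sql` package, since the method is `private[sql]`):

```scala
// First lookup loads and initializes the provider; later lookups hit the cache.
val provider = spark.catalog("example")
assert(provider eq spark.catalog("example"))
```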