diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md index 50de5af54e6f2..3b818ef291143 100644 --- a/docs/content.zh/docs/dev/table/catalogs.md +++ b/docs/content.zh/docs/dev/table/catalogs.md @@ -536,6 +536,9 @@ catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false); // alter function catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false); +// rename function +catalog.renameFunction(new ObjectPath("mydb", "myfunc"), "my_new_func", false); + // get function catalog.getFunction("myfunc"); @@ -561,6 +564,9 @@ catalog.drop_function(ObjectPath("mydb", "myfunc"), False) # alter function catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False) +# rename function +catalog.rename_function(ObjectPath("mydb", "myfunc"), "my_new_func", False) + # get function catalog.get_function("myfunc") diff --git a/docs/content/docs/dev/table/catalogs.md b/docs/content/docs/dev/table/catalogs.md index 2b193c62f7c75..fe54e8cfe4cd9 100644 --- a/docs/content/docs/dev/table/catalogs.md +++ b/docs/content/docs/dev/table/catalogs.md @@ -540,6 +540,9 @@ catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false); // alter function catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false); +// rename function +catalog.renameFunction(new ObjectPath("mydb", "myfunc"), "my_new_func", false); + // get function catalog.getFunction("myfunc"); @@ -565,6 +568,9 @@ catalog.drop_function(ObjectPath("mydb", "myfunc"), False) # alter function catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False) +# rename function +catalog.rename_function(ObjectPath("mydb", "myfunc"), "my_new_func", False) + # get function catalog.get_function("myfunc") diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 2a85b8850aba4..56e8b92699389 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -1286,6 +1286,39 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } } + @Override + public void renameFunction( + ObjectPath functionPath, String newFunctionName, boolean ignoreIfNotExists) + throws FunctionNotExistException, FunctionAlreadyExistException, CatalogException { + checkNotNull(functionPath, "functionPath cannot be null"); + checkArgument( + !isNullOrWhitespaceOnly(newFunctionName), + "newFunctionName cannot be null or empty"); + + try { + if (functionExists(functionPath)) { + ObjectPath newPath = + new ObjectPath(functionPath.getDatabaseName(), newFunctionName); + + if (functionExists(newPath)) { + throw new FunctionAlreadyExistException(getName(), newPath); + } else { + Function function = + client.getFunction( + functionPath.getDatabaseName(), functionPath.getObjectName()); + function.setFunctionName(newFunctionName); + client.alterFunction( + functionPath.getDatabaseName(), functionPath.getObjectName(), function); + } + } else if (!ignoreIfNotExists) { + throw new FunctionNotExistException(getName(), functionPath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename function %s", functionPath.getFullName()), e); + } + } + @Override public List listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUdfITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUdfITCase.java index c3efe604b34d9..84ce9e9295971 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUdfITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUdfITCase.java @@ -280,6 +280,38 @@ public void testDateUDF() throws Exception { } } + @Test + public void testRenameUDF() throws Exception { + + TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE); + tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); + tableEnv.useCatalog(hiveCatalog.getName()); + tableEnv.executeSql( + String.format("create function myyeartest as '%s'", UDFYear.class.getName())); + tableEnv.executeSql("create table src(ts timestamp)"); + + // Rename function myyear to myyearrename + ObjectPath myYearObjectPath = new ObjectPath(HiveCatalog.DEFAULT_DB, "myyeartest"); + hiveCatalog.renameFunction(myYearObjectPath, "myyearrename", false); + + try { + HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src") + .addRow(new Object[] {Timestamp.valueOf("2013-07-15 10:00:00")}) + .addRow(new Object[] {Timestamp.valueOf("2019-05-23 17:32:55")}) + .commit(); + + List results = + CollectionUtil.iteratorToList( + tableEnv.sqlQuery("select myyearrename(ts) as y from src") + .execute() + .collect()); + Assert.assertEquals(2, results.size()); + Assert.assertEquals("[+I[2013], +I[2019]]", results.toString()); + } finally { + tableEnv.executeSql("drop table src"); + } + } + private static class JavaToScala implements MapFunction, scala.Tuple2> { diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 5168e4bf4fad8..12c1fac6d2778 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -426,6 +426,23 @@ def drop_function(self, function_path: 'ObjectPath', ignore_if_not_exists: bool) """ self._j_catalog.dropFunction(function_path._j_object_path, ignore_if_not_exists) + def rename_function(self, function_path: 'ObjectPath', new_function_name: str, + ignore_if_not_exists: bool): + """ + Rename a function. + + :param function_path: Path :class:`ObjectPath` of the function to be renamed. + :param ignore_if_not_exists: Flag to specify behavior if the function does not exist: + if set to false, throw an exception + if set to true, nothing happens. + :raise: CatalogException in case of any runtime exception. + FunctionNotExistException if the function does not exist. + FunctionAlreadyExistException if function with new_function_name already exists. + """ + self._j_catalog.renameFunction(function_path._j_object_path, + new_function_name, + ignore_if_not_exists) + def get_table_statistics(self, table_path: 'ObjectPath') -> 'CatalogTableStatistics': """ Get the statistics of a table. diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py index dcc1bc7d9cea2..74d3905beb337 100644 --- a/flink-python/pyflink/table/tests/test_catalog.py +++ b/flink-python/pyflink/table/tests/test_catalog.py @@ -650,6 +650,37 @@ def test_drop_function_function_not_exist_ignored(self): self.catalog.drop_function(self.non_exist_object_path, True) self.catalog.drop_database(self.db1, False) + def test_rename_function_function_not_exist_exception(self): + self.catalog.create_database(self.db1, self.create_db(), False) + + with self.assertRaises(FunctionNotExistException): + self.catalog.rename_function(self.path1, self.t2, False) + + def test_rename_function_function_not_exist_exception_ignored(self): + self.catalog.create_database(self.db1, self.create_db(), False) + self.catalog.rename_function(self.path1, self.t2, True) + + def test_rename_function_function_already_exist_exception(self): + self.catalog.create_database(self.db1, self.create_db(), False) + self.catalog.create_function(self.path1, self.create_function(), False) + self.catalog.create_function(self.path3, self.create_another_function(), False) + + with self.assertRaises(FunctionAlreadyExistException): + self.catalog.rename_function(self.path1, self.t2, False) + + def test_rename_function(self): + self.catalog.create_database(self.db1, self.create_db(), False) + function = self.create_function() + self.catalog.create_function(self.path1, function, False) + + self.check_catalog_function_equals(function, self.catalog.get_function(self.path1)) + + self.catalog.rename_function(self.path1, self.t2, False) + + self.check_catalog_function_equals(function, self.catalog.get_function(self.path3)) + self.assertFalse(self.catalog.function_exists(self.path1)) + self.assertTrue(self.catalog.function_exists(self.path3)) + def test_create_partition(self): self.catalog.create_database(self.db1, self.create_db(), False) self.catalog.create_table(self.path1, self.create_partitioned_table(), False) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index 9de620f1cdfeb..6d9f4064c6e2a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -437,6 +437,26 @@ public void dropFunction(ObjectPath path, boolean ignoreIfNotExists) } } + @Override + public void renameFunction( + ObjectPath functionPath, String newFunctionName, boolean ignoreIfNotExists) + throws FunctionNotExistException, FunctionAlreadyExistException { + checkNotNull(functionPath); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(newFunctionName)); + + if (functionExists(functionPath)) { + ObjectPath newPath = new ObjectPath(functionPath.getDatabaseName(), newFunctionName); + + if (functionExists(newPath)) { + throw new FunctionAlreadyExistException(getName(), newPath); + } else { + functions.put(newPath, functions.remove(functionPath)); + } + } else if (!ignoreIfNotExists) { + throw new FunctionNotExistException(getName(), functionPath); + } + } + @Override public List listFunctions(String databaseName) throws DatabaseNotExistException { checkArgument( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index e0e4d3e479946..8c0e9239f7edc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -520,6 +520,23 @@ void alterFunction( void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException; + /** + * Rename an existing function. + * + * @param functionPath path of the function to be renamed + * @param newFunctionName the new name of the function + * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to + * false, throw an exception if set to true, nothing happens + * @throws FunctionNotExistException if the function does not exists + * @throws FunctionAlreadyExistException if the function with newFunctionName already exists + * @throws CatalogException in case of any runtime exception + */ + default void renameFunction( + ObjectPath functionPath, String newFunctionName, boolean ignoreIfNotExists) + throws FunctionNotExistException, FunctionAlreadyExistException, CatalogException { + throw new UnsupportedOperationException("this operation is not supported"); + } + // ------ statistics ------ /** diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 1fc421fbaa0e9..5726ce9e2e62c 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -97,6 +97,9 @@ public void cleanup() throws Exception { if (catalog.functionExists(path1)) { catalog.dropFunction(path1, true); } + if (catalog.functionExists(path3)) { + catalog.dropFunction(path3, true); + } if (catalog.databaseExists(db1)) { catalog.dropDatabase(db1, true, false); } @@ -772,6 +775,47 @@ public void testDropFunction_FunctionNotExist_ignored() throws Exception { catalog.dropDatabase(db1, false, false); } + @Test + public void testRenameFunction_FunctionNotExistsException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + exception.expect(FunctionNotExistException.class); + exception.expectMessage("Function db1.t1 does not exist in Catalog"); + catalog.renameFunction(path1, t2, false); + } + + @Test + public void testRenameFunction_FunctionNotExistException_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.renameFunction(path1, t2, true); + } + + @Test + public void testRenameFunction_FunctionAlreadyExistsException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogFunction function = createFunction(); + catalog.createFunction(path1, function, false); + catalog.createFunction(path3, createAnotherFunction(), false); + + exception.expect(FunctionAlreadyExistException.class); + exception.expectMessage("Function db1.t2 already exists in Catalog"); + catalog.renameFunction(path1, t2, false); + } + + @Test + public void testRenameFunction() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogFunction function = createFunction(); + catalog.createFunction(path1, function, false); + + checkEquals(function, catalog.getFunction(path1)); + + catalog.renameFunction(path1, t2, false); + + checkEquals(function, catalog.getFunction(path3)); + assertFalse(catalog.functionExists(path1)); + } + // ------ partitions ------ @Test