Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/content.zh/docs/dev/table/catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// rename function
catalog.renameFunction(new ObjectPath("mydb", "myfunc"), "my_new_func", false);

// get function
catalog.getFunction("myfunc");

Expand All @@ -561,6 +564,9 @@ catalog.drop_function(ObjectPath("mydb", "myfunc"), False)
# alter function
catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

# rename function
catalog.rename_function(ObjectPath("mydb", "myfunc"), "my_new_func", False)

# get function
catalog.get_function("myfunc")

Expand Down
6 changes: 6 additions & 0 deletions docs/content/docs/dev/table/catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// rename function
catalog.renameFunction(new ObjectPath("mydb", "myfunc"), "my_new_func", false);

// get function
catalog.getFunction("myfunc");

Expand All @@ -565,6 +568,9 @@ catalog.drop_function(ObjectPath("mydb", "myfunc"), False)
# alter function
catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)

# rename function
catalog.rename_function(ObjectPath("mydb", "myfunc"), "my_new_func", False)

# get function
catalog.get_function("myfunc")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,39 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
}
}

@Override
public void renameFunction(
ObjectPath functionPath, String newFunctionName, boolean ignoreIfNotExists)
throws FunctionNotExistException, FunctionAlreadyExistException, CatalogException {
checkNotNull(functionPath, "functionPath cannot be null");
checkArgument(
!isNullOrWhitespaceOnly(newFunctionName),
"newFunctionName cannot be null or empty");

try {
if (functionExists(functionPath)) {
ObjectPath newPath =
new ObjectPath(functionPath.getDatabaseName(), newFunctionName);

if (functionExists(newPath)) {
throw new FunctionAlreadyExistException(getName(), newPath);
} else {
Function function =
client.getFunction(
functionPath.getDatabaseName(), functionPath.getObjectName());
function.setFunctionName(newFunctionName);
client.alterFunction(
functionPath.getDatabaseName(), functionPath.getObjectName(), function);
}
} else if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getName(), functionPath);
}
} catch (TException e) {
throw new CatalogException(
String.format("Failed to rename function %s", functionPath.getFullName()), e);
}
}

@Override
public List<String> listFunctions(String databaseName)
throws DatabaseNotExistException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,38 @@ public void testDateUDF() throws Exception {
}
}

@Test
public void testRenameUDF() throws Exception {

TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
tableEnv.executeSql(
String.format("create function myyeartest as '%s'", UDFYear.class.getName()));
tableEnv.executeSql("create table src(ts timestamp)");

// Rename function myyear to myyearrename
ObjectPath myYearObjectPath = new ObjectPath(HiveCatalog.DEFAULT_DB, "myyeartest");
hiveCatalog.renameFunction(myYearObjectPath, "myyearrename", false);

try {
HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src")
.addRow(new Object[] {Timestamp.valueOf("2013-07-15 10:00:00")})
.addRow(new Object[] {Timestamp.valueOf("2019-05-23 17:32:55")})
.commit();

List<Row> results =
CollectionUtil.iteratorToList(
tableEnv.sqlQuery("select myyearrename(ts) as y from src")
.execute()
.collect());
Assert.assertEquals(2, results.size());
Assert.assertEquals("[+I[2013], +I[2019]]", results.toString());
} finally {
tableEnv.executeSql("drop table src");
}
}

private static class JavaToScala
implements MapFunction<Tuple2<Boolean, Row>, scala.Tuple2<Boolean, Row>> {

Expand Down
17 changes: 17 additions & 0 deletions flink-python/pyflink/table/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,23 @@ def drop_function(self, function_path: 'ObjectPath', ignore_if_not_exists: bool)
"""
self._j_catalog.dropFunction(function_path._j_object_path, ignore_if_not_exists)

def rename_function(self, function_path: 'ObjectPath', new_function_name: str,
ignore_if_not_exists: bool):
"""
Rename a function.

:param function_path: Path :class:`ObjectPath` of the function to be renamed.
:param ignore_if_not_exists: Flag to specify behavior if the function does not exist:
if set to false, throw an exception
if set to true, nothing happens.
:raise: CatalogException in case of any runtime exception.
FunctionNotExistException if the function does not exist.
FunctionAlreadyExistException if function with new_function_name already exists.
"""
self._j_catalog.renameFunction(function_path._j_object_path,
new_function_name,
ignore_if_not_exists)

def get_table_statistics(self, table_path: 'ObjectPath') -> 'CatalogTableStatistics':
"""
Get the statistics of a table.
Expand Down
31 changes: 31 additions & 0 deletions flink-python/pyflink/table/tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,37 @@ def test_drop_function_function_not_exist_ignored(self):
self.catalog.drop_function(self.non_exist_object_path, True)
self.catalog.drop_database(self.db1, False)

def test_rename_function_function_not_exist_exception(self):
self.catalog.create_database(self.db1, self.create_db(), False)

with self.assertRaises(FunctionNotExistException):
self.catalog.rename_function(self.path1, self.t2, False)

def test_rename_function_function_not_exist_exception_ignored(self):
self.catalog.create_database(self.db1, self.create_db(), False)
self.catalog.rename_function(self.path1, self.t2, True)

def test_rename_function_function_already_exist_exception(self):
self.catalog.create_database(self.db1, self.create_db(), False)
self.catalog.create_function(self.path1, self.create_function(), False)
self.catalog.create_function(self.path3, self.create_another_function(), False)

with self.assertRaises(FunctionAlreadyExistException):
self.catalog.rename_function(self.path1, self.t2, False)

def test_rename_function(self):
self.catalog.create_database(self.db1, self.create_db(), False)
function = self.create_function()
self.catalog.create_function(self.path1, function, False)

self.check_catalog_function_equals(function, self.catalog.get_function(self.path1))

self.catalog.rename_function(self.path1, self.t2, False)

self.check_catalog_function_equals(function, self.catalog.get_function(self.path3))
self.assertFalse(self.catalog.function_exists(self.path1))
self.assertTrue(self.catalog.function_exists(self.path3))

def test_create_partition(self):
self.catalog.create_database(self.db1, self.create_db(), False)
self.catalog.create_table(self.path1, self.create_partitioned_table(), False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,26 @@ public void dropFunction(ObjectPath path, boolean ignoreIfNotExists)
}
}

@Override
public void renameFunction(
ObjectPath functionPath, String newFunctionName, boolean ignoreIfNotExists)
throws FunctionNotExistException, FunctionAlreadyExistException {
checkNotNull(functionPath);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(newFunctionName));

if (functionExists(functionPath)) {
ObjectPath newPath = new ObjectPath(functionPath.getDatabaseName(), newFunctionName);

if (functionExists(newPath)) {
throw new FunctionAlreadyExistException(getName(), newPath);
} else {
functions.put(newPath, functions.remove(functionPath));
}
} else if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getName(), functionPath);
}
}

@Override
public List<String> listFunctions(String databaseName) throws DatabaseNotExistException {
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,23 @@ void alterFunction(
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException;

/**
* Rename an existing function.
*
* @param functionPath path of the function to be renamed
* @param newFunctionName the new name of the function
* @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to
* false, throw an exception if set to true, nothing happens
* @throws FunctionNotExistException if the function does not exists
* @throws FunctionAlreadyExistException if the function with newFunctionName already exists
* @throws CatalogException in case of any runtime exception
*/
default void renameFunction(
ObjectPath functionPath, String newFunctionName, boolean ignoreIfNotExists)
throws FunctionNotExistException, FunctionAlreadyExistException, CatalogException {
throw new UnsupportedOperationException("this operation is not supported");
}

// ------ statistics ------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public void cleanup() throws Exception {
if (catalog.functionExists(path1)) {
catalog.dropFunction(path1, true);
}
if (catalog.functionExists(path3)) {
catalog.dropFunction(path3, true);
}
if (catalog.databaseExists(db1)) {
catalog.dropDatabase(db1, true, false);
}
Expand Down Expand Up @@ -772,6 +775,47 @@ public void testDropFunction_FunctionNotExist_ignored() throws Exception {
catalog.dropDatabase(db1, false, false);
}

@Test
public void testRenameFunction_FunctionNotExistsException() throws Exception {
catalog.createDatabase(db1, createDb(), false);

exception.expect(FunctionNotExistException.class);
exception.expectMessage("Function db1.t1 does not exist in Catalog");
catalog.renameFunction(path1, t2, false);
}

@Test
public void testRenameFunction_FunctionNotExistException_ignored() throws Exception {
catalog.createDatabase(db1, createDb(), false);
catalog.renameFunction(path1, t2, true);
}

@Test
public void testRenameFunction_FunctionAlreadyExistsException() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogFunction function = createFunction();
catalog.createFunction(path1, function, false);
catalog.createFunction(path3, createAnotherFunction(), false);

exception.expect(FunctionAlreadyExistException.class);
exception.expectMessage("Function db1.t2 already exists in Catalog");
catalog.renameFunction(path1, t2, false);
}

@Test
public void testRenameFunction() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogFunction function = createFunction();
catalog.createFunction(path1, function, false);

checkEquals(function, catalog.getFunction(path1));

catalog.renameFunction(path1, t2, false);

checkEquals(function, catalog.getFunction(path3));
assertFalse(catalog.functionExists(path1));
}

// ------ partitions ------

@Test
Expand Down