15 changes: 11 additions & 4 deletions flink-python/pyflink/table/table.py
@@ -512,18 +512,25 @@ def drop_columns(self, fields):
"""
return Table(self._j_table.dropColumns(fields))

def insert_into(self, table_name):
def insert_into(self, table_path, *table_path_continued):
"""
Writes the :class:`Table` to a :class:`TableSink` that was registered under
the specified name.
the specified name. For the path resolution algorithm see
:func:`~pyflink.table.TableEnvironment.use_database`.

Example:
::
>>> tab.insert_into("print")

:param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written.
:param table_path: The first part of the path of the registered :class:`TableSink` to which
the :class:`Table` is written. This is to ensure at least the name of the
:class:`TableSink` is provided.
:param table_path_continued: The remaining part of the path of the registered
:class:`TableSink` to which the :class:`Table` is written.
"""
self._j_table.insertInto(table_name)
gateway = get_gateway()
j_table_path = to_jarray(gateway.jvm.String, table_path_continued)
self._j_table.insertInto(table_path, j_table_path)
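
For illustration, a minimal usage sketch of the new multi-part path (the catalog, database, and sink names are hypothetical, and a sink is assumed to be registered under that path):

>>> tab.insert_into("my_sink")                 # unqualified; resolved against the current catalog and database
>>> tab.insert_into("cat1", "db1", "my_sink")  # three-part path, resolved as cat1.db1.my_sink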

def print_schema(self):
"""
114 changes: 114 additions & 0 deletions flink-python/pyflink/table/table_environment.py
@@ -185,6 +185,120 @@ def sql_update(self, stmt, query_config=None):
else:
self._j_tenv.sqlUpdate(stmt)

def get_current_catalog(self):
"""
Gets the current default catalog name of the current session.

:return: The current default catalog name that is used for the path resolution.

.. seealso:: :func:`~pyflink.table.TableEnvironment.use_catalog`
"""
return self._j_tenv.getCurrentCatalog()

def use_catalog(self, catalog_name):
"""
Sets the current catalog to the given value. It also sets the default
database to the catalog's default one.
See also :func:`~pyflink.table.TableEnvironment.use_database`.

This is used during the resolution of object paths. Both the catalog and the database are
optional when referencing catalog objects such as tables, views etc. The algorithm looks for
requested objects in the following paths, in that order:

* ``[current-catalog].[current-database].[requested-path]``
* ``[current-catalog].[requested-path]``
* ``[requested-path]``

Example:

Given the following structure, with the default catalog set to ``default_catalog``
and the default database set to ``default_database``::

root:
|- default_catalog
|- default_database
|- tab1
|- db1
|- tab1
|- cat1
|- db1
|- tab1

The following table describes resolved paths:

+----------------+-----------------------------------------+
| Requested path | Resolved path |
+================+=========================================+
| tab1 | default_catalog.default_database.tab1 |
+----------------+-----------------------------------------+
| db1.tab1 | default_catalog.db1.tab1 |
+----------------+-----------------------------------------+
| cat1.db1.tab1 | cat1.db1.tab1 |
+----------------+-----------------------------------------+

:param catalog_name: The name of the catalog to set as the current default catalog.
:throws: CatalogException thrown if a catalog with the given name could not be set as the
default one.

.. seealso:: :func:`~pyflink.table.TableEnvironment.use_database`
"""
self._j_tenv.useCatalog(catalog_name)
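
For illustration, a hedged sketch against the example structure above (``t_env`` is assumed to be a :class:`TableEnvironment` with a catalog named ``cat1`` registered):

>>> t_env.use_catalog("cat1")    # current catalog becomes cat1; current database becomes cat1's default one
>>> t_env.get_current_catalog()  # now returns "cat1"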

def get_current_database(self):
"""
Gets the current default database name of the running session.

:return: The name of the current database of the current catalog.

.. seealso:: :func:`~pyflink.table.TableEnvironment.use_database`
"""
return self._j_tenv.getCurrentDatabase()

def use_database(self, database_name):
"""
Sets the current default database. It has to exist in the current catalog. That path will
be used as the default one when looking for unqualified object names.

This is used during the resolution of object paths. Both the catalog and the database are
optional when referencing catalog objects such as tables, views etc. The algorithm looks for
requested objects in the following paths, in that order:

* ``[current-catalog].[current-database].[requested-path]``
* ``[current-catalog].[requested-path]``
* ``[requested-path]``

Example:

Given the following structure, with the default catalog set to ``default_catalog``
and the default database set to ``default_database``::

root:
|- default_catalog
|- default_database
|- tab1
|- db1
|- tab1
|- cat1
|- db1
|- tab1

The following table describes resolved paths:

+----------------+-----------------------------------------+
| Requested path | Resolved path |
+================+=========================================+
| tab1 | default_catalog.default_database.tab1 |
+----------------+-----------------------------------------+
| db1.tab1 | default_catalog.db1.tab1 |
+----------------+-----------------------------------------+
| cat1.db1.tab1 | cat1.db1.tab1 |
+----------------+-----------------------------------------+

:param database_name: The name of the database to set as the current database.
:throws: CatalogException thrown if the given catalog and database could not be set as
the default ones.

.. seealso:: :func:`~pyflink.table.TableEnvironment.use_catalog`
"""
self._j_tenv.useDatabase(database_name)
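
Again as a sketch under the same assumptions (``t_env`` is a :class:`TableEnvironment`, and ``db1`` exists in the catalog ``cat1``):

>>> t_env.use_catalog("cat1")
>>> t_env.use_database("db1")
>>> t_env.get_current_database()  # now returns "db1"
>>> t_env.scan("tab1")            # unqualified name, now resolves to cat1.db1.tab1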

def execute(self, job_name=None):
"""
Triggers the program execution.
@@ -38,11 +38,12 @@ def java_class(cls):
@classmethod
def excluded_methods(cls):
# registerFunction and listUserDefinedFunctions should be supported when UDFs supported.
# registerExternalCatalog, getRegisteredExternalCatalog and listTables
# should be supported when catalog supported in python.
# registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
# listTables should be supported when catalog supported in python.
# getCompletionHints has been deprecated. It will be removed in the next release.
return {'registerExternalCatalog', 'getRegisteredExternalCatalog',
'registerFunction', 'listUserDefinedFunctions', 'listTables', 'getCompletionHints'}
return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
'getCompletionHints'}


if __name__ == '__main__':
@@ -74,7 +74,7 @@
import org.apache.commons.cli.Options;

import java.net.URL;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -115,8 +115,8 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
this.getClass().getClassLoader());

// create table sources & sinks.
tableSources = new HashMap<>();
tableSinks = new HashMap<>();
tableSources = new LinkedHashMap<>();
tableSinks = new LinkedHashMap<>();
mergedEnv.getTables().forEach((name, entry) -> {
if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
tableSources.put(name, createTableSource(mergedEnv.getExecution(), entry.asMap(), classLoader));
@@ -127,7 +127,7 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
});

// create user-defined functions
functions = new HashMap<>();
functions = new LinkedHashMap<>();
mergedEnv.getFunctions().forEach((name, entry) -> {
final UserDefinedFunction function = FunctionService.createFunction(entry.getDescriptor(), classLoader, false);
functions.put(name, function);
@@ -119,13 +119,13 @@ public void testValidateSession() throws Exception {

List<String> actualTables = executor.listTables(session);
List<String> expectedTables = Arrays.asList(
"AdditionalView1",
"AdditionalView2",
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2");
"TestView2",
"AdditionalView1",
"AdditionalView2");
assertEquals(expectedTables, actualTables);

session.removeView("AdditionalView1");
@@ -229,9 +229,9 @@ public void testCompleteStatement() throws Exception {
final SessionContext session = new SessionContext("test-session", new Environment());

final List<String> expectedTableHints = Arrays.asList(
"TableNumber1",
"TableNumber2",
"TableSourceSink");
"default_catalog.default_database.TableNumber1",
"default_catalog.default_database.TableNumber2",
"default_catalog.default_database.TableSourceSink");
assertEquals(expectedTableHints, executor.completeStatement(session, "SELECT * FROM Ta", 16));

final List<String> expectedClause = Collections.singletonList("WHERE");
@@ -27,6 +27,8 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.descriptors.BatchTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
@@ -283,9 +285,13 @@ static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment) {
*/
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
try {
Class clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl");
Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class);
return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig);
Class<?> clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl");
Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
CatalogManager catalogManager = new CatalogManager(
tableConfig.getBuiltInCatalogName(),
new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName())
);
return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
} catch (Throwable t) {
throw new TableException("Create BatchTableEnvironment failed.", t);
}
@@ -28,6 +28,8 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
@@ -400,9 +402,15 @@ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnviron
*/
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
try {
Class clazz = Class.forName("org.apache.flink.table.api.java.StreamTableEnvImpl");
Constructor con = clazz.getConstructor(StreamExecutionEnvironment.class, TableConfig.class);
return (StreamTableEnvironment) con.newInstance(executionEnvironment, tableConfig);
Class<?> clazz = Class.forName("org.apache.flink.table.api.java.StreamTableEnvImpl");
Constructor con = clazz.getConstructor(
StreamExecutionEnvironment.class,
TableConfig.class,
CatalogManager.class);
CatalogManager catalogManager = new CatalogManager(
tableConfig.getBuiltInCatalogName(),
new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName()));
return (StreamTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
} catch (Throwable t) {
throw new TableException("Create StreamTableEnvironment failed.", t);
}
@@ -812,33 +812,57 @@ public interface Table {
Table fetch(int fetch);

/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified path.
* For the path resolution algorithm see {@link TableEnvironment#useDatabase(String)}.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
* {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
*
* @param tableName Name of the registered {@link TableSink} to which the {@link Table} is
* written.
* @param tablePath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
* written. This is to ensure at least the name of the {@link TableSink} is provided.
* @param tablePathContinued The remaining part of the path of the registered {@link TableSink} to which the
* {@link Table} is written.
*/
void insertInto(String tableName);
void insertInto(String tablePath, String... tablePathContinued);

/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name
* in the initial default catalog.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
* {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
*
* @param tableName Name of the {@link TableSink} to which the {@link Table} is written.
* @param tableName The name of the {@link TableSink} to which the {@link Table} is written.
* @param conf The {@link QueryConfig} to use.
* @deprecated use {@link #insertInto(QueryConfig, String, String...)}
*/
@Deprecated
void insertInto(String tableName, QueryConfig conf);

/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified path.
* For the path resolution algorithm see {@link TableEnvironment#useDatabase(String)}.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
* {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
*
* @param conf The {@link QueryConfig} to use.
* @param tablePath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
* written. This is to ensure at least the name of the {@link TableSink} is provided.
* @param tablePathContinued The remaining part of the path of the registered {@link TableSink} to which the
* {@link Table} is written.
*/
void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued);

/**
* Groups the records of a table by assigning them to windows defined by a time or row interval.
*