Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testInsertIntoNonPartitionTable() throws Exception {
tableEnv.registerTable("src", src);

tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
tableEnv.sqlQuery("select * from src").insertInto("hive.`default`.dest");
tableEnv.execute("mytest");

verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testWriteComplexType() throws Exception {
tableEnv.registerTable("complexSrc", src);

tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from complexSrc").insertInto("hive", "default", "dest");
tableEnv.sqlQuery("select * from complexSrc").insertInto("hive.`default`.dest");
tableEnv.execute("mytest");

List<String> result = hiveShell.executeQuery("select * from " + tblName);
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testWriteNestedComplexType() throws Exception {
Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
tableEnv.registerTable("nestedSrc", src);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive", "default", "dest");
tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive.`default`.dest");
tableEnv.execute("mytest");

List<String> result = hiveShell.executeQuery("select * from " + tblName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ public static List<Row> collectTable(TableEnvironment tableEnv, Table table) thr
sink.init(serializer, id);
String sinkName = UUID.randomUUID().toString();
tableEnv.registerTableSink(sinkName, sink);
final String builtInCatalogName = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
final String builtInDBName = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
tableEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName);
tableEnv.insertInto(table, sinkName);
JobExecutionResult result = tableEnv.execute("collect-table");
ArrayList<byte[]> data = result.getAccumulatorResult(id);
return SerializedListAccumulator.deserializeList(data, serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
Expand Down Expand Up @@ -564,8 +563,7 @@ private List<Row> collectBatchResult(Table table) throws Exception {
schema.getFieldNames(), types.toArray(new TypeInformation[0]));
return JavaScalaConversionUtil.toJava(
BatchTableEnvUtil.collect(
t.getTableEnvironment(), table, configuredSink, Option.apply("JOB"),
EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
t.getTableEnvironment(), table, configuredSink, Option.apply("JOB")));
}
}

Expand Down
13 changes: 4 additions & 9 deletions flink-python/pyflink/table/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ def drop_columns(self, fields):
"""
return Table(self._j_table.dropColumns(fields))

def insert_into(self, table_path, *table_path_continued):
def insert_into(self, table_path):
"""
Writes the :class:`Table` to a :class:`TableSink` that was registered under
the specified name. For the path resolution algorithm see
Expand All @@ -619,15 +619,10 @@ def insert_into(self, table_path, *table_path_continued):

>>> tab.insert_into("sink")

:param table_path: The first part of the path of the registered :class:`TableSink` to which
the :class:`Table` is written. This is to ensure at least the name of the
:class:`Table` is provided.
:param table_path_continued: The remaining part of the path of the registered
:class:`TableSink` to which the :class:`Table` is written.
:param table_path: The path of the registered :class:`TableSink` to which
the :class:`Table` is written.
"""
gateway = get_gateway()
j_table_path = to_jarray(gateway.jvm.String, table_path_continued)
self._j_table.insertInto(table_path, j_table_path)
self._j_table.insertInto(table_path)

def get_schema(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ def excluded_methods(cls):
'getCompletionHints',
'create',
'loadModule',
'unloadModule'}
'unloadModule',
'listTemporaryTables',
'createTemporaryView',
'dropTemporaryTable',
'listTemporaryViews',
'from',
'dropTemporaryView'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
Expand Down Expand Up @@ -494,8 +493,6 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, S
envInst.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
table.insertInto(
envInst.getQueryConfig(),
EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
EnvironmentSettings.DEFAULT_BUILTIN_DATABASE,
jobName);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ public void testValidateSession() throws Exception {

List<String> actualTables = executor.listTables(session);
List<String> expectedTables = Arrays.asList(
"AdditionalView1",
"AdditionalView2",
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2",
"AdditionalView1",
"AdditionalView2");
"TestView2");
assertEquals(expectedTables, actualTables);

session.removeView("AdditionalView1");
Expand Down Expand Up @@ -183,8 +183,8 @@ public void testListCatalogs() throws Exception {
final List<String> actualCatalogs = executor.listCatalogs(session);

final List<String> expectedCatalogs = Arrays.asList(
"default_catalog",
"catalog1",
"default_catalog",
"simple-catalog");
assertEquals(expectedCatalogs, actualCatalogs);
}
Expand Down
8 changes: 8 additions & 0 deletions flink-table/flink-table-api-java-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,13 @@ under the License.
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,50 @@ public interface BatchTableEnvironment extends TableEnvironment {
<T> Table fromDataSet(DataSet<T> dataSet, String fields);

/**
* Registers the given {@link DataSet} as table in the
* {@link TableEnvironment}'s catalog.
* Registered tables can be referenced in SQL queries.
* Creates a view from the given {@link DataSet}.
* Registered views can be referenced in SQL queries.
*
* The field names of the {@link Table} are automatically derived from the type of the{@link DataSet}.
* <p>The field names of the {@link Table} are automatically derived
* from the type of the {@link DataSet}.
*
* <p>The view is registered in the namespace of the current catalog and database. To register the view in
* a different catalog use {@link #createTemporaryView(String, DataSet)}.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* corresponding temporary object.
*
* @param name The name under which the {@link DataSet} is registered in the catalog.
* @param dataSet The {@link DataSet} to register.
* @param <T> The type of the {@link DataSet} to register.
* @deprecated use {@link #createTemporaryView(String, DataSet)}
*/
@Deprecated
<T> void registerDataSet(String name, DataSet<T> dataSet);

/**
* Registers the given {@link DataSet} as table with specified field names in the
* {@link TableEnvironment}'s catalog.
* Registered tables can be referenced in SQL queries.
* Creates a view from the given {@link DataSet} in a given path.
* Registered views can be referenced in SQL queries.
*
* Example:
* <p>The field names of the {@link Table} are automatically derived
* from the type of the {@link DataSet}.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* corresponding temporary object.
*
* @param path The path under which the view is created.
* See also the {@link TableEnvironment} class description for the format of the path.
* @param dataSet The {@link DataSet} out of which to create the view.
* @param <T> The type of the {@link DataSet}.
*/
<T> void createTemporaryView(String path, DataSet<T> dataSet);

/**
* Creates a view from the given {@link DataSet} in a given path with specified field names.
* Registered views can be referenced in SQL queries.
*
* <p>Example:
*
* <pre>
* {@code
Expand All @@ -135,13 +161,47 @@ public interface BatchTableEnvironment extends TableEnvironment {
* }
* </pre>
*
* <p>The view is registered in the namespace of the current catalog and database. To register the view in
* a different catalog use {@link #createTemporaryView(String, DataSet)}.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* corresponding temporary object.
*
* @param name The name under which the {@link DataSet} is registered in the catalog.
* @param dataSet The {@link DataSet} to register.
* @param fields The field names of the registered table.
* @param fields The field names of the registered view.
* @param <T> The type of the {@link DataSet} to register.
* @deprecated use {@link #createTemporaryView(String, DataSet, String)}
*/
@Deprecated
<T> void registerDataSet(String name, DataSet<T> dataSet, String fields);

/**
* Creates a view from the given {@link DataSet} in a given path with specified field names.
* Registered views can be referenced in SQL queries.
*
* <p>Example:
*
* <pre>
* {@code
* DataSet<Tuple2<String, Long>> set = ...
* tableEnv.createTemporaryView("cat.db.myTable", set, "a, b");
* }
* </pre>
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* corresponding temporary object.
*
* @param path The path under which the view is created.
* See also the {@link TableEnvironment} class description for the format of the path.
* @param dataSet The {@link DataSet} out of which to create the view.
* @param fields The field names of the registered view.
* @param <T> The type of the {@link DataSet}.
*/
<T> void createTemporaryView(String path, DataSet<T> dataSet, String fields);

/**
* Converts the given {@link Table} into a {@link DataSet} of a specified type.
*
Expand Down Expand Up @@ -249,7 +309,9 @@ public interface BatchTableEnvironment extends TableEnvironment {
* written. This is to ensure at least the name of the {@link TableSink} is provided.
* @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
* {@link Table} is written.
* @deprecated use {@link #insertInto(String, Table)}
*/
@Deprecated
void insertInto(Table table, BatchQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);

/**
Expand Down
Loading