Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,35 @@ void createTemporarySystemFunction(
*/
void createTable(String path, TableDescriptor descriptor);

/**
* Registers the given {@link TableDescriptor} as a catalog table.
*
* <p>The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored
* in the catalog.
*
* <p>If the table should not be permanently stored in a catalog, use {@link
* #createTemporaryTable(String, TableDescriptor)} instead.
*
* <p>Examples:
*
* <pre>{@code
* tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen")
* .schema(Schema.newBuilder()
* .column("f0", DataTypes.STRING())
* .build())
* .option(DataGenOptions.ROWS_PER_SECOND, 10)
* .option("fields.f0.kind", "random")
* .build());
* }</pre>
*
* @param path The path under which the table will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param descriptor Template for creating a {@link CatalogTable} instance.
* @param ignoreIfExists If a table exists under the given path and this flag is set, no
* operation is executed. An exception is thrown otherwise.
*/
void createTable(String path, TableDescriptor descriptor, boolean ignoreIfExists);

/**
* Registers a {@link Table} under a unique name in the TableEnvironment's catalog. Registered
* tables can be referenced in SQL queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,17 @@ public void createTemporaryTable(String path, TableDescriptor descriptor) {

@Override
public void createTable(String path, TableDescriptor descriptor) {
this.createTable(path, descriptor, false);
}

@Override
public void createTable(String path, TableDescriptor descriptor, boolean ignoreIfExists) {
Preconditions.checkNotNull(path, "Path must not be null.");
Preconditions.checkNotNull(descriptor, "Table descriptor must not be null.");

final ObjectIdentifier tableIdentifier =
catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));
catalogManager.createTable(descriptor.toCatalogTable(), tableIdentifier, false);
catalogManager.createTable(descriptor.toCatalogTable(), tableIdentifier, ignoreIfExists);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_VALUE;
import static org.apache.flink.table.factories.TestManagedTableFactory.MANAGED_TABLES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.api.InstanceOfAssertFactories.type;
Expand Down Expand Up @@ -100,6 +101,55 @@ void testCreateTableFromDescriptor() throws Exception {
.contains(entry("connector", "fake"), entry("a", "Test"));
}

@Test
void testCreateTableIfNotExistsFromDescriptor() throws Exception {
final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
final String catalog = tEnv.getCurrentCatalog();
final String database = tEnv.getCurrentDatabase();

final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
tEnv.createTable(
"T",
TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build(),
true);

final ObjectPath objectPath = new ObjectPath(database, "T");
assertThat(
tEnv.getCatalog(catalog)
.orElseThrow(AssertionError::new)
.tableExists(objectPath))
.isTrue();

final CatalogBaseTable catalogTable =
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath);
assertThat(catalogTable).isInstanceOf(CatalogTable.class);
assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
assertThat(catalogTable.getOptions())
.contains(entry("connector", "fake"), entry("a", "Test"));

assertThatNoException()
.isThrownBy(
() ->
tEnv.createTable(
"T",
TableDescriptor.forConnector("fake")
.schema(schema)
.option("a", "Test")
.build(),
true));

assertThatThrownBy(
() ->
tEnv.createTable(
"T",
TableDescriptor.forConnector("fake")
.schema(schema)
.option("a", "Test")
.build(),
false))
.isInstanceOf(ValidationException.class);
}

@Test
void testTableFromDescriptor() {
final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
Expand Down