Skip to content

Commit

Permalink
add isTemporary flag
Browse files Browse the repository at this point in the history
  • Loading branch information
lirui-apache committed Aug 28, 2020
1 parent 7dd881c commit 31eef5a
Show file tree
Hide file tree
Showing 21 changed files with 135 additions and 53 deletions.
Expand Up @@ -69,6 +69,11 @@ public ReadableConfig getConfiguration() {
public ClassLoader getClassLoader() {
return TestContext.class.getClassLoader();
}

@Override
public boolean isTemporary() {
return false;
}
};
}

Expand Down
Expand Up @@ -56,22 +56,23 @@ public Set<ConfigOption<?>> optionalOptions() {
throw new UnsupportedOperationException("Hive factory is only work for catalog.");
}

private static CatalogTable removeIsGenericFlag(CatalogTable table) {
Map<String, String> newOptions = new HashMap<>(table.getOptions());
private static CatalogTable removeIsGenericFlag(Context context) {
Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC));
if (!isGeneric) {
// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
throw new ValidationException(
"Hive dynamic table factory now only work for generic table.");
}
return table.copy(newOptions);
return context.getCatalogTable().copy(newOptions);
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context.getCatalogTable()),
removeIsGenericFlag(context),
context.getConfiguration(),
context.getClassLoader());
}
Expand All @@ -81,7 +82,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return FactoryUtil.createTableSource(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context.getCatalogTable()),
removeIsGenericFlag(context),
context.getConfiguration(),
context.getClassLoader());
}
Expand Down
Expand Up @@ -66,7 +66,8 @@ public TableSource<RowData> createTableSource(TableSourceFactory.Context context

boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

if (!isGeneric) {
// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
return new HiveTableSource(
new JobConf(hiveConf),
context.getConfiguration(),
Expand All @@ -84,7 +85,8 @@ public TableSink createTableSink(TableSinkFactory.Context context) {

boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

if (!isGeneric) {
// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
return new HiveTableSink(
context.getConfiguration().get(
HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
Expand Down
Expand Up @@ -81,7 +81,7 @@ public void test() throws Exception {
ObjectIdentifier tableIdentifier = ObjectIdentifier.of(hiveCatalog.getName(), "default", "build");
CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
HiveTableSource hiveTableSource = (HiveTableSource) ((HiveTableFactory) hiveCatalog.getTableFactory().get()).createTableSource(
new TableSourceFactoryContextImpl(tableIdentifier, catalogTable, tableEnv.getConfig().getConfiguration()));
new TableSourceFactoryContextImpl(tableIdentifier, catalogTable, tableEnv.getConfig().getConfiguration(), false));
FileSystemLookupFunction lookupFunction = (FileSystemLookupFunction) hiveTableSource.getLookupFunction(new String[]{"x"});
assertEquals(Duration.ofMinutes(5), lookupFunction.getCacheTTL());

Expand Down
Expand Up @@ -83,13 +83,13 @@ public void testGenericTable() throws Exception {
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration(), false));
assertTrue(tableSource instanceof StreamTableSource);
TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
table,
new Configuration(),
true));
true, false));
assertTrue(tableSink instanceof StreamTableSink);
}

Expand All @@ -114,10 +114,10 @@ public void testHiveTable() throws Exception {
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
table,
new Configuration(),
true));
true, false));
assertTrue(tableSink instanceof HiveTableSink);
TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration(), false));
assertTrue(tableSource instanceof HiveTableSource);
}

Expand Down
Expand Up @@ -395,7 +395,7 @@ private TableSource<?> createTableSource(String name, Map<String, String> source
tableEnv.getCurrentDatabase(),
name),
CatalogTableImpl.fromProperties(sourceProperties),
tableEnv.getConfig().getConfiguration()));
tableEnv.getConfig().getConfiguration(), true));
} else if (environment.getExecution().isBatchPlanner()) {
final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
Expand All @@ -415,7 +415,7 @@ private TableSink<?> createTableSink(String name, Map<String, String> sinkProper
name),
CatalogTableImpl.fromProperties(sinkProperties),
tableEnv.getConfig().getConfiguration(),
!environment.getExecution().inStreamingMode()));
!environment.getExecution().inStreamingMode(), true));
} else if (environment.getExecution().isBatchPlanner()) {
final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
Expand Down
Expand Up @@ -60,15 +60,14 @@ public static <T> TableSource<T> findAndCreateTableSource(
Catalog catalog,
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
boolean isTemporary,
ReadableConfig configuration) {
ReadableConfig configuration,
boolean isTemporary) {
TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
objectIdentifier,
catalogTable,
configuration);
configuration, isTemporary);
Optional<TableFactory> factoryOptional = catalog.getTableFactory();
// avoid using Catalog TableFactory for temporary tables as temporary tables are not stored in catalogs
if (factoryOptional.isPresent() && !isTemporary) {
if (factoryOptional.isPresent()) {
TableFactory factory = factoryOptional.get();
if (factory instanceof TableSourceFactory) {
return ((TableSourceFactory<T>) factory).createTableSource(context);
Expand Down Expand Up @@ -106,12 +105,14 @@ public static <T> TableSink<T> findAndCreateTableSink(
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
boolean isStreamingMode) {
boolean isStreamingMode,
boolean isTemporary) {
TableSinkFactory.Context context = new TableSinkFactoryContextImpl(
objectIdentifier,
catalogTable,
configuration,
!isStreamingMode);
!isStreamingMode,
isTemporary);
if (catalog == null) {
return findAndCreateTableSink(context);
} else {
Expand Down
Expand Up @@ -69,5 +69,10 @@ interface Context {
* <p>The class loader is in particular useful for discovering further (nested) factories.
*/
ClassLoader getClassLoader();

/**
* Whether the table is temporary.
*/
boolean isTemporary();
}
}
Expand Up @@ -109,11 +109,27 @@ public static DynamicTableSource createTableSource(
CatalogTable catalogTable,
ReadableConfig configuration,
ClassLoader classLoader) {
return createTableSource(catalog, objectIdentifier, catalogTable, configuration, classLoader, false);
}

/**
* Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
*
* <p>It considers {@link Catalog#getFactory()} if provided.
*/
public static DynamicTableSource createTableSource(
@Nullable Catalog catalog,
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
ClassLoader classLoader,
boolean isTemporary) {
final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
objectIdentifier,
catalogTable,
configuration,
classLoader);
classLoader,
isTemporary);
try {
final DynamicTableSourceFactory factory = getDynamicTableFactory(
DynamicTableSourceFactory.class,
Expand Down Expand Up @@ -148,11 +164,27 @@ public static DynamicTableSink createTableSink(
CatalogTable catalogTable,
ReadableConfig configuration,
ClassLoader classLoader) {
return createTableSink(catalog, objectIdentifier, catalogTable, configuration, classLoader, false);
}

/**
* Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
*
* <p>It considers {@link Catalog#getFactory()} if provided.
*/
public static DynamicTableSink createTableSink(
@Nullable Catalog catalog,
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
ClassLoader classLoader,
boolean isTemporary) {
final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
objectIdentifier,
catalogTable,
configuration,
classLoader);
classLoader,
isTemporary);
try {
final DynamicTableSinkFactory factory = getDynamicTableFactory(
DynamicTableSinkFactory.class,
Expand Down Expand Up @@ -603,16 +635,18 @@ private static class DefaultDynamicTableContext implements DynamicTableFactory.C
private final CatalogTable catalogTable;
private final ReadableConfig configuration;
private final ClassLoader classLoader;
private final boolean isTemporary;

DefaultDynamicTableContext(
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
ClassLoader classLoader) {
ClassLoader classLoader, boolean isTemporary) {
this.objectIdentifier = objectIdentifier;
this.catalogTable = catalogTable;
this.configuration = configuration;
this.classLoader = classLoader;
this.isTemporary = isTemporary;
}

@Override
Expand All @@ -634,6 +668,11 @@ public ReadableConfig getConfiguration() {
public ClassLoader getClassLoader() {
return classLoader;
}

@Override
public boolean isTemporary() {
return isTemporary;
}
}

// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -107,6 +107,11 @@ interface Context {
* See {@link Source#getBoundedness}.
*/
boolean isBounded();

/**
* Whether the table is temporary.
*/
boolean isTemporary();
}

}
Expand Up @@ -35,16 +35,18 @@ public class TableSinkFactoryContextImpl implements TableSinkFactory.Context {
private final CatalogTable table;
private final ReadableConfig config;
private final boolean isBounded;
private final boolean isTemporary;

public TableSinkFactoryContextImpl(
ObjectIdentifier identifier,
CatalogTable table,
ReadableConfig config,
boolean isBounded) {
boolean isBounded, boolean isTemporary) {
this.identifier = checkNotNull(identifier);
this.table = checkNotNull(table);
this.config = checkNotNull(config);
this.isBounded = isBounded;
this.isTemporary = isTemporary;
}

@Override
Expand All @@ -66,4 +68,9 @@ public ReadableConfig getConfiguration() {
public boolean isBounded() {
return isBounded;
}

@Override
public boolean isTemporary() {
return isTemporary;
}
}
Expand Up @@ -98,6 +98,11 @@ interface Context {
* {@code TableEnvironment} session configurations.
*/
ReadableConfig getConfiguration();

/**
* Whether the table is temporary.
*/
boolean isTemporary();
}

}
Expand Up @@ -34,14 +34,16 @@ public class TableSourceFactoryContextImpl implements TableSourceFactory.Context
private final ObjectIdentifier identifier;
private final CatalogTable table;
private final ReadableConfig config;
private final boolean isTemporary;

public TableSourceFactoryContextImpl(
ObjectIdentifier identifier,
CatalogTable table,
ReadableConfig config) {
ReadableConfig config, boolean isTemporary) {
this.identifier = checkNotNull(identifier);
this.table = checkNotNull(table);
this.config = checkNotNull(config);
this.isTemporary = isTemporary;
}

@Override
Expand All @@ -58,4 +60,9 @@ public CatalogTable getTable() {
public ReadableConfig getConfiguration() {
return config;
}

@Override
public boolean isTemporary() {
return isTemporary;
}
}
Expand Up @@ -195,7 +195,7 @@ private Optional<TableSource<?>> findAndCreateTableSource() {
// actual TableConfig here. And currently the empty config do not affect the logic.
ReadableConfig config = new Configuration();
TableSourceFactory.Context context =
new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config);
new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config, lookupResult.isTemporary());
TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
if (source instanceof StreamTableSource) {
if (!isStreamingMode && !((StreamTableSource<?>) source).isBounded()) {
Expand Down
Expand Up @@ -232,8 +232,8 @@ private static boolean isLegacySourceOptions(
schemaTable.getCatalog(),
schemaTable.getTableIdentifier(),
catalogTable,
schemaTable.isTemporary(),
new Configuration());
new Configuration(),
schemaTable.isTemporary());
// success, then we will use the legacy factories
return true;
} catch (Throwable e) {
Expand Down

0 comments on commit 31eef5a

Please sign in to comment.