Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.api.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
Expand Down Expand Up @@ -67,7 +68,10 @@ public class ShowCreateUtil {
private ShowCreateUtil() {}

public static String buildShowCreateModelRow(
ResolvedCatalogModel model, ObjectIdentifier modelIdentifier, boolean isTemporary) {
ResolvedCatalogModel model,
ObjectIdentifier modelIdentifier,
boolean isTemporary,
List<String> additionalSensitiveKeys) {
StringBuilder sb =
new StringBuilder()
.append(
Expand All @@ -81,7 +85,7 @@ public static String buildShowCreateModelRow(
c -> sb.append(String.format("OUTPUT (%s)%s", c, System.lineSeparator())));
extractComment(model)
.ifPresent(c -> sb.append(formatComment(c)).append(System.lineSeparator()));
extractFormattedOptions(model.getOptions(), PRINT_INDENT)
extractFormattedOptions(model.getOptions(), PRINT_INDENT, additionalSensitiveKeys)
.ifPresent(
v ->
sb.append(String.format("WITH (%s", System.lineSeparator()))
Expand All @@ -98,7 +102,8 @@ public static String buildShowCreateTableRow(
ResolvedCatalogBaseTable<?> table,
ObjectIdentifier tableIdentifier,
boolean isTemporary,
SqlFactory sqlFactory) {
SqlFactory sqlFactory,
List<String> additionalSensitiveKeys) {
validateTableKind(table, tableIdentifier, TableKind.TABLE);
StringBuilder sb =
new StringBuilder()
Expand All @@ -118,7 +123,7 @@ public static String buildShowCreateTableRow(
.ifPresent(
partitionedInfoFormatted ->
sb.append(formatPartitionedBy(partitionedInfoFormatted)));
extractFormattedOptions(table.getOptions(), PRINT_INDENT)
extractFormattedOptions(table.getOptions(), PRINT_INDENT, additionalSensitiveKeys)
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
return sb.toString();
}
Expand All @@ -130,7 +135,8 @@ public static String buildShowCreateMaterializedTableRow(
boolean isTemporary,
boolean createOrAlter,
ZoneId timeZoneId,
SqlFactory sqlFactory) {
SqlFactory sqlFactory,
List<String> additionalSensitiveKeys) {
return buildShowCreateMaterializedTableRow(
table,
tableIdentifier,
Expand All @@ -139,7 +145,8 @@ public static String buildShowCreateMaterializedTableRow(
timeZoneId,
sqlFactory,
true,
true);
true,
additionalSensitiveKeys);
}

/** Show create materialized table statement only for materialized tables. */
Expand All @@ -151,7 +158,8 @@ public static String buildShowCreateMaterializedTableRow(
ZoneId timeZoneId,
SqlFactory sqlFactory,
boolean includeFreshness,
boolean includeRefreshMode) {
boolean includeRefreshMode,
List<String> additionalSensitiveKeys) {
validateTableKind(table, tableIdentifier, TableKind.MATERIALIZED_TABLE);
StringBuilder sb =
new StringBuilder()
Expand All @@ -174,7 +182,7 @@ public static String buildShowCreateMaterializedTableRow(
.ifPresent(d -> sb.append(d).append("\n"));
extractFormattedPartitionedInfo(table)
.ifPresent(partitionedBy -> sb.append(formatPartitionedBy(partitionedBy)));
extractFormattedOptions(table.getOptions(), PRINT_INDENT)
extractFormattedOptions(table.getOptions(), PRINT_INDENT, additionalSensitiveKeys)
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
sb.append(extractStartMode(table, timeZoneId)).append("\n");
if (includeFreshness) {
Expand Down Expand Up @@ -211,14 +219,18 @@ public static String buildShowCreateViewRow(
return sb.toString();
}

public static String buildShowCreateCatalogRow(CatalogDescriptor catalogDescriptor) {
public static String buildShowCreateCatalogRow(
CatalogDescriptor catalogDescriptor, List<String> additionalSensitiveKeys) {
final Optional<String> comment = catalogDescriptor.getComment();
StringBuilder sb = new StringBuilder();
sb.append("CREATE CATALOG ")
.append(EncodingUtils.escapeIdentifier(catalogDescriptor.getCatalogName()))
.append("\n");
comment.ifPresent(c -> sb.append(formatComment(c)).append("\n"));
extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), PRINT_INDENT)
extractFormattedOptions(
catalogDescriptor.getConfiguration().toMap(),
PRINT_INDENT,
additionalSensitiveKeys)
.ifPresent(o -> sb.append("WITH (\n").append(o).append("\n)\n"));
return sb.toString();
}
Expand Down Expand Up @@ -432,7 +444,8 @@ private static String getFormattedLocalDateTime(Instant instant, ZoneId timeZone
return TIMESTAMP_FORMATTER.format(LocalDateTime.ofInstant(instant, timeZone));
}

static Optional<String> extractFormattedOptions(Map<String, String> conf, String printIndent) {
static Optional<String> extractFormattedOptions(
Map<String, String> conf, String printIndent, List<String> additionalSensitiveKeys) {
if (Objects.isNull(conf) || conf.isEmpty()) {
return Optional.empty();
}
Expand All @@ -446,7 +459,12 @@ static Optional<String> extractFormattedOptions(Map<String, String> conf, String
"%s'%s' = '%s'",
printIndent,
EncodingUtils.escapeSingleQuotes(entry),
EncodingUtils.escapeSingleQuotes(conf.get(entry))))
EncodingUtils.escapeSingleQuotes(
GlobalConfiguration.isSensitive(
entry,
additionalSensitiveKeys)
? GlobalConfiguration.HIDDEN_CONTENT
: conf.get(entry))))
.collect(Collectors.joining(",\n")));
}

Expand All @@ -462,31 +480,6 @@ static String extractFormattedColumnNames(
.collect(Collectors.joining(",\n"));
}

private static String maybeLowerCaseKey(String key, boolean lowerCaseKey) {
return lowerCaseKey ? key.toLowerCase() : key;
}

static Optional<String> extractFormattedOptions(
Map<String, String> conf, String printIndent, boolean lowerCaseKeys) {
if (Objects.isNull(conf) || conf.isEmpty()) {
return Optional.empty();
}
return Optional.of(
conf.entrySet().stream()
.map(
entry ->
String.format(
"%s'%s' = '%s'",
printIndent,
maybeLowerCaseKey(
EncodingUtils.escapeSingleQuotes(
entry.getKey()),
lowerCaseKeys),
EncodingUtils.escapeSingleQuotes(entry.getValue())))
.sorted()
.collect(Collectors.joining("," + System.lineSeparator())));
}

private static void validateTableKind(
ResolvedCatalogBaseTable<?> table,
ObjectIdentifier tableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultInternal;
Expand Down Expand Up @@ -87,17 +89,24 @@ public TableResultInternal execute(Context ctx) {
Arrays.asList(
"comment", catalogDescriptor.getComment().orElse(null))));
if (isExtended) {
List<String> additionalSensitiveKeys =
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS);
properties.entrySet().stream()
.filter(
entry ->
!CommonCatalogOptions.CATALOG_TYPE.key().equals(entry.getKey()))
.sorted(Map.Entry.comparingByKey())
.forEach(
entry ->
rows.add(
Arrays.asList(
String.format("option:%s", entry.getKey()),
entry.getValue())));
entry -> {
String value =
GlobalConfiguration.isSensitive(
entry.getKey(), additionalSensitiveKeys)
? GlobalConfiguration.HIDDEN_CONTENT
: entry.getValue();
rows.add(
Arrays.asList(
String.format("option:%s", entry.getKey()), value));
});
}

return buildTableResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
Expand Down Expand Up @@ -57,7 +58,10 @@ public TableResultInternal execute(Context ctx) {
String.format(
"Cannot obtain metadata information from Catalog %s.",
catalogName)));
String resultRow = ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
String resultRow =
ShowCreateUtil.buildShowCreateCatalogRow(
catalogDescriptor,
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));

return buildStringArrayResult("result", new String[] {resultRow});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
Expand Down Expand Up @@ -65,7 +66,8 @@ public TableResultInternal execute(Context ctx) {
table.isTemporary(),
createOrAlter,
ctx.getTableConfig().getLocalTimeZone(),
ctx.getCatalogManager().getSqlFactory());
ctx.getCatalogManager().getSqlFactory(),
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));

return buildStringArrayResult("result", new String[] {resultRow});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;
Expand Down Expand Up @@ -53,7 +54,11 @@ public String asSummaryString() {
@Override
public TableResultInternal execute(Context ctx) {
String resultRow =
ShowCreateUtil.buildShowCreateModelRow(model, modelIdentifier, isTemporary);
ShowCreateUtil.buildShowCreateModelRow(
model,
modelIdentifier,
isTemporary,
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
return buildStringArrayResult("result", new String[] {resultRow});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
Expand Down Expand Up @@ -62,7 +63,8 @@ public TableResultInternal execute(Context ctx) {
table.getResolvedTable(),
tableIdentifier,
table.isTemporary(),
ctx.getCatalogManager().getSqlFactory());
ctx.getCatalogManager().getSqlFactory(),
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));

return buildStringArrayResult("result", new String[] {resultRow});
}
Expand Down
Loading