diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index dfc9fef79c5c5..9ea15b1eca867 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api.internal; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; @@ -67,7 +68,10 @@ public class ShowCreateUtil { private ShowCreateUtil() {} public static String buildShowCreateModelRow( - ResolvedCatalogModel model, ObjectIdentifier modelIdentifier, boolean isTemporary) { + ResolvedCatalogModel model, + ObjectIdentifier modelIdentifier, + boolean isTemporary, + List additionalSensitiveKeys) { StringBuilder sb = new StringBuilder() .append( @@ -81,7 +85,7 @@ public static String buildShowCreateModelRow( c -> sb.append(String.format("OUTPUT (%s)%s", c, System.lineSeparator()))); extractComment(model) .ifPresent(c -> sb.append(formatComment(c)).append(System.lineSeparator())); - extractFormattedOptions(model.getOptions(), PRINT_INDENT) + extractFormattedOptions(model.getOptions(), PRINT_INDENT, additionalSensitiveKeys) .ifPresent( v -> sb.append(String.format("WITH (%s", System.lineSeparator())) @@ -98,7 +102,8 @@ public static String buildShowCreateTableRow( ResolvedCatalogBaseTable table, ObjectIdentifier tableIdentifier, boolean isTemporary, - SqlFactory sqlFactory) { + SqlFactory sqlFactory, + List additionalSensitiveKeys) { validateTableKind(table, tableIdentifier, TableKind.TABLE); StringBuilder sb = new StringBuilder() @@ -118,7 +123,7 @@ public static String buildShowCreateTableRow( .ifPresent( partitionedInfoFormatted -> sb.append(formatPartitionedBy(partitionedInfoFormatted))); - extractFormattedOptions(table.getOptions(), PRINT_INDENT) + extractFormattedOptions(table.getOptions(), PRINT_INDENT, additionalSensitiveKeys) .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); return sb.toString(); } @@ -130,7 +135,8 @@ public static String buildShowCreateMaterializedTableRow( boolean isTemporary, boolean createOrAlter, ZoneId timeZoneId, - SqlFactory sqlFactory) { + SqlFactory sqlFactory, + List additionalSensitiveKeys) { return buildShowCreateMaterializedTableRow( table, tableIdentifier, @@ -139,7 +145,8 @@ public static String buildShowCreateMaterializedTableRow( timeZoneId, sqlFactory, true, - true); + true, + additionalSensitiveKeys); } /** Show create materialized table statement only for materialized tables. */ @@ -151,7 +158,8 @@ public static String buildShowCreateMaterializedTableRow( ZoneId timeZoneId, SqlFactory sqlFactory, boolean includeFreshness, - boolean includeRefreshMode) { + boolean includeRefreshMode, + List additionalSensitiveKeys) { validateTableKind(table, tableIdentifier, TableKind.MATERIALIZED_TABLE); StringBuilder sb = new StringBuilder() @@ -174,7 +182,7 @@ public static String buildShowCreateMaterializedTableRow( .ifPresent(d -> sb.append(d).append("\n")); extractFormattedPartitionedInfo(table) .ifPresent(partitionedBy -> sb.append(formatPartitionedBy(partitionedBy))); - extractFormattedOptions(table.getOptions(), PRINT_INDENT) + extractFormattedOptions(table.getOptions(), PRINT_INDENT, additionalSensitiveKeys) .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); sb.append(extractStartMode(table, timeZoneId)).append("\n"); if (includeFreshness) { @@ -211,14 +219,18 @@ public static String buildShowCreateViewRow( return sb.toString(); } - public static String buildShowCreateCatalogRow(CatalogDescriptor catalogDescriptor) { + public static String buildShowCreateCatalogRow( + CatalogDescriptor catalogDescriptor, List additionalSensitiveKeys) { final Optional comment = catalogDescriptor.getComment(); StringBuilder sb = new StringBuilder(); sb.append("CREATE CATALOG ") .append(EncodingUtils.escapeIdentifier(catalogDescriptor.getCatalogName())) .append("\n"); comment.ifPresent(c -> sb.append(formatComment(c)).append("\n")); - extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), PRINT_INDENT) + extractFormattedOptions( + catalogDescriptor.getConfiguration().toMap(), + PRINT_INDENT, + additionalSensitiveKeys) .ifPresent(o -> sb.append("WITH (\n").append(o).append("\n)\n")); return sb.toString(); } @@ -432,7 +444,8 @@ private static String getFormattedLocalDateTime(Instant instant, ZoneId timeZone return TIMESTAMP_FORMATTER.format(LocalDateTime.ofInstant(instant, timeZone)); } - static Optional extractFormattedOptions(Map conf, String printIndent) { + static Optional extractFormattedOptions( + Map conf, String printIndent, List additionalSensitiveKeys) { if (Objects.isNull(conf) || conf.isEmpty()) { return Optional.empty(); } @@ -446,7 +459,12 @@ static Optional extractFormattedOptions(Map conf, String "%s'%s' = '%s'", printIndent, EncodingUtils.escapeSingleQuotes(entry), - EncodingUtils.escapeSingleQuotes(conf.get(entry)))) + EncodingUtils.escapeSingleQuotes( + GlobalConfiguration.isSensitive( + entry, + additionalSensitiveKeys) + ? GlobalConfiguration.HIDDEN_CONTENT + : conf.get(entry)))) .collect(Collectors.joining(",\n"))); } @@ -462,31 +480,6 @@ static String extractFormattedColumnNames( .collect(Collectors.joining(",\n")); } - private static String maybeLowerCaseKey(String key, boolean lowerCaseKey) { - return lowerCaseKey ? key.toLowerCase() : key; - } - - static Optional extractFormattedOptions( - Map conf, String printIndent, boolean lowerCaseKeys) { - if (Objects.isNull(conf) || conf.isEmpty()) { - return Optional.empty(); - } - return Optional.of( - conf.entrySet().stream() - .map( - entry -> - String.format( - "%s'%s' = '%s'", - printIndent, - maybeLowerCaseKey( - EncodingUtils.escapeSingleQuotes( - entry.getKey()), - lowerCaseKeys), - EncodingUtils.escapeSingleQuotes(entry.getValue()))) - .sorted() - .collect(Collectors.joining("," + System.lineSeparator()))); - } - private static void validateTableKind( ResolvedCatalogBaseTable table, ObjectIdentifier tableIdentifier, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java index 08a97d10becda..9d1990fe67ddc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java @@ -19,6 +19,8 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultInternal; @@ -87,17 +89,24 @@ public TableResultInternal execute(Context ctx) { Arrays.asList( "comment", catalogDescriptor.getComment().orElse(null)))); if (isExtended) { + List additionalSensitiveKeys = + ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS); properties.entrySet().stream() .filter( entry -> !CommonCatalogOptions.CATALOG_TYPE.key().equals(entry.getKey())) .sorted(Map.Entry.comparingByKey()) .forEach( - entry -> - rows.add( - Arrays.asList( - String.format("option:%s", entry.getKey()), - entry.getValue()))); + entry -> { + String value = + GlobalConfiguration.isSensitive( + entry.getKey(), additionalSensitiveKeys) + ? GlobalConfiguration.HIDDEN_CONTENT + : entry.getValue(); + rows.add( + Arrays.asList( + String.format("option:%s", entry.getKey()), value)); + }); } return buildTableResult( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java index 60baecb40a88a..fca7b39a5d93c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.ShowCreateUtil; import org.apache.flink.table.api.internal.TableResultInternal; @@ -57,7 +58,10 @@ public TableResultInternal execute(Context ctx) { String.format( "Cannot obtain metadata information from Catalog %s.", catalogName))); - String resultRow = ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor); + String resultRow = + ShowCreateUtil.buildShowCreateCatalogRow( + catalogDescriptor, + ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); return buildStringArrayResult("result", new String[] {resultRow}); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java index 0ca8a1475ce0b..1165c44e4bb0f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.ShowCreateUtil; import org.apache.flink.table.api.internal.TableResultInternal; @@ -65,7 +66,8 @@ public TableResultInternal execute(Context ctx) { table.isTemporary(), createOrAlter, ctx.getTableConfig().getLocalTimeZone(), - ctx.getCatalogManager().getSqlFactory()); + ctx.getCatalogManager().getSqlFactory(), + ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); return buildStringArrayResult("result", new String[] {resultRow}); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java index d94471ba0c371..ea0bd14174a94 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.internal.ShowCreateUtil; import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -53,7 +54,11 @@ public String asSummaryString() { @Override public TableResultInternal execute(Context ctx) { String resultRow = - ShowCreateUtil.buildShowCreateModelRow(model, modelIdentifier, isTemporary); + ShowCreateUtil.buildShowCreateModelRow( + model, + modelIdentifier, + isTemporary, + ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); return buildStringArrayResult("result", new String[] {resultRow}); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java index 6608ff263b9f6..d0144f7abb53e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.ShowCreateUtil; import org.apache.flink.table.api.internal.TableResultInternal; @@ -62,7 +63,8 @@ public TableResultInternal execute(Context ctx) { table.getResolvedTable(), tableIdentifier, table.isTemporary(), - ctx.getCatalogManager().getSqlFactory()); + ctx.getCatalogManager().getSqlFactory(), + ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); return buildStringArrayResult("result", new String[] {resultRow}); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java index 69df05557282d..750818181d460 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java @@ -45,6 +45,7 @@ import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.expressions.DefaultSqlFactory; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; @@ -112,7 +113,8 @@ void showCreateTable( resolvedCatalogTable, TABLE_IDENTIFIER, isTemporary, - DefaultSqlFactory.INSTANCE); + DefaultSqlFactory.INSTANCE, + List.of()); assertThat(createTableString).isEqualTo(expected); } @@ -139,7 +141,8 @@ void showCreateMaterializedTable( false, createOrAlter, ZoneOffset.UTC, - DefaultSqlFactory.INSTANCE); + DefaultSqlFactory.INSTANCE, + List.of()); final String fixedTimestamp = "1970-01-02 12:34:56"; final String normalizedMTString = setFixedTimestamp(createMaterializedTableString, fixedTimestamp); @@ -171,7 +174,8 @@ void showCreateMaterializedTableWithFlags( ZoneOffset.UTC, DefaultSqlFactory.INSTANCE, includeFreshness, - includeRefreshMode); + includeRefreshMode, + List.of()); final StringBuilder expected = new StringBuilder() @@ -195,7 +199,7 @@ void showCreateMaterializedTableWithFlags( @MethodSource("argsForShowCreateCatalog") void showCreateCatalog(CatalogDescriptor catalogDescriptor, String expected) { final String createCatalogString = - ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor); + ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor, List.of()); assertThat(createCatalogString).isEqualTo(expected); } @@ -228,6 +232,19 @@ private static Collection argsForShowCreateCatalog() { + " 'k_c' = 'v_c'\n" + ")\n")); + final Map sensitiveCatalogOptions = new HashMap<>(); + sensitiveCatalogOptions.put("type", "hive"); + sensitiveCatalogOptions.put("hive.metastore.token", "tok123"); + argList.add( + Arguments.of( + CatalogDescriptor.of( + "catalogName", Configuration.fromMap(sensitiveCatalogOptions)), + "CREATE CATALOG `catalogName`\n" + + "WITH (\n" + + " 'hive.metastore.token' = '******',\n" + + " 'type' = 'hive'\n" + + ")\n")); + return argList; } @@ -354,6 +371,22 @@ private static Collection argsForShowCreateTable() { + " 'option_key_b' = 'option_value_b',\n" + " 'option_key_c' = 'option_value_c'\n" + ")\n"); + + final Map sensitiveOptions = new HashMap<>(); + sensitiveOptions.put("connector", "kafka"); + sensitiveOptions.put("my.api-key", "abc123"); + sensitiveOptions.put("password", "topsecret"); + addTemporaryAndPermanent( + argList, + createResolvedTable(ONE_COLUMN_SCHEMA, sensitiveOptions, List.of(), null, null), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'my.api-key' = '******',\n" + + " 'password' = '******'\n" + + ")\n"); return argList; } @@ -477,9 +510,51 @@ private static Collection argsForShowCreateMaterializedTable() { + "REFRESH_MODE = FULL\n" + "AS SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`\n"); + addCreateAndCreateOrAlter( + argList, + createResolvedMaterializedWithOptions( + ONE_COLUMN_SCHEMA, + Map.of("connector", "kafka", "secret.key", "mysecret"), + StartMode.of(StartModeKind.FROM_BEGINNING), + IntervalFreshness.ofMinute(2), + RefreshMode.CONTINUOUS, + "SELECT 1"), + "%sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'secret.key' = '******'\n" + + ")\n" + + "START_MODE = FROM_BEGINNING\n" + + "FRESHNESS = INTERVAL '2' MINUTE\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT 1\n"); + return argList; } + @Test + void showCreateTableRedactsAdditionalSensitiveKeys() { + Map options = new HashMap<>(); + options.put("connector", "kafka"); + options.put("my.custom.cred", "supersecret"); + ResolvedCatalogTable table = + createResolvedTable(ONE_COLUMN_SCHEMA, options, List.of(), null, null); + + String result = + ShowCreateUtil.buildShowCreateTableRow( + table, + TABLE_IDENTIFIER, + false, + DefaultSqlFactory.INSTANCE, + List.of("custom.cred")); + + assertThat(result).contains("'connector' = 'kafka'"); + assertThat(result).contains("'my.custom.cred' = '******'"); + assertThat(result).doesNotContain("supersecret"); + } + private static void addTemporaryAndPermanent( Collection argList, CatalogBaseTable catalogBaseTable, String sql) { argList.add(Arguments.of(catalogBaseTable, false, String.format(sql, ""))); @@ -554,6 +629,30 @@ private static ResolvedCatalogMaterializedTable createResolvedMaterialized( startMode); } + private static ResolvedCatalogMaterializedTable createResolvedMaterializedWithOptions( + ResolvedSchema resolvedSchema, + Map options, + StartMode startMode, + IntervalFreshness freshness, + RefreshMode refreshMode, + String query) { + return new ResolvedCatalogMaterializedTable( + CatalogMaterializedTable.newBuilder() + .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build()) + .options(options) + .freshness(freshness) + .refreshMode(refreshMode) + .originalQuery(query) + .expandedQuery(query) + .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) + .refreshStatus(RefreshStatus.ACTIVATED) + .build(), + resolvedSchema, + refreshMode, + freshness, + startMode); + } + private static String setFixedTimestamp(String sql, String fixedTimestamp) { return START_MODE_EVALUATED_TIMESTAMP .matcher(sql) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java index cc39813ec133e..e6b38f8ff94be 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java @@ -19,10 +19,12 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.table.api.Schema; import javax.annotation.Nullable; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -107,7 +109,7 @@ public String toString() { + ", outputSchema=" + outputSchema + ", modelOptions=" - + modelOptions + + ConfigurationUtils.hideSensitiveValues(modelOptions, List.of()) + ", comment=" + comment + "}"; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java index b06e6acee263a..f94c2399accab 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.table.api.Schema; import javax.annotation.Nullable; @@ -162,7 +163,7 @@ public String toString() { + ", partitionKeys=" + partitionKeys + ", options=" - + options + + ConfigurationUtils.hideSensitiveValues(options, List.of()) + ", snapshot=" + snapshot + '}'; diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java new file mode 100644 index 0000000000000..03301ba5cd6ad --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DefaultCatalogTable}. */ +class DefaultCatalogTableTest { + + @Test + void toStringRedactsSensitiveOptions() { + Map options = new HashMap<>(); + options.put("connector", "kafka"); + options.put("password", "topsecret"); + options.put("my.token", "tok123"); + + CatalogTable table = + CatalogTable.newBuilder() + .schema(Schema.newBuilder().column("id", DataTypes.INT()).build()) + .options(options) + .build(); + + String result = table.toString(); + + assertThat(result).contains("connector=kafka"); + assertThat(result).doesNotContain("topsecret"); + assertThat(result).doesNotContain("tok123"); + assertThat(result).contains("password=******"); + assertThat(result).contains("my.token=******"); + } + + @Test + void toStringKeepsNonSensitiveOptions() { + Map options = new HashMap<>(); + options.put("connector", "kafka"); + options.put("topic", "my-topic"); + + CatalogTable table = + CatalogTable.newBuilder() + .schema(Schema.newBuilder().column("id", DataTypes.INT()).build()) + .options(options) + .build(); + + String result = table.toString(); + + assertThat(result).contains("connector=kafka"); + assertThat(result).contains("topic=my-topic"); + } + + @Test + void toStringWithEmptyOptionsDoesNotFail() { + CatalogTable table = + CatalogTable.newBuilder() + .schema(Schema.newBuilder().column("id", DataTypes.INT()).build()) + .options(Map.of()) + .build(); + + assertThat(table.toString()).contains("options={}"); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java new file mode 100644 index 0000000000000..e27e18813f407 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.catalog; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@code DESCRIBE CATALOG} SQL statement, focusing on sensitive-option redaction. */ +class DescribeCatalogTest { + + private static final String CATALOG_NAME = "test_cat"; + + @Test + void describeCatalogExtendedRedactsSensitiveOptions() throws Exception { + TableEnvironment tEnv = buildEnvWithSensitiveOptions(); + + List rows = + CollectionUtil.iteratorToList( + tEnv.executeSql("DESCRIBE CATALOG EXTENDED " + CATALOG_NAME).collect()); + + assertThat(rows) + .contains( + Row.of("option:password", GlobalConfiguration.HIDDEN_CONTENT), + Row.of("option:my.token", GlobalConfiguration.HIDDEN_CONTENT), + Row.of("option:safe-option", "safe-value")) + .noneMatch(r -> "topsecret".equals(r.getField(1))) + .noneMatch(r -> "tok123".equals(r.getField(1))); + } + + @Test + void describeCatalogNonExtendedDoesNotExposeOptions() throws Exception { + TableEnvironment tEnv = buildEnvWithSensitiveOptions(); + + List rows = + CollectionUtil.iteratorToList( + tEnv.executeSql("DESCRIBE CATALOG " + CATALOG_NAME).collect()); + + assertThat(rows).noneMatch(r -> String.valueOf(r.getField(0)).startsWith("option:")); + } + + private static TableEnvironment buildEnvWithSensitiveOptions() throws Exception { + GenericInMemoryCatalogStore catalogStore = new GenericInMemoryCatalogStore(); + catalogStore.open(); + Configuration config = new Configuration(); + config.setString("type", "generic_in_memory"); + config.setString("safe-option", "safe-value"); + config.setString("password", "topsecret"); + config.setString("my.token", "tok123"); + catalogStore.storeCatalog(CATALOG_NAME, CatalogDescriptor.of(CATALOG_NAME, config)); + return TableEnvironment.create( + EnvironmentSettings.newInstance().withCatalogStore(catalogStore).build()); + } +}