Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31603][table-planner] Line break should be removed in create t… #22276

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -56,11 +56,11 @@ public SqlNode getValue() {
}

public String getKeyString() {
return ((NlsString) SqlLiteral.value(key)).getValue();
return ((NlsString) SqlLiteral.value(key)).getValue().replaceAll("\n", "");
}

public String getValueString() {
return ((NlsString) SqlLiteral.value(value)).getValue();
return ((NlsString) SqlLiteral.value(value)).getValue().replaceAll("\n", "");
}

@Override
Expand Down
Expand Up @@ -64,11 +64,21 @@ public abstract class FlinkHints {
* null.
*/
public static Map<String, String> getHintedOptions(List<RelHint> tableHints) {
return tableHints.stream()
.filter(hint -> hint.hintName.equalsIgnoreCase(HINT_NAME_OPTIONS))
.findFirst()
.map(hint -> hint.kvOptions)
.orElse(Collections.emptyMap());
Map<String, String> hintedOptions =
tableHints.stream()
.filter(hint -> hint.hintName.equalsIgnoreCase(HINT_NAME_OPTIONS))
.findFirst()
.map(hint -> hint.kvOptions)
.orElse(Collections.emptyMap());

if (!hintedOptions.isEmpty()) {
return hintedOptions.entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey().replaceAll("\n", ""),
entry -> entry.getValue().replaceAll("\n", "")));
}
return hintedOptions;
}

/**
Expand Down
Expand Up @@ -131,6 +131,23 @@ public void testCreateDatabase() {
}
}

@Test
public void testCreateDatabaseWithNewLineInTableOptions() {
String sql =
"create database cat1.db1 comment 'db1_comment' with ('k\n1' = 'v1', 'K2' = 'V\n2')";

Operation operation = parse(sql);

Map<String, String> properties = new HashMap<>();
properties.put("k1", "v1");
properties.put("K2", "V2");

assertThat(operation).isInstanceOf(CreateDatabaseOperation.class);
final CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;
assertThat(createDatabaseOperation.getCatalogDatabase().getProperties())
.isEqualTo(new HashMap<>(properties));
}

@Test
public void testDropDatabase() {
final String[] dropDatabaseSqls =
Expand Down
Expand Up @@ -119,6 +119,20 @@ public void testDynamicTableWithInvalidOptions() {
"Hint [OPTIONS] only support " + "non empty key value options");
}

@Test
public void testDynamicTableWithNewLineInOptions() {
final String sql =
"insert into t1 /*+ OPTIONS('k\n1'='v1', 'k2'='v\n2') */\n"
+ "select a, b, c, d from t2";
FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
Operation operation = parse(sql, planner, parser);
assertThat(operation).isInstanceOf(SinkModifyOperation.class);
SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
Map<String, String> dynamicOptions = sinkModifyOperation.getDynamicOptions();
assertThat(dynamicOptions.toString()).isEqualTo("{k1=v1, k2=v2}");
}

@Test
public void testBeginStatementSet() {
final String sql = "BEGIN STATEMENT SET";
Expand Down
Expand Up @@ -103,6 +103,20 @@ public void testLoadModule() {
assertThat(loadModuleOperation.getOptions()).isEqualTo(expectedOptions);
}

@Test
public void testLoadModuleWithNewLineInTableOptions() {
final String sql = "LOAD MODULE dummy WITH ('k\n1' = 'v1', 'k2' = 'v\n2')";
final Map<String, String> expectedOptions = new HashMap<>();
expectedOptions.put("k1", "v1");
expectedOptions.put("k2", "v2");

Operation operation = parse(sql);
assertThat(operation).isInstanceOf(LoadModuleOperation.class);
final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation;

assertThat(loadModuleOperation.getOptions()).isEqualTo(expectedOptions);
}

@Test
public void testUnloadModule() {
final String sql = "UNLOAD MODULE dummy";
Expand Down