Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-27376][sql] Support current_database function #19218

Merged
merged 3 commits into from
Jul 14, 2022

Conversation

luoyuxia
Copy link
Contributor

@luoyuxia luoyuxia commented Mar 24, 2022

What is the purpose of the change

To support current_database() function in Flink

Brief change log

  • Add CURRENT_DATABASE definition in BuiltInFunctionDefinitions.java and FlinkSqlOperatorTable.java
  • Add method currentDatabase to call CURRENT_DATABASE in Expressions.java to make current_database can be used in TableAPI.
  • Before translation, set a configuration with value current database in PlannerBase. And read the configuration value put to get the current in codegen phase for current_database() function.
  • Disable current_database in HiveMoudle, so that it'll always use Flink's built-in current_database function.

Verifying this change

Added test in testCurrentDatabase, testCurrentDatabase, testCurrentDatabase, testCurrentDatabase for batch/stream, sql/table.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? Yes
  • If yes, how is the feature documented? Yes

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 24, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@luoyuxia luoyuxia changed the title hive dialect supports select current database [FLINK-27376][hive] Hive dialect supports select current database Apr 25, 2022
@luoyuxia luoyuxia changed the title [FLINK-27376][hive] Hive dialect supports select current database [FLINK-27376][hive] Hive dialect supports select current_database() Apr 25, 2022
@luoyuxia luoyuxia force-pushed the support-current-database branch 2 times, most recently from 9fdaa74 to 2ddd919 Compare June 22, 2022 12:05
Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution. I am not sure whether user with hive module in default dialect still get the same problem.

} catch (SemanticException e) {
throw new FlinkHiveException(e);
}
return convertToLiteral(hiveShim.toHiveTimestamp(currentTS));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we can use

if (convertedOp instanceof SqlCastFunction) {
} else if (convertedOp instanceof FlinkSqlTimestampFunction) {

} else if (convertedOp.getName().equals("current_database")) {
} else {
            return builder.makeCall(convertedOp, visitList(operands, update));

}

throw new FlinkHiveException(e);
}
return convertToLiteral(hiveShim.toHiveTimestamp(currentTS));
} else if (convertedOp.getName().equals("current_database")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's very hacked to fix this problem in this way. Currently, we determine which function is used by comparing the method name, parameters, and return type. But here we only consider the function name.

I think we should introduce a class similar to SqlFunctionCast here and use instanceOf to determine.

tableEnv.executeSql("select current_database()").collect());
assertThat(result.toString()).isEqualTo("[+I[db1]]");
// switch to default database for following test use default database
tableEnv.executeSql("use default");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better if we can use

@After
public void cleanup() {
            tableEnv.executeSql("use default");
}

Copy link
Contributor Author

@luoyuxia luoyuxia Jul 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to add a cleanup method for currently only one test needs it. It would be better to limit use default in this single test.

@luoyuxia luoyuxia changed the title [FLINK-27376][hive] Hive dialect supports select current_database() [FLINK-27376][sql] Support current_database() function Jul 5, 2022
@luoyuxia
Copy link
Contributor Author

luoyuxia commented Jul 5, 2022

@fsk119 Thanks for review. Now, I add function current_database for flink itselft. So that both Flink sql and Hive dialect can use it.

@luoyuxia luoyuxia changed the title [FLINK-27376][sql] Support current_database() function [FLINK-27376][sql] Support current_database as built-in function Jul 5, 2022
@luoyuxia luoyuxia changed the title [FLINK-27376][sql] Support current_database as built-in function [FLINK-27376][sql] Support current_database function Jul 5, 2022
@luoyuxia luoyuxia force-pushed the support-current-database branch 2 times, most recently from 22d297c to 163bc2d Compare July 5, 2022 04:19
Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for update. It's better we can keep the origin tests in the hive.

// --------------------------------------------------------------------------------------------
// Catalog functions
// --------------------------------------------------------------------------------------------
public static final BuiltInFunctionDefinition CURRENT_DATABASE =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave a blank line before the CURRENT_DATABASE. We should align with others.

@@ -233,6 +233,10 @@ object StringCallGen {
isCharacterString(operands(2).resultType) =>
methodGen(BuiltInMethods.CONVERT_TZ)

case CURRENT_DATABASE =>
val timestamp = ctx.addReusableQueryLevelCurrentDatabase()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timestamp -> database

@@ -52,6 +52,13 @@ public final class InternalConfigOptions {
+ " some temporal functions like LOCAL_TIMESTAMP in batch job to make sure these"
+ " temporal functions has query-start semantics.");

public static final ConfigOption<String> TABLE_QUERY_CURRENT_DATABASE =
key("__table.query-start.current_database__")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other options use the "-" rather than "_". How about renaming to __table.query-start.current-database__?

.stringType()
.noDefaultValue()
.withDescription(
"The config used to save the current database at query start.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, document who will use the value?

@luoyuxia
Copy link
Contributor Author

luoyuxia commented Jul 6, 2022

@fsk119 Thanks for reviewing. I have updated it.

Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

But the CI test fails. Could you take a look?

@luoyuxia luoyuxia force-pushed the support-current-database branch 4 times, most recently from 520d407 to e6f8f44 Compare July 8, 2022 01:51
@luoyuxia
Copy link
Contributor Author

@flinkbot run azure

@luoyuxia
Copy link
Contributor Author

@fsk119 The ci is passed now. Could you please help merge when you're free?

@fsk119 fsk119 merged commit 641fb89 into apache:master Jul 14, 2022
liujiawinds pushed a commit to liujiawinds/flink that referenced this pull request Jul 22, 2022
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants