[FLINK-39650][table] REGEXP_REPLACE plan-time validation and hot-path log cleanup#28189
Conversation
AHeise
left a comment
There was a problem hiding this comment.
Lots of seemingly random changes. Could you subdivide the commit into more logical pieces?
| BuiltInFunctionDefinition.newBuilder() | ||
| .name("regexpReplace") | ||
| .sqlName("REGEXP_REPLACE") | ||
| .name("REGEXP_REPLACE") |
There was a problem hiding this comment.
I don't understand these changes. Why are we changing the name here?
There was a problem hiding this comment.
This changes are required to expose function via the modern BridgingSqlFunction path.
Same convention is already used by REGEXP_COUNT, REGEXP_EXTRACT_ALL, REGEXP_INSTR, REGEXP_SUBSTR (all name("REGEXP_..."). Aligns REGEXP_REPLACE with the family.
|
|
||
| /** Shared helpers for input type strategies of the regex function family. */ | ||
| @Internal | ||
| public final class RegexpUtils { |
There was a problem hiding this comment.
Check if there are other validation utils already.
There was a problem hiding this comment.
Checked. Three candidates in flink-table-common:
org.apache.flink.table.types.inference.TypeInferenceUtil— framework-wide validation/inference plumbing (signature generation, error wrapping).org.apache.flink.table.types.inference.strategies.StrategyUtils— package-private, data-type bridging helpers.org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils— changelog-PTF specific.
None hosts regex-pattern validation.
I can move this to StrategyUtils. WDYT?
| TestSpec.forStrategy("Invalid literal regex fails at plan time", REGEXP_EXTRACT) | ||
| .calledWithArgumentTypes(DataTypes.STRING(), DataTypes.STRING()) | ||
| .calledWithLiteralAt(1, "(") | ||
| .expectErrorMessage("Invalid regular expression for REGEXP_EXTRACT:")); |
There was a problem hiding this comment.
Readd it to the error message. Just pass the name to the validation.
There was a problem hiding this comment.
I am using callContext.getName() and in the InputTypeStrategiesTestBase the name of the function is set to f (return value of callContext.getName()) that is why I have removed it here.
We can pass the function name through the caller, but I kinda like the API like this.
However, in the IT cases you can see the full error message with the correct function name.
| BuiltInFunctionDefinitions.REPEAT, FlinkSqlOperatorTable.REPEAT); | ||
| definitionSqlOperatorHashMap.put( | ||
| BuiltInFunctionDefinitions.REGEXP, FlinkSqlOperatorTable.REGEXP); | ||
| definitionSqlOperatorHashMap.put( |
There was a problem hiding this comment.
Why do we need to remove it here? What about the REGEXP funciton above?
There was a problem hiding this comment.
The DirectConvertRule entry maps a BuiltInFunctionDefinition to a legacy FlinkSqlOperatorTable SqlFunction. Keeping it routes the Table API conversion through that legacy SqlFunction, which uses Calcite's OperandTypes for validation — bypassing the new RegexpReplaceInputTypeStrategy. To make plan-time validation fire on both SQL and Table API, REGEXP_REPLACE has to go through BridgingSqlFunction, which means removing both the FlinkSqlOperatorTable entry and this DirectConvertRule mapping.
REGEXP above is intentionally untouched: it's the next sub-task of the FLINK-39648 umbrella (separate JIRA). Same fix shape will pply there; will be done in its own PR so this one stays focused.
| str, regex, replacement), | ||
| e); | ||
| // return null if exception in regex replace | ||
| return REGEXP_PATTERN_CACHE |
There was a problem hiding this comment.
Before String.replaceAll was calling Pattern.compile(regex) internally on every invocation. No cache.
The new code uses the ThreadLocalCache, compiled once per regex value per thread. This is a slight performance boost !
There was a problem hiding this comment.
not sure we need it
instead of this better to make it reuse on code gen level
then no need to even enter/invoke same method second time
There was a problem hiding this comment.
That is a good point. I am planing to change it. But Since I have applied this to REGEX_EXTRACT function too I would keep it in this PR.
There was a problem hiding this comment.
not sure we need it
instead of this better to make it reuse on code gen level then no need to even enter/invoke same method second time
I’d lean towards handling this separately, ideally with a benchmark.
This PR already removes the more expensive per-record Pattern.compile. Skipping REGEXP_PATTERN_CACHE.get(...) for literal regexps seems like a narrower optimization, and may add some complexity to the function/codegen path. If we pursue it, it would be better to apply it consistently across the regexp family in a follow-up PR.
… regexpReplace SqlFunctionUtils.regexpReplace called str.replaceAll which compiles the regex inside the engine on every invocation, and caught any exception with LOG.error producing one stack trace per record processed. Switches to REGEXP_PATTERN_CACHE.get(regex).matcher(str).replaceAll(...), reusing the existing ThreadLocalCache shared with getRegexpMatcher and regExp. The pattern is now compiled at most once per regex value per thread. PatternSyntaxException is caught silently so non-literal invalid regex patterns preserve the prior null-return contract without flooding the log. Adds runtime IT cases for REGEXP_REPLACE in RegexpFunctionsITCase covering literal valid, column-ref invalid, and function-call regex paths.
Migrates REGEXP_REPLACE to the modern BuiltInFunctionDefinition stack while keeping the existing StringCallGen codegen path.
The function definition is renamed to name("REGEXP_REPLACE") (CoreModule looks up by name().toUpperCase(), so the previous "regexpReplace" key did not match SQL) and marked runtimeProvided() so it is exposed to both SQL and Table API conversion.
A new arm in ExprCodeGenerator's BridgingSqlFunction block routes the BridgingSqlFunction to StringCallGen.generateRegexpReplace. The legacy FlinkSqlOperatorTable.REGEXP_REPLACE entry, the DirectConvertRule mapping, and the case REGEXP_REPLACE arm in StringCallGen are removed.
Behavior is preserved: same arg types, same codegen call, same runtime helper. Only the registration path changes.
… time Adds StrategyUtils.validateLiteralPattern as a shared helper for the regex function family. It eagerly compiles literal regex arguments during type inference and surfaces failures as ValidationException via callContext.fail. Non-literal or null-literal arguments are deferred to runtime. Wires REGEXP_REPLACE through the new RegexpReplaceInputTypeStrategy which validates 3 STRING arguments and delegates the literal-pattern check to the shared helper. REGEXP_EXTRACT is refactored to use the same helper. Tests: new RegexpReplaceInputTypeStrategyTest covers the four branches (non-literal defers, valid literal compiles, null literal is deferred, invalid literal fails). RegexpFunctionsITCase gains a plan-time validation TestSetSpec for REGEXP_REPLACE covering both Table API and SQL paths.
| logical(LogicalTypeFamily.CHARACTER_STRING), | ||
| logical(LogicalTypeFamily.CHARACTER_STRING))) | ||
| .inputTypeStrategy(SpecificInputTypeStrategies.REGEXP_REPLACE) | ||
| .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) |
There was a problem hiding this comment.
Sorry for the late comment, but I think the return type should stay nullable, since REGEXP_REPLACE can still return NULL for invalid non-literal regexps.
Could you fix this in the next PR? @raminqaf
What is the purpose of the change
REGEXP_REPLACEdoes not validate literal regex patterns at planning time. When the regex is a string literal that failsPattern.compile, every record produces anERROR-level log line and a wasted compile attempt on the hot path (String.replaceAllrecompiles per call). This PR surfaces the failure as aValidationExceptionduring type inference and removes the per-record log spam.This is the second sub-task of the FLINK-39648 umbrella, following the same pattern established for
REGEXP_EXTRACT(FLINK-39649). The shared literal-pattern validation logic is extracted intoRegexpUtilsso the remaining sub-tasks (REGEXP, etc.) can reuse it.Brief change log
RegexpUtils.validateLiteralPattern(callContext, regexArgPos, throwOnFailure)as a shared helper.RegexpExtractInputTypeStrategyto delegate toRegexpUtils.RegexpReplaceInputTypeStrategythat validates 3 STRING args and the literal regex viaRegexpUtils.BuiltInFunctionDefinitions.REGEXP_REPLACEtoname("REGEXP_REPLACE")+runtimeProvided(). DropsqlName. Use the new strategy.FlinkSqlOperatorTable.REGEXP_REPLACEentry, theDirectConvertRulemapping, and thecase REGEXP_REPLACEarm inStringCallGen.case REGEXP_REPLACE => StringCallGen.generateRegexpReplace(...)inExprCodeGenerator'sBridgingSqlFunctionblock.SqlFunctionUtils.regexpReplaceto useREGEXP_PATTERN_CACHE.get(regex).matcher(str).replaceAll(...). Silentnullreturn onPatternSyntaxException. NoLOG.erroron the hot path.Verifying this change
This change added tests and can be verified as follows:
RegexpReplaceInputTypeStrategyTestcovers four branches: non-literal regex defers, valid literal compiles, null literal is deferred, invalid literal fails at plan time.regexpReplaceTestCases()inRegexpFunctionsITCasecovers runtime behavior end-to-end: literal valid replace, column-ref invalid regex returns null, function-call regex (CONCAT) valid and invalid, ValidationException for invalid literal via Table API and SQL.RegexpFunctionsITCaseandScalarFunctionsTestregress all other regex functions.Does this pull request potentially affect one of the following parts:
@Public(Evolving): noPattern.compileandLOG.errorinregexpReplace; uses cached pattern instead)Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Opus 4.7