Conversation

@whitecloud6688

What is the purpose of the change

Please support syntax like the following:
select if(1>2,1,NULL) as col1;
select NULL as col1;

Brief change log

We need to assign NULL to a column and then write it into a table, but a syntax error is reported. Version: Flink 1.13.6.
Flink SQL> select if(1>2,1,NULL) as col1;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'

Flink SQL> select NULL as col1;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'

The following syntax does execute:
Flink SQL> select NULL is NULL as col1;
+----+--------+
| op |   col1 |
+----+--------+
| +I |   true |
+----+--------+
Received a total of 1 row
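
For reference, until such support lands, a common workaround is to give the NULL literal an explicit type with CAST. A minimal sketch via the Table API (versions that reject the untyped literal accept the cast form):

  // Minimal sketch, assuming Flink's Table API is on the classpath.
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  public class NullLiteralWorkaround {
      public static void main(String[] args) {
          TableEnvironment tEnv = TableEnvironment.create(
                  EnvironmentSettings.newInstance().inStreamingMode().build());

          // CAST gives the NULL literal a concrete type, so the planner can
          // derive the type of col1.
          tEnv.executeSql("SELECT CAST(NULL AS INT) AS col1").print();
          tEnv.executeSql("SELECT IF(1 > 2, 1, CAST(NULL AS INT)) AS col1").print();
      }
  }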

Verifying this change

select if(1>2,1,NULL) as col1;
select NULL as col1;

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

XComp and others added 30 commits March 15, 2022 08:07
…lelism

The deadline of 10s wasn't sufficient for AzureCI runs.
Remove the planner and executor string identifiers from `EnvironmentSettings`
and use the default strings, which are the only ones used anyway.
Those two methods also accept a TableConfig, which is not necessary,
since with the new approach extra configuration will be passed through
`EnvironmentSettings` instead. Keeping those methods around will
create confusion.
Use `EnvironmentSettings` with the new `withConfiguration` method in its
`Builder` to specify configuration options on top of the configuration
inherited from the environment (flink-conf.yaml, CLI params).

The `TableConfig` contains all of the config options specified by the
environment (flink-conf.yaml, CLI params) plus all the extra configuration
defined by the user application through `EnvironmentSettings`.
Since each test uses a separate table, we no longer need to re-create the keyspace for each run, reducing the load on the cluster.
Some InternalPriorityQueue implementations need a correct key/group
set before performing poll() or remove().

In particular, ChangelogKeyGroupedPriorityQueue logs the key group so that
state changes can be re-distributed or shuffled.

This change re-orders queue.poll and keyContext.setCurrentKey.
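
A minimal sketch of the reordering (the helper is hypothetical; the interfaces are Flink's):

  import org.apache.flink.runtime.state.InternalPriorityQueue;
  import org.apache.flink.streaming.api.operators.KeyContext;

  final class QueuePolling {
      // Set the current key/group first, so that implementations such as
      // ChangelogKeyGroupedPriorityQueue log state changes under the right
      // key group, and only then poll.
      static <T> T pollForKey(InternalPriorityQueue<T> queue,
                              KeyContext keyContext, Object key) {
          keyContext.setCurrentKey(key); // previously this ran after poll()
          return queue.poll();
      }
  }
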
Implements the SEARCH operator in the codegen and removes
the scalar implementation of IN and NOT_IN. Now every scalar
IN/NOT_IN using a constant set is implemented through SEARCH
(following Calcite's development on this topic in CALCITE-4173),
and plans will only contain SEARCH.

This closes #19001.
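
For illustration, the kind of predicate affected (table and column names hypothetical, reusing the tEnv from the first sketch above):

  // Both predicates are now planned as a single SEARCH over a constant set
  // rather than a scalar IN/NOT_IN implementation.
  tEnv.executeSql("SELECT * FROM orders WHERE status IN ('NEW', 'PAID')");
  tEnv.executeSql("SELECT * FROM orders WHERE status NOT IN ('CANCELLED')");
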
Previously, the tmpWorkingDirectory was created in the current working
directory, and as a result there were directories created in the root
directories of the modules, e.g. `flink-table/flink-table-planner`, which
were not cleaned up by `mvn clean`.
The user explicitly configured the cleanup retry logic to
terminate after a certain number of attempts. This should be
considered desired behavior and shouldn't make the cluster
fail fatally.
If nothing is specified, we want to retry indefinitely.
…ined sink operators

Since the topology has changed between Flink 1.14 and 1.15, it might
happen that stateful upgrades are not possible if no prior operator uids
were set. With this commit, users can set operator uid hashes for the
respective operators.
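
For illustration, the general DataStream mechanism for pinning an operator's identity to a savepoint (the hash value is hypothetical; the commit wires this up for the sub-operators of composed sinks):

  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

  public class UidHashExample {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
                  StreamExecutionEnvironment.getExecutionEnvironment();

          // Match this operator to existing state via the uid hash recorded
          // in the savepoint, even though no uid was set originally.
          env.fromElements(1, 2, 3)
                  .addSink(new DiscardingSink<>())
                  .setUidHash("2e588ce1aefdab13a34c29d713b39a1c");

          env.execute("uid-hash-example");
      }
  }
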
…attern

Since there is no dedicated committer operator in Flink 1.14, it is safe
to use the uid pattern of 1.13 to ease upgrades from Flink 1.13 to 1.15.
…nly the path of Path instance

The issue before the fix was that using getPath would strip
off the scheme information, which causes problems in situations
where the FileSystem is not the default FileSystem.
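
A minimal illustration of the pitfall using java.net.URI, which has the same getPath semantics:

  import java.net.URI;

  public class SchemeStripping {
      public static void main(String[] args) {
          URI uri = URI.create("s3://bucket/dir/file");
          System.out.println(uri.getPath()); // "/dir/file" -- scheme is lost
          System.out.println(uri);           // "s3://bucket/dir/file" -- scheme kept
      }
  }
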
XComp and others added 28 commits April 4, 2022 14:32
writeValue internally calls close by default. Calling flush afterwards
could cause errors, and it's also not really necessary: according to its
JavaDoc, OutputStream.flush does not guarantee persistence, whereas
calling close does.
…ify that no operation is allowed on a closed stream
…AMP_WITH_LOCAL_TIMEZONE fields in dynamic index
…zer for deserializing Kafka message value as string
When in batch mode, the `getMonotonicity()` method of a function is
called to determine possible optimisations. For `TRY_CAST` the
implementation needs to call `getOperandType(1)` to get the target (also
the function's return) type of the cast. This fails, as for `CAST` and
`TRY_CAST` we have only one operand at this point.

`CAST` solves this in Calcite code, more specifically in
`RexCallBinding#create()` where in case the `kind` of the function is
`CAST`, a special `RexCastCallBinding` instance is created which stores
the return (target) type and returns it when `getOperandType(1)` is
called.

For `TRY_CAST` we don't have access to the stack to do something
similar, and we cannot set the kind of `TRY_CAST` to `CAST` (currently
it's `OTHER_FUNCTION`), as this would allow the Calcite stack to apply
rules and optimisations to the `TRY_CAST` call and at some point
convert it to a regular `CAST` call, thus breaking the functionality
of `TRY_CAST` (returning null instead of failing).

As a solution to the problem, we simply don't implement the
`getMonotonicity()` method for `TRY_CAST`, losing possible
optimisations.

This closes #19379.
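
For illustration, the behavior that must be preserved (reusing the tEnv from the first sketch above):

  // Unlike CAST, TRY_CAST returns NULL on a failed conversion instead of
  // throwing, which is exactly what changing its kind to CAST would break.
  tEnv.executeSql("SELECT TRY_CAST('not-a-number' AS INT) AS n").print(); // n = NULL
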
…figuration

Following the work on [FLINK-16835], update the docs accordingly to
mention the use of `EnvironmentSettings` and that `TableEnvironment`-related
configuration can also be set in `flink-conf.yaml`.

This closes #19401

(cherry picked from commit 7b7d96b)
…ints

Currently, in LEGACY restore mode, shared state of
incremental checkpoints can be discarded regardless
of whether they were created by this job or not.
This invalidates the checkpoint from which the job
was restored.

The bug was introduced in FLINK-24611. Before that,
a reference count was maintained for each shared state
entry; "initial" checkpoints did not decrement this
count, preventing their shared state from being discarded.

This change makes SharedStateRegistry:
1. Remember the max checkpoint ID encountered during recovery
2. Associate each shared state entry with the checkpoint ID that created it
3. Only discard an entry if its createdByCheckpointID > highestRetainCheckpointID

(1) is called from:
- CheckpointCoordinator.restoreSavepoint - to cover initial restore from a checkpoint
- SharedStateFactory, when building checkpoint store - to cover the failover case
(see DefaultExecutionGraphFactory.createAndRestoreExecutionGraph)

Adjusting only the CheckpointCoordinator path isn't sufficient:
- the job recovers from an existing checkpoint and adds it to the store
- a new checkpoint is created with the default restore settings
- a failure happens, and the job recovers from a newer checkpoint
- when the newer checkpoint is subsumed, its (inherited) shared state
might be deleted
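
A minimal sketch of the resulting discard rule, using the names from the description above:

  final class SharedStateDiscardRule {
      // An entry may only be discarded if it was created by a checkpoint
      // newer than the highest checkpoint ID seen during recovery; shared
      // state inherited from the restored checkpoint is kept.
      static boolean canDiscard(long createdByCheckpointID,
                                long highestRetainCheckpointID) {
          return createdByCheckpointID > highestRetainCheckpointID;
      }
  }
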
This reverts commit 241c097

Signed-off-by: Jing <beyond1920@126.com>
@flinkbot

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@zentol zentol closed this Apr 12, 2022