Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<td><p>Enum</p></td>
<td>Specify the startup mode for log consumer.<br /><br />Possible values:<ul><li>"full": Perform a snapshot on the table upon first startup, and continue to read the latest changes.</li><li>"latest": Start from the latest.</li><li>"from-timestamp": Start from user-supplied timestamp.</li></ul></td>
</tr>
<tr>
<td><h5>log.scan.remove-normalize</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.</td>
</tr>
<tr>
<td><h5>log.scan.timestamp-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;

/**
* Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
Expand Down Expand Up @@ -120,6 +121,10 @@ public ChangelogMode getChangelogMode() {
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: in the long run, I think we can simplify the configuration to some extent, e.g., depend on ChangelogProducer

Configuration options = Configuration.fromMap(table.schema().options());

if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
return ChangelogMode.all();
}

if (logStoreTableFactory == null
&& options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
return ChangelogMode.all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ public class CoreOptions implements Serializable {
.defaultValue(LogChangelogMode.AUTO)
.withDescription("Specify the log changelog mode for table.");

public static final ConfigOption<Boolean> LOG_SCAN_REMOVE_NORMALIZE =
ConfigOptions.key("log.scan.remove-normalize")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to force the removal of the normalize node when streaming read."
+ " Note: This is dangerous and is likely to cause data errors if downstream"
+ " is used to calculate aggregation and the input is not complete changelog.");

public static final ConfigOption<String> LOG_KEY_FORMAT =
ConfigOptions.key("log.key.format")
.stringType()
Expand Down