
Flink: Maintenance - MonitorSource #10308

Open · wants to merge 2 commits into main
Conversation

pvary (Contributor) commented May 10, 2024:

Implements a monitor source which emits TableChange events based on new commits to the Iceberg table.
This will be used for Maintenance Task scheduling as described in the design doc. The Scheduling paragraph describes the required parameters of the TableChange object, and the Monitor paragraph adds some more details.

Implements #10300

*
* @param <T> The return type of the source
*/
public abstract class SingleThreadedIteratorSource<T>
Contributor:

Curious why we have a base class. Do we envision another implementation besides MonitorSource?

Contributor (author):

I was thinking about adding this to the Flink source code in the future if this pans out for us.
I really hate the current Flink Source API, which is very hard to read/understand for a simple source. I think we need an easier way to start creating sources.

Contributor:

I am also wondering if SingleThread is needed. If users don't construct the source operator manually, the stitch code can set the operator parallelism to 1, like the IcebergFilesCommitter.

Contributor (author):

When using the new Source API, we need to have a single split to achieve parallelism 1.

I don't expect that we need higher parallelism to achieve our goals, and relying on it results in simpler code, triggering Maintenance Tasks only once. Otherwise we need to find a way to drop TableChange events for snapshots committed before the snapshot used by an already triggered Maintenance Task.

Contributor:

I am not saying we need multiple threads here. If the parallelism control is not exposed to users, the builder/glue code can always set the operator parallelism to 1, similar to how FlinkSink sets it for IcebergFilesCommitter.
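For illustration, a minimal sketch of what that glue code could look like (hedged: monitorSource stands for an instance of the source added in this PR; the surrounding job setup is assumed):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The builder/glue code pins the monitor to parallelism 1, mirroring how
    // FlinkSink forces parallelism 1 for IcebergFilesCommitter.
    DataStream<TableChange> changes =
        env.fromSource(monitorSource, WatermarkStrategy.noWatermarks(), "Monitor")
            .setParallelism(1);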

Contributor (author):

I think it is best if we pre-aggregate as many of the TableChanges as possible. Otherwise we might end up triggering unnecessary rewrites.
Let's say we have S1 (2 new files), S2 (3 new files), S3 (2 new files), S4 (3 new files), and we have configured manifest file compaction after every 2 commits.
If we end up reading the snapshots in parallel, we might run the manifest rewrite task 4 times. The good solution would be to run it a single time.

I do not expect that the MonitorSource will need parallelization anyway. It would add complexity and extra resource usage with no benefits.
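A worked sketch of the pre-aggregation argument (the TableChange constructor argument order and the commitNum() accessor follow the PR's shape, but the trigger hook is hypothetical):

    // Merge the per-snapshot changes into one event before evaluating the trigger.
    TableChange aggregated = new TableChange(0, 0, 0L, 0L, 0);
    aggregated.merge(new TableChange(2, 0, 0L, 0L, 1)); // S1: 2 new files
    aggregated.merge(new TableChange(3, 0, 0L, 0L, 1)); // S2: 3 new files
    aggregated.merge(new TableChange(2, 0, 0L, 0L, 1)); // S3: 2 new files
    aggregated.merge(new TableChange(3, 0, 0L, 0L, 1)); // S4: 3 new files

    // One evaluation over the aggregate fires the manifest rewrite once,
    // instead of up to 4 times when the snapshots are processed in parallel.
    if (aggregated.commitNum() >= 2) {
      triggerManifestRewrite(); // hypothetical trigger hook
    }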

Comment:

@stevenzwu I agree with @pvary that this is a valuable addition, and I have worked closely with him on these parts.

I think the current implementation works very well for this use case and has been tested extensively; complicating it with potential parallelism settings will only make it more error-prone. This can be changed in the future if we feel a strong need, without any backward compatibility implications.

}

/** The single split of the {@link SingleThreadedIteratorSource}. */
public static class GlobalSplit<T> implements IteratorSourceSplit<T, Iterator<T>>, Serializable {
Contributor:

What does global mean here?

Contributor (author):

There is only a single split - that is why I named it Global. Open to better names, if you think you have one.

import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/** Event describing changes in an Iceberg table */
class TableChange {
Contributor:

How will this be used by downstream operators for planning purposes?

Also, how does this work with maintenance tasks like snapshot expiration?

Contributor (author):

This will be used for Maintenance Task scheduling as described in the design doc. The Scheduling paragraph describes the required parameters of the TableChange object, and the Monitor paragraph adds some more details.

Contributor:

You said the purpose of MonitorSource and TableChange is to provide input to the Trigger operator.

Here are the trigger modes discussed before:

  1. standalone streaming/long-running compaction job. This will use the MonitorSource to emit TableChange to the Trigger operator, right?
  2. embedded trigger mode in post commit stage of a write job. I assume post commit can emit the TableChange event.
  3. external trigger mode (like Airflow). How is the Trigger input calculated in this case?

Contributor (author):

  1. standalone streaming/long-running compaction job. This will use the MonitorSource to emit TableChange to the Trigger operator, right?

Yes, you're right.

  2. embedded trigger mode in post commit stage of a write job. I assume post commit can emit the TableChange event.

We need a simple mapper; I have called it Commit Converter in the doc.

  3. external trigger mode (like Airflow). How is the Trigger input calculated in this case?

In this case the full scheduling is on the external trigger.
I expect that this will be based mostly on timers, but using the Java or Python API is a possibility too.

rodmeneses (Contributor):

Please link the design doc in the PR description

}

@Override
public SourceReader<TableChange, GlobalSplit<TableChange>> createReader(
Contributor:

createReader is public, yet TableChange is package-private.

pvary (author), May 15, 2024:

This one needs to be public, as it is inherited from the Source interface.
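For reference, the method is declared on Flink's Source interface (via SourceReaderFactory in recent Flink versions), and an override cannot reduce visibility:

    // Declared in org.apache.flink.api.connector.source; overrides must stay public.
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception;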

return event;
} catch (Exception e) {
LOG.warn("Failed to fetch table changes for {}", table, e);
return new TableChange(0, 0, 0L, 0L, 0);
Contributor:

I've seen some places where we use TableChange(0, 0, 0L, 0L, 0). It would be nice if we refactored this into a constant and referenced it elsewhere.

Contributor (author):

We update the field values with the merge later, so a shared constant is not an option here.
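A sketch of the aliasing hazard being pointed out (the EMPTY constant name is hypothetical):

    static final TableChange EMPTY = new TableChange(0, 0, 0L, 0L, 0);

    TableChange event = EMPTY;                          // aliases the shared instance
    event.merge(new TableChange(snapshot, table.io())); // mutates EMPTY for everyone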

this.serializer = serializer;
}

private static final int CURRENT_VERSION = 1;
Contributor:

Why do we start with CURRENT_VERSION=1 instead of, say, CURRENT_VERSION=0?

Contributor (author):

Not sure, TBH. This is what I have seen in the other serializers 😄

Preconditions.checkArgument(checkpoint.size() < 2, PARALLELISM_ERROR);
if (checkpoint.isEmpty()) {
return new byte[] {0};
} else {
Contributor:

No need for the else {

Contributor (author):

If the following part is not too long and doesn't add too much depth, then I tend to keep them for easier understanding. One less thing to think about when reading the code.

this.commitNum = 1;
}

int dataFileNum() {
Contributor:

Is this used?

Contributor (author):

Will be used later 😄
Since TableChange is a POJO-like object, I have defined all the accessor methods.


if (element == null) {
throw new TimeoutException();
} else {
Contributor:

No need for the else {

Contributor (author):

Again, readability.

}

@Override
public SinkWriter<T> createWriter(InitContext context) {
Contributor:

InitContext is deprecated

Contributor (author):

Yeah, but it still needs to be defined (for the correct deprecation process), so it is fine to use it for now.

public void afterEach(ExtensionContext context) throws IOException {
List<Row> tables = exec("SHOW TABLES");
tables.forEach(t -> exec("DROP TABLE IF EXISTS %s", t.getField(0)));
exec("USE CATALOG default_catalog");
Contributor:

dropCatalog in TestBase already has this implemented

Contributor (author):

I think the FlinkSqlExtension is a better tool, and there is currently no way to reuse it. Also, we discussed that it needs to be rewritten, so I did not want to depend on it.

if (propCount > 0) {
builder.append(",");
}
builder
Contributor:

Can we do this with String.format instead?

Contributor (author):

Replaced with:

    return String.format(
        "(%s)",
        props.entrySet().stream()
            .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
            .collect(Collectors.joining(",")));
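For example, assuming props is a Map<String, String> of {a=1, b=2}, this renders ('a'='1','b'='2').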

// Stop with savepoint
jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL);
// Wait until the savepoint is created and the job has been stopped
Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1);
Contributor:

Where do we define the timeout for this call?

Contributor (author):

The default values are set by Awaitility; the timeout is 10s.
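If the savepoint can take longer (for example on a loaded CI machine), the timeout can be made explicit; a sketch (the 30s value is an arbitrary example):

    import java.time.Duration;

    Awaitility.await()
        .atMost(Duration.ofSeconds(30))
        .until(() -> savepointDir.listFiles(File::isDirectory).length == 1);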

import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/** Testing source implementation for Flink sources which can be triggered manually. */
public class ManualSource<T> implements SourceFunction<T>, ResultTypeQueryable<T> {
Contributor:

SourceFunction is deprecated

Contributor:

ManualSource can be made package-private.

Contributor (author):

Moving to the new Source was a pain, but I have done it 🫡

*
* @param event to emit
*/
public void sendRecord(T event) {
Contributor:

Is this used?

Contributor (author):

This will be used - for completeness, I wouldn't remove it.

import org.junit.jupiter.params.provider.ValueSource;

class TestMonitorSource extends OperatorTestBase {
private static final TableChange EMPTY_EVENT = new TableChange(0, 0, 0L, 0L, 0);
Contributor (author):

This is in the tests, and we do not modify this object, so it is fine here.

rodmeneses (Contributor) left a comment:

Some minor comments, otherwise LGTM.

/** The Iterator which returns the latest changes on an Iceberg table. */
@VisibleForTesting
static class SchedulerEventIterator implements Iterator<TableChange> {
private Long lastSnapshotId;
Contributor:

How is this checkpointed?

Contributor (author):

You can see it in SchedulerEventIteratorSerializer.serialize.
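A hedged sketch of what that serialization boils down to: the only iterator state worth checkpointing is the last processed snapshot id (the exact wire layout in the PR may differ).

    import java.io.IOException;
    import org.apache.flink.core.memory.DataOutputSerializer;

    byte[] serializeIteratorState(Long lastSnapshotId) throws IOException {
      DataOutputSerializer out = new DataOutputSerializer(16);
      out.writeBoolean(lastSnapshotId != null); // the table may not have snapshots yet
      if (lastSnapshotId != null) {
        out.writeLong(lastSnapshotId);
      }
      return out.getCopyOfBuffer();
    }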

Long checking = current;
TableChange event = new TableChange(0, 0, 0L, 0L, 0);
long readBack = 0;
// Walk the snapshot lineage backwards from the current snapshot until the last
// processed snapshot is found, giving up after maxReadBack steps
while (checking != null && !checking.equals(lastSnapshotId) && ++readBack <= maxReadBack) {
Contributor:

maxReadBack seems problematic. Let's say the snapshot history is (backwards, left to right: from the most recent snapshot to the oldest):
S7, S6, S5, S4, S3, S2, S1

lastSnapshotId=S2 before the while loop.
maxReadBack=2

After the while loop:
lastSnapshotId=S5

S3 and S4 will be skipped in the next loop.

You would want to scan forward from the last position. You can see how the regular source throttles the enumeration: https://github.com/apache/iceberg/blob/main/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java#L89

Contributor (author):

The reason behind maxReadBack is to avoid reading thousands of snapshots, which could be especially costly when running the Maintenance on tables which have never been compacted before.

In every other case the expectation is that the monitor only reads a few snapshots at a time, since it is continuously running.

If the monitor is down for so long that we reach the threshold, that usually means that all of the Maintenance Tasks will be triggered anyway, so we don't need to calculate the exact values.

Also, if we don't add several snapshots to the resulting TableChange, that is perfectly fine. While the Maintenance Tasks are triggered a bit later, this doesn't affect correctness, as the Tasks themselves read the current state of the table, and the collected TableChange is only used as a trigger.

stevenzwu (Contributor), May 21, 2024:

While the Maintenance Tasks are triggered a bit later, this doesn't affect correctness, as the Tasks themselves read the current state of the table, and the collected TableChange is only used as a trigger.

This is true if we assume the scheduler would read the current state of the table for scheduling decisions. Let's use small-file compaction with a table partitioned by ingestion time as an example. New commits typically touch 1 or 2 partitions; there is no need to scan the whole table metadata (which could be large) to determine the small files. The compaction task can just check the new files for compaction candidates and possibly check all the data files from the partitions that the delta change/commit touched. That could reduce the scope of checking from the whole table to a small number of partitions. Often, it is wasteful to check the old partitions over and over again for small-file compaction candidates.

What I am trying to say is that it can be beneficial to have incremental check and compaction (from the last compacted snapshot to a new snapshot).

There are certainly some tasks requiring a whole-table check (like orphan file cleanup).

Contributor:

maxReadBack is another config that users need to know how to configure. It may not make much difference to check from the last snapshot to the current snapshot, since the snapshots are loaded anyway as a list in TableMetadata.

  private synchronized void ensureSnapshotsLoaded() {
    if (!snapshotsLoaded) {
      List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
      loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

      this.snapshots = ImmutableList.copyOf(loadedSnapshots);
      this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);

Contributor (author):

The Monitor/TriggerManager is the same for all of the Maintenance Tasks; they should be kept generic. The compaction job itself could store the state of the previous runs if we decide it is worth the complexity (list of candidate but not yet compacted files, snapshots which have been scanned / snapshots which have not yet been scanned).

The savings could be significant if this is:

  • An append-only table
  • A table where only a few partitions are written

It is not an option if any of the following is true:

  • The goal is delete file removal - any V2 table
  • The writes are distributed evenly across the partitions

Do you think that we can continue this discussion independently of the MonitorSource?

pvary (author), May 21, 2024:

maxReadBack is another config that users need to know how to configure. It may not make much difference to check from the last snapshot to the current snapshot, since the snapshots are loaded anyway as a list in TableMetadata.

We need the added data files / added data size etc. for the snapshots - this means we need to read the manifest files. This could become costly beyond a certain size.

I have faced a situation with a table which was never compacted: it had ~2000 snapshots (my test table left running over the weekend with a 2 min checkpointing period - 60*24*3/2), and it took serious time to read all of the snapshots (more than 20 min). In this case a parallel read might help if we want "correct" results, but as I have mentioned before, we don't really need exact data in this case.

So having 100 as a default and allowing an override if necessary would be better. Normal users don't have to bother with it; advanced users can play around with it if they want. I would also be fine with not allowing it to be changed from 100, if that is the community decision.

Making it possible to configure it on the operator doesn't mean that we need to expose it to the user.
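A sketch of that idea with illustrative names: a package-private setter keeps the knob reachable from glue code and tests without becoming public API.

    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

    class MonitorSourceBuilder {
      private static final int MAX_READ_BACK_DEFAULT = 100;
      private int maxReadBack = MAX_READ_BACK_DEFAULT;

      // Package-private on purpose: not exposed to end users.
      MonitorSourceBuilder maxReadBack(int newMaxReadBack) {
        Preconditions.checkArgument(newMaxReadBack > 0, "maxReadBack must be positive");
        this.maxReadBack = newMaxReadBack;
        return this;
      }
    }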

Snapshot snapshot = table.snapshot(checking);
if (snapshot != null) {
LOG.debug("Reading snapshot {}", snapshot.snapshotId());
event.merge(new TableChange(snapshot, table.io()));
Contributor:

We should probably skip DataOperations.REPLACE.

Contributor (author):

Good point!
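A sketch of the suggested fix against the snippet above (DataOperations.REPLACE marks snapshots produced by rewrites, so skipping them prevents maintenance from re-triggering on its own output):

    import org.apache.iceberg.DataOperations;
    import org.apache.iceberg.Snapshot;

    Snapshot snapshot = table.snapshot(checking);
    if (snapshot != null && !DataOperations.REPLACE.equals(snapshot.operation())) {
      LOG.debug("Reading snapshot {}", snapshot.snapshotId());
      event.merge(new TableChange(snapshot, table.io()));
    }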

return new IteratorSourceReader<>(readerContext);
}

/** The single split of the {@link SingleThreadedIteratorSource}. */
Contributor:

Maybe explain why it is a global split with a constant splitId. If my understanding is correct, this is related to the per-split watermark; in this case, we only need the global watermark.


/** The Iterator which returns the latest changes on an Iceberg table. */
@VisibleForTesting
static class SchedulerEventIterator implements Iterator<TableChange> {
Contributor:

Nit: it seems that TableChangeIterator would be a better match.

public SourceReader<TableChange, GlobalSplit<TableChange>> createReader(
SourceReaderContext readerContext) throws Exception {
RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(1);
return new RateLimitedSourceReader<>(super.createReader(readerContext), rateLimiter);
Contributor:

Why don't we just let the enumerator leverage the coordinator's single-thread pool to schedule the action? Using a rate-limited reader seems unnecessary; the reader will be idle if there is no split.

    enumeratorContext.callAsync(
        this::discoverSplits,
        this::processDiscoveredSplits,
        0L,
        scanContext.monitorInterval().toMillis());

Contributor:

Then we probably don't need the iterator reader; the enumerator can just emit a global split when appropriate.

Contributor (author):

Here are the requirements:

  • We need to emit the TableChange events for a given timestamp in a single event record - we want to prevent unnecessary Maintenance Task scheduling
  • We need to have state for the source - we want to avoid emitting duplicates, even on state restore

I have considered the following:

  • The split is a Snapshot: the enumerator emits snapshots, and the reader reads the snapshot and emits a TableChange based on the given snapshot. This breaks our first requirement, as it will generate multiple small TableChange events on startup, or when multiple commits arrive between enumeration intervals.
  • The split is basically a TableChange calculated by the enumerator, and the reader just emits the value it has received. I think this also breaks the first requirement on state restore: we might emit an old TableChange and a new TableChange immediately, which could again double-trigger our Maintenance Tasks. How sensitive is the Enumerator to long-running TableChange calculations?

Am I missing something, or do you have another idea?

Thanks, Peter

}
}

private static final class SchedulerEventIteratorSerializer
Contributor:

If we change the enumerator threading model, this can become just an enumerator state serializer for the last snapshot.
