
Add an option for ingestion task to drop (mark unused) all existing segments that are contained by interval in the ingestionSpec #11025

Merged
17 commits merged on Apr 1, 2021

Conversation

@maytasm (Contributor) commented Mar 24, 2021

Add an option for ingestion task to drop (mark unused) segments that are of the interval in the ingestionSpec

Description

This PR adds a new option called dropExisting in the ioConfig for the batch parallel task and the batch simple task. This config is not required and defaults to false (the same as the current behavior). If this config is set to true (and appendToExisting is false and interval in granularitySpec is given), then the ingestion task will transactionally drop (mark unused) all existing segments that are fully contained by the intervals in the granularitySpec when it publishes new segments (to the metadata store). Note that no segments are dropped (marked unused) if the ingestion fails or if publishing the new segments to the metadata store fails. Also note that if either appendToExisting is true or interval is not given in granularitySpec, then no segments are dropped even if the user tries to set dropExisting to true.

While in most cases the newly ingested segments would automatically overshadow existing segments and cause those existing segments to be dropped, there are cases where existing segments are no longer needed and yet are not dropped.
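The eligibility rules described above can be condensed into a small predicate. This is a hypothetical sketch with illustrative names, not Druid's actual API:

```java
public class DropExistingEligibility {
    // Mirrors the rules in the description: dropping only takes effect when
    // dropExisting is true, appendToExisting is false, and the
    // granularitySpec specifies intervals.
    static boolean shouldDropExisting(boolean dropExisting,
                                      boolean appendToExisting,
                                      boolean intervalsSpecified) {
        return dropExisting && !appendToExisting && intervalsSpecified;
    }

    public static void main(String[] args) {
        System.out.println(shouldDropExisting(true, false, true));   // segments dropped
        System.out.println(shouldDropExisting(true, true, true));    // no drop: appending
        System.out.println(shouldDropExisting(true, false, false));  // no drop: no interval
    }
}
```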

Example 1:
When a user tries to compact existing segments using a smaller segmentGranularity. For example, an existing segment has an interval of 2020-01-01 to 2021-01-01 (YEAR interval). The user then tries to compact this interval with a segmentGranularity of MONTH. However, if the existing data does not cover every month, the compact task would not create MONTH segments for months without data. Thus, we may not have MONTH segments for every month in the interval of 2020-01-01 to 2021-01-01 post-compaction. This would then prevent the original YEAR segment from being dropped, even though the MONTH segments do contain all the data of the YEAR segment. A compaction task can always set dropExisting to true, since a compaction task always produces segments with all the data of the input intervals.

Example 2:
When a user is ingesting with appendToExisting=false (i.e. re-ingesting a datasource) and the new data does not contain data for some time intervals that already exist in the datasource. For example, suppose a user has the following MONTH segments:
Jan has 1 record
Feb has 10 records
Mar has 10 records
Now the user is trying to re-ingest with new data that overwrites the existing data (appendToExisting=false). The new data has the following data for each month:
Jan has 0 records
Feb has 10 records
Mar has 9 records
Without this new dropExisting config, the current result post-ingestion of interval Jan to Mar (even with appendToExisting=false) would be:
Jan has 1 record
Feb has 10 records
Mar has 9 records
However, this is incorrect. The new data has 0 records for Jan, and the user would expect to see that Jan has 0 records.
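Example 2 can be simulated with a toy month-to-record-count model (purely illustrative; a real ingestion operates on segments, not maps):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OverwriteDemo {
    // Toy model: each map is month -> record count; a "segment" exists only
    // for the months present in the map.
    static Map<String, Integer> ingest(Map<String, Integer> existing,
                                       Map<String, Integer> newData,
                                       boolean dropExisting) {
        Map<String, Integer> result = new LinkedHashMap<>(existing);
        if (dropExisting) {
            // Drop every existing segment in the ingested interval first.
            result.clear();
        }
        // Overwriting only replaces months the new data actually touches.
        result.putAll(newData);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> existing =
            new LinkedHashMap<>(Map.of("Jan", 1, "Feb", 10, "Mar", 10));
        // The new data has no January rows at all.
        Map<String, Integer> newData = Map.of("Feb", 10, "Mar", 9);

        // Without dropExisting the stale Jan=1 segment survives.
        System.out.println(ingest(existing, newData, false));
        // With dropExisting, Jan is gone, matching user expectations.
        System.out.println(ingest(existing, newData, true));
    }
}
```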

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@maytasm maytasm changed the title Add an option for ingestion task to drop (mark unused) segments that are old Add an option for ingestion task to drop (mark unused) segments that are of the interval in the ingestionSpec Mar 26, 2021
@@ -89,6 +89,8 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
no data written by this task, they will be left alone. If any existing segments partially overlap with the
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing data
Contributor:

Nit: You mentioned that this only occurs if a few other configs are set as well: appendToExisting = false and granularitySpec set, I believe. Should we mention that here?

Contributor Author:

Done

Collection<DataSegment> usedSegment = toolbox.getTaskActionClient().submit(
    new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE));
for (DataSegment segment : usedSegment) {
  for (Interval interval : condensedIntervals) {
    if (interval.contains(segment.getInterval())) {
Contributor:

I would think that the segments returned from the RetrieveUsedSegmentsAction are guaranteed to be within the interval specified. Is that not the case? If so, do we need to check again that the segment is in the interval?

Contributor Author:

That is not the case. For example, suppose you have a segment with interval 2000-01-01/2001-01-01 and your specified interval is 2000-04-28/2000-04-29. If the above segment has data for the day 2000-04-28/2000-04-29, it would be returned by RetrieveUsedSegmentsAction. However, we cannot drop this segment, since the specified interval is only 2000-04-28/2000-04-29. We can only drop segments that start and end within the specified interval.
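The distinction between "overlaps the interval" (what the retrieval returns) and "fully contained by the interval" (what may be dropped) can be sketched with a simplified interval type. Druid itself uses Joda-Time's Interval; this record is a stand-in for illustration only:

```java
import java.time.LocalDate;

// Minimal [start, end) interval stand-in; not Druid's actual Interval type.
record SimpleInterval(LocalDate start, LocalDate end) {
    boolean overlaps(SimpleInterval other) {
        return start.isBefore(other.end) && other.start.isBefore(end);
    }
    boolean contains(SimpleInterval other) {
        return !other.start.isBefore(start) && !other.end.isAfter(end);
    }
}

public class ContainmentDemo {
    public static void main(String[] args) {
        SimpleInterval yearSegment =
            new SimpleInterval(LocalDate.of(2000, 1, 1), LocalDate.of(2001, 1, 1));
        SimpleInterval specified =
            new SimpleInterval(LocalDate.of(2000, 4, 28), LocalDate.of(2000, 4, 29));

        // The year-long segment overlaps the specified day, so an
        // overlap-based lookup would return it...
        System.out.println(specified.overlaps(yearSegment));  // true
        // ...but it is NOT fully contained by the specified interval,
        // so it must not be dropped. Hence the extra containment check.
        System.out.println(specified.contains(yearSegment));  // false
    }
}
```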

Contributor Author:

Actually, let me make the doc a little clearer that we are only dropping segments that start and end within the interval specified in granularitySpec.

Contributor:

Oh I see, OK, makes sense.

@@ -301,11 +317,12 @@ public boolean isAudited()
public String toString()
{
return "SegmentTransactionalInsertAction{" +
"segmentsToBeOverwritten=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten) +
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
"segmentsToBeOverwritten=" + segmentsToBeOverwritten +
Contributor:

Should this preserve the SegmentUtils.commaSeparatedIdentifiers( and apply it to segmentsToBeDropped as well?

Contributor Author:

Yep. Thanks

for (DataSegment segment : usedSegment) {
  for (Interval interval : condensedIntervals) {
    if (interval.contains(segment.getInterval())) {
      segmentsFoundForDrop.add(segment);
Contributor:

Could add a continue here once a segment is found, for efficiency.

Contributor Author:

Good idea. Btw, I think it should be a break instead of a continue. This is to break out of the for (Interval interval : condensedIntervals) loop once we have confirmed that the segment is within one of the intervals specified in granularitySpec.

Contributor:

Ah yes, you're right, should be a break
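As agreed in the thread, the inner loop should break once a containing interval is found. A self-contained sketch of the resolved loop, using a simplified long-based interval stand-in rather than Druid's actual DataSegment/Interval types:

```java
import java.util.ArrayList;
import java.util.List;

public class DropLoopDemo {
    // Hypothetical [start, end) interval; illustration only.
    record Interval(long start, long end) {
        boolean contains(Interval other) {
            return start <= other.start && other.end <= end;
        }
    }

    static List<Interval> findSegmentsForDrop(List<Interval> usedSegments,
                                              List<Interval> condensedIntervals) {
        List<Interval> segmentsFoundForDrop = new ArrayList<>();
        for (Interval segment : usedSegments) {
            for (Interval interval : condensedIntervals) {
                if (interval.contains(segment)) {
                    segmentsFoundForDrop.add(segment);
                    // break (not continue): the segment is already matched,
                    // so stop scanning the remaining condensed intervals.
                    break;
                }
            }
        }
        return segmentsFoundForDrop;
    }

    public static void main(String[] args) {
        List<Interval> segments = List.of(new Interval(2, 3), new Interval(0, 10));
        List<Interval> intervals = List.of(new Interval(1, 5), new Interval(2, 4));
        // Only (2,3) is fully contained; the break ensures it is added once.
        System.out.println(findSegmentsForDrop(segments, intervals));
    }
}
```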

@@ -89,6 +89,8 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
no data written by this task, they will be left alone. If any existing segments partially overlap with the
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing data
Contributor:

Suggest adding an example where the user would want to enable it, such as the YEAR/MONTH example in the PR description

Contributor Author:

Done

@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
Contributor:

Suggested change
|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|

Contributor Author:

Done

@@ -719,6 +723,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
Contributor:

Suggested change
|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|

Contributor Author:

Done

@@ -108,7 +110,7 @@ public IndexerSQLMetadataStorageCoordinator(
this.connector = connector;
}

enum DataSourceMetadataUpdateResult
enum DataStoreMetadataUpdateResult
Contributor:

What's the reasoning behind the rename?

Contributor Author (maytasm, Mar 31, 2021):

Previously, in IndexerSQLMetadataStorageCoordinator we only updated the datasource table, hence the result enum was named DataSourceMetadataUpdateResult. However, IndexerSQLMetadataStorageCoordinator now has methods to update both the datasource table and the segment table in the metadata store. Instead of having two enums that are pretty much the same, I just renamed this enum so that it can be used for handling the result of updating either or both tables in the metadata store, no matter which tables.

@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
Contributor:

Suggest noting somewhere that compaction tasks will have this enabled

Contributor Author:

Done.

@@ -63,13 +62,16 @@
private final DataSourceMetadata endMetadata;
@Nullable
private final String dataSource;
@Nullable
private final Set<DataSegment> segmentsToBeDropped;
Contributor:

Can you add some javadocs to this class that explain the difference between segmentsToBeOverwritten and segmentsToBeDropped?

Contributor Author:

Done

@@ -52,7 +52,7 @@ In cases where you require more control over compaction, you can manually submit
See [Setting up a manual compaction task](#setting-up-manual-compaction) for more about manual compaction tasks.

## Data handling with compaction
During compaction, Druid overwrites the original set of segments with the compacted set. During compaction Druid locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
During compaction, Druid overwrites the original set of segments with the compacted set. During compaction Druid locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes. Note that compaction task automatically set `dropExisting` flag of the underlying ingestion task to true. This means that compaction task would drop (mark unused) all existing segments that are fully contain by the `interval` in the compaction task. This is to handle when compaction task changes segmentGranularity of the existing data to a finer segmentGranularity and the set of new segments (with the new segmentGranularity) does not fully cover the original croaser granularity time interval (as there may not be data in every time chunk of the new finer segmentGranularity).
Contributor:

Suggested change
During compaction, Druid overwrites the original set of segments with the compacted set. During compaction Druid locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes. Note that compaction task automatically set `dropExisting` flag of the underlying ingestion task to true. This means that compaction task would drop (mark unused) all existing segments that are fully contain by the `interval` in the compaction task. This is to handle when compaction task changes segmentGranularity of the existing data to a finer segmentGranularity and the set of new segments (with the new segmentGranularity) does not fully cover the original croaser granularity time interval (as there may not be data in every time chunk of the new finer segmentGranularity).
During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
For compaction tasks, `dropExisting` for underlying ingestion tasks is "true". This means that Druid can drop or mark unused all the un-compacted segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations).

I think it is better not to clutter this section with an example, especially if you can't change the value. The customer doesn't need to figure out how to set it another way. If they want to understand, they can read the example in native-batch.md. I had to add the header in because the recommendations don't relate to the compaction header.

Contributor Author:

LGTM

@@ -89,6 +89,26 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
no data written by this task, they will be left alone. If any existing segments partially overlap with the
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
- You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that
start and end within your `granularitySpec`'s intervals, regardless of if new data are in existing segments or not
Contributor:

Suggested change
start and end within your `granularitySpec`'s intervals, regardless of if new data are in existing segments or not
start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments.

Contributor Author:

LGTM. Done

@@ -89,6 +89,26 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
no data written by this task, they will be left alone. If any existing segments partially overlap with the
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
- You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that
start and end within your `granularitySpec`'s intervals, regardless of if new data are in existing segments or not
(this is only applicable if `appendToExisting` is set to false and `interval` specified in `granularitySpec`).
Contributor:

Suggested change
(this is only applicable if `appendToExisting` is set to false and `interval` specified in `granularitySpec`).
`dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`.

Contributor Author:

LGTM. Done

start and end within your `granularitySpec`'s intervals, regardless of if new data are in existing segments or not
(this is only applicable if `appendToExisting` is set to false and `interval` specified in `granularitySpec`).

Here are some examples on when to set `dropExisting` flag in the `ioConfig` to true
Contributor:

Suggested change
Here are some examples on when to set `dropExisting` flag in the `ioConfig` to true
The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`:

Contributor Author:

LGTM. Done


Here are some examples on when to set `dropExisting` flag in the `ioConfig` to true

- Example 1: Existing segment has a interval of 2020-01-01 to 2021-01-01 (YEAR segmentGranularity) and we are trying to
Contributor:

Suggested change
- Example 1: Existing segment has a interval of 2020-01-01 to 2021-01-01 (YEAR segmentGranularity) and we are trying to
- Example 1: Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and YEAR segmentGranularity. You want to

Contributor Author:

LGTM. Done

in the datasource. Now the user is trying to re-ingest with new data that overwrites all the existing data.
The new data has the following data for each month: `Jan has 0 record, Feb has 10 records, Mar has 9 records`.
Without setting `dropExisting` to true, the result after ingestion with overwrite (using the same MONTH segmentGranularity) would be:
`Jan has 1 record, Feb has 10 records, Mar has 9 records`. However, this is incorrect as the new data has 0 record for Jan
Contributor:

Suggested change
`Jan has 1 record, Feb has 10 records, Mar has 9 records`. However, this is incorrect as the new data has 0 record for Jan
This is incorrect since the new data has 0 records for January. Set `dropExisting` to true to drop the original
segment for January, which is not needed since the newly ingested data has no records for January.

Contributor Author:

LGTM. Done

The new data has the following data for each month: `Jan has 0 record, Feb has 10 records, Mar has 9 records`.
Without setting `dropExisting` to true, the result after ingestion with overwrite (using the same MONTH segmentGranularity) would be:
`Jan has 1 record, Feb has 10 records, Mar has 9 records`. However, this is incorrect as the new data has 0 record for Jan
and the user would expect to see that Jan has 0 record. By setting `dropExisting` flag to true, we can drop the original
Contributor:

Suggested change
and the user would expect to see that Jan has 0 record. By setting `dropExisting` flag to true, we can drop the original

Contributor Author:

LGTM. Done

Without setting `dropExisting` to true, the result after ingestion with overwrite (using the same MONTH segmentGranularity) would be:
`Jan has 1 record, Feb has 10 records, Mar has 9 records`. However, this is incorrect as the new data has 0 record for Jan
and the user would expect to see that Jan has 0 record. By setting `dropExisting` flag to true, we can drop the original
segment of Janurary which is no longer needed (as new ingested data does not have any data in Janurary).
Contributor:

Suggested change
segment of Janurary which is no longer needed (as new ingested data does not have any data in Janurary).

Contributor Author:

LGTM. Done

@@ -193,6 +213,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
Contributor:

Suggested change
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|

Contributor Author

LGTM. Done

@@ -719,6 +741,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
Contributor

same as line 216

Contributor Author

LGTM. Done
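As a quick illustration of the `dropExisting` preconditions in the table rows above, here is a minimal Python sketch (the function name and arguments are hypothetical, not part of Druid's API):

```python
def drop_existing_applies(drop_existing: bool,
                          append_to_existing: bool,
                          interval_specified: bool) -> bool:
    """Druid honors dropExisting only when appendToExisting is false
    and granularitySpec specifies an interval; any other combination
    drops nothing."""
    return drop_existing and not append_to_existing and interval_specified

print(drop_existing_applies(True, False, True))   # True: existing segments in the interval are marked unused
print(drop_existing_applies(True, True, True))    # False: appending, nothing dropped
print(drop_existing_applies(True, False, False))  # False: no interval given, nothing dropped
```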

Contributor

@jon-wei jon-wei left a comment


LGTM after incorporating @techdocsmith's doc suggestions

@maytasm
Contributor Author

maytasm commented Apr 1, 2021

Thanks for the review @jon-wei and @techdocsmith. I have incorporated @techdocsmith doc suggestions.

@maytasm maytasm changed the title Add an option for ingestion task to drop (mark unused) segments that are of the interval in the ingestionSpec Add an option for ingestion task to drop (mark unused) all existing segments that are contained by interval in the ingestionSpec Apr 1, 2021
@jon-wei jon-wei merged commit d7f5293 into apache:master Apr 1, 2021
@maytasm maytasm deleted the IMPLY-6432 branch April 1, 2021 19:34
@gianm
Contributor

gianm commented Apr 1, 2021

Hmm, it's important that the old segments not be marked unused until after the new segments are loaded. The reason is that when the old segments are marked unused, they'll be dropped from historicals on the next coordinator run. The new segments might not be loaded by then. It'll lead to a period of data unavailability. Does this PR handle that case?

@maytasm
Contributor Author

maytasm commented Apr 1, 2021

> Hmm, it's important that the old segments not be marked unused until after the new segments are loaded. The reason is that when the old segments are marked unused, they'll be dropped from historicals on the next coordinator run. The new segments might not be loaded by then. It'll lead to a period of data unavailability. Does this PR handle that case?

In a Coordinator run, the RunRules happen before the UnloadUnusedSegments. Hence, we should be loading the new segments before dropping the existing segments. Hmm...or is the worry if the loading fails?

@gianm
Contributor

gianm commented Apr 1, 2021

> In a Coordinator run, the RunRules happen before the UnloadUnusedSegments. Hence, we should be loading the new segments before dropping the existing segments.

I'm not sure that it's guaranteed that this will happen. Some scenarios that come to mind:

  • It's possible the historical load queues are maxed out (they are at maxSegmentsInNodeLoadingQueue) and so the coordinator may not issue load commands for all of the new segments.
  • I think segment loading is asynchronous, so even if all load commands are issued, they might not have finished executing by the time UnloadUnusedSegments runs.

@maytasm
Contributor Author

maytasm commented Apr 1, 2021

> > In a Coordinator run, the RunRules happen before the UnloadUnusedSegments. Hence, we should be loading the new segments before dropping the existing segments.
>
> I'm not sure that it's guaranteed that this will happen. Some scenarios that come to mind:
>
> - It's possible the historical load queues are maxed out (they are at maxSegmentsInNodeLoadingQueue) and so the coordinator may not issue load commands for all of the new segments.
> - I think segment loading is asynchronous, so even if all load commands are issued, they might not have finished executing by the time UnloadUnusedSegments runs.

You're right. It seems there is no easy way to delay marking old segments as used=false until after the new segments are loaded. Changing the loading/dropping logic would be too complicated. Maybe we should make it generate empty segments with the new granularity for all the missing gaps, and let loading/dropping handle them as normal.
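To make the gap idea concrete, here is a small sketch (illustrative stdlib Python only, not Druid code) of finding the MONTH buckets that would need empty segments after compacting a YEAR segment:

```python
from datetime import date

def months_needing_empty_segments(year, months_with_data):
    """Buckets with no rows get no real segment after compaction,
    so the proposal is to publish an empty segment for each of them
    so that the old YEAR segment becomes fully overshadowed."""
    return [date(year, m, 1) for m in range(1, 13)
            if m not in months_with_data]

# Data only in Jan, Mar, and Jul: nine months would need empty segments.
gaps = months_needing_empty_segments(2020, {1, 3, 7})
print(len(gaps))  # 9
```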

@gianm
Contributor

gianm commented Apr 1, 2021

I just wish there was a way to do it without really generating physical empty segments. Maybe empty segments that are somehow metadata-only?

@maytasm
Contributor Author

maytasm commented Apr 1, 2021

> I just wish there was a way to do it without really generating physical empty segments. Maybe empty segments that are somehow metadata-only?

Hmm... if, as part of the ingestion task, we insert a metadata-only (fake) empty segment and trick the coordinator into thinking that it is already loaded (so that we don't try to load it), then the coordinator will only drop the old segments when the other new (real) segments are loaded. I guess we also have to make the query path skip these fake segments. Do you think this is about the right track? Or is it over-complicating the solution?

@clintropolis
Member

Perhaps the mechanism in #10676 could be adapted to be used to just block until segments are loaded before marking them unused? (I haven't looked closely at either of these PRs myself)

@maytasm
Contributor Author

maytasm commented Apr 1, 2021

> Perhaps the mechanism in #10676 could be adapted to be used to just block until segments are loaded before marking them unused? (I haven't looked closely at either of these PRs myself)

Currently, marking existing segments as unused is done transactionally with publishing segments. This means the job fails if either marking unused or publishing fails (we don't mark unused if publishing fails, and don't publish if marking unused fails). I think if we go with #10676 we have to make marking segments unused best-effort. We would have to publish the new segments first, then wait for the loading of the new segments (via the mechanism in #10676), and only if that succeeds can we mark the existing segments as unused. However, the mechanism in #10676 can time out. In that case, we cannot mark existing segments as unused, since it is not confirmed whether the new segments are loaded yet. The task would still succeed and the new segments would be published, but marking the existing segments unused would fail.
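Sketched in Python, the best-effort ordering described here would look roughly like this (hypothetical callables, not Druid's actual API):

```python
def publish_then_drop(publish, wait_for_load, mark_unused, timeout_sec=600):
    """Publish first; mark old segments unused only once the new
    segments are confirmed loaded. On timeout the drop is skipped,
    so the task still succeeds with only the publish done."""
    publish()
    if wait_for_load(timeout_sec):
        mark_unused()
        return "published and dropped"
    return "published only"  # load unconfirmed; old segments stay

print(publish_then_drop(lambda: None, lambda t: True, lambda: None))
# published and dropped
print(publish_then_drop(lambda: None, lambda t: False, lambda: None))
# published only
```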

@jihoonson
Contributor

jihoonson commented Apr 1, 2021

Probably #10676 doesn't help much for this problem because this PR will mark old segments as unused as a part of publishing segments.

> Hmm... if, as part of the ingestion task, we insert a metadata-only (fake) empty segment and trick the coordinator into thinking that it is already loaded (so that we don't try to load it), then the coordinator will only drop the old segments when the other new (real) segments are loaded. I guess we also have to make the query path skip these fake segments. Do you think this is about the right track? Or is it over-complicating the solution?

Publishing empty segments sounds like an easy fix. I think it will be less complicated than you described. We don't have to trick the coordinator, but we should make it skip loading empty segments. The coordinator will mark the old segments as unused if they are overshadowed by empty ones, no matter whether the empty ones are loaded on historicals. The broker will not be aware of empty segments since historicals won't load or announce them.

@jihoonson
Contributor

A problem I can think of is that deleting old segments won't happen at once. Instead, the atomicity of deletion will be guaranteed inside each time chunk.

@maytasm
Contributor Author

maytasm commented Apr 1, 2021

> A problem I can think of is that deleting old segments won't happen at once. Instead, the atomicity of deletion will be guaranteed inside each time chunk.

Why is this a problem? Old segments being available should not be a problem as long as they are eventually dropped. One problem might be auto-compaction, as it would try to compact the same interval again.

@jihoonson
Contributor

jihoonson commented Apr 1, 2021

I meant, that might not be what users expect. I think they will want to know when they can query the new data without seeing old one. Maybe this can be addressed using #10676.

> One problem might be auto-compaction, as it would try to compact the same interval again.

Can you elaborate more on this problem? I'm not sure why it would compact the same interval again.

@maytasm
Contributor Author

maytasm commented Apr 2, 2021

> I meant, that might not be what users expect. I think they will want to know when they can query the new data without seeing old one. Maybe this can be addressed using #10676.
>
> > One problem might be auto-compaction, as it would try to compact the same interval again.
>
> Can you elaborate more on this problem? I'm not sure why it would compact the same interval again.

Ah I see. For a compaction task, the lag in dropping old segments would not be a problem for querying. For new data without seeing old data, I agree with you that it can be addressed using the mechanism in #10676.

I was mistaken. We will not have any problem in auto-compaction with the fake empty segments, as the fake empty segments plus the new real segments would fully overshadow the old segment. Marking old segments as unused is enough for auto-compaction to not run on the same interval again, even if the old segments are dropped later.
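The overshadow argument here can be sketched as an interval-coverage check (a toy model, not Druid's actual timeline logic):

```python
def fully_covers(new_intervals, old_interval):
    """True if the sorted new intervals (real plus empty segments,
    given as (start, end) pairs) contiguously cover the old interval,
    i.e. the old segment is fully overshadowed."""
    start, end = old_interval
    reach = start
    for s, e in sorted(new_intervals):
        if s > reach:
            return False  # a gap: the old segment stays visible
        reach = max(reach, e)
    return reach >= end

months = [(m, m + 1) for m in range(1, 13)]  # 12 MONTH chunks of a YEAR
print(fully_covers(months, (1, 13)))         # True
print(fully_covers(months[:-1], (1, 13)))    # False: one month missing
```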

@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021
jon-wei added a commit to jon-wei/druid that referenced this pull request Nov 22, 2021
* IMPLY-6556 remove offending settings.xml for intellij inspections

* GCS lookup support (apache#11026)

* GCS lookup support

* checkstyle fix

* review comments

* review comments

* remove unused import

* remove experimental from Kinesis with caveats (apache#10998)

* remove experimental from Kinesis with caveats

* add suggested known issue

* spelling fixes

* Bump aliyun SDK to 3.11.3 (apache#11044)

* Update reset-cluster.md (apache#10990)

fixed Error: Could not find or load main class org.apache.druid.cli.Main

* Make imply-view-manager non-experimental (apache#316)

* Make druid.indexer.task.ignoreTimestampSpecForDruidInputSource default to true, for backwards compat (apache#315)

* Add explicit EOF and use assert instead of exception (apache#11041)

* Add Calcite Avatica protobuf handler (apache#10543)

* bump to latest of same version node and npm versions, bump frontend-maven-plugin (apache#11057)

* request logs through kafka emitter (apache#11036)

* request logs through kafka emitter

* travis fixes

* review comments

* kafka emitter unit test

* new line

* travis checks

* checkstyle fix

* count request lost when request topic is null

* IMPLY-6556 map local repository instead .m2

* remove outdated info from faq (apache#11053)

* remove outdated info from faq

* Add an option for ingestion task to drop (mark unused) all existing segments that are contained by interval in the ingestionSpec (apache#11025)

* Auto-Compaction can run indefinitely when segmentGranularity is changed from coarser to finer.

* Add option to drop segments after ingestion

* fix checkstyle

* add tests

* add tests

* add tests

* fix test

* add tests

* fix checkstyle

* fix checkstyle

* add docs

* fix docs

* address comments

* address comments

* fix spelling

* Allow list for JDBC connection properties to address CVE-2021-26919 (apache#11047)

* Allow list for JDBC connection properties to address CVE-2021-26919

* fix tests for java 11

* Fix compile issue from dropExisting in ingest-service (apache#320)

Co-authored-by: Slava Mogilevsky <triggerwoods91@gmail.com>
Co-authored-by: Parag Jain <pjain1@apache.org>
Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>
Co-authored-by: github-actions <github-actions@github.com>
Co-authored-by: frank chen <frank.chen021@outlook.com>
Co-authored-by: Tushar Raj <43772524+tushar-1728@users.noreply.github.com>
Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>
Co-authored-by: Jihoon Son <jihoonson@apache.org>
Co-authored-by: Lasse Krogh Mammen <lkm@bookboon.com>
Co-authored-by: Clint Wylie <cwylie@apache.org>
Co-authored-by: Maytas Monsereenusorn <maytasm@apache.org>
@loquisgon loquisgon mentioned this pull request Jan 19, 2022