
Implement builtin-purge task #8589

Merged · 14 commits · May 31, 2022

Conversation

Airliquide76
Contributor

Instructions:

Feature to facilitate the purge process.

Tested manually with my own use case (not shared here), since I've developed the logic of which records to purge directly in the BaseMinionStarter.

Implementing the RecordPurgerFactory should be the only thing a user needs to do to purge records.
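To make that contract concrete, here is a minimal sketch of a record-purger factory. The interfaces below are simplified stand-ins, not Pinot's actual SegmentPurger contracts — the Map-based row, the method names, and the "deleted" flag are assumptions for illustration:

```java
import java.util.Map;

// Simplified stand-ins for Pinot's record-purger contracts (assumed shapes).
interface RecordPurger {
    // Return true if the row should be removed from the segment.
    boolean shouldPurge(Map<String, Object> row);
}

interface RecordPurgerFactory {
    // One purger per table; the task framework would ask for this per table.
    RecordPurger getRecordPurger(String tableName);
}

// Example: purge rows carrying a deletion flag (e.g. a GDPR erasure marker).
class DeletedFlagPurgerFactory implements RecordPurgerFactory {
    @Override
    public RecordPurger getRecordPurger(String tableName) {
        return row -> Boolean.TRUE.equals(row.get("deleted"));
    }
}
```

The factory is the single extension point: everything else (scheduling, segment download, re-upload) would be handled by the task framework.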


private static final PurgeTaskGenerator INSTANCE = new PurgeTaskGenerator();

protected ClusterInfoAccessor _clusterInfoAccessor;
Contributor

Change it to private?

Contributor Author

Ok


@TaskGenerator
public class PurgeTaskGenerator implements PinotTaskGenerator {
protected static final Logger LOGGER = LoggerFactory.getLogger(PurgeTaskGenerator.class);
Contributor

Same here. Change it to private.

Contributor Author

Ok


@Override
public void init(ClusterInfoAccessor clusterInfoAccessor) {
LOGGER.info("I'm in init for the purge");
Contributor

Update the message to something like "Initializing purge task"?

Contributor Author

Whoops -> fixed in the coming commit

Preconditions.checkNotNull(tableTaskConfig);
Map<String, String> taskConfigs =
tableTaskConfig.getConfigsForTaskType(MinionConstants.PurgeTask.TASK_TYPE);
Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: {}", tableName);
Contributor

If one table fails on this precondition, the whole purge task generation for other tables will be stopped. Maybe we should continue with other tables?

Contributor Author

OK. I've surrounded it with a try/catch so the loop continues with the next table if Preconditions.checkNotNull for the task throws a NullPointerException.
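The per-table failure-isolation pattern described here can be sketched as follows. Objects.requireNonNull stands in for Guava's Preconditions.checkNotNull, and the table/config shapes are simplified stand-ins for the real task-generator types:

```java
import java.util.*;

class PerTableTaskLoop {
    // Generate something for every table; skip tables whose task config is
    // missing instead of aborting the whole generation run.
    static List<String> generate(Map<String, Map<String, String>> taskConfigsByTable,
                                 List<String> tables) {
        List<String> generated = new ArrayList<>();
        for (String table : tables) {
            try {
                Map<String, String> taskConfigs = Objects.requireNonNull(
                        taskConfigsByTable.get(table),
                        "Task config shouldn't be null for Table: " + table);
                generated.add(table + ":" + taskConfigs.size());
            } catch (NullPointerException e) {
                // Misconfigured table: in real code we'd log, then move on.
                continue;
            }
        }
        return generated;
    }
}
```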

taskType, TableType.REALTIME);
continue;
}
// Get max number of tasks for this table
Contributor

Please align with the indentation

Contributor Author

ok

@Airliquide76
Contributor Author

There is a git issue somewhere on my side: I've added only a single commit, yet others I don't even recognize popped into this PR 👎 I will check this.

@Airliquide76 Airliquide76 reopened this Apr 26, 2022
@Airliquide76
Contributor Author

OK, I've fixed all the review comments on this. Many thanks for your patience @jackjlli 👍

Contributor

@jackjlli jackjlli left a comment

Please add a TODO in the class noting that recently purged segments should be scheduled last, so they aren't scheduled over and over while other segments never get a chance to be scheduled.


@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
// TODO: Pick lastly purged segment first to avoid looping again and again on the same segment
Contributor

The purged segments should have a purge timestamp in the segment ZK metadata custom fields. We should rely on that to order the purged segments, and add a threshold so we don't purge recently purged segments again and again.

Contributor Author

Yes, for sure.
I've added a (set/get)PurgeTime in SegmentZKMetadata here, but as I've implemented it the timestamp will be set even if the task fails 👎 Not a super fan of this. Not pushed yet because I can't figure out how to push the segment metadata from the TaskGenerator. Any example?

Contributor

Let's not add a new field to the SegmentZKMetadata. The purge time is already available in the custom field of the SegmentZKMetadata under the key PurgeTask.time. You may take a look at the PurgeTaskExecutor class for more context.

Contributor Author

@Airliquide76 Airliquide76 May 5, 2022

Nice, that solves most of my issues :) I've pushed a first null-safe version using a comparator to sort the array of segments. I will look around at the other task generators to see how to implement the threshold.
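The null-safe comparator approach can be sketched as follows. SegmentMeta is a simplified stand-in for SegmentZKMetadata (just a name plus a custom map); never-purged segments, whose "PurgeTask.time" entry is null, are ordered first:

```java
import java.util.*;

class SegmentPurgeOrdering {
    // Minimal stand-in for SegmentZKMetadata: a name and a custom map.
    record SegmentMeta(String name, Map<String, String> customMap) {}

    // Never-purged segments (null "PurgeTask.time") first, then previously
    // purged segments by ascending purge time.
    static void sortByPurgeTime(List<SegmentMeta> segments) {
        segments.sort(Comparator.comparing(
                (SegmentMeta s) -> s.customMap().get("PurgeTask.time"),
                Comparator.nullsFirst(Comparator.naturalOrder())));
    }
}
```

Note that comparing epoch-millis stored as strings lexicographically only orders correctly while the strings have the same digit count; production code should parse them to long before comparing.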

@Airliquide76
Contributor Author

Hi, I made a few changes based on your review (many thanks for the improvements to my original naive implementation). Is it possible to continue the review with the recent changes, @Jackie-Jiang or @jackjlli? Thank you.

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Can you please add an integration test to ensure the purge task works end-to-end? You may refer to SimpleMinionClusterIntegrationTest and MergeRollupMinionClusterIntegrationTest

@@ -67,6 +67,7 @@ public static class ConvertToRawIndexTask {
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
public static final String DELTA_TIME_PERIOD_KEY = "deltaPurgeTimePeriod";
Contributor

@jackjlli Can you please check the key used at LinkedIn? We want to make them the same so that you can eventually move to the open source implementation without causing backward-incompatibility issues.

Contributor Author

Integration test added. It was hard to manipulate the segment metadata in the setUp. I also had to implement a purger (it doesn't actually purge — it returns false for every row — with a BIG TODO in the BaseMinionStarter) to run the tests.

Open to review again @Jackie-Jiang, many thanks for your time.

Contributor

Checked with @jackjlli; we may change it as follows:

Suggested change
public static final String DELTA_TIME_PERIOD_KEY = "deltaPurgeTimePeriod";
public static final String LAST_PURGE_TIME_THRESHOLD_PERIOD = "lastPurgeTimeThresholdPeriod";
public static final String DEFAULT_LAST_PURGE_TIME_THRESHOLD_PERIOD = "14d";
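If the suggested keys are adopted, enabling the task in a table config might look like the sketch below. The nesting under task / taskTypeConfigsMap is my reading of Pinot's table-config layout; treat the exact structure as an assumption:

```json
{
  "tableName": "myTable_OFFLINE",
  "task": {
    "taskTypeConfigsMap": {
      "PurgeTask": {
        "lastPurgeTimeThresholdPeriod": "14d"
      }
    }
  }
}
```

Omitting lastPurgeTimeThresholdPeriod would fall back to the 14-day default declared above.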

@codecov-commenter

codecov-commenter commented May 21, 2022

Codecov Report

Merging #8589 (5bf84c8) into master (4d36f3d) will increase coverage by 40.23%.
The diff coverage is 76.81%.

@@              Coverage Diff              @@
##             master    #8589       +/-   ##
=============================================
+ Coverage     29.41%   69.64%   +40.23%     
- Complexity        0     4617     +4617     
=============================================
  Files          1680     1736       +56     
  Lines         88398    91279     +2881     
  Branches      13432    13641      +209     
=============================================
+ Hits          25998    63574    +37576     
+ Misses        60013    23276    -36737     
- Partials       2387     4429     +2042     
Flag Coverage Δ
integration1 26.90% <4.34%> (-0.37%) ⬇️
integration2 25.62% <76.81%> (+0.06%) ⬆️
unittests1 66.15% <ø> (?)
unittests2 14.13% <4.34%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
.../org/apache/pinot/core/common/MinionConstants.java 0.00% <ø> (ø)
.../plugin/minion/tasks/purge/PurgeTaskGenerator.java 76.81% <76.81%> (ø)
...ava/org/apache/pinot/core/auth/BasicAuthUtils.java 28.81% <0.00%> (-48.46%) ⬇️
...che/pinot/controller/util/TableMetadataReader.java 63.41% <0.00%> (-36.59%) ⬇️
...ache/pinot/common/metadata/ZKMetadataProvider.java 65.92% <0.00%> (-13.28%) ⬇️
...y/aggregation/function/SumAggregationFunction.java 84.00% <0.00%> (-11.46%) ⬇️
...g/apache/pinot/sql/parsers/CalciteSqlCompiler.java 88.88% <0.00%> (-11.12%) ⬇️
.../org/apache/pinot/core/util/QueryOptionsUtils.java 65.00% <0.00%> (-7.23%) ⬇️
...nction/DistinctCountBitmapAggregationFunction.java 47.66% <0.00%> (-6.22%) ⬇️
...inot/controller/api/access/AccessControlUtils.java 73.91% <0.00%> (-6.09%) ⬇️
... and 1365 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4d36f3d...5bf84c8. Read the comment docs.

@Airliquide76
Contributor Author

So what is the next step? 😄

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

@jackjlli @siddharthteotia Can you please take a look at the new config key DELTA_TIME_PERIOD_KEY? I'd suggest making it the same as the one used at LI so that in the future you may migrate to this one without compatibility issues.

} catch (Exception e) {
continue;
}
String deltaTimePeriod =
Contributor

You don't need to introduce a new concept of DELTA_TIME_PERIOD_KEY to schedule the purge task. Instead, the purge task should be set up at the segment level. And since the number of segments differs between tables, this new config would schedule unbalanced purge tasks per day, so I suggest removing this new table-level DELTA_TIME_PERIOD_KEY.
A variable called lastPurgeTime can be calculated for each segment. By default its value should be the segment creation time; if there is a field called "PurgeTask.time" in the custom map in the ZK metadata, it should override the default.
You can leave a TODO in this generator, and I'll fill in the logic once this PR is merged.
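The rule described above — default to the segment creation time, overridden by a "PurgeTask.time" entry in the ZK metadata custom map — can be sketched like this. The helper names and the plain-Map stand-in for the custom map are hypothetical:

```java
import java.util.Map;

class LastPurgeTime {
    // Effective last purge time: the custom-map "PurgeTask.time" entry if
    // present, otherwise the segment creation time.
    static long effectiveLastPurgeTime(long creationTimeMs, Map<String, String> customMap) {
        String purgeTime = (customMap == null) ? null : customMap.get("PurgeTask.time");
        return (purgeTime != null) ? Long.parseLong(purgeTime) : creationTimeMs;
    }

    // Skip re-purging a segment that was purged within the threshold
    // (e.g. 14 days for a GDPR-style requirement).
    static boolean shouldSkip(long nowMs, long lastPurgeMs, long thresholdMs) {
        return nowMs - lastPurgeMs < thresholdMs;
    }
}
```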

Contributor Author

I've implemented the threshold asked for here: #8589 (comment).

Contributor

@jackjlli This delta is essentially the time threshold for skipping a purge if the segment was just purged within the threshold (e.g. 2 weeks for a GDPR requirement).

@Airliquide76
Contributor Author

Review comments fixed. I'm now waiting for the DELTA_TIME_PERIOD_KEY value from the LinkedIn team so I can integrate this without breaking what has been done on their side.


.get(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX),
Comparator.nullsFirst(Comparator.naturalOrder())));
//add already purged segment at the end
notpurgedSegmentsZKMetadata.addAll(purgedSegmentsZKMetadata);
Contributor

We can avoid reading the same key multiple times by first extracting the last purge time and putting it along with the segment metadata (Pair<SegmentZKMetadata, Long>). If the segment has never been purged, put 0.

Contributor Author

Yes, but SegmentZKMetadata would then need to implement Serializable. I'm not really aware of all the impacts...

Contributor

We can use org.apache.commons.lang3.tuple.Pair which does not require key to be Serializable
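The Pair suggestion can be sketched with a stdlib pair type (java.util.AbstractMap.SimpleEntry here instead of commons-lang3 Pair, so the snippet stays self-contained). SegmentMeta is a simplified stand-in for SegmentZKMetadata; the key point is that "PurgeTask.time" is read exactly once per segment:

```java
import java.util.*;
import java.util.AbstractMap.SimpleEntry;

class PairedPurgeOrdering {
    // Minimal stand-in for SegmentZKMetadata.
    record SegmentMeta(String name, Map<String, String> customMap) {}

    // Read "PurgeTask.time" once per segment, defaulting to 0 for never-purged
    // segments, then sort by that precomputed value so never-purged come first.
    static List<SegmentMeta> orderForPurge(List<SegmentMeta> segments) {
        List<SimpleEntry<SegmentMeta, Long>> paired = new ArrayList<>();
        for (SegmentMeta s : segments) {
            String t = s.customMap().get("PurgeTask.time");
            paired.add(new SimpleEntry<>(s, t == null ? 0L : Long.parseLong(t)));
        }
        paired.sort(Comparator.comparing(SimpleEntry::getValue));
        List<SegmentMeta> ordered = new ArrayList<>();
        for (SimpleEntry<SegmentMeta, Long> e : paired) {
            ordered.add(e.getKey());
        }
        return ordered;
    }
}
```

Pairing also sidesteps the string-comparison caveat, since the purge time is compared as a long.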

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Mostly good, some minor comments


@Airliquide76
Contributor Author

Well, I've fixed the minor review comments except the Pair thing. I don't know why the CI failed on the integration tests, as they all run fine on my machine 👎

Contributor

@jackjlli jackjlli left a comment

LGTM. Thanks for adding the generator! 👍

@jackjlli jackjlli merged commit d2fa9dd into apache:master May 31, 2022
@Airliquide76 Airliquide76 deleted the PurgeImplementatation branch May 31, 2022 07:05