Delete extra refreshed segments after segment push #4464
jenniferdai commented on Jul 23, 2019
- Adds a config that enables deleting extra segments left over after a push: extra segments within the same configured time unit (APPEND) or extra segments after a refresh (REFRESH)
- Future work: deal with invalid prefixes (when people accidentally change their prefix, when they keep a huge amount of time in the same segment for append segments, etc.)
- Hadoop Pre-processing Job #4353
 * @return
 */
private String removeSequenceId(String segmentName) {
  return segmentName.replaceAll("\\d*$", "");
Can you add a comment explaining what this regex does? (An example will be sufficient.)
In the Javadoc above I describe the functionality, but I will add an additional example.
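For readers following the thread, here is a minimal standalone sketch of what the regex in the diff does. The class name `RemoveSequenceIdExample` and the sample segment names are illustrative only; the method body is the one-liner from the diff. The pattern `\d*$` removes whatever run of digits sits at the very end of the name.

```java
class RemoveSequenceIdExample {
    // Same regex as in the diff: strip any run of digits at the end of the name.
    static String removeSequenceId(String segmentName) {
        return segmentName.replaceAll("\\d*$", "");
    }

    public static void main(String[] args) {
        // prints "table_day1_"
        System.out.println(removeSequenceId("table_day1_2"));
        // prints "mytable_2018-09-10_2018-09-10_"
        System.out.println(removeSequenceId("mytable_2018-09-10_2018-09-10_0"));
        // No sequence id: the trailing digits of the date are stripped instead.
        // prints "mytable_2018-09-10_2018-09-"
        System.out.println(removeSequenceId("mytable_2018-09-10_2018-09-10"));
    }
}
```

The last call shows why names without a sequence id deserve their own test: the regex cannot tell a sequence id from the trailing digits of a date.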
@@ -48,10 +54,45 @@ public void run()
    throws Exception {
  FileSystem fileSystem = FileSystem.get(_conf);
  try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
    controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
    // TODO: Deal with invalid prefixes in the future
I'm not sure a segment-name-based solution is the correct approach for deleting segments pushed together. The logic will break depending on the logic of the segment name builder. It will also break when people are pushing to the table from multiple push jobs.
One approach that I can think of is to attach some id (e.g. batchId) to segments from an offline push job and delete segments with the same batchId.
I'm not planning on deleting segments pushed together. I am deleting extra segments. For example, if
table_day1_0
table_day1_1
table_day1_2
are pushed today, and
table_day1_0
table_day1_1
are pushed tomorrow, I will delete table_day1_2.
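The scenario above can be sketched in plain Java. This is not the PR's implementation: `ExtraSegmentsExample` and its `getSegmentsToDelete` are hypothetical stand-ins that assume an "extra" segment is one already in the cluster whose name shares a prefix (name minus trailing digits) with a segment in the current push, but which is not itself part of that push.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ExtraSegmentsExample {
    // Strip the trailing sequence id, as the regex in the diff does.
    static String removeSequenceId(String segmentName) {
        return segmentName.replaceAll("\\d*$", "");
    }

    // Hypothetical sketch: segments in the cluster that share a prefix with
    // the current push but were not pushed this time are considered extra.
    static List<String> getSegmentsToDelete(List<String> segmentsInCluster, List<String> segmentsToPush) {
        Set<String> pushedNames = new HashSet<>(segmentsToPush);
        Set<String> pushedPrefixes = new HashSet<>();
        for (String name : segmentsToPush) {
            pushedPrefixes.add(removeSequenceId(name));
        }
        List<String> toDelete = new ArrayList<>();
        for (String name : segmentsInCluster) {
            if (pushedPrefixes.contains(removeSequenceId(name)) && !pushedNames.contains(name)) {
                toDelete.add(name);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<String> inCluster = Arrays.asList("table_day1_0", "table_day1_1", "table_day1_2", "table_day2_0");
        List<String> pushedTomorrow = Arrays.asList("table_day1_0", "table_day1_1");
        // prints "[table_day1_2]"; table_day2_0 has a different prefix and is left alone
        System.out.println(getSegmentsToDelete(inCluster, pushedTomorrow));
    }
}
```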
I am a little confused here -- the PR description says that you are deleting segments that are pushed or refreshed within a time range. What exactly is an extra segment?
discussed and resolved this
@@ -57,4 +57,6 @@
public static final String ENABLE_PARTITIONING = "enable.partitioning";
public static final String ENABLE_SORTING = "enable.sorting";
public static final String ENABLE_RESIZING = "enable.resizing";

public static final String DELETE_EXTRA_REFRESHED_SEGMENTS = "delete.extra.refreshed.segments";
Can we rename this if you have a better name?
dfbb694 to f9ce70a
List<String> allSegments = new ArrayList<>();
allSegments.add("mytable_0");
allSegments.add("mytable_1");
allSegments.add("mytable_2");
how do we decide which one is an extra segment?
discussed offline
}

relevantSegments.removeAll(segmentsToPushNames);
This will be an O(N^2) operation and potentially more heap-heavy because of constant overwriting.
I think we should use a HashSet (or an alternative) to make each lookup a constant-time operation: more efficient at the cost of extra space.
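A minimal sketch of the suggestion, assuming the surrounding code only needs the remaining names as a list. The class and method names here are hypothetical. `ArrayList.removeAll` checks each element against the argument with `contains`, so passing a `HashSet` turns each check into a hash lookup instead of a linear scan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SetBasedRemovalExample {
    // Hypothetical helper: returns the relevant segments that are not being pushed.
    static List<String> removePushed(List<String> relevantSegments, List<String> segmentNamesToPush) {
        // Copy the pushed names into a set once, at the cost of extra space.
        Set<String> pushed = new HashSet<>(segmentNamesToPush);
        List<String> remaining = new ArrayList<>(relevantSegments);
        // Each membership check against the set is expected constant time.
        remaining.removeAll(pushed);
        return remaining;
    }

    public static void main(String[] args) {
        List<String> relevant = Arrays.asList("mytable_0", "mytable_1", "mytable_2");
        List<String> toPush = Arrays.asList("mytable_0");
        // prints "[mytable_1, mytable_2]"
        System.out.println(removePushed(relevant, toPush));
    }
}
```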
SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegments, currentSegments);
Assert.assertEquals(segmentsToDelete.size(), 2);
Assert.assertEquals(segmentsToDelete.contains("mytable_0"), false);
Using contains() on an ArrayList is generally not recommended and not intuitive, since it is essentially a linear scan for an operation that should be amortized constant. Maybe we should change the return type of getSegmentsToDelete() to Set, or build a set here in the test code before asserting.
  uniqueSegmentPrefixes.add(segmentNamePrefix);
}

List<String> relevantSegments = new ArrayList<>();
It would be great to complement this code with an example (in the Javadoc or near the code) that walks through what list of segments we start with, how we filter them, and how we arrive at the list of segments that we return (the ones to delete).
@@ -57,4 +57,6 @@
public static final String ENABLE_PARTITIONING = "enable.partitioning";
public static final String ENABLE_SORTING = "enable.sorting";
public static final String ENABLE_RESIZING = "enable.resizing";

public static final String IS_DELETE_EXTRA_SEGMENTS = "is.delete.extra.segments";
It'd be good to add some descriptions to this setting on how/when to use it.
delete.extra.segments
@Test
public void checkDelete() {
  List<String> allSegments = new ArrayList<>();
Rename the variable to something like allSegmentsInCluster?
SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegments, currentSegments);
Assert.assertEquals(segmentsToDelete.size(), 2);
Assert.assertEquals(segmentsToDelete.contains("mytable_2018-09-10_2018-09-10_0"), false);
Use assertFalse() instead of assertEquals().
SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegments, currentSegments);
Assert.assertEquals(segmentsToDelete.size(), 2);
Assert.assertEquals(segmentsToDelete.contains("mytable_0"), false);
Same here.
SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegments, currentSegments);
Assert.assertEquals(segmentsToDelete.size(), 2);
Assert.assertEquals(segmentsToDelete.contains("mytable_02"), false);
Same here.
}

@AfterClass
private void shutdown() {
No need to keep this shutdown method?
Set<String> uniqueSegmentPrefixes = new HashSet<>();

// Get all relevant segment prefixes that we are planning on pushing
List<Path> segmentsToPushPaths = getDataFilePaths(_segmentPattern);
segmentPathsToPush
// Get all relevant segment prefixes that we are planning on pushing
List<Path> segmentsToPushPaths = getDataFilePaths(_segmentPattern);
List<String> segmentsToPushNames = segmentsToPushPaths.stream().map(s -> s.getName()).collect(Collectors.toList());
segmentNamesToPush
controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
// TODO: Deal with invalid prefixes in the future
if (_deleteExtraSegments) {
  List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE");
currentSegments
74388dd to d87813c
I think that we need to add some validation code to check whether this new config can safely be enabled, e.g. it won't make sense if _excludeSequenceId = true and _deleteExtraSegments = true.
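One way such a validation could look, as a hedged sketch rather than the code that was merged. The key `delete.extra.segments` is the name proposed in this review; the key `exclude.sequence.id` and the class `PushJobConfigValidationExample` are hypothetical names used only for illustration.

```java
import java.util.Properties;

class PushJobConfigValidationExample {
    // Name proposed in the review for the new setting.
    static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
    // Hypothetical key; the real property name for excluding the sequence id may differ.
    static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";

    // Rejects the combination called out in the review: without sequence ids,
    // prefix-based detection of extra segments has nothing reliable to strip.
    static void validate(Properties properties) {
        boolean deleteExtraSegments =
            Boolean.parseBoolean(properties.getProperty(DELETE_EXTRA_SEGMENTS, "false"));
        boolean excludeSequenceId =
            Boolean.parseBoolean(properties.getProperty(EXCLUDE_SEQUENCE_ID, "false"));
        if (deleteExtraSegments && excludeSequenceId) {
            throw new IllegalArgumentException(
                DELETE_EXTRA_SEGMENTS + " cannot be enabled together with " + EXCLUDE_SEQUENCE_ID);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(DELETE_EXTRA_SEGMENTS, "true");
        validate(properties); // accepted: sequence ids are still present
        properties.setProperty(EXCLUDE_SEQUENCE_ID, "true");
        try {
            validate(properties);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast in the constructor keeps a misconfigured job from deleting segments it should not touch.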
@@ -39,6 +45,7 @@ public SegmentTarPushJob(Properties properties) {
  int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
  _pushLocations = PushLocation.getPushLocations(hosts, port);
  _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
  _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.IS_DELETE_EXTRA_SEGMENTS, "false"));
DELETE_EXTRA_SEGMENTS seems to be good enough :)
@@ -57,4 +57,6 @@
public static final String ENABLE_PARTITIONING = "enable.partitioning";
public static final String ENABLE_SORTING = "enable.sorting";
public static final String ENABLE_RESIZING = "enable.resizing";

public static final String IS_DELETE_EXTRA_SEGMENTS = "is.delete.extra.segments";
delete.extra.segments
/**
 * Tests logic to delete extra segments within the same time unit for APPEND or extra segments during REFRESH cases.
 */
public class DeleteExtraPushedSegmentsTest {
Can you add the case where there's no sequence id? I think it may result in the wrong behavior.
In NormalizedDateSegmentNameGenerator, there is an explicit config excludeSequenceId. So a segment name with no sequence id is a valid case, and we should cover that case correctly.
added a test to check, the code handles it correctly
make sure that all the tests pass before merging this.
Codecov Report
@@ Coverage Diff @@
## master #4464 +/- ##
============================================
- Coverage 65.65% 65.61% -0.04%
Complexity 20 20
============================================
Files 1065 1065
Lines 55250 55250
Branches 8025 8025
============================================
- Hits 36276 36254 -22
- Misses 16359 16387 +28
+ Partials 2615 2609 -6
Continue to review full report at Codecov.
Your comment seems to have disappeared. What would you like me to remove? As the preconditions check was added in your commit for multiple push and is tied to your feature, I think you probably have a better idea of all the edge cases that require this check. It may be better for you to call out the cases where the preconditions check is required as you merge subsequent PRs for your feature.