Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add intermediary data server for shuffle #8088

Merged
merged 14 commits into from
Jul 18, 2019

Conversation

jihoonson
Copy link
Contributor

@jihoonson jihoonson commented Jul 16, 2019

First PR for #8061.

Description

This PR adds intermediary data server functionality to middleManagers. Documents for new configurations will be added once full functionalities for native parallel batch with shuffle is implemented.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added unit tests or modified existing tests to cover new code paths.
  • been tested in a test Druid cluster.

Copy link
Contributor

@himanshug himanshug left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took a high level look today

@@ -89,6 +94,13 @@ public TaskConfig(
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
if (intermediarySegmentsLocations == null) {
this.intermediarySegmentsLocations = Collections.singletonList(
new StorageLocationConfig(new File(System.getProperty("java.io.tmpdir"), "intermediary-segments"), null, null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new StorageLocationConfig(new File(System.getProperty("java.io.tmpdir"), "intermediary-segments"), null, null)
new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

if (exception != null) {
throw exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is running in a scheduler , why do we throw the overall exception in the end instead of logging them as they happened ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to fix this before commit. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we log the exceptions as they occur instead of accumulating all of them inside one and then logging it in the end ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sure, fixed.

if (supervisorTaskChecker != null) {
supervisorTaskChecker.shutdownNow();
supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
supervisorTaskChecker = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use of setting it null here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did out of habit. Why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it piqued my interest as usually you would do this sort of thing if you wanted to make something eligible for GC if it was a big object and part of large method so as to let it be GC'd asap or if you wanted to do something along the lines of making IntermediaryDataManager restartable etc.
I was trying to understand why you did that, but sounds like there is no reason so better to remove it :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

* Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
* supervisorTaskId.
*
* This method is only useful for the new Indexer model, and must not be called when tasks are running in the existing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean parallel index task with shuffle support would not work with existing MMs?

why can't task processes have intermediary segment location propagated to them so that they could write data there and whole thing works for existing MMs as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shuffle system works with existing MMs. Tasks running in MMs just need to use another method instead of this one. Fixed the comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add what that other method is/would-be ? ... my understanding is that this method could be made static and peon process would write data by calling this method as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new static method which could be used by MMs.

@Path("/task/{supervisorTaskId}")
public Response deletePartitions(
@PathParam("supervisorTaskId") String supervisorTaskId,
@QueryParam("dataSource") String dataSource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's.. used in TaskShuffleResourceFilter to check authorization. I know this is weird and, ideally, the resourceFilter should check authorization for task data instead of dataSource. However, the current security system only supports dataSource-level authorization. Added a comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok... but why does it need to be declared here ? I understand that url needs that in query param for the filter to work but we probably don't need to capture that param in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I just removed dataSource and changed to use StateResourceFilter instead. Added the below javadoc.

We use StateResourceFilter here because it performs an admin-like authorization and all endpoints here are supposed to be used for only internal communcation. Another possible alternate could be performing datasource-level authorization as in TaskResourceFilter. However, datasource information is not available in middleManagers or indexers yet which makes hard to use it. We could develop a new ResourceFileter in the future if needed.

}

@Test
public void testAddSegment() throws IOException
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this test worth having since it is also implicitly tested by the other 2 tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, removed.

* This resource filter is used for data shuffle between native parallel index tasks. See ShuffleResource for details.
*
* It currently performs the authorization check for DATASOURCE, but ideally, it should be the authorization check
* for task data. This issue should be addressed in the future.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify in these javadocs that I think you are referring to this should be doing what TaskResourceFilter is doing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this class now in favor of using StateResourceFilter.

}
}

final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has potential to throw exceptions and that would suppress further execution of this task... we should be stopping early only if the exception happened due to interrupt

Copy link
Contributor

@himanshug himanshug Jul 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually it would be good to put scheduled task inside a big try-catch to avoid accidental premature exit or at least log what happened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Added.

@jihoonson
Copy link
Contributor Author

@himanshug and @clintropolis, thank you for taking a look. I addressed your comments.

I also noticed that the state of StorageLocation was not updated properly before. I refactored StorageLocation to be a bit more decoupled with DataSegment and fixed it to update its state properly.

Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall lgtm 👍

* all endpoints here are supposed to be used for only internal communcation.
* Another possible alternate could be performing datasource-level authorization as in TaskResourceFilter.
* However, datasource information is not available in middleManagers or indexers yet which makes hard to use it.
* We could develop a new ResourceFileter in the future if needed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
// Discover partitions for new supervisorTasks
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This maybe seems large and complex enough to split into it's own method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted as a method.

// Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger
// the self-cleanup for when middleManager misses the cleanup request from the overlord.
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about splitting lambda into a method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted as a method.

throw new ISE("Can't find location to handle segment[%s]", segment);
}

public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you put this and the non-static method that follows up with the other non-static methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved around to put non-static methods together.

@clintropolis
Copy link
Member

CI failures look legitimate:

[ERROR] Failures: 
[ERROR] org.apache.druid.segment.loading.StorageLocationTest.testStorageLocation(org.apache.druid.segment.loading.StorageLocationTest)
[ERROR]   Run 1: StorageLocationTest.testStorageLocation:79->verifyLoc:99 expected:<990> but was:<980>
[ERROR]   Run 2: StorageLocationTest.testStorageLocation:79->verifyLoc:99 expected:<990> but was:<980>
[ERROR]   Run 3: StorageLocationTest.testStorageLocation:79->verifyLoc:99 expected:<990> but was:<980>
[ERROR]   Run 4: StorageLocationTest.testStorageLocation:79->verifyLoc:99 expected:<990> but was:<980>

@jihoonson
Copy link
Contributor Author

@clintropolis thanks, fixed the test failure.

}
catch (Exception e) {
log.warn(e, "Error while cleaning up partitions for expired supervisors");
}
Copy link
Contributor

@himanshug himanshug Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to exit on "InterruptedException" or e.cause being InterruptedException

warn means no action needed, but in this case user might need to come in and do manual cleanup , so error level log is preferable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually it might be better for deleteExpiredSuprevisorTaskPartitionsIfNotRunning() to throw InterruptedException so that code here doesn't need to concern itself with e.cause check as that is internal detail of that method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, forgot to handle that. Thanks. 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just changed indexingServiceClient.getTaskStatuses to throw InterruptedException. I think it's better since it's a blocking call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, now that I see the change .. I realized it wasn't strictly necessary in this context ... but nice to have.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it's better to log as an error rather than warning.

@jihoonson
Copy link
Contributor Author

@himanshug thanks for the review!

@jihoonson jihoonson merged commit c7eb7cd into apache:master Jul 18, 2019
@clintropolis clintropolis added this to the 0.16.0 milestone Aug 8, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants