Plumb task peon host/ports back out to the overlord. #2419
Conversation
import io.druid.indexing.common.TaskLocation;

public interface TaskRunnerListener
Can you add some comments about what this does and how to extend it?
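For illustration, a minimal sketch of what such documentation might look like, assuming the single locationChanged callback that appears later in this thread (any other method names would be assumptions):

```java
import io.druid.indexing.common.TaskLocation;

/**
 * Listener for task runner events. Implementations are registered via
 * TaskRunner#registerListener. Callbacks are dispatched on the executor
 * given to the runner, with no ordering guarantees (see discussion below).
 */
public interface TaskRunnerListener
{
  /**
   * Called when a task's location changes, e.g. when a peon starts up and
   * reports the host and ports it is listening on.
   */
  void locationChanged(String taskId, TaskLocation location);
}
```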
Why not have the listener take an executor?
It works better when the runner takes it.
Force-pushed 999af33 to 8d882ae.
I do want to review this but it will take a bit to chew through.
import java.util.Objects;

public class TaskLocation
Can this just use Worker?
It's not really semantically a worker: one worker is going to have many tasks at many taskLocations (all on different ports from the parent worker).
Then can they be DruidNodes?
I guess chatPort makes that a no?
Is it possible to simply propagate DruidNode and have chatPort discoverable from the node data exposed at DruidNode?
What about just using HostAndPort? (I'm trying to minimize the number of items that are added to the code)
FWIW, in #2242 I am making DruidServerMetadata the source of truth for a Druid server's metadata. I think it's reasonable to make DruidServerMetadata contain a minimal set of metadata (e.g., host, port, name, etc.), and then let Worker, TaskLocation, and DruidServer extend it.
HostAndPort doesn't have Jackson annotations. We could register a Jackson module for it, I suppose (or maybe it's already part of the GuavaModule?). I am also OK with replacing this with the stuff from #2242 when that PR is ready.
It looks like HostAndPort serde is included in the GuavaModule, although it uses the ToStringSerializer, which will be kind of a pain to deserialize for people who aren't linking Guava in their app. So I am leaning towards keeping TaskLocation as a thing and potentially changing it after #2242.
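For reference, a quick sketch of that serde behavior, assuming the jackson-datatype-guava module (the demo class name is hypothetical):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.google.common.net.HostAndPort;

public class HostAndPortSerdeDemo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper().registerModule(new GuavaModule());

    // GuavaModule serializes HostAndPort via ToStringSerializer, so the JSON
    // is a bare string rather than a structured {"host": ..., "port": ...}
    // object, which non-Guava consumers would have to parse themselves.
    System.out.println(mapper.writeValueAsString(HostAndPort.fromParts("example.com", 8100)));
    // prints: "example.com:8100"
  }
}
```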
Ok
FYI, this also uses similar functionality to the remote task runner replacement in #2246.
public void handle()
{
  if (running.containsKey(task.getId())) {
    log.warn(
What happens if a task finishes in this block of code?
Ah, should be fine if unannounceTask is idempotent.
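A minimal sketch of the idempotency pattern being assumed here; the map name `running` echoes the diff above, while the class, the `Announcement` placeholder, and the helper are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class UnannounceSketch
{
  // placeholder for whatever announcement object the worker tracks
  static class Announcement {}

  private final ConcurrentMap<String, Announcement> running = new ConcurrentHashMap<>();

  public void unannounceTask(String taskId)
  {
    // ConcurrentMap.remove returns null for every call after the first, so
    // the cleanup below runs at most once even if the task finishes
    // concurrently with the block being discussed.
    if (running.remove(taskId) != null) {
      removeAnnouncementFromZk(taskId);
    }
  }

  private void removeAnnouncementFromZk(String taskId)
  {
    // hypothetical helper: delete the task's announcement node
  }
}
```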
👍 even if my comments are not addressed.
Force-pushed 8d882ae to ad38fad.
try {
  log.info("Updating task [%s] announcement with location [%s]", taskId, location);
  workerCuratorCoordinator.updateAnnouncement(
Nit: you could skip this announcement if details.location was already the same as location.
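Something along these lines, sketched; `details.getLocation()` and `newAnnouncement` are assumed names, not the PR's actual code:

```java
// Hypothetical sketch of the suggested check.
if (location.equals(details.getLocation())) {
  return; // announcement already reflects this location; skip the ZK write
}
log.info("Updating task [%s] announcement with location [%s]", taskId, location);
workerCuratorCoordinator.updateAnnouncement(newAnnouncement);
```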
@gianm I see that this PR allows you to know the host and port of peons from the runningTasks HTTP endpoint at the overlord. So, in Tranquility, you would use the same to find the tasks. However, realtime indexing tasks open a separate chat handler port, which you do not obtain from the TaskLocation. Are you planning to continue using service discovery for finding the chat handler port, or should the TaskLocation be updated to carry some kind of metadata object too, so that it can hold additional information like the chat handler port?
@himanshug that's a good point. I was thinking that the chat handler was still on the same port as the main servlet, but that's not true anymore since the separateIngestionEndpoint was added. Ideally this should be part of the location as well; will adjust that.
Force-pushed ad38fad to 007891d.
@himanshug reopened with chatPort included.
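For reference, a minimal sketch of what a TaskLocation carrying host, port, and chatPort could look like; only those three fields are confirmed in this thread, and the exact layout and annotations are assumptions:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;

public class TaskLocation
{
  private final String host;
  private final int port;
  private final int chatPort;

  @JsonCreator
  public TaskLocation(
      @JsonProperty("host") String host,
      @JsonProperty("port") int port,
      @JsonProperty("chatPort") int chatPort
  )
  {
    this.host = host;
    this.port = port;
    this.chatPort = chatPort;
  }

  @JsonProperty
  public String getHost()
  {
    return host;
  }

  @JsonProperty
  public int getPort()
  {
    return port;
  }

  @JsonProperty
  public int getChatPort()
  {
    return chatPort;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TaskLocation that = (TaskLocation) o;
    return port == that.port && chatPort == that.chatPort && Objects.equals(host, that.host);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(host, port, chatPort);
  }
}
```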
Force-pushed b4aa259 to ab9ee21.
Force-pushed ab9ee21 to 06d4b5c.
Force-pushed 0c6298e to 0a5956b.
@guobingkun a task should only have one location at a time; if it is running in two places, then the RTR should kill the one it doesn't like.
Force-pushed 0a5956b to 7086e06.
@Override
public void run()
{
  listener.lhs.locationChanged(taskId, location);
There is no guarantee of execution order or completion here (nor error reporting on failure?). For example, if an over-burdened executor is used that does not have a FIFO queue, location changes can be processed in no particular order compared to the call to notifyLocationChanged.
Added comments to registerListener on TaskRunner clarifying the intended usage.
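To illustrate the ordering point: with a single-threaded executor the listener sees location changes in notification order, while a multi-threaded pool may reorder them. This sketch assumes registerListener takes the listener plus an executor (matching the earlier "the runner takes it" exchange; the exact signature is an assumption) and that `taskRunner` and `log` are in scope:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// A single-threaded executor processes callbacks FIFO, avoiding the
// reordering concern raised above; a cached or work-stealing pool would not.
Executor callbackExec = Executors.newSingleThreadExecutor();

taskRunner.registerListener(
    new TaskRunnerListener()
    {
      @Override
      public void locationChanged(String taskId, TaskLocation location)
      {
        log.info("Task [%s] is now at [%s]", taskId, location);
      }
    },
    callbackExec
);
```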
Force-pushed 2acbfef to e18f9ad.
- Add TaskLocation class
- Add registerListener to TaskRunner
- Add getLocation to TaskRunnerWorkItem
- Implement location tracking in existing TaskRunners
- Rework WorkerTaskMonitor to do management out of a single thread so it can handle status and location updates more simply.
Force-pushed e18f9ad to cf0bc90.
Cool, 👍 but suggest removing #2419 (diff).
I think this failed due to #2430 (https://travis-ci.org/druid-io/druid/builds/111621322)
Plumb task peon host/ports back out to the overlord.
The intent is to get rid of the need for Curator service discovery to find tasks.
Motivation
Curator-based service discovery is annoying because it needs ZK, and also because it doesn't clean up after itself when a service goes away, requiring hacks like this one in Tranquility: https://github.com/druid-io/tranquility/blob/v0.7.2/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala#L229. This should also make life easier for the ingestion supervisors needed by #2220, as they will likely run on the overlord and will benefit from the overlord knowing where tasks are.
Intended usage
Tranquility would use this by implementing a resolver (similar to the DiscoResolver for Curator discovery) that polls the overlord's runningTasks endpoint instead of watching ZK.
Overlord-based ingestion supervisors (like the Kafka one implied by #2220) would probably register a listener directly with the TaskRunner.
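A hypothetical sketch of such a resolver's polling step; the endpoint path and the field names in the response ("id", "location", "host", "port") are assumptions based on this PR, not a documented payload:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URL;

public class RunningTasksPoller
{
  private final ObjectMapper mapper = new ObjectMapper();

  /**
   * Polls the overlord's runningTasks endpoint and returns "host:port" for
   * one task, or null if the task isn't currently running. A real resolver
   * would poll this periodically instead of watching ZK.
   */
  public String locate(String overlordBase, String taskId) throws Exception
  {
    JsonNode tasks = mapper.readTree(new URL(overlordBase + "/druid/indexer/v1/runningTasks"));
    for (JsonNode task : tasks) {
      if (taskId.equals(task.path("id").asText())) {
        JsonNode loc = task.path("location");
        return loc.path("host").asText() + ":" + loc.path("port").asInt();
      }
    }
    return null; // task not running (or not yet announced)
  }
}
```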
Implementation