-
Notifications
You must be signed in to change notification settings - Fork 339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(CDAP-12646) fix run record fixer #9652
(CDAP-12646) fix run record fixer #9652
Conversation
chtyim
commented
Sep 21, 2017
- Perform micro batches of scanning from store to avoid tx timeout
- Removed all unnecessary point lookup of run records
- Make ProgramRunId available to RunRecordMeta
- Updated AppMetaStore deserialize RunRecordMeta with the ProgramRunId in it (which comes from the MDS key during scan, no extra lookup is needed).
90c5398
to
af2f8f4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly minor comments from me.
I understand that the major change here is (apart from refactoring) that we now store the programRunId as part of the run record and therefore can avoid the point lookup for each record. Or did I miss anything?
Set<ProgramRunId> fixed = doFixRunRecords(); | ||
|
||
if (!fixed.isEmpty()) { | ||
LOG.info("Corrected {} of run records in {} status but no actual program running.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you improve this message? (The params are reversed and the grammar is odd). Perhaps:
"Correct {n} run records with status in {statuses} that have no actual running program. Such programs likely have crashed or were killed by external signal. "
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. This was copied without much thought.
startTime = Math.max(startTime, RunIds.getTime(record.getPid(), TimeUnit.SECONDS)); | ||
|
||
ProgramRunId programRunId = record.getProgramRunId(); | ||
if (!fixedPrograms.contains(programRunId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this already filtered by the createFilter(fixedProgram)
above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, it is the same Set. Moving it outside the loop.
if (!fixedPrograms.contains(programRunId)) { | ||
programStateWriter.error( | ||
programRunId, | ||
new Throwable("Marking run record for " + programRunId + " as failed since no running program found.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a specific exception class for this? Seems odd to use Throwable...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
} | ||
|
||
// runs are marked as starting before they get into the runtimeService list. | ||
// Therefore, add some buffer so that we don't incorrectly 'fix' runs that are actually starting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment is confusing here, this is actually done at the end of the method. Maybe move it down there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't look at the message... it was copied.. I think it means don't fix record right away as the program running state may not be in the ProgramRuntimeService
yet. I'll rephase the message better.
return GSON.fromJson(Bytes.toString(serialized), typeOfT); | ||
protected <T> T deserialize(MDSKey key, byte[] serialized, Type typeOfT) { | ||
if (RunRecordMeta.class.equals(typeOfT)) { | ||
RunRecordMeta meta = GSON.fromJson(Bytes.toString(serialized), RunRecordMeta.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain this? Does the Json not have the correct program/run ID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ProgramRunId
is transient, hence is not serialized when stored into MDS (for backward compatibility and also not to duplicate what's in the row key already). Maybe we can reduce the temporary object creation by using a custom type adapter if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine. If I had read this from bottom to top, I would have seen the changes down there first :)
} | ||
} | ||
|
||
private void validateExpectedState(ProgramRunId programRunId, ProgramRunStatus expectedStatus) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test case should be a lot faster now, right? 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes and test more conditions.
Thanks for the review @anew . Yes, the idea is to simply include the |
@@ -76,8 +76,9 @@ void fixRunRecords() { | |||
Set<ProgramRunId> fixed = doFixRunRecords(); | |||
|
|||
if (!fixed.isEmpty()) { | |||
LOG.info("Corrected {} of run records in {} status but no actual program running.", | |||
NOT_STOPPED_STATUSES, fixed.size()); | |||
LOG.info("Correct {} run records with status in {} that have no actual running program. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops this needs to be "Corrected"
programRunId, | ||
new Throwable("Marking run record for " + programRunId + " as failed since no running program found.")); | ||
programStateWriter.error(programRunId, | ||
new ProgramRunAbortedException("Marking run record for " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this use the same message as the warning?
Only two more comments about messages that slipped my radar the first time. |
15f15d0
to
6aa87f0
Compare
Build passed https://builds.cask.co/browse/CDAP-RUT1410 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
6aa87f0
to
4f1e80e
Compare
Squashed and merging. |