New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CDAP-2013] Refactor Program States #9158
[CDAP-2013] Refactor Program States #9158
Conversation
83af8cd
to
45eda07
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the changes only affect local mode. How about in distributed mode?
final ProgramId programId = program.getId(); | ||
final Arguments systemArgs = options.getArguments(); | ||
final Arguments userArgs = options.getUserArguments(); | ||
final String twillRunId = systemArgs.getOption(ProgramOptionConstants.TWILL_RUN_ID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This program runner only used in local mode, there won't be any twill run id.
final ProgramId programId = program.getId(); | ||
final Arguments systemArgs = options.getArguments(); | ||
final Arguments userArgs = options.getUserArguments(); | ||
final String twillRunId = systemArgs.getOption(ProgramOptionConstants.TWILL_RUN_ID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
@@ -124,19 +143,65 @@ public ProgramController run(Program program, ProgramOptions options) { | |||
// Add a service listener to make sure the plugin instantiator is closed when the http server is finished. | |||
component.addListener(new ServiceListenerAdapter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the listener should refactor into a class such that the logic can be shared between FlowProgramRunner
, ServiceProgramRunner
and WorkerProgramRunner
.
b5a28ee
to
ebd4da2
Compare
fc3d04c
to
9c46cdf
Compare
254f8d1
to
4808102
Compare
@@ -348,9 +391,17 @@ private void recordProgramSuspendResume(ProgramId programId, String pid, String | |||
record = get(key, RunRecordMeta.class); | |||
} | |||
|
|||
// We can also suspend a workflow that is in the starting state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we introduced a starting state, it is possible for a program to be in the starting state and transition to other states besides RUNNING. There are several cases that I've had to handle now:
- STARTING->SUSPENDED
- STARTED->FAILED, STARTING->KILLED
- resuming->RUNNING
I've tried to make these as efficient as possible by only querying the table if the original case did not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can a Workflow SUSPEND before getting in RUNNING state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it can.
ITN passes: https://builds.cask.co/browse/CDAP-ITM-126 |
8d25602
to
18fe493
Compare
@@ -81,23 +81,28 @@ public void testStatistics() throws Exception { | |||
ProgramId mapreduceProgram = WORKFLOW_APP.mr(mapreduceName); | |||
ProgramId sparkProgram = WORKFLOW_APP.spark(sparkName); | |||
|
|||
// Time from program starting to program running | |||
int turnoverTime = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have added a distinction between a program starting and a program running, when we use setStart
in the tests to persist the state, I thought I would have a variable to mark the time between a program starting and a program running.
This variable represents the time in seconds after a program starts for the program to be marked running.
@@ -172,6 +175,11 @@ protected void configure() { | |||
// For binding DataSet transaction stuff | |||
install(new DataFabricFacadeModule()); | |||
|
|||
bind(ProgramStateWriter.class).to(DirectStoreProgramStateWriter.class); | |||
bind(ProgramStateWriter.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add TODO (with a JIRA number) to have this removed.
monitorProgram(runtimeInfo, cleanUpTask); | ||
return runtimeInfo; | ||
} catch (Exception e) { | ||
// Set the program state to an error when an exception is thrown | ||
programStateWriter.error(programId.run(runId), new Throwable(e.getMessage())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why new a Throwable
here? That would lost the stacktrace. Why not just use e
directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point - I wasn't aware that Exception
was a subclass of Throwable
. I will use e
directly.
@Nullable String componentName) { | ||
super(programRunId.getParent(), RunIds.fromString(programRunId.getRun()), componentName); | ||
|
||
service.addListener( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this constructor at all, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the ProgramControllerServiceAdapter
can simply call the other constructor now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, removing this will break a few tests. In the next PR, when we add the InMemory program runners, it should be able to be removed as the listeners are attached differently. But unfortunately for now, this constructor will be needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do tests fail after removing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not entirely sure. DynamicPartitioningTestRun
fails for example, with issues seemingly unrelated with dataset tables not being found. I've narrowed it down to what I believe is this change.
Removing this constructor would "break" / change behavior from what's currently happening. For Spark / MR, we were adding the listener on the service, and not on the controller.
The issue is just on the test level, where sometimes it would see ProgramRunStatus.STARTING and think that the program is running. It has nothing to do with the constructor, so I will remove it.
addListener( | ||
new AbstractListener() { | ||
@Override | ||
public void init(ProgramController.State state, @Nullable Throwable cause) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, since you add the listener to itself in the constructor, there won't be any state change yet. So, yeah, you can remove this init
method.
9592d3c
to
be09036
Compare
@@ -81,23 +81,28 @@ public void testStatistics() throws Exception { | |||
ProgramId mapreduceProgram = WORKFLOW_APP.mr(mapreduceName); | |||
ProgramId sparkProgram = WORKFLOW_APP.spark(sparkName); | |||
|
|||
// Time from program starting to program running | |||
int turnoverTime = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Weird name, maybe startDelaySecs
?
long startTime = System.currentTimeMillis(); | ||
long currentTimeMillis = startTime; | ||
String outlierRunId = null; | ||
for (int i = 0; i < 10; i++) { | ||
// workflow runs every 5 minutes | ||
currentTimeMillis = startTime + (i * TimeUnit.MINUTES.toMillis(5)); | ||
RunId workflowRunId = RunIds.generate(currentTimeMillis); | ||
store.setStart(workflowProgram, workflowRunId.getId(), RunIds.getTime(workflowRunId, TimeUnit.SECONDS)); | ||
long startTimeProgram = RunIds.getTime(workflowRunId, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workflowStartTimeSecs
might be a better name
RunId workflowRunId = RunIds.generate(currentTimeMillis); | ||
runIdList.add(workflowRunId); | ||
store.setStart(workflowProgram, workflowRunId.getId(), RunIds.getTime(workflowRunId, TimeUnit.SECONDS)); | ||
long startTimeProgram = RunIds.getTime(workflowRunId, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't startTimeProgram
just be currentTimeMillis
to seconds? try verify that
for (int i = 0; i < count; i++) { | ||
// work-flow runs every 5 minutes | ||
currentTimeMillis = startTime + (i * TimeUnit.MINUTES.toMillis(5)); | ||
currentTimeMillis = startTime + (i * TimeUnit.MINUTES.toMillis(5)) - TimeUnit.SECONDS.toMillis(turnoverTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why - TimeUnit.SECONDS.toMillis(turnoverTime)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed anymore, will remove.
long nowSecs = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); | ||
injector.getInstance(Store.class).setStartAndRun(controller.getProgramRunId().getParent(), | ||
controller.getProgramRunId().getRun(), | ||
nowSecs, nowSecs + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to comment + 1
or add a private static final int
for it
private final long startTs; | ||
|
||
@SerializedName("start") | ||
@Nullable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not nullable since it exists since previous versions
assertSatisfied(false, concurrencyConstraint.check(schedule, constraintContext)); | ||
|
||
// add a run for the program that wasn't from a schedule | ||
// there are now three concurrent runs, so the constraint will not be met | ||
store.setStart(WORKFLOW_ID, pid3, System.currentTimeMillis(), null, EMPTY_MAP, EMPTY_MAP); | ||
store.setStartAndRun(WORKFLOW_ID, pid3, System.currentTimeMillis(), 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System.currentTimeMillis() + 1
? or you can just initialize local variables startTs
and runTs
at the beginning and let them be shared by all the setStartAndRun
since the time here doesn't matter
@@ -87,8 +87,10 @@ public void testOldRunRecordFormat() throws Exception { | |||
txnl.execute(new TransactionExecutor.Subroutine() { | |||
@Override | |||
public void apply() throws Exception { | |||
metadataStoreDataset.recordProgramStartOldFormat(program, runId.getId(), | |||
RunIds.getTime(runId, TimeUnit.SECONDS), null, null, null); | |||
metadataStoreDataset.recordProgramStart(program, runId.getId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need of adding this? This test is for pre-3.6 version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but it uses the same recordProgramRunning
method. The recordProgramRunning
method expects there to be a record with ProgramRunStatus.STARTING
.
The only difference in this oldFormat method is that the key does not include an application version.
@@ -130,6 +130,14 @@ public void testStopBeforeStart() throws RuntimeException { | |||
store.setStop(programId, "runx", now, ProgramController.State.ERROR.getRunStatus()); | |||
} | |||
|
|||
@Test(expected = UnsupportedOperationException.class) | |||
public void testCompleteAfterStart() throws UnsupportedOperationException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this test if the check in AppMetadataStore
is removed
@@ -134,5 +135,6 @@ public Boolean call() throws Exception { | |||
|
|||
sparkManager.stop(); | |||
sparkManager.waitForStatus(false, 10, 1); | |||
sparkManager.waitForRun(ProgramRunStatus.KILLED, 10, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why adding this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, if we don't wait for the run record to be persisted in the store, it will try to do this in the next test that it runs, and in the next test, it will persist KILLED, which would throw an exception because there is no starting/running record.
In my branch that switches to TMS, I've had to do this to many tests as the tests do not correctly wait for the record to be persisted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the second test have a different runId? How will they have conflict?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They don't conflict. The issue is that the next test that runs will try to write KILLED to the store because it didn't get to finish at the end of this test, and throw an error.
This issue occurs sometimes, so its better to ensure that the program is truly killed by making sure that the run record has been written.
bc8c955
to
097198b
Compare
HttpResponse response = deploy(SleepingWorkflowApp.class, Constants.Gateway.API_VERSION_3_TOKEN, TEST_NAMESPACE2); | ||
Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
|
||
WorkflowId workflow = Ids.namespace(TEST_NAMESPACE2).app("SleepWorkflowApp").workflow("SleepWorkflow"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new WorkflowId(...)
WorkflowId workflow = Ids.namespace(TEST_NAMESPACE2).app("SleepWorkflowApp").workflow("SleepWorkflow"); | ||
|
||
// Start the workflow | ||
startProgram(workflow.toId(), 200); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove all calls of .toId()
. This method converts WorkflowId
to a deprecated class
@@ -29,7 +30,9 @@ | |||
public interface ProgramStateWriter { | |||
|
|||
/** | |||
* Updates the program run's status to be {@link ProgramRunStatus#STARTING} at the given start time | |||
* Updates the program run's status to be {@link ProgramRunStatus#STARTING} at the given start time when |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start time from twillRunId
, otherwise it's confusing where the start time comes from
@@ -134,5 +135,6 @@ public Boolean call() throws Exception { | |||
|
|||
sparkManager.stop(); | |||
sparkManager.waitForStatus(false, 10, 1); | |||
sparkManager.waitForRun(ProgramRunStatus.KILLED, 10, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the second test have a different runId? How will they have conflict?
1d9c77e
to
88816f0
Compare
@maochf agreed this was good to go, so this will be merged once the build passes. |
88816f0
to
85d4a19
Compare
0c29316
to
18e6626
Compare
Build passed https://builds.cask.co/browse/CDAP-DUT5896, merging |
Need to revert this to stabilize develop - will reopen once the Twill changes and corresponding CDAP changes have been tested. |
…rogram-states Revert "Merge pull request #9158 from caskdata/feature/refactor-progr…
Work Items:
AbstractStateChangeProgramController
as a parent class to all program type's controller so that the listener is automatically added to the controller when the controller used by the program runner is created.ProgramRunStatus.STARTING
and updateStore
to record this new state, and theRunRecord
class to hold the timesstartTs
,runTs
, andstopTs
to represent when the program is in the STARTING, RUNNING, and terminated states, respectively.recordType
forProgramRunStatus.STARTING
andProgramRunStatus.RUNNING
into different columns in the metadata storeProgramStateWriter
interface withDirectStoreProgramStateWriter
that persists to the runtime storeProgramStateWriter
for programs, and injectNoOpProgramStateWriter
for Workers and Service program runners (for in memory mode only) since they run multiple instances.Once https://issues.apache.org/jira/browse/TWILL-240 is resolved, the listener for all distributed mode will be added, and the individual program runners will no longer have the
ProgramStateWriter
. Rather, theInMemory
andDistributed
program runners will have the program state writers.This needs to happen for services/flows/workers to record program states in distributed mode correctly.
JIRA: https://issues.cask.co/browse/CDAP-2013
Build: https://builds.cask.co/browse/CDAP-DUT5896