Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Added Stateful processing related message handling in TMasterClient #2075

Merged
merged 1 commit into from Jul 18, 2017
Merged

Added Stateful processing related message handling in TMasterClient #2075

merged 1 commit into from Jul 18, 2017

Conversation

srkukarni
Copy link
Contributor

While running in stateful mode, TMaster sends the following messages to the stmgr

  1. Periodic Checkpoint messages(StartStatefulCheckpoint) so that stmgr can send them to local instances for checkpointing their state
  2. Restore Topology messages(RestoreTopologyStateRequest) so that stmgr can start restoring the state of the local instances to a specified checkpoint_id
  3. Start Processing message(StartStmgrStatefulProcessing) to start the processing when all stmgrs have restored their local instances' state.
    This change adds the ability for TMasterClient in stmgr to handle those messages and call the appropriate stmgr callback. Note that the stmgr callbacks themselves are not yet implemented, that will be coming in the next set of prs.

@@ -217,5 +235,48 @@ void TMasterClient::SendHeartbeatRequest() {
return;
}

void TMasterClient::SavedInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id) {
proto::ckptmgr::InstanceStateStored message;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use the global mempool here and any following places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My feeling is that its not necessary since its on control path

__global_protobuf_pool_release__(_message);
}

void TMasterClient::HandleStartStmgrStatefulProcessing(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this start_stateful_processing callback? Whether a topology is stateful or not is configured in user's topology code. Once a stmgr is started, it should already know if it's stateful or not by reading the topology config. And thus can behaves accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So topologies not doing exactly once can start processing right away as soon as assignment are propagated to all stmgrs/instances. However for exactly once semantics, we have to wait until all of the stmgrs/instances are restored to a certain globally consistent checkpoint before proceeding. Once that determination is done by tmaster, this is the message that it sends to the stmgr to start the actual processing

optional string dead_stmgr = 1;
// Was there a dead/recovered local instance connection that was the reason
// for this request?
optional int32 dead_taskid = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we allow multiple tasks death?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary. We issue these messages to the tmaster as soon as we notice any instance failure. The same applies for any stmgr client connection failures

@srkukarni srkukarni merged commit db60686 into apache:master Jul 18, 2017
@srkukarni srkukarni deleted the sanjeevk/ext1_tmasterclient branch July 18, 2017 05:10
@nlu90
Copy link
Member

nlu90 commented Jul 18, 2017

👍

@@ -138,6 +138,20 @@ class StMgr {
void BroadcastTmasterLocation(proto::tmaster::TMasterLocation* tmasterLocation);
void BroadcastMetricsCacheLocation(proto::tmaster::MetricsCacheLocation* tmasterLocation);

// Called when TMaster sends a InitiateStatefulCheckpoint message with a checkpoint_id
// This will send intiate checkpoint messages to local instances to capture their state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: intiate

// Invoked when TMaster sends the StartStatefulProcessing request to kick
// start the computation. We send the StartStatefulProcessing to all our
// local instances so that they can start the processing.
void StartStatefulProcessing(sp_string _checkpoint_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the checkpoint id necessary to start processing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly for correctness checking purposes. Instances will check that they have recovered to this checkpoint id before starting. They should die if this checkpoint id does not match theirs.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants