Skip to content
Permalink
Browse files
[HELIX-552] StateModelFactory#_stateModelMap should use both resource…
…Name and partitionKey to map a state model, rb=28211
  • Loading branch information
zzhang5 committed Nov 19, 2014
1 parent 45a9fd3 commit 9ddd0af347037f585a52b83bbc3a6b11a3934c82
Show file tree
Hide file tree
Showing 35 changed files with 250 additions and 91 deletions.
@@ -24,7 +24,7 @@
public class AgentStateModelFactory extends StateModelFactory<AgentStateModel> {

@Override
public AgentStateModel createNewStateModel(String partitionKey) {
public AgentStateModel createNewStateModel(String resourceName, String partitionKey) {
AgentStateModel model = new AgentStateModel();
return model;
}
@@ -305,6 +305,15 @@ public PropertyKey messages(String instanceName) {
return new PropertyKey(MESSAGES, Message.class, _clusterName, instanceName);
}

/**
* Get a property key associated with {@link Error} for an instance
* @param instanceName
* @return {@link PropertyKey}
*/
public PropertyKey errors(String instanceName) {
return new PropertyKey(ERRORS, Error.class, _clusterName, instanceName);
}

/**
* Get a property key associated with a specific {@link Message} on an instance
* @param instanceName
@@ -37,7 +37,7 @@
public class BootstrapHandler extends StateModelFactory<StateModel> {

@Override
public StateModel createNewStateModel(String stateUnitKey) {
public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
return new BootstrapStateModel(stateUnitKey);
}

@@ -87,7 +87,7 @@ public void reset() {
// dummy master slave state model factory
public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel> {
@Override
public DummyMSStateModel createNewStateModel(String partitionName) {
public DummyMSStateModel createNewStateModel(String resourceName, String partitionName) {
DummyMSStateModel model = new DummyMSStateModel();
return model;
}
@@ -32,7 +32,7 @@ public LeaderStandbyStateModelFactory(int delay) {
}

@Override
public StateModel createNewStateModel(String stateUnitKey) {
public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
stateModel.setDelay(_delay);
return stateModel;
@@ -44,7 +44,7 @@ public MasterSlaveStateModelFactory() {
}

@Override
public StateModel createNewStateModel(String partitionName) {
public StateModel createNewStateModel(String resourceName, String partitionName) {
MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
stateModel.setInstanceName(_instanceName);
stateModel.setDelay(_delay);
@@ -32,7 +32,7 @@ public OnlineOfflineStateModelFactory(int delay) {
}

@Override
public StateModel createNewStateModel(String stateUnitKey) {
public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
stateModel.setDelay(_delay);
return stateModel;
@@ -195,7 +195,7 @@ void postHandleMessage() {
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
_stateModelFactory.removeStateModel(partitionKey);
_stateModelFactory.removeStateModel(resource, partitionKey);
} else {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
_stateModel.updateState(toState);
@@ -30,7 +30,7 @@ public DistClusterControllerStateModelFactory(String zkAddr) {
}

@Override
public DistClusterControllerStateModel createNewStateModel(String stateUnitKey) {
public DistClusterControllerStateModel createNewStateModel(String resourceName, String partitionKey) {
return new DistClusterControllerStateModel(_zkAddr);
}

@@ -39,7 +39,7 @@ public GenericLeaderStandbyStateModelFactory(CustomCodeCallbackHandler callback,
}

@Override
public GenericLeaderStandbyModel createNewStateModel(String partitionKey) {
public GenericLeaderStandbyModel createNewStateModel(String resourceName, String partitionKey) {
return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
}
}
@@ -142,13 +142,15 @@ public void reset() {
for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
.values()) {
for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
for (String resourceKey : stateModelFactory.getPartitionSet()) {
StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
stateModel.reset();
String initialState = _stateModelParser.getInitialState(stateModel.getClass());
stateModel.updateState(initialState);
// TODO probably should update the state on ZK. Shi confirm what needs
// to be done here.
for (String resourceName : stateModelFactory.getResourceSet()) {
for (String partitionKey : stateModelFactory.getPartitionSet(resourceName)) {
StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
stateModel.reset();
String initialState = _stateModelParser.getInitialState(stateModel.getClass());
stateModel.updateState(initialState);
// TODO probably should update the state on ZK. Shi confirm what needs
// to be done here.
}
}
}
}
@@ -206,9 +208,9 @@ public MessageHandler createHandler(Message message, NotificationContext context
if (message.getBatchMessageMode() == false) {
// create currentStateDelta for this partition
String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
if (stateModel == null) {
stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey);
stateModel.updateState(initState);
}

@@ -37,26 +37,28 @@ public class ScheduledTaskStateModel extends StateModel {
// StateModel with initial state other than OFFLINE should override this field
protected String _currentState = DEFAULT_INITIAL_STATE;
final ScheduledTaskStateModelFactory _factory;
final String _partitionName;
final String _resourceName;
final String _partitionKey;

final HelixTaskExecutor _executor;

public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory,
HelixTaskExecutor executor, String partitionName) {
HelixTaskExecutor executor, String resourceName, String partitionKey) {
_factory = factory;
_partitionName = partitionName;
_resourceName = resourceName;
_partitionKey = partitionKey;
_executor = executor;
}

@Transition(to = "COMPLETED", from = "OFFLINE")
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
logger.info(_partitionName + " onBecomeCompletedFromOffline");
logger.info(_partitionKey + " onBecomeCompletedFromOffline");

// Construct the inner task message from the mapfields of scheduledTaskQueue resource group
Map<String, String> messageInfo =
message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
ZNRecord record = new ZNRecord(_partitionName);
ZNRecord record = new ZNRecord(_partitionKey);
record.getSimpleFields().putAll(messageInfo);
Message taskMessage = new Message(record);
if (logger.isDebugEnabled()) {
@@ -66,49 +68,50 @@ public void onBecomeCompletedFromOffline(Message message, NotificationContext co
_executor.createMessageHandler(taskMessage, new NotificationContext(null));
if (handler == null) {
throw new HelixException("Task message " + taskMessage.getMsgType()
+ " handler not found, task id " + _partitionName);
+ " handler not found, task id " + _partitionKey);
}
// Invoke the internal handler to complete the task
handler.handleMessage();
logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
logger.info(_partitionKey + " onBecomeCompletedFromOffline completed");
}

@Transition(to = "OFFLINE", from = "COMPLETED")
public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) {
logger.info(_partitionName + " onBecomeOfflineFromCompleted");
logger.info(_partitionKey + " onBecomeOfflineFromCompleted");
}

@Transition(to = "DROPPED", from = "COMPLETED")
public void onBecomeDroppedFromCompleted(Message message, NotificationContext context) {
logger.info(_partitionName + " onBecomeDroppedFromCompleted");
logger.info(_partitionKey + " onBecomeDroppedFromCompleted");
removeFromStatemodelFactory();
}

@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
logger.info(_partitionName + " onBecomeDroppedFromScheduled");
logger.info(_partitionKey + " onBecomeDroppedFromScheduled");
removeFromStatemodelFactory();
}

@Transition(to = "OFFLINE", from = "ERROR")
public void onBecomeOfflineFromError(Message message, NotificationContext context)
throws InterruptedException {
logger.info(_partitionName + " onBecomeOfflineFromError");
logger.info(_partitionKey + " onBecomeOfflineFromError");
}

@Override
public void reset() {
logger.info(_partitionName + " ScheduledTask reset");
logger.info(_partitionKey + " ScheduledTask reset");
removeFromStatemodelFactory();
}

// We need this to prevent state model leak
private void removeFromStatemodelFactory() {
if (_factory.getStateModel(_partitionName) != null) {
_factory.removeStateModel(_partitionName);
if (_factory.getStateModel(_resourceName, _partitionKey) != null) {
_factory.removeStateModel(_resourceName, _partitionKey);
} else {
logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
logger.warn(_resourceName + "_ " + _partitionKey
+ " not found in ScheduledTaskStateModelFactory");
}
}
}
@@ -32,8 +32,8 @@ public ScheduledTaskStateModelFactory(HelixTaskExecutor executor) {
}

@Override
public ScheduledTaskStateModel createNewStateModel(String partitionName) {
logger.info("Create state model for ScheduledTask " + partitionName);
return new ScheduledTaskStateModel(this, _executor, partitionName);
public ScheduledTaskStateModel createNewStateModel(String resourceName, String partitionKey) {
logger.info("Create state model for ScheduledTask " + resourceName + "_" + partitionKey);
return new ScheduledTaskStateModel(this, _executor, resourceName, partitionKey);
}
}
@@ -19,6 +19,7 @@
* under the License.
*/

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,9 +29,10 @@

public abstract class StateModelFactory<T extends StateModel> {
/**
* mapping from partitionName to StateModel
* mapping resourceName to map of partitionName to StateModel
*/
private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
private final ConcurrentMap<String, ConcurrentMap<String, T>> _stateModelMap =
new ConcurrentHashMap<String, ConcurrentMap<String, T>>();

/**
* mapping from resourceName to BatchMessageWrapper
@@ -39,48 +41,67 @@
new ConcurrentHashMap<String, BatchMessageWrapper>();

/**
* This method will be invoked only once per partitionName per session
* This method will be invoked only once per resource per partition per session
* Replacing old StateModelFactory#createNewStateModel(String partitionName)
* Add "resourceName" to signature @see HELIX-552
* @param resourceName
* @param partitionName
* @return
* @return state model
*/
public abstract T createNewStateModel(String partitionName);
public abstract T createNewStateModel(String resourceName, String partitionName);

/**
* Create a state model for a partition
* @param partitionName
* @param partitionKey
*/
public T createAndAddStateModel(String partitionName) {
T stateModel = createNewStateModel(partitionName);
_stateModelMap.put(partitionName, stateModel);
public T createAndAddStateModel(String resourceName, String partitionKey) {
T stateModel = createNewStateModel(resourceName, partitionKey);
_stateModelMap.putIfAbsent(resourceName, new ConcurrentHashMap<String, T>());
_stateModelMap.get(resourceName).put(partitionKey, stateModel);
return stateModel;
}

/**
* Get the state model for a partition
* @param partitionName
* @param resourceName
* @param partitionKey
* @return state model if exists, null otherwise
*/
public T getStateModel(String partitionName) {
return _stateModelMap.get(partitionName);
public T getStateModel(String resourceName, String partitionKey) {
Map<String, T> map = _stateModelMap.get(resourceName);
return map == null? null : map.get(partitionKey);
}

/**
* remove state model for a partition
* @param partitionName
* @param resourceName
* @param partitionKey
* @return state model removed or null if not exist
*/
public T removeStateModel(String partitionName) {
return _stateModelMap.remove(partitionName);
public T removeStateModel(String resourceName, String partitionKey) {
Map<String, T> map = _stateModelMap.get(resourceName);
return map == null? null : map.remove(partitionKey);
}

/**
* get partition set
* @return partition key set
* get resource set
* @param resourceName
* @return resource name set
*/
public Set<String> getPartitionSet() {
public Set<String> getResourceSet() {
return _stateModelMap.keySet();
}

/**
* get partition set for a resource
* @param resourceName
* @return partition key set
*/
public Set<String> getPartitionSet(String resourceName) {
Map<String, T> map = _stateModelMap.get(resourceName);
return (map == null? Collections.<String>emptySet() : map.keySet());
}

/**
* create a default batch-message-wrapper for a resource
* @param resourceName
@@ -37,7 +37,7 @@ public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> task
}

@Override
public TaskStateModel createNewStateModel(String partitionName) {
public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
return new TaskStateModel(_manager, _taskFactoryRegistry);
}
}
@@ -72,7 +72,7 @@ public void testCMTaskExecutor() throws Exception {
StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() {

@Override
public MockStateModel createNewStateModel(String partitionName) {
public MockStateModel createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockStateModel();
}
@@ -116,7 +116,7 @@ public void testInvocationAnnotated() throws Exception {
new StateModelFactory<MockStateModelAnnotated>() {

@Override
public MockStateModelAnnotated createNewStateModel(String partitionName) {
public MockStateModelAnnotated createNewStateModel(String resource, String partitionName) {
// TODO Auto-generated method stub
return new MockStateModelAnnotated();
}
@@ -196,7 +196,7 @@ public MyStateModelFactory(Map<String, Integer> counts) {
}

@Override
public MyStateModel createNewStateModel(String partitionId) {
public MyStateModel createNewStateModel(String resource, String partitionId) {
return new MyStateModel(_counts);
}
}
@@ -152,7 +152,7 @@ public void onBecomeAnyFromAny(Message message, NotificationContext context) {
private static class MockStateModelFactory extends StateModelFactory<MockStateModel> {

@Override
public MockStateModel createNewStateModel(String partitionName) {
public MockStateModel createNewStateModel(String resource, String partitionName) {
return new MockStateModel();
}

@@ -328,7 +328,7 @@ public MyStateModelFactory(HelixManager helixManager) {
}

@Override
public MyStateModel createNewStateModel(String partitionName) {
public MyStateModel createNewStateModel(String resource, String partitionName) {
return new MyStateModel(helixManager);
}
}

0 comments on commit 9ddd0af

Please sign in to comment.