Skip to content

Commit

Permalink
The ensemble task now properly strips the path and takes advantage of
Browse files Browse the repository at this point in the history
the trigger returned to it by the scheduler to remember where the
trigger occurred.
  • Loading branch information
bures committed Nov 6, 2013
1 parent 0c34001 commit 4cf823d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 60 deletions.
82 changes: 25 additions & 57 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/task/EnsembleTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import cz.cuni.mff.d3s.deeco.model.runtime.api.PathNodeMember;
import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger;
import cz.cuni.mff.d3s.deeco.model.runtime.api.Trigger;
import cz.cuni.mff.d3s.deeco.model.runtime.impl.TriggerImpl;
import cz.cuni.mff.d3s.deeco.model.runtime.meta.RuntimeMetadataFactory;
import cz.cuni.mff.d3s.deeco.scheduler.Scheduler;

Expand All @@ -31,29 +32,39 @@ public class EnsembleTask extends Task {

EnsembleController ensembleController;

private class KnowledgeManagerTriggerListenerImpl implements TriggerListener {
private static class LocalKMChangeTrigger extends TriggerImpl {
public LocalKMChangeTrigger(KnowledgeChangeTrigger knowledgeChangeTrigger) {
super();
this.knowledgeChangeTrigger = knowledgeChangeTrigger;
}

KnowledgeChangeTrigger knowledgeChangeTrigger;
}

private static class ShadowKMChangeTrigger extends TriggerImpl {
public ShadowKMChangeTrigger(ReadOnlyKnowledgeManager shadowKnowledgeManager, KnowledgeChangeTrigger knowledgeChangeTrigger) {
super();
this.knowledgeChangeTrigger = knowledgeChangeTrigger;
this.shadowKnowledgeManager = shadowKnowledgeManager;
}

KnowledgeChangeTrigger knowledgeChangeTrigger;
ReadOnlyKnowledgeManager shadowKnowledgeManager;
}

private class LocalKMTriggerListenerImpl implements TriggerListener {

/* (non-Javadoc)
* @see cz.cuni.mff.d3s.deeco.knowledge.TriggerListener#triggered(cz.cuni.mff.d3s.deeco.model.runtime.api.Trigger)
*/
@Override
public void triggered(Trigger trigger) {
if (isForCoordinatorKM(trigger)) {
// TODO: Schedule the execution of the ensemble (i.e. membership and possibly knowledge exchange) as the coordinator
// This means that we have to know what to do once the scheduler calls us in the next round.
} else if (isForMemberKM(trigger)) {
// TODO: Schedule the execution of the ensemble (i.e. membership and possibly knowledge exchange) as the member
// This means that we have to know what to do once the scheduler calls us in the next round.
} else {
assert(false);
}

if (listener != null) {
listener.triggered(EnsembleTask.this, trigger);
listener.triggered(EnsembleTask.this, new LocalKMChangeTrigger((KnowledgeChangeTrigger)trigger));
}
}
}
KnowledgeManagerTriggerListenerImpl knowledgeManagerTriggerListener = new KnowledgeManagerTriggerListenerImpl();
LocalKMTriggerListenerImpl knowledgeManagerTriggerListener = new LocalKMTriggerListenerImpl();

private class ShadowsTriggerListenerImpl implements ShadowsTriggerListener {

Expand All @@ -62,18 +73,8 @@ private class ShadowsTriggerListenerImpl implements ShadowsTriggerListener {
*/
@Override
public void triggered(ReadOnlyKnowledgeManager knowledgeManager, Trigger trigger) {
if (isForCoordinatorKM(trigger)) {
// TODO: Schedule execution of the ensemble (i.e. membership and possibly knowledge exchange) as the member
// This means that we have to know what to do once the scheduler calls us in the next round.
} else if (isForMemberKM(trigger)) {
// TODO: Schedule execution of the ensemble (i.e. membership and possibly knowledge exchange) as the coordinator
// This means that we have to know what to do once the scheduler calls us in the next round.
} else {
assert(false);
}

if (listener != null) {
listener.triggered(EnsembleTask.this, trigger);
listener.triggered(EnsembleTask.this, new ShadowKMChangeTrigger(knowledgeManager, (KnowledgeChangeTrigger)trigger));
}
}
}
Expand All @@ -85,39 +86,6 @@ public EnsembleTask(EnsembleController ensembleController, Scheduler scheduler)
this.ensembleController = ensembleController;
}

// FIXME TB: The following four methods should probably go somewhere else, as the EnsembleTask is not really supposed
// to understand internals of triggers. Maybe it would make sense to put it either to the meta-model or to knowledge package.

/**
* Helper method for methods isForCoordinatorKM and isForMemberKM.
* @param trigger Trigger to be checked. Currently only {@link KnowledgeChangeTrigger} is accepted.
* @param nodeType {@link PathNodeCoordinator}.class or {@link PathNodeMember}.class
* @return True if the trigger is to be registered in coordinator's or member's knowledge manager respectively.
*/
private boolean isForKM(Trigger trigger, Class<? extends PathNode> nodeType) {
KnowledgeChangeTrigger knowledgeChangeTrigger = (KnowledgeChangeTrigger)trigger;

List<PathNode> pathNodes = knowledgeChangeTrigger.getKnowledgePath().getNodes();

return !pathNodes.isEmpty() && (nodeType.isInstance(pathNodes.get(0)));
}

/**
* Returns true if the trigger is to be registered in the coordinator's knowledge manager.
* @param trigger
*/
private boolean isForCoordinatorKM(Trigger trigger) {
return isForKM(trigger, PathNodeCoordinator.class);
}

/**
* Returns true if the trigger is to be registered in the member's knowledge manager.
* @param trigger
*/
private boolean isForMemberKM(Trigger trigger) {
return isForKM(trigger, PathNodeMember.class);
}

/**
* Returns a trigger which can be understood by a knowledge manager. In particular this means that the knowledge path of the trigger (in case of
* the {@link KnowledgeChangeTrigger}) is striped of the coordinator/memeber prefix.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,21 @@ public void testTrigger() {
task.setTriggerListener(taskTriggerListener);
// THEN the task registers a trigger listener (regardless whether it is a trigger on coordinator's or member's knowledge) on the knowledge manager
verify(knowledgeManager).register(triggerCaptor.capture(), any(TriggerListener.class));
// AND the trigger has a knowledge path which omits the member/coordinator prefix
assert(equalToStrippedPath(model.ensembleKnowledgeChangeTrigger.getKnowledgePath(), ((KnowledgeChangeTrigger)triggerCaptor.getValue()).getKnowledgePath()));
// AND the task register a trigger listener (regardless whether it is a trigger on coordinator's or member's knowledge) on the shadow replicas
// AND the task registers a trigger listener (regardless whether it is a trigger on coordinator's or member's knowledge) on the shadow replicas
verify(shadowReplicasAccess).register(eq(triggerCaptor.getValue()), any(ShadowsTriggerListener.class));

// WHEN a trigger comes from the knowledge manager
knowledgeManagerTriggerListenerCaptor.getValue().triggered(triggerCaptor.getValue());
// THEN the task calls the registered listener
verify(taskTriggerListener).triggered(task, triggerCaptor.getValue());
verify(taskTriggerListener).triggered(eq(task), any(Trigger.class));

// WHEN a trigger comes from the shadow replica
reset(taskTriggerListener); // Without this, we would have to say that the verify below verifies two invocations -- because one already occurred above.
shadowReplicasTriggerListenerCaptor.getValue().triggered(shadowKnowledgeManager, triggerCaptor.getValue());
// THEN the task calls the registered listener
verify(taskTriggerListener).triggered(task, triggerCaptor.getValue());
verify(taskTriggerListener).triggered(eq(task), any(Trigger.class));

// WHEN the listener (i.e. the scheduler) is unregistered
task.unsetTriggerListener();
Expand Down

0 comments on commit 4cf823d

Please sign in to comment.