Skip to content

Commit

Permalink
Dynamic adding components and ensembles
Browse files Browse the repository at this point in the history
Runtime can add components and ensembles without stopping the execution
  • Loading branch information
Michał Kit committed Feb 7, 2013
1 parent b76cb81 commit 0de8639
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface DEECoTriggered {
public @interface DEECoTrigger {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cz.cuni.mff.d3s.deeco.invokable;

import java.util.ArrayList;
import java.util.List;

import cz.cuni.mff.d3s.deeco.knowledge.ISession;
import cz.cuni.mff.d3s.deeco.knowledge.KnowledgeManager;
import cz.cuni.mff.d3s.deeco.path.grammar.EEnsembleParty;
import cz.cuni.mff.d3s.deeco.scheduling.ETriggerType;
import cz.cuni.mff.d3s.deeco.scheduling.IKnowledgeChangeListener;
import cz.cuni.mff.d3s.deeco.scheduling.ProcessTriggeredSchedule;

public class SchedulableProcessTrigger implements IKnowledgeChangeListener {

public final SchedulableProcess sp;

public SchedulableProcessTrigger(SchedulableProcess sp) {
this.sp = sp;
}

@Override
public void knowledgeChanged(final String triggerer,
final ETriggerType recMode) {
Thread t = new Thread(new Runnable() {

@Override
public void run() {
if (sp.contextClassLoader != null)
Thread.currentThread().setContextClassLoader(
sp.contextClassLoader);
sp.invoke(triggerer, recMode);
}
});
t.run();
}

public boolean equals(Object o) {
return o != null && o instanceof SchedulableProcessTrigger
&& this.sp.equals(((SchedulableProcessTrigger) o).sp);
}

@Override
public List<String> getKnowledgePaths() {
if (isEnsembleTriggered()) {
return getEvaluatedKnowledgePaths(
EEnsembleParty.COORDINATOR.toString(),
EEnsembleParty.MEMBER.toString());
} else {
return getEvaluatedKnowledgePaths();
}
}

public KnowledgeManager getKnowledgeManager() {
return sp.km;
}

public void registerListener() {
getKnowledgeManager().registerListener(this);
}

public void unregisterListener() {
getKnowledgeManager().unregisterListener(this);
}

private List<String> getEvaluatedKnowledgePaths() {
return getEvaluatedKnowledgePaths(null, null);
}

private List<String> getEvaluatedKnowledgePaths(String coord, String member) {
List<Parameter> triggeredParams = ((ProcessTriggeredSchedule) sp.scheduling).parameters;
List<String> result = new ArrayList<String>();
ISession session;
for (Parameter p : triggeredParams) {
session = sp.km.createSession();
session.begin();
while (session.repeat()) {
result.add(p.kPath.getEvaluatedPath(sp.km, coord, member,
session));
session.end();
}
}
return result;
}

private boolean isEnsembleTriggered() {
return sp instanceof SchedulableEnsembleProcess;
}
}
2 changes: 1 addition & 1 deletion jdeeco-core/src/cz/cuni/mff/d3s/deeco/runtime/Runtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public synchronized void stopRuntime() {
*/
private synchronized void addSchedulableProcess(SchedulableProcess process) {
if (process != null)
scheduler.register(process);
scheduler.add(process);
}

/**
Expand Down
12 changes: 6 additions & 6 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduling/IScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
import java.util.List;

import cz.cuni.mff.d3s.deeco.invokable.SchedulableProcess;
import cz.cuni.mff.d3s.deeco.invokable.TriggeredSchedulableProcess;
import cz.cuni.mff.d3s.deeco.invokable.SchedulableProcessTrigger;

public interface IScheduler {

public boolean isRunning();

public void register(List<? extends SchedulableProcess> processes);
public void add(List<? extends SchedulableProcess> processes);

public boolean register(SchedulableProcess process);
public void add(SchedulableProcess process);

public void unregister(List<SchedulableProcess> processes);
public void remove(List<SchedulableProcess> processes);

public boolean unregister(SchedulableProcess process);
public void remove(SchedulableProcess process);

public List<TriggeredSchedulableProcess> getTriggeredProcesses();
public List<SchedulableProcessTrigger> getTriggeredProcesses();

public List<SchedulableProcess> getPeriodicProcesses();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.concurrent.TimeUnit;

import cz.cuni.mff.d3s.deeco.invokable.SchedulableProcess;
import cz.cuni.mff.d3s.deeco.invokable.TriggeredSchedulableProcess;
import cz.cuni.mff.d3s.deeco.invokable.SchedulableProcessTrigger;
import cz.cuni.mff.d3s.deeco.knowledge.KnowledgeManager;

public class MultithreadedScheduler extends Scheduler {
Expand All @@ -24,12 +24,11 @@ public MultithreadedScheduler() {
public synchronized void start() {
if (!running) {
for (SchedulableProcess sp : periodicProcesses) {
startPeriodicProcess(sp,
((ProcessPeriodicSchedule) sp.scheduling).interval);
startPeriodicProcess(sp);
}

List<KnowledgeManager> kms = new LinkedList<KnowledgeManager>();
for (TriggeredSchedulableProcess tsp : triggeredProcesses) {
for (SchedulableProcessTrigger tsp : triggeredProcesses) {
tsp.registerListener();
if (!kms.contains(tsp.getKnowledgeManager()))
kms.add(tsp.getKnowledgeManager());
Expand All @@ -48,7 +47,7 @@ public synchronized void stop() {
threads.get(sp).shutdown();
}
List<KnowledgeManager> kms = new LinkedList<KnowledgeManager>();
for (TriggeredSchedulableProcess tsp : triggeredProcesses) {
for (SchedulableProcessTrigger tsp : triggeredProcesses) {
if (!kms.contains(tsp.getKnowledgeManager()))
kms.add(tsp.getKnowledgeManager());
}
Expand All @@ -59,9 +58,11 @@ public synchronized void stop() {
}
}

private void startPeriodicProcess(SchedulableProcess process, long period) {
@Override
protected synchronized void startPeriodicProcess(SchedulableProcess process) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
ses.scheduleAtFixedRate(new PeriodicProcessThread(process), 0, period,
ses.scheduleAtFixedRate(new PeriodicProcessThread(process), 0,
((ProcessPeriodicSchedule) process.scheduling).interval,
TimeUnit.MILLISECONDS);
threads.put(process, ses);
}
Expand All @@ -78,7 +79,8 @@ public PeriodicProcessThread(SchedulableProcess process) {
public void run() {
try {
if (process.contextClassLoader != null)
Thread.currentThread().setContextClassLoader(process.contextClassLoader);
Thread.currentThread().setContextClassLoader(
process.contextClassLoader);
process.invoke();
} catch (Exception e) {
System.out.println("Process scheduled exception!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.Set;

import cz.cuni.mff.d3s.deeco.invokable.SchedulableProcess;
import cz.cuni.mff.d3s.deeco.invokable.TriggeredSchedulableProcess;
import cz.cuni.mff.d3s.deeco.invokable.SchedulableProcessTrigger;
import cz.cuni.mff.d3s.deeco.knowledge.KnowledgeManager;

/**
Expand Down Expand Up @@ -37,11 +37,10 @@ public MultithreadedSchedulerJPF() {
public synchronized void start() {
if (!running) {
for (SchedulableProcess sp : periodicProcesses) {
startPeriodicProcess(sp,
((ProcessPeriodicSchedule) sp.scheduling).interval);
startPeriodicProcess(sp);
}
List<KnowledgeManager> kms = new LinkedList<KnowledgeManager>();
for (TriggeredSchedulableProcess tsp : triggeredProcesses) {
for (SchedulableProcessTrigger tsp : triggeredProcesses) {
if (true) {
assert (false); // ALF: Unsupported now
}
Expand All @@ -66,7 +65,7 @@ public synchronized void stop() {
t.interrupt();
}
List<KnowledgeManager> kms = new LinkedList<KnowledgeManager>();
for (TriggeredSchedulableProcess tsp : triggeredProcesses) {
for (SchedulableProcessTrigger tsp : triggeredProcesses) {
if (!kms.contains(tsp.getKnowledgeManager()))
kms.add(tsp.getKnowledgeManager());
}
Expand All @@ -77,7 +76,8 @@ public synchronized void stop() {
}
}

private void startPeriodicProcess(SchedulableProcess process, long period) {
@Override
protected synchronized void startPeriodicProcess(SchedulableProcess process) {
// Note Period is intentionally ignored - GC
// Because of the stop the world Garbage collector, which can postpone
// all threads and then any thread can execute its own action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.List;

import cz.cuni.mff.d3s.deeco.annotations.DEECoPeriodicScheduling;
import cz.cuni.mff.d3s.deeco.annotations.DEECoTriggered;
import cz.cuni.mff.d3s.deeco.annotations.DEECoTrigger;
import cz.cuni.mff.d3s.deeco.invokable.AnnotationHelper;
import cz.cuni.mff.d3s.deeco.invokable.Parameter;

Expand Down Expand Up @@ -62,7 +62,7 @@ public static ProcessSchedule getTriggeredSchedule(
Annotation[][] pAnnotations, List<Parameter> in,
List<Parameter> inOut) {
List<Integer> triggeredIndecies = AnnotationHelper
.getAnnotationOuterIndecies(DEECoTriggered.class, pAnnotations);
.getAnnotationOuterIndecies(DEECoTrigger.class, pAnnotations);
if (triggeredIndecies.size() == 0)
return null;
else {
Expand Down
Loading

0 comments on commit 0de8639

Please sign in to comment.