Skip to content

Commit

Permalink
new scheduler in DEECo core, start() is now synchronous and main loop
Browse files Browse the repository at this point in the history
delegates via nextStep() to SchedulerNotifier. Some Javadoc and tests
still missing.
  • Loading branch information
iliasger committed Feb 12, 2015
1 parent 8d1c99e commit b9a75d2
Show file tree
Hide file tree
Showing 20 changed files with 400 additions and 1,022 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void initialize(
RatingsManager ratingsManager) {
this.host = host;
this.scheduler = scheduler;
this.timeProvider = scheduler;
this.timeProvider = scheduler.getSchedulerNotifier();
this.kmContainer = kmContainer;
this.dataSender = dataSender;
this.keyManager = keyManager;
Expand Down
32 changes: 16 additions & 16 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/runtime/DEECo.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import cz.cuni.mff.d3s.deeco.model.runtime.api.EnsembleDefinition;
import cz.cuni.mff.d3s.deeco.model.runtime.api.RuntimeMetadata;
import cz.cuni.mff.d3s.deeco.model.runtime.custom.RuntimeMetadataFactoryExt;
import cz.cuni.mff.d3s.deeco.scheduler.DiscreteEventSimulation;
import cz.cuni.mff.d3s.deeco.scheduler.NoExecutorAvailableException;
import cz.cuni.mff.d3s.deeco.scheduler.Scheduler;
import cz.cuni.mff.d3s.deeco.scheduler.SingleThreadedScheduler;

Expand Down Expand Up @@ -55,13 +57,7 @@ public class DEECo implements DEECoContainer {
*/
Map<Class<? extends DEECoPlugin>, DEECoPlugin> pluginsMap;

/**
* True if this DEECo application is running.
*/
protected boolean running = false;


public DEECo(DEECoPlugin... plugins) throws PluginDependencyException {
public DEECo(DEECoPlugin... plugins) throws DEECoException {
pluginsMap= new HashMap<>();
model = RuntimeMetadataFactoryExt.eINSTANCE.createRuntimeMetadata();
knowledgeManagerFactory = new CloningKnowledgeManagerFactory();
Expand All @@ -82,19 +78,18 @@ public EnsembleDefinition deployEnsemble(Class ensemble) throws AnnotationProces
}

public void start() throws InvalidOperationException {
if(running)
if(isRunning()) {
throw new InvalidOperationException("start");

}
runtime.start();
running = true;
}

public void stop() throws InvalidOperationException {
if(!running)
throw new InvalidOperationException("stop");
if(!isRunning()) {
throw new InvalidOperationException("stop");
}

runtime.stop();
running = false;
}

@Override
Expand Down Expand Up @@ -122,7 +117,7 @@ public RuntimeMetadata getRuntimeMetadata() {
@Override
public boolean isRunning()
{
return running;
return runtime.getScheduler().isRunning();
}

class DependencyNode {
Expand Down Expand Up @@ -201,13 +196,18 @@ void initializePlugins(DEECoPlugin[] plugins) throws PluginDependencyException {
}
}

private void createRuntime() {
Scheduler scheduler = new SingleThreadedScheduler();
private void createRuntime() throws NoExecutorAvailableException {
Executor executor = new SameThreadExecutor();
DiscreteEventSimulation discreteEventSimulation = new DiscreteEventSimulation();
Scheduler scheduler = new SingleThreadedScheduler(executor, discreteEventSimulation);
KnowledgeManagerContainer kmContainer = new KnowledgeManagerContainer(knowledgeManagerFactory, model);
scheduler.setExecutor(executor);
executor.setExecutionListener(scheduler);
runtime = new RuntimeFrameworkImpl(model, scheduler, executor, kmContainer);
}

public void setTerminationTime(long terminationTime) {
runtime.getScheduler().getSchedulerNotifier().setTerminationTime(terminationTime);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ public interface RuntimeFramework extends DEECoPlugin {
*/
void stop();

/**
* Invokes the runnable according to the execution policy of the runtime and
* waits for it to finish.
*
* @param r the runnable to invoke.
* @throws InterruptedException if the invocation was interrupted.
*/
void invokeAndWait(Runnable r) throws InterruptedException;

Scheduler getScheduler();
KnowledgeManagerContainer getContainer();
RatingsManager getRatingsManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,15 +699,6 @@ public void stop() {
scheduler.stop();
}


/* (non-Javadoc)
* @see cz.cuni.mff.d3s.deeco.runtime.RuntimeFramework#invokeAndWait(java.lang.Runnable)
*/
@Override
public void invokeAndWait(Runnable r) throws InterruptedException {
scheduler.invokeAndWait(r);
}

public void ensembleFormed(final EnsembleDefinition e, final ComponentInstance c, final String coordID, final String memberID) {
Log.w("Ensemble "+e+" formed at the side of " + c + " with coord: "+coordID+" and member: "+memberID);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package cz.cuni.mff.d3s.deeco.scheduler;

import java.util.PriorityQueue;
import java.util.Queue;

public class DiscreteEventSimulation implements SchedulerNotifier {

Queue<EventTime> eventTimes;
SimulationTimeEventListener simulationTimeEventListener;

long currentTime;

public DiscreteEventSimulation() {
currentTime = 0;
eventTimes = new PriorityQueue<>();
}

class EventTime implements Comparable<EventTime> {
long timePoint;
boolean isTerminationEvent;

public EventTime(long timePoint, boolean isTerminationEvent) {
this.timePoint = timePoint;
this.isTerminationEvent = isTerminationEvent;
}

public int compareTo(EventTime e) {
if (this.timePoint > e.timePoint) {
return 1;
} else if (this.timePoint < e.timePoint) {
return -1;
} else {
if (e.isTerminationEvent) {
return -1;
} else if (this.isTerminationEvent) {
return 1;
}
return 0;
}
}
}

@Override
public long getCurrentMilliseconds() {
return currentTime;
}

@Override
public void setSimulationTimeEventListener(SimulationTimeEventListener simulationTimeEventListener) {
this.simulationTimeEventListener = simulationTimeEventListener;
}

@Override
public void notifyAt(long time) {
eventTimes.add(new EventTime(time,false));
}

@Override
public boolean tryToTerminate() {
if (!eventTimes.isEmpty()) {
EventTime eventTime = eventTimes.peek();
if (eventTime.isTerminationEvent) {
currentTime = eventTime.timePoint;
eventTimes.clear();
return true;
}
}
return false;
}

@Override
public void nextStep() {
if (!eventTimes.isEmpty()) {
currentTime = eventTimes.remove().timePoint;
simulationTimeEventListener.at(currentTime);
}
}

@Override
public void setTerminationTime(long duration) {
eventTimes.add(new EventTime(duration,true));
}

@Override
public void reset() {
currentTime = 0;
eventTimes.clear();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cz.cuni.mff.d3s.deeco.scheduler;

import cz.cuni.mff.d3s.deeco.runtime.DEECoException;

public class NoExecutorAvailableException extends DEECoException {

private static final long serialVersionUID = 1L;

public NoExecutorAvailableException() {
super("No executor was available for the scheduler.");
}

}
19 changes: 7 additions & 12 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import cz.cuni.mff.d3s.deeco.executor.ExecutionListener;
import cz.cuni.mff.d3s.deeco.executor.Executor;
import cz.cuni.mff.d3s.deeco.runtime.InvalidOperationException;
import cz.cuni.mff.d3s.deeco.task.Task;

/**
Expand All @@ -28,7 +29,7 @@
*
*/

public interface Scheduler extends ExecutionListener, CurrentTimeProvider {
public interface Scheduler extends ExecutionListener {
/**
* Starts the scheduler thus triggering a sequence of operations on serving tasks in queue(if any).
* <p>
Expand Down Expand Up @@ -87,15 +88,9 @@ public interface Scheduler extends ExecutionListener, CurrentTimeProvider {
*
*/
public void setExecutor(Executor executor);

/**
* Invokes the runnable and waits for it to finish.
* <p>
* <b>Note:</b> This method is strongly implementation-based so it is advised to check with the documentation
* specific for each Scheduler implementation.
*
* @param doRun the runnable to invoke.
* @throws InterruptedException if the invocation was interrupted.
*/
void invokeAndWait(Runnable doRun) throws InterruptedException;

public SchedulerNotifier getSchedulerNotifier();

public boolean isRunning();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cz.cuni.mff.d3s.deeco.scheduler;

public interface SchedulerNotifier extends CurrentTimeProvider, TerminationTimeHolder {

void notifyAt(long time);

void setSimulationTimeEventListener(SimulationTimeEventListener simulationTimeEventListener);

void nextStep();

boolean tryToTerminate();

/**
* This call sets the current time of the simulator to 0.
*/
void reset();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
*
*/
package cz.cuni.mff.d3s.deeco.scheduler;


/**
* Listener interface for time events triggered by a simulation.
*
* @author Michal Kit <kit@d3s.mff.cuni.cz>
*
*/
public interface SimulationTimeEventListener {

/**
* Simulation callback method for the previous registration
*
* @param time
* current simulation time
* @throws NoExecutorAvailableException
*/
void at(long time);
}
Loading

0 comments on commit b9a75d2

Please sign in to comment.