Skip to content

Commit

Permalink
Merge branch 'stack-compaction'
Browse files Browse the repository at this point in the history
# Conflicts:
#	applications/micro/src/main/java/jadex/micro/examples/mandelbrot_new/CalculateService.java
#	applications/micro/src/main/java/jadex/micro/examples/mandelbrot_new/DisplayPanel.java
#	util/concurrent/src/main/java/jadex/commons/future/IntermediateDelegationResultListener.java
  • Loading branch information
ap-actoron committed Feb 3, 2021
2 parents dd1bb42 + ec8ea4e commit 7cf770d
Show file tree
Hide file tree
Showing 15 changed files with 761 additions and 344 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,29 @@ public class CalculateService implements ICalculateService
@Timeout(30000)
public IIntermediateFuture<PartDataChunk> calculateArea(AreaData data)
{
IntermediateFuture<PartDataChunk> ret = new IntermediateFuture<>();
IntermediateFuture<PartDataChunk> ret = new IntermediateFuture<PartDataChunk>();
// {
// @Override
// protected boolean doAddResultListener(IResultListener<Collection<PartDataChunk>> listener)
// {
// System.out.println(agent.agent+": doAddResultListener("+this+", "+listener+"), "+Thread.currentThread());
// return super.doAddResultListener(listener);
// }
//
// @Override
// protected void scheduleNotification(IResultListener<Collection<PartDataChunk>> listener, ICommand<IResultListener<Collection<PartDataChunk>>> command)
// {
// System.out.println(agent.agent+":scheduleNotification("+this+", "+listener+", "+command+"), "+Thread.currentThread());
// super.scheduleNotification(listener, command);
// }
//
// @Override
// protected void notifyIntermediateResult(IIntermediateResultListener<PartDataChunk> listener, PartDataChunk result)
// {
// System.out.println(agent.agent+":notifyIntermediateResult("+this+", "+listener+", "+result+"), "+Thread.currentThread());
// super.notifyIntermediateResult(listener, result);
// }
// };

//long start = System.currentTimeMillis();
System.out.println("calc start: "+data.getId()+" "+ag.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ protected IFuture<AreaData> distributeWork(final AreaData data)
int numx = (int)Math.max(Math.round(Math.sqrt((double)pic/task)), 1);
int numy = (int)Math.max(Math.round((double)pic/(task*numx)), 1);

//numx = 2;
//numy = 1;
// numx = 1;
// numy = 1;

// final long time = System.nanoTime();
//System.out.println("Number of tasks: "+numx+", "+numy+", max="+data.getMax()+" tasksize="+data.getTaskSize());
Expand Down Expand Up @@ -187,7 +187,7 @@ protected IFuture<AreaData> distributeWork(final AreaData data)
// Assign tasks to service pool.
final int number = areas.size();

//System.out.println("tasks: "+areas);
System.out.println("tasks: "+areas);

//manager.setMax(data.getParallel());
performTasks(areas, true, data).addResultListener(agent.getFeature(IExecutionFeature.class).createResultListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public boolean isOptimizationAllowed()
public AreaData getDefaultSettings()
{
//return new AreaData(-2, 1, -1.5, 1.5, 100, 100, (short)256, 10, 300, this, null);
return new AreaData(-2, 1, -1.5, 1.5, 100, 100, (short)256, 300, this, null, 20);
// return new AreaData(-2, 1, -1.5, 1.5, 100, 100, (short)256, 300, this, null, 20);
return new AreaData(-2, 1, -1.5, 1.5, 100, 100, (short)256, 300, this, null, 4);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion platform/bridge/src/main/java/jadex/base/Starter.java
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ public static IFuture<IExternalAccess> createPlatform(final IPlatformConfigurati
// pass configuration parameters to static fields:
MethodInvocationInterceptor.DEBUG = getBooleanValueWithArgs(args, "debugservices", config.getExtendedPlatformConfiguration().getDebugServices());
ExecutionComponentFeature.DEBUG = getBooleanValueWithArgs(args, "debugsteps", config.getExtendedPlatformConfiguration().getDebugSteps());
// Future.NO_STACK_COMPACTION = true;
Future.NO_STACK_COMPACTION = getBooleanValueWithArgs(args, "nostackcompaction", config.getExtendedPlatformConfiguration().getNoStackCompaction());
// Future.NO_STACK_COMPACTION = true;
Future.DEBUG = getBooleanValueWithArgs(args, "debugfutures", config.getExtendedPlatformConfiguration().getDebugFutures());

// new FastClasspathScanner(new String[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;
Expand Down Expand Up @@ -62,12 +61,10 @@
import jadex.bridge.service.types.simulation.ISimulationService;
import jadex.bridge.service.types.simulation.SSimulation;
import jadex.commons.DebugException;
import jadex.commons.ICommand;
import jadex.commons.IResultCommand;
import jadex.commons.SReflect;
import jadex.commons.SUtil;
import jadex.commons.TimeoutException;
import jadex.commons.Tuple3;
import jadex.commons.concurrent.Executor;
import jadex.commons.concurrent.IExecutable;
import jadex.commons.functional.Consumer;
Expand Down Expand Up @@ -117,8 +114,8 @@ public class ExecutionComponentFeature extends AbstractComponentFeature implemen
/** The current timer. */
protected List<ITimer> timers = new ArrayList<ITimer>();

/** Retained listener notifications when switching threads due to blocking. */
protected Queue<Tuple3<Future<?>, IResultListener<?>, ICommand<IResultListener<?>>>> notifications;
// /** Retained listener notifications when switching threads due to blocking. */
// protected Set<Future<?>> notifications;

/** Flag for testing double execution. */
protected volatile boolean executing;
Expand Down Expand Up @@ -1017,9 +1014,12 @@ public void block(final Object monitor, long timeout, boolean realtime)
}
else
{
// Retain listener notifications for new component thread.
assert notifications==null : getComponent()+", "+IComponentIdentifier.LOCAL.get();
notifications = FutureHelper.removeStackedListeners();
// Perform scheduled notifications before blocking thread.
FutureHelper.notifyStackedListeners();

// // Retain listener notifications for new component thread.
// assert notifications==null : getComponent()+", "+IComponentIdentifier.LOCAL.get();
// notifications = FutureHelper.removeStackedListeners();
// if(notifications!=null && getComponent().toString().indexOf("IntermediateBlockingTest@")!=-1)
// System.err.println("setting notifications: "+getComponent());

Expand Down Expand Up @@ -1116,14 +1116,14 @@ public String toString()

afterBlock();

// If no other thread for component in mean time, maybe there are notifications left -> readd
if(notifications!=null)
{
// if(getComponent().toString().indexOf("IntermediateBlockingTest@")!=-1)
// System.err.println("unsetting notifications2: "+getComponent());
FutureHelper.addStackedListeners(notifications);
notifications = null;
}
// // If no other thread for component in mean time, maybe there are notifications left -> readd
// if(notifications!=null)
// {
//// if(getComponent().toString().indexOf("IntermediateBlockingTest@")!=-1)
//// System.err.println("unsetting notifications2: "+getComponent());
// FutureHelper.addStackedListeners(notifications);
// notifications = null;
// }
}
}
}
Expand Down Expand Up @@ -1209,34 +1209,34 @@ public boolean execute()

ClassLoader cl = setExecutionState();

// Process listener notifications from old component thread.
// boolean notifexecuted = false;
if(notifications!=null)
{
// if(getComponent().toString().indexOf("IntermediateBlockingTest@")!=-1)
// System.err.println("unsetting notifications: "+getComponent());
FutureHelper.addStackedListeners(notifications);
notifications = null;

// Todo: termination and exception!?
// try
// {
FutureHelper.notifyStackedListeners();
// notifexecuted = true;
// }
// catch(Exception e)
// {
// fatalError(e);
// }
// catch(StepAborted sa)
// {
// }
// catch(Throwable t)
// {
// fatalError(new RuntimeException(t));
// }

}
// // Process listener notifications from old component thread.
//// boolean notifexecuted = false;
// if(notifications!=null)
// {
//// if(getComponent().toString().indexOf("IntermediateBlockingTest@")!=-1)
//// System.err.println("unsetting notifications: "+getComponent());
// FutureHelper.addStackedListeners(notifications);
// notifications = null;
//
// // Todo: termination and exception!?
//// try
//// {
// FutureHelper.notifyStackedListeners();
//// notifexecuted = true;
//// }
//// catch(Exception e)
//// {
//// fatalError(e);
//// }
//// catch(StepAborted sa)
//// {
//// }
//// catch(Throwable t)
//// {
//// fatalError(new RuntimeException(t));
//// }
//
// }
if(endstepcnt!=-1 && debug)
getComponent().getLogger().severe("execute()2: "+getComponent().getId()+", "+IComponentIdentifier.LOCAL.get()+", endstepcnt="+endstepcnt+", stepcnt="+stepcnt);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jadex.commons.DebugException;
import jadex.commons.ICommand;
import jadex.commons.IResultCommand;
import jadex.commons.SUtil;
import jadex.commons.future.DelegationResultListener;
import jadex.commons.future.Future;
import jadex.commons.future.IFuture;
Expand Down Expand Up @@ -677,17 +678,17 @@ class DelegatingSubscriptionIntermediateDelegationFuture extends SubscriptionInt
@Override
public String toString()
{
return super.toString() + "(storeforfirst="+storeforfirst+", listener="+(listener!=null)+", src="+mysrc+", results="+results+", ownresults="+ownresults+", myresults="+myresults+")";
return super.toString() + "(storeforfirst="+storeforfirst+", src="+mysrc+", results="+results+", ownresults="+ownresults+", myresults="+myresults+")";
}
@Override
protected void storeResult(Object result)
protected void storeResult(Object result, boolean scheduled)
{
if((""+result).contains("IMarkerService"))
{
try
{
myresults.add(result);
super.storeResult(result);
super.storeResult(result, scheduled);
}
finally
{
Expand All @@ -696,7 +697,7 @@ protected void storeResult(Object result)
}
else
{
super.storeResult(result);
super.storeResult(result, scheduled);
}
}
//-------- debugging end --------
Expand Down Expand Up @@ -759,6 +760,7 @@ protected boolean doSetException(Exception exception, boolean undone)
protected boolean doAddIntermediateResult(Object result, boolean undone)
{
if((""+result).contains("IMarkerService"))
// || (""+result).contains("PartDataChunk"))
{
Logger.getLogger(getClass().getName()).info("add: "+this+", "+result+", "+IComponentIdentifier.LOCAL.get());
}
Expand Down Expand Up @@ -1078,13 +1080,13 @@ public void execute(Void args)
*/
class DelegatingIntermediateFuture extends IntermediateFuture<Object>
{
// //-------- debugging --------
// @Override
// public String toString()
// {
// return super.toString() + "(listener="+listener+", listeners="+listeners+")";
// }
// //-------- debugging end --------
//-------- debugging --------
@Override
public String toString()
{
return super.toString() + "(listeners="+listeners+")";
}
//-------- debugging end --------


/** The future functionality. */
Expand Down Expand Up @@ -1136,7 +1138,7 @@ protected boolean doAddIntermediateResult(Object result, boolean undone)
// //-------- debugging --------
// if((""+result).contains("PartDataChunk"))
// {
// Logger.getLogger(getClass().getName()).info("doAddIntermediateResult: "+this+", "+result+", "+IComponentIdentifier.LOCAL.get());
// System.out.println("DelegatingIntermediateFuture.doAddIntermediateResult: "+this+", "+result+", "+IComponentIdentifier.LOCAL.get());
// }
// //-------- debugging end --------

Expand Down

0 comments on commit 7cf770d

Please sign in to comment.