Skip to content

Commit

Permalink
+ Different thread pool can be specified for all operations via the n…
Browse files Browse the repository at this point in the history
…ew TaskConfig.ThreadPool function.

! Stability fixes by [Tomasso Ercole].
  • Loading branch information
gabr42@gmail.com committed Oct 14, 2013
1 parent 1401516 commit 86c3b7a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 13 deletions.
1 change: 0 additions & 1 deletion OtlCommon.Utils.pas
Expand Up @@ -50,7 +50,6 @@
unit OtlCommon.Utils;

{$DEBUGINFO OFF}
{$TYPEDADDRESS OFF}

interface

Expand Down
53 changes: 42 additions & 11 deletions OtlParallel.pas
Expand Up @@ -31,10 +31,14 @@
///<remarks><para>
/// Author : Primoz Gabrijelcic
/// Creation date : 2010-01-08
/// Last modification : 2013-10-13
/// Version : 1.32
/// Last modification : 2013-10-14
/// Version : 1.33
///</para><para>
/// History:
/// 1.33: 2013-10-14
/// - Different thread pool can be specified for all operations via the new
/// TaskConfig.ThreadPool function.
/// - Included stability fixes by [Tomasso Ercole].
/// 1.32: 2013-10-13
/// - Removed optimization which caused ForEach to behave differently on
/// uniprocessor computers.
Expand Down Expand Up @@ -294,6 +298,7 @@ interface
function OnTerminated(eventHandler: TOmniTaskTerminatedEvent): IOmniTaskConfig; overload;
function OnTerminated(eventHandler: TOmniOnTerminatedFunction): IOmniTaskConfig; overload;
function OnTerminated(eventHandler: TOmniOnTerminatedFunctionSimple): IOmniTaskConfig; overload;
function ThreadPool(const threadPool: IOmniThreadPool): IOmniTaskConfig;
function WithCounter(const counter: IOmniCounter): IOmniTaskConfig;
function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskConfig; overload;
function WithLock(const lock: IOmniCriticalSection): IOmniTaskConfig; overload;
Expand Down Expand Up @@ -796,12 +801,12 @@ TOmniJoinState = class(TInterfacedObject, IOmniJoinState, IOmniJoinStateEx)
function FatalException: Exception;
function IsCancelled: boolean;
function IsExceptional: boolean;
function NoWait: IOmniParallelJoin;
function NumTasks(numTasks: integer): IOmniParallelJoin;
function OnStop(const stopCode: TProc): IOmniParallelJoin;
function Task(const task: TProc): IOmniParallelJoin; overload;
function Task(const task: TOmniJoinDelegate): IOmniParallelJoin; overload;
function TaskConfig(const config: IOmniTaskConfig): IOmniParallelJoin;
function NoWait: IOmniParallelJoin;
function WaitFor(timeout_ms: cardinal): boolean;
end; { IOmniParallelJoin }

Expand Down Expand Up @@ -978,6 +983,7 @@ Parallel = class

// helpers
class function CompleteQueue(const queue: IOmniBlockingCollection): TProc;
class function GetPool(const taskConfig: IOmniTaskConfig): IOmniThreadPool;
end; { Parallel }

IOmniAwait = interface
Expand Down Expand Up @@ -1165,6 +1171,7 @@ TOmniTaskConfigTerminated = record

IOmniTaskConfigInternal = interface ['{8678C3A4-7825-4E7C-8FEF-4DD6CD3D3E29}']
procedure DetachTerminated(var terminated: TOmniTaskConfigTerminated);
function GetThreadPool: IOmniThreadPool;
end; { IOmniTaskConfigInternal }

TOmniTaskConfig = class(TInterfacedObject, IOmniTaskConfig, IOmniTaskConfigInternal)
Expand All @@ -1175,6 +1182,7 @@ TOmniTaskConfig = class(TInterfacedObject, IOmniTaskConfig, IOmniTaskConfigInt
otcOnMessageEventHandler : TOmniTaskMessageEvent;
otcOnMessageList : TGpIntegerObjectList;
otcOnTerminated : TOmniTaskConfigTerminated;
otcThreadPool : IOmniThreadPool;
otcWithCounterCounter : IOmniCounter;
otcWithLockAutoDestroy : boolean;
otcWithLockOmniLock : IOmniCriticalSection;
Expand All @@ -1193,11 +1201,13 @@ TOmniTaskConfig = class(TInterfacedObject, IOmniTaskConfig, IOmniTaskConfigInt
function OnTerminated(eventHandler: TOmniTaskTerminatedEvent): IOmniTaskConfig; overload; inline;
function OnTerminated(eventHandler: TOmniOnTerminatedFunction): IOmniTaskConfig; overload;
function OnTerminated(eventHandler: TOmniOnTerminatedFunctionSimple): IOmniTaskConfig; overload;
function ThreadPool(const threadPool: IOmniThreadPool): IOmniTaskConfig;
function WithCounter(const counter: IOmniCounter): IOmniTaskConfig; inline;
function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskConfig; overload; inline;
function WithLock(const lock: IOmniCriticalSection): IOmniTaskConfig; overload; inline;
public //IOmniTaskConfigInternal
procedure DetachTerminated(var terminated: TOmniTaskConfigTerminated);
function GetThreadPool: IOmniThreadPool; inline;
end; { TOmniTaskConfig }

const
Expand Down Expand Up @@ -1527,7 +1537,7 @@ function TOmniParallelJoin.Execute: IOmniParallelJoin;
).SetParameter('NumWorker', iProc);
ApplyConfig(opjTaskConfig, taskControl);
(opjJoinStates[iProc] as IOmniJoinStateEx).TaskControl := taskControl;
taskControl.Schedule(GlobalParallelPool);
taskControl.Schedule(Parallel.GetPool(opjTaskConfig));
end;
if not opjNoWait then begin
WaitFor(INFINITE);
Expand Down Expand Up @@ -1649,7 +1659,8 @@ class procedure Parallel.Async(task: TOmniTaskDelegate; taskConfig: IOmniTaskCon
end
);
ApplyConfig(taskConfig, omniTask);
omniTask.Schedule(GlobalParallelPool);
omniTask.Unobserved;
omniTask.Schedule(GetPool(taskConfig));
end; { Parallel.Async }

class function Parallel.BackgroundWorker: IOmniBackgroundWorker;
Expand Down Expand Up @@ -1782,6 +1793,15 @@ class function Parallel.Future<T>(action: TOmniFutureDelegateEx<T>; taskConfig:
Result := TOmniFuture<T>.CreateEx(action, taskConfig);
end; { Parallel.Future<T> }

class function Parallel.GetPool(const taskConfig: IOmniTaskConfig): IOmniThreadPool;
begin
Result := nil;
if assigned(taskConfig) then
Result := (taskConfig as IOmniTaskConfigInternal).GetThreadPool;
if not assigned(Result) then
Result := GlobalParallelPool;
end; { Parallel.GetPool }

class function Parallel.Join(const task1, task2: TProc): IOmniParallelJoin;
begin
Result := TOmniParallelJoin.Create.Task(task1).Task(task2);
Expand Down Expand Up @@ -2141,14 +2161,14 @@ procedure TOmniParallelLoopBase.InternalExecuteTask(taskDelegate: TOmniTaskDeleg
end;
end,
'Parallel.ForEach worker #' + IntToStr(iTask))
.WithLock(lockAggregate)
.Unobserved;
.WithLock(lockAggregate);
ApplyConfig(oplTaskConfig, task);
task.Unobserved;
for kv in oplOnMessageList.WalkKV do
task.OnMessage(kv.Key, TOmniMessageExec.Clone(TOmniMessageExec(kv.Value)));
if assigned(oplOnTaskControlCreate) then
oplOnTaskControlCreate(task);
task.Schedule(GlobalParallelPool);
task.Schedule(Parallel.GetPool(oplTaskConfig));
end;
if not (ploNoWait in Options) then begin
WaitForSingleObject(oplCountStopped.Handle, INFINITE);
Expand Down Expand Up @@ -2740,7 +2760,7 @@ procedure TOmniFuture<T>.Execute(action: TOmniTaskDelegate; taskConfig: IOmniTas
begin
ofTask := CreateTask(action, 'TOmniFuture action');
ApplyConfig(taskConfig, ofTask);
ofTask.Schedule(GlobalParallelPool);
ofTask.Schedule(Parallel.GetPool(taskConfig));
end; { TOmniFuture<T>.Execute }

function TOmniFuture<T>.FatalException: Exception;
Expand Down Expand Up @@ -3112,7 +3132,6 @@ function TOmniPipeline.Run: IOmniPipeline;
end,
stageName
)
.Unobserved
.CancelWith(opCancelWith)
.SetParameter('From', inQueue)
.SetParameter('Stage', opStages[iStage])
Expand All @@ -3121,7 +3140,8 @@ function TOmniPipeline.Run: IOmniPipeline;
.SetParameter('TotalStopped', opCountStopped)
.SetParameter('Cancelled', opCancelWith);
ApplyConfig((opStages[iStage] as IOmniPipelineStageEx).TaskConfig, task);
task.Schedule(GlobalParallelPool);
task.Unobserved;
task.Schedule(Parallel.GetPool((opStages[iStage] as IOmniPipelineStageEx).TaskConfig));
end; //for iTask
end; //for iStage
opOutput.ReraiseExceptions(not opHandleExceptions);
Expand Down Expand Up @@ -3842,6 +3862,11 @@ procedure TOmniTaskConfig.DetachTerminated(var terminated: TOmniTaskConfigTermin
otcOnTerminated.Clear;
end; { TOmniTaskConfig.GetTerminated }

function TOmniTaskConfig.GetThreadPool: IOmniThreadPool;
begin
Result := otcThreadPool;
end; { TOmniTaskConfig.GetThreadPool }

function TOmniTaskConfig.MonitorWith(const monitor: IOmniTaskControlMonitor):
IOmniTaskConfig;
begin
Expand Down Expand Up @@ -3890,6 +3915,12 @@ function TOmniTaskConfig.OnTerminated(eventHandler: TOmniOnTerminatedFunctionSim
Result := Self;
end; { TOmniTaskConfig.OnTerminated }

function TOmniTaskConfig.ThreadPool(const threadPool: IOmniThreadPool): IOmniTaskConfig;
begin
otcThreadPool := threadPool;
Result := Self;
end; { TOmniTaskConfig.ThreadPool }

function TOmniTaskConfig.WithCounter(const counter: IOmniCounter): IOmniTaskConfig;
begin
otcWithCounterCounter := counter;
Expand Down
5 changes: 4 additions & 1 deletion history.txt
Expand Up @@ -16,11 +16,14 @@ Problems ([alex]):
- Added IOmniTaskControl.Stop - signals thread to stop and immediately returns.
- Fixed TOmniTaskGroup.TerminateAll.
+ added OtlSuperObject [Lee_Nover]
! Simple pipline stage handles exceptions in the executor function.
! Simple pipeline stage handles exceptions in the executor function.
! compiles when 'Typed @ operator' is enabled (tnx to [arioch])
- Compiles with XE5 (tnx to [Uwe Raabe])
/// - Removed optimization which caused ForEach to behave differently on
/// uniprocessor computers.
/// - Different thread pool can be specified for all operations via the new
/// TaskConfig.ThreadPool function.
/// - Included stability fixes by [Tomasso Ercole].

3.02: 2012-10-03
- New features:
Expand Down

0 comments on commit 86c3b7a

Please sign in to comment.