Skip to content

Commit

Permalink
Add support for multiple (Diff-) Environments
Browse files Browse the repository at this point in the history
Fixes #68
  • Loading branch information
Micha Reiser committed Oct 10, 2016
1 parent f3b3b2c commit 1a60141
Show file tree
Hide file tree
Showing 24 changed files with 393 additions and 226 deletions.
7 changes: 4 additions & 3 deletions src/common/parallel/chain/dependent-parallel-chain-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {ScheduledParallelChainState} from "./scheduled-parallel-chain-state";
import {ParallelCollectionGenerator} from "../generator/parallel-collection-generator";
import {flattenArray} from "../../util/arrays";
import {ParallelStream} from "../stream/parallel-stream-impl";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* The state of a parallel chain if additional operations should be performed on an already scheduled parallel chain.
Expand All @@ -24,7 +25,7 @@ export class DependentParallelChainState<TPrevious, TElement> implements IParall
* @param environment the environment used by the job
* @param operations the operations to performed when the previous stream has completed
*/
constructor(private previousStream: IParallelStream<TPrevious[], TPrevious[]>, private options: IDefaultInitializedParallelOptions, private environment: IParallelChainEnvironment, private operations: IParallelOperation[] = []) {}
constructor(private previousStream: IParallelStream<TPrevious[], TPrevious[]>, private options: IDefaultInitializedParallelOptions, private environment: ParallelEnvironmentDefinition, private operations: IParallelOperation[] = []) {}

public resolve(): IScheduledParallelChainState<TElement> {
let next: ((subResult: TElement[], taskIndex: number, valuesPerTask: number) => void) | undefined = undefined;
Expand Down Expand Up @@ -56,7 +57,7 @@ export class DependentParallelChainState<TPrevious, TElement> implements IParall
return new DependentParallelChainState<TPrevious, TElementNew>(this.previousStream, this.options, this.environment, [...this.operations, operation]);
}

public changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.previousStream, this.options, environment, this.operations);
public addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.previousStream, this.options, this.environment.add(environment), this.operations);
}
}
11 changes: 6 additions & 5 deletions src/common/parallel/chain/parallel-chain-factory.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {PendingParallelChainState} from "./pending-parallel-chain-state";
import {ParallelChainImpl} from "./parallel-chain-impl";
import {IEmptyParallelEnvironment, IDefaultInitializedParallelOptions, IParallelOperation} from "../";
import {IParallelEnvironment, IDefaultInitializedParallelOptions, IParallelOperation} from "../";
import {IParallelGenerator} from "../generator/parallel-generator";
import {IParallelChain} from "./parallel-chain";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* Creates a new parallel chain
Expand All @@ -12,13 +13,13 @@ import {IParallelChain} from "./parallel-chain";
* @param TIn type of the elements generated by the generator
* @param TOut type of the elements resulting from this parallel chain
*/
export function createParallelChain<TIn, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, operations?: IParallelOperation[]): IParallelChain<TIn, IEmptyParallelEnvironment, TOut>;
export function createParallelChain<TIn, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, operations?: IParallelOperation[]): IParallelChain<TIn, IParallelEnvironment, TOut>;

/**
* @param sharedEnv the available environment in the job
*/
export function createParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv: TEnv, operations?: IParallelOperation[]): IParallelChain<TIn, TEnv, TOut>;
export function createParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv?: TEnv | IParallelOperation[], operations: IParallelOperation[] = []): IParallelChain<TIn, TEnv, TOut> {
export function createParallelChain<TIn, TEnv extends IParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv: TEnv, operations?: IParallelOperation[]): IParallelChain<TIn, TEnv, TOut>;
export function createParallelChain<TIn, TEnv extends IParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv?: TEnv | IParallelOperation[], operations: IParallelOperation[] = []): IParallelChain<TIn, TEnv, TOut> {
let environment: TEnv | undefined;

if (sharedEnv instanceof Array) {
Expand All @@ -28,6 +29,6 @@ export function createParallelChain<TIn, TEnv extends IEmptyParallelEnvironment,
environment = sharedEnv;
}

const chain = new ParallelChainImpl(new PendingParallelChainState(generator, options, undefined, operations));
const chain = new ParallelChainImpl(new PendingParallelChainState(generator, options, ParallelEnvironmentDefinition.of(), operations));
return environment ? chain.inEnvironment(environment) : chain;
}
8 changes: 4 additions & 4 deletions src/common/parallel/chain/parallel-chain-impl.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {IParallelChain} from "./parallel-chain";
import {IParallelStream} from "../stream/parallel-stream";
import {ParallelWorkerFunctionIds} from "../slave/parallel-worker-functions";
import {IEmptyParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {IParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {FunctionCall} from "../../function/function-call";
import {IParallelChainState, IParallelChainEnvironment} from "./parallel-chain-state";
import {ParallelStream} from "../stream/parallel-stream-impl";
Expand All @@ -18,7 +18,7 @@ import {IFunctionId} from "../../function/function-id";
* @param TEnv type of the job environment
* @param TOut type of the elements in the resulting array
*/
export class ParallelChainImpl<TIn, TEnv extends IEmptyParallelEnvironment, TOut> implements IParallelChain<TIn, TEnv, TOut> {
export class ParallelChainImpl<TIn, TEnv extends IParallelEnvironment, TOut> implements IParallelChain<TIn, TEnv, TOut> {
public state: IParallelChainState<TOut>;

/**
Expand All @@ -30,15 +30,15 @@ export class ParallelChainImpl<TIn, TEnv extends IEmptyParallelEnvironment, TOut
}

// region Chaining
public inEnvironment<TEnvNew extends TEnv>(newEnv: Function | IEmptyParallelEnvironment | IFunctionId, ...params: any[]): IParallelChain<TIn, TEnvNew, TOut> {
public inEnvironment<TEnvNew extends IParallelEnvironment>(newEnv: Function | IParallelEnvironment | IFunctionId, ...params: any[]): IParallelChain<TIn, TEnv & TEnvNew, TOut> {
let env: IParallelChainEnvironment | undefined;
if (typeof newEnv === "function") {
env = FunctionCall.createUnchecked(newEnv, ...params);
} else {
env = newEnv;
}

return new ParallelChainImpl<TIn, TEnvNew, TOut>(this.state.changeEnvironment(env));
return new ParallelChainImpl<TIn, TEnv & TEnvNew, TOut>(this.state.addEnvironment(env));
}

public map<TResult>(mapper: ((this: void, element: TOut, env: TEnv & IParallelTaskEnvironment) => TResult) | IFunctionId): IParallelChain<TIn, TEnv, TResult> {
Expand Down
8 changes: 4 additions & 4 deletions src/common/parallel/chain/parallel-chain-state.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {FunctionCall} from "../../function/function-call";
import {IEmptyParallelEnvironment, IParallelOperation} from "../";
import {IParallelEnvironment, IParallelOperation} from "../";
import {IParallelStream} from "../stream/parallel-stream";

export type IParallelChainEnvironment = FunctionCall | IEmptyParallelEnvironment;
export type IParallelChainEnvironment = FunctionCall | IParallelEnvironment;

/**
* State of a parallel chain. Dependent of the state of the chain, the chaining of operation, changing of the
Expand All @@ -25,11 +25,11 @@ export interface IParallelChainState<TElement> {
chainOperation<TElementNew>(operation: IParallelOperation): IParallelChainState<TElementNew>;

/**
* Changes the environment of the change to the given one
* Adds the given environment to the current environment
* @param environment the new environment to use
* @returns the new state that uses the given environment instead of the existing one
*/
changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement>;
addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement>;
}

/**
Expand Down
30 changes: 16 additions & 14 deletions src/common/parallel/chain/parallel-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/** needed, typedoc issue */

import {IParallelStream} from "../stream/parallel-stream";
import {IEmptyParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {IParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {IFunctionId} from "../../function/function-id";

/**
Expand All @@ -15,29 +15,31 @@ import {IFunctionId} from "../../function/function-id";
* @param TOut the type of the resulting elements
* @param TEnv the type of the environment
*/
export interface IParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOut> extends IParallelStream<TOut[], TOut[]> {
export interface IParallelChain<TIn, TEnv extends IParallelEnvironment, TOut> extends IParallelStream<TOut[], TOut[]> {
/**
* Defines the environment that should be provided to all the iteratee or generator functions.
* The environment cannot contain function values.
* @param newEnv the environment that should be provided to the iteratee function
* Defines the environment that is provided to all iteratee or generator functions.
* The environment cannot contain function values. The values of the provided environment is merged with the values
* of any prior defined environments. If an environment provider is defined, then the values returned by the environment
* provider have a higher precedence.
* @param newEnv the environment that is provided to the iteratee function
*/
inEnvironment<TEnvNew extends TEnv>(newEnv: TEnvNew): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TEnvNew extends IParallelEnvironment>(newEnv: TEnvNew): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* Defines a function that should be used to build the environment for each task. The function is executed as first
* Defines a function that should be used to build the environment for each task. The function is executed first
* on the scheduled task.
* @param provider the function providing the environment
* @param TEnvNew the type of the environment
* @returns the chain
*/
inEnvironment<TEnvNew extends TEnv>(provider: (this: void) => TEnvNew): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TEnvNew extends TEnv>(provider: IFunctionId): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TEnvNew extends IParallelEnvironment>(provider: (this: void) => TEnvNew): IParallelChain<TIn, TEnv & TEnvNew, TOut>;
inEnvironment<TEnvNew extends IParallelEnvironment>(provider: IFunctionId): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param param1 single parameter that is passed to the provider
* @param TParam1 the type of the single parameter
*/
inEnvironment<TParam1, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1) => TEnvNew, param1: TParam1): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1) => TEnvNew, param1: TParam1): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
*
Expand All @@ -46,24 +48,24 @@ export interface IParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOu
* @param TParam1 type of the first parameter
* @param TParam2 type of the second parameter
*/
inEnvironment<TParam1, TParam2, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2) => TEnvNew, param1: TParam1, param2: TParam2): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2) => TEnvNew, param1: TParam1, param2: TParam2): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param param3 third parameter that is passed to the provider funciton
* @param TParam3 type of the third parameter
*/
inEnvironment<TParam1, TParam2, TParam3, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TParam3, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param param4 fourth parameter that is passed to the provider function
* @param TParam4 type of the fourth parameter
*/
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param furtherParams further paramters that are passed to the provider function
*/
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4, ...furtherParams: any[]) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4, ...furtherParams: any[]): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4, ...furtherParams: any[]) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4, ...furtherParams: any[]): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* Maps all input values to an output value using the given mapper. The mapper is applied for each input element
Expand Down
13 changes: 7 additions & 6 deletions src/common/parallel/chain/pending-parallel-chain-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {IParallelGenerator} from "../generator/parallel-generator";
import {IParallelOperation, IDefaultInitializedParallelOptions} from "../";
import {ParallelStream} from "../stream/parallel-stream-impl";
import {flattenArray} from "../../util/arrays";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* Parallel chain has been defined but not yet scheduled.
Expand All @@ -14,21 +15,21 @@ import {flattenArray} from "../../util/arrays";
export class PendingParallelChainState<TElement> implements IParallelChainState<TElement> {

public generator: IParallelGenerator;
public environment: IParallelChainEnvironment;
public environment: ParallelEnvironmentDefinition;
public operations: IParallelOperation[];
public options: IDefaultInitializedParallelOptions;

/**
* Creates a new state
* @param generator the generator to use to generate the input elements and split the job
* @param options the options
* @param environment the environment for the job
* @param environment the environment builder that is used to create the environment for the job
* @param operations the operations to perform on the input elements
*/
constructor(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, environment: IParallelChainEnvironment | undefined, operations: IParallelOperation[]) {
constructor(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, environment: ParallelEnvironmentDefinition, operations: IParallelOperation[]) {
this.generator = generator;
this.options = options;
this.environment = environment || {};
this.environment = environment;
this.operations = operations;
}

Expand All @@ -47,7 +48,7 @@ export class PendingParallelChainState<TElement> implements IParallelChainState<
return new PendingParallelChainState(this.generator, this.options, this.environment, [...this.operations, operation]);
}

public changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new PendingParallelChainState(this.generator, this.options, environment, this.operations);
public addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new PendingParallelChainState(this.generator, this.options, this.environment.add(environment), this.operations);
}
}
7 changes: 4 additions & 3 deletions src/common/parallel/chain/scheduled-parallel-chain-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {IParallelChainState, IParallelChainEnvironment, IScheduledParallelChainS
import {IParallelOperation, IDefaultInitializedParallelOptions} from "../";
import {IParallelStream} from "../stream/parallel-stream";
import {DependentParallelChainState} from "./dependent-parallel-chain-state";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* State of a parallel chain whose job has been scheduled on the thread pool (or even where the computation already has completed).
Expand All @@ -16,7 +17,7 @@ export class ScheduledParallelChainState<TElement> implements IScheduledParallel
* @param options the options used for the scheduled job
* @param environment the environment used for the scheduled job
*/
constructor(stream: IParallelStream<TElement[], TElement[]>, private options: IDefaultInitializedParallelOptions, private environment: IParallelChainEnvironment) {
constructor(stream: IParallelStream<TElement[], TElement[]>, private options: IDefaultInitializedParallelOptions, private environment: ParallelEnvironmentDefinition) {
this.stream = stream;
}

Expand All @@ -28,7 +29,7 @@ export class ScheduledParallelChainState<TElement> implements IScheduledParallel
return new DependentParallelChainState(this.stream, this.options, this.environment, [operation]);
}

public changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.stream, this.options, environment);
public addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.stream, this.options, this.environment.add(environment));
}
}
2 changes: 1 addition & 1 deletion src/common/parallel/generator/parallel-times-generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class ParallelTimesGenerator implements IParallelGenerator {
public static create<T>(n: number, value: T): ParallelTimesGenerator;
public static create(n: number, call: FunctionCall): ParallelTimesGenerator;
public static create<T>(n: number, generator: ((this: void, index: number, env: IParallelTaskEnvironment) => T) | IFunctionId | T): ParallelTimesGenerator {
let generatorFunction: FunctionCall | T;
let generatorFunction: FunctionCall;
if (isFunctionId(generator) || typeof generator === "function") {
generatorFunction = FunctionCall.createUnchecked(generator);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/common/parallel/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export {IParallel} from "./parallel";
export {IParallelChain} from "./chain/parallel-chain";
export {IParallelTaskEnvironment, IEmptyParallelEnvironment} from "./parallel-environment";
export {IParallelTaskEnvironment, IParallelEnvironment} from "./parallel-environment";
export {IParallelJob} from "./parallel-job";
export {IParallelOptions, IDefaultInitializedParallelOptions} from "./parallel-options";
export {IParallelJobScheduler} from "./scheduling/parallel-job-scheduler";
Expand Down
Loading

0 comments on commit 1a60141

Please sign in to comment.