Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple (Diff-) Environments #69

Merged
merged 1 commit into from
Oct 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/common/parallel/chain/dependent-parallel-chain-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {ScheduledParallelChainState} from "./scheduled-parallel-chain-state";
import {ParallelCollectionGenerator} from "../generator/parallel-collection-generator";
import {flattenArray} from "../../util/arrays";
import {ParallelStream} from "../stream/parallel-stream-impl";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* The state of a parallel chain if additional operations should be performed on an already scheduled parallel chain.
Expand All @@ -24,7 +25,7 @@ export class DependentParallelChainState<TPrevious, TElement> implements IParall
* @param environment the environment used by the job
* @param operations the operations to performed when the previous stream has completed
*/
constructor(private previousStream: IParallelStream<TPrevious[], TPrevious[]>, private options: IDefaultInitializedParallelOptions, private environment: IParallelChainEnvironment, private operations: IParallelOperation[] = []) {}
constructor(private previousStream: IParallelStream<TPrevious[], TPrevious[]>, private options: IDefaultInitializedParallelOptions, private environment: ParallelEnvironmentDefinition, private operations: IParallelOperation[] = []) {}

public resolve(): IScheduledParallelChainState<TElement> {
let next: ((subResult: TElement[], taskIndex: number, valuesPerTask: number) => void) | undefined = undefined;
Expand Down Expand Up @@ -56,7 +57,7 @@ export class DependentParallelChainState<TPrevious, TElement> implements IParall
return new DependentParallelChainState<TPrevious, TElementNew>(this.previousStream, this.options, this.environment, [...this.operations, operation]);
}

public changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.previousStream, this.options, environment, this.operations);
public addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.previousStream, this.options, this.environment.add(environment), this.operations);
}
}
11 changes: 6 additions & 5 deletions src/common/parallel/chain/parallel-chain-factory.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {PendingParallelChainState} from "./pending-parallel-chain-state";
import {ParallelChainImpl} from "./parallel-chain-impl";
import {IEmptyParallelEnvironment, IDefaultInitializedParallelOptions, IParallelOperation} from "../";
import {IParallelEnvironment, IDefaultInitializedParallelOptions, IParallelOperation} from "../";
import {IParallelGenerator} from "../generator/parallel-generator";
import {IParallelChain} from "./parallel-chain";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* Creates a new parallel chain
Expand All @@ -12,13 +13,13 @@ import {IParallelChain} from "./parallel-chain";
* @param TIn type of the elements generated by the generator
* @param TOut type of the elements resulting from this parallel chain
*/
export function createParallelChain<TIn, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, operations?: IParallelOperation[]): IParallelChain<TIn, IEmptyParallelEnvironment, TOut>;
export function createParallelChain<TIn, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, operations?: IParallelOperation[]): IParallelChain<TIn, IParallelEnvironment, TOut>;

/**
* @param sharedEnv the available environment in the job
*/
export function createParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv: TEnv, operations?: IParallelOperation[]): IParallelChain<TIn, TEnv, TOut>;
export function createParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv?: TEnv | IParallelOperation[], operations: IParallelOperation[] = []): IParallelChain<TIn, TEnv, TOut> {
export function createParallelChain<TIn, TEnv extends IParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv: TEnv, operations?: IParallelOperation[]): IParallelChain<TIn, TEnv, TOut>;
export function createParallelChain<TIn, TEnv extends IParallelEnvironment, TOut>(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, sharedEnv?: TEnv | IParallelOperation[], operations: IParallelOperation[] = []): IParallelChain<TIn, TEnv, TOut> {
let environment: TEnv | undefined;

if (sharedEnv instanceof Array) {
Expand All @@ -28,6 +29,6 @@ export function createParallelChain<TIn, TEnv extends IEmptyParallelEnvironment,
environment = sharedEnv;
}

const chain = new ParallelChainImpl(new PendingParallelChainState(generator, options, undefined, operations));
const chain = new ParallelChainImpl(new PendingParallelChainState(generator, options, ParallelEnvironmentDefinition.of(), operations));
return environment ? chain.inEnvironment(environment) : chain;
}
8 changes: 4 additions & 4 deletions src/common/parallel/chain/parallel-chain-impl.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {IParallelChain} from "./parallel-chain";
import {IParallelStream} from "../stream/parallel-stream";
import {ParallelWorkerFunctionIds} from "../slave/parallel-worker-functions";
import {IEmptyParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {IParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {FunctionCall} from "../../function/function-call";
import {IParallelChainState, IParallelChainEnvironment} from "./parallel-chain-state";
import {ParallelStream} from "../stream/parallel-stream-impl";
Expand All @@ -18,7 +18,7 @@ import {IFunctionId} from "../../function/function-id";
* @param TEnv type of the job environment
* @param TOut type of the elements in the resulting array
*/
export class ParallelChainImpl<TIn, TEnv extends IEmptyParallelEnvironment, TOut> implements IParallelChain<TIn, TEnv, TOut> {
export class ParallelChainImpl<TIn, TEnv extends IParallelEnvironment, TOut> implements IParallelChain<TIn, TEnv, TOut> {
public state: IParallelChainState<TOut>;

/**
Expand All @@ -30,15 +30,15 @@ export class ParallelChainImpl<TIn, TEnv extends IEmptyParallelEnvironment, TOut
}

// region Chaining
public inEnvironment<TEnvNew extends TEnv>(newEnv: Function | IEmptyParallelEnvironment | IFunctionId, ...params: any[]): IParallelChain<TIn, TEnvNew, TOut> {
public inEnvironment<TEnvNew extends IParallelEnvironment>(newEnv: Function | IParallelEnvironment | IFunctionId, ...params: any[]): IParallelChain<TIn, TEnv & TEnvNew, TOut> {
let env: IParallelChainEnvironment | undefined;
if (typeof newEnv === "function") {
env = FunctionCall.createUnchecked(newEnv, ...params);
} else {
env = newEnv;
}

return new ParallelChainImpl<TIn, TEnvNew, TOut>(this.state.changeEnvironment(env));
return new ParallelChainImpl<TIn, TEnv & TEnvNew, TOut>(this.state.addEnvironment(env));
}

public map<TResult>(mapper: ((this: void, element: TOut, env: TEnv & IParallelTaskEnvironment) => TResult) | IFunctionId): IParallelChain<TIn, TEnv, TResult> {
Expand Down
8 changes: 4 additions & 4 deletions src/common/parallel/chain/parallel-chain-state.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {FunctionCall} from "../../function/function-call";
import {IEmptyParallelEnvironment, IParallelOperation} from "../";
import {IParallelEnvironment, IParallelOperation} from "../";
import {IParallelStream} from "../stream/parallel-stream";

export type IParallelChainEnvironment = FunctionCall | IEmptyParallelEnvironment;
export type IParallelChainEnvironment = FunctionCall | IParallelEnvironment;

/**
* State of a parallel chain. Dependent of the state of the chain, the chaining of operation, changing of the
Expand All @@ -25,11 +25,11 @@ export interface IParallelChainState<TElement> {
chainOperation<TElementNew>(operation: IParallelOperation): IParallelChainState<TElementNew>;

/**
* Changes the environment of the change to the given one
* Adds the given environment to the current environment
* @param environment the new environment to use
* @returns the new state that uses the given environment instead of the existing one
*/
changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement>;
addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement>;
}

/**
Expand Down
30 changes: 16 additions & 14 deletions src/common/parallel/chain/parallel-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/** needed, typedoc issue */

import {IParallelStream} from "../stream/parallel-stream";
import {IEmptyParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {IParallelEnvironment, IParallelTaskEnvironment} from "../parallel-environment";
import {IFunctionId} from "../../function/function-id";

/**
Expand All @@ -15,29 +15,31 @@ import {IFunctionId} from "../../function/function-id";
* @param TOut the type of the resulting elements
* @param TEnv the type of the environment
*/
export interface IParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOut> extends IParallelStream<TOut[], TOut[]> {
export interface IParallelChain<TIn, TEnv extends IParallelEnvironment, TOut> extends IParallelStream<TOut[], TOut[]> {
/**
* Defines the environment that should be provided to all the iteratee or generator functions.
* The environment cannot contain function values.
* @param newEnv the environment that should be provided to the iteratee function
* Defines the environment that is provided to all iteratee or generator functions.
* The environment cannot contain function values. The values of the provided environment is merged with the values
* of any prior defined environments. If an environment provider is defined, then the values returned by the environment
* provider have a higher precedence.
* @param newEnv the environment that is provided to the iteratee function
*/
inEnvironment<TEnvNew extends TEnv>(newEnv: TEnvNew): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TEnvNew extends IParallelEnvironment>(newEnv: TEnvNew): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* Defines a function that should be used to build the environment for each task. The function is executed as first
* Defines a function that should be used to build the environment for each task. The function is executed first
* on the scheduled task.
* @param provider the function providing the environment
* @param TEnvNew the type of the environment
* @returns the chain
*/
inEnvironment<TEnvNew extends TEnv>(provider: (this: void) => TEnvNew): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TEnvNew extends TEnv>(provider: IFunctionId): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TEnvNew extends IParallelEnvironment>(provider: (this: void) => TEnvNew): IParallelChain<TIn, TEnv & TEnvNew, TOut>;
inEnvironment<TEnvNew extends IParallelEnvironment>(provider: IFunctionId): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param param1 single parameter that is passed to the provider
* @param TParam1 the type of the single parameter
*/
inEnvironment<TParam1, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1) => TEnvNew, param1: TParam1): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1) => TEnvNew, param1: TParam1): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
*
Expand All @@ -46,24 +48,24 @@ export interface IParallelChain<TIn, TEnv extends IEmptyParallelEnvironment, TOu
* @param TParam1 type of the first parameter
* @param TParam2 type of the second parameter
*/
inEnvironment<TParam1, TParam2, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2) => TEnvNew, param1: TParam1, param2: TParam2): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2) => TEnvNew, param1: TParam1, param2: TParam2): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param param3 third parameter that is passed to the provider funciton
* @param TParam3 type of the third parameter
*/
inEnvironment<TParam1, TParam2, TParam3, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TParam3, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param param4 fourth parameter that is passed to the provider function
* @param TParam4 type of the fourth parameter
*/
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* @param furtherParams further paramters that are passed to the provider function
*/
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends TEnv>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4, ...furtherParams: any[]) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4, ...furtherParams: any[]): IParallelChain<TIn, TEnvNew, TOut>;
inEnvironment<TParam1, TParam2, TParam3, TParam4, TEnvNew extends IParallelEnvironment>(provider: (this: void, arg1: TParam1, arg2: TParam2, arg3: TParam3, arg4: TParam4, ...furtherParams: any[]) => TEnvNew, param1: TParam1, param2: TParam2, param3: TParam3, param4: TParam4, ...furtherParams: any[]): IParallelChain<TIn, TEnv & TEnvNew, TOut>;

/**
* Maps all input values to an output value using the given mapper. The mapper is applied for each input element
Expand Down
13 changes: 7 additions & 6 deletions src/common/parallel/chain/pending-parallel-chain-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {IParallelGenerator} from "../generator/parallel-generator";
import {IParallelOperation, IDefaultInitializedParallelOptions} from "../";
import {ParallelStream} from "../stream/parallel-stream-impl";
import {flattenArray} from "../../util/arrays";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* Parallel chain has been defined but not yet scheduled.
Expand All @@ -14,21 +15,21 @@ import {flattenArray} from "../../util/arrays";
export class PendingParallelChainState<TElement> implements IParallelChainState<TElement> {

public generator: IParallelGenerator;
public environment: IParallelChainEnvironment;
public environment: ParallelEnvironmentDefinition;
public operations: IParallelOperation[];
public options: IDefaultInitializedParallelOptions;

/**
* Creates a new state
* @param generator the generator to use to generate the input elements and split the job
* @param options the options
* @param environment the environment for the job
* @param environment the environment builder that is used to create the environment for the job
* @param operations the operations to perform on the input elements
*/
constructor(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, environment: IParallelChainEnvironment | undefined, operations: IParallelOperation[]) {
constructor(generator: IParallelGenerator, options: IDefaultInitializedParallelOptions, environment: ParallelEnvironmentDefinition, operations: IParallelOperation[]) {
this.generator = generator;
this.options = options;
this.environment = environment || {};
this.environment = environment;
this.operations = operations;
}

Expand All @@ -47,7 +48,7 @@ export class PendingParallelChainState<TElement> implements IParallelChainState<
return new PendingParallelChainState(this.generator, this.options, this.environment, [...this.operations, operation]);
}

public changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new PendingParallelChainState(this.generator, this.options, environment, this.operations);
public addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new PendingParallelChainState(this.generator, this.options, this.environment.add(environment), this.operations);
}
}
7 changes: 4 additions & 3 deletions src/common/parallel/chain/scheduled-parallel-chain-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {IParallelChainState, IParallelChainEnvironment, IScheduledParallelChainS
import {IParallelOperation, IDefaultInitializedParallelOptions} from "../";
import {IParallelStream} from "../stream/parallel-stream";
import {DependentParallelChainState} from "./dependent-parallel-chain-state";
import {ParallelEnvironmentDefinition} from "../parallel-environment-definition";

/**
* State of a parallel chain whose job has been scheduled on the thread pool (or even where the computation already has completed).
Expand All @@ -16,7 +17,7 @@ export class ScheduledParallelChainState<TElement> implements IScheduledParallel
* @param options the options used for the scheduled job
* @param environment the environment used for the scheduled job
*/
constructor(stream: IParallelStream<TElement[], TElement[]>, private options: IDefaultInitializedParallelOptions, private environment: IParallelChainEnvironment) {
constructor(stream: IParallelStream<TElement[], TElement[]>, private options: IDefaultInitializedParallelOptions, private environment: ParallelEnvironmentDefinition) {
this.stream = stream;
}

Expand All @@ -28,7 +29,7 @@ export class ScheduledParallelChainState<TElement> implements IScheduledParallel
return new DependentParallelChainState(this.stream, this.options, this.environment, [operation]);
}

public changeEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.stream, this.options, environment);
public addEnvironment(environment: IParallelChainEnvironment): IParallelChainState<TElement> {
return new DependentParallelChainState(this.stream, this.options, this.environment.add(environment));
}
}
2 changes: 1 addition & 1 deletion src/common/parallel/generator/parallel-times-generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class ParallelTimesGenerator implements IParallelGenerator {
public static create<T>(n: number, value: T): ParallelTimesGenerator;
public static create(n: number, call: FunctionCall): ParallelTimesGenerator;
public static create<T>(n: number, generator: ((this: void, index: number, env: IParallelTaskEnvironment) => T) | IFunctionId | T): ParallelTimesGenerator {
let generatorFunction: FunctionCall | T;
let generatorFunction: FunctionCall;
if (isFunctionId(generator) || typeof generator === "function") {
generatorFunction = FunctionCall.createUnchecked(generator);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/common/parallel/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export {IParallel} from "./parallel";
export {IParallelChain} from "./chain/parallel-chain";
export {IParallelTaskEnvironment, IEmptyParallelEnvironment} from "./parallel-environment";
export {IParallelTaskEnvironment, IParallelEnvironment} from "./parallel-environment";
export {IParallelJob} from "./parallel-job";
export {IParallelOptions, IDefaultInitializedParallelOptions} from "./parallel-options";
export {IParallelJobScheduler} from "./scheduling/parallel-job-scheduler";
Expand Down
Loading