Skip to content

Commit

Permalink
Merge pull request #315 from Acxiom/develop
Browse files Browse the repository at this point in the history
Release 1.8.7
  • Loading branch information
dafreels committed Jul 6, 2022
2 parents e29e5e2 + 784badb commit 2d56553
Show file tree
Hide file tree
Showing 12 changed files with 19 additions and 17 deletions.
2 changes: 1 addition & 1 deletion metalus-application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion metalus-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion metalus-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion metalus-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,14 @@ object PipelineDependencyExecutor {
DependencyResult(execution, None,
Some(new IllegalArgumentException(s"fork execution ${execution.id} forkByValues ${execution.forkByValue.get} does not point to a valid list")))
} else {
val forkedProcesses = forkValues.asInstanceOf[List[Any]].map(forkValue => {
val forkedProcesses = forkValues.asInstanceOf[List[Any]].zipWithIndex.map{ case (forkValue, forkIndex) =>
val ctx = execution.pipelineContext.setGlobal("executionForkValue", forkValue)
.setGlobal("executionForkValueIndex", forkIndex)
Future {
processFutures(List(startExecution(execution.asInstanceOf[DefaultPipelineExecution].copy(pipelineContext = ctx))),
Map[String, DependencyResult](), executionGraph)
}
})
}
// Wait for the forks to complete
Await.ready(Future.sequence(forkedProcesses), Duration.Inf)
val finalFutureMap = forkedProcesses.foldLeft(FutureMap(List(), Map[String, DependencyResult]()))((futureMap, f) => {
Expand All @@ -169,10 +170,11 @@ object PipelineDependencyExecutor {
})
// Execute the join
val joinExecution = findJoinExecution(execution, executionGraph, execution).get
DependencyResult(PipelineExecution(joinExecution.id, joinExecution.pipelines, joinExecution.initialPipelineId,
prepareExecutionContext(joinExecution, finalFutureMap.resultMap), joinExecution.parents, None, None, "join"),
Some(PipelineExecutor.executePipelines(execution.pipelines, execution.initialPipelineId,
execution.pipelineContext.setGlobal("executionId", execution.id))), None, Some(finalFutureMap))
val joinWithResults = PipelineExecution(joinExecution.id, joinExecution.pipelines, joinExecution.initialPipelineId,
prepareExecutionContext(joinExecution, finalFutureMap.resultMap), joinExecution.parents, None, None, "join")
DependencyResult(joinWithResults,
Some(PipelineExecutor.executePipelines(joinWithResults.pipelines, joinWithResults.initialPipelineId,
joinWithResults.pipelineContext.setGlobal("executionId", joinWithResults.id))), None, Some(finalFutureMap))
}
} catch {
case t: Throwable => DependencyResult(execution, None, Some(t))
Expand Down
2 changes: 1 addition & 1 deletion metalus-delta/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion metalus-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion metalus-gcp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<dependencyManagement>
Expand Down
2 changes: 1 addition & 1 deletion metalus-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion metalus-mongo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion metalus-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
</parent>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.acxiom</groupId>
<artifactId>metalus</artifactId>
<version>1.8.6-SNAPSHOT</version>
<version>1.8.7-SNAPSHOT</version>
<name>${project.artifactId}</name>
<packaging>pom</packaging>
<description>Metalus Pipeline Library</description>
Expand Down

0 comments on commit 2d56553

Please sign in to comment.