Skip to content

Commit

Permalink
[HUDI-4619] Change HoodieFlinkEngineContext.flatMap to single concu…
Browse files Browse the repository at this point in the history
…rrent access.
  • Loading branch information
LinMingQiang committed Aug 15, 2022
1 parent c3621eb commit 09f49ab
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ HoodieCompactionPlan generateCompactionPlan(
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, s));
})
.filter(c -> !c.getDeltaFileNames().isEmpty()), 1).stream()
.filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream()
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());

LOG.info("Total of " + operations.size() + " compactions are retrieved");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public <I, K, V> List<V> reduceByKey(

@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
return data.stream().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
}

@Override
Expand Down

0 comments on commit 09f49ab

Please sign in to comment.