Skip to content

Commit

Permalink
Do atomic replace in transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
civitaspo committed Aug 22, 2017
1 parent 07ed7f7 commit ecca877
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,26 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
configure(task);

control.run(task.dump());

if (task.getAtomicMode()) {
atomicReplace(task);
}

return Exec.newConfigDiff();
}

private void atomicReplace(PluginTask task)
{
HdfsClient hdfsClient = HdfsClient.build(task);
String outputDir = getOutputSampleDir(task);
String safeWsWithOutput = Paths.get(task.getSafeWorkspace(), getOutputSampleDir(task)).toString();

if (!hdfsClient.swapDirectory(safeWsWithOutput, outputDir)) {
throw new DataException(String.format("Failed to swap: src: %s, dst: %s", safeWsWithOutput, outputDir));
}
logger.info("Swapped: src: {}, dst: {}", safeWsWithOutput, outputDir);
}

@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
Expand All @@ -147,22 +164,13 @@ public void cleanup(TaskSource taskSource,
List outputPaths = successTaskReport.get(List.class, "output_paths");
for (Object path : outputPaths) {
if (task.getAtomicMode()) {
logger.info("Move {} to {}", path, outputDir);
logger.info("Created and Moved: {} to {}", path, outputDir);
}
else {
logger.info("Created: {}", path);
}
}
}

if (task.getAtomicMode()) {
HdfsClient hdfsClient = HdfsClient.build(task);
String safeWsWithOutput = Paths.get(task.getSafeWorkspace(), getOutputSampleDir(task)).toString();
if (!hdfsClient.swapDirectory(safeWsWithOutput, outputDir)) {
throw new DataException(String.format("Failed to swap: src: %s, dst: %s", safeWsWithOutput, outputDir));
}
logger.info("Swapped: src: {}, dst: {}", safeWsWithOutput, outputDir);
}
}

@Override
Expand Down

0 comments on commit ecca877

Please sign in to comment.