JIRA-1222
closes #104
Maja Kabiljo committed Aug 16, 2019
1 parent 0f41dbb commit 5edbdb28dcd8895d31b19313f9441791f6b71bc9
Showing 10 changed files with 138 additions and 67 deletions.
@@ -43,6 +43,18 @@
*/
OW createOutputWriter(Configuration conf, Progressable hadoopProgressable);

/**
* This method will be called before creating any writers
*/
default void preWriting() {
}

/**
* This method will be called after all writers are closed
*/
default void postWriting() {
}

/**
* Commit everything
*/
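
Because both new methods are declared default, existing BlockOutputDesc implementations keep compiling unchanged. Below is a minimal sketch of an implementation that uses the hooks to manage a resource shared by all of its writers; MyOutputDesc, MyWriter, and MyConnectionPool are hypothetical names, not part of this commit, and the interface's other methods are omitted for brevity.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;

// Hypothetical sketch; MyWriter and MyConnectionPool are illustrative
// names only (Giraph imports omitted).
public class MyOutputDesc implements BlockOutputDesc<MyWriter> {
  /** Resource shared by every writer of this output */
  private MyConnectionPool pool;

  @Override
  public void preWriting() {
    // Runs once per output, before any writer is created
    pool = MyConnectionPool.open();
  }

  @Override
  public MyWriter createOutputWriter(
      Configuration conf, Progressable hadoopProgressable) {
    return new MyWriter(pool);
  }

  @Override
  public void postWriting() {
    // Runs once per output, after all writers are closed
    pool.close();
  }

  // Remaining BlockOutputDesc methods omitted for brevity.
}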
@@ -52,6 +52,7 @@ public BlockOutputHandle(String jobIdentifier, Configuration conf,
outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
conf, jobIdentifier);
for (String confOption : outputDescMap.keySet()) {
outputDescMap.get(confOption).preWriting();
freeWriters.put(confOption,
new ConcurrentLinkedQueue<BlockOutputWriter>());
occupiedWriters.put(confOption,
@@ -127,5 +128,9 @@ public Void call() throws Exception {
ProgressableUtils.getResultsWithNCallables(callableFactory,
Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
allWriters.size()), "close-writers-%d", progressable);
// All writers are closed; notify each output desc via its postWriting hook
for (BlockOutputDesc outputDesc : outputDescMap.values()) {
outputDesc.postWriting();
}
}
}
@@ -20,11 +20,8 @@

import java.io.IOException;

import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
@@ -38,7 +35,7 @@
public abstract class EdgeOutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
DefaultImmutableClassesGiraphConfigurable<I, V, E> {
OutputFormat<I, V, E> {
/**
* Create an edge writer for a given split. The framework will call
* {@link EdgeWriter#initialize(TaskAttemptContext)} before
@@ -51,32 +48,4 @@
*/
public abstract EdgeWriter<I, V, E> createEdgeWriter(
TaskAttemptContext context) throws IOException, InterruptedException;

/**
* Check for validity of the output-specification for the job.
* (Copied from Hadoop OutputFormat)
*
* <p>This is to validate the output specification for the job when it is
* a job is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
* overwritten.</p>
*
* @param context information about the job
* @throws IOException when output should not be attempted
*/
public abstract void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException;

/**
* Get the output committer for this output format. This is responsible
* for ensuring the output is committed correctly.
* (Copied from Hadoop OutputFormat)
*
* @param context the task context
* @return an output committer
* @throws IOException
* @throws InterruptedException
*/
public abstract OutputCommitter getOutputCommitter(
TaskAttemptContext context) throws IOException, InterruptedException;
}
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.io;

import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
* Parent class for vertex and edge output formats
*
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
*/
public abstract class OutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/**
* Check for validity of the output-specification for the job.
* (Copied from Hadoop OutputFormat)
*
* <p>This is to validate the output specification for the job when it is
* a job is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
* overwritten.</p>
*
* @param context information about the job
* @throws IOException when output should not be attempted
*/
public abstract void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException;

/**
* Get the output committer for this output format. This is responsible
* for ensuring the output is committed correctly.
* (Copied from Hadoop OutputFormat)
*
* @param context the task context
* @return an output committer
* @throws IOException
* @throws InterruptedException
*/
public abstract OutputCommitter getOutputCommitter(
TaskAttemptContext context) throws IOException, InterruptedException;

/**
* This method will be called before creating any writers
*
* @param context the task context
*/
public void preWriting(TaskAttemptContext context) {
}

/**
* This method will be called after all writers are closed
*
* @param context the task context
*/
public void postWriting(TaskAttemptContext context) {
}
}
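
Concrete vertex and edge output formats inherit these hooks as no-ops. As an illustration, here is a sketch of a hypothetical subclass that overrides them to create a staging directory before writing and remove it afterwards; StagedVertexOutputFormat and the staging path are illustrative, not part of this commit.

import java.io.IOException;

import org.apache.giraph.io.VertexOutputFormat;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical format; the staging path is illustrative only. Declared
// abstract so createVertexWriter/checkOutputSpecs/getOutputCommitter
// can stay out of the sketch.
public abstract class StagedVertexOutputFormat<
    I extends WritableComparable, V extends Writable,
    E extends Writable> extends VertexOutputFormat<I, V, E> {
  /** Staging directory used while writers are active */
  private static final Path STAGING = new Path("/tmp/staging-output");

  @Override
  public void preWriting(TaskAttemptContext context) {
    try {
      // Runs before any writer is created
      FileSystem.get(context.getConfiguration()).mkdirs(STAGING);
    } catch (IOException e) {
      throw new IllegalStateException("preWriting: mkdirs failed", e);
    }
  }

  @Override
  public void postWriting(TaskAttemptContext context) {
    try {
      // Runs after all writers are closed
      FileSystem.get(context.getConfiguration()).delete(STAGING, true);
    } catch (IOException e) {
      throw new IllegalStateException("postWriting: delete failed", e);
    }
  }
}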
@@ -20,10 +20,6 @@

import java.io.IOException;

import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -49,7 +45,7 @@
public abstract class VertexOutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
DefaultImmutableClassesGiraphConfigurable<I, V, E> {
OutputFormat<I, V, E> {
/**
* Create a vertex writer for a given split. The framework will call
* {@link VertexWriter#initialize(TaskAttemptContext)} before
@@ -62,32 +58,4 @@
*/
public abstract VertexWriter<I, V, E> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException;

/**
* Check for validity of the output-specification for the job.
* (Copied from Hadoop OutputFormat)
*
* <p>This is to validate the output specification for the job when it is
* a job is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
* overwritten.</p>
*
* @param context information about the job
* @throws IOException when output should not be attempted
*/
public abstract void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException;

/**
* Get the output committer for this output format. This is responsible
* for ensuring the output is committed correctly.
* (Copied from Hadoop OutputFormat)
*
* @param context the task context
* @return an output committer
* @throws IOException
* @throws InterruptedException
*/
public abstract OutputCommitter getOutputCommitter(
TaskAttemptContext context) throws IOException, InterruptedException;
}
@@ -164,4 +164,14 @@ public void abortJob(JobContext context,
}
};
}

@Override
public void preWriting(TaskAttemptContext context) {
originalOutputFormat.preWriting(context);
}

@Override
public void postWriting(TaskAttemptContext context) {
originalOutputFormat.postWriting(context);
}
}
@@ -161,4 +161,14 @@ public void abortJob(JobContext context,
}
};
}

@Override
public void preWriting(TaskAttemptContext context) {
originalOutputFormat.preWriting(context);
}

@Override
public void postWriting(TaskAttemptContext context) {
originalOutputFormat.postWriting(context);
}
}
@@ -75,6 +75,7 @@ public MultiThreadedSuperstepOutput(
this.context = context;
availableVertexWriters = Lists.newArrayList();
occupiedVertexWriters = Sets.newHashSet();
vertexOutputFormat.preWriting(context);
}

@Override
@@ -145,5 +146,6 @@ public Void call() throws Exception {
ProgressableUtils.getResultsWithNCallables(callableFactory,
Math.min(configuration.getNumOutputThreads(),
availableVertexWriters.size()), "close-writers-%d", context);
vertexOutputFormat.postWriting(context);
}
}
@@ -22,6 +22,7 @@
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -43,6 +44,8 @@
private final Mapper<?, ?, ?, ?>.Context context;
/** Main vertex writer */
private final VertexWriter<I, V, E> vertexWriter;
/** Vertex output format */
private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
/**
* Simple vertex writer, wrapper for {@link #vertexWriter}.
* Call to writeVertex is thread-safe.
@@ -61,8 +64,9 @@ public SynchronizedSuperstepOutput(
Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
try {
vertexWriter =
conf.createWrappedVertexOutputFormat().createVertexWriter(context);
vertexOutputFormat = conf.createWrappedVertexOutputFormat();
vertexOutputFormat.preWriting(context);
vertexWriter = vertexOutputFormat.createVertexWriter(context);
vertexWriter.setConf(conf);
vertexWriter.initialize(context);
} catch (IOException e) {
@@ -93,5 +97,6 @@ public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
@Override
public void postApplication() throws IOException, InterruptedException {
vertexWriter.close(context);
vertexOutputFormat.postWriting(context);
}
}
@@ -976,6 +976,7 @@ private void saveVertices(long numLocalVertices) throws IOException,
"using " + numThreads + " threads");
final VertexOutputFormat<I, V, E> vertexOutputFormat =
getConfiguration().createWrappedVertexOutputFormat();
vertexOutputFormat.preWriting(getContext());

getPartitionStore().startIteration();

@@ -1050,6 +1051,8 @@ public Void call() throws Exception {
ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
"save-vertices-%d", getContext());

vertexOutputFormat.postWriting(getContext());

LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
"saveVertices: Done saving vertices.");
// YARN: must complete the commit of the "task" output; Hadoop isn't there.
@@ -1100,6 +1103,7 @@ private void saveEdges() throws IOException, InterruptedException {
numThreads + " threads");
final EdgeOutputFormat<I, V, E> edgeOutputFormat =
conf.createWrappedEdgeOutputFormat();
edgeOutputFormat.preWriting(getContext());

getPartitionStore().startIteration();

@@ -1159,6 +1163,8 @@ public Void call() throws Exception {
ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
"save-vertices-%d", getContext());

edgeOutputFormat.postWriting(getContext());

LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
"saveEdges: Done saving edges.");
// YARN: must complete the commit of the "task" output; Hadoop isn't there.
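
Every call site touched by this commit follows the same pattern: preWriting before the first writer is created, postWriting after the last writer is closed. A sketch of that ordering contract for the vertex path follows; OutputLifecycleSketch and saveAll are illustrative names, not code in this diff, and the try/finally is a defensive variant (the commit itself calls the hooks unconditionally).

import java.io.IOException;

import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative helper showing the ordering contract; not commit code.
final class OutputLifecycleSketch {
  static <I extends WritableComparable, V extends Writable,
      E extends Writable> void saveAll(
      VertexOutputFormat<I, V, E> format, TaskAttemptContext context)
      throws IOException, InterruptedException {
    format.preWriting(context);    // before any writer is created
    try {
      VertexWriter<I, V, E> writer = format.createVertexWriter(context);
      writer.initialize(context);  // call sites also set the writer's conf
      // ... write vertices, possibly from several threads/writers ...
      writer.close(context);
    } finally {
      format.postWriting(context); // after the last writer is closed
    }
  }
}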
