Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

[Bug 4730055] Backwards compatibility with Hadoop 0.20.x. Use Base64

from net.iharder 2.3.8 to avoid issues with Hadoop using the same
package (common-codecs). Munge for non-secure Hadoop support
(preprocessor), add '-Dhadoop=non_secure' for non-secure Hadoop.
Backward compatible changes provided by ckunz and committed by aching.



git-svn-id: https://svn.apache.org/repos/asf/incubator/giraph/trunk@1161943 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 231c5a3c24c37ba90935a66c194643afc94e3ac8 1 parent 13e034d
@aching aching authored
View
30 README
@@ -20,8 +20,7 @@ graph-processing framework that is launched as a typical Hadoop job to
leverage existing Hadoop infrastructure, such as Amazon’s EC2. Giraph
builds upon the graph-oriented nature of Pregel but additionally adds
fault-tolerance to the coordinator process with the use of ZooKeeper
-as its centralized coordination service and is in the process of being
-open-sourced.
+as its centralized coordination service.
Giraph follows the bulk-synchronous parallel model relative to graphs
where vertices can send messages to other vertices during a given
@@ -33,22 +32,37 @@ automatically take over if the current application coordinator fails.
-------------------------------
+Hadoop versions for use with Giraph:
+
+Secure Hadoop versions:
+- Apache Hadoop 0.20.203, 0.20.204, other secure versions may work as well
+-- Other versions reported working include:
+--- Cloudera CDH3u0, CDH3u1
+
+Unsecure Hadoop versions:
+- Apache Hadoop 0.20.1, 0.20.2, 0.20.3
+
+While we provide support for unsecure Hadoop with the maven profile
+'hadoop_non_secure', we have been primarily focusing on secure Hadoop
+releases at this time.
+
+-------------------------------
+
Building and testing:
You will need the following:
- Java 1.6
- Maven 2 or higher
-Use the maven commands to:
+Use the maven commands with secure Hadoop to:
- compile (i.e mvn compile)
- package (i.e. mvn package)
- test (i.e. mvn test)
-- For testing, one can submit the test to a running Hadoop instance
(i.e. mvn test -Dprop.mapred.job.tracker=localhost:50300)
--------------------------------
-
-Running:
+For the non-secure versions of Hadoop, run the maven commands with the
+additional argument '-Dhadoop=non_secure' to enable the maven profile
+ 'hadoop_non_secure'. An example compilation command is
+'mvn -Dhadoop=non_secure compile'.
-You will need the following:
-- Hadoop 0.20.203 supported, other versions may work as well
View
63 pom.xml
@@ -42,6 +42,7 @@
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
+ <outputDirectory>target</outputDirectory>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
@@ -112,6 +113,62 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>hadoop_non_secure</id>
+ <activation>
+ <property>
+ <name>hadoop</name>
+ <value>non_secure</value>
+ </property>
+ </activation>
+ <properties>
+ <hadoop.version>0.20.2</hadoop.version>
+ </properties>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/java/org/apache/giraph/hadoop</directory>
+ <excludes>
+ <exclude>BspTokenSelector.java</exclude>
+ </excludes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>munge-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>munge</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>munge</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <symbols>HADOOP_NON_SECURE</symbols>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <excludes>
+ <exclude>**/BspTokenSelector.java</exclude>
+ </excludes>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>junit</groupId>
@@ -160,9 +217,9 @@
<version>1.3.2</version>
</dependency>
<dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>1.4</version>
+ <groupId>net.iharder</groupId>
+ <artifactId>base64</artifactId>
+ <version>2.3.8</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
View
2  src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
@@ -66,6 +66,6 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- return null;
+ return new BspRecordWriter();
}
}
View
29 src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
@@ -0,0 +1,29 @@
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Used by {@link BspOutputFormat} since some versions of Hadoop
+ * require that a RecordWriter is returned from getRecordWriter.
+ * Does nothing, except ensures that write is never called.
+ */
+public class BspRecordWriter extends RecordWriter<Text, Text> {
+
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // Do nothing
+ }
+
+ @Override
+ public void write(Text key, Text value)
+ throws IOException, InterruptedException {
+ throw new IOException("write: Cannot write with " +
+ getClass().getName() +
+ ". Should never be called");
+ }
+}
View
10 src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,7 +52,13 @@ public void setupJob(JobContext context) throws IOException {
public void setupTask(TaskAttemptContext context) throws IOException {
}
+ /*if[HADOOP_NON_SECURE]
@Override
+ public void cleanupJob(JobContext jobContext) throws IOException {
+ }
+ else[HADOOP_NON_SECURE]*/
+ @Override
+ /*end[HADOOP_NON_SECURE]*/
public void commitJob(JobContext jobContext) throws IOException {
}
}
View
8 src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
@@ -23,11 +23,14 @@
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.MutableVertex;
import org.apache.giraph.graph.VertexRange;
+/*if[HADOOP_NON_SECURE]
+ else[HADOOP_NON_SECURE]*/
import org.apache.giraph.hadoop.BspTokenSelector;
+import org.apache.hadoop.security.token.TokenInfo;
+/*end[HADOOP_NON_SECURE]*/
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.security.token.TokenInfo;
/**
* Basic interface for communication between workers.
@@ -38,7 +41,10 @@
*
**/
@SuppressWarnings("rawtypes")
+/*if[HADOOP_NON_SECURE]
+ else[HADOOP_NON_SECURE]*/
@TokenInfo(BspTokenSelector.class)
+/*end[HADOOP_NON_SECURE]*/
public interface CommunicationsInterface<
I extends WritableComparable,
V extends Writable,
View
51 src/main/java/org/apache/giraph/comm/RPCCommunications.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,7 +23,17 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.Token;
+/*end[HADOOP_NON_SECURE]*/
import org.apache.log4j.Logger;
@@ -37,21 +47,17 @@
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.token.Token;
-
@SuppressWarnings("rawtypes")
public class RPCCommunications<
I extends WritableComparable,
V extends Writable,
E extends Writable,
M extends Writable>
+/*if[HADOOP_NON_SECURE]
+extends BasicRPCCommunications<I, V, E, M, Object> {
+else[HADOOP_NON_SECURE]*/
extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
+/*end[HADOOP_NON_SECURE]*/
/** Class logger */
public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
@@ -62,6 +68,11 @@ public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
super(context, service);
}
+/*if[HADOOP_NON_SECURE]
+ protected Object createJobToken() throws IOException {
+ return null;
+ }
+else[HADOOP_NON_SECURE]*/
protected Token<JobTokenIdentifier> createJobToken() throws IOException {
String localJobTokenFile = System.getenv().get(
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
@@ -72,9 +83,16 @@ public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
}
return null;
}
+/*end[HADOOP_NON_SECURE]*/
protected Server getRPCServer(
InetSocketAddress myAddress, int numHandlers, String jobId,
+/*if[HADOOP_NON_SECURE]
+ Object jt) throws IOException {
+ return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
+ numHandlers, false, conf);
+ }
+else[HADOOP_NON_SECURE]*/
Token<JobTokenIdentifier> jt) throws IOException {
@SuppressWarnings("deprecation")
String hadoopSecurityAuthorization =
@@ -95,14 +113,26 @@ protected Server getRPCServer(
return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
numHandlers, false, conf, jobTokenSecretManager);
}
+/*end[HADOOP_NON_SECURE]*/
protected CommunicationsInterface<I, V, E, M> getRPCProxy(
final InetSocketAddress addr,
String jobId,
+/*if[HADOOP_NON_SECURE]
+ Object jt)
+else[HADOOP_NON_SECURE]*/
Token<JobTokenIdentifier> jt)
+/*end[HADOOP_NON_SECURE]*/
throws IOException, InterruptedException {
final Configuration config = new Configuration(conf);
+/*if[HADOOP_NON_SECURE]
+ @SuppressWarnings("unchecked")
+ CommunicationsInterface<I, V, E, M> proxy =
+ (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
+ CommunicationsInterface.class, versionID, addr, config);
+ return proxy;
+else[HADOOP_NON_SECURE]*/
if (jt == null) {
@SuppressWarnings("unchecked")
CommunicationsInterface<I, V, E, M> proxy =
@@ -129,5 +159,6 @@ protected Server getRPCServer(
}
});
return proxy;
+/*end[HADOOP_NON_SECURE]*/
}
}
View
10 src/main/java/org/apache/giraph/graph/BspServiceMaster.java
@@ -38,12 +38,12 @@
import java.util.TreeMap;
import java.util.TreeSet;
+import net.iharder.Base64;
+
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import org.apache.commons.codec.binary.Base64;
-
import org.apache.log4j.Logger;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -666,7 +666,7 @@ private void mapFilesToWorkers(long superstep,
vertexRangeObj.put(JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY,
checkpointFilePrefix);
vertexRangeObj.put(JSONOBJ_MAX_VERTEX_INDEX_KEY,
- Base64.encodeBase64String(
+ Base64.encodeBytes(
outputStream.toByteArray()));
vertexRangeMetaArray.put(vertexRangeObj);
vertexRangeArray.put(outputStream.toString("UTF-8"));
@@ -1034,7 +1034,7 @@ private void collectAndProcessAggregatorValues(long superstep) {
aggregator.createAggregatedValue();
InputStream input =
new ByteArrayInputStream(
- Base64.decodeBase64(
+ Base64.decode(
aggregatorArray.getJSONObject(i).
getString(AGGREGATOR_VALUE_KEY)));
aggregatorValue.readFields(new DataInputStream(input));
@@ -1095,7 +1095,7 @@ private void collectAndProcessAggregatorValues(long superstep) {
entry.getKey());
aggregatorObj.put(
AGGREGATOR_VALUE_KEY,
- Base64.encodeBase64String(outputStream.toByteArray()));
+ Base64.encodeBytes(outputStream.toByteArray()));
aggregatorArray.put(aggregatorObj);
if (LOG.isInfoEnabled()) {
LOG.info("collectAndProcessAggregatorValues: " +
View
9 src/main/java/org/apache/giraph/graph/BspServiceWorker.java
@@ -36,7 +36,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
-import org.apache.commons.codec.binary.Base64;
+import net.iharder.Base64;
import org.json.JSONArray;
import org.json.JSONException;
@@ -239,7 +239,7 @@ private void setInputSplitVertexRanges(
vertexRangeObj.put(JSONOBJ_HOSTNAME_ID_KEY,
getHostnamePartitionId());
vertexRangeObj.put(JSONOBJ_MAX_VERTEX_INDEX_KEY,
- Base64.encodeBase64String(
+ Base64.encodeBytes(
outputStream.toByteArray()));
vertexRangeObj.put(JSONOBJ_NUM_MESSAGES_KEY, 0L);
statArray.put(vertexRangeObj);
@@ -582,7 +582,6 @@ private JSONArray marshalAggregatorValues(long superstep) {
return aggregatorArray;
}
- Base64 base64 = new Base64();
for (String name : aggregatorInUse) {
try {
Aggregator<Writable> aggregator = getAggregatorMap().get(name);
@@ -597,7 +596,7 @@ private JSONArray marshalAggregatorValues(long superstep) {
aggregator.getClass().getName());
aggregatorObj.put(
AGGREGATOR_VALUE_KEY,
- base64.encodeToString(outputStream.toByteArray()));
+ Base64.encodeBytes(outputStream.toByteArray()));
aggregatorArray.put(aggregatorObj);
LOG.info("marshalAggregatorValues: " +
"Found aggregatorObj " +
@@ -654,7 +653,7 @@ private void getAggregatorValues(long superstep) {
Writable aggregatorValue = aggregator.getAggregatedValue();
InputStream input =
new ByteArrayInputStream(
- Base64.decodeBase64(aggregatorArray.getJSONObject(i).
+ Base64.decode(aggregatorArray.getJSONObject(i).
getString(AGGREGATOR_VALUE_KEY)));
aggregatorValue.readFields(
new DataInputStream(input));
View
1  src/main/java/org/apache/giraph/graph/GiraphJob.java
@@ -487,6 +487,7 @@ final public boolean run(boolean verbose)
// Should work in MAPREDUCE-1938 to let the user jars/classes
// get loaded first
conf.setBoolean("mapreduce.user.classpath.first", true);
+
setMapperClass(GraphMapper.class);
setInputFormatClass(BspInputFormat.class);
setOutputFormatClass(BspOutputFormat.class);
View
7 src/main/java/org/apache/giraph/graph/VertexRange.java
@@ -29,7 +29,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.commons.codec.binary.Base64;
+import net.iharder.Base64;
+
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -111,7 +112,7 @@ public VertexRange(Class<I> indexClass, JSONObject vertexRangeObj)
IllegalAccessException {
maxVertexIndex = indexClass.newInstance();
byte[] maxVertexIndexByteArray =
- Base64.decodeBase64(
+ Base64.decode(
vertexRangeObj.getString(
BspService.JSONOBJ_MAX_VERTEX_INDEX_KEY));
InputStream input = new ByteArrayInputStream(maxVertexIndexByteArray);
@@ -217,7 +218,7 @@ public JSONObject toJSONObject() throws IOException, JSONException {
DataOutput output = new DataOutputStream(outputStream);
maxVertexIndex.write(output);
vertexRangeObj.put(BspService.JSONOBJ_MAX_VERTEX_INDEX_KEY,
- Base64.encodeBase64String(
+ Base64.encodeBytes(
outputStream.toByteArray()));
vertexRangeObj.put(BspService.JSONOBJ_HOSTNAME_KEY, hostname);
vertexRangeObj.put(BspService.JSONOBJ_PORT_KEY, port);
View
14 src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,9 @@
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.commons.codec.binary.Base64;
+
+import net.iharder.Base64;
+
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.MutableVertex;
@@ -93,7 +95,7 @@ public boolean next(MutableVertex<I, V, E, ?> vertex)
DataInput input = null;
byte[] decodedWritable = null;
try {
- decodedWritable = Base64.decodeBase64(
+ decodedWritable = Base64.decode(
vertexObject.getString(VERTEX_ID_KEY));
input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
@@ -105,7 +107,7 @@ public boolean next(MutableVertex<I, V, E, ?> vertex)
"next: Failed to get vertex id", e);
}
try {
- decodedWritable = Base64.decodeBase64(
+ decodedWritable = Base64.decode(
vertexObject.getString(VERTEX_VALUE_KEY));
input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
@@ -126,7 +128,7 @@ public boolean next(MutableVertex<I, V, E, ?> vertex)
for (int i = 0; i < edgeArray.length(); ++i) {
try {
decodedWritable =
- Base64.decodeBase64(edgeArray.getString(i));
+ Base64.decode(edgeArray.getString(i));
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get edge value", e);
View
10 src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java
@@ -22,7 +22,9 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import org.apache.commons.codec.binary.Base64;
+
+import net.iharder.Base64;
+
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.VertexWriter;
@@ -81,7 +83,7 @@ public void writeVertex(BasicVertex<I, V, E, ?> vertex)
try {
vertexObject.put(
VERTEX_ID_KEY,
- Base64.encodeBase64String(outputStream.toByteArray()));
+ Base64.encodeBytes(outputStream.toByteArray()));
} catch (JSONException e) {
throw new IllegalStateException(
"writerVertex: Failed to insert vertex id", e);
@@ -91,7 +93,7 @@ public void writeVertex(BasicVertex<I, V, E, ?> vertex)
try {
vertexObject.put(
VERTEX_VALUE_KEY,
- Base64.encodeBase64String(outputStream.toByteArray()));
+ Base64.encodeBytes(outputStream.toByteArray()));
} catch (JSONException e) {
throw new IllegalStateException(
"writerVertex: Failed to insert vertex value", e);
@@ -100,7 +102,7 @@ public void writeVertex(BasicVertex<I, V, E, ?> vertex)
for (Edge<I, E> edge : vertex.getOutEdgeMap().values()) {
outputStream.reset();
edge.write(output);
- edgeArray.put(Base64.encodeBase64String(outputStream.toByteArray()));
+ edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
}
try {
vertexObject.put(EDGE_ARRAY_KEY, edgeArray);
Please sign in to comment.
Something went wrong with that request. Please try again.