Merge branch 'default_newrpc_and_secure' into trunk

commit 54edf4dad22b29add2a9555c9bd9f7a69cce9de2 (parents: e28f7d8, 528c5cd)
Author: ekoontz
pom.xml (109 lines changed)
@@ -467,6 +467,29 @@ under the License.
<scope>provided</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>munge-maven-plugin</artifactId>
+ <version>${munge-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>munge</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>munge</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <symbols>HADOOP_OLDRPC</symbols>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</profile>
<profile>
<id>hadoop_non_secure</id>
@@ -503,7 +526,7 @@ under the License.
</execution>
</executions>
<configuration>
- <symbols>HADOOP_NON_SECURE</symbols>
+ <symbols>HADOOP_NON_SECURE,HADOOP_OLDRPC</symbols>
</configuration>
</plugin>
<plugin>
@@ -518,19 +541,6 @@ under the License.
<target>${compileSource}</target>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.6</version>
- <configuration>
- <systemProperties>
- <property>
- <name>prop.jarLocation</name>
- <value>target/munged/giraph-${project.version}-jar-with-dependencies.jar</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
</plugins>
</build>
</profile>
@@ -575,9 +585,6 @@ under the License.
</goals>
</execution>
</executions>
- <configuration>
- <symbols>HADOOP_SECURE</symbols>
- </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -606,40 +613,6 @@ under the License.
<properties>
<hadoop.version>0.23.1</hadoop.version>
</properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.sonatype.plugins</groupId>
- <artifactId>munge-maven-plugin</artifactId>
- <version>${munge-maven-plugin.version}</version>
- <executions>
- <execution>
- <id>munge</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>munge</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <symbols>HADOOP_SECURE</symbols>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.6</version>
- <configuration>
- <systemProperties>
- <property>
- <name>prop.jarLocation</name>
- <value>target/munged/giraph-${project.version}-jar-with-dependencies.jar</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
- </plugins>
- </build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -671,40 +644,6 @@ under the License.
<properties>
<hadoop.version>0.24.0-SNAPSHOT</hadoop.version>
</properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.sonatype.plugins</groupId>
- <artifactId>munge-maven-plugin</artifactId>
- <version>${munge-maven-plugin.version}</version>
- <executions>
- <execution>
- <id>munge</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>munge</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <symbols>HADOOP_SECURE</symbols>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.6</version>
- <configuration>
- <systemProperties>
- <property>
- <name>prop.jarLocation</name>
- <value>target/munged/giraph-${project.version}-jar-with-dependencies.jar</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
- </plugins>
- </build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
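A note on the pom.xml changes above: the munge-maven-plugin does token-based conditional compilation, and this commit inverts the defaults. The un-munged source now compiles against secure, new-RPC Hadoop, so the HADOOP_SECURE munge runs are dropped from the 0.23/0.24 profiles; profiles targeting older Hadoop instead define HADOOP_OLDRPC (plus HADOOP_NON_SECURE for the non-secure profile) to swap the legacy code back in. A minimal sketch of how the markers behave (illustrative class and method names, not code from this repository):

    // Before munging, the if-branch is one block comment, so javac compiles
    // the else-branch (the new-RPC variant). Running munge with HADOOP_OLDRPC
    // defined uncomments the if-branch and strips the else-branch.
    public class RpcVersionSketch {
      /*if[HADOOP_OLDRPC]
      public long version() {   // survives only when HADOOP_OLDRPC is defined
        return 1L;
      }
      else[HADOOP_OLDRPC]*/
      public long version() {   // the default, new-RPC variant
        return 2L;
      }
      /*end[HADOOP_OLDRPC]*/
    }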
src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java (9 lines changed)
@@ -52,12 +52,11 @@ public void setupJob(JobContext context) throws IOException {
public void setupTask(TaskAttemptContext context) throws IOException {
}
- /*if[HADOOP_NON_SECURE]
- @Override
- public void cleanupJob(JobContext jobContext) throws IOException {
- }
- else[HADOOP_NON_SECURE]*/
@Override
+ /*if[HADOOP_NON_SECURE]
+ public void cleanupJob(JobContext jobContext) throws IOException {
+ }
+ else[HADOOP_NON_SECURE]*/
/*end[HADOOP_NON_SECURE]*/
public void commitJob(JobContext jobContext) throws IOException {
}
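The hunk above hoists @Override out of the munge block so that, after preprocessing, the annotation binds to whichever method follows it. A sketch of the two munged outputs, assuming the usual munge semantics where the if-branch is kept only when the symbol is defined:

    // With HADOOP_NON_SECURE defined, munge uncomments cleanupJob(), so the
    // hoisted @Override binds to it:
    @Override
    public void cleanupJob(JobContext jobContext) throws IOException {
    }
    public void commitJob(JobContext jobContext) throws IOException {
    }

    // In the default build the commented block stays out, and @Override
    // binds to commitJob():
    @Override
    public void commitJob(JobContext jobContext) throws IOException {
    }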
src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (31 lines changed)
@@ -61,9 +61,10 @@
import com.google.common.collect.Iterables;
-/*if[HADOOP_SECURE]
+/*if[HADOOP_OLDRPC]
+else[HADOOP_OLDRPC]*/
import org.apache.hadoop.ipc.ProtocolSignature;
-end[HADOOP_SECURE]*/
+/*end[HADOOP_OLDRPC]*/
/**
* Basic RPC communications object that implements the lower level operations
@@ -693,14 +694,24 @@ public final long getProtocolVersion(String protocol, long clientVersion)
return VERSION_ID;
}
- /*if[HADOOP_SECURE]
- public ProtocolSignature getProtocolSignature(
- String protocol,
- long clientVersion,
- int clientMethodsHash) throws IOException {
- return new ProtocolSignature(VERSION_ID, null);
- }
-end[HADOOP_SECURE]*/
+ /*if[HADOOP_OLDRPC]
+ else[HADOOP_OLDRPC]*/
+ /**
+ * Get the Protocol Signature for the given protocol,
+ * client version and method.
+ *
+ * @param protocol Protocol.
+ * @param clientVersion Version of Client.
+ * @param clientMethodsHash Hash of Client methods.
+ * @return ProtocolSignature for input parameters.
+ */
+ public ProtocolSignature getProtocolSignature(
+ String protocol,
+ long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return new ProtocolSignature(VERSION_ID, null);
+ }
+ /*end[HADOOP_OLDRPC]*/
@Override
public void closeConnections() throws IOException {
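For context on the hunk above: the newer Hadoop RPC engine resolves protocols through org.apache.hadoop.ipc.ProtocolSignature, so the server object must expose getProtocolSignature() alongside the old getProtocolVersion(); under HADOOP_OLDRPC the method and its import are munged away because the class does not exist in older Hadoop. A minimal sketch of the pair, assuming the new-RPC Hadoop jars on the classpath (ExampleProtocol is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.ipc.ProtocolSignature;
    import org.apache.hadoop.ipc.VersionedProtocol;

    public class ExampleProtocol implements VersionedProtocol {
      public static final long VERSION_ID = 1L;

      @Override
      public long getProtocolVersion(String protocol, long clientVersion)
          throws IOException {
        return VERSION_ID;
      }

      // Passing null for the method hashes advertises every method, which is
      // what the change above does as well.
      public ProtocolSignature getProtocolSignature(
          String protocol, long clientVersion, int clientMethodsHash)
          throws IOException {
        return new ProtocolSignature(VERSION_ID, null);
      }
    }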
src/main/java/org/apache/giraph/comm/RPCCommunications.java (158 lines changed)
@@ -22,7 +22,8 @@
import java.net.InetSocketAddress;
-/*if[HADOOP_SECURE]
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -32,22 +33,21 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.Token;
-else[HADOOP_SECURE]*/
-/*end[HADOOP_SECURE]*/
+/*end[HADOOP_NON_SECURE]*/
import org.apache.log4j.Logger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.GraphState;
-/*if[HADOOP_SECURE]
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
import org.apache.giraph.hadoop.BspPolicyProvider;
-else[HADOOP_SECURE]*/
-/*end[HADOOP_SECURE]*/
+/*end[HADOOP_NON_SECURE]*/
import org.apache.hadoop.conf.Configuration;
-/*if[HADOOP_SECURE]
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
import org.apache.hadoop.io.Text;
-else[HADOOP_SECURE]*/
-/*end[HADOOP_SECURE]*/
+/*end[HADOOP_NON_SECURE]*/
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RPC;
@@ -65,11 +65,11 @@
@SuppressWarnings("rawtypes")
public class RPCCommunications<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- /*if[HADOOP_SECURE]
- extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
- else[HADOOP_SECURE]*/
+ /*if[HADOOP_OLDRPC]
extends BasicRPCCommunications<I, V, E, M, Object> {
- /*end[HADOOP_SECURE]*/
+ else[HADOOP_OLDRPC]*/
+ extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
+ /*end[HADOOP_OLDRPC]*/
/** Class logger */
public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
@@ -97,14 +97,15 @@ public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
* @return Job token.
*/
protected
- /*if[HADOOP_SECURE]
- Token<JobTokenIdentifier> createJobToken() throws IOException {
- else[HADOOP_SECURE]*/
+ /*if[HADOOP_NON_SECURE]
Object createJobToken() throws IOException {
- /*end[HADOOP_SECURE]*/
- /*if[HADOOP_SECURE]
+ else[HADOOP_NON_SECURE]*/
+ Token<JobTokenIdentifier> createJobToken() throws IOException {
+ /*end[HADOOP_NON_SECURE]*/
+ /*if[HADOOP_NON_SECURE]
+ else[HADOOP_NON_SECURE]*/
String localJobTokenFile = System.getenv().get(
- UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (localJobTokenFile != null) {
// TODO: learn how to initialize/configure JobConf objects.
JobConf jobConf = new JobConf();
@@ -112,8 +113,7 @@ Object createJobToken() throws IOException {
TokenCache.loadTokens(localJobTokenFile, jobConf);
return TokenCache.getJobToken(credentials);
}
- else[HADOOP_SECURE]*/
- /*end[HADOOP_SECURE]*/
+ /*end[HADOOP_NON_SECURE]*/
return null;
}
@@ -129,32 +129,31 @@ Object createJobToken() throws IOException {
@Override
protected Server getRPCServer(
InetSocketAddress myAddress, int numHandlers, String jobId,
- /*if[HADOOP_SECURE]
- // needs facebook/trunk distinction.
- Token<JobTokenIdentifier> jt) throws IOException {
- @SuppressWarnings("deprecation")
- JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- if (jt != null) { //could be null in the case of some unit tests
- jobTokenSecretManager.addTokenForJob(jobId, jt);
- if (LOG.isInfoEnabled()) {
- LOG.info("getRPCServer: Added jobToken " + jt);
- }
- }
- Server server = RPC.getServer(RPCCommunications.class, this,
- myAddress.getHostName(), myAddress.getPort(),
- numHandlers, false, conf, jobTokenSecretManager);
- String hadoopSecurityAuthorization =
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
- if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
- server.refreshServiceAcl(conf, new BspPolicyProvider());
- }
- return server;
- else[HADOOP_SECURE]*/
+ /*if[HADOOP_OLDRPC]
Object jt) throws IOException {
return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
numHandlers, false, conf);
- /*end[HADOOP_SECURE]*/
+ else[HADOOP_OLDRPC]*/
+ Token<JobTokenIdentifier> jt) throws IOException {
+ @SuppressWarnings("deprecation")
+ JobTokenSecretManager jobTokenSecretManager =
+ new JobTokenSecretManager();
+ if (jt != null) { //could be null in the case of some unit tests
+ jobTokenSecretManager.addTokenForJob(jobId, jt);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getRPCServer: Added jobToken " + jt);
+ }
+ }
+ Server server = RPC.getServer(RPCCommunications.class, this,
+ myAddress.getHostName(), myAddress.getPort(),
+ numHandlers, false, conf, jobTokenSecretManager);
+ String hadoopSecurityAuthorization =
+ ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
+ if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
+ server.refreshServiceAcl(conf, new BspPolicyProvider());
+ }
+ return server;
+ /*end[HADOOP_OLDRPC]*/
}
@@ -167,54 +166,55 @@ protected Server getRPCServer(
* @return Proxy of the RPC server.
*/
protected
- /*if[HADOOP_SECURE]
+ /*if[HADOOP_OLDRPC]
+ CommunicationsInterface<I, V, E, M> getRPCProxy(
+ final InetSocketAddress addr,
+ String jobId,
+ Object jt) throws IOException, InterruptedException {
+ final Configuration config = new Configuration(conf);
+ @SuppressWarnings("unchecked")
+ CommunicationsInterface<I, V, E, M> proxy =
+ (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+ CommunicationsInterface.class, VERSION_ID, addr, config);
+ return proxy;
+ }
+ else[HADOOP_OLDRPC]*/
+
CommunicationsInterface<I, V, E, M> getRPCProxy(
- final InetSocketAddress addr,
- String jobId,
- Token<JobTokenIdentifier> jt)
- throws IOException, InterruptedException {
- final Configuration config = new Configuration(conf);
- if (jt == null) {
- @SuppressWarnings("unchecked")
+ final InetSocketAddress addr,
+ String jobId,
+ Token<JobTokenIdentifier> jt)
+ throws IOException, InterruptedException {
+ final Configuration config = new Configuration(conf);
+ if (jt == null) {
+ @SuppressWarnings("unchecked")
CommunicationsInterface<I, V, E, M> proxy =
(CommunicationsInterface<I, V, E, M>) RPC.getProxy(
CommunicationsInterface.class, VERSION_ID, addr, config);
- return proxy;
- }
- jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
- addr.getPort()));
- UserGroupInformation current = UserGroupInformation.getCurrentUser();
- current.addToken(jt);
- UserGroupInformation owner =
- UserGroupInformation.createRemoteUser(jobId);
- owner.addToken(jt);
- @SuppressWarnings("unchecked")
+ return proxy;
+ }
+ jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
+ addr.getPort()));
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ current.addToken(jt);
+ UserGroupInformation owner =
+ UserGroupInformation.createRemoteUser(jobId);
+ owner.addToken(jt);
+ @SuppressWarnings("unchecked")
CommunicationsInterface<I, V, E, M> proxy =
owner.doAs(new PrivilegedExceptionAction<
CommunicationsInterface<I, V, E, M>>() {
- @Override
- public CommunicationsInterface<I, V, E, M> run()
+ @Override
+ public CommunicationsInterface<I, V, E, M> run()
throws Exception {
// All methods in CommunicationsInterface will be used for
// RPC
- return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+ return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
CommunicationsInterface.class, VERSION_ID, addr,
config);
- }
+ }
});
- return proxy;
- }
- else[HADOOP_SECURE]*/
- CommunicationsInterface<I, V, E, M> getRPCProxy(
- final InetSocketAddress addr,
- String jobId,
- Object jt) throws IOException, InterruptedException {
- final Configuration config = new Configuration(conf);
- @SuppressWarnings("unchecked")
- CommunicationsInterface<I, V, E, M> proxy =
- (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
- CommunicationsInterface.class, VERSION_ID, addr, config);
return proxy;
}
- /*end[HADOOP_SECURE]*/
+ /*end[HADOOP_OLDRPC]*/
}
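Why the secure proxy path above is wrapped in doAs: Hadoop RPC picks up SASL credentials from the UserGroupInformation that opens the connection, so the job token must be attached to a UGI and the proxy created inside that UGI's doAs block. Condensed from the code above (a sketch only; surrounding names such as conf, addr, and VERSION_ID come from the class):

    // Point the token at the server, bind it to a remote-user UGI for the
    // job, then create the proxy as that user so the connection authenticates.
    jt.setService(new Text(addr.getAddress().getHostAddress() + ":" + addr.getPort()));
    UserGroupInformation owner = UserGroupInformation.createRemoteUser(jobId);
    owner.addToken(jt);
    CommunicationsInterface<I, V, E, M> proxy = owner.doAs(
        new PrivilegedExceptionAction<CommunicationsInterface<I, V, E, M>>() {
          @Override
          public CommunicationsInterface<I, V, E, M> run() throws Exception {
            return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
                CommunicationsInterface.class, VERSION_ID, addr, config);
          }
        });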
src/test/java/org/apache/giraph/TestBspBasic.java (18 lines changed)
@@ -52,10 +52,10 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
-/*if[HADOOP_SECURE]
+/*if[HADOOP_OLDRPC]
+else[HADOOP_OLDRPC]*/
import org.apache.hadoop.mapreduce.task.JobContextImpl;
-else[HADOOP_SECURE]*/
-/*end[HADOOP_SECURE]*/
+/*end[HADOOP_OLDRPC]*/
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -115,15 +115,15 @@ public void testInstantiateVertex()
", graphState" + gs);
VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable>
inputFormat = BspUtils.createVertexInputFormat(job.getConfiguration());
- /*if[HADOOP_SECURE]
- List<InputSplit> splitArray =
- inputFormat.getSplits(
- new JobContextImpl(new Configuration(), new JobID()), 1);
- else[HADOOP_SECURE]*/
+ /*if[HADOOP_OLDRPC]
List<InputSplit> splitArray =
inputFormat.getSplits(
new JobContext(new Configuration(), new JobID()), 1);
- /*end[HADOOP_SECURE]*/
+ else[HADOOP_OLDRPC]*/
+ List<InputSplit> splitArray =
+ inputFormat.getSplits(
+ new JobContextImpl(new Configuration(), new JobID()), 1);
+ /*end[HADOOP_OLDRPC]*/
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream();
DataOutputStream outputStream =
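The munge branches in this test exist because the mapreduce JobContext changed shape between Hadoop lines: in the older API it is a concrete class you can construct directly, while in the newer API it is an interface whose concrete implementation is org.apache.hadoop.mapreduce.task.JobContextImpl. A sketch of the default, new-API construction, with the old-API equivalent noted (ContextSketch is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;

    class ContextSketch {
      static JobContextImpl newJobContext() {
        // Old API (HADOOP_OLDRPC branch):
        //   new JobContext(new Configuration(), new JobID())
        return new JobContextImpl(new Configuration(), new JobID());
      }
    }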