Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
Decoupling NettyClient from control flow policy
Browse files Browse the repository at this point in the history
Summary: This diff refactors NettyClient by decoupling flow control mechanism from NettyClient. Through the refactoring process, some performance and correctness bugs have been found due to the better readability of the refactored code.

Test Plan:
mvn clean verify
Tested large jobs and the output was correct
Tested large jobs and it did not have any performance degradation for codes using the old mechanism

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D56367
  • Loading branch information
Sergey Edunov committed Apr 26, 2016
1 parent b5284cd commit b90b59d
Show file tree
Hide file tree
Showing 30 changed files with 921 additions and 679 deletions.
6 changes: 1 addition & 5 deletions findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
</Match>
<Match>
<Class name="org.apache.giraph.comm.netty.handler.RequestServerHandler"/>
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
</Match>
<Match>
<Class name="org.apache.giraph.comm.netty.handler.ResponseClientHandler"/>
<Class name="org.apache.giraph.comm.flow_control.CreditBasedFlowControl"/>
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
</Match>
<Match>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
Expand Down Expand Up @@ -59,6 +60,13 @@ public interface CentralizedService<I extends WritableComparable,
*/
List<WorkerInfo> getWorkerInfoList();

/**
* Get master info
*
* @return Master info
*/
MasterInfo getMasterInfo();

/**
* Get JobProgressTracker to report progress to
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterGlobalCommHandler;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
Expand Down Expand Up @@ -53,13 +52,6 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
*/
boolean becomeMaster();

/**
* Get master information
*
* @return Master information
*/
MasterInfo getMasterInfo();

/**
* Check all the {@link org.apache.giraph.worker.WorkerInfo} objects to ensure
* that a minimum number of good workers exists out of the total that have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
Expand Down Expand Up @@ -182,13 +181,6 @@ FinishedSuperstepStats finishSuperstep(
void exchangeVertexPartitions(
Collection<? extends PartitionOwner> masterSetPartitionOwners);

/**
* Get master info
*
* @return Master info
*/
MasterInfo getMasterInfo();

/**
* Get the GraphTaskManager that this service is using. Vertices need to know
* this.
Expand Down Expand Up @@ -247,13 +239,6 @@ void cleanup(FinishedSuperstepStats finishedSuperstepStats)
*/
GlobalStats getGlobalStats();

/**
* Get the number of partitions owned by this worker
*
* @return number of partitions owned
*/
int getNumPartitionsOwned();

/**
* Get input splits handler used during input
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.hadoop.io.Writable;

Expand Down Expand Up @@ -66,5 +67,12 @@ void sendToOwner(String name, GlobalCommType type, Writable value)
* Closes all connections.
*/
void closeConnections();

/**
* Get the reference to the flow control policy used for sending requests
*
* @return reference to the flow control policy
*/
FlowControl getFlowControl();
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.giraph.comm;

import org.apache.giraph.comm.flow_control.FlowControl;

import java.net.InetSocketAddress;

/**
Expand All @@ -42,4 +44,11 @@ public interface MasterServer {
* Shuts down.
*/
void close();

/**
* Inform the server about the flow control policy used in sending requests
*
* @param flowControl reference to flow control policy
*/
void setFlowControl(FlowControl flowControl);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.giraph.comm;

import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.WritableRequest;

import org.apache.giraph.partition.PartitionOwner;
Expand Down Expand Up @@ -96,4 +97,9 @@ public interface WorkerClient<I extends WritableComparable,
*/
void authenticate() throws IOException;
/*end[HADOOP_NON_SECURE]*/

/**
* @return the flow control used in sending requests
*/
FlowControl getFlowControl();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.giraph.comm;

import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

Expand Down Expand Up @@ -64,4 +65,11 @@ public interface WorkerServer<I extends WritableComparable,
* Shuts down.
*/
void close();

/**
* Inform this server about the flow control used in sending requests
*
* @param flowControl reference to the flow control policy
*/
void setFlowControl(FlowControl flowControl);
}
Loading

0 comments on commit b90b59d

Please sign in to comment.