JIRA-1147
closes #38
Maja Kabiljo committed May 19, 2017
1 parent 67540b3 commit 02e73b917972ef69c7b901c3b5c493a4a504731c
Showing 5 changed files with 119 additions and 10 deletions.
@@ -34,16 +34,25 @@ public class AskForInputSplitRequest extends WritableRequest
private InputType splitType;
/** Task id of worker which requested the split */
private int workerTaskId;
/**
* Whether this is the first split a thread is requesting; if not, this
* request indicates that the previously requested input split was done
*/
private boolean isFirstSplit;

/**
* Constructor
*
* @param splitType Type of split we are requesting
* @param workerTaskId Task id of worker which requested the split
* @param isFirstSplit Whether this is the first split a thread is requesting;
* if not, this request indicates that the previously requested input split was done
*/
- public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
+ public AskForInputSplitRequest(InputType splitType, int workerTaskId,
+     boolean isFirstSplit) {
this.splitType = splitType;
this.workerTaskId = workerTaskId;
this.isFirstSplit = isFirstSplit;
}

/**
@@ -54,19 +63,22 @@ public AskForInputSplitRequest() {

@Override
public void doRequest(MasterGlobalCommHandler commHandler) {
- commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
+ commHandler.getInputSplitsHandler().sendSplitTo(
+     splitType, workerTaskId, isFirstSplit);
}

@Override
void readFieldsRequest(DataInput in) throws IOException {
splitType = InputType.values()[in.readInt()];
workerTaskId = in.readInt();
isFirstSplit = in.readBoolean();
}

@Override
void writeRequest(DataOutput out) throws IOException {
out.writeInt(splitType.ordinal());
out.writeInt(workerTaskId);
out.writeBoolean(isFirstSplit);
}

@Override
@@ -846,7 +846,7 @@ public boolean becomeMaster() {
globalCommHandler = new MasterGlobalCommHandler(
new MasterAggregatorHandler(getConfiguration(), getContext()),
new MasterInputSplitsHandler(
- getConfiguration().useInputSplitLocality()));
+ getConfiguration().useInputSplitLocality(), getContext()));
aggregatorTranslation = new AggregatorToGlobalCommTranslation(
getConfiguration(), globalCommHandler);

@@ -20,10 +20,13 @@

import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
import org.apache.giraph.conf.StrConfOption;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.InputType;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
@@ -34,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Handler for input splits on master
@@ -44,6 +48,15 @@
* these splits back to queues.
*/
public class MasterInputSplitsHandler {
/**
* Store in counters timestamps when we finished reading
* these fractions of input
*/
public static final StrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
new StrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
"0.99,1", "Store in counters timestamps when we finished reading " +
"these fractions of input");

/** Whether to use locality information */
private final boolean useLocality;
/** Master client */
@@ -56,16 +69,42 @@ public class MasterInputSplitsHandler {
/** Latches to say when one input splits type is ready to be accessed */
private Map<InputType, CountDownLatch> latchesMap =
new EnumMap<>(InputType.class);
/** Context for accessing counters */
private final Mapper.Context context;
/** Total number of splits per type */
private final Map<InputType, Integer> numSplitsPerType =
new EnumMap<>(InputType.class);
/** How many splits per type have been read so far */
private final Map<InputType, AtomicInteger> numSplitsReadPerType =
new EnumMap<>(InputType.class);
/** Timestamps when various splits were created */
private final Map<InputType, Long> splitsCreatedTimestamp =
new EnumMap<>(InputType.class);
/**
* Store in counters timestamps when we finished reading
* these fractions of input
*/
private final double[] doneFractionsToStoreInCounters;

/**
* Constructor
*
* @param useLocality Whether to use locality information or not
* @param context Context for accessing counters
*/
- public MasterInputSplitsHandler(boolean useLocality) {
+ public MasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
this.useLocality = useLocality;
this.context = context;
for (InputType inputType : InputType.values()) {
latchesMap.put(inputType, new CountDownLatch(1));
numSplitsReadPerType.put(inputType, new AtomicInteger(0));
}

String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get(
context.getConfiguration()).split(",");
doneFractionsToStoreInCounters = new double[tmp.length];
for (int i = 0; i < tmp.length; i++) {
doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
}
}
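
For context, a minimal usage sketch (not part of this commit) of overriding the new giraph.master.input.doneFractionsToStoreInCounters option from a job driver. It assumes only the plain Hadoop Configuration.set API and the key string defined above; the fractions 0.5,0.9,1 are just an example.

import org.apache.giraph.conf.GiraphConfiguration;

public class DoneFractionsConfigSketch {
  /**
   * Build a configuration that records "done time" counters at 50%, 90%
   * and 100% of each input type, instead of the default "0.99,1".
   */
  public static GiraphConfiguration configure() {
    GiraphConfiguration conf = new GiraphConfiguration();
    conf.set("giraph.master.input.doneFractionsToStoreInCounters", "0.5,0.9,1");
    return conf;
  }
}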

@@ -89,6 +128,7 @@ public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
*/
public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
GiraphInputFormat inputFormat) {
splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
List<byte[]> serializedSplits = new ArrayList<>();
for (InputSplit inputSplit : inputSplits) {
try {
@@ -114,6 +154,7 @@ public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
}
splitsMap.put(splitsType, inputSplitsOrganizer);
latchesMap.get(splitsType).countDown();
numSplitsPerType.put(splitsType, serializedSplits.size());
}

/**
@@ -123,8 +164,11 @@ public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
*
* @param splitType Type of split requested
* @param workerTaskId Id of worker who requested split
* @param isFirstSplit Whether this is the first split a thread is requesting;
* if not, this request indicates that the previously requested input split was done
*/
- public void sendSplitTo(InputType splitType, int workerTaskId) {
+ public void sendSplitTo(InputType splitType, int workerTaskId,
+     boolean isFirstSplit) {
try {
// Make sure we don't try to retrieve splits before they were added
latchesMap.get(splitType).await();
@@ -136,5 +180,52 @@ public void sendSplitTo(InputType splitType, int workerTaskId) {
masterClient.sendWritableRequest(workerTaskId,
new ReplyWithInputSplitRequest(splitType,
serializedInputSplit == null ? new byte[0] : serializedInputSplit));
if (!isFirstSplit) {
incrementSplitsRead(splitType);
}
}

/**
* Increment splits read
*
* @param splitType Type of split which was read
*/
private void incrementSplitsRead(InputType splitType) {
int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
int splits = numSplitsPerType.get(splitType);
for (int i = 0; i < doneFractionsToStoreInCounters.length; i++) {
if (splitsRead == (int) (splits * doneFractionsToStoreInCounters[i])) {
splitFractionReached(
splitType, doneFractionsToStoreInCounters[i], context);
}
}
}
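
The counter for a given fraction is set exactly when the running count of completed splits equals (int) (totalSplits * fraction). A tiny self-contained sketch of that threshold arithmetic (class and variable names here are illustrative, not from the commit):

public class FractionThresholdSketch {
  public static void main(String[] args) {
    int totalSplits = 200;
    double[] fractions = {0.5, 0.99, 1.0};
    for (double fraction : fractions) {
      // Same truncating check as incrementSplitsRead above
      int threshold = (int) (totalSplits * fraction);
      System.out.println(fraction + " of input -> counter set after split #" + threshold);
    }
  }
}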

/**
* Called when reading of a split type reaches one of the configured
* fractions, to set the timestamp counter
*
* @param inputType Type of input
* @param fraction Which fraction of input type was done reading
* @param context Context for accessing counters
*/
private void splitFractionReached(
InputType inputType, double fraction, Mapper.Context context) {
getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
}

/**
* Get counter
*
* @param inputType Type of input for counter
* @param fraction Fraction for counter
* @param context Context to get counter from
* @return Counter
*/
public static Counter getSplitFractionDoneTimestampCounter(
InputType inputType, double fraction, Mapper.Context context) {
return context.getCounter(inputType.name() + " input",
String.format("%.2f%% done time (ms)", fraction * 100));
}
}
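
A hedged sketch (also not from this commit) of reading the new counters after the job completes. It assumes a Hadoop Job handle, the default fractions 0.99 and 1, and that InputType.VERTEX names the vertex input; the counter group and name strings mirror the format used by getSplitFractionDoneTimestampCounter above.

import org.apache.giraph.io.InputType;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

public class DoneTimeCounterReader {
  /** Print how long it took to read 99% and 100% of the vertex input. */
  public static void printVertexInputDoneTimes(Job job) throws Exception {
    String group = InputType.VERTEX.name() + " input";
    for (double fraction : new double[] {0.99, 1.0}) {
      String name = String.format("%.2f%% done time (ms)", fraction * 100);
      Counter counter = job.getCounters().findCounter(group, name);
      System.out.println(name + " = " + counter.getValue());
    }
  }
}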
@@ -208,15 +208,19 @@ protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
@Override
public VertexEdgeCount call() {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- byte[] serializedInputSplit;
+ int inputSplitsProcessed = 0;
try {
OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
if (oocEngine != null) {
oocEngine.processingThreadStart();
}
- while ((serializedInputSplit =
-     splitsHandler.reserveInputSplit(getInputType())) != null) {
+ while (true) {
+   byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
+       getInputType(), inputSplitsProcessed == 0);
+   if (serializedInputSplit == null) {
+     // No splits left
+     break;
+   }
// If out-of-core mechanism is used, check whether this thread
// can stay active or it should temporarily suspend and stop
// processing and generating more data for the moment.
@@ -91,12 +91,14 @@ public void receivedInputSplit(InputType splitType,
* have been created.
*
* @param splitType Type of split
* @param isFirstSplit Whether this is the first split the input thread reads
* @return reserved InputSplit or null if no unfinished InputSplits exist
*/
- public byte[] reserveInputSplit(InputType splitType) {
+ public byte[] reserveInputSplit(InputType splitType, boolean isFirstSplit) {
// Send request
workerClient.sendWritableRequest(masterTaskId,
- new AskForInputSplitRequest(splitType, workerInfo.getTaskId()));
+ new AskForInputSplitRequest(
+     splitType, workerInfo.getTaskId(), isFirstSplit));
try {
// Wait for some split to become available
byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
