Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
GIRAPH-1033: Remove zookeeper from input splits handling
Browse files Browse the repository at this point in the history
Summary: Currently we use zookeeper for handling input splits, by having each worker checking each split, and when a lot of splits are used this becomes very slow. We should have master coordinate input splits allocation instead, making the complexity proportional to #splits instead of #workers*#splits. Master holds all the splits and worker send requests to him asking for splits when they need them.

Test Plan: Run a job with 200 machines and 200k small splits - without this change input superstep takes 30 minutes, and with it less than 2 minutes. Also verified correctness on sample job. mvn clean verify passes.

Differential Revision: https://reviews.facebook.net/D48531
  • Loading branch information
Maja Kabiljo committed Oct 19, 2015
1 parent 47da751 commit 5b0cd0e
Show file tree
Hide file tree
Showing 41 changed files with 1,164 additions and 1,648 deletions.
334 changes: 43 additions & 291 deletions giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java

Large diffs are not rendered by default.

Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.List; import java.util.List;


import org.apache.giraph.master.AggregatorToGlobalCommTranslation; import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterGlobalCommHandler;
import org.apache.giraph.master.MasterInfo; import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
Expand Down Expand Up @@ -144,7 +144,7 @@ void setJobState(ApplicationState state,
* *
* @return Global communication handler * @return Global communication handler
*/ */
MasterAggregatorHandler getGlobalCommHandler(); MasterGlobalCommHandler getGlobalCommHandler();


/** /**
* Handler for aggregators to reduce/broadcast translation * Handler for aggregators to reduce/broadcast translation
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.worker.WorkerInputSplitsHandler;
import org.apache.giraph.worker.WorkerAggregatorHandler; import org.apache.giraph.worker.WorkerAggregatorHandler;
import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.worker.WorkerInfo;
Expand Down Expand Up @@ -252,4 +253,11 @@ void cleanup(FinishedSuperstepStats finishedSuperstepStats)
* @return number of partitions owned * @return number of partitions owned
*/ */
int getNumPartitionsOwned(); int getNumPartitionsOwned();

/**
* Get input splits handler used during input
*
* @return Input splits handler
*/
WorkerInputSplitsHandler getInputSplitsHandler();
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


import java.io.IOException; import java.io.IOException;


import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;


/** /**
Expand Down Expand Up @@ -53,6 +54,14 @@ void sendToOwner(String name, GlobalCommType type, Writable value)
*/ */
void flush(); void flush();


/**
* Send a request to a remote server (should be already connected)
*
* @param destTaskId Destination worker id
* @param request Request to send
*/
void sendWritableRequest(int destTaskId, WritableRequest request);

/** /**
* Closes all connections. * Closes all connections.
*/ */
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.SendGlobalCommCache; import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest; import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
Expand Down Expand Up @@ -116,6 +117,11 @@ public void flush() {
nettyClient.waitAllRequests(); nettyClient.waitAllRequests();
} }


@Override
public void sendWritableRequest(int destTaskId, WritableRequest request) {
nettyClient.sendWritableRequest(destTaskId, request);
}

@Override @Override
public void closeConnections() { public void closeConnections() {
nettyClient.stop(); nettyClient.stop();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,52 +21,52 @@
import org.apache.giraph.comm.requests.MasterRequest; import org.apache.giraph.comm.requests.MasterRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo; import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.giraph.master.MasterGlobalCommHandler;


/** Handler for requests on master */ /** Handler for requests on master */
public class MasterRequestServerHandler extends public class MasterRequestServerHandler extends
RequestServerHandler<MasterRequest> { RequestServerHandler<MasterRequest> {
/** Aggregator handler */ /** Aggregator handler */
private final MasterAggregatorHandler aggregatorHandler; private final MasterGlobalCommHandler commHandler;


/** /**
* Constructor * Constructor
* *
* @param workerRequestReservedMap Worker request reservation map * @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration * @param conf Configuration
* @param myTaskInfo Current task info * @param myTaskInfo Current task info
* @param aggregatorHandler Master aggregator handler * @param commHandler Master communication handler
* @param exceptionHandler Handles uncaught exceptions * @param exceptionHandler Handles uncaught exceptions
*/ */
public MasterRequestServerHandler( public MasterRequestServerHandler(
WorkerRequestReservedMap workerRequestReservedMap, WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf, ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo, TaskInfo myTaskInfo,
MasterAggregatorHandler aggregatorHandler, MasterGlobalCommHandler commHandler,
Thread.UncaughtExceptionHandler exceptionHandler) { Thread.UncaughtExceptionHandler exceptionHandler) {
super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
this.aggregatorHandler = aggregatorHandler; this.commHandler = commHandler;
} }


@Override @Override
public void processRequest(MasterRequest request) { public void processRequest(MasterRequest request) {
request.doRequest(aggregatorHandler); request.doRequest(commHandler);
} }


/** /**
* Factory for {@link MasterRequestServerHandler} * Factory for {@link MasterRequestServerHandler}
*/ */
public static class Factory implements RequestServerHandler.Factory { public static class Factory implements RequestServerHandler.Factory {
/** Master aggregator handler */ /** Master aggregator handler */
private final MasterAggregatorHandler aggregatorHandler; private final MasterGlobalCommHandler commHandler;


/** /**
* Constructor * Constructor
* *
* @param aggregatorHandler Master aggregator handler * @param commHandler Master global communication handler
*/ */
public Factory(MasterAggregatorHandler aggregatorHandler) { public Factory(MasterGlobalCommHandler commHandler) {
this.aggregatorHandler = aggregatorHandler; this.commHandler = commHandler;
} }


@Override @Override
Expand All @@ -76,7 +76,7 @@ public RequestServerHandler newHandler(
TaskInfo myTaskInfo, TaskInfo myTaskInfo,
Thread.UncaughtExceptionHandler exceptionHandler) { Thread.UncaughtExceptionHandler exceptionHandler) {
return new MasterRequestServerHandler(workerRequestReservedMap, conf, return new MasterRequestServerHandler(workerRequestReservedMap, conf,
myTaskInfo, aggregatorHandler, exceptionHandler); myTaskInfo, commHandler, exceptionHandler);
} }
} }
} }
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.comm.requests;

import org.apache.giraph.master.MasterGlobalCommHandler;
import org.apache.giraph.io.InputType;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* A request which workers will send to master to ask it to give them splits
*/
public class AskForInputSplitRequest extends WritableRequest
implements MasterRequest {
/** Type of split we are requesting */
private InputType splitType;
/** Task id of worker which requested the split */
private int workerTaskId;

/**
* Constructor
*
* @param splitType Type of split we are requesting
* @param workerTaskId Task id of worker which requested the split
*/
public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
this.splitType = splitType;
this.workerTaskId = workerTaskId;
}

/**
* Constructor used for reflection only
*/
public AskForInputSplitRequest() {
}

@Override
public void doRequest(MasterGlobalCommHandler commHandler) {
commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
}

@Override
void readFieldsRequest(DataInput in) throws IOException {
splitType = InputType.values()[in.readInt()];
workerTaskId = in.readInt();
}

@Override
void writeRequest(DataOutput out) throws IOException {
out.writeInt(splitType.ordinal());
out.writeInt(workerTaskId);
}

@Override
public RequestType getType() {
return RequestType.ASK_FOR_INPUT_SPLIT_REQUEST;
}
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


package org.apache.giraph.comm.requests; package org.apache.giraph.comm.requests;


import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.giraph.master.MasterGlobalCommHandler;


/** /**
* Interface for requests sent to master to extend * Interface for requests sent to master to extend
Expand All @@ -27,7 +27,7 @@ public interface MasterRequest {
/** /**
* Execute the request * Execute the request
* *
* @param aggregatorHandler Master aggregator handler * @param commHandler Master communication handler
*/ */
void doRequest(MasterAggregatorHandler aggregatorHandler); void doRequest(MasterGlobalCommHandler commHandler);
} }
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.comm.requests;

import org.apache.giraph.comm.ServerData;
import org.apache.giraph.io.InputType;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* A request which master will send to workers to give them splits
*/
public class ReplyWithInputSplitRequest extends WritableRequest
implements WorkerRequest {
/** Type of input split */
private InputType splitType;
/** Serialized input split */
private byte[] serializedInputSplit;

/**
* Constructor
*
* @param splitType Type of input split
* @param serializedInputSplit Serialized input split
*/
public ReplyWithInputSplitRequest(InputType splitType,
byte[] serializedInputSplit) {
this.splitType = splitType;
this.serializedInputSplit = serializedInputSplit;
}

/**
* Constructor used for reflection only
*/
public ReplyWithInputSplitRequest() {
}

@Override
void readFieldsRequest(DataInput in) throws IOException {
splitType = InputType.values()[in.readInt()];
int size = in.readInt();
serializedInputSplit = new byte[size];
in.readFully(serializedInputSplit);
}

@Override
void writeRequest(DataOutput out) throws IOException {
out.writeInt(splitType.ordinal());
out.writeInt(serializedInputSplit.length);
out.write(serializedInputSplit);
}

@Override
public void doRequest(ServerData serverData) {
serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
splitType, serializedInputSplit);
}

@Override
public RequestType getType() {
return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
}
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ public enum RequestType {
/** Send aggregators from worker owner to other workers */ /** Send aggregators from worker owner to other workers */
SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class), SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class),
/** Send message from worker to worker */ /** Send message from worker to worker */
SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class); SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class),
/** Send request for input split from worker to master */
ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
/** Send request with granted input split from master to workers */
REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);


/** Class of request which this type corresponds to */ /** Class of request which this type corresponds to */
private final Class<? extends WritableRequest> requestClass; private final Class<? extends WritableRequest> requestClass;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


import java.io.IOException; import java.io.IOException;


import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.giraph.master.MasterGlobalCommHandler;


/** /**
* Request to send final aggregated values from worker which owns * Request to send final aggregated values from worker which owns
Expand All @@ -45,9 +45,9 @@ public SendReducedToMasterRequest() {
} }


@Override @Override
public void doRequest(MasterAggregatorHandler aggregatorHandler) { public void doRequest(MasterGlobalCommHandler commHandler) {
try { try {
aggregatorHandler.acceptReducedValues(getDataInput()); commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException("doRequest: " + throw new IllegalStateException("doRequest: " +
"IOException occurred while processing request", e); "IOException occurred while processing request", e);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public FinishedSuperstepStats(long numLocalVertices,
long numEdges, long numEdges,
boolean mustLoadCheckpoint, boolean mustLoadCheckpoint,
CheckpointStatus checkpointStatus) { CheckpointStatus checkpointStatus) {
super(numVertices, numEdges); super(numVertices, numEdges, 0);
this.localVertexCount = numLocalVertices; this.localVertexCount = numLocalVertices;
this.allVerticesHalted = allVerticesHalted; this.allVerticesHalted = allVerticesHalted;
this.mustLoadCheckpoint = mustLoadCheckpoint; this.mustLoadCheckpoint = mustLoadCheckpoint;
Expand Down
Loading

0 comments on commit 5b0cd0e

Please sign in to comment.