Skip to content

Commit

Permalink
Bunch of docs and a new test case for a bug I thought of while writin…
Browse files Browse the repository at this point in the history
…g docs
  • Loading branch information
Josh Wills committed Apr 11, 2012
1 parent be31c68 commit 53573fa
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 52 deletions.
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
Expand All @@ -45,8 +44,12 @@
*/
public class InputPreparer extends Configured implements Tool {

/**
* Map task that splits a single line of delimited text that consists of ID1,ID2,WEIGHT into the data
* that is fed into the reduce task that aggregates the information about each ID into a single
* record.
*/
public static class TwoVerticesFn extends DoFn<String, Pair<String, Pair<String, Integer>>> {

private final String sep;

public TwoVerticesFn(String sep) {
Expand All @@ -66,6 +69,10 @@ public void process(String input, Emitter<Pair<String, Pair<String, Integer>>> e
}
}

/**
* Reduce task that aggregates the data about each bidder/object vertex and checks for error
* conditions, like an identifier that is both a bidder and an object.
*/
public static class WriteVertexFn extends MapFn<Pair<String, Iterable<Pair<String, Integer>>>, VertexData> {
@Override
public VertexData map(Pair<String, Iterable<Pair<String, Integer>>> v) {
Expand Down Expand Up @@ -102,14 +109,18 @@ public PCollection<VertexData> exec(PCollection<String> input, String sep) {
@Override
public int run(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: <input> <output> <sepchar>");
System.err.println("Usage: <input> <output> <delim>");
System.err.println("The input should be a file with 3-columns: bidder ID, object ID, and weight,");
System.err.println("separated by the given delimiter.");
System.err.println("The output will be a text file of JSON-serialized information about each vertex");
System.err.println("that is the input to the BipartiteMatchingRunner.");
return 1;
}

Pipeline p = new MRPipeline(InputPreparer.class, getConf());
exec(p.read(From.textFile(args[0])), args[2]).write(To.textFile(args[1]));
PCollection<VertexData> data = exec(p.read(From.textFile(args[0])), args[2]);
data.write(To.textFile(args[1]));
p.done();

return 0;
}

Expand Down
Expand Up @@ -38,14 +38,21 @@ public class OutputProcessor extends Configured implements Tool {

private static final PType<VertexData> vType = PTypes.jsonString(VertexData.class, WritableTypeFamily.getInstance());

/**
* Extracts the ID1,ID2,WEIGHT values from the JSON data written by the Giraph job.
*
* @param giraphOutput A {@code PCollection} that represents the output of the Giraph job.
* @return the ID1,ID2,WEIGHT values for each line of output.
*/
public static PCollection<String> exec(PCollection<VertexData> giraphOutput) {
return giraphOutput.parallelDo(new DoFn<VertexData, String>() {
@Override
public void process(VertexData input, Emitter<String> emitter) {
if (input.isBidder()) {
String vertexId = input.getVertexId();
String matchId = input.getMatchId();
Integer score = input.getEdges().get(matchId);
emitter.emit(String.format("%s,%s,%s", input.getVertexId(), matchId, score));
emitter.emit(String.format("%s,%s,%s", vertexId, matchId, score));
}
}
}, Writables.strings());
Expand All @@ -55,6 +62,10 @@ public void process(VertexData input, Emitter<String> emitter) {
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: <input> <output>");
System.err.println("The input should be the output of the BipartiteMatchingRunner, and the output");
System.err.println("of this job is a CSV file containing ID1,ID2,WEIGHT values for the matched");
System.err.println("pairs of bidder/object vertices in the bipartite graph.");
return 1;
}
Pipeline p = new MRPipeline(OutputProcessor.class, getConf());
exec(p.read(From.textFile(args[0], vType))).write(To.textFile(args[1]));
Expand Down
Expand Up @@ -23,14 +23,20 @@
import org.apache.hadoop.util.ToolRunner;

/**
*
* Main class that runs the Giraph implementation of the auction algorithm for
* solving assignment problems.
*
*/
public class BipartiteMatchingRunner extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
if (args.length != 3) {
System.err.println("Usage: <input> <output> <numworkers>");
System.err.println("The input should be the output of the InputPreparer Crunch pipeline.");
System.err.println("The output is the directory where the output of the matching will be");
System.err.println("written, and the numworkers should be <= the number of map slots available");
System.err.println("on your Hadoop cluster.");
return 1;
}

Expand Down
Expand Up @@ -30,96 +30,82 @@
import com.google.common.collect.Maps;

/**
*
* Contains the logic for computing bids and prices at each node in the bipartite graph at each
* step in the computation. The implementation of the {@code compute} method follows Bertsekas' auction
* algorithm.
*
* @see <a href="http://18.7.29.232/bitstream/handle/1721.1/3154/P-1908-20783037.pdf?sequence=1">Algorithm Tutorial</a>
*/
public class BipartiteMatchingVertex extends EdgeListVertex<Text, VertexState, IntWritable, AuctionMessage> {

private static final BigDecimal REALLY_BIG_NUMBER = new BigDecimal(1000L * 1000L * 1000L * 1000L * 1000L);

public Map<Text, IntWritable> getEdges() {
Map<Text, IntWritable> out = Maps.newHashMap();
for (Text vertexId : this) {
out.put(vertexId, getEdgeValue(vertexId));
}
return out;
}

private AuctionMessage getMax(List<AuctionMessage> values, Text currentMatchId) {
Collections.sort(values);
if (currentMatchId == null || currentMatchId.toString().isEmpty()) {
return values.get(0);
} else {
AuctionMessage max = values.get(0);
if (max.getVertexId().equals(currentMatchId)) {
return max;
} else {
AuctionMessage currentValue = null;
for (int i = 1; i < values.size(); i++) {
if (values.get(i).getVertexId().equals(currentMatchId)) {
currentValue = values.get(i);
break;
}
}
if (currentValue != null) {
BigDecimal plusEps = currentValue.getValue().add(getEpsilon());
if (max.getValue().compareTo(plusEps) <= 0) {
return currentValue;
}
}
return max;
}
}
}

@Override
public void compute(Iterator<AuctionMessage> msgIterator) throws IOException {
long superstep = getSuperstep();
VertexState state = getVertexValue();
if (state.isBidder()) {
// Bidders only do work on even supersteps.
if (superstep % 2 == 0) {
// Need to track who I own.
// Load the data about which object I own, which I'm interested in,
// and their prices.
VertexPriceData vpd = new VertexPriceData(msgIterator, state.getPriceIndex());

// Update my object ownership if it has changed.
if (vpd.newMatchedId != null) {
Text currentMatchId = state.getMatchId();
if (currentMatchId != null && !currentMatchId.toString().isEmpty()) {
sendMsg(currentMatchId, newSignal(-1));
}
state.setMatchId(vpd.newMatchedId);
} else if (vpd.newLostId != null) {
state.setMatchId(null);
state.clearMatchId();
}

// Compute the value I assign to each object, based on its current price.
List<AuctionMessage> values = Lists.newArrayList();
for (Text vertexId : this) {
BigDecimal value = new BigDecimal(getEdgeValue(vertexId).get()).subtract(vpd.getPrice(vertexId));
values.add(new AuctionMessage(vertexId, value));
}

// Compare the value I get from the object I own now (if any) to the highest-value
// object that I am interested in.
Text currentMatchId = state.getMatchId();
AuctionMessage target = getMax(values, currentMatchId);
if (currentMatchId == null || !currentMatchId.equals(target.getVertexId())) {
BigDecimal bid = REALLY_BIG_NUMBER;
BigDecimal bid = REALLY_BIG_NUMBER; // Infinite bid, if it's the only match for me.
if (values.size() > 1) {
// Otherwise, compute the bid relative to the value I get from the first runner-up.
AuctionMessage runnerUp = values.get(1);
BigDecimal inc = target.getValue().subtract(runnerUp.getValue()).add(getEpsilon());
bid = vpd.getPrice(target.getVertexId()).add(inc);
}
// Make an offer to my new favorite vertex.
sendMsg(target.getVertexId(), newMsg(bid));
} else {
// Otherwise, I'm happy.
this.voteToHalt();
}
}
} else {
// Objects only do work on odd supersteps.
if (superstep % 2 == 1) {
BigDecimal price = state.getPrice();
List<AuctionMessage> bids = sortBids(msgIterator);

// Check to see if any of the inputs are actually a rejection signal
// from the current owner of this object.
AuctionMessage rejectionSignal = popRejection(bids);
if (rejectionSignal != null) {
state.setMatchId(null);
state.clearMatchId();
}

if (!bids.isEmpty()) {
Text currentMatchId = state.getMatchId();
AuctionMessage winningBid = bids.get(0);
Text newMatchId = winningBid.getVertexId();
// Verify that the high bidder beats the current best price.
if (currentMatchId == null ||
(!currentMatchId.equals(newMatchId) && winningBid.getValue().compareTo(price) > 0)) {
state.setMatchId(newMatchId);
Expand All @@ -140,7 +126,42 @@ public void compute(Iterator<AuctionMessage> msgIterator) throws IOException {
}
}

static class VertexPriceData {
public Map<Text, IntWritable> getEdges() {
Map<Text, IntWritable> out = Maps.newHashMap();
for (Text vertexId : this) {
out.put(vertexId, getEdgeValue(vertexId));
}
return out;
}

private AuctionMessage getMax(List<AuctionMessage> values, Text currentMatchId) {
Collections.sort(values);
if (currentMatchId == null || currentMatchId.toString().isEmpty()) {
return values.get(0);
} else {
AuctionMessage max = values.get(0);
if (max.getVertexId().equals(currentMatchId)) {
return max;
} else {
AuctionMessage currentValue = null;
for (int i = 1; i < values.size(); i++) {
if (values.get(i).getVertexId().equals(currentMatchId)) {
currentValue = values.get(i);
break;
}
}
if (currentValue != null) {
BigDecimal plusEps = currentValue.getValue().add(getEpsilon());
if (max.getValue().compareTo(plusEps) <= 0) {
return currentValue;
}
}
return max;
}
}
}

private static class VertexPriceData {
public Map<Text, BigDecimal> prices;
public Text newMatchedId;
public Text newLostId;
Expand Down
Expand Up @@ -62,6 +62,10 @@ public void setMatchId(Text ownerId) {
this.matchId = ownerId;
}

public void clearMatchId() {
this.matchId = new Text();
}

public BigDecimal getPrice() {
return price;
}
Expand All @@ -86,9 +90,8 @@ public void readFields(DataInput in) throws IOException {
for (int i = 0; i < sz; i++) {
Text vertexId = new Text();
vertexId.readFields(in);
Text price = new Text();
price.readFields(in);
priceIndex.put(vertexId, new BigDecimal(price.toString()));
String price = in.readUTF();
priceIndex.put(vertexId, new BigDecimal(price));
}
}

Expand All @@ -102,7 +105,7 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, priceIndex.size());
for (Map.Entry<Text, BigDecimal> e : priceIndex.entrySet()) {
e.getKey().write(out);
new Text(e.getValue().toString()).write(out);
out.writeUTF(e.getValue().toString());
}
}
}
Expand Up @@ -101,6 +101,31 @@ public void testBothIndifferent() throws Exception {
assertEquals("2", out.get("4").getMatchId());
}

@Test
public void testSwaps() throws Exception {
VertexData d1 = new VertexData("1", true, ImmutableMap.of("4", 1, "5", 4, "6", 7));
VertexData d2 = new VertexData("2", true, ImmutableMap.of("4", 1, "5", 5, "6", 6));
VertexData d3 = new VertexData("3", true, ImmutableMap.of("4", 1, "5", 3, "6", 2));
VertexData d4 = new VertexData("4", false, ImmutableMap.of("1", -1, "2", -1, "3", -1));
VertexData d5 = new VertexData("5", false, ImmutableMap.of("1", -1, "2", -1, "3", -1));
VertexData d6 = new VertexData("6", false, ImmutableMap.of("1", -1, "2", -1, "3", -1));

String[] data = new String[] { mapper.writeValueAsString(d1),
mapper.writeValueAsString(d2),
mapper.writeValueAsString(d3),
mapper.writeValueAsString(d4),
mapper.writeValueAsString(d5),
mapper.writeValueAsString(d6),
};
Map<String, VertexData> out = run(data);
assertEquals("6", out.get("1").getMatchId());
assertEquals("5", out.get("2").getMatchId());
assertEquals("4", out.get("3").getMatchId());
assertEquals("1", out.get("6").getMatchId());
assertEquals("2", out.get("5").getMatchId());
assertEquals("3", out.get("4").getMatchId());
}

@Test
public void testSameDirectionalPrefs() throws Exception {
VertexData d1 = new VertexData("1", true, ImmutableMap.of("3", 1, "4", 2));
Expand Down

0 comments on commit 53573fa

Please sign in to comment.