Skip to content

Commit

Permalink
Refactoring towards JSON-based vertex data serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Wills committed Apr 8, 2012
1 parent 86e0f94 commit 241e871
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 228 deletions.
71 changes: 71 additions & 0 deletions src/main/java/com/cloudera/science/matching/VertexData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
*
* Cloudera, Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"). You may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for
* the specific language governing permissions and limitations under the
* License.
*/
package com.cloudera.science.matching;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import com.google.common.collect.Maps;

/**
*
*/
public class VertexData {
private String vertexId;
private boolean bidder;
private Map<String, Integer> edges;

private String matchId;
private double price;

public VertexData() {
}

public VertexData(String vertexId, boolean bidder, Map<String, Integer> edges) {
this.vertexId = vertexId;
this.bidder = bidder;
this.edges = edges;
}

public VertexData(Text vertexId, VertexState vertexState, Map<Text, IntWritable> edges) {
this.vertexId = vertexId.toString();
this.bidder = vertexState.isBidder();
this.edges = Maps.newHashMap();
for (Map.Entry<Text, IntWritable> e : edges.entrySet()) {
this.edges.put(e.getKey().toString(), e.getValue().get());
}
this.matchId = vertexState.getMatchId().toString();
this.price = vertexState.getPrice();
}

public Text getVertexId() {
return new Text(vertexId);
}

public VertexState getVertexState() {
return new VertexState(bidder, new Text(matchId), price);
}

public Map<Text, IntWritable> getEdges() {
Map<Text, IntWritable> out = Maps.newHashMap();
for (Map.Entry<String, Integer> e : edges.entrySet()) {
out.put(new Text(e.getKey()), new IntWritable(e.getValue()));
}
return out;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,22 @@
* the specific language governing permissions and limitations under the
* License.
*/
package com.cloudera.science.matching.graph;
package com.cloudera.science.matching;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

import com.google.common.collect.Lists;

/**
* Maintains the internal state of a vertex in the bipartite graph, with
*/
public class VertexState implements Writable {

private boolean bidder;
private Text ownerId = new Text();
private List<Text> ownedIds = Lists.newArrayListWithCapacity(0);
private Text matchId = new Text();
private double price = 0.0;

public VertexState() { }
Expand All @@ -41,20 +36,22 @@ public VertexState(boolean bidder) {
this.bidder = bidder;
}

public boolean isBidder() {
return bidder;
public VertexState(boolean bidder, Text matchId, double price) {
this.bidder = bidder;
this.matchId = matchId;
this.price = price;
}

public Text getOwnerId() {
return ownerId;
public boolean isBidder() {
return bidder;
}

public void setOwnerId(Text ownerId) {
this.ownerId = ownerId;
public Text getMatchId() {
return matchId;
}

public List<Text> getOwnedIds() {
return ownedIds;
public void setMatchId(Text ownerId) {
this.matchId = ownerId;
}

public double getPrice() {
Expand All @@ -68,31 +65,18 @@ public void setPrice(double price) {
@Override
public void readFields(DataInput in) throws IOException {
bidder = in.readBoolean();
if (bidder) {
ownedIds.clear();
int sz = WritableUtils.readVInt(in);
for (int i = 0; i < sz; i++) {
Text ownedId = new Text();
ownedId.readFields(in);
ownedIds.add(ownedId);
}
} else {
ownerId.readFields(in);
if (!bidder) {
price = in.readDouble();
}
matchId.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(bidder);
if (bidder) {
WritableUtils.writeVInt(out, ownedIds.size());
for (Text ownedId : ownedIds) {
ownedId.write(out);
}
} else {
ownerId.write(out);
if (!bidder) {
out.writeDouble(price);
}
matchId.write(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,29 @@
*/
package com.cloudera.science.matching.crunch;

import static com.cloudera.crunch.type.writable.Writables.*;
import static com.cloudera.crunch.type.avro.Avros.*;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.fn.MapValuesFn;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.io.From;
import com.cloudera.crunch.io.To;
import com.cloudera.science.matching.VertexDataWritable;
import com.cloudera.crunch.type.writable.WritableTypeFamily;
import com.cloudera.crunch.util.PTypes;
import com.cloudera.science.matching.VertexData;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
*
Expand All @@ -59,30 +62,30 @@ public void process(String input, Emitter<Pair<String, Pair<String, Integer>>> e
}
}

public static class WriteVertexFn extends MapValuesFn<String, Iterable<Pair<String, Integer>>, VertexDataWritable> {
public static class WriteVertexFn extends MapFn<Pair<String, Iterable<Pair<String, Integer>>>, VertexData> {
@Override
public VertexDataWritable map(Iterable<Pair<String, Integer>> v) {
List<Pair<String, Integer>> pairs = Lists.newArrayList(v);
int[] values = new int[pairs.size()];
Text[] targets = new Text[pairs.size()];
for (int i = 0; i < pairs.size(); i++) {
Pair<String, Integer> p = pairs.get(i);
targets[i] = new Text(p.first());
values[i] = p.second();
}
public VertexData map(Pair<String, Iterable<Pair<String, Integer>>> v) {
List<Pair<String, Integer>> pairs = Lists.newArrayList(v.second());
Map<String, Integer> targets = Maps.newHashMap();
boolean bidder = true;
for (int i = 0; i < values.length; i++) {
if (values[i] < 0) {
if (i == 0) {
for (int i = 0; i < pairs.size(); i++) {
String id = pairs.get(i).first();
Integer score = pairs.get(i).second();
if (i == 0) {
if (score < 0) {
bidder = false;
} else if (bidder) {
throw new IllegalStateException("Invalid input: vertex id occurs in both sides of the graph");
}
} else if (!bidder) {
throw new IllegalStateException("Invalid input: vertex id occurs in both sides of the graph");

} else if (bidder && score < 0) {
throw new IllegalStateException(
String.format("Invalid input: vertex id %s occurs in both sides of the graph", id));
} else if (!bidder && score >= 0) {
throw new IllegalStateException(
String.format("Invalid input: vertex id %s occurs in both sides of the graph", id));
}
targets.put(id, score);
}
return new VertexDataWritable(bidder, targets, values);
return new VertexData(v.first(), bidder, targets);
}
}

Expand All @@ -108,8 +111,8 @@ public int run(String[] args) throws Exception {
p.read(From.textFile(args[0]))
.parallelDo(new TwoVerticesFn(args[2]), tableOf(strings(), pairs(strings(), ints())))
.groupByKey()
.parallelDo(new WriteVertexFn(), tableOf(strings(), writables(VertexDataWritable.class)))
.write(To.sequenceFile(args[1]));
.parallelDo(new WriteVertexFn(), PTypes.jsonString(VertexData.class, WritableTypeFamily.getInstance()))
.write(To.textFile(args[1]));
p.done();
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,24 @@ public double getValue() {

@Override
public void readFields(DataInput in) throws IOException {
value = in.readDouble();
if (vertexId == null) {
vertexId = new Text();
}
vertexId.readFields(in);
value = in.readDouble();
}

@Override
public void write(DataOutput out) throws IOException {
vertexId.write(out);
out.writeDouble(value);
vertexId.write(out);
}

@Override
public int compareTo(AuctionMessage other) {
if (other.value == value) {
return vertexId.hashCode() - other.vertexId.hashCode();
}
return (int) (other.value - value);
}
}
Loading

0 comments on commit 241e871

Please sign in to comment.