Skip to content

Commit

Permalink
changed constructor to support Fields instead of Strings
Browse files Browse the repository at this point in the history
  • Loading branch information
drujensen committed Feb 18, 2009
1 parent 8faeb68 commit b5e922c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 34 deletions.
37 changes: 9 additions & 28 deletions src/java/cascading/hbase/HBaseFullScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,13 @@ public class HBaseFullScheme extends Scheme
private static final Logger LOG = LoggerFactory.getLogger(HBaseFullScheme.class);

/** Field keyFields */
private String keyName;
private Fields keyFields;
private Fields keyField;

/** Field valueFields */
private String[] columnNames;
private Fields[] columnFields;
private byte[][] fields = null;

/**
* Constructor HBaseScheme creates a new HBaseScheme instance.
*
* @param keyName
* of type String
* @param columnName
* of type String
*/
public HBaseFullScheme(String keyName, String columnName)
{
this(keyName, new String[] { columnName });
}

/**
* Constructor HBaseScheme creates a new HBaseScheme instance.
Expand All @@ -78,19 +65,18 @@ public HBaseFullScheme(String keyName, String columnName)
* @param columnNames
* of type String[]
*/
public HBaseFullScheme(String keyName, String[] columnNames)
public HBaseFullScheme(Fields keyField, Fields[] columnFields)
{
this.keyName = keyName;
this.keyFields = new Fields(keyName);
this.columnNames = columnNames;
this.keyField = keyField;
this.columnFields = columnFields;

this.columnFields = new Fields[columnNames.length];
for (int i = 0; i < columnNames.length; i++)
this.columnNames = new String[columnFields.length];
for (int i = 0; i < columnFields.length; i++)
{
this.columnFields[i] = new Fields(columnNames[i]);
this.columnNames[i] = (String) columnFields[i].get(0);
}

setSourceSink(this.keyFields, this.columnFields);
setSourceSink(this.keyField, this.columnFields);

}

Expand All @@ -103,11 +89,6 @@ private void setSourceSink(Fields keyFields, Fields[] columnFields)
setSinkFields(allFields);
}

public String getKeyName()
{
return keyName;
}

/**
* Method getFamilyNames returns the familyNames of this HBaseScheme object.
*
Expand Down Expand Up @@ -161,7 +142,7 @@ private byte[][] getFields(String[] columnNames)

public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException
{
Tuple key = tupleEntry.selectTuple(keyFields);
Tuple key = tupleEntry.selectTuple(keyField);

byte[] keyBytes = Bytes.toBytes(key.getString(0));
BatchUpdate batchUpdate = new BatchUpdate(keyBytes);
Expand Down
12 changes: 6 additions & 6 deletions src/test/cascading/hbase/HBaseFullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public void testHBaseMultiFamily() throws IOException
Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "content:lower",
"content:upper"), " "));

String keyName = "num";
String[] columnNames = { "content:lower", "content:upper" };
Tap hBaseTap = new HBaseFullTap("multitable", new HBaseFullScheme(keyName, columnNames), SinkMode.REPLACE);
Fields keyField = new Fields("num");
Fields[] columnFields = { new Fields( "content:lower") , new Fields("content:upper") };
Tap hBaseTap = new HBaseFullTap("multitable", new HBaseFullScheme(keyField, columnFields), SinkMode.REPLACE);

Flow parseFlow = new FlowConnector(properties).connect(source, hBaseTap, parsePipe);

Expand Down Expand Up @@ -110,9 +110,9 @@ private void verifySink(Flow flow, int expects) throws IOException
public void testGroupByCount() throws IOException
{

String keyName = "num";
String[] columnNames = { "content:lower", "content:upper" };
Tap source = new HBaseFullTap("multitable", new HBaseFullScheme(keyName, columnNames), SinkMode.REPLACE);
Fields keyField = new Fields("num");
Fields[] columnFields = { new Fields( "content:lower") , new Fields("content:upper") };
Tap source = new HBaseFullTap("multitable", new HBaseFullScheme(keyField, columnFields), SinkMode.REPLACE);

Scheme sinkScheme = new TextLine(new Fields("content:lower", "count"));
Tap sink = new Hfs(sinkScheme, "lowercount", true);
Expand Down

0 comments on commit b5e922c

Please sign in to comment.