Merge branch 'master' of http://github.com/ryanobjc/YCSB into ryanobjc-master
Brian Cooper committed Sep 29, 2010
2 parents 1acf1d0 + c84a7c3 commit 6676f08
Showing 1 changed file with 38 additions and 51 deletions.
db/hbase/src/com/yahoo/ycsb/db/HBaseClient.java: 38 additions & 51 deletions
@@ -27,6 +27,8 @@
//import java.util.Set;
//import java.util.Vector;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
//import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.client.Get;
@@ -45,8 +47,10 @@
*/
public class HBaseClient extends com.yahoo.ycsb.DB
{
+private static final Configuration config = HBaseConfiguration.create();

public boolean _debug=false;

public String _table="";
public HTable _hTable=null;
public String _columnFamily="";
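The new static Configuration field replaces the per-call construction removed from getHTable() below: every HBaseClient instance (one per client thread) now shares one Configuration, and in this era of the HBase client, HTable instances built from the same Configuration reuse the same underlying cluster connection. A minimal sketch of the pattern (class and table names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;

    public class SharedConfigSketch {
        // One Configuration per process; HTable instances built from it
        // reuse the same underlying connection instead of opening new ones.
        private static final Configuration CONF = HBaseConfiguration.create();

        public static void main(String[] args) throws Exception {
            HTable a = new HTable(CONF, "usertable"); // "usertable" is an assumed name
            HTable b = new HTable(CONF, "usertable"); // shares the pooled connection
            a.close();
            b.close();
        }
    }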
@@ -57,8 +61,8 @@ public class HBaseClient extends com.yahoo.ycsb.DB
public static final int HttpError=-2;
public static final int NoMatchingRecord=-3;

-public static Object tableLock = new Object();
+public static final Object tableLock = new Object();

/**
* Initialize any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
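Making tableLock final is the usual Java idiom for a lock object: if the reference could be reassigned, one thread might synchronize on the old object while another synchronizes on the new one, and mutual exclusion would silently fail. A small sketch of the pattern (the helper and field are hypothetical stand-ins for getHTable() and the lazily created table):

    public class FinalLockSketch {
        // final guarantees every thread synchronizes on the same object
        private static final Object tableLock = new Object();
        private static String shared; // stand-in for the lazily created HTable

        static void initOnce(String value) {
            synchronized (tableLock) {
                if (shared == null) {
                    shared = value; // only the first caller initializes
                }
            }
        }
    }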
@@ -99,7 +103,6 @@ public void cleanup() throws DBException
public void getHTable(String table) throws IOException
{
synchronized (tableLock) {
-HBaseConfiguration config = new HBaseConfiguration();
_hTable = new HTable(config, table);
//2 suggestions from http://ryantwopointoh.blogspot.com/2009/01/performance-of-hbase-importing.html
_hTable.setAutoFlush(false);
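The two suggestions from the linked post concern client-side write buffering: with auto-flush off, Puts collect in a client buffer and go out in batches, which cuts round trips during bulk loads. A hedged sketch of the full pattern (buffer size, table and column names are assumptions; only setAutoFlush(false) appears in this hunk):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedLoadSketch {
        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            HTable table = new HTable(config, "usertable");  // assumed table name
            table.setAutoFlush(false);                       // buffer Puts client-side
            table.setWriteBufferSize(12 * 1024 * 1024);      // assumed 12 MB buffer
            for (int i = 0; i < 1000; i++) {
                Put p = new Put(Bytes.toBytes("user" + i));
                p.add(Bytes.toBytes("family1"), Bytes.toBytes("field0"),
                      Bytes.toBytes("value" + i));
                table.put(p);                                // buffered, not sent yet
            }
            table.flushCommits();                            // push the remaining buffer
            table.close();
        }
    }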
@@ -136,36 +139,43 @@ public int read(String table, String key, Set<String> fields, HashMap<String,String> result)
}

Result r = null;
try
{
if (_debug) {
System.out.println("Doing read from HBase columnfamily "+_columnFamily);
System.out.println("Doing read for key: "+key);
}
Get g = new Get(Bytes.toBytes(key));
if (fields == null) {
g.addFamily(_columnFamilyBytes);
} else {
for (String field : fields) {
g.addColumn(_columnFamilyBytes, Bytes.toBytes(field));
}
}
r = _hTable.get(g);
}
catch (IOException e)
{
System.err.println("Error doing get: "+e);
return ServerError;
}
catch (ConcurrentModificationException e)
{
//do nothing for now...need to understand HBase concurrency model better
return ServerError;
}

//now parse out all desired fields
-if (fields != null) {
-    for (String field : fields) {
-        byte[] value = r.getValue(_columnFamilyBytes, Bytes.toBytes(field));
-        result.put(field,Bytes.toString(value));
-        if (_debug) {
-            System.out.println("Result for field: "+field+" is: "+Bytes.toString(value));
-        }
-    }
-}
+for (KeyValue kv : r.raw()) {
+    result.put(
+        Bytes.toString(kv.getQualifier()),
+        Bytes.toString(kv.getValue()));
+    if (_debug) {
+        System.out.println("Result for field: "+Bytes.toString(kv.getQualifier())+
+            " is: "+Bytes.toString(kv.getValue()));
+    }
+}
return Ok;
}
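The rewritten read() no longer re-probes the Result per requested field: the Get already restricted what the server returns (addFamily() for the whole family, addColumn() per field), so the client can simply walk every KeyValue that came back. A sketch of decoding a Result this way (class and method names are illustrative):

    import java.util.HashMap;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ResultDecodeSketch {
        // Convert a Result into field->value strings by iterating its raw
        // cells, mirroring the loop the new read() uses above.
        static HashMap<String, String> toMap(Result r) {
            HashMap<String, String> out = new HashMap<String, String>();
            for (KeyValue kv : r.raw()) {
                out.put(Bytes.toString(kv.getQualifier()),
                        Bytes.toString(kv.getValue()));
            }
            return out;
        }
    }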

@@ -197,71 +207,48 @@ public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String,String>> result)
}

Scan s = new Scan(Bytes.toBytes(startkey));
//HBase has no record limit. Here, assume recordcount is small enough to bring back in one call.
//We get back recordcount records
s.setCaching(recordcount);

//add specified fields or else all fields
if (fields == null)
{
s.addFamily(_columnFamilyBytes);
}
else
{
for (String field : fields)
{
s.addColumn(_columnFamilyBytes,Bytes.toBytes(field));
}
}

//get results
ResultScanner scanner = null;
try {
scanner = _hTable.getScanner(s);
int numResults = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next())
{
//get row key
String key = Bytes.toString(rr.getRow());
if (_debug)
{
System.out.println("Got scan result for key: "+key);
}

HashMap<String,String> rowResult = new HashMap<String, String>();

//parse row
-if (fields != null) //parse specified field list
-{
-    for (String field : fields) {
-        byte[] value = rr.getValue(_columnFamilyBytes,Bytes.toBytes(field));
-        rowResult.put(field,Bytes.toString(value));
-        if (_debug)
-        {
-            System.out.println("Result for field: "+field+" is: "+Bytes.toString(value));
-        }
-    }
-}
-else //get all fields
-{
-    //HBase can return a mapping for all columns in a column family
-    NavigableMap<byte[], byte[]> scanMap = rr.getFamilyMap(_columnFamilyBytes);
-    for (byte[] fieldkey : scanMap.keySet())
-    {
-        String value = Bytes.toString(scanMap.get(fieldkey));
-        rowResult.put(Bytes.toString(fieldkey),value);
-        if (_debug)
-        {
-            System.out.println("Result for field: "+Bytes.toString(fieldkey)+" is: "+value);
-        }
-    }
-}
+for (KeyValue kv : rr.raw()) {
+    rowResult.put(
+        Bytes.toString(kv.getQualifier()),
+        Bytes.toString(kv.getValue()));
+}
//add rowResult to result vector
result.add(rowResult);
numResults++;
if (numResults >= recordcount) //if hit recordcount, bail out
{
break;
}
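setCaching(recordcount) is what makes the single-call assumption above hold: it asks the region server to return up to recordcount rows per RPC instead of the default row-at-a-time fetching, so a short scan finishes in one round trip. A sketch of the whole scanner lifecycle under the same era API (names are illustrative; the cleanup shown here is part of the sketch, not this hunk):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanSketch {
        static int countRows(HTable table, String startkey, int recordcount)
                throws IOException {
            Scan s = new Scan(Bytes.toBytes(startkey));
            s.setCaching(recordcount);          // ship up to recordcount rows per RPC
            ResultScanner scanner = table.getScanner(s);
            int n = 0;
            try {
                for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
                    n++;
                    if (n >= recordcount) {
                        break;                  // hit the requested record count
                    }
                }
            } finally {
                scanner.close();                // always release the server-side scanner
            }
            return n;
        }
    }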
