hbase support
wenshao committed May 21, 2012
1 parent 401ec28 commit dbe458d
Showing 7 changed files with 77 additions and 63 deletions.

One of the seven changed files was deleted; its name and contents are not shown here.

com/alibaba/druid/hdriver/impl/execute/InsertExecutePlan.java
@@ -6,21 +6,26 @@

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.util.Bytes;

import com.alibaba.druid.hdriver.impl.HBaseConnectionImpl;
import com.alibaba.druid.hdriver.impl.HPreparedStatementImpl;
import com.alibaba.druid.hdriver.impl.mapping.HMapping;
import com.alibaba.druid.hdriver.impl.mapping.HMappingDefaultImpl;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.visitor.SQLEvalVisitorUtils;

public class InsertExecutePlan extends SingleTableExecutePlan {

private Map<String, SQLExpr> columns = new LinkedHashMap<String, SQLExpr>();
- private byte[] family = Bytes.toBytes("d");

@Override
public boolean execute(HPreparedStatementImpl statement) throws SQLException {
try {
HMapping mapping = this.getMapping();
if (mapping == null) {
mapping = new HMappingDefaultImpl();
}

HBaseConnectionImpl connection = statement.getConnection();
String dbType = connection.getConnectProperties().getProperty("dbType");

@@ -34,13 +39,14 @@ public boolean execute(HPreparedStatementImpl statement) throws SQLException {
if (value == null) {
continue;
}

- byte[] bytes = HBaseUtils.toBytes(value);
+ byte[] bytes = mapping.toBytes(column, value);

- if (put == null) { // first value is key, TODO
+ if (mapping.isRow(column)) {
put = new Put(bytes);
} else {
- byte[] qualifier = Bytes.toBytes(column);
+ byte[] family = mapping.getFamily(column);
+ byte[] qualifier = mapping.getQualifier(column);
put.add(family, qualifier, bytes);
}
}
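
The heart of the InsertExecutePlan change: the row key, column family, and qualifier are now resolved through the HMapping hook rather than the hard-coded "d" family and Bytes.toBytes(column). A minimal sketch of the resulting Put construction, assuming column values have already been evaluated; the buildPut helper and its Map parameter are illustrative, not part of the commit:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Put;

    import com.alibaba.druid.hdriver.impl.mapping.HMapping;
    import com.alibaba.druid.hdriver.impl.mapping.HMappingDefaultImpl;

    public class PutSketch {
        static Put buildPut(HMapping mapping, Map<String, Object> row) throws IOException {
            if (mapping == null) {
                mapping = new HMappingDefaultImpl(); // same fallback the plan applies
            }
            Put put = null;
            for (Map.Entry<String, Object> e : row.entrySet()) {
                String column = e.getKey();
                Object value = e.getValue();
                if (value == null) {
                    continue; // the committed code also skips null values
                }
                byte[] bytes = mapping.toBytes(column, value);
                if (mapping.isRow(column)) {
                    put = new Put(bytes); // this column is the row key
                } else {
                    // like the committed code, assumes the row-key column
                    // was seen first, so put is already non-null here
                    put.add(mapping.getFamily(column), mapping.getQualifier(column), bytes);
                }
            }
            return put;
        }
    }
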
com/alibaba/druid/hdriver/impl/execute/SingleTableExecutePlan.java
@@ -1,8 +1,12 @@
package com.alibaba.druid.hdriver.impl.execute;

import com.alibaba.druid.hdriver.impl.mapping.HMapping;

public class SingleTableExecutePlan extends ExecutePlanAdapter {

private String tableName;

private HMapping mapping;

public String getTableName() {
return tableName;
@@ -12,4 +16,12 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public HMapping getMapping() {
return mapping;
}

public void setMapping(HMapping mapping) {
this.mapping = mapping;
}

}
com/alibaba/druid/hdriver/impl/execute/SingleTableQueryExecutePlan.java
@@ -13,13 +13,13 @@
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
- import org.apache.hadoop.hbase.util.Bytes;

import com.alibaba.druid.hdriver.impl.HBaseConnectionImpl;
import com.alibaba.druid.hdriver.impl.HPreparedStatementImpl;
import com.alibaba.druid.hdriver.impl.HResultSetMetaDataImpl;
import com.alibaba.druid.hdriver.impl.HScannerResultSetImpl;
import com.alibaba.druid.hdriver.impl.mapping.HMapping;
import com.alibaba.druid.hdriver.impl.mapping.HMappingDefaultImpl;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
@@ -31,24 +31,12 @@ public class SingleTableQueryExecutePlan extends SingleTableExecutePlan {
private List<String> columeNames = new ArrayList<String>();
private List<SQLExpr> conditions = new ArrayList<SQLExpr>();

- private byte[] family = Bytes.toBytes("d");

private HResultSetMetaDataImpl resultMetaData;

- private HMapping mapping;

public SingleTableQueryExecutePlan(){

}

- public HMapping getMapping() {
- return mapping;
- }
-
- public void setMapping(HMapping mapping) {
- this.mapping = mapping;
- }

public List<SQLExpr> getConditions() {
return conditions;
}
@@ -72,6 +60,11 @@ public void setResultMetaData(HResultSetMetaDataImpl resultMetaData) {
@Override
public HScannerResultSetImpl executeQuery(HPreparedStatementImpl statement) throws SQLException {
try {
HMapping mapping = this.getMapping();
if (mapping == null) {
mapping = new HMappingDefaultImpl();
}

HBaseConnectionImpl connection = statement.getConnection();
String dbType = connection.getConnectProperties().getProperty("dbType");

@@ -82,8 +75,8 @@ public HScannerResultSetImpl executeQuery(HPreparedStatementImpl statement) throws SQLException {
String fieldName = ((SQLIdentifierExpr) condition.getLeft()).getName();
Object value = SQLEvalVisitorUtils.eval(dbType, condition.getRight(), statement.getParameters());

- byte[] bytes = HBaseUtils.toBytes(value);
- if ("id".equals(fieldName)) {
+ byte[] bytes = mapping.toBytes(fieldName, value);
+ if (mapping.isRow(fieldName)) {
if (condition.getOperator() == SQLBinaryOperator.GreaterThanOrEqual) {
scan.setStartRow(bytes);
} else if (condition.getOperator() == SQLBinaryOperator.LessThan) {
@@ -95,7 +88,8 @@ public HScannerResultSetImpl executeQuery(HPreparedStatementImpl statement) throws SQLException {
throw new SQLException("TODO");
}
} else {
- byte[] qualifier = Bytes.toBytes(fieldName);
+ byte[] qualifier = mapping.getQualifier(fieldName);
+ byte[] family = mapping.getFamily(fieldName);

CompareOp compareOp;
if (condition.getOperator() == SQLBinaryOperator.Equality) {
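
On the query side the same hook decides whether a WHERE predicate narrows the scan's row range or becomes a server-side filter. A sketch of that dispatch under the commit's visible semantics: applyCondition is a hypothetical helper, only the three operators shown in the diff are handled, and where the committed code collects filters into a FilterList, this sets a single filter directly:

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;

    import com.alibaba.druid.hdriver.impl.mapping.HMapping;
    import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;

    public class ScanSketch {
        static void applyCondition(Scan scan, HMapping mapping, String fieldName,
                                   SQLBinaryOperator op, byte[] bytes) throws SQLException {
            if (mapping.isRow(fieldName)) {
                // Row-key predicates become scan boundaries, not filters.
                if (op == SQLBinaryOperator.GreaterThanOrEqual) {
                    scan.setStartRow(bytes); // inclusive lower bound
                } else if (op == SQLBinaryOperator.LessThan) {
                    scan.setStopRow(bytes);  // exclusive upper bound
                } else {
                    throw new SQLException("TODO"); // unhandled, as in the commit
                }
            } else if (op == SQLBinaryOperator.Equality) {
                // Ordinary columns turn into server-side value filters.
                scan.setFilter(new SingleColumnValueFilter(
                        mapping.getFamily(fieldName),
                        mapping.getQualifier(fieldName),
                        CompareOp.EQUAL, bytes));
            }
        }
    }
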
com/alibaba/druid/hdriver/impl/mapping/HMapping.java
@@ -1,16 +1,20 @@
package com.alibaba.druid.hdriver.impl.mapping;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;

public interface HMapping {

byte[] getFamily(String columnName);

byte[] getQualifier(String columnName);

byte[] getRow(Result result, String columnName);

boolean isRow(String columnName);

- Object getObject(Result result, String name);
+ Object getObject(Result result, String columnName);

byte[] toBytes(String columnName, Object value) throws IOException;
}
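
HMapping is the extension point both execute plans consult. A hypothetical custom mapping (not in the commit) showing how a schema could route user_id to the row key and use a different column family; it would be attached with setMapping(...) on the plan, and anything not overridden falls back to the defaults:

    import org.apache.hadoop.hbase.util.Bytes;

    import com.alibaba.druid.hdriver.impl.mapping.HMappingDefaultImpl;

    public class UserMapping extends HMappingDefaultImpl {
        @Override
        public boolean isRow(String columnName) {
            return "user_id".equals(columnName); // user_id values become the row key
        }

        @Override
        public byte[] getFamily(String columnName) {
            return Bytes.toBytes("f"); // instead of the hard-wired "d" the executors used before
        }
    }
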
com/alibaba/druid/hdriver/impl/mapping/HMappingAdapter.java
@@ -1,5 +1,8 @@
package com.alibaba.druid.hdriver.impl.mapping;

import java.io.IOException;
import java.math.BigDecimal;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

@@ -30,4 +33,32 @@ public Object getObject(Result result, String columnName) {
return bytes;
}

@Override
public byte[] toBytes(String columnName, Object value) throws IOException {
if (value == null) {
return null;
}

byte[] bytes;
if (value instanceof String) {
String strValue = (String) value;
bytes = Bytes.toBytes(strValue);
} else if (value instanceof Integer) {
int intValue = ((Integer) value).intValue();
bytes = Bytes.toBytes(intValue);
} else if (value instanceof Long) {
long longValue = ((Long) value).longValue();
bytes = Bytes.toBytes(longValue);
} else if (value instanceof Boolean) {
boolean booleanValue = ((Boolean) value).booleanValue();
bytes = Bytes.toBytes(booleanValue);
} else if (value instanceof BigDecimal) {
BigDecimal decimalValue = (BigDecimal) value;
bytes = Bytes.toBytes(decimalValue);
} else {
throw new IOException("TODO"); // TODO
}

return bytes;
}
}
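
The adapter's toBytes dispatches on the runtime type of the value and delegates to the matching Bytes.toBytes overload, raising IOException for anything unrecognized. A small demo of the observable behavior (class name hypothetical):

    import java.io.IOException;

    import com.alibaba.druid.hdriver.impl.mapping.HMapping;
    import com.alibaba.druid.hdriver.impl.mapping.HMappingDefaultImpl;

    public class ToBytesDemo {
        public static void main(String[] args) throws IOException {
            HMapping m = new HMappingDefaultImpl();
            byte[] a = m.toBytes("age", 32);       // Integer -> Bytes.toBytes(int), 4 bytes
            byte[] b = m.toBytes("name", "druid"); // String  -> UTF-8 bytes, 5 bytes
            byte[] c = m.toBytes("flag", null);    // null    -> null
            System.out.println(a.length + " " + b.length + " " + (c == null)); // prints: 4 5 true
            // unsupported types, e.g. java.util.Date, throw IOException("TODO")
        }
    }
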
com/alibaba/druid/hdriver/impl/mapping/HMappingDefaultImpl.java
@@ -1,5 +1,8 @@
package com.alibaba.druid.hdriver.impl.mapping;

import java.io.IOException;
import java.math.BigDecimal;

import org.apache.hadoop.hbase.util.Bytes;

public class HMappingDefaultImpl extends HMappingAdapter implements HMapping {
@@ -35,4 +38,5 @@ public boolean isRow(String columnName) {

return false;
}

}
