Permalink
Browse files

CQL3 stuff: ColumnSlice API and other test cases

  • Loading branch information...
1 parent 121cbe0 commit 35376b1e3214d5d7fde680e70a1d56ff9bbe3b72 @easility easility committed Dec 15, 2013
@@ -51,11 +51,11 @@ public static Where createRowQuery(Key from, Key to, DboColumnMeta colMeta, Sele
if (colMeta != null) {
if (from != null) {
valFrom = colMeta.getStorageType().convertFromNoSql(from.getKey());
- valFrom = checkForBooleanAndNull(valFrom, indTable);
+ valFrom = checkForBooleanAndNull(valFrom, indTable, colMeta);
}
if (to != null) {
valTo = colMeta.getStorageType().convertFromNoSql(to.getKey());
- valTo = checkForBooleanAndNull(valTo, indTable);
+ valTo = checkForBooleanAndNull(valTo, indTable, colMeta);
}
} else
return selectWhere;
@@ -101,13 +101,9 @@ public static Where createRowQueryFromValues(List<byte[]> values, DboColumnMeta
return selectWhere;
}
- public static Object checkForBooleanAndNull(Object val, String indTable) {
+ public static Object checkForBooleanAndNull(Object val, String indTable, DboColumnMeta colMeta) {
if (val == null) {
- if (indTable.equalsIgnoreCase("IntegerIndice")) {
- return ByteBuffer.wrap(new byte[0]);
- } else {
- return "";
- }
+ return checkForNull(indTable, colMeta);
} else if (val instanceof Boolean) {
Boolean b = (Boolean) val;
if (b)
@@ -118,4 +114,15 @@ public static Object checkForBooleanAndNull(Object val, String indTable) {
return val;
}
+ public static Object checkForNull(String indTable, DboColumnMeta colMeta) {
+ if (indTable.equalsIgnoreCase("IntegerIndice")) {
+ // One more hack for Boolean
+ if (colMeta.getClassType().getName().equals("java.lang.Boolean"))
+ return 0;
+ else
+ return ByteBuffer.wrap(new byte[0]);
+ } else {
+ return "";
+ }
+ }
}
@@ -1,6 +1,5 @@
package com.alvazan.orm.layer9z.spi.db.cassandracql3;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -101,7 +100,7 @@ private void persist(Persist action, MetaLookup ormSession) {
try {
PreparedStatement statement = session.prepare("INSERT INTO " + keys + "." + table + "(id, colname, colvalue) VALUES (?, ?, ?)");
BoundStatement boundStatement = new BoundStatement(statement);
- String colName = StandardConverters.convertFromBytes(String.class, c.getName());
+ String colName = StandardConverters.convertToString(c.getName());
checkIfRowExsits(table, rowkey, colName);
if (c.getValue() != null && c.getValue().length != 0) {
session.execute(boundStatement.bind(ByteBuffer.wrap(rowkey), colName, ByteBuffer.wrap(c.getValue())));
@@ -143,7 +142,6 @@ private void persistIndex(PersistIndex action, MetaLookup ormSession) {
byte[] value = column.getPrimaryKey();
try {
-
Object keyObject = null;
if (key != null) {
PreparedStatement statement = session.prepare("INSERT INTO " + keys + "." + table + "(id, colname, colvalue) VALUES (?, ?, ?)");
@@ -180,7 +178,6 @@ private void remove(Remove action, MetaLookup ormSession) {
throw new IllegalArgumentException("action param is missing ActionEnum so we know to remove entire row or just columns in the row");
switch (action.getAction()) {
case REMOVE_ENTIRE_ROW:
- //String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
Clause eqClause = QueryBuilder.eq("id", ByteBuffer.wrap(action.getRowKey()));
Query query = QueryBuilder.delete().from(keys, table).where(eqClause);
session.execute(query);
@@ -197,7 +194,7 @@ private void removeColumns(Remove action, String table) {
String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
if (rowKey != null) {
for (byte[] name : action.getColumns()) {
- String colName = StandardConverters.convertFromBytes(String.class, name);
+ String colName = StandardConverters.convertToString(name);
removeColumnImpl(action.getRowKey(), table, colName);
}
}
@@ -208,7 +205,7 @@ private void removeColumn(RemoveColumn action, MetaLookup ormSession) {
String table = lookupOrCreate(colFamily, ormSession);
String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
if (rowKey != null) {
- String colName = StandardConverters.convertFromBytes(String.class, action.getColumn());
+ String colName = StandardConverters.convertToString(action.getColumn());
removeColumnImpl(action.getRowKey(), table, colName);
}
}
@@ -224,6 +221,7 @@ private void removeIndex(RemoveIndex action, MetaLookup ormSession) {
String colFamily = action.getIndexCfName();
if (colFamily.equalsIgnoreCase("BytesIndice"))
return;
+
String table = lookupOrCreate(colFamily, ormSession);
String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
IndexColumn column = action.getColumn();
@@ -233,9 +231,9 @@ private void removeIndex(RemoveIndex action, MetaLookup ormSession) {
if (table.equalsIgnoreCase("StringIndice"))
indValue = StandardConverters.convertFromBytes(String.class, indexedValue);
else if (table.equalsIgnoreCase("IntegerIndice"))
- indValue = StandardConverters.convertFromBytes(Integer.class, indexedValue);
+ indValue = StandardConverters.convertFromBytes(Long.class, indexedValue);
else if (table.equalsIgnoreCase("DecimalIndice"))
- indValue = StandardConverters.convertFromBytes(BigDecimal.class, indexedValue);
+ indValue = StandardConverters.convertFromBytes(Float.class, indexedValue);
boolean exists = findIndexRow(table, rowKey, fk, indValue);
if (!exists) {
if (log.isInfoEnabled())
@@ -328,8 +326,14 @@ public void close() {
@Override
public AbstractCursor<Column> columnSlice(ColumnSliceInfo sliceInfo, Integer batchSize, BatchListener l, MetaLookup mgr) {
- // TODO Auto-generated method stub
- return null;
+ //Info info1 = lookupOrCreate(sliceInfo.getColFamily().getColumnFamily(), mgr);
+ /*if (info1 == null) {
+ return null;
+ }*/
+ String table = lookupOrCreate(sliceInfo.getColFamily().getColumnFamily(), mgr);
+ CursorColumnSliceCql cursor = new CursorColumnSliceCql(sliceInfo, l, batchSize, keys, table, session);
+ return cursor;
+
}
@Override
@@ -357,8 +361,6 @@ public void close() {
@Override
public AbstractCursor<IndexColumn> scanIndex(ScanInfo scanInfo, List<byte[]> values, BatchListener list, MetaLookup mgr) {
StartQueryListener listener = new StartQueryManyKeys(keys, scanInfo, session, values, false);
- //CursorForValues cursor = new CursorForValues(scanInfo, list, values, session, keys, listener);
- //return cursor;
return new CursorOfFutures(listener, list, scanInfo);
}
@@ -0,0 +1,218 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.alvazan.orm.api.z8spi.BatchListener;
+import com.alvazan.orm.api.z8spi.ColumnSliceInfo;
+import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
+import com.alvazan.orm.api.z8spi.iter.StringLocal;
+
+import com.alvazan.orm.api.z8spi.action.Column;
+import com.alvazan.orm.api.z8spi.conv.StandardConverters;
+import com.datastax.driver.core.Query;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+
+class CursorColumnSliceCql<T> extends AbstractCursor<T> {
+
+ private BatchListener batchListener;
+ private Integer batchSize;
+ private String keys;
+ private int pointer = -1;
+ private ColumnSliceInfo sliceInfo;
+ private List<com.datastax.driver.core.Row> subList;
+ private byte[] from;
+ private byte[] to;
+ private byte[] rowKey;
+ private Boolean forward = null;
+ private String table;
+ private Class columnNameType = null;
+ private Session session = null;
+
+ public CursorColumnSliceCql(ColumnSliceInfo sliceInfo2, BatchListener bListener, Integer batchSize, String keys2, String table2, Session session2) {
+ this.batchListener = bListener;
+ this.batchSize = batchSize;
+ this.keys = keys2;
+ this.from = sliceInfo2.getFrom();
+ this.to = sliceInfo2.getTo();
+ this.table = table2;
+ this.rowKey = sliceInfo2.getRowKey();
+ this.sliceInfo = sliceInfo2;
+ this.columnNameType = sliceInfo2.getColumnNameType();
+ this.session = session2;
+ beforeFirst();
+
+ }
+
+ @Override
+ public String toString() {
+ String tabs = StringLocal.getAndAdd();
+ String retVal = "CursorColumnSliceCql[" + tabs + tabs + "]";
+ StringLocal.set(tabs.length());
+ return retVal;
+ }
+
+ @Override
+ public void beforeFirst() {
+ pointer = -1;
+ subList = null;
+ forward = true;
+
+ }
+
+ @Override
+ public void afterLast() {
+ pointer = -1;
+ subList = null;
+ forward = false;
+ }
+
+ @Override
+ public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<T> nextImpl() {
+ if (!forward)
+ throw new IllegalStateException(
+ "You must call beforeFirst to traverse the cursor forward, you cannot call next after calling previous due to limitations of talking to noSql apis");
+ fetchMoreResultsImpl();
+ pointer++;
+ if (pointer >= subList.size())
+ return null; // no more results
+ com.datastax.driver.core.Row column = subList.get(pointer);
+ return buildHolder(column);
+
+ }
+
+ @Override
+ public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<T> previousImpl() {
+ if (forward)
+ throw new IllegalStateException(
+ "You must call afterLast to traverse reverse. You cannot call previous after calling next due to limitations of calling into noSQL apis");
+ fetchMoreResultsImpl();
+ pointer++;
+ if (pointer >= subList.size())
+ return null; // no more results
+ com.datastax.driver.core.Row column = subList.get(pointer);
+ return buildHolder(column);
+ }
+
+ private void fetchMoreResultsImpl() {
+ if (subList != null) {
+ if (pointer < subList.size()) {
+ return;
+ } else if (batchSize == null) {
+ return;
+ } else if (subList.size() < batchSize) {
+ return;
+ }
+ }
+
+ pointer = -1;
+
+ if (batchListener != null)
+ batchListener.beforeFetchingNextBatch();
+ if (subList != null)
+ return;
+
+ if (batchListener != null)
+ batchListener.beforeFetchingNextBatch();
+
+ columnNameType = sliceInfo.getColumnNameType();
+ ResultSet resultSet = null;
+ Clause rkClause = QueryBuilder.eq("id", ByteBuffer.wrap(rowKey));
+ Query query = null;
+ query = QueryBuilder.select().all().from(keys, table).where(rkClause).disableTracing();
+
+ subList = new ArrayList<com.datastax.driver.core.Row>();
+ try {
+ resultSet = session.execute(query);
+ if (resultSet == null) {
+ return;
+ } else {
+ if (BigInteger.class.equals(columnNameType)) {
+ intColumnSlice(resultSet);
+ } else if (BigDecimal.class.equals(columnNameType)) {
+ decimalColumnSlice(resultSet);
+ } else if (String.class.equals(columnNameType)) {
+ stringColumSlice(resultSet);
+ } else
+ throw new UnsupportedOperationException("Type " + columnNameType.getName() + " is not allowed for ColumnSlice");
+
+ }
+ if (batchListener != null)
+ batchListener.afterFetchingNextBatch(2);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<T> buildHolder(com.datastax.driver.core.Row column) {
+ byte[] name = StandardConverters.convertFromString(byte[].class, column.getString("colname"));
+ ByteBuffer data = column.getBytes("colvalue");
+ byte[] val = new byte[data.remaining()];
+ data.get(val);
+ Column c = new Column();
+ c.setName(name);
+ if (val.length != 0)
+ c.setValue(val);
+ return new Holder<T>((T) c);
+ }
+
+ private void intColumnSlice(ResultSet resultSet) {
+ Map<BigInteger, com.datastax.driver.core.Row> map = new TreeMap<BigInteger, com.datastax.driver.core.Row>();
+ BigInteger fromField = StandardConverters.convertFromBytes(BigInteger.class, from);
+ BigInteger toField = StandardConverters.convertFromBytes(BigInteger.class, to);
+ for (com.datastax.driver.core.Row row : resultSet) {
+ String field = row.getString("colname");
+ BigInteger name = StandardConverters.convertFromBytes(BigInteger.class, StandardConverters.convertFromString(byte[].class, field));
+ if (name.compareTo(fromField) >= 0 && name.compareTo(toField) <= 0) {
+ map.put(name, row);
+ }
+ }
+ for (BigInteger field : map.keySet()) {
+ com.datastax.driver.core.Row row = map.get(field);
+ subList.add(row);
+ }
+ }
+
+ private void decimalColumnSlice(ResultSet resultSet) {
+ Map<BigDecimal, com.datastax.driver.core.Row> map = new TreeMap<BigDecimal, com.datastax.driver.core.Row>();
+ BigDecimal fromField = StandardConverters.convertFromBytes(BigDecimal.class, from);
+ BigDecimal toField = StandardConverters.convertFromBytes(BigDecimal.class, to);
+ for (com.datastax.driver.core.Row row : resultSet) {
+ String field = row.getString("colname");
+ BigDecimal name = StandardConverters.convertFromBytes(BigDecimal.class, StandardConverters.convertFromString(byte[].class, field));
+ if (name.compareTo(fromField) >= 0 && name.compareTo(toField) <= 0) {
+ map.put(name, row);
+ }
+ }
+ for (BigDecimal field : map.keySet()) {
+ com.datastax.driver.core.Row row = map.get(field);
+ subList.add(row);
+ }
+ }
+
+ private void stringColumSlice(ResultSet resultSet) {
+ Map<String, com.datastax.driver.core.Row> map = new TreeMap<String, com.datastax.driver.core.Row>();
+ String fromField = StandardConverters.convertFromBytes(String.class, from);
+ String toField = StandardConverters.convertFromBytes(String.class, to);
+ for (com.datastax.driver.core.Row row : resultSet) {
+ String field = row.getString("colname");
+ if (field.compareTo(fromField) >= 0 && field.compareTo(toField) <= 0) {
+ map.put(field, row);
+ }
+ }
+ for (String field : map.keySet()) {
+ com.datastax.driver.core.Row row = map.get(field);
+ subList.add(row);
+ }
+ }
+
+}
@@ -59,7 +59,7 @@ public String toString() {
String keys = "" + rowKeys;
if (rowKeys instanceof List)
keys = "List" + keys;
- String retVal = "CursorKeysToRowsMDB[" + tabs + keys
+ String retVal = "CursorKeysToRowsCQL3[" + tabs + keys
+ tabs + "]";
StringLocal.set(tabs.length());
return retVal;
@@ -295,7 +295,7 @@ private void fillCacheForCursor(Map<ByteArray, KeyValue<Row>> map, ResultSet cur
kv.setKey(cqlRowKey);
r.setKey(cqlRowKey);
- byte[] name = StandardConverters.convertToBytes(cqlRow.getString("colname"));
+ byte[] name = StandardConverters.convertFromString(byte[].class, cqlRow.getString("colname"));
ByteBuffer data = cqlRow.getBytes("colvalue");
byte[] val = new byte[data.remaining()];
data.get(val);
@@ -53,7 +53,7 @@ public StartQueryManyKeys(String keys, ScanInfo info, Session session2, List<byt
Object value = null;
value = columnMeta.getStorageType().convertFromNoSql(val);
- value = Cql3Util.checkForBooleanAndNull(value, indTable);
+ value = Cql3Util.checkForBooleanAndNull(value, indTable, columnMeta);
Clause valClause = QueryBuilder.eq("colname", value);
selectWhere.and(valClause);

0 comments on commit 35376b1

Please sign in to comment.