Permalink
Browse files

CQL3 stuff

  • Loading branch information...
1 parent b90380f commit 6c2103a1a20710d13324fe10dfa677a569627062 @easility easility committed Sep 25, 2013
View
4 src/main/java/com/alvazan/orm/api/z8spi/SpiConstants.java
@@ -4,5 +4,7 @@
public static final String CASSANDRA_BUILDER = "nosql.cassandra.builder";
public static final String CASSANDRA_CF_CREATE_CALLBACK = "nosql.cassandra.createcfcallback";
-
+
+ public static final String NULL_STRING_FORCQL3 = "_n";
+
}
View
117 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/Cql3Util.java
@@ -0,0 +1,117 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+
+import com.alvazan.orm.api.z8spi.Key;
+
+import com.alvazan.orm.api.z8spi.action.IndexColumn;
+import com.alvazan.orm.api.z8spi.conv.StandardConverters;
+import com.alvazan.orm.api.z8spi.meta.DboColumnMeta;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Select.Where;
+
+
+public class Cql3Util {
+ public static IndexColumn convertToIndexCol(com.datastax.driver.core.Row row, String tableName) {
+ Object indValue = null;
+ if (tableName.equalsIgnoreCase("StringIndice")) {
+ indValue = row.getString("colname");
+ } else if (tableName.equalsIgnoreCase("IntegerIndice")) {
+ indValue = row.getLong("colname");
+ } else if (tableName.equalsIgnoreCase("DecimalIndice")) {
+ indValue = row.getFloat("colname");
+ }
+ ByteBuffer data = row.getBytes("colvalue");
+ byte[] val = new byte[data.remaining()];
+ data.get(val);
+ IndexColumn c = new IndexColumn();
+ // c.setColumnName(columnName); Will we ever need this now?
+ if (val != null) {
+ c.setPrimaryKey(val);
+ }
+ if (indValue != null) {
+ c.setIndexedValue(StandardConverters.convertToBytes(indValue));
+ }
+
+ c.setValue(null);
+ return c;
+ }
+
+ public static Where createRowQuery(Key from, Key to, DboColumnMeta colMeta, Select selectQuery, String rowKey) {
+ Where selectWhere = selectQuery.where();
+ Clause rkClause = QueryBuilder.eq("id", rowKey);
+ selectWhere.and(rkClause);
+
+ Object valFrom = null, valTo = null;
+ if (colMeta != null) {
+ if (from != null) {
+ valFrom = colMeta.getStorageType().convertFromNoSql(from.getKey());
+ valFrom = checkForBoolean(valFrom);
+ }
+ if (to != null) {
+ valTo = colMeta.getStorageType().convertFromNoSql(to.getKey());
+ valTo = checkForBoolean(valTo);
+ }
+ } else
+ return selectWhere;
+
+ if (from != null) {
+ if (from.isInclusive()) {
+ Clause gteClause = QueryBuilder.gte("colname", valFrom);
+ selectWhere.and(gteClause);
+ } else {
+ Clause gtClause = QueryBuilder.gt("colname", valFrom);
+ selectWhere.and(gtClause);
+ }
+
+ }
+ if (to != null) {
+ if (to.isInclusive()) {
+ Clause lteClause = QueryBuilder.lte("colname", valTo);
+ selectWhere.and(lteClause);
+ }
+ else {
+ Clause ltClause = QueryBuilder.lt("colname", valTo);
+ selectWhere.and(ltClause);
+ }
+ }
+ return selectWhere;
+ }
+
+ public static Where createRowQueryFromValues(List<byte[]> values, DboColumnMeta colMeta, Select selectQuery, String rowKey) {
+ Where selectWhere = selectQuery.where();
+
+ Clause rkClause = QueryBuilder.eq("id", rowKey);
+ selectWhere.and(rkClause);
+
+ Object[] valStrings = new Object[values.size()];
+ int count = 0;
+ for (byte[] value : values) {
+ valStrings[count] = StandardConverters.convertFromBytes(String.class, value);
+ count++;
+ }
+
+ Clause inClause = QueryBuilder.in("colname", valStrings);
+ selectWhere.and(inClause);
+ return selectWhere;
+ }
+
+ public static Object checkForBoolean(Object val) {
+ if (val == null)
+ return null;
+ else if (val instanceof Boolean) {
+ Boolean b = (Boolean) val;
+ if (b)
+ return 1;
+ else
+ return 0;
+ }
+ return val;
+ }
+
+}
View
69 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CqlSession.java
@@ -16,6 +16,7 @@
import com.alvazan.orm.api.z8spi.NoSqlRawSession;
import com.alvazan.orm.api.z8spi.Row;
import com.alvazan.orm.api.z8spi.ScanInfo;
+import com.alvazan.orm.api.z8spi.SpiConstants;
import com.alvazan.orm.api.z8spi.action.Action;
import com.alvazan.orm.api.z8spi.action.Column;
import com.alvazan.orm.api.z8spi.action.IndexColumn;
@@ -25,6 +26,7 @@
import com.alvazan.orm.api.z8spi.conv.StorageTypeEnum;
import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
import com.alvazan.orm.api.z8spi.iter.DirectCursor;
+import com.alvazan.orm.api.z8spi.meta.DboColumnMeta;
import com.alvazan.orm.api.z8spi.meta.DboTableMeta;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
@@ -36,10 +38,11 @@
private Session session = null;
private Cluster cluster = null;
private KeyspaceMetadata keyspaces = null;
- private String keys = "cql63";
+ private String keys = "cql70";
@Inject
private Provider<Row> rowProvider;
+
@Override
public void start(Map<String, Object> properties) {
@@ -74,7 +77,7 @@ private void persist(Persist action, MetaLookup ormSession) {
String table = lookupOrCreate(colFamily, ormSession);
List<Column> s = action.getColumns();
byte[] rowkey = action.getRowKey();
- byte[] nullArray = StandardConverters.convertToBytes("_n");
+ byte[] nullArray = StandardConverters.convertToBytes(SpiConstants.NULL_STRING_FORCQL3);
for (Column c : s) {
try {
@@ -104,11 +107,13 @@ private void persistIndex(PersistIndex action, MetaLookup ormSession) {
IndexColumn column = action.getColumn();
byte[] key = column.getIndexedValue();
byte[] value = column.getPrimaryKey();
- if (key != null) {
- try {
+
+ try {
+
+ Object keyObject = null;
+ if (key != null) {
PreparedStatement statement = session.prepare("INSERT INTO " + keys + "." + table + "(id, colname, colvalue) VALUES (?, ?, ?)");
BoundStatement boundStatement = new BoundStatement(statement);
- Object keyObject = null;
if (indexCfName.equalsIgnoreCase("StringIndice")) {
keyObject = StandardConverters.convertFromBytes(String.class, key);
} else if (indexCfName.equalsIgnoreCase("IntegerIndice")) {
@@ -117,9 +122,19 @@ private void persistIndex(PersistIndex action, MetaLookup ormSession) {
keyObject = StandardConverters.convertFromBytes(Float.class, key);
}
session.execute(boundStatement.bind(StandardConverters.convertFromBytes(String.class, rowKey), keyObject, ByteBuffer.wrap(value)));
- } catch (Exception e) {
- System.out.println(indexCfName + " Exception:" + e.getMessage());
- }
+ } else {
+ PreparedStatement statement = session.prepare("INSERT INTO " + keys + "." + table + "(id, colname, colvalue) VALUES (?, ?, ?)");
+ BoundStatement boundStatement = new BoundStatement(statement);
+ if (indexCfName.equalsIgnoreCase("IntegerIndice")) {
+ boundStatement.setString("id", StandardConverters.convertFromBytes(String.class, rowKey));
+ boundStatement.setBytesUnsafe("colname", ByteBuffer.wrap(new byte[0]));
+ boundStatement.setBytes("colvalue", ByteBuffer.wrap(value));
+ session.execute(boundStatement);
+ } else
+ session.execute(boundStatement.bind(StandardConverters.convertFromBytes(String.class, rowKey), "", ByteBuffer.wrap(value)));
+ }
+ } catch (Exception e) {
+ System.out.println(indexCfName + " Exception:" + e.getMessage());
}
}
@@ -151,13 +166,21 @@ private String lookupOrCreate(String colFamily1, MetaLookup ormSession) {
@Override
public void clearDatabase() {
-
+ session.execute("DROP KEYSPACE " + keys);
+ keyspaces = cluster.getMetadata().getKeyspace(keys);
+ if (keyspaces == null) {
+ try {
+ session.execute("CREATE KEYSPACE " + keys + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
+ } catch (Exception e) {
+ System.out.println("Exception :" + e.getMessage());
+ }
+ }
+ session = cluster.connect(keys);
}
@Override
public void close() {
- // TODO Auto-generated method stub
-
+ //cluster.shutdown();
}
@Override
@@ -168,13 +191,31 @@ public void close() {
@Override
public AbstractCursor<IndexColumn> scanIndex(ScanInfo scan, Key from, Key to, Integer batchSize, BatchListener l, MetaLookup mgr) {
- return null;
+ byte[] rowKey = scan.getRowKey();
+ String indexTableName = scan.getIndexColFamily();
+ DboColumnMeta colMeta = scan.getColumnName();
+ DboTableMeta entityDbCollection = scan.getEntityColFamily();
+
+ // Here we don't bother using an index at all since there is no where clause to begin with
+ // ALSO, we don't want this code to be the case if we are doing a CursorToMany which has to
+ // use an index so check the column type
+/* if (!entityDbCollection.isVirtualCf() && from == null && to == null && !(scan.getColumnName() instanceof DboColumnToManyMeta)
+ && !entityDbCollection.isInheritance()) {
+ ScanMongoDbCollection scanner = new ScanMongoDbCollection(batchSize, l, entityDbCollection.getColumnFamily(), db);
+ scanner.beforeFirst();
+ return scanner;
+ }
+*/
+ CursorOfIndexes cursor = new CursorOfIndexes(rowKey, batchSize, l, indexTableName, from, to);
+ cursor.setupMore(keys, colMeta, session);
+ return cursor;
}
@Override
public AbstractCursor<IndexColumn> scanIndex(ScanInfo scanInfo, List<byte[]> values, BatchListener list, MetaLookup mgr) {
- // TODO Auto-generated method stub
- return null;
+ CursorForValues cursor = new CursorForValues(scanInfo, list,
+ values, session, keys);
+ return cursor;
}
@Override
View
138 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CursorForValues.java
@@ -0,0 +1,138 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import com.alvazan.orm.api.z8spi.BatchListener;
+import com.alvazan.orm.api.z8spi.ScanInfo;
+import com.alvazan.orm.api.z8spi.action.IndexColumn;
+import com.alvazan.orm.api.z8spi.conv.StandardConverters;
+import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
+import com.alvazan.orm.api.z8spi.iter.StringLocal;
+import com.alvazan.orm.api.z8spi.meta.DboColumnMeta;
+import com.datastax.driver.core.Query;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Select.Where;
+
+public class CursorForValues extends AbstractCursor<IndexColumn> {
+
+ private byte[] rowKey;
+ private BatchListener batchListener;
+ private Session session;
+ private ListIterator<IndexColumn> cachedRows;
+ private String indTable;
+ private String keySpace;
+ private boolean needToGetBatch;
+ private DboColumnMeta columnMeta;
+ private List<byte[]> values;
+
+ public CursorForValues(ScanInfo scanInfo, BatchListener list, List<byte[]> keys, Session session2, String keySpace2) {
+ this.rowKey = scanInfo.getRowKey();
+ this.batchListener = list;
+ this.indTable = scanInfo.getIndexColFamily();
+ this.cachedRows = null;
+ this.values = keys;
+ this.session = session2;
+ this.keySpace = keySpace2;
+ this.columnMeta = scanInfo.getColumnName();
+ this.needToGetBatch = true;
+ beforeFirst();
+
+ }
+
+ @Override
+ public String toString() {
+ String tabs = StringLocal.getAndAdd();
+ String keys = "" + rowKey;
+ String retVal = "CursorForValues(CQL3)[" + tabs + keys + tabs + "]";
+ StringLocal.set(tabs.length());
+ return retVal;
+ }
+
+ public void setupMore(String keySpace2, DboColumnMeta colMeta, Session session2) {
+ if (keySpace2 == null)
+ throw new IllegalArgumentException("DB was null");
+
+ this.columnMeta = colMeta;
+
+ }
+
+ @Override
+ public void beforeFirst() {
+ cachedRows = null;
+ needToGetBatch = true;
+ }
+
+ @Override
+ public void afterLast() {
+ cachedRows = null;
+ needToGetBatch = true;
+ }
+
+ @Override
+ public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<IndexColumn> nextImpl() {
+ loadCache(false);
+ if (cachedRows == null || !cachedRows.hasNext())
+ return null;
+
+ return new Holder<IndexColumn>(cachedRows.next());
+ }
+
+ @Override
+ public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<IndexColumn> previousImpl() {
+ loadCache(true);
+ if (cachedRows == null || !cachedRows.hasPrevious())
+ return null;
+
+ return new Holder<IndexColumn>(cachedRows.previous());
+ }
+
+ private void loadCache(boolean reverse) {
+ if (cachedRows != null && cachedRows.hasNext())
+ return; // There are more rows so return and the code will return
+ // the next result from cache
+
+ ResultSet resultSet = null;
+
+ if (needToGetBatch) {
+ if (batchListener != null)
+ batchListener.beforeFetchingNextBatch();
+
+ String rowKeyString = StandardConverters.convertFromBytes(String.class, rowKey);
+
+ Select selectQuery = QueryBuilder.select().all().from(keySpace, indTable).allowFiltering();
+ Where whereClause = Cql3Util.createRowQueryFromValues(values, columnMeta, selectQuery, rowKeyString);
+ Query query = whereClause.disableTracing();
+
+ // System.out.println("QUERY IS: " + query);
+ resultSet = session.execute(query);
+
+ if (batchListener != null)
+ batchListener.afterFetchingNextBatch(10);
+
+ List<IndexColumn> finalRes = new ArrayList<IndexColumn>();
+
+ if (resultSet == null) {
+ cachedRows = new ArrayList<IndexColumn>().listIterator();
+ } else {
+ for (com.datastax.driver.core.Row row : resultSet) {
+ IndexColumn indexCol = Cql3Util.convertToIndexCol(row, indTable);
+ finalRes.add(indexCol);
+ }
+ cachedRows = finalRes.listIterator();
+ }
+ cachedRows = finalRes.listIterator();
+ needToGetBatch = false;
+ if (reverse) {
+ while (cachedRows.hasNext())
+ cachedRows.next();
+ }
+ }
+
+ }
+
+}
View
104 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CursorKeysToRowsCql3.java
@@ -15,6 +15,7 @@
import com.alvazan.orm.api.z8spi.KeyValue;
import com.alvazan.orm.api.z8spi.Row;
import com.alvazan.orm.api.z8spi.RowHolder;
+import com.alvazan.orm.api.z8spi.SpiConstants;
import com.alvazan.orm.api.z8spi.action.Column;
import com.alvazan.orm.api.z8spi.conv.ByteArray;
import com.alvazan.orm.api.z8spi.conv.StandardConverters;
@@ -124,33 +125,27 @@ private void loadCache() {
}
-/* if (info.getDbObj() != null) {
- dbCollection = info.getDbObj();
- } else
- return;
- */
ResultSet resultSet = null;
- if (keysToLookup.size() > 0) {
- String[] keyStrings = new String[keysToLookup.size()];
- int count = 0;
- for (byte[] rowKey : keysToLookup) {
- keyStrings[count] = StandardConverters.convertFromBytes(String.class, rowKey);
- count++;
- }
-
- if (list != null)
- list.beforeFetchingNextBatch();
- try {
- Clause inClause = QueryBuilder.in("id", keyStrings);
- Query query = QueryBuilder.select().all().from(keys, cf.getColumnFamily()).where(inClause).limit(batchSize);
- resultSet = session.execute(query);
- } catch (Exception e) {
- System.out.println(" Exception:" + e.getMessage());
- }
- /* if (list != null)
- list.afterFetchingNextBatch(cursor.count());*/
- }
+ if (keysToLookup.size() > 0) {
+ String[] keyStrings = new String[keysToLookup.size()];
+ int count = 0;
+ for (byte[] rowKey : keysToLookup) {
+ keyStrings[count] = StandardConverters.convertFromBytes(String.class, rowKey);
+ count++;
+ }
+
+ if (list != null)
+ list.beforeFetchingNextBatch();
+ try {
+ Clause inClause = QueryBuilder.in("id", keyStrings);
+ Query query = QueryBuilder.select().all().from(keys, cf.getColumnFamily()).where(inClause).limit(batchSize);
+ resultSet = session.execute(query);
+ } catch (Exception e) {
+ System.out.println(" Exception:" + e.getMessage());
+ }
+ if (list != null) list.afterFetchingNextBatch(batchSize);
+ }
Map<ByteArray, KeyValue<Row>> map = new HashMap<ByteArray, KeyValue<Row>>();
@@ -198,38 +193,31 @@ private void loadCacheBackward() {
results.add(result);
}
+ ResultSet resultSet = null;
+
+ if (keysToLookup.size() > 0) {
+ String[] keyStrings = new String[keysToLookup.size()];
+ int count = 0;
+ for (byte[] rowKey : keysToLookup) {
+ keyStrings[count] = StandardConverters.convertFromBytes(String.class, rowKey);
+ count++;
+ }
- /*DBCursor cursor = null;
- DBCollection dbCollection = null;
- if (info.getDbObj() != null) {
- dbCollection = info.getDbObj();
- } else
- return;
-
- if (keysToLookup.size() > 0) {
- if (list != null)
- list.beforeFetchingNextBatch();
-
- BasicDBObject query = new BasicDBObject();
- query.put("_id", new BasicDBObject("$in", keysToLookup));
- BasicDBObject orderBy = new BasicDBObject();
- orderBy.put("_id", 1);
- cursor = dbCollection.find(query).sort(orderBy)
- .batchSize(batchSize);
-
- if (list != null)
- list.afterFetchingNextBatch(cursor.size());
- } else {
- cursor = new DBCursor(dbCollection, null, null, null);
- }*/
-
+ if (list != null)
+ list.beforeFetchingNextBatch();
+ try {
+ Clause inClause = QueryBuilder.in("id", keyStrings);
+ Query query = QueryBuilder.select().all().from(keys, cf.getColumnFamily()).where(inClause).limit(batchSize);
+ resultSet = session.execute(query);
+ } catch (Exception e) {
+ System.out.println(" Exception:" + e.getMessage());
+ }
+ if (list != null) list.afterFetchingNextBatch(batchSize);
+ }
Map<ByteArray, KeyValue<Row>> map = new HashMap<ByteArray, KeyValue<Row>>();
- //fillCache(map, cursor, keysToLookup);
+ fillCache(map, resultSet, keysToLookup);
- // UNFORTUNATELY, astyanax's result is NOT ORDERED by the keys we
- // provided so, we need to iterate over the whole thing here
- // into our own List :( :( .
List<KeyValue<Row>> finalRes = new ArrayList<KeyValue<Row>>();
Iterator<byte[]> keyIter = keysToLookup.iterator();
@@ -254,17 +242,13 @@ private void loadCacheBackward() {
cachedRows.next();
}
- private void fillCache(Map<ByteArray, KeyValue<Row>> map, ResultSet cursor,
- List<byte[]> keysToLookup) {
+ private void fillCache(Map<ByteArray, KeyValue<Row>> map, ResultSet cursor, List<byte[]> keysToLookup) {
String rowKey = null;
- // KeyValue<Row> kv = null;
- // byte[] cqlRowKey = null;
List<List<com.datastax.driver.core.Row>> cqlRows = new ArrayList<List<com.datastax.driver.core.Row>>();
List<com.datastax.driver.core.Row> actualRowList = new ArrayList<com.datastax.driver.core.Row>();
if (cursor == null)
return;
-
for (com.datastax.driver.core.Row cqlRow : cursor) {
String rowKey1 = cqlRow.getString("id");
if (rowKey1.equals(rowKey)) {
@@ -287,13 +271,13 @@ private void fillCache(Map<ByteArray, KeyValue<Row>> map, ResultSet cursor,
kv.setKey(cqlRowKey);
r.setKey(cqlRowKey);
byte[] name = StandardConverters.convertToBytes(cqlRow.getString("colname"));
- ByteBuffer data = cqlRow.getBytes("colvalue");
+ ByteBuffer data = cqlRow.getBytes("colvalue");
byte[] val = new byte[data.remaining()];
data.get(val);
String strValue = StandardConverters.convertFromBytes(String.class, val);
Column c = new Column();
c.setName(name);
- if (!strValue.equals("_n"))
+ if (!strValue.equals(SpiConstants.NULL_STRING_FORCQL3))
c.setValue(val);
r.put(c);
View
143 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CursorOfIndexes.java
@@ -0,0 +1,143 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import com.alvazan.orm.api.z8spi.BatchListener;
+import com.alvazan.orm.api.z8spi.Key;
+import com.alvazan.orm.api.z8spi.action.IndexColumn;
+import com.alvazan.orm.api.z8spi.conv.StandardConverters;
+import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
+import com.alvazan.orm.api.z8spi.iter.StringLocal;
+import com.alvazan.orm.api.z8spi.meta.DboColumnMeta;
+import com.datastax.driver.core.Query;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Select.Where;
+
+public class CursorOfIndexes extends AbstractCursor<IndexColumn> {
+
+ private byte[] rowKey;
+ private Integer batchSize;
+ private BatchListener batchListener;
+ private Session session;
+ private String keySpace;
+ private ListIterator<com.datastax.driver.core.Row> cachedRows;
+ private String indTable;
+ private boolean needToGetBatch;
+ private Key from;
+ private Key to;
+ private DboColumnMeta columnMeta;
+
+ public CursorOfIndexes(byte[] rowKeys, Integer batchSize, BatchListener list, String indTable, Key from, Key to) {
+ this.rowKey = rowKeys;
+ this.batchSize = batchSize;
+ this.batchListener = list;
+ this.indTable = indTable;
+ this.from = from;
+ this.to = to;
+ this.needToGetBatch = true;
+ this.cachedRows = null;
+ beforeFirst();
+ }
+
+ @Override
+ public String toString() {
+ String tabs = StringLocal.getAndAdd();
+ String keys = "" + rowKey;
+ String retVal = "CursorOfIndexes[" + tabs + keys + tabs + "]";
+ StringLocal.set(tabs.length());
+ return retVal;
+ }
+
+ public void setupMore(String keyspace2, DboColumnMeta colMeta, Session session2) {
+ if (keyspace2 == null)
+ throw new IllegalArgumentException("DB was null");
+ this.keySpace = keyspace2;
+ this.columnMeta = colMeta;
+ this.session = session2;
+ beforeFirst();
+ }
+
+ @Override
+ public void beforeFirst() {
+ cachedRows = null;
+ needToGetBatch = true;
+ }
+
+ @Override
+ public void afterLast() {
+ cachedRows = null;
+ needToGetBatch = true;
+ }
+
+ @Override
+ public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<IndexColumn> nextImpl() {
+ loadCache(false);
+ if (cachedRows == null || !cachedRows.hasNext())
+ return null;
+ IndexColumn indexCol = Cql3Util.convertToIndexCol(cachedRows.next(), indTable);
+ return new Holder<IndexColumn>(indexCol);
+ }
+
+ @Override
+ public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<IndexColumn> previousImpl() {
+ loadCache(true);
+ if (cachedRows == null || !cachedRows.hasPrevious())
+ return null;
+ IndexColumn indexCol = Cql3Util.convertToIndexCol(cachedRows.previous(), indTable);
+ return new Holder<IndexColumn>(indexCol);
+ }
+
+ private void loadCache(boolean reverse) {
+ if (cachedRows != null && cachedRows.hasNext())
+ return; // There are more rows so return and the code will return
+ // the next result from cache
+
+ ResultSet resultSet = null;
+
+ if (needToGetBatch) {
+ if (batchListener != null)
+ batchListener.beforeFetchingNextBatch();
+
+ String rowKeyString = StandardConverters.convertFromBytes(String.class, rowKey);
+ Select selectQuery = QueryBuilder.select().all().from(keySpace, indTable).allowFiltering();
+ Where whereClause = Cql3Util.createRowQuery(from, to, columnMeta, selectQuery, rowKeyString);
+ Query query = null;
+
+ if (batchSize != null)
+ query = whereClause.limit(batchSize);
+ else
+ query = whereClause.disableTracing();
+ //System.out.println("QUERY IS: " + query);
+ resultSet = session.execute(query);
+
+ // Need to see where we use this batchListener
+ if (batchListener != null && batchSize != null)
+ batchListener.afterFetchingNextBatch(batchSize);
+
+ List<com.datastax.driver.core.Row> finalRes = new ArrayList<com.datastax.driver.core.Row>();
+ fillinCache(finalRes, resultSet);
+ needToGetBatch = false;
+ if (reverse) {
+ while (cachedRows.hasNext())
+ cachedRows.next();
+ }
+ }
+ }
+
+ private void fillinCache(List<com.datastax.driver.core.Row> finalRes, ResultSet cursor) {
+ if (cursor == null) {
+ cachedRows = new ArrayList<com.datastax.driver.core.Row>().listIterator();
+ } else {
+ for (com.datastax.driver.core.Row row : cursor) {
+ finalRes.add(row);
+ }
+ cachedRows = finalRes.listIterator();
+ }
+ }
+
+}
View
4 src/test/java/com/alvazan/test/FactorySingleton.java
@@ -26,6 +26,7 @@ public static Config getConfigForAllTests() {
//serverType = DbTypeEnum.MONGODB;
serverType = DbTypeEnum.IN_MEMORY;
//serverType = DbTypeEnum.HBASE;
+ //serverType = DbTypeEnum.CQL;
String seeds = "localhost:9160";
if (serverType.equals(DbTypeEnum.MONGODB))
seeds = "localhost:27017";
@@ -63,6 +64,9 @@ public static NoSqlEntityManagerFactory createFactory(Config config, Map<String,
case HBASE:
Bootstrap.createAndAddBestHBaseConfiguration(props, config.getClusterName(), "PlayOrmKeyspace", config.getSeeds());
break;
+ case CQL:
+ Bootstrap.createAndAddBestCqlConfiguration(props, config.getClusterName(), "PlayOrmKeyspace", config.getSeeds());
+ break;
default:
throw new UnsupportedOperationException("not supported yet, server type="+config.getServerType());
}

0 comments on commit 6c2103a

Please sign in to comment.