Skip to content

Commit

Permalink
CQL3 stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
easility committed Sep 19, 2013
1 parent 89119bd commit a605c36
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 2 deletions.
Expand Up @@ -36,7 +36,7 @@ public class CqlSession implements NoSqlRawSession {
private Session session = null;
private Cluster cluster = null;
private KeyspaceMetadata keyspaces = null;
private String keys = "cql62";
private String keys = "cql63";
@Inject
private Provider<Row> rowProvider;

Expand Down Expand Up @@ -180,7 +180,15 @@ public AbstractCursor<IndexColumn> scanIndex(ScanInfo scanInfo, List<byte[]> val
@Override
public AbstractCursor<KeyValue<Row>> find(DboTableMeta colFamily, DirectCursor<byte[]> rowKeys, Cache cache, int batchSize, BatchListener list,
MetaLookup mgr) {
return null;
String table = lookupOrCreate(colFamily.getColumnFamily(), mgr);
//Info info = fetchDbCollectionInfo(colFamily.getColumnFamily(), mgr);
if(table == null) {
//If there is no column family in mongodb, then we need to return no rows to the user...
return new CursorReturnsEmptyRows2(rowKeys);
}
CursorKeysToRowsCql3 cursor = new CursorKeysToRowsCql3(rowKeys, batchSize, list, rowProvider, session, keys);
cursor.setupMore(colFamily, cache);
return cursor;
}

@Override
Expand Down
@@ -0,0 +1,322 @@
package com.alvazan.orm.layer9z.spi.db.cassandracql3;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;

import javax.inject.Provider;

import com.alvazan.orm.api.z8spi.BatchListener;
import com.alvazan.orm.api.z8spi.Cache;
import com.alvazan.orm.api.z8spi.KeyValue;
import com.alvazan.orm.api.z8spi.Row;
import com.alvazan.orm.api.z8spi.RowHolder;
import com.alvazan.orm.api.z8spi.action.Column;
import com.alvazan.orm.api.z8spi.conv.ByteArray;
import com.alvazan.orm.api.z8spi.conv.StandardConverters;
import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
import com.alvazan.orm.api.z8spi.iter.DirectCursor;
import com.alvazan.orm.api.z8spi.iter.StringLocal;
import com.alvazan.orm.api.z8spi.meta.DboTableMeta;
import com.datastax.driver.core.Query;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Clause;
import com.datastax.driver.core.querybuilder.QueryBuilder;

public class CursorKeysToRowsCql3 extends AbstractCursor<KeyValue<Row>> {

private DirectCursor<byte[]> rowKeys;
private int batchSize;
private BatchListener list;
private Session session;
private Info info;
private ListIterator<KeyValue<Row>> cachedRows;
private Provider<Row> rowProvider;
private Cache cache;
private DboTableMeta cf;
private String keys;

public CursorKeysToRowsCql3(DirectCursor<byte[]> rowKeys, int batchSize,
BatchListener list, Provider<Row> rowProvider,
Session session2, String keys2) {
this.rowProvider = rowProvider;
this.rowKeys = rowKeys;
this.batchSize = batchSize;
this.list = list;
this.session = session2;
this.keys = keys2;
}

@Override
public String toString() {
String tabs = StringLocal.getAndAdd();
String keys = "" + rowKeys;
if (rowKeys instanceof List)
keys = "List" + keys;
String retVal = "CursorKeysToRowsMDB[" + tabs + keys
+ tabs + "]";
StringLocal.set(tabs.length());
return retVal;
}

public void setupMore(DboTableMeta cf2, Cache cache2) {
if (cache2 == null || cf2 == null)
throw new IllegalArgumentException(
"no params can be null but one was null");
this.cf = cf2;
this.cache = cache2;
beforeFirst();
}

@Override
public void beforeFirst() {
rowKeys.beforeFirst();
cachedRows = null;
}

@Override
public void afterLast() {
rowKeys.afterLast();
cachedRows = null;
}

@Override
public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<KeyValue<Row>> nextImpl() {
loadCache();
if (cachedRows == null || !cachedRows.hasNext())
return null;

return new Holder<KeyValue<Row>>(cachedRows.next());
}

@Override
public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<KeyValue<Row>> previousImpl() {
loadCacheBackward();
if (cachedRows == null || !cachedRows.hasPrevious())
return null;

return new Holder<KeyValue<Row>>(cachedRows.previous());
}

private void loadCache() {
if (cachedRows != null && cachedRows.hasNext())
return; // There are more rows so return and the code will return
// the next result from cache

List<RowHolder<Row>> results = new ArrayList<RowHolder<Row>>();
List<byte[]> keysToLookup = new ArrayList<byte[]>();
while (results.size() < batchSize) {
Holder<byte[]> keyHolder = rowKeys.nextImpl();
if (keyHolder == null)
break; // we are officially exhausted
byte[] nextKey = keyHolder.getValue();
if (cache != null) {
RowHolder<Row> result = cache.fromCache(cf, nextKey);
if (result == null)
keysToLookup.add(nextKey);
results.add(result);
}

}

/* if (info.getDbObj() != null) {
dbCollection = info.getDbObj();
} else
return;
*/
ResultSet resultSet = null;
System.out.println("HERE 00 " + keysToLookup.size());

if (keysToLookup.size() > 0) {
System.out.println("HERE 11");
String[] keyStrings = new String[keysToLookup.size()];
int count = 0;
for (byte[] rowKey : keysToLookup) {
keyStrings[count] = StandardConverters.convertFromBytes(String.class, rowKey);
count++;
}
System.out.println("COUNT " +count);
if (list != null)
list.beforeFetchingNextBatch();
try {
Clause inClause = QueryBuilder.in("id", keyStrings);
Query query = QueryBuilder.select().all().from(keys, cf.getColumnFamily()).where(inClause).limit(batchSize);
//PreparedStatement statement = session.prepare("SELECT * FROM" + keys + "." + cf.getColumnFamily() + "WHERE IN (?, ?, ?)");
//BoundStatement boundStatement = new BoundStatement(statement);
resultSet = session.execute(query);
System.out.println(" results:" + resultSet);
} catch (Exception e) {
System.out.println(" Exception:" + e.getMessage());
}
/* BasicDBObject query = new BasicDBObject();
query.put("_id", new BasicDBObject("$in", keysToLookup));
BasicDBObject orderBy = new BasicDBObject();
orderBy.put("_id", 1);
cursor = dbCollection.find(query).sort(orderBy).batchSize(batchSize);*/
/* if (list != null)
list.afterFetchingNextBatch(cursor.count());*/
}

Map<ByteArray, KeyValue<Row>> map = new HashMap<ByteArray, KeyValue<Row>>();

fillCache(map, resultSet, keysToLookup);

// This is copied from Cassandra. Need to check how can we get results in an order.

List<KeyValue<Row>> finalRes = new ArrayList<KeyValue<Row>>();
Iterator<byte[]> keyIter = keysToLookup.iterator();
for (RowHolder<Row> r : results) {
if (r == null) {
byte[] key = keyIter.next();
ByteArray b = new ByteArray(key);
KeyValue<Row> kv = map.get(b);
if (kv!=null)
finalRes.add(kv);
} else {
Row row = r.getValue();
KeyValue<Row> kv = new KeyValue<Row>();
kv.setKey(r.getKey());
kv.setValue(row);
finalRes.add(kv);
}
}

cachedRows = finalRes.listIterator();
}

private void loadCacheBackward() {
if (cachedRows != null && cachedRows.hasPrevious())
return; // There are more rows so return and the code will return
// the next result from cache

List<RowHolder<Row>> results = new ArrayList<RowHolder<Row>>();
List<byte[]> keysToLookup = new ArrayList<byte[]>();
while (results.size() < batchSize) {
Holder<byte[]> keyHolder = rowKeys.previousImpl();
if (keyHolder == null)
break; // we are officially exhausted

byte[] previousKey = keyHolder.getValue();
RowHolder<Row> result = cache.fromCache(cf, previousKey);
if (result == null)
keysToLookup.add(0, previousKey);

results.add(result);
}

/*DBCursor cursor = null;
DBCollection dbCollection = null;
if (info.getDbObj() != null) {
dbCollection = info.getDbObj();
} else
return;
if (keysToLookup.size() > 0) {
if (list != null)
list.beforeFetchingNextBatch();
BasicDBObject query = new BasicDBObject();
query.put("_id", new BasicDBObject("$in", keysToLookup));
BasicDBObject orderBy = new BasicDBObject();
orderBy.put("_id", 1);
cursor = dbCollection.find(query).sort(orderBy)
.batchSize(batchSize);
if (list != null)
list.afterFetchingNextBatch(cursor.size());
} else {
cursor = new DBCursor(dbCollection, null, null, null);
}*/

Map<ByteArray, KeyValue<Row>> map = new HashMap<ByteArray, KeyValue<Row>>();

//fillCache(map, cursor, keysToLookup);

// UNFORTUNATELY, astyanax's result is NOT ORDERED by the keys we
// provided so, we need to iterate over the whole thing here
// into our own List :( :( .

List<KeyValue<Row>> finalRes = new ArrayList<KeyValue<Row>>();
Iterator<byte[]> keyIter = keysToLookup.iterator();
for (RowHolder<Row> r : results) {
if (r == null) {
byte[] key = keyIter.next();
ByteArray b = new ByteArray(key);
KeyValue<Row> kv = map.get(b);
if (kv != null)
finalRes.add(kv);
} else {
Row row = r.getValue();
KeyValue<Row> kv = new KeyValue<Row>();
kv.setKey(r.getKey());
kv.setValue(row);
finalRes.add(kv);
}
}

cachedRows = finalRes.listIterator();
while (cachedRows.hasNext())
cachedRows.next();
}

private void fillCache(Map<ByteArray, KeyValue<Row>> map, ResultSet cursor,
List<byte[]> keysToLookup) {

String rowKey = null;
// KeyValue<Row> kv = null;
// byte[] cqlRowKey = null;
List<List<com.datastax.driver.core.Row>> cqlRows = new ArrayList<List<com.datastax.driver.core.Row>>();
List<com.datastax.driver.core.Row> actualRowList = new ArrayList<com.datastax.driver.core.Row>();
if (cursor == null)
return;

for (com.datastax.driver.core.Row cqlRow : cursor) {
String rowKey1 = cqlRow.getString("id");
System.out.println("rowKey1 " + rowKey1);
if (rowKey1.equals(rowKey)) {
System.out.println("SAME ROW");
actualRowList.add(cqlRow);
} else {
if (rowKey != null)
cqlRows.add(actualRowList);
rowKey = rowKey1;
actualRowList = new ArrayList<com.datastax.driver.core.Row>();
actualRowList.add(cqlRow);
}
}
cqlRows.add(actualRowList);
System.out.println("cqlRows size: " + cqlRows.size());
System.out.println("actualRowList size: " + actualRowList.size());
for (List<com.datastax.driver.core.Row> actualRow : cqlRows) {
KeyValue<Row> kv = new KeyValue<Row>();
Row r = rowProvider.get();
byte[] cqlRowKey = null;
for (com.datastax.driver.core.Row cqlRow : actualRow) {
cqlRowKey = StandardConverters.convertToBytes(cqlRow.getString("id"));
kv.setKey(cqlRowKey);
r.setKey(cqlRowKey);
byte[] name = StandardConverters.convertToBytes(cqlRow.getString("colname"));
byte[] val = cqlRow.getBytesUnsafe("colvalue").array();
String strValue = StandardConverters.convertFromBytes(String.class, val);
System.out.println("STRVALUE " + strValue);
Column c = new Column();
c.setName(name);
if (!strValue.equals("_n"))
c.setValue(val);
r.put(c);

kv.setValue(r);
ByteArray b = new ByteArray(cqlRowKey);
map.put(b, kv);
cache.cacheRow(cf, cqlRowKey, kv.getValue());
}

}

}

}

0 comments on commit a605c36

Please sign in to comment.