Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

CursorofFuture for Cql3

  • Loading branch information...
commit 382312c9ce80928ff3765cbedb5e05390ad3d1b2 1 parent 6c2103a
@easility easility authored
View
111 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CqlSession.java
@@ -7,6 +7,9 @@
import javax.inject.Inject;
import javax.inject.Provider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.alvazan.orm.api.z8spi.BatchListener;
import com.alvazan.orm.api.z8spi.Cache;
import com.alvazan.orm.api.z8spi.ColumnSliceInfo;
@@ -22,6 +25,9 @@
import com.alvazan.orm.api.z8spi.action.IndexColumn;
import com.alvazan.orm.api.z8spi.action.Persist;
import com.alvazan.orm.api.z8spi.action.PersistIndex;
+import com.alvazan.orm.api.z8spi.action.Remove;
+import com.alvazan.orm.api.z8spi.action.RemoveColumn;
+import com.alvazan.orm.api.z8spi.action.RemoveIndex;
import com.alvazan.orm.api.z8spi.conv.StandardConverters;
import com.alvazan.orm.api.z8spi.conv.StorageTypeEnum;
import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
@@ -32,9 +38,17 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Query;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Select.Where;
public class CqlSession implements NoSqlRawSession {
+ private static final Logger log = LoggerFactory.getLogger(CqlSession.class);
+
private Session session = null;
private Cluster cluster = null;
private KeyspaceMetadata keyspaces = null;
@@ -67,7 +81,14 @@ public void sendChanges(List<Action> actions, MetaLookup ormSession) {
persist((Persist) action, ormSession);
} else if (action instanceof PersistIndex) {
persistIndex((PersistIndex) action, ormSession);
+ } else if(action instanceof Remove) {
+ remove((Remove)action, ormSession);
+ } else if(action instanceof RemoveIndex) {
+ removeIndex((RemoveIndex) action, ormSession);
+ } else if(action instanceof RemoveColumn) {
+ removeColumn((RemoveColumn) action, ormSession);
}
+
}
}
@@ -139,6 +160,87 @@ private void persistIndex(PersistIndex action, MetaLookup ormSession) {
}
+ private void remove(Remove action, MetaLookup ormSession) {
+ String colFamily = action.getColFamily().getColumnFamily();
+ String table = lookupOrCreate(colFamily, ormSession);
+ if (action.getAction() == null)
+ throw new IllegalArgumentException("action param is missing ActionEnum so we know to remove entire row or just columns in the row");
+ switch (action.getAction()) {
+ case REMOVE_ENTIRE_ROW:
+ String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
+ Clause eqClause = QueryBuilder.eq("id", rowKey);
+ Query query = QueryBuilder.delete().from(keys, table).where(eqClause);
+ session.execute(query);
+ break;
+ case REMOVE_COLUMNS_FROM_ROW:
+ removeColumns(action, table);
+ break;
+ default:
+ throw new RuntimeException("bug, unknown remove action=" + action.getAction());
+ }
+ }
+
+ private void removeColumns(Remove action, String table) {
+ String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
+ if (rowKey != null) {
+ for (byte[] name : action.getColumns()) {
+ String colName = StandardConverters.convertFromBytes(String.class, name);
+ removeColumnImpl(rowKey, table, colName);
+ }
+ }
+ }
+
+ private void removeColumn(RemoveColumn action, MetaLookup ormSession) {
+ String colFamily = action.getColFamily().getColumnFamily();
+ String table = lookupOrCreate(colFamily, ormSession);
+ String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
+ if (rowKey != null) {
+ String colName = StandardConverters.convertFromBytes(String.class, action.getColumn());
+ removeColumnImpl(rowKey, table, colName);
+ }
+ }
+
+ private void removeColumnImpl(String rowKey, String table, String colName) {
+ Clause eqClause = QueryBuilder.eq("id", rowKey);
+ Clause eqColClause = QueryBuilder.eq("colname", colName);
+ Query query = QueryBuilder.delete().from(keys, table).where(eqClause).and(eqColClause);
+ session.execute(query);
+ }
+
+ private void removeIndex(RemoveIndex action, MetaLookup ormSession) {
+ String colFamily = action.getIndexCfName();
+ if (colFamily.equalsIgnoreCase("BytesIndice"))
+ return;
+ String table = lookupOrCreate(colFamily, ormSession);
+ String rowKey = StandardConverters.convertFromBytes(String.class, action.getRowKey());
+ IndexColumn column = action.getColumn();
+ byte[] value = column.getPrimaryKey();
+ boolean exists = findIndexRow(table, rowKey, value);
+ if (!exists) {
+ if (log.isInfoEnabled())
+ log.info("Index: " + column.toString() + " already removed.");
+ } else {
+ Clause eqClause = QueryBuilder.eq("id", rowKey);
+ Clause fkClause = QueryBuilder.eq("colvalue", ByteBuffer.wrap(value));
+ Query query = QueryBuilder.delete().from(keys, table).where(eqClause).and(fkClause);
+ session.execute(query);
+ }
+ }
+
+ public boolean findIndexRow(String table, String rowKey, byte[] key) {
+ Select selectQuery = QueryBuilder.select().all().from(keys, table).allowFiltering();
+ //Where whereClause = Cql3Util.createRowQuery(from, to, columnMeta, selectQuery, rowKeyString);
+ Where selectWhere = selectQuery.where();
+ Clause rkClause = QueryBuilder.eq("id", rowKey);
+ selectWhere.and(rkClause);
+ Clause keyClause = QueryBuilder.lte("colvalue", ByteBuffer.wrap(key));
+ selectWhere.and(keyClause);
+ Query query = selectWhere.limit(1);
+ //System.out.println("QUERY FOR FINDINDEXROW IS: " + query);
+ ResultSet resultSet = session.execute(query);
+ return !resultSet.isExhausted();
+ }
+
private String lookupOrCreate(String colFamily1, MetaLookup ormSession) {
if (cluster.getMetadata().getKeyspace(keys).getTable(colFamily1.toLowerCase()) == null) {
try {
@@ -213,9 +315,10 @@ public void close() {
@Override
public AbstractCursor<IndexColumn> scanIndex(ScanInfo scanInfo, List<byte[]> values, BatchListener list, MetaLookup mgr) {
- CursorForValues cursor = new CursorForValues(scanInfo, list,
- values, session, keys);
- return cursor;
+ StartQueryListener listener = new StartQueryManyKeys(keys, scanInfo, session, values, false);
+ //CursorForValues cursor = new CursorForValues(scanInfo, list, values, session, keys, listener);
+ //return cursor;
+ return new CursorOfFutures(listener, list, scanInfo);
}
@Override
@@ -224,7 +327,7 @@ public void close() {
String table = lookupOrCreate(colFamily.getColumnFamily(), mgr);
//Info info = fetchDbCollectionInfo(colFamily.getColumnFamily(), mgr);
if(table == null) {
- //If there is no column family in mongodb, then we need to return no rows to the user...
+ //If there is no column family in cassandra, then we need to return no rows to the user...
return new CursorReturnsEmptyRows2(rowKeys);
}
CursorKeysToRowsCql3 cursor = new CursorKeysToRowsCql3(rowKeys, batchSize, list, rowProvider, session, keys);
View
138 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CursorForValues.java
@@ -1,138 +0,0 @@
-package com.alvazan.orm.layer9z.spi.db.cassandracql3;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
-import com.alvazan.orm.api.z8spi.BatchListener;
-import com.alvazan.orm.api.z8spi.ScanInfo;
-import com.alvazan.orm.api.z8spi.action.IndexColumn;
-import com.alvazan.orm.api.z8spi.conv.StandardConverters;
-import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
-import com.alvazan.orm.api.z8spi.iter.StringLocal;
-import com.alvazan.orm.api.z8spi.meta.DboColumnMeta;
-import com.datastax.driver.core.Query;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Select;
-import com.datastax.driver.core.querybuilder.Select.Where;
-
-public class CursorForValues extends AbstractCursor<IndexColumn> {
-
- private byte[] rowKey;
- private BatchListener batchListener;
- private Session session;
- private ListIterator<IndexColumn> cachedRows;
- private String indTable;
- private String keySpace;
- private boolean needToGetBatch;
- private DboColumnMeta columnMeta;
- private List<byte[]> values;
-
- public CursorForValues(ScanInfo scanInfo, BatchListener list, List<byte[]> keys, Session session2, String keySpace2) {
- this.rowKey = scanInfo.getRowKey();
- this.batchListener = list;
- this.indTable = scanInfo.getIndexColFamily();
- this.cachedRows = null;
- this.values = keys;
- this.session = session2;
- this.keySpace = keySpace2;
- this.columnMeta = scanInfo.getColumnName();
- this.needToGetBatch = true;
- beforeFirst();
-
- }
-
- @Override
- public String toString() {
- String tabs = StringLocal.getAndAdd();
- String keys = "" + rowKey;
- String retVal = "CursorForValues(CQL3)[" + tabs + keys + tabs + "]";
- StringLocal.set(tabs.length());
- return retVal;
- }
-
- public void setupMore(String keySpace2, DboColumnMeta colMeta, Session session2) {
- if (keySpace2 == null)
- throw new IllegalArgumentException("DB was null");
-
- this.columnMeta = colMeta;
-
- }
-
- @Override
- public void beforeFirst() {
- cachedRows = null;
- needToGetBatch = true;
- }
-
- @Override
- public void afterLast() {
- cachedRows = null;
- needToGetBatch = true;
- }
-
- @Override
- public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<IndexColumn> nextImpl() {
- loadCache(false);
- if (cachedRows == null || !cachedRows.hasNext())
- return null;
-
- return new Holder<IndexColumn>(cachedRows.next());
- }
-
- @Override
- public com.alvazan.orm.api.z8spi.iter.AbstractCursor.Holder<IndexColumn> previousImpl() {
- loadCache(true);
- if (cachedRows == null || !cachedRows.hasPrevious())
- return null;
-
- return new Holder<IndexColumn>(cachedRows.previous());
- }
-
- private void loadCache(boolean reverse) {
- if (cachedRows != null && cachedRows.hasNext())
- return; // There are more rows so return and the code will return
- // the next result from cache
-
- ResultSet resultSet = null;
-
- if (needToGetBatch) {
- if (batchListener != null)
- batchListener.beforeFetchingNextBatch();
-
- String rowKeyString = StandardConverters.convertFromBytes(String.class, rowKey);
-
- Select selectQuery = QueryBuilder.select().all().from(keySpace, indTable).allowFiltering();
- Where whereClause = Cql3Util.createRowQueryFromValues(values, columnMeta, selectQuery, rowKeyString);
- Query query = whereClause.disableTracing();
-
- // System.out.println("QUERY IS: " + query);
- resultSet = session.execute(query);
-
- if (batchListener != null)
- batchListener.afterFetchingNextBatch(10);
-
- List<IndexColumn> finalRes = new ArrayList<IndexColumn>();
-
- if (resultSet == null) {
- cachedRows = new ArrayList<IndexColumn>().listIterator();
- } else {
- for (com.datastax.driver.core.Row row : resultSet) {
- IndexColumn indexCol = Cql3Util.convertToIndexCol(row, indTable);
- finalRes.add(indexCol);
- }
- cachedRows = finalRes.listIterator();
- }
- cachedRows = finalRes.listIterator();
- needToGetBatch = false;
- if (reverse) {
- while (cachedRows.hasNext())
- cachedRows.next();
- }
- }
-
- }
-
-}
View
123 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/CursorOfFutures.java
@@ -0,0 +1,123 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.alvazan.orm.api.z8spi.BatchListener;
+import com.alvazan.orm.api.z8spi.ScanInfo;
+import com.alvazan.orm.api.z8spi.action.IndexColumn;
+import com.alvazan.orm.api.z8spi.conv.Precondition;
+import com.alvazan.orm.api.z8spi.iter.AbstractCursor;
+import com.datastax.driver.core.ResultSet;
+
+public class CursorOfFutures extends AbstractCursor<IndexColumn> {
+
+ private StartQueryListener queryListener;
+ private BatchListener batchListener;
+ private Iterator<Future<ResultSet>> theOneBatch;
+ private boolean needToGetBatch;
+ private String indTable;
+ private ListIterator<IndexColumn> cachedLastCols;
+
+ public CursorOfFutures(StartQueryListener l, BatchListener listener, ScanInfo scanInfo) {
+ Precondition.check(l,"l");
+ this.queryListener = l;
+ this.batchListener = listener;
+ this.indTable = scanInfo.getIndexColFamily();
+ beforeFirst();
+ }
+
+
+ @Override
+ public String toString() {
+ return "CursorOfFutures["+theOneBatch+"]";
+ }
+
+ @Override
+ public void beforeFirst() {
+ needToGetBatch = true;
+ }
+
+ @Override
+ public void afterLast() {
+ needToGetBatch = true;
+ }
+
+ @Override
+ public Holder<IndexColumn> nextImpl() {
+ if(batchListener != null)
+ batchListener.beforeFetchingNextBatch();
+ loadBatchIfNeeded();
+ if(cachedLastCols != null && cachedLastCols.hasNext()) {
+ IndexColumn indexedCol = cachedLastCols.next();
+ return new Holder<IndexColumn>(indexedCol);
+ }
+
+ while(true) {
+ if(!theOneBatch.hasNext())
+ return null;
+
+ Future<ResultSet> future = theOneBatch.next();
+ ResultSet results = get(future);
+ cachedLastCols = new ArrayList<IndexColumn>().listIterator();
+
+ if(!results.isExhausted()) {
+ com.datastax.driver.core.Row row = results.one();
+ IndexColumn indexCol = Cql3Util.convertToIndexCol(row, indTable);
+ cachedLastCols.add(indexCol);
+ if(batchListener != null)
+ batchListener.afterFetchingNextBatch(10);
+ return new Holder<IndexColumn>(indexCol);
+ }
+ }
+ }
+
+ @Override
+ public Holder<IndexColumn> previousImpl() {
+ if(batchListener != null)
+ batchListener.beforeFetchingNextBatch();
+ loadBatchIfNeeded();
+ if(cachedLastCols != null && cachedLastCols.hasPrevious()) {
+ IndexColumn indexedCol = cachedLastCols.previous();
+ return new Holder<IndexColumn>(indexedCol);
+ }
+
+ while(true) {
+ if(!theOneBatch.hasNext())
+ return null;
+ Future<ResultSet> future = theOneBatch.next();
+ ResultSet results = get(future);
+ cachedLastCols = new ArrayList<IndexColumn>().listIterator();
+
+ while(cachedLastCols.hasNext())cachedLastCols.next();
+
+
+ if(cachedLastCols.hasPrevious()) {
+ IndexColumn indexCol = cachedLastCols.previous();
+ if(batchListener != null)
+ batchListener.afterFetchingNextBatch(10);
+ return new Holder<IndexColumn>(indexCol);
+ }
+ }
+ }
+
+ private void loadBatchIfNeeded() {
+ if(needToGetBatch) {
+ theOneBatch = queryListener.start().iterator();
+ needToGetBatch = false;
+ }
+ }
+
+ private ResultSet get(Future<ResultSet> f) {
+ try {
+ return (ResultSet) f.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
View
12 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/StartQueryListener.java
@@ -0,0 +1,12 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+import com.datastax.driver.core.ResultSet;
+
+public interface StartQueryListener {
+
+ List<Future<ResultSet>> start();
+
+}
View
71 src/main/java/com/alvazan/orm/layer9z/spi/db/cassandracql3/StartQueryManyKeys.java
@@ -0,0 +1,71 @@
+package com.alvazan.orm.layer9z.spi.db.cassandracql3;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alvazan.orm.api.z8spi.ScanInfo;
+import com.alvazan.orm.api.z8spi.conv.StandardConverters;
+import com.alvazan.orm.api.z8spi.meta.DboColumnMeta;
+import com.datastax.driver.core.Query;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Select.Where;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class StartQueryManyKeys implements StartQueryListener {
+
+ private static final Logger log = LoggerFactory.getLogger(StartQueryManyKeys.class);
+ private List<byte[]> values;
+ private String keySpace;
+ private byte[] rowKey;
+ private Session session;
+ private String indTable;
+ private DboColumnMeta columnMeta;
+
+ public StartQueryManyKeys(String keys, ScanInfo info, Session session2, List<byte[]> values, boolean reverse) {
+ this.keySpace = keys;
+ this.rowKey = info.getRowKey();
+ this.indTable = info.getIndexColFamily();
+ this.values = values;
+ this.session = session2;
+ this.columnMeta = info.getColumnName();
+ }
+
+ @Override
+ public List<Future<ResultSet>> start() {
+
+ List<Future<ResultSet>> futures = new ArrayList<Future<ResultSet>>();
+
+ String rowKeyString = StandardConverters.convertFromBytes(String.class, rowKey);
+
+ for (byte[] val : values) {
+ Select selectQuery = QueryBuilder.select().all().from(keySpace, indTable).allowFiltering();
+ Where selectWhere = selectQuery.where();
+ Clause rkClause = QueryBuilder.eq("id", rowKeyString);
+ selectWhere.and(rkClause);
+
+ Object value = null;
+ value = columnMeta.getStorageType().convertFromNoSql(val);
+ value = Cql3Util.checkForBoolean(value);
+
+ Clause valClause = QueryBuilder.eq("colname", value);
+ selectWhere.and(valClause);
+
+ Query query = selectWhere.disableTracing();
+
+ System.out.println("QUERY of STARTQUERYMANYKEY IS: " + query);
+ Future future = session.executeAsync(query);
+ futures.add(future);
+ }
+
+ return futures;
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.