Skip to content

Commit

Permalink
-JDOQL basic range queries are now supported by the plugin.
Browse files Browse the repository at this point in the history
-plugin.xml contains the extension to signal JDOQL support
  • Loading branch information
pedro committed Jun 8, 2010
1 parent f1dedb7 commit fa1c55a
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 100 deletions.
10 changes: 10 additions & 0 deletions plugin.xml
Expand Up @@ -27,5 +27,15 @@
transactional="false"> transactional="false">
</connectionfactory> </connectionfactory>
</extension> </extension>
<extension
point="org.datanucleus.store_query_query">
<query class-name="org.datanucleus.store.cassandra.query.JDOQLQuery" datastore="cassandra" name="javax.jdo.query.JDOQL"/>
<query class-name="org.datanucleus.store.cassandra.query.JDOQLQuery" datastore="cassandra" name="JDOQL"/>
<query
class-name="org.datanucleus.store.cassandra.query.JDOQLQuery"
datastore="cassandra"
name="cassandra">
</query>
</extension>
</plugin> </plugin>


63 changes: 41 additions & 22 deletions src/org/datanucleus/store/cassandra/query/CassandraQuery.java
Expand Up @@ -28,9 +28,11 @@


public class CassandraQuery { public class CassandraQuery {


public static int search_slice_ratio = 1000; //should come from the properties

static List getObjectsOfCandidateType(final ExecutionContext ec, static List getObjectsOfCandidateType(final ExecutionContext ec,
final CassandraManagedConnection mconn, Class candidateClass, final CassandraManagedConnection mconn, Class candidateClass,
boolean subclasses, boolean ignoreCache) { boolean subclasses, boolean ignoreCache,long fromInclNo , long toExclNo) {
List results = new ArrayList(); List results = new ArrayList();


try { try {
Expand All @@ -55,8 +57,40 @@ static List getObjectsOfCandidateType(final ExecutionContext ec,
} }
slice_predicate.setColumn_names(column_names); slice_predicate.setColumn_names(column_names);


List<KeySlice> result = c.get_range_slice(keyspace, parent, String last_key = "";
slice_predicate, "", "", 3000, ConsistencyLevel.QUORUM); int number_keys =0;
boolean terminated = false;



long limit = toExclNo;//(toExclNo<0) ? -1 : (toExclNo-1);
List<KeySlice> result = new ArrayList<KeySlice>();

while (!terminated) {
List<KeySlice> keys = c.get_range_slice(keyspace, parent, slice_predicate, last_key, "", search_slice_ratio, ConsistencyLevel.QUORUM);

if (!keys.isEmpty()) {
last_key = keys.get(keys.size() - 1).key;
}

for (KeySlice key : keys) {
if(!key.getColumns().isEmpty()){
number_keys++;
if(number_keys>fromInclNo){
result.add(key);
}

}
if (number_keys >= limit) {
terminated = true;
break;
}

}
if (keys.size() < search_slice_ratio) {
terminated = true;
}
}


Iterator<KeySlice> iterator = result.iterator(); Iterator<KeySlice> iterator = result.iterator();


Expand All @@ -74,29 +108,14 @@ public FetchPlan getFetchPlanForLoading() {
} }


@Override @Override
public void fetchNonLoadedFields( public void fetchNonLoadedFields(ObjectProvider sm) {
ObjectProvider sm) { sm.replaceNonLoadedFields(acmd.getAllMemberPositions(),new CassandraFetchFieldManager(acmd, sm, keySlice.getColumns()));
sm.replaceNonLoadedFields(acmd
.getAllMemberPositions(),
new CassandraFetchFieldManager(
acmd, sm, keySlice
.getColumns()));

} }


@Override @Override
public void fetchFields(ObjectProvider sm) { public void fetchFields(ObjectProvider sm) {
sm.replaceFields(acmd sm.replaceFields(acmd.getPKMemberPositions(),new CassandraFetchFieldManager(acmd, sm, keySlice.getColumns()));
.getPKMemberPositions(), sm.replaceFields(acmd.getBasicMemberPositions(clr, ec.getMetaDataManager()),new CassandraFetchFieldManager(acmd, sm, keySlice.getColumns()));
new CassandraFetchFieldManager(
acmd, sm, keySlice
.getColumns()));
sm.replaceFields(acmd
.getBasicMemberPositions(clr, ec
.getMetaDataManager()),
new CassandraFetchFieldManager(
acmd, sm, keySlice
.getColumns()));


} }
}, ignoreCache, true)); }, ignoreCache, true));
Expand Down
180 changes: 102 additions & 78 deletions src/org/datanucleus/store/cassandra/query/JDOQLQuery.java
Expand Up @@ -7,96 +7,120 @@
import java.util.Map; import java.util.Map;


import org.datanucleus.ObjectManager; import org.datanucleus.ObjectManager;
import org.datanucleus.exceptions.NucleusUserException;
import org.datanucleus.query.evaluator.JDOQLEvaluator; import org.datanucleus.query.evaluator.JDOQLEvaluator;
import org.datanucleus.query.evaluator.JavaQueryEvaluator; import org.datanucleus.query.evaluator.JavaQueryEvaluator;
import org.datanucleus.store.ExecutionContext; import org.datanucleus.store.ExecutionContext;



import org.datanucleus.store.cassandra.CassandraManagedConnection; import org.datanucleus.store.cassandra.CassandraManagedConnection;
import org.datanucleus.store.query.AbstractJDOQLQuery; import org.datanucleus.store.query.AbstractJDOQLQuery;
import org.datanucleus.util.NucleusLogger; import org.datanucleus.util.NucleusLogger;


public class JDOQLQuery extends AbstractJDOQLQuery { public class JDOQLQuery extends AbstractJDOQLQuery {


/** /**
* Constructs a new query instance that uses the given persistence manager. * Constructs a new query instance that uses the given persistence manager.
* @param om the associated ExecutiongContext for this query. *
*/ * @param om
public JDOQLQuery(ExecutionContext ec) * the associated ExecutiongContext for this query.
{ */
this(ec, (JDOQLQuery) null); public JDOQLQuery(ExecutionContext ec) {
} this(ec, (JDOQLQuery) null);

}
/**
* Constructs a new query instance having the same criteria as the given query. /**
* @param om The Executing Manager * Constructs a new query instance having the same criteria as the given
* @param q The query from which to copy criteria. * query.
*/ *
public JDOQLQuery(ExecutionContext ec, JDOQLQuery q) * @param om
{ * The Executing Manager
super(ec, q); * @param q
} * The query from which to copy criteria.

*/
/** public JDOQLQuery(ExecutionContext ec, JDOQLQuery q) {
* Constructor for a JDOQL query where the query is specified using the "Single-String" format. super(ec, q);
* @param ec The execution context }
* @param query The query string
*/ /**
public JDOQLQuery(ExecutionContext ec, String query) * Constructor for a JDOQL query where the query is specified using the
{ * "Single-String" format.
super(ec, query); *
} * @param ec

* The execution context

* @param query
* The query string
*/
public JDOQLQuery(ExecutionContext ec, String query) {
super(ec, query);
}

@Override @Override
protected Object performExecute(Map parameters) { protected Object performExecute(Map parameters) {



if (range != null && !range.equals("")) {
CassandraManagedConnection mconn = (CassandraManagedConnection) ec.getStoreManager().getConnection(ec); // Range is of the format "from, to"
try String[] fromTo = range.split(",");
{ if (fromTo.length != 2) {
long startTime = System.currentTimeMillis(); throw new NucleusUserException("Malformed RANGE clause: "
if (NucleusLogger.QUERY.isDebugEnabled()) + range);
{ }
NucleusLogger.QUERY.debug(LOCALISER.msg("021046", "JDOQL", getSingleStringQuery(), null));
} try {
List candidates = null; fromInclNo = Long.parseLong(fromTo[0].trim());
if (candidateCollection != null) } catch (Exception e) {
{ throw new NucleusUserException("Malformed RANGE clause: "
candidates = new ArrayList(candidateCollection); + range);
} }
else if (candidateExtent != null)
{ try {
candidates = new ArrayList(); toExclNo = Long.parseLong(fromTo[1].trim());
Iterator iter = candidateExtent.iterator(); } catch (Exception e) {
while (iter.hasNext()) throw new NucleusUserException("Malformed RANGE clause: "
{ + range);
candidates.add(iter.next()); }
}
} }
else //
{
candidates = CassandraQuery.getObjectsOfCandidateType(ec, mconn, candidateClass, subclasses, CassandraManagedConnection mconn = (CassandraManagedConnection) ec
ignoreCache); .getStoreManager().getConnection(ec);
} try {

long startTime = System.currentTimeMillis();
// Apply any result restrictions to the results if (NucleusLogger.QUERY.isDebugEnabled()) {
JavaQueryEvaluator resultMapper = new JDOQLEvaluator(this, candidates, compilation, NucleusLogger.QUERY.debug(LOCALISER.msg("021046", "JDOQL",
parameters, ec.getClassLoaderResolver()); getSingleStringQuery(), null));
Collection results = resultMapper.execute(true, true, true, true, true); }

List candidates = null;
if (NucleusLogger.QUERY.isDebugEnabled()) if (candidateCollection != null) {
{ candidates = new ArrayList(candidateCollection);
NucleusLogger.QUERY.debug(LOCALISER.msg("021074", "JDOQL", } else if (candidateExtent != null) {
"" + (System.currentTimeMillis() - startTime))); candidates = new ArrayList();
} Iterator iter = candidateExtent.iterator();

while (iter.hasNext()) {
return results; candidates.add(iter.next());
} }
finally } else {
{ candidates = CassandraQuery.getObjectsOfCandidateType(ec,
mconn.release(); mconn, candidateClass, subclasses, ignoreCache,
} fromInclNo, toExclNo);
} }

// Apply any result restrictions to the results
JavaQueryEvaluator resultMapper = new JDOQLEvaluator(this,
candidates, compilation, parameters, ec
.getClassLoaderResolver());
Collection results = resultMapper.execute(true, true, true, true,
true);

if (NucleusLogger.QUERY.isDebugEnabled()) {
NucleusLogger.QUERY.debug(LOCALISER.msg("021074", "JDOQL", ""
+ (System.currentTimeMillis() - startTime)));
}

return results;
} finally {
mconn.release();
}
}


} }

0 comments on commit fa1c55a

Please sign in to comment.