Skip to content

Commit

Permalink
# ignite 692 Reworked to use new SQL API.
Browse files Browse the repository at this point in the history
  • Loading branch information
akuznetsov-gridgain committed Apr 7, 2015
1 parent d20d29b commit c06a4ae
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 120 deletions.
Expand Up @@ -2306,12 +2306,13 @@ public <K, V> GridCache<K, V> publicCache(@Nullable String name) {


/** /**
* @param cacheName Cache name. * @param cacheName Cache name.
* @param userOnly If {@code true} then throws {@link IllegalStateException} in case of system cache.
* @param <K> type of keys. * @param <K> type of keys.
* @param <V> type of values. * @param <V> type of values.
* @return Cache instance for given name. * @return Cache instance for given name.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) { public <K, V> IgniteCache<K, V> privateJCache(@Nullable String cacheName, boolean userOnly) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Getting public cache for name: " + cacheName); log.debug("Getting public cache for name: " + cacheName);


Expand All @@ -2322,7 +2323,7 @@ public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) {


DynamicCacheDescriptor desc = registeredCaches.get(masked); DynamicCacheDescriptor desc = registeredCaches.get(masked);


if (desc != null && !desc.cacheType().userCache()) if (userOnly && desc != null && !desc.cacheType().userCache())
throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);


if (cache == null) { if (cache == null) {
Expand Down Expand Up @@ -2360,6 +2361,16 @@ public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) {
} }
} }


/**
* @param cacheName Cache name.
* @param <K> type of keys.
* @param <V> type of values.
* @return Cache instance for given name.
*/
public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) {
return privateJCache(cacheName, true);
}

/** /**
* @param name Cache name. * @param name Cache name.
* @return Cache instance for given name. * @return Cache instance for given name.
Expand Down
Expand Up @@ -86,7 +86,7 @@ protected VisorQueryCleanupJob(Collection<String> arg, boolean debug) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected Void run(Collection<String> qryIds) { @Override protected Void run(Collection<String> qryIds) {
ConcurrentMap<String, VisorQueryTask.VisorFutureResultSetHolder> locMap = ignite.cluster().nodeLocalMap(); ConcurrentMap<String, VisorQueryTask.VisorQueryCursorHolder> locMap = ignite.cluster().nodeLocalMap();


for (String qryId : qryIds) for (String qryId : qryIds)
locMap.remove(qryId); locMap.remove(qryId);
Expand Down
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.visor.query;

import org.apache.ignite.cache.query.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.*;

import java.util.*;

/**
* Wrapper for query cursor.
*/
public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable {
/** */
private final QueryCursor<T> cur;

/** */
private final Iterator<T> itr;

/**
* @param cur Cursor.
*/
public VisorQueryCursor(QueryCursor<T> cur) {
this.cur = cur;

itr = cur.iterator();
}

/** {@inheritDoc} */
@Override public boolean hasNext() {
return itr.hasNext();
}

/** {@inheritDoc} */
@Override public T next() {
return itr.next();
}

/** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override public void close() throws Exception {
cur.close();
}

/**
* @return SQL Fields query result metadata.
*/
@SuppressWarnings("unchecked")
public Collection<GridQueryFieldMetadata> fieldsMeta() {
return ((QueryCursorImpl)cur).fieldsMeta();
}
}
Expand Up @@ -67,53 +67,68 @@ private VisorQueryNextPageJob(IgniteBiTuple<String, Integer> arg, boolean debug)
} }
} }


/** Collect data from SQL query */ /**
* Collect data from SQL query.
*
* @param arg
* @return
* @throws IgniteCheckedException
*/
private VisorQueryResult nextSqlPage(IgniteBiTuple<String, Integer> arg) throws IgniteCheckedException { private VisorQueryResult nextSqlPage(IgniteBiTuple<String, Integer> arg) throws IgniteCheckedException {
long start = U.currentTimeMillis(); long start = U.currentTimeMillis();


ConcurrentMap<String, VisorQueryTask.VisorFutureResultSetHolder<List<?>>> storage = ConcurrentMap<String, VisorQueryTask.VisorQueryCursorHolder> storage =
ignite.cluster().nodeLocalMap(); ignite.cluster().nodeLocalMap();


VisorQueryTask.VisorFutureResultSetHolder<List<?>> t = storage.get(arg.get1()); VisorQueryTask.VisorQueryCursorHolder t = storage.get(arg.get1());


if (t == null) if (t == null)
throw new GridInternalException("SQL query results are expired."); throw new GridInternalException("SQL query results are expired.");


IgniteBiTuple<List<Object[]>, List<?>> nextRows = VisorQueryUtils.fetchSqlQueryRows(t.future(), t.next(), arg.get2()); VisorQueryCursor cur = t.cursor();

List<Object[]> nextRows = VisorQueryUtils.fetchSqlQueryRows(cur, arg.get2());


boolean hasMore = nextRows.get2() != null; boolean hasMore = cur.hasNext();


if (hasMore) if (hasMore)
storage.put(arg.get1(), new VisorQueryTask.VisorFutureResultSetHolder<>(t.future(), nextRows.get2(), true)); storage.put(arg.get1(), new VisorQueryTask.VisorQueryCursorHolder(t.cursor(), true));
else else
storage.remove(arg.get1()); storage.remove(arg.get1());


return new VisorQueryResult(nextRows.get1(), hasMore, U.currentTimeMillis() - start); return new VisorQueryResult(nextRows, hasMore, U.currentTimeMillis() - start);
} }


/** Collect data from SCAN query */ /**
* Collect data from SCAN query
*
* @param arg
* @return
* @throws IgniteCheckedException
*/
private VisorQueryResult nextScanPage(IgniteBiTuple<String, Integer> arg) throws IgniteCheckedException { private VisorQueryResult nextScanPage(IgniteBiTuple<String, Integer> arg) throws IgniteCheckedException {
long start = U.currentTimeMillis(); long start = U.currentTimeMillis();


ConcurrentMap<String, VisorQueryTask.VisorFutureResultSetHolder<Map.Entry<Object, Object>>> storage = ConcurrentMap<String, VisorQueryTask.VisorQueryCursorHolder> storage =
ignite.cluster().nodeLocalMap(); ignite.cluster().nodeLocalMap();


VisorQueryTask.VisorFutureResultSetHolder<Map.Entry<Object, Object>> t = storage.get(arg.get1()); VisorQueryTask.VisorQueryCursorHolder t = storage.get(arg.get1());


if (t == null) if (t == null)
throw new GridInternalException("Scan query results are expired."); throw new GridInternalException("Scan query results are expired.");


IgniteBiTuple<List<Object[]>, Map.Entry<Object, Object>> rows = VisorQueryCursor cur = t.cursor();
VisorQueryUtils.fetchScanQueryRows(t.future(), t.next(), arg.get2());
List<Object[]> rows = VisorQueryUtils.fetchScanQueryRows(cur, arg.get2());


Boolean hasMore = rows.get2() != null; Boolean hasMore = cur.hasNext();


if (hasMore) if (hasMore)
storage.put(arg.get1(), new VisorQueryTask.VisorFutureResultSetHolder<>(t.future(), rows.get2(), true)); storage.put(arg.get1(), new VisorQueryTask.VisorQueryCursorHolder(cur, true));
else else
storage.remove(arg.get1()); storage.remove(arg.get1());


return new VisorQueryResult(rows.get1(), hasMore, U.currentTimeMillis() - start); return new VisorQueryResult(rows, hasMore, U.currentTimeMillis() - start);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -35,7 +35,7 @@ public class VisorQueryResultEx extends VisorQueryResult {
private final String qryId; private final String qryId;


/** Query columns descriptors. */ /** Query columns descriptors. */
private final VisorQueryField[] colNames; private final Collection<VisorQueryField> colNames;


/** /**
* @param resNodeId Node where query executed. * @param resNodeId Node where query executed.
Expand All @@ -48,7 +48,7 @@ public class VisorQueryResultEx extends VisorQueryResult {
public VisorQueryResultEx( public VisorQueryResultEx(
UUID resNodeId, UUID resNodeId,
String qryId, String qryId,
VisorQueryField[] colNames, Collection<VisorQueryField> colNames,
List<Object[]> rows, List<Object[]> rows,
Boolean hasMore, Boolean hasMore,
long duration long duration
Expand Down Expand Up @@ -77,7 +77,7 @@ public String queryId() {
/** /**
* @return Columns names. * @return Columns names.
*/ */
public VisorQueryField[] columnNames() { public Collection<VisorQueryField> columnNames() {
return colNames; return colNames;
} }


Expand Down

0 comments on commit c06a4ae

Please sign in to comment.