Skip to content

Commit

Permalink
IGNITE-1161 Close rest sql cursor after delay. - Fixes #197.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey <anovikov@gridgain.com>
  • Loading branch information
nva committed Nov 9, 2015
1 parent 7dfaa3b commit 621ecac
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 59 deletions.
Expand Up @@ -75,6 +75,9 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {


clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");


clientCfg.setIdleQueryCursorTimeout(5000);
clientCfg.setIdleQueryCursorCheckFrequency(5000);

cfg.setConnectorConfiguration(clientCfg); cfg.setConnectorConfiguration(clientCfg);


TcpDiscoverySpi disco = new TcpDiscoverySpi(); TcpDiscoverySpi disco = new TcpDiscoverySpi();
Expand All @@ -99,4 +102,4 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
@Override protected <K, V> IgniteCache<K, V> jcache() { @Override protected <K, V> IgniteCache<K, V> jcache() {
return grid(0).cache(null); return grid(0).cache(null);
} }
} }
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.GridTestUtils;
Expand Down Expand Up @@ -1406,6 +1407,42 @@ public void testQueryClose() throws Exception {
assertFalse(queryCursorFound()); assertFalse(queryCursorFound());
} }


/**
* @throws Exception If failed.
*/
public void testQueryDelay() throws Exception {
String qry = "salary > ? and salary <= ?";

Map<String, String> params = new HashMap<>();
params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
params.put("type", "Person");
params.put("pageSize", "1");
params.put("cacheName", "person");
params.put("qry", URLEncoder.encode(qry));
params.put("arg1", "1000");
params.put("arg2", "2000");

String ret = null;

for (int i = 0; i < 10; ++i)
ret = content(params);

assertNotNull(ret);
assertTrue(!ret.isEmpty());

JSONObject json = JSONObject.fromObject(ret);

List items = (List)((Map)json.get("response")).get("items");

assertEquals(1, items.size());

assertTrue(queryCursorFound());

U.sleep(10000);

assertFalse(queryCursorFound());
}

protected abstract String signature() throws Exception; protected abstract String signature() throws Exception;


/** /**
Expand Down
Expand Up @@ -59,6 +59,12 @@ public class ConnectorConfiguration {
/** Default socket send and receive buffer size. */ /** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;


/** Default REST idle timeout for query cursor. */
private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000;

/** Default REST check frequency for idle query cursor. */
private static final long DFLT_IDLE_QRY_CUR_CHECK_FRQ = 60 * 1000;

/** Jetty XML configuration path. */ /** Jetty XML configuration path. */
private String jettyPath; private String jettyPath;


Expand All @@ -83,6 +89,12 @@ public class ConnectorConfiguration {
/** REST TCP receive buffer size. */ /** REST TCP receive buffer size. */
private int rcvBufSize = DFLT_SOCK_BUF_SIZE; private int rcvBufSize = DFLT_SOCK_BUF_SIZE;


/** REST idle timeout for query cursor. */
private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT;

/** REST idle check frequency for query cursor. */
private long idleQryCurCheckFreq = DFLT_IDLE_QRY_CUR_CHECK_FRQ;

/** REST TCP send queue limit. */ /** REST TCP send queue limit. */
private int sndQueueLimit; private int sndQueueLimit;


Expand Down Expand Up @@ -146,6 +158,8 @@ public ConnectorConfiguration(ConnectorConfiguration cfg) {
sslClientAuth = cfg.isSslClientAuth(); sslClientAuth = cfg.isSslClientAuth();
sslCtxFactory = cfg.getSslContextFactory(); sslCtxFactory = cfg.getSslContextFactory();
sslEnabled = cfg.isSslEnabled(); sslEnabled = cfg.isSslEnabled();
idleQryCurTimeout = cfg.getIdleQueryCursorTimeout();
idleQryCurCheckFreq = cfg.getIdleQueryCursorCheckFrequency();
} }


/** /**
Expand Down Expand Up @@ -545,4 +559,49 @@ public void setThreadPoolSize(int threadPoolSize) {
public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) { public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) {
msgInterceptor = interceptor; msgInterceptor = interceptor;
} }
}
/**
* Sets idle query cursors timeout.
*
* @param idleQryCurTimeout Idle query cursors timeout in milliseconds.
* @see #getIdleQueryCursorTimeout()
*/
public void setIdleQueryCursorTimeout(long idleQryCurTimeout) {
this.idleQryCurTimeout = idleQryCurTimeout;
}

/**
* Gets idle query cursors timeout in milliseconds.
* <p>
* This setting is used to reject open query cursors that is not used. If no fetch query request
* come within idle timeout, it will be removed on next check for old query cursors
* (see {@link #getIdleQueryCursorCheckFrequency()}).
*
* @return Idle query cursors timeout in milliseconds
*/
public long getIdleQueryCursorTimeout() {
return idleQryCurTimeout;
}

/**
* Sets idle query cursor check frequency.
*
* @param idleQryCurCheckFreq Idle query check frequency in milliseconds.
* @see #getIdleQueryCursorCheckFrequency()
*/
public void setIdleQueryCursorCheckFrequency(long idleQryCurCheckFreq) {
this.idleQryCurCheckFreq = idleQryCurCheckFreq;
}

/**
* Gets idle query cursors check frequency.
* This setting is used to reject open query cursors that is not used.
* <p>
* Scheduler tries with specified period to close queries' cursors that are overtime.
*
* @return Idle query cursor check frequency in milliseconds.
*/
public long getIdleQueryCursorCheckFrequency() {
return idleQryCurCheckFreq;
}
}

0 comments on commit 621ecac

Please sign in to comment.