Skip to content

Commit

Permalink
IGNITE-5396: ODBC server cursor cleaned when last result piece is tra…
Browse files Browse the repository at this point in the history
…nsmitted
  • Loading branch information
isapego committed Jun 20, 2017
1 parent f58b222 commit c507152
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
Expand Up @@ -29,7 +29,7 @@ public class OdbcQueryFetchResult {
/** Query result rows. */
private final Collection<?> items;

/** Flag indicating the query has no unfetched results. */
/** Flag indicating the query has no non-fetched results. */
private final boolean last;

/**
Expand Down Expand Up @@ -58,7 +58,7 @@ public Collection<?> items() {
}

/**
* @return Flag indicating the query has no unfetched results.
* @return Flag indicating the query has no non-fetched results.
*/
public boolean last() {
return last;
Expand Down
Expand Up @@ -296,29 +296,25 @@ private static long getRowsAffected(QueryCursor<List<?>> qryCur) {
* @return Response.
*/
private SqlListenerResponse closeQuery(OdbcQueryCloseRequest req) {
long queryId = req.queryId();

try {
IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());
IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(queryId);

if (tuple == null)
return new OdbcResponse(SqlListenerResponse.STATUS_FAILED,
"Failed to find query with ID: " + req.queryId());

QueryCursor cur = tuple.get1();

assert(cur != null);

cur.close();
"Failed to find query with ID: " + queryId);

qryCursors.remove(req.queryId());
CloseCursor(tuple, queryId);

OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId());
OdbcQueryCloseResult res = new OdbcQueryCloseResult(queryId);

return new OdbcResponse(res);
}
catch (Exception e) {
qryCursors.remove(req.queryId());
qryCursors.remove(queryId);

U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e);
U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + queryId + ']', e);

return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
}
Expand All @@ -332,17 +328,20 @@ private SqlListenerResponse closeQuery(OdbcQueryCloseRequest req) {
*/
private SqlListenerResponse fetchQuery(OdbcQueryFetchRequest req) {
try {
IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());
long queryId = req.queryId();
IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(queryId);

if (tuple == null)
return new OdbcResponse(SqlListenerResponse.STATUS_FAILED,
"Failed to find query with ID: " + req.queryId());
"Failed to find query with ID: " + queryId);

Iterator iter = tuple.get2();

if (iter == null) {
QueryCursor cur = tuple.get1();

assert(cur != null);

iter = cur.iterator();

tuple.put(cur, iter);
Expand All @@ -353,7 +352,13 @@ private SqlListenerResponse fetchQuery(OdbcQueryFetchRequest req) {
for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i)
items.add(iter.next());

OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !iter.hasNext());
boolean lastPage = !iter.hasNext();

// Automatically closing cursor if no more data is available.
if (lastPage)
CloseCursor(tuple, queryId);

OdbcQueryFetchResult res = new OdbcQueryFetchResult(queryId, items, lastPage);

return new OdbcResponse(res);
}
Expand Down Expand Up @@ -508,6 +513,21 @@ private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) {
}
}

/**
* Close cursor.
* @param tuple Query map element.
* @param queryId Query ID.
*/
private void CloseCursor(IgniteBiTuple<QueryCursor, Iterator> tuple, long queryId) {
QueryCursor cur = tuple.get1();

assert(cur != null);

cur.close();

qryCursors.remove(queryId);
}

/**
* Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants).
*
Expand Down
5 changes: 4 additions & 1 deletion modules/platforms/cpp/odbc/src/query/data_query.cpp
Expand Up @@ -149,7 +149,10 @@ namespace ignite
if (!cursor.get())
return SqlResult::AI_SUCCESS;

SqlResult::Type result = MakeRequestClose();
SqlResult::Type result = SqlResult::AI_SUCCESS;

if (cursor->HasData())
result = MakeRequestClose();

if (result == SqlResult::AI_SUCCESS)
{
Expand Down
2 changes: 1 addition & 1 deletion modules/platforms/cpp/odbc/src/result_page.cpp
Expand Up @@ -40,7 +40,7 @@ namespace ignite
last = reader.ReadBool();
size = reader.ReadInt32();

ignite::impl::interop::InteropInputStream& stream = *reader.GetStream();
impl::interop::InteropInputStream& stream = *reader.GetStream();

int32_t dataToRead = stream.Remaining();

Expand Down

0 comments on commit c507152

Please sign in to comment.