Skip to content

Commit

Permalink
IGNITE-4249: ODBC: Fixed performance issue caused by ineddicient IO h…
Browse files Browse the repository at this point in the history
…andling on CPP side. This closes #1254.
  • Loading branch information
isapego authored and devozerov committed Nov 23, 2016
1 parent 9d82f2c commit b038730
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
Expand Up @@ -192,9 +192,7 @@ private OdbcResponse executeQuery(long reqId, OdbcQueryExecuteRequest req) {

QueryCursor qryCur = cache.query(qry);

Iterator iter = qryCur.iterator();

qryCursors.put(qryId, new IgniteBiTuple<>(qryCur, iter));
qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null));

List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();

Expand All @@ -220,11 +218,15 @@ private OdbcResponse executeQuery(long reqId, OdbcQueryExecuteRequest req) {
*/
private OdbcResponse closeQuery(long reqId, OdbcQueryCloseRequest req) {
try {
QueryCursor cur = qryCursors.get(req.queryId()).get1();
IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());

if (cur == null)
if (tuple == null)
return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());

QueryCursor cur = tuple.get1();

assert(cur != null);

cur.close();

qryCursors.remove(req.queryId());
Expand All @@ -251,17 +253,27 @@ private OdbcResponse closeQuery(long reqId, OdbcQueryCloseRequest req) {
*/
private OdbcResponse fetchQuery(long reqId, OdbcQueryFetchRequest req) {
try {
Iterator cur = qryCursors.get(req.queryId()).get2();
IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());

if (cur == null)
if (tuple == null)
return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());

Iterator iter = tuple.get2();

if (iter == null) {
QueryCursor cur = tuple.get1();

iter = cur.iterator();

tuple.put(cur, iter);
}

List<Object> items = new ArrayList<>();

for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
items.add(cur.next());
for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i)
items.add(iter.next());

OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !cur.hasNext());
OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !iter.hasNext());

return new OdbcResponse(res);
}
Expand Down
Expand Up @@ -459,8 +459,7 @@ namespace ignite
int32_t realLen = stream->ReadInt32();

if (res && len >= realLen) {
for (int i = 0; i < realLen; i++)
*(res + i) = static_cast<char>(stream->ReadInt8());
stream->ReadInt8Array(reinterpret_cast<int8_t*>(res), realLen);

if (len > realLen)
*(res + realLen) = 0; // Set NULL terminator if possible.
Expand Down
21 changes: 11 additions & 10 deletions modules/platforms/cpp/odbc/src/connection.cpp
Expand Up @@ -19,6 +19,8 @@

#include <sstream>

#include <ignite/common/fixed_size_array.h>

#include "ignite/odbc/utility.h"
#include "ignite/odbc/statement.h"
#include "ignite/odbc/connection.h"
Expand Down Expand Up @@ -178,26 +180,25 @@ namespace ignite
if (!connected)
IGNITE_ERROR_1(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Connection is not established");

OdbcProtocolHeader hdr;
common::FixedSizeArray<int8_t> msg(len + sizeof(OdbcProtocolHeader));

hdr.len = static_cast<int32_t>(len);
OdbcProtocolHeader *hdr = reinterpret_cast<OdbcProtocolHeader*>(msg.GetData());

size_t sent = SendAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
hdr->len = static_cast<int32_t>(len);

if (sent != sizeof(hdr))
IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message header");
memcpy(msg.GetData() + sizeof(OdbcProtocolHeader), data, len);

sent = SendAll(data, len);
size_t sent = SendAll(msg.GetData(), msg.GetSize());

if (sent != len)
IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message body");
if (sent != len + sizeof(OdbcProtocolHeader))
IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message");
}

size_t Connection::SendAll(const int8_t* data, size_t len)
{
int sent = 0;

while (sent != len)
while (sent != static_cast<int64_t>(len))
{
int res = socket.Send(data + sent, len - sent);

Expand All @@ -221,7 +222,7 @@ namespace ignite

OdbcProtocolHeader hdr;

size_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
int64_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));

if (received != sizeof(hdr))
IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not receive message header");
Expand Down

0 comments on commit b038730

Please sign in to comment.