Skip to content

Commit

Permalink
IGNITE-5167: ODBC: extracted handler/parser interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
devozerov committed May 4, 2017
1 parent 7d288c7 commit 4e5a82d
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 254 deletions.
Expand Up @@ -19,6 +19,8 @@


import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcRequestHandler;
import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSession;
Expand All @@ -32,7 +34,7 @@
*/ */
public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
/** Connection-related metadata key. */ /** Connection-related metadata key. */
private static final int CONN_DATA_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();


/** Request ID generator. */ /** Request ID generator. */
private static final AtomicLong REQ_ID_GEN = new AtomicLong(); private static final AtomicLong REQ_ID_GEN = new AtomicLong();
Expand Down Expand Up @@ -67,40 +69,41 @@ public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int max
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onConnected(GridNioSession ses) { @Override public void onConnected(GridNioSession ses) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("ODBC client connected: " + ses.remoteAddress()); log.debug("SQL client connected: " + ses.remoteAddress());


ses.addMeta(CONN_DATA_META_KEY, new OdbcConnectionData(ctx, busyLock, maxCursors)); OdbcRequestHandler handler = new OdbcRequestHandler(ctx, busyLock, maxCursors);
OdbcMessageParser parser = new OdbcMessageParser(ctx);

ses.addMeta(CONN_CTX_META_KEY, new SqlListenerConnectionContext(handler, parser));
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
if (e == null) if (e == null)
log.debug("ODBC client disconnected: " + ses.remoteAddress()); log.debug("SQL client disconnected: " + ses.remoteAddress());
else else
log.debug("ODBC client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']'); log.debug("SQL client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
} }
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onMessage(GridNioSession ses, byte[] msg) { @Override public void onMessage(GridNioSession ses, byte[] msg) {
assert msg != null; assert msg != null;


long reqId = REQ_ID_GEN.incrementAndGet(); SqlListenerConnectionContext connData = ses.meta(CONN_CTX_META_KEY);

OdbcConnectionData connData = ses.meta(CONN_DATA_META_KEY);


assert connData != null; assert connData != null;


OdbcMessageParser parser = connData.getParser(); SqlListenerMessageParser parser = connData.parser();


SqlListenerRequest req; SqlListenerRequest req;


try { try {
req = parser.decode(msg); req = parser.decode(msg);
} }
catch (Exception e) { catch (Exception e) {
log.error("Failed to parse message [id=" + reqId + ", err=" + e + ']'); log.error("Failed to parse SQL client request [err=" + e + ']');


ses.close(); ses.close();


Expand All @@ -109,24 +112,26 @@ public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int max


assert req != null; assert req != null;


req.requestId(REQ_ID_GEN.incrementAndGet());

try { try {
long startTime = 0; long startTime = 0;


if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
startTime = System.nanoTime(); startTime = System.nanoTime();


log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() + log.debug("SQL client request received [reqId=" + req.requestId() + ", addr=" + ses.remoteAddress() +
", req=" + req + ']'); ", req=" + req + ']');
} }


OdbcRequestHandler handler = connData.getHandler(); SqlListenerRequestHandler handler = connData.handler();


OdbcResponse resp = handler.handle(reqId, req); SqlListenerResponse resp = handler.handle(req);


if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
long dur = (System.nanoTime() - startTime) / 1000; long dur = (System.nanoTime() - startTime) / 1000;


log.debug("ODBC request processed [id=" + reqId + ", dur(mcs)=" + dur + log.debug("SQL client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
", resp=" + resp.status() + ']'); ", resp=" + resp.status() + ']');
} }


Expand All @@ -135,9 +140,9 @@ public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int max
ses.send(outMsg); ses.send(outMsg);
} }
catch (Exception e) { catch (Exception e) {
log.error("Failed to process ODBC request [id=" + reqId + ", err=" + e + ']'); log.error("Failed to process SQL client request [reqId=" + req.requestId() + ", err=" + e + ']');


ses.send(parser.encode(new OdbcResponse(OdbcResponse.STATUS_FAILED, e.toString()))); ses.send(parser.encode(new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString())));
} }
} }
} }
Expand Up @@ -24,7 +24,7 @@
*/ */
public class OdbcUtils { public class OdbcUtils {
/** Latest version. */ /** Latest version. */
public static final SqlListenerProtocolVersion VER_LATEST = SqlListenerProtocolVersion.VER_2_1_0; public static final SqlListenerProtocolVersion VER_LATEST = SqlListenerProtocolVersion.VER_2_0_0;


/** /**
* Add quotation marks at the beginning and end of the string. * Add quotation marks at the beginning and end of the string.
Expand Down
Expand Up @@ -17,44 +17,40 @@


package org.apache.ignite.internal.processors.odbc; package org.apache.ignite.internal.processors.odbc;


import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.GridSpinBusyLock;

/** /**
* Connection-related data. * SQL listener connection context.
*/ */
class OdbcConnectionData { public class SqlListenerConnectionContext {
/** Request handler. */ /** Request handler. */
private final OdbcRequestHandler handler; private final SqlListenerRequestHandler handler;


/** Message parser. */ /** Message parser. */
private final OdbcMessageParser parser; private final SqlListenerMessageParser parser;


/** /**
* Constructor. * Constructor.
* *
* @param ctx Context. * @param handler Handler.
* @param busyLock Shutdown busy lock. * @param parser Parser.
* @param maxCursors Maximum cursors.
*/ */
public OdbcConnectionData(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) { public SqlListenerConnectionContext(SqlListenerRequestHandler handler, SqlListenerMessageParser parser) {
handler = new OdbcRequestHandler(ctx, busyLock, maxCursors); this.handler = handler;
parser = new OdbcMessageParser(ctx); this.parser = parser;
} }


/** /**
* Handler getter. * Handler getter.
* @return Request handler for the connection. * @return Request handler for the connection.
*/ */
public OdbcRequestHandler getHandler() { public SqlListenerRequestHandler handler() {
return handler; return handler;
} }


/** /**
* Parser getter * Parser getter
* @return Message parser for the connection. * @return Message parser for the connection.
*/ */
public OdbcMessageParser getParser() { public SqlListenerMessageParser parser() {
return parser; return parser;
} }
} }
Expand Up @@ -22,7 +22,7 @@
/** /**
* ODBC handshake request. * ODBC handshake request.
*/ */
public class OdbcHandshakeRequest extends SqlListenerRequest { public class SqlListenerHandshakeRequest extends SqlListenerRequest {
/** Protocol version. */ /** Protocol version. */
private final SqlListenerProtocolVersion ver; private final SqlListenerProtocolVersion ver;


Expand All @@ -35,7 +35,7 @@ public class OdbcHandshakeRequest extends SqlListenerRequest {
/** /**
* @param ver Long value for protocol version. * @param ver Long value for protocol version.
*/ */
public OdbcHandshakeRequest(long ver) { public SqlListenerHandshakeRequest(long ver) {
super(HANDSHAKE); super(HANDSHAKE);


this.ver = SqlListenerProtocolVersion.fromLong(ver); this.ver = SqlListenerProtocolVersion.fromLong(ver);
Expand Down Expand Up @@ -78,6 +78,6 @@ public void enforceJoinOrder(boolean enforceJoinOrder) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(OdbcHandshakeRequest.class, this); return S.toString(SqlListenerHandshakeRequest.class, this);
} }
} }
Expand Up @@ -22,7 +22,7 @@
/** /**
* ODBC handshake result. * ODBC handshake result.
*/ */
public class OdbcHandshakeResult { public class SqlListenerHandshakeResult {
/** Handshake accepted. */ /** Handshake accepted. */
private final boolean accepted; private final boolean accepted;


Expand All @@ -39,7 +39,7 @@ public class OdbcHandshakeResult {
* @param protoVerSince Apache Ignite version when protocol version has been introduced. * @param protoVerSince Apache Ignite version when protocol version has been introduced.
* @param curVer Current Apache Ignite version. * @param curVer Current Apache Ignite version.
*/ */
public OdbcHandshakeResult(boolean accepted, String protoVerSince, String curVer) { public SqlListenerHandshakeResult(boolean accepted, String protoVerSince, String curVer) {
this.accepted = accepted; this.accepted = accepted;
this.protoVerSince = protoVerSince; this.protoVerSince = protoVerSince;
this.curVer = curVer; this.curVer = curVer;
Expand Down Expand Up @@ -68,6 +68,6 @@ public String currentVersion() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(OdbcHandshakeResult.class, this); return S.toString(SqlListenerHandshakeResult.class, this);
} }
} }
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.odbc;

/**
* SQL listener message parser.
*/
public interface SqlListenerMessageParser {
/**
* Decode request from byte array.
*
* @param msg Message.
* @return Request.
*/
public SqlListenerRequest decode(byte[] msg);

/**
* Encode response to byte array.
*
* @param resp Response.
* @return Message.
*/
public byte[] encode(SqlListenerResponse resp);
}
Expand Up @@ -21,8 +21,8 @@
* SQL listener protocol version. * SQL listener protocol version.
*/ */
public enum SqlListenerProtocolVersion { public enum SqlListenerProtocolVersion {
/** Version 2.1.0. */ /** Version 2.0.0. */
VER_2_1_0(makeVersion(2, 1, 0), "2.1.0"), VER_2_0_0(makeVersion(2, 0, 0), "2.0.0"),


/** Unknown version. */ /** Unknown version. */
UNKNOWN(Long.MIN_VALUE, "UNKNOWN"); UNKNOWN(Long.MIN_VALUE, "UNKNOWN");
Expand Down
Expand Up @@ -45,6 +45,9 @@ public class SqlListenerRequest {
/** Command. */ /** Command. */
private final int cmd; private final int cmd;


/** Request ID. */
private long reqId;

/** /**
* @param cmd Command type. * @param cmd Command type.
*/ */
Expand All @@ -58,4 +61,18 @@ public SqlListenerRequest(int cmd) {
public int command() { public int command() {
return cmd; return cmd;
} }

/**
* @return Request ID.
*/
public long requestId() {
return reqId;
}

/**
* @param reqId Request ID.
*/
public void requestId(long reqId) {
this.reqId = reqId;
}
} }
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.odbc;

/**
* SQL listener request handler.
*/
public interface SqlListenerRequestHandler {
/**
* Handle request.
*
* @param req Request.
* @return Response.
*/
public SqlListenerResponse handle(SqlListenerRequest req);
}
Expand Up @@ -22,9 +22,9 @@
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


/** /**
* ODBC protocol response. * SQL listener response.
*/ */
public class OdbcResponse { public class SqlListenerResponse {
/** Command succeeded. */ /** Command succeeded. */
public static final int STATUS_SUCCESS = 0; public static final int STATUS_SUCCESS = 0;


Expand All @@ -46,7 +46,7 @@ public class OdbcResponse {
* *
* @param obj Response object. * @param obj Response object.
*/ */
public OdbcResponse(Object obj) { public SqlListenerResponse(Object obj) {
this.status = STATUS_SUCCESS; this.status = STATUS_SUCCESS;


this.obj = obj; this.obj = obj;
Expand All @@ -59,7 +59,7 @@ public OdbcResponse(Object obj) {
* @param status Response status. * @param status Response status.
* @param err Error, {@code null} if success is {@code true}. * @param err Error, {@code null} if success is {@code true}.
*/ */
public OdbcResponse(int status, @Nullable String err) { public SqlListenerResponse(int status, @Nullable String err) {
assert status != STATUS_SUCCESS; assert status != STATUS_SUCCESS;


this.status = status; this.status = status;
Expand Down Expand Up @@ -91,6 +91,6 @@ public String error() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(OdbcResponse.class, this); return S.toString(SqlListenerResponse.class, this);
} }
} }

0 comments on commit 4e5a82d

Please sign in to comment.