From cf3ea721b6e47eec7ec6284a31bb057c9858e8c6 Mon Sep 17 00:00:00 2001 From: "k.usachev" Date: Thu, 10 Mar 2016 18:08:32 +0300 Subject: [PATCH] AVRO-1808. Added possibility of overriding lock mechanics for Requestor.handshakeLock --- .../java/org/apache/avro/ipc/Requestor.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java index 53799451dfe..225a916ed4e 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java @@ -128,7 +128,16 @@ public void request(String messageName, Object request, Callback callback throws Exception { request(new Request(messageName, request, new RPCContext()), callback); } - + + protected void lockHandshake() { + handshakeLock.lock(); + } + + protected void unlockHandshake() { + if (handshakeLock.isHeldByCurrentThread()) + handshakeLock.unlock(); + } + /** Writes a request message and returns the result through a Callback. */ void request(Request request, Callback callback) throws Exception { @@ -136,12 +145,12 @@ void request(Request request, Callback callback) if (!t.isConnected()) { // Acquire handshake lock so that only one thread is performing the // handshake and other threads block until the handshake is completed - handshakeLock.lock(); + lockHandshake(); try { if (t.isConnected()) { // Another thread already completed the handshake; no need to hold // the write lock - handshakeLock.unlock(); + unlockHandshake(); } else { CallFuture callFuture = new CallFuture(callback); t.transceive(request.getBytes(), @@ -161,9 +170,7 @@ void request(Request request, Callback callback) return; } } finally{ - if (handshakeLock.isHeldByCurrentThread()) { - handshakeLock.unlock(); - } + unlockHandshake(); } } @@ -272,7 +279,7 @@ public Protocol getRemote() throws IOException { remote = REMOTE_PROTOCOLS.get(remoteHash); if (remote != null) return remote; // already cached } - handshakeLock.lock(); + lockHandshake(); try { // force handshake ByteBufferOutputStream bbo = new ByteBufferOutputStream(); @@ -289,7 +296,7 @@ public Protocol getRemote() throws IOException { readHandshake(in); return this.remote; } finally { - handshakeLock.unlock(); + unlockHandshake(); } }