From 7033cb452e00fb49b95941fec1f98c96c3e024ec Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 11:01:27 +0900 Subject: [PATCH 1/6] TAJO-1619 --- .../org/apache/tajo/jdbc/JdbcConnection.java | 10 +++++++++- .../java/org/apache/tajo/jdbc/TajoDriver.java | 19 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java index 287954083c..09e3c0be39 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java @@ -52,10 +52,12 @@ public class JdbcConnection implements Connection { @SuppressWarnings("unused") /** it will be used soon. */ private final Map> params; + private TajoDriver driver; - public JdbcConnection(String rawURI, Properties properties) throws SQLException { + public JdbcConnection(String rawURI, Properties properties, TajoDriver driver) throws SQLException { this.rawURI = rawURI; this.properties = properties; + this.driver = driver; try { if (!rawURI.startsWith(TajoDriver.TAJO_JDBC_URL_PREFIX)) { @@ -136,8 +138,14 @@ public void close() throws SQLException { if(tajoClient != null) { tajoClient.close(); } + try { + driver.close(); + } catch (IOException ie) { + throw new SQLException(ie); + } closed.set(true); + } } diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java index 3190f5a2ab..77e597c508 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java @@ -16,13 +16,20 @@ * limitations under the License. */ +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.util.CommonTestingUtil; + import java.io.Closeable; import java.io.IOException; import java.sql.*; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public class TajoDriver implements Driver, Closeable { + private static final Log LOG = LogFactory.getLog(TajoDriver.class); public static final int MAJOR_VERSION = 1; public static final int MINOR_VERSION = 0; @@ -30,6 +37,7 @@ public class TajoDriver implements Driver, Closeable { public static final int JDBC_VERSION_MINOR = 0; public static final String TAJO_JDBC_URL_PREFIX = "jdbc:tajo:"; + private AtomicInteger connections = new AtomicInteger(); static { try { @@ -44,11 +52,20 @@ public TajoDriver() { @Override public void close() throws IOException { + if(connections.decrementAndGet() == 0) { + if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY).equals(CommonTestingUtil.TAJO_TEST_TRUE)) { + RpcChannelFactory.shutdownGracefully(); + if (LOG.isDebugEnabled()) { + LOG.debug("Tajo driver is closed"); + } + } + } } @Override public Connection connect(String url, Properties properties) throws SQLException { - return acceptsURL(url) ? new JdbcConnection(url, properties) : null; + connections.incrementAndGet(); + return acceptsURL(url) ? new JdbcConnection(url, properties, this) : null; } @Override From 2dc7ba228e02ae5011f9b15671b1e02be576805a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 14:16:47 +0900 Subject: [PATCH 2/6] Fix the same problem with TajoClient --- .../apache/tajo/client/SessionConnection.java | 14 ++++++++++++++ .../org/apache/tajo/jdbc/JdbcConnection.java | 13 ++++--------- .../java/org/apache/tajo/jdbc/TajoDriver.java | 19 +------------------ 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 84decd57c8..5e9dc78cb4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -30,9 +30,11 @@ import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ProtoUtil; @@ -45,6 +47,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse; @@ -73,6 +76,8 @@ public class SessionConnection implements Closeable { private KeyValueSet properties; + private AtomicInteger connections = new AtomicInteger(); + /** * Connect to TajoMaster * @@ -112,6 +117,7 @@ public synchronized NettyClientBase getTajoMasterConnection() throws ServiceExce // Client do not closed on idle state for support high available this.client = manager.newClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false, manager.getRetries(), 0, TimeUnit.SECONDS, false); + connections.incrementAndGet(); } catch (Exception e) { throw new ServiceException(e); } @@ -272,6 +278,14 @@ public void close() { // ignore } finally { RpcClientManager.cleanup(client); + if(connections.decrementAndGet() == 0) { + if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) { + RpcChannelFactory.shutdownGracefully(); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC connection is closed"); + } + } + } } } diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java index 09e3c0be39..1d420b618a 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java @@ -33,7 +33,9 @@ import java.io.IOException; import java.net.URI; import java.sql.*; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,12 +54,10 @@ public class JdbcConnection implements Connection { @SuppressWarnings("unused") /** it will be used soon. */ private final Map> params; - private TajoDriver driver; - public JdbcConnection(String rawURI, Properties properties, TajoDriver driver) throws SQLException { + public JdbcConnection(String rawURI, Properties properties) throws SQLException { this.rawURI = rawURI; this.properties = properties; - this.driver = driver; try { if (!rawURI.startsWith(TajoDriver.TAJO_JDBC_URL_PREFIX)) { @@ -138,11 +138,6 @@ public void close() throws SQLException { if(tajoClient != null) { tajoClient.close(); } - try { - driver.close(); - } catch (IOException ie) { - throw new SQLException(ie); - } closed.set(true); diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java index 77e597c508..3190f5a2ab 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java @@ -16,20 +16,13 @@ * limitations under the License. */ -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcChannelFactory; -import org.apache.tajo.util.CommonTestingUtil; - import java.io.Closeable; import java.io.IOException; import java.sql.*; import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public class TajoDriver implements Driver, Closeable { - private static final Log LOG = LogFactory.getLog(TajoDriver.class); public static final int MAJOR_VERSION = 1; public static final int MINOR_VERSION = 0; @@ -37,7 +30,6 @@ public class TajoDriver implements Driver, Closeable { public static final int JDBC_VERSION_MINOR = 0; public static final String TAJO_JDBC_URL_PREFIX = "jdbc:tajo:"; - private AtomicInteger connections = new AtomicInteger(); static { try { @@ -52,20 +44,11 @@ public TajoDriver() { @Override public void close() throws IOException { - if(connections.decrementAndGet() == 0) { - if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY).equals(CommonTestingUtil.TAJO_TEST_TRUE)) { - RpcChannelFactory.shutdownGracefully(); - if (LOG.isDebugEnabled()) { - LOG.debug("Tajo driver is closed"); - } - } - } } @Override public Connection connect(String url, Properties properties) throws SQLException { - connections.incrementAndGet(); - return acceptsURL(url) ? new JdbcConnection(url, properties, this) : null; + return acceptsURL(url) ? new JdbcConnection(url, properties) : null; } @Override From f1462de59c917187ea0740e63b057e4ce95a587f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 14:29:45 +0900 Subject: [PATCH 3/6] Add a test case --- .../org/apache/tajo/client/SessionConnection.java | 7 ++++--- .../org/apache/tajo/client/TestTajoClient.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 5e9dc78cb4..05bf9a4082 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -281,9 +281,10 @@ public void close() { if(connections.decrementAndGet() == 0) { if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) { RpcChannelFactory.shutdownGracefully(); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC connection is closed"); - } +// if (LOG.isDebugEnabled()) { +// LOG.debug("RPC connection is closed"); +// } + LOG.info("RPC connection is closed"); } } } diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index 765a084569..5c02ceaaa4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -769,4 +769,18 @@ public int compare(ClientProtos.StageHistoryProto o1, StageHistoryProto o2) { assertEquals(1, taskHistories.get(1).getTotalReadRows()); assertEquals(1, taskHistories.get(1).getTotalWriteRows()); } + + @Test(timeout = 10000) + public void testClientClose() throws Exception { + System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE"); + + try { + String sql = "select count(*) from nation"; + ResultSet res = client.executeQueryAndGetResult(sql); + res.close(); + } finally { + client.close(); + System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + } + } } From 2de54bac86602a4d4474a46cb2545f9bc44afa8e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 14:51:54 +0900 Subject: [PATCH 4/6] Remove the test code --- .../org/apache/tajo/client/TestTajoClient.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index 5c02ceaaa4..765a084569 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -769,18 +769,4 @@ public int compare(ClientProtos.StageHistoryProto o1, StageHistoryProto o2) { assertEquals(1, taskHistories.get(1).getTotalReadRows()); assertEquals(1, taskHistories.get(1).getTotalWriteRows()); } - - @Test(timeout = 10000) - public void testClientClose() throws Exception { - System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE"); - - try { - String sql = "select count(*) from nation"; - ResultSet res = client.executeQueryAndGetResult(sql); - res.close(); - } finally { - client.close(); - System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); - } - } } From 697438643ac333d42d54d9c883afee0dce024f1d Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 14:52:50 +0900 Subject: [PATCH 5/6] Cleanup codes --- .../java/org/apache/tajo/client/SessionConnection.java | 7 +++---- .../src/main/java/org/apache/tajo/jdbc/JdbcConnection.java | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 05bf9a4082..5e9dc78cb4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -281,10 +281,9 @@ public void close() { if(connections.decrementAndGet() == 0) { if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) { RpcChannelFactory.shutdownGracefully(); -// if (LOG.isDebugEnabled()) { -// LOG.debug("RPC connection is closed"); -// } - LOG.info("RPC connection is closed"); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC connection is closed"); + } } } } diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java index 1d420b618a..959860dd4a 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java @@ -140,7 +140,6 @@ public void close() throws SQLException { } closed.set(true); - } } From be8c776d3f11946963aaf61c51d4e47a63d5fefc Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 15:18:30 +0900 Subject: [PATCH 6/6] Make connections static --- .../main/java/org/apache/tajo/client/SessionConnection.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 5e9dc78cb4..ee2d45af53 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -57,6 +57,8 @@ public class SessionConnection implements Closeable { private final static Log LOG = LogFactory.getLog(SessionConnection.class); + private final static AtomicInteger connections = new AtomicInteger(); + final RpcClientManager manager; private String baseDatabase; @@ -76,8 +78,6 @@ public class SessionConnection implements Closeable { private KeyValueSet properties; - private AtomicInteger connections = new AtomicInteger(); - /** * Connect to TajoMaster *