From 4b96f1c45c4911ace23eec878e785195b341dbab Mon Sep 17 00:00:00 2001 From: HyoungJun Kim Date: Fri, 5 Sep 2014 20:35:07 +0900 Subject: [PATCH] TAJO-1028: JDBC should support SET command. --- .../org/apache/tajo/jdbc/TajoResultSet.java | 2 +- .../org/apache/tajo/jdbc/TestTajoJdbc.java | 95 ++++++++++++++++++- .../org/apache/tajo/jdbc/TajoConnection.java | 8 +- .../tajo/jdbc/TajoPreparedStatement.java | 16 +++- .../org/apache/tajo/jdbc/TajoStatement.java | 76 ++++++++++++++- 5 files changed, 182 insertions(+), 15 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java index 8595970a7e..65954f1fc8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java @@ -145,7 +145,7 @@ public synchronized void close() throws SQLException { } try { - if(tajoClient != null && !queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + if(tajoClient != null && queryId != null && !queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { this.tajoClient.closeQuery(queryId); } } catch (Exception e) { diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index f922d87991..e477939b98 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -25,6 +25,7 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.client.TajoClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -54,8 +55,8 @@ public static void setUp() throws Exception { public static void tearDown() throws Exception { } - public static String buildConnectionUri(String hostName, int port, String databaseNme) { - return "jdbc:tajo://" + hostName + ":" + port + "/" + databaseNme; + public static String buildConnectionUri(String hostName, int port, String databaseName) { + return "jdbc:tajo://" + hostName + ":" + port + "/" + databaseName; } @Test @@ -396,4 +397,94 @@ public void testMultipleConnectionsSequentialClose() throws Exception { } } } + + @Test + public void testSetStatement() throws Exception { + assertTrue(TajoStatement.isSetVariableQuery("Set JOIN_TASK_INPUT_SIZE 123")); + assertTrue(TajoStatement.isSetVariableQuery("SET JOIN_TASK_INPUT_SIZE 123")); + assertFalse(TajoStatement.isSetVariableQuery("--SET JOIN_TASK_INPUT_SIZE 123")); + + String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), + DEFAULT_DATABASE_NAME); + + Connection conn = DriverManager.getConnection(connUri); + + Statement stmt = null; + ResultSet res = null; + try { + stmt = conn.createStatement(); + res = stmt.executeQuery("Set JOIN_TASK_INPUT_SIZE 123"); + assertFalse(res.next()); + ResultSetMetaData rsmd = res.getMetaData(); + assertNotNull(rsmd); + assertEquals(0, rsmd.getColumnCount()); + + TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient(); + Map variables = connTajoClient.getAllSessionVariables(); + String value = variables.get("JOIN_TASK_INPUT_SIZE"); + assertNotNull(value); + assertEquals("123", value); + + res.close(); + + res = stmt.executeQuery("unset JOIN_TASK_INPUT_SIZE"); + variables = connTajoClient.getAllSessionVariables(); + value = variables.get("JOIN_TASK_INPUT_SIZE"); + assertNull(value); + } finally { + if (res != null) { + res.close(); + } + if (stmt != null) { + stmt.close(); + } + if (conn != null) { + conn.close(); + } + } + } + + @Test + public void testSetPreparedStatement() throws Exception { + String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), + DEFAULT_DATABASE_NAME); + + Connection conn = DriverManager.getConnection(connUri); + + PreparedStatement stmt = null; + ResultSet res = null; + try { + stmt = conn.prepareStatement("Set JOIN_TASK_INPUT_SIZE 123"); + res = stmt.executeQuery(); + assertFalse(res.next()); + ResultSetMetaData rsmd = res.getMetaData(); + assertNotNull(rsmd); + assertEquals(0, rsmd.getColumnCount()); + + TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient(); + Map variables = connTajoClient.getAllSessionVariables(); + String value = variables.get("JOIN_TASK_INPUT_SIZE"); + assertNotNull(value); + assertEquals("123", value); + + res.close(); + stmt.close(); + + stmt = conn.prepareStatement("unset JOIN_TASK_INPUT_SIZE"); + res = stmt.executeQuery(); + variables = connTajoClient.getAllSessionVariables(); + value = variables.get("JOIN_TASK_INPUT_SIZE"); + assertNull(value); + } finally { + if (res != null) { + res.close(); + } + if (stmt != null) { + stmt.close(); + } + if (conn != null) { + conn.close(); + } + } + } } \ No newline at end of file diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java index f5b5b68622..1a2c6c54fa 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoConnection.java @@ -170,7 +170,7 @@ public Statement createStatement() throws SQLException { if (isClosed()) { throw new SQLException("Can't create Statement, connection is closed"); } - return new TajoStatement(tajoClient); + return new TajoStatement(this, tajoClient); } @Override @@ -295,13 +295,13 @@ public CallableStatement prepareCall(String sql, int resultSetType, @Override public PreparedStatement prepareStatement(String sql) throws SQLException { - return new TajoPreparedStatement(tajoClient, sql); + return new TajoPreparedStatement(this, tajoClient, sql); } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { - return new TajoPreparedStatement(tajoClient, sql); + return new TajoPreparedStatement(this, tajoClient, sql); } @Override @@ -319,7 +319,7 @@ public PreparedStatement prepareStatement(String sql, String[] columnNames) @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return new TajoPreparedStatement(tajoClient, sql); + return new TajoPreparedStatement(this, tajoClient, sql); } @Override diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java index bc7d0b036f..d4c89c0f2d 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java @@ -31,6 +31,7 @@ * */ public class TajoPreparedStatement implements PreparedStatement { + private TajoConnection conn; private final String sql; private TajoClient tajoClient; /** @@ -65,8 +66,10 @@ public class TajoPreparedStatement implements PreparedStatement { /** * */ - public TajoPreparedStatement(TajoClient tajoClient, + public TajoPreparedStatement(TajoConnection conn, + TajoClient tajoClient, String sql) { + this.conn = conn; this.tajoClient = tajoClient; this.sql = sql; } @@ -107,11 +110,16 @@ protected ResultSet executeImmediate(String sql) throws SQLException { if (sql.contains("?")) { sql = updateSql(sql, parameters); } - resultSet = tajoClient.executeQueryAndGetResult(sql); + if (TajoStatement.isSetVariableQuery(sql)) { + return TajoStatement.setSessionVariable(tajoClient, sql); + } else if (TajoStatement.isUnSetVariableQuery(sql)) { + return TajoStatement.unSetSessionVariable(tajoClient, sql); + } else { + return tajoClient.executeQueryAndGetResult(sql); + } } catch (Exception e) { throw new SQLException(e.getMessage(), e); } - return resultSet; } /** @@ -517,7 +525,7 @@ public int executeUpdate(String sql, String[] columnNames) throws SQLException { @Override public Connection getConnection() throws SQLException { - throw new SQLFeatureNotSupportedException("getConnection not supported"); + return conn; } @Override diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java index 776c892828..69aa68e041 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java @@ -17,11 +17,17 @@ */ package org.apache.tajo.jdbc; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; import org.apache.tajo.client.TajoClient; +import java.io.IOException; import java.sql.*; +import java.util.HashMap; +import java.util.Map; public class TajoStatement implements Statement { + private TajoConnection conn; private TajoClient tajoClient; private int fetchSize = 200; @@ -44,7 +50,8 @@ public class TajoStatement implements Statement { */ private boolean isClosed = false; - public TajoStatement(TajoClient tajoClient) { + public TajoStatement(TajoConnection conn, TajoClient tajoClient) { + this.conn = conn; this.tajoClient = tajoClient; } @@ -116,13 +123,74 @@ public ResultSet executeQuery(String sql) throws SQLException { } try { - resultSet = tajoClient.executeQueryAndGetResult(sql); - return resultSet; + if (isSetVariableQuery(sql)) { + return setSessionVariable(tajoClient, sql); + } else if (isUnSetVariableQuery(sql)) { + return unSetSessionVariable(tajoClient, sql); + } else { + return tajoClient.executeQueryAndGetResult(sql); + } } catch (Exception e) { throw new SQLFeatureNotSupportedException(e.getMessage(), e); } } + public static boolean isSetVariableQuery(String sql) { + if (sql == null || sql.trim().isEmpty()) { + return false; + } + + return sql.trim().toLowerCase().startsWith("set"); + } + + public static boolean isUnSetVariableQuery(String sql) { + if (sql == null || sql.trim().isEmpty()) { + return false; + } + + return sql.trim().toLowerCase().startsWith("unset"); + } + + public static ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException { + int index = sql.toLowerCase().indexOf("set"); + if (index < 0) { + throw new SQLException("SET statement should be started 'SET' keyword: " + sql); + } + + String[] tokens = sql.substring(index + 3).trim().split(" "); + if (tokens.length != 2) { + throw new SQLException("SET statement should be : " + sql); + } + Map variable = new HashMap(); + variable.put(tokens[0].trim(), tokens[1].trim()); + try { + client.updateSessionVariables(variable); + } catch (ServiceException e) { + throw new SQLException(e.getMessage(), e); + } + + return new TajoResultSet(client, null); + } + + public static ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException { + int index = sql.toLowerCase().indexOf("unset"); + if (index < 0) { + throw new SQLException("UNSET statement should be started 'UNSET' keyword: " + sql); + } + + String key = sql.substring(index + 5).trim(); + if (key.isEmpty()) { + throw new SQLException("UNSET statement should be : " + sql); + } + try { + client.unsetSessionVariables(Lists.newArrayList(key)); + } catch (ServiceException e) { + throw new SQLException(e.getMessage(), e); + } + + return new TajoResultSet(client, null); + } + @Override public int executeUpdate(String sql) throws SQLException { try { @@ -151,7 +219,7 @@ public int executeUpdate(String sql, String[] columnNames) throws SQLException { @Override public Connection getConnection() throws SQLException { - throw new SQLFeatureNotSupportedException("getConnection not supported"); + return conn; } @Override