From 207189c4c571515f81b520e554e80ef4283b4645 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=B3=E6=BA=90?= Date: Sat, 11 May 2019 09:40:01 +0800 Subject: [PATCH] es 6.7.1 support --- .travis.yml | 2 +- pom.xml | 18 +- .../pool/ElasticSearchDruidDataSource.java | 1440 +++++++++++++---- .../ElasticSearchDruidPooledConnection.java | 6 +- ...ticSearchDruidPooledPreparedStatement.java | 6 +- .../org/nlpcn/es4sql/parse/ElasticLexer.java | 110 +- .../parse/ElasticSqlDeleteStatement.java | 21 - .../es4sql/parse/ElasticSqlExprParser.java | 1006 +++++++----- .../es4sql/parse/ElasticSqlSelectParser.java | 391 +++-- .../parse/ElasticSqlStatementParser.java | 93 -- .../es4sql/parse/SQLParensIdentifierExpr.java | 28 +- .../org/nlpcn/es4sql/parse/SqlParser.java | 59 +- .../nlpcn/es4sql/query/ESActionFactory.java | 10 +- 13 files changed, 2165 insertions(+), 1025 deletions(-) delete mode 100644 src/main/java/org/nlpcn/es4sql/parse/ElasticSqlDeleteStatement.java delete mode 100644 src/main/java/org/nlpcn/es4sql/parse/ElasticSqlStatementParser.java diff --git a/.travis.yml b/.travis.yml index d791acb0..158702ce 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ before_install: # update to java 8 - sudo update-java-alternatives -s java-8-oracle - export JAVA_HOME=/usr/lib/jvm/java-8-oracle - - wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.7.0.deb && sudo dpkg -i --force-confnew elasticsearch-6.7.0.deb + - wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.7.1.deb && sudo dpkg -i --force-confnew elasticsearch-6.7.1.deb - sudo cp ./src/test/resources/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml - sudo cat /etc/elasticsearch/elasticsearch.yml - sudo java -version diff --git a/pom.xml b/pom.xml index 153a671e..bdba93c4 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.nlpcn elasticsearch-sql - 6.7.0.1 + 6.7.1.0 jar Query elasticsearch using SQL elasticsearch-sql @@ -44,8 +44,10 @@ UTF-8 **/MainTestSuite.class sql - 6.7.0 + 6.7.1 org.elasticsearch.plugin.nlpcn.SqlPlug + 1.1.16 + 15.0 @@ -79,7 +81,7 @@ com.alibaba druid - 1.0.15 + ${druid.version} @@ -123,16 +125,10 @@ provided - - javax.servlet - servlet-api - 2.5 - provided - com.google.guava guava - 15.0 + ${guava.version} @@ -223,6 +219,7 @@ com.alibaba druid + ${druid.version} false ${project.build.directory} druid.jar @@ -230,6 +227,7 @@ com.google.guava guava + ${guava.version} false ${project.build.directory} guava.jar diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java index 425f2401..771adedc 100644 --- a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java @@ -5,7 +5,10 @@ import com.alibaba.druid.VERSION; import com.alibaba.druid.filter.AutoLoad; import com.alibaba.druid.filter.Filter; -import com.alibaba.druid.pool.vendor.*; +import com.alibaba.druid.mock.MockDriver; +import com.alibaba.druid.pool.vendor.MySqlExceptionSorter; +import com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker; +import com.alibaba.druid.pool.vendor.NullExceptionSorter; import com.alibaba.druid.proxy.DruidDriver; import com.alibaba.druid.proxy.jdbc.DataSourceProxyConfig; import com.alibaba.druid.proxy.jdbc.TransactionInfo; @@ -21,7 +24,11 @@ import com.alibaba.druid.stat.JdbcSqlStatValue; import com.alibaba.druid.support.logging.Log; import com.alibaba.druid.support.logging.LogFactory; -import com.alibaba.druid.util.*; +import com.alibaba.druid.util.JMXUtils; +import com.alibaba.druid.util.JdbcConstants; +import com.alibaba.druid.util.JdbcUtils; +import com.alibaba.druid.util.StringUtils; +import com.alibaba.druid.util.Utils; import com.alibaba.druid.wall.WallFilter; import com.alibaba.druid.wall.WallProviderStatValue; import org.elasticsearch.client.Client; @@ -29,29 +36,41 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient; - import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.StringRefAddr; - import javax.sql.ConnectionEvent; import javax.sql.ConnectionEventListener; import javax.sql.PooledConnection; - import java.net.InetAddress; import java.net.UnknownHostException; import java.security.AccessController; import java.security.PrivilegedAction; import java.sql.Connection; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.ServiceLoader; +import java.util.StringTokenizer; +import java.util.TimeZone; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -62,37 +81,39 @@ */ public class ElasticSearchDruidDataSource extends DruidDataSource { - private final static Log LOG = LogFactory.getLog(DruidDataSource.class); - + private final static Log LOG = LogFactory.getLog(ElasticSearchDruidDataSource.class); private static final long serialVersionUID = 1L; - // stats - private final AtomicLong recycleErrorCount = new AtomicLong(); + private volatile long recycleErrorCount = 0L; private long connectCount = 0L; private long closeCount = 0L; - private final AtomicLong connectErrorCount = new AtomicLong(); + private volatile long connectErrorCount = 0L; private long recycleCount = 0L; private long removeAbandonedCount = 0L; private long notEmptyWaitCount = 0L; private long notEmptySignalCount = 0L; private long notEmptyWaitNanos = 0L; - + private int keepAliveCheckCount = 0; private int activePeak = 0; private long activePeakTime = 0; private int poolingPeak = 0; private long poolingPeakTime = 0; - // store private volatile DruidConnectionHolder[] connections; private int poolingCount = 0; private int activeCount = 0; - private long discardCount = 0; + private volatile long discardCount = 0; private int notEmptyWaitThreadCount = 0; private int notEmptyWaitThreadPeak = 0; + // + private DruidConnectionHolder[] evictConnections; + private DruidConnectionHolder[] keepAliveConnections; // threads - private ScheduledFuture destroySchedulerFuture; - private DestroyTask destoryTask; + private volatile ScheduledFuture destroySchedulerFuture; + private DestroyTask destroyTask; + + private volatile Future createSchedulerFuture; private CreateConnectionThread createConnectionThread; private DestroyConnectionThread destroyConnectionThread; @@ -104,22 +125,33 @@ public class ElasticSearchDruidDataSource extends DruidDataSource { private volatile boolean enable = true; private boolean resetStatEnable = true; - private final AtomicLong resetCount = new AtomicLong(); + private volatile long resetCount = 0L; private String initStackTrace; + private volatile boolean closing = false; private volatile boolean closed = false; private long closeTimeMillis = -1L; protected JdbcDataSourceStat dataSourceStat; private boolean useGlobalDataSourceStat = false; - private boolean mbeanRegistered = false; - public static ThreadLocal waitNanosLocal = new ThreadLocal(); - private boolean logDifferentThread = true; + private volatile boolean keepAlive = false; + private boolean asyncInit = false; + protected boolean killWhenSocketReadTimeout = false; + + private static List autoFilters = null; + private boolean loadSpifilterSkip = false; + + protected static final AtomicLongFieldUpdater recycleErrorCountUpdater + = AtomicLongFieldUpdater.newUpdater(ElasticSearchDruidDataSource.class, "recycleErrorCount"); + protected static final AtomicLongFieldUpdater connectErrorCountUpdater + = AtomicLongFieldUpdater.newUpdater(ElasticSearchDruidDataSource.class, "connectErrorCount"); + protected static final AtomicLongFieldUpdater resetCountUpdater + = AtomicLongFieldUpdater.newUpdater(ElasticSearchDruidDataSource.class, "resetCount"); // elasticsearch client private volatile Client client; @@ -134,17 +166,49 @@ public ElasticSearchDruidDataSource(boolean fairLock) { configFromPropety(System.getProperties()); } + public boolean isAsyncInit() { + return asyncInit; + } + + public void setAsyncInit(boolean asyncInit) { + this.asyncInit = asyncInit; + } + public void configFromPropety(Properties properties) { + { + String property = properties.getProperty("druid.name"); + if (property != null) { + this.setName(property); + } + } + { + String property = properties.getProperty("druid.url"); + if (property != null) { + this.setUrl(property); + } + } + { + String property = properties.getProperty("druid.username"); + if (property != null) { + this.setUsername(property); + } + } + { + String property = properties.getProperty("druid.password"); + if (property != null) { + this.setPassword(property); + } + } { Boolean value = getBoolean(properties, "druid.testWhileIdle"); if (value != null) { - this.setTestWhileIdle(value); + this.testWhileIdle = value; } } { Boolean value = getBoolean(properties, "druid.testOnBorrow"); if (value != null) { - this.setTestOnBorrow(value); + this.testOnBorrow = value; } } { @@ -165,6 +229,12 @@ public void configFromPropety(Properties properties) { this.setUseGlobalDataSourceStat(value); } } + { + Boolean value = getBoolean(properties, "druid.asyncInit"); // compatible for early versions + if (value != null) { + this.setAsyncInit(value); + } + } { String property = properties.getProperty("druid.filters"); @@ -223,6 +293,217 @@ public void configFromPropety(Properties properties) { } } } + { + String property = properties.getProperty("druid.timeBetweenEvictionRunsMillis"); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setTimeBetweenEvictionRunsMillis(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.timeBetweenEvictionRunsMillis'", e); + } + } + } + { + String property = properties.getProperty("druid.maxWaitThreadCount"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setMaxWaitThreadCount(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.maxWaitThreadCount'", e); + } + } + } + { + String property = properties.getProperty("druid.maxWait"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setMaxWait(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.maxWait'", e); + } + } + } + { + Boolean value = getBoolean(properties, "druid.failFast"); + if (value != null) { + this.setFailFast(value); + } + } + { + String property = properties.getProperty("druid.phyTimeoutMillis"); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setPhyTimeoutMillis(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.phyTimeoutMillis'", e); + } + } + } + { + String property = properties.getProperty("druid.phyMaxUseCount"); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setPhyMaxUseCount(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.phyMaxUseCount'", e); + } + } + } + { + String property = properties.getProperty("druid.minEvictableIdleTimeMillis"); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setMinEvictableIdleTimeMillis(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.minEvictableIdleTimeMillis'", e); + } + } + } + { + String property = properties.getProperty("druid.maxEvictableIdleTimeMillis"); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setMaxEvictableIdleTimeMillis(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.maxEvictableIdleTimeMillis'", e); + } + } + } + { + Boolean value = getBoolean(properties, "druid.keepAlive"); + if (value != null) { + this.setKeepAlive(value); + } + } + { + String property = properties.getProperty("druid.keepAliveBetweenTimeMillis"); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setKeepAliveBetweenTimeMillis(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.keepAliveBetweenTimeMillis'", e); + } + } + } + { + Boolean value = getBoolean(properties, "druid.poolPreparedStatements"); + if (value != null) { + this.setPoolPreparedStatements0(value); + } + } + { + Boolean value = getBoolean(properties, "druid.initVariants"); + if (value != null) { + this.setInitVariants(value); + } + } + { + Boolean value = getBoolean(properties, "druid.initGlobalVariants"); + if (value != null) { + this.setInitGlobalVariants(value); + } + } + { + Boolean value = getBoolean(properties, "druid.useUnfairLock"); + if (value != null) { + this.setUseUnfairLock(value); + } + } + { + String property = properties.getProperty("druid.driverClassName"); + if (property != null) { + this.setDriverClassName(property); + } + } + { + String property = properties.getProperty("druid.initialSize"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setInitialSize(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.initialSize'", e); + } + } + } + { + String property = properties.getProperty("druid.minIdle"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setMinIdle(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.minIdle'", e); + } + } + } + { + String property = properties.getProperty("druid.maxActive"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setMaxActive(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.maxActive'", e); + } + } + } + { + Boolean value = getBoolean(properties, "druid.killWhenSocketReadTimeout"); + if (value != null) { + setKillWhenSocketReadTimeout(value); + } + } + { + String property = properties.getProperty("druid.connectProperties"); + if (property != null) { + this.setConnectionProperties(property); + } + } + { + String property = properties.getProperty("druid.maxPoolPreparedStatementPerConnectionSize"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setMaxPoolPreparedStatementPerConnectionSize(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.maxPoolPreparedStatementPerConnectionSize'", e); + } + } + } + { + String property = properties.getProperty("druid.initConnectionSqls"); + if (property != null && property.length() > 0) { + try { + StringTokenizer tokenizer = new StringTokenizer(property, ";"); + setConnectionInitSqls(Collections.list(tokenizer)); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.initConnectionSqls'", e); + } + } + } + { + String property = System.getProperty("druid.load.spifilter.skip"); + if (property != null && !"false".equals(property)) { + loadSpifilterSkip = true; + } + } + } + + public boolean isKillWhenSocketReadTimeout() { + return killWhenSocketReadTimeout; + } + + public void setKillWhenSocketReadTimeout(boolean killWhenSocketTimeOut) { + this.killWhenSocketReadTimeout = killWhenSocketTimeOut; } public boolean isUseGlobalDataSourceStat() { @@ -233,6 +514,14 @@ public void setUseGlobalDataSourceStat(boolean useGlobalDataSourceStat) { this.useGlobalDataSourceStat = useGlobalDataSourceStat; } + public boolean isKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + } + public String getInitStackTrace() { return initStackTrace; } @@ -283,14 +572,15 @@ public void resetStat() { closeCount = 0; discardCount = 0; recycleCount = 0; - createCount.set(0); - destroyCount.set(0); + createCount = 0L; + directCreateCount = 0; + destroyCount = 0L; removeAbandonedCount = 0; notEmptyWaitCount = 0; notEmptySignalCount = 0L; notEmptyWaitNanos = 0; - activePeak = 0; + activePeak = activeCount; activePeakTime = 0; poolingPeak = 0; createTimespan = 0; @@ -302,23 +592,23 @@ public void resetStat() { lock.unlock(); } - connectErrorCount.set(0); - errorCount.set(0); - commitCount.set(0); - rollbackCount.set(0); - startTransactionCount.set(0); - cachedPreparedStatementHitCount.set(0); - closedPreparedStatementCount.set(0); - preparedStatementCount.set(0); + connectErrorCountUpdater.set(this, 0); + errorCountUpdater.set(this, 0); + commitCountUpdater.set(this, 0); + rollbackCountUpdater.set(this, 0); + startTransactionCountUpdater.set(this, 0); + cachedPreparedStatementHitCountUpdater.set(this, 0); + closedPreparedStatementCountUpdater.set(this, 0); + preparedStatementCountUpdater.set(this, 0); transactionHistogram.reset(); - cachedPreparedStatementDeleteCount.set(0); - recycleErrorCount.set(0); + cachedPreparedStatementDeleteCountUpdater.set(this, 0); + recycleErrorCountUpdater.set(this, 0); - resetCount.incrementAndGet(); + resetCountUpdater.incrementAndGet(this); } public long getResetCount() { - return this.resetCount.get(); + return this.resetCount; } public boolean isEnable() { @@ -339,6 +629,10 @@ public void setEnable(boolean enable) { } public void setPoolPreparedStatements(boolean value) { + setPoolPreparedStatements0(value); + } + + private void setPoolPreparedStatements0(boolean value) { if (this.poolPreparedStatements == value) { return; } @@ -400,8 +694,12 @@ public void setMaxActive(int maxActive) { if (maxActive > allCount) { this.connections = Arrays.copyOf(this.connections, maxActive); + evictConnections = new DruidConnectionHolder[maxActive]; + keepAliveConnections = new DruidConnectionHolder[maxActive]; } else { this.connections = Arrays.copyOf(this.connections, allCount); + evictConnections = new DruidConnectionHolder[allCount]; + keepAliveConnections = new DruidConnectionHolder[allCount]; } this.maxActive = maxActive; @@ -473,6 +771,9 @@ public void init() throws SQLException { return; } + // bug fixed for dead lock, for issue #2980 + DruidDriver.getInstance(); + final ReentrantLock lock = this.lock; try { lock.lockInterruptibly(); @@ -486,17 +787,15 @@ public void init() throws SQLException { return; } - init = true; - initStackTrace = Utils.toString(Thread.currentThread().getStackTrace()); this.id = DruidDriver.createDataSourceId(); if (this.id > 1) { long delta = (this.id - 1) * 100000; - this.connectionIdSeed.addAndGet(delta); - this.statementIdSeed.addAndGet(delta); - this.resultSetIdSeed.addAndGet(delta); - this.transactionIdSeed.addAndGet(delta); + this.connectionIdSeedUpdater.addAndGet(this, delta); + this.statementIdSeedUpdater.addAndGet(this, delta); + this.resultSetIdSeedUpdater.addAndGet(this, delta); + this.transactionIdSeedUpdater.addAndGet(this, delta); } if (this.jdbcUrl != null) { @@ -504,16 +803,17 @@ public void init() throws SQLException { initFromWrapDriverUrl(); } - if (this.dbType == null || this.dbType.length() == 0) { - this.dbType = JdbcUtils.getDbType(jdbcUrl, null); - } - for (Filter filter : filters) { filter.init(this); } - if (JdbcConstants.MYSQL.equals(this.dbType) || // - JdbcConstants.MARIADB.equals(this.dbType)) { + if (this.dbType == null || this.dbType.length() == 0) { + this.dbType = JdbcUtils.getDbType(jdbcUrl, null); + } + + if (JdbcConstants.MYSQL.equals(this.dbType) + || JdbcConstants.MARIADB.equals(this.dbType) + || JdbcConstants.ALIYUN_ADS.equals(this.dbType)) { boolean cacheServerConfigurationSet = false; if (this.connectProperties.containsKey("cacheServerConfiguration")) { cacheServerConfigurationSet = true; @@ -534,14 +834,17 @@ public void init() throws SQLException { } if (getInitialSize() > maxActive) { - throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActieve " - + maxActive); + throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive); } if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) { throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true"); } + if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) { + throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis"); + } + if (this.driverClass != null) { this.driverClass = driverClass.trim(); } @@ -562,7 +865,7 @@ public void init() throws SQLException { JdbcDataSourceStat.setGlobal(dataSourceStat); } if (dataSourceStat.getDbType() == null) { - dataSourceStat.setDbType(this.getDbType()); + dataSourceStat.setDbType(this.dbType); } } else { dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties); @@ -570,25 +873,39 @@ public void init() throws SQLException { dataSourceStat.setResetStatEnable(this.resetStatEnable); connections = new DruidConnectionHolder[maxActive]; + evictConnections = new DruidConnectionHolder[maxActive]; + keepAliveConnections = new DruidConnectionHolder[maxActive]; SQLException connectError = null; - try { + if (createScheduler != null && asyncInit) { + for (int i = 0; i < initialSize; ++i) { + createTaskCount++; + CreateConnectionTask task = new CreateConnectionTask(true); + this.createSchedulerFuture = createScheduler.submit(task); + } + } else if (!asyncInit) { // init connections - for (int i = 0, size = getInitialSize(); i < size; ++i) { - Connection conn = createPhysicalConnection(); - DruidConnectionHolder holder = new DruidConnectionHolder(this, conn); - connections[poolingCount] = holder; - incrementPoolingCount(); + while (poolingCount < initialSize) { + try { + PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); + DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); + connections[poolingCount++] = holder; + } catch (SQLException ex) { + LOG.error("init datasource error, url: " + this.getUrl(), ex); + if (initExceptionThrow) { + connectError = ex; + break; + } else { + Thread.sleep(3000); + } + } } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } - } catch (SQLException ex) { - LOG.error("init datasource error, url: " + this.getUrl(), ex); - connectError = ex; } createAndLogThread(); @@ -596,6 +913,7 @@ public void init() throws SQLException { createAndStartDestroyThread(); initedLatch.await(); + init = true; initedTime = new Date(); registerMbean(); @@ -603,63 +921,106 @@ public void init() throws SQLException { if (connectError != null && poolingCount == 0) { throw connectError; } + + if (keepAlive) { + // async fill to minIdle + if (createScheduler != null) { + for (int i = 0; i < minIdle; ++i) { + createTaskCount++; + CreateConnectionTask task = new CreateConnectionTask(true); + this.createSchedulerFuture = createScheduler.submit(task); + } + } else { + this.emptySignal(); + } + } + } catch (SQLException e) { - LOG.error("dataSource init error", e); + LOG.error("{dataSource-" + this.getID() + "} init error", e); throw e; } catch (InterruptedException e) { throw new SQLException(e.getMessage(), e); + } catch (RuntimeException e){ + LOG.error("{dataSource-" + this.getID() + "} init error", e); + throw e; + } catch (Error e){ + LOG.error("{dataSource-" + this.getID() + "} init error", e); + throw e; + } finally { inited = true; lock.unlock(); if (init && LOG.isInfoEnabled()) { - LOG.info("{dataSource-" + this.getID() + "} inited"); + String msg = "{dataSource-" + this.getID(); + + if (this.name != null && !this.name.isEmpty()) { + msg += ","; + msg += this.name; + } + + msg += "} inited"; + + LOG.info(msg); } } } @Override - public Connection createPhysicalConnection() throws SQLException { + public PhysicalConnectionInfo createPhysicalConnection() throws SQLException { String url = this.getUrl(); Properties connectProperties = getConnectProperties(); + Connection conn = null; - Connection conn; + long connectStartNanos = System.nanoTime(); + long connectedNanos, initedNanos, validatedNanos; - long startNano = System.nanoTime(); + Map variables = initVariants + ? new HashMap() + : null; + Map globalVariables = initGlobalVariants + ? new HashMap() + : null; + createStartNanosUpdater.set(this, connectStartNanos); + creatingCountUpdater.incrementAndGet(this); try { conn = createPhysicalConnection(url, connectProperties); + connectedNanos = System.nanoTime(); if (conn == null) { throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass); } - initPhysicalConnection(conn); + initPhysicalConnection(conn, variables, globalVariables); + initedNanos = System.nanoTime(); validateConnection(conn); - createError = null; + validatedNanos = System.nanoTime(); + + setFailContinuous(false); + setCreateError(null); } catch (SQLException ex) { - createErrorCount.incrementAndGet(); - createError = ex; - lastCreateError = ex; - lastCreateErrorTimeMillis = System.currentTimeMillis(); + setCreateError(ex); + JdbcUtils.close(conn); throw ex; } catch (RuntimeException ex) { - createErrorCount.incrementAndGet(); - createError = ex; - lastCreateError = ex; - lastCreateErrorTimeMillis = System.currentTimeMillis(); + setCreateError(ex); + JdbcUtils.close(conn); throw ex; } catch (Error ex) { - createErrorCount.incrementAndGet(); + createErrorCountUpdater.incrementAndGet(this); + setCreateError(ex); + JdbcUtils.close(conn); throw ex; } finally { - long nano = System.nanoTime() - startNano; + long nano = System.nanoTime() - connectStartNanos; createTimespan += nano; + creatingCountUpdater.decrementAndGet(this); } - return conn; + return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables); } @Override @@ -689,7 +1050,7 @@ public Connection createPhysicalConnection(String url, Properties info) throws S } Connection conn = new ElasticSearchConnection(client); - createCount.incrementAndGet(); + createCountUpdater.incrementAndGet(this); return conn; } @@ -707,14 +1068,14 @@ private void createAndLogThread() { } protected void createAndStartDestroyThread() { - destoryTask = new DestroyTask(); + destroyTask = new DestroyTask(); if (destroyScheduler != null) { long period = timeBetweenEvictionRunsMillis; if (period <= 0) { period = 1000; } - destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destoryTask, period, period, + destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period, TimeUnit.MILLISECONDS); initedLatch.countDown(); return; @@ -742,22 +1103,28 @@ protected void createAndStartCreatorThread() { * @see ServiceLoader */ private void initFromSPIServiceLoader() { - - String property = System.getProperty("druid.load.spifilter.skip"); - if (property != null) { + if (loadSpifilterSkip) { return; } - ServiceLoader druidAutoFilterLoader = ServiceLoader.load(Filter.class); + if (autoFilters == null) { + List filters = new ArrayList(); + ServiceLoader autoFilterLoader = ServiceLoader.load(Filter.class); - for (Filter autoFilter : druidAutoFilterLoader) { - AutoLoad autoLoad = autoFilter.getClass().getAnnotation(AutoLoad.class); - if (autoLoad != null && autoLoad.value()) { - if (LOG.isInfoEnabled()) { - LOG.info("load filter from spi :" + autoFilter.getClass().getName()); + for (Filter filter : autoFilterLoader) { + AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class); + if (autoLoad != null && autoLoad.value()) { + filters.add(filter); } - addFilter(autoFilter); } + autoFilters = filters; + } + + for (Filter filter : autoFilters) { + if (LOG.isInfoEnabled()) { + LOG.info("load filter from spi :" + filter.getClass().getName()); + } + addFilter(filter); } } @@ -803,7 +1170,7 @@ private void addFilter(Filter filter) { } private void validationQueryCheck() { - if (!(isTestOnBorrow() || isTestOnReturn() || isTestWhileIdle())) { + if (!(testOnBorrow || testOnReturn || testWhileIdle)) { return; } @@ -811,21 +1178,21 @@ private void validationQueryCheck() { return; } - if (this.getValidationQuery() != null && this.getValidationQuery().length() > 0) { + if (this.validationQuery != null && this.validationQuery.length() > 0) { return; } String errorMessage = ""; - if (isTestOnBorrow()) { + if (testOnBorrow) { errorMessage += "testOnBorrow is true, "; } - if (isTestOnReturn()) { + if (testOnReturn) { errorMessage += "testOnReturn is true, "; } - if (isTestWhileIdle()) { + if (testWhileIdle) { errorMessage += "testWhileIdle is true, "; } @@ -848,6 +1215,13 @@ protected void initCheck() throws SQLException { oracleValidationQueryCheck(); } else if (JdbcUtils.DB2.equals(dbType)) { db2ValidationQueryCheck(); + } else if (JdbcUtils.MYSQL.equals(this.dbType) + || JdbcUtils.MYSQL_DRIVER_6.equals(this.dbType)) { + isMySql = true; + } + + if (removeAbandoned) { + LOG.warn("removeAbandoned is true, not use in productiion."); } } @@ -910,15 +1284,24 @@ private void db2ValidationQueryCheck() { } private void initValidConnectionChecker() { + if (this.validConnectionChecker != null) { + return; + } + this.validConnectionChecker = new MySqlValidConnectionChecker(); } private void initExceptionSorter() { - if (this.exceptionSorter != null) { + if (exceptionSorter instanceof NullExceptionSorter) { + if (driver instanceof MockDriver) { + return; + } + } else if (this.exceptionSorter != null) { return; } this.exceptionSorter = new MySqlExceptionSorter(); + this.isMySql = true; } @Override @@ -952,22 +1335,24 @@ public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLE if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { - LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt); + LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; } - - if (isRemoveAbandoned()) { + if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - poolableConnection.setConnectStackTrace(stackTrace); + poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); - poolableConnection.setTraceEnable(true); + poolableConnection.traceEnable = true; - synchronized (activeConnections) { + activeConnectionLock.lock(); + try { activeConnections.put(poolableConnection, PRESENT); + } finally { + activeConnectionLock.unlock(); } } @@ -980,7 +1365,6 @@ public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLE * 抛弃连接,不进行回收,而是抛弃 * * @param realConnection - * @throws SQLException */ public void discardConnection(Connection realConnection) { JdbcUtils.close(realConnection); @@ -990,7 +1374,7 @@ public void discardConnection(Connection realConnection) { activeCount--; discardCount++; - if (activeCount <= 0) { + if (activeCount <= minIdle) { emptySignal(); } } finally { @@ -1000,69 +1384,160 @@ public void discardConnection(Connection realConnection) { private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { if (closed) { - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); } if (!enable) { - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceDisableException(); } final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); - final int maxWaitThreadCount = getMaxWaitThreadCount(); + final int maxWaitThreadCount = this.maxWaitThreadCount; DruidConnectionHolder holder; - try { - lock.lockInterruptibly(); - } catch (InterruptedException e) { - connectErrorCount.incrementAndGet(); - throw new SQLException("interrupt", e); - } - try { - if (maxWaitThreadCount > 0) { - if (notEmptyWaitThreadCount >= maxWaitThreadCount) { - connectErrorCount.incrementAndGet(); + for (boolean createDirect = false;;) { + if (createDirect) { + createStartNanosUpdater.set(this, System.nanoTime()); + if (creatingCountUpdater.compareAndSet(this, 0, 1)) { + PhysicalConnectionInfo pyConnInfo = ElasticSearchDruidDataSource.this.createPhysicalConnection(); + holder = new DruidConnectionHolder(this, pyConnInfo); + holder.lastActiveTimeMillis = System.currentTimeMillis(); + + creatingCountUpdater.decrementAndGet(this); + directCreateCountUpdater.incrementAndGet(this); + + if (LOG.isDebugEnabled()) { + LOG.debug("conn-direct_create "); + } + + boolean discard = false; + lock.lock(); + try { + if (activeCount < maxActive) { + activeCount++; + if (activeCount > activePeak) { + activePeak = activeCount; + activePeakTime = System.currentTimeMillis(); + } + break; + } else { + discard = true; + } + } finally { + lock.unlock(); + } + + if (discard) { + JdbcUtils.close(pyConnInfo.getPhysicalConnection()); + } + } + } + + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + connectErrorCountUpdater.incrementAndGet(this); + throw new SQLException("interrupt", e); + } + + try { + if (maxWaitThreadCount > 0 + && notEmptyWaitThreadCount >= maxWaitThreadCount) { + connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } - } - connectCount++; + if (onFatalError + && onFatalErrorMaxActive > 0 + && activeCount >= onFatalErrorMaxActive) { + connectErrorCountUpdater.incrementAndGet(this); + + StringBuilder errorMsg = new StringBuilder(); + errorMsg.append("onFatalError, activeCount ") + .append(activeCount) + .append(", onFatalErrorMaxActive ") + .append(onFatalErrorMaxActive); + + if (lastFatalErrorTimeMillis > 0) { + errorMsg.append(", time '") + .append(StringUtils.formatDateTime19( + lastFatalErrorTimeMillis, TimeZone.getDefault())) + .append("'"); + } - if (maxWait > 0) { - holder = pollLast(nanos); - } else { - holder = takeLast(); - } + if (lastFatalErrorSql != null) { + errorMsg.append(", sql \n") + .append(lastFatalErrorSql); + } + + throw new SQLException( + errorMsg.toString(), lastFatalError); + } + + connectCount++; + + if (createScheduler != null + && poolingCount == 0 + && activeCount < maxActive + && creatingCountUpdater.get(this) == 0 + && createScheduler instanceof ScheduledThreadPoolExecutor) { + ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; + if (executor.getQueue().size() > 0) { + createDirect = true; + continue; + } + } - if (holder != null) { - activeCount++; - if (activeCount > activePeak) { - activePeak = activeCount; - activePeakTime = System.currentTimeMillis(); + if (maxWait > 0) { + holder = pollLast(nanos); + } else { + holder = takeLast(); + } + + if (holder != null) { + activeCount++; + if (activeCount > activePeak) { + activePeak = activeCount; + activePeakTime = System.currentTimeMillis(); + } } + } catch (InterruptedException e) { + connectErrorCountUpdater.incrementAndGet(this); + throw new SQLException(e.getMessage(), e); + } catch (SQLException e) { + connectErrorCountUpdater.incrementAndGet(this); + throw e; + } finally { + lock.unlock(); } - } catch (InterruptedException e) { - connectErrorCount.incrementAndGet(); - throw new SQLException(e.getMessage(), e); - } catch (SQLException e) { - connectErrorCount.incrementAndGet(); - throw e; - } finally { - lock.unlock(); + + break; } if (holder == null) { long waitNanos = waitNanosLocal.get(); - StringBuilder buf = new StringBuilder(); + StringBuilder buf = new StringBuilder(128); buf.append("wait millis ")// .append(waitNanos / (1000 * 1000))// - .append(", active " + activeCount)// - .append(", maxActive " + maxActive)// + .append(", active ").append(activeCount)// + .append(", maxActive ").append(maxActive)// + .append(", creating ").append(creatingCount)// ; + if (creatingCount > 0 && createStartNanos > 0) { + long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000); + if (createElapseMillis > 0) { + buf.append(", createElapseMillis ").append(createElapseMillis); + } + } + + if (createErrorCount > 0) { + buf.append(", createErrorCount ").append(createErrorCount); + } List sqlList = this.getDataSourceStat().getRuningSqlList(); for (int i = 0; i < sqlList.size(); ++i) { @@ -1072,8 +1547,7 @@ private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLExce buf.append(", "); } JdbcSqlStatValue sql = sqlList.get(i); - buf.append("runningSqlCount "); - buf.append(sql.getRunningCount()); + buf.append("runningSqlCount ").append(sql.getRunningCount()); buf.append(" : "); buf.append(sql.getSql()); } @@ -1093,10 +1567,10 @@ private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLExce return poolalbeConnection; } - public void handleConnectionException(DruidPooledConnection pooledConnection, Throwable t) throws SQLException { + public void handleConnectionException(DruidPooledConnection pooledConnection, Throwable t, String sql) throws SQLException { final DruidConnectionHolder holder = pooledConnection.getConnectionHolder(); - errorCount.incrementAndGet(); + errorCountUpdater.incrementAndGet(this); lastError = t; lastErrorTimeMillis = System.currentTimeMillis(); @@ -1111,30 +1585,7 @@ public void handleConnectionException(DruidPooledConnection pooledConnection, Th // exceptionSorter.isExceptionFatal if (exceptionSorter != null && exceptionSorter.isExceptionFatal(sqlEx)) { - if (pooledConnection.isTraceEnable()) { - synchronized (activeConnections) { - if (pooledConnection.isTraceEnable()) { - activeConnections.remove(pooledConnection); - pooledConnection.setTraceEnable(false); - } - } - } - - boolean requireDiscard = false; - synchronized (pooledConnection) { - if ((!pooledConnection.isClosed()) || !pooledConnection.isDisable()) { - holder.setDiscard(true); - pooledConnection.disable(t); - requireDiscard = true; - } - } - - if (requireDiscard) { - this.discardConnection(holder.getConnection()); - holder.setDiscard(true); - } - - LOG.error("discard connection", sqlEx); + handleFatalError(pooledConnection, sqlEx, sql); } throw sqlEx; @@ -1147,7 +1598,7 @@ public void handleConnectionException(DruidPooledConnection pooledConnection, Th * 回收连接 */ protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { - final DruidConnectionHolder holder = pooledConnection.getConnectionHolder(); + final DruidConnectionHolder holder = pooledConnection.holder; if (holder == null) { LOG.warn("connectionHolder is null"); @@ -1156,30 +1607,34 @@ protected void recycle(DruidPooledConnection pooledConnection) throws SQLExcepti if (logDifferentThread // && (!isAsyncCloseConnectionEnable()) // - && pooledConnection.getOwnerThread() != Thread.currentThread()// + && pooledConnection.ownerThread != Thread.currentThread()// ) { LOG.warn("get/close not same thread"); } - final Connection physicalConnection = holder.getConnection(); + final Connection physicalConnection = holder.conn; - if (pooledConnection.isTraceEnable()) { - synchronized (activeConnections) { - if (pooledConnection.isTraceEnable()) { - Object oldInfo = activeConnections.remove(pooledConnection); - if (oldInfo == null) { - if (LOG.isWarnEnabled()) { - LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size()); - } - } - pooledConnection.setTraceEnable(false); + if (pooledConnection.traceEnable) { + Object oldInfo = null; + activeConnectionLock.lock(); + try { + if (pooledConnection.traceEnable) { + oldInfo = activeConnections.remove(pooledConnection); + pooledConnection.traceEnable = false; + } + } finally { + activeConnectionLock.unlock(); + } + if (oldInfo == null) { + if (LOG.isWarnEnabled()) { + LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size()); } } } - final boolean isAutoCommit = holder.isUnderlyingAutoCommit(); - final boolean isReadOnly = holder.isUnderlyingReadOnly(); - final boolean testOnReturn = this.isTestOnReturn(); + final boolean isAutoCommit = holder.underlyingAutoCommit; + final boolean isReadOnly = holder.underlyingReadOnly; + final boolean testOnReturn = this.testOnReturn; try { // check need to rollback? @@ -1188,25 +1643,45 @@ protected void recycle(DruidPooledConnection pooledConnection) throws SQLExcepti } // reset holder, restore default settings, clear warnings - boolean isSameThread = pooledConnection.getOwnerThread() == Thread.currentThread(); + boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread(); if (!isSameThread) { - synchronized (pooledConnection) { + final ReentrantLock lock = pooledConnection.lock; + lock.lock(); + try { holder.reset(); + } finally { + lock.unlock(); } } else { holder.reset(); } - if (holder.isDiscard()) { + if (holder.discard) { + return; + } + + if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) { + discardConnection(holder.conn); + return; + } + + if (physicalConnection.isClosed()) { + lock.lock(); + try { + activeCount--; + closeCount++; + } finally { + lock.unlock(); + } return; } if (testOnReturn) { - boolean validate = testConnectionInternal(physicalConnection); + boolean validate = testConnectionInternal(holder, physicalConnection); if (!validate) { JdbcUtils.close(physicalConnection); - destroyCount.incrementAndGet(); + destroyCountUpdater.incrementAndGet(this); lock.lock(); try { @@ -1220,36 +1695,51 @@ protected void recycle(DruidPooledConnection pooledConnection) throws SQLExcepti } if (!enable) { - discardConnection(holder.getConnection()); + discardConnection(holder.conn); return; } - final long lastActiveTimeMillis = System.currentTimeMillis(); - lock.lockInterruptibly(); + boolean result; + final long currentTimeMillis = System.currentTimeMillis(); + + if (phyTimeoutMillis > 0) { + long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis; + if (phyConnectTimeMillis > phyTimeoutMillis) { + discardConnection(holder.conn); + return; + } + } + + lock.lock(); try { activeCount--; closeCount++; - putLast(holder, lastActiveTimeMillis); + result = putLast(holder, currentTimeMillis); recycleCount++; } finally { lock.unlock(); } + + if (!result) { + JdbcUtils.close(holder.conn); + LOG.info("connection recyle failed."); + } } catch (Throwable e) { holder.clearStatementCache(); - if (!holder.isDiscard()) { + if (!holder.discard) { this.discardConnection(physicalConnection); - holder.setDiscard(true); + holder.discard = true; } LOG.error("recyle error", e); - recycleErrorCount.incrementAndGet(); + recycleErrorCountUpdater.incrementAndGet(this); } } public long getRecycleErrorCount() { - return recycleErrorCount.get(); + return recycleErrorCount; } public void clearStatementCache() throws SQLException { @@ -1257,7 +1747,10 @@ public void clearStatementCache() throws SQLException { try { for (int i = 0; i < poolingCount; ++i) { DruidConnectionHolder conn = connections[i]; - conn.getStatementPool().clear(); + + if (conn.statementPool != null) { + conn.statementPool.clear(); + } } } finally { lock.unlock(); @@ -1268,6 +1761,10 @@ public void clearStatementCache() throws SQLException { * close datasource */ public void close() { + if (LOG.isInfoEnabled()) { + LOG.info("{dataSource-" + this.getID() + "} closing ..."); + } + lock.lock(); try { if (this.closed) { @@ -1278,6 +1775,8 @@ public void close() { return; } + this.closing = true; + if (logStatsThread != null) { logStatsThread.interrupt(); } @@ -1290,26 +1789,30 @@ public void close() { destroyConnectionThread.interrupt(); } + if (createSchedulerFuture != null) { + createSchedulerFuture.cancel(true); + } + if (destroySchedulerFuture != null) { destroySchedulerFuture.cancel(true); } for (int i = 0; i < poolingCount; ++i) { - try { - DruidConnectionHolder connHolder = connections[i]; + DruidConnectionHolder connHolder = connections[i]; - for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) { - connHolder.getStatementPool().closeRemovedStatement(stmtHolder); - } - connHolder.getStatementPool().getMap().clear(); + for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) { + connHolder.getStatementPool().closeRemovedStatement(stmtHolder); + } + connHolder.getStatementPool().getMap().clear(); - Connection physicalConnection = connHolder.getConnection(); + Connection physicalConnection = connHolder.getConnection(); + try { physicalConnection.close(); - connections[i] = null; - destroyCount.incrementAndGet(); } catch (Exception ex) { LOG.warn("close connection error", ex); } + connections[i] = null; + destroyCountUpdater.incrementAndGet(this); } poolingCount = 0; @@ -1375,8 +1878,12 @@ public boolean isMbeanRegistered() { return mbeanRegistered; } - void putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { - e.setLastActiveTimeMillis(lastActiveTimeMillis); + boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { + if (poolingCount >= maxActive) { + return false; + } + + e.lastActiveTimeMillis = lastActiveTimeMillis; connections[poolingCount] = e; incrementPoolingCount(); @@ -1387,12 +1894,19 @@ void putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { notEmpty.signal(); notEmptySignalCount++; + + return true; } DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection + + if (failFast && isFailContinuous()) { + throw new DataSourceNotAvailableException(createError); + } + notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; @@ -1405,7 +1919,7 @@ DruidConnectionHolder takeLast() throws InterruptedException, SQLException { notEmptyWaitCount++; if (!enable) { - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceDisableException(); } } @@ -1429,6 +1943,10 @@ private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, if (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection + if (failFast && isFailContinuous()) { + throw new DataSourceNotAvailableException(createError); + } + if (estimate <= 0) { waitNanosLocal.set(nanos - estimate); return null; @@ -1448,7 +1966,7 @@ private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, notEmptyWaitNanos += (startEstimate - estimate); if (!enable) { - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceDisableException(); } } catch (InterruptedException ie) { @@ -1473,6 +1991,9 @@ private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; + long waitNanos = nanos - estimate; + last.setLastNotEmptyWaitNanos(waitNanos); + return last; } } @@ -1487,6 +2008,16 @@ private final void incrementPoolingCount() { @Override public Connection getConnection(String username, String password) throws SQLException { + if (this.username == null + && this.password == null + && username != null + && password != null) { + this.username = username; + this.password = password; + + return getConnection(); + } + if (!StringUtils.equals(username, this.username)) { throw new UnsupportedOperationException("Not supported by DruidDataSource"); } @@ -1499,11 +2030,11 @@ public Connection getConnection(String username, String password) throws SQLExce } public long getCreateCount() { - return createCount.get(); + return createCount; } public long getDestroyCount() { - return destroyCount.get(); + return destroyCount; } public long getConnectCount() { @@ -1520,7 +2051,7 @@ public long getCloseCount() { } public long getConnectErrorCount() { - return connectErrorCount.get(); + return connectErrorCountUpdater.get(this); } @Override @@ -1592,6 +2123,7 @@ public DruidDataSourceStatValue getStatValueAndReset() { value.setWaitThreadCount(lock.getWaitQueueLength(notEmpty)); value.setNotEmptyWaitCount(this.notEmptyWaitCount); value.setNotEmptyWaitNanos(this.notEmptyWaitNanos); + value.setKeepAliveCheckCount(this.keepAliveCheckCount); // reset this.poolingPeak = 0; @@ -1600,6 +2132,7 @@ public DruidDataSourceStatValue getStatValueAndReset() { this.activePeakTime = 0; this.connectCount = 0; this.closeCount = 0; + this.keepAliveCheckCount = 0; this.notEmptyWaitCount = 0; this.notEmptyWaitNanos = 0; @@ -1608,7 +2141,7 @@ public DruidDataSourceStatValue getStatValueAndReset() { } value.setName(this.getName()); - value.setDbType(this.getDbType()); + value.setDbType(this.dbType); value.setDriverClassName(this.getDriverClassName()); value.setUrl(this.getUrl()); @@ -1625,9 +2158,9 @@ public DruidDataSourceStatValue getStatValueAndReset() { value.setValidConnectionCheckerClassName(this.getValidConnectionCheckerClassName()); value.setExceptionSorterClassName(this.getExceptionSorterClassName()); - value.setTestOnBorrow(this.isTestOnBorrow()); - value.setTestOnReturn(this.isTestOnReturn()); - value.setTestWhileIdle(this.isTestWhileIdle()); + value.setTestOnBorrow(this.testOnBorrow); + value.setTestOnReturn(this.testOnReturn); + value.setTestWhileIdle(this.testWhileIdle); value.setDefaultAutoCommit(this.isDefaultAutoCommit()); @@ -1636,25 +2169,25 @@ public DruidDataSourceStatValue getStatValueAndReset() { } value.setDefaultTransactionIsolation(this.getDefaultTransactionIsolation()); - value.setLogicConnectErrorCount(connectErrorCount.getAndSet(0)); + value.setLogicConnectErrorCount(connectErrorCountUpdater.getAndSet(this, 0)); - value.setPhysicalConnectCount(createCount.getAndSet(0)); - value.setPhysicalCloseCount(destroyCount.getAndSet(0)); - value.setPhysicalConnectErrorCount(createErrorCount.getAndSet(0)); + value.setPhysicalConnectCount(createCountUpdater.getAndSet(this, 0)); + value.setPhysicalCloseCount(destroyCountUpdater.getAndSet(this, 0)); + value.setPhysicalConnectErrorCount(createErrorCountUpdater.getAndSet(this, 0)); - value.setExecuteCount(this.executeCount.getAndSet(0)); - value.setErrorCount(errorCount.getAndSet(0)); - value.setCommitCount(commitCount.getAndSet(0)); - value.setRollbackCount(rollbackCount.getAndSet(0)); + value.setExecuteCount(this.getAndResetExecuteCount()); + value.setErrorCount(errorCountUpdater.getAndSet(this, 0)); + value.setCommitCount(commitCountUpdater.getAndSet(this, 0)); + value.setRollbackCount(rollbackCountUpdater.getAndSet(this, 0)); - value.setPstmtCacheHitCount(cachedPreparedStatementHitCount.getAndSet(0)); - value.setPstmtCacheMissCount(cachedPreparedStatementMissCount.getAndSet(0)); + value.setPstmtCacheHitCount(cachedPreparedStatementHitCountUpdater.getAndSet(this,0)); + value.setPstmtCacheMissCount(cachedPreparedStatementMissCountUpdater.getAndSet(this, 0)); - value.setStartTransactionCount(startTransactionCount.getAndSet(0)); + value.setStartTransactionCount(startTransactionCountUpdater.getAndSet(this, 0)); value.setTransactionHistogram(this.getTransactionHistogram().toArrayAndReset()); value.setConnectionHoldTimeHistogram(this.getDataSourceStat().getConnectionHoldHistogram().toArrayAndReset()); - value.removeAbandoned = this.isRemoveAbandoned(); + value.setRemoveAbandoned(this.isRemoveAbandoned()); value.setClobOpenCount(this.getDataSourceStat().getClobOpenCountAndReset()); value.setBlobOpenCount(this.getDataSourceStat().getBlobOpenCountAndReset()); @@ -1668,10 +2201,10 @@ public long getRemoveAbandonedCount() { return removeAbandonedCount; } - protected void put(Connection connection) { + protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) { DruidConnectionHolder holder = null; try { - holder = new DruidConnectionHolder(ElasticSearchDruidDataSource.this, connection); + holder = new DruidConnectionHolder(ElasticSearchDruidDataSource.this, physicalConnectionInfo); } catch (SQLException ex) { lock.lock(); try { @@ -1682,11 +2215,18 @@ protected void put(Connection connection) { lock.unlock(); } LOG.error("create connection holder error", ex); - return; + return false; } + return put(holder); + } + + private boolean put(DruidConnectionHolder holder) { lock.lock(); try { + if (poolingCount >= maxActive) { + return false; + } connections[poolingCount] = holder; incrementPoolingCount(); @@ -1709,55 +2249,120 @@ protected void put(Connection connection) { } finally { lock.unlock(); } + return true; } public class CreateConnectionTask implements Runnable { - private int errorCount = 0; + private int errorCount = 0; + private boolean initTask = false; + + public CreateConnectionTask() { + + } + + public CreateConnectionTask(boolean initTask) { + this.initTask = initTask; + } @Override public void run() { - for (; ; ) { + runInternal(); + } + + private void runInternal() { + for (;;) { + // addLast + lock.lock(); try { - lock.lockInterruptibly(); - } catch (InterruptedException e2) { - LOG.error("interrupt: ", e2); - lock.lock(); - try { + if (closed || closing) { createTaskCount--; - } finally { - lock.unlock(); + return; } - break; - } - try { - // 必须存在线程等待,才创建连接 - if (poolingCount >= notEmptyWaitThreadCount) { - createTaskCount--; - return; + boolean emptyWait = true; + + if (createError != null && poolingCount == 0) { + emptyWait = false; } - // 防止创建超过maxActive数量的连接 - if (activeCount + poolingCount >= maxActive) { - createTaskCount--; - return; + if (emptyWait) { + // 必须存在线程等待,才创建连接 + if (poolingCount >= notEmptyWaitThreadCount // + && (!(keepAlive && activeCount + poolingCount < minIdle)) + && (!initTask) + && !isFailContinuous() + ) { + createTaskCount--; + return; + } + + // 防止创建超过maxActive数量的连接 + if (activeCount + poolingCount >= maxActive) { + createTaskCount--; + return; + } } } finally { lock.unlock(); } - Connection connection = null; + PhysicalConnectionInfo physicalConnection = null; try { - connection = createPhysicalConnection(); + physicalConnection = createPhysicalConnection(); + } catch (OutOfMemoryError e) { + LOG.error("create connection OutOfMemoryError, out memory. ", e); + + errorCount++; + if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { + // fail over retry attempts + setFailContinuous(true); + if (failFast) { + lock.lock(); + try { + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } + + if (breakAfterAcquireFailure) { + lock.lock(); + try { + createTaskCount--; + } finally { + lock.unlock(); + } + return; + } + + this.errorCount = 0; // reset errorCount + if (closing || closed) { + createTaskCount--; + return; + } + createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS); + return; + } } catch (SQLException e) { - LOG.error("create connection error, url: " + jdbcUrl, e); + LOG.error("create connection SQLException, url: " + jdbcUrl, e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { + // fail over retry attempts + setFailContinuous(true); + if (failFast) { + lock.lock(); + try { + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } + if (breakAfterAcquireFailure) { lock.lock(); try { @@ -1769,11 +2374,17 @@ public void run() { } this.errorCount = 0; // reset errorCount - createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS); + if (closing || closed) { + createTaskCount--; + return; + } + createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS); return; } } catch (RuntimeException e) { - LOG.error("create connection error", e); + LOG.error("create connection RuntimeException", e); + // unknow fatal exception + setFailContinuous(true); continue; } catch (Error e) { lock.lock(); @@ -1782,15 +2393,24 @@ public void run() { } finally { lock.unlock(); } - LOG.error("create connection error", e); + LOG.error("create connection Error", e); + // unknow fatal exception + setFailContinuous(true); + break; + } catch (Throwable e) { + LOG.error("create connection unexecpted error.", e); break; } - if (connection == null) { + if (physicalConnection == null) { continue; } - put(connection); + boolean result = put(physicalConnection); + if (!result) { + JdbcUtils.close(physicalConnection.getPhysicalConnection()); + LOG.info("put physical connection to pool failed."); + } break; } } @@ -1806,6 +2426,7 @@ public CreateConnectionThread(String name) { public void run() { initedLatch.countDown(); + long lastDiscardCount = 0; int errorCount = 0; for (; ; ) { // addLast @@ -1815,36 +2436,74 @@ public void run() { break; } + long discardCount = ElasticSearchDruidDataSource.this.discardCount; + boolean discardChanged = discardCount - lastDiscardCount > 0; + lastDiscardCount = discardCount; + try { - // 必须存在线程等待,才创建连接 - if (poolingCount >= notEmptyWaitThreadCount) { - empty.await(); + boolean emptyWait = true; + + if (createError != null + && poolingCount == 0 + && !discardChanged) { + emptyWait = false; } - // 防止创建超过maxActive数量的连接 - if (activeCount + poolingCount >= maxActive) { - empty.await(); - continue; + if (emptyWait + && asyncInit && createCount < initialSize) { + emptyWait = false; + } + + if (emptyWait) { + // 必须存在线程等待,才创建连接 + if (poolingCount >= notEmptyWaitThreadCount // + && (!(keepAlive && activeCount + poolingCount < minIdle)) + && !isFailContinuous() + ) { + empty.await(); + } + + // 防止创建超过maxActive数量的连接 + if (activeCount + poolingCount >= maxActive) { + empty.await(); + continue; + } } } catch (InterruptedException e) { lastCreateError = e; lastErrorTimeMillis = System.currentTimeMillis(); + + if (!closing) { + LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e); + } break; } finally { lock.unlock(); } - Connection connection = null; + PhysicalConnectionInfo connection = null; try { connection = createPhysicalConnection(); } catch (SQLException e) { - LOG.error("create connection error, url: " + jdbcUrl, e); + LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode() + + ", state " + e.getSQLState(), e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { + // fail over retry attempts + setFailContinuous(true); + if (failFast) { + lock.lock(); + try { + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } + if (breakAfterAcquireFailure) { break; } @@ -1856,10 +2515,12 @@ public void run() { } } } catch (RuntimeException e) { - LOG.error("create connection error", e); + LOG.error("create connection RuntimeException", e); + setFailContinuous(true); continue; } catch (Error e) { - LOG.error("create connection error", e); + LOG.error("create connection Error", e); + setFailContinuous(true); break; } @@ -1867,7 +2528,11 @@ public void run() { continue; } - put(connection); + boolean result = put(connection); + if (!result) { + JdbcUtils.close(connection.getPhysicalConnection()); + LOG.info("put physical connection to pool failed."); + } errorCount = 0; // reset errorCount } @@ -1876,7 +2541,7 @@ public void run() { public class DestroyConnectionThread extends Thread { - public DestroyConnectionThread(String name) { + public DestroyConnectionThread(String name){ super(name); this.setDaemon(true); } @@ -1901,7 +2566,7 @@ public void run() { break; } - destoryTask.run(); + destroyTask.run(); } catch (InterruptedException e) { break; } @@ -1911,10 +2576,13 @@ public void run() { } public class DestroyTask implements Runnable { + public DestroyTask() { + + } @Override public void run() { - shrink(true); + shrink(true, keepAlive); if (isRemoveAbandoned()) { removeAbandoned(); @@ -1954,7 +2622,8 @@ public int removeAbandoned() { List abandonedList = new ArrayList(); - synchronized (activeConnections) { + activeConnectionLock.lock(); + try { Iterator iter = activeConnections.keySet().iterator(); for (; iter.hasNext(); ) { @@ -1972,14 +2641,20 @@ public int removeAbandoned() { abandonedList.add(pooledConnection); } } + } finally { + activeConnectionLock.unlock(); } if (abandonedList.size() > 0) { for (DruidPooledConnection pooledConnection : abandonedList) { - synchronized (pooledConnection) { + final ReentrantLock lock = pooledConnection.lock; + lock.lock(); + try { if (pooledConnection.isDisable()) { continue; } + } finally { + lock.unlock(); } JdbcUtils.close(pooledConnection); @@ -1991,8 +2666,8 @@ public int removeAbandoned() { StringBuilder buf = new StringBuilder(); buf.append("abandon connection, owner thread: "); buf.append(pooledConnection.getOwnerThread().getName()); - buf.append(", connected time nano: "); - buf.append(pooledConnection.getConnectedTimeNano()); + buf.append(", connected at : "); + buf.append(pooledConnection.getConnectedTimeMillis()); buf.append(", open stackTrace\n"); StackTraceElement[] trace = pooledConnection.getConnectStackTrace(); @@ -2002,6 +2677,15 @@ public int removeAbandoned() { buf.append("\n"); } + buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState() + + ", current stackTrace\n"); + trace = pooledConnection.getOwnerThread().getStackTrace(); + for (int i = 0; i < trace.length; i++) { + buf.append("\tat "); + buf.append(trace[i].toString()); + buf.append("\n"); + } + LOG.error(buf.toString()); } } @@ -2063,49 +2747,140 @@ public String getProperties() { @Override public void shrink() { - shrink(false); + shrink(false, false); } public void shrink(boolean checkTime) { - final List evictList = new ArrayList(); + shrink(checkTime, keepAlive); + } + + public void shrink(boolean checkTime, boolean keepAlive) { try { lock.lockInterruptibly(); } catch (InterruptedException e) { return; } + int evictCount = 0; + int keepAliveCount = 0; try { + if (!inited) { + return; + } + final int checkCount = poolingCount - minIdle; final long currentTimeMillis = System.currentTimeMillis(); - for (int i = 0; i < checkCount; ++i) { + for (int i = 0; i < poolingCount; ++i) { DruidConnectionHolder connection = connections[i]; if (checkTime) { - long idleMillis = currentTimeMillis - connection.getLastActiveTimeMillis(); + if (phyTimeoutMillis > 0) { + long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; + if (phyConnectTimeMillis > phyTimeoutMillis) { + evictConnections[evictCount++] = connection; + continue; + } + } + + long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; + + if (idleMillis < minEvictableIdleTimeMillis + && idleMillis < keepAliveBetweenTimeMillis + ) { + break; + } + if (idleMillis >= minEvictableIdleTimeMillis) { - evictList.add(connection); + if (checkTime && i < checkCount) { + evictConnections[evictCount++] = connection; + continue; + } else if (idleMillis > maxEvictableIdleTimeMillis) { + evictConnections[evictCount++] = connection; + continue; + } + } + + if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { + keepAliveConnections[keepAliveCount++] = connection; + } + } else { + if (i < checkCount) { + evictConnections[evictCount++] = connection; } else { break; } - } else { - evictList.add(connection); } } - int removeCount = evictList.size(); + int removeCount = evictCount + keepAliveCount; if (removeCount > 0) { System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); poolingCount -= removeCount; } + keepAliveCheckCount += keepAliveCount; } finally { lock.unlock(); } - for (DruidConnectionHolder item : evictList) { - Connection connection = item.getConnection(); - JdbcUtils.close(connection); - destroyCount.incrementAndGet(); + if (evictCount > 0) { + for (int i = 0; i < evictCount; ++i) { + DruidConnectionHolder item = evictConnections[i]; + Connection connection = item.getConnection(); + JdbcUtils.close(connection); + destroyCountUpdater.incrementAndGet(this); + } + Arrays.fill(evictConnections, null); + } + + if (keepAliveCount > 0) { + // keep order + for (int i = keepAliveCount - 1; i >= 0; --i) { + DruidConnectionHolder holer = keepAliveConnections[i]; + Connection connection = holer.getConnection(); + holer.incrementKeepAliveCheckCount(); + + boolean validate = false; + try { + this.validateConnection(connection); + validate = true; + } catch (Throwable error) { + if (LOG.isDebugEnabled()) { + LOG.debug("keepAliveErr", error); + } + // skip + } + + boolean discard = !validate; + if (validate) { + holer.lastKeepTimeMillis = System.currentTimeMillis(); + boolean putOk = put(holer); + if (!putOk) { + discard = true; + } + } + + if (discard) { + try { + connection.close(); + } catch (Exception e) { + // skip + } + + lock.lock(); + try { + discardCount++; + + if (activeCount <= minIdle) { + emptySignal(); + } + } finally { + lock.unlock(); + } + } + } + this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); + Arrays.fill(keepAliveConnections, null); } } @@ -2178,7 +2953,7 @@ public String dump() { } public long getErrorCount() { - return this.errorCount.get(); + return this.errorCount; } public String toString() { @@ -2277,14 +3052,18 @@ public List> getPoolingConnectionInfo() { Map map = new LinkedHashMap(); map.put("id", System.identityHashCode(conn)); + map.put("connectionId", connHolder.getConnectionId()); map.put("useCount", connHolder.getUseCount()); - if (connHolder.getLastActiveTimeMillis() > 0) { - map.put("lastActiveTime", new Date(connHolder.getLastActiveTimeMillis())); + if (connHolder.lastActiveTimeMillis > 0) { + map.put("lastActiveTime", new Date(connHolder.lastActiveTimeMillis)); + } + if (connHolder.lastKeepTimeMillis > 0) { + map.put("lastKeepTimeMillis", new Date(connHolder.lastKeepTimeMillis)); } map.put("connectTime", new Date(connHolder.getTimeMillis())); map.put("holdability", connHolder.getUnderlyingHoldability()); map.put("transactionIsolation", connHolder.getUnderlyingTransactionIsolation()); - map.put("autoCommit", connHolder.isUnderlyingAutoCommit()); + map.put("autoCommit", connHolder.underlyingAutoCommit); map.put("readoOnly", connHolder.isUnderlyingReadOnly()); if (connHolder.isPoolPreparedStatements()) { @@ -2293,8 +3072,8 @@ public List> getPoolingConnectionInfo() { for (PreparedStatementHolder stmtHolder : stmtPool.getMap().values()) { Map stmtInfo = new LinkedHashMap(); - stmtInfo.put("sql", stmtHolder.getKey().getSql()); - stmtInfo.put("defaultRowPretch", stmtHolder.getDefaultRowPrefetch()); + stmtInfo.put("sql", stmtHolder.key.getSql()); + stmtInfo.put("defaultRowPrefetch", stmtHolder.getDefaultRowPrefetch()); stmtInfo.put("rowPrefetch", stmtHolder.getRowPrefetch()); stmtInfo.put("hitCount", stmtHolder.getHitCount()); @@ -2303,6 +3082,7 @@ public List> getPoolingConnectionInfo() { map.put("pscache", stmtCache); } + map.put("keepAliveCheckCount", connHolder.getKeepAliveCheckCount()); list.add(map); } @@ -2362,7 +3142,7 @@ public Map getStatDataForMBean() { // 5 - 9 map.put("CloseCount", this.getCloseCount()); - map.put("ActiveCount", this.getActivePeak()); + map.put("ActiveCount", this.getActiveCount()); map.put("PoolingCount", this.getPoolingCount()); map.put("LockQueueLength", this.getLockQueueLength()); map.put("WaitThreadCount", this.getNotEmptyWaitThreadPeak()); @@ -2376,10 +3156,10 @@ public Map getStatDataForMBean() { // 15 - 19 map.put("TestOnReturn", this.isTestOnReturn()); - map.put("MinEvictableIdleTimeMillis", this.getMinEvictableIdleTimeMillis()); + map.put("MinEvictableIdleTimeMillis", this.minEvictableIdleTimeMillis); map.put("ConnectErrorCount", this.getConnectErrorCount()); map.put("CreateTimespanMillis", this.getCreateTimespanMillis()); - map.put("DbType", this.getDbType()); + map.put("DbType", this.dbType); // 20 - 24 map.put("ValidationQuery", this.getValidationQuery()); @@ -2414,6 +3194,9 @@ public Map getStatDataForMBean() { map.put("LastCreateErrorTime", this.getLastCreateErrorTime()); map.put("CreateErrorCount", this.getCreateErrorCount()); map.put("DiscardCount", this.getDiscardCount()); + map.put("ExecuteQueryCount", this.getExecuteQueryCount()); + + map.put("ExecuteUpdateCount", this.getExecuteUpdateCount()); return map; } catch (JMException ex) { @@ -2452,7 +3235,7 @@ public Map getStatData() { dataMap.put("Identity", System.identityHashCode(this)); dataMap.put("Name", this.getName()); - dataMap.put("DbType", this.getDbType()); + dataMap.put("DbType", this.dbType); dataMap.put("DriverClassName", this.getDriverClassName()); dataMap.put("URL", this.getUrl()); @@ -2498,6 +3281,9 @@ public Map getStatData() { dataMap.put("PhysicalConnectErrorCount", this.getCreateErrorCount()); dataMap.put("ExecuteCount", this.getExecuteCount()); + dataMap.put("ExecuteUpdateCount", this.getExecuteUpdateCount()); + dataMap.put("ExecuteQueryCount", this.getExecuteQueryCount()); + dataMap.put("ExecuteBatchCount", this.getExecuteBatchCount()); dataMap.put("ErrorCount", this.getErrorCount()); dataMap.put("CommitCount", this.getCommitCount()); dataMap.put("RollbackCount", this.getRollbackCount()); @@ -2513,7 +3299,25 @@ public Map getStatData() { dataMap.put("RemoveAbandoned", this.isRemoveAbandoned()); dataMap.put("ClobOpenCount", this.getDataSourceStat().getClobOpenCount()); dataMap.put("BlobOpenCount", this.getDataSourceStat().getBlobOpenCount()); - + dataMap.put("KeepAliveCheckCount", this.getDataSourceStat().getKeepAliveCheckCount()); + + dataMap.put("KeepAlive", this.isKeepAlive()); + dataMap.put("FailFast", this.isFailFast()); + dataMap.put("MaxWait", this.getMaxWait()); + dataMap.put("MaxWaitThreadCount", this.getMaxWaitThreadCount()); + dataMap.put("PoolPreparedStatements", this.isPoolPreparedStatements()); + dataMap.put("MaxPoolPreparedStatementPerConnectionSize", this.getMaxPoolPreparedStatementPerConnectionSize()); + dataMap.put("MinEvictableIdleTimeMillis", this.minEvictableIdleTimeMillis); + dataMap.put("MaxEvictableIdleTimeMillis", this.maxEvictableIdleTimeMillis); + + dataMap.put("LogDifferentThread", isLogDifferentThread()); + dataMap.put("RecycleErrorCount", getRecycleErrorCount()); + dataMap.put("PreparedStatementOpenCount", getPreparedStatementCount()); + dataMap.put("PreparedStatementClosedCount", getClosedPreparedStatementCount()); + + dataMap.put("UseUnfairLock", isUseUnfairLock()); + dataMap.put("InitGlobalVariants", isInitGlobalVariants()); + dataMap.put("InitVariants", isInitVariants()); return dataMap; } @@ -2627,7 +3431,7 @@ public int fill(int toCount) throws SQLException { try { lock.lockInterruptibly(); } catch (InterruptedException e) { - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } @@ -2641,24 +3445,25 @@ public int fill(int toCount) throws SQLException { DruidConnectionHolder holder; try { - Connection conn = createPhysicalConnection(); - holder = new DruidConnectionHolder(this, conn); + PhysicalConnectionInfo pyConnInfo = createPhysicalConnection(); + holder = new DruidConnectionHolder(this, pyConnInfo); } catch (SQLException e) { LOG.error("fill connection error, url: " + this.jdbcUrl, e); - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw e; } try { lock.lockInterruptibly(); } catch (InterruptedException e) { - connectErrorCount.incrementAndGet(); + connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } try { if (!this.isFillable(toCount)) { JdbcUtils.close(holder.getConnection()); + LOG.info("fill connections skip."); break; } this.putLast(holder, System.currentTimeMillis()); @@ -2709,13 +3514,20 @@ private void emptySignal() { createTaskCount++; CreateConnectionTask task = new CreateConnectionTask(); - createScheduler.submit(task); + this.createSchedulerFuture = createScheduler.submit(task); } @Override public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { - //do nothing - //return original name to avoid NullPointerException + if (server != null) { + try { + if (server.isRegistered(name)) { + server.unregisterMBean(name); + } + } catch (Exception ex) { + LOG.warn("DruidDataSource preRegister error", ex); + } + } return name; } diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java index 41ca2796..435d1f08 100644 --- a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java @@ -29,7 +29,7 @@ public PreparedStatement prepareStatement(String sql) throws SQLException { stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql)); holder.getDataSource().incrementPreparedStatementCount(); } catch (SQLException ex) { - handleException(ex); + handleException(ex, sql); } } @@ -64,7 +64,7 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res resultSetConcurrency)); holder.getDataSource().incrementPreparedStatementCount(); } catch (SQLException ex) { - handleException(ex); + handleException(ex, sql); } } @@ -79,7 +79,7 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res private void initStatement(PreparedStatementHolder stmtHolder) throws SQLException { stmtHolder.incrementInUseCount(); - holder.getDataSource().initStatement(this, stmtHolder.getStatement()); + holder.getDataSource().initStatement(this, stmtHolder.statement); } } diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java index adb5a98b..a4a25641 100644 --- a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java @@ -30,7 +30,7 @@ public ElasticSearchDruidPooledPreparedStatement(DruidPooledConnection conn, Pre public ResultSet executeQuery() throws SQLException { checkOpen(); - incrementExecuteCount(); + incrementExecuteQueryCount(); transactionRecord(getSql()); oracleSetRowPrefetch(); @@ -48,6 +48,8 @@ public ResultSet executeQuery() throws SQLException { return poolableResultSet; } catch (Throwable t) { + errorCheck(t); + throw checkException(t); } finally { conn.afterExecute(); @@ -74,6 +76,8 @@ public boolean execute() throws SQLException { return true; } catch (Throwable t) { + errorCheck(t); + throw checkException(t); } finally { conn.afterExecute(); diff --git a/src/main/java/org/nlpcn/es4sql/parse/ElasticLexer.java b/src/main/java/org/nlpcn/es4sql/parse/ElasticLexer.java index c1ce9613..50513c28 100644 --- a/src/main/java/org/nlpcn/es4sql/parse/ElasticLexer.java +++ b/src/main/java/org/nlpcn/es4sql/parse/ElasticLexer.java @@ -1,11 +1,12 @@ package org.nlpcn.es4sql.parse; import com.alibaba.druid.sql.dialect.mysql.parser.MySqlLexer; +import com.alibaba.druid.sql.parser.CharTypes; import com.alibaba.druid.sql.parser.ParserException; +import com.alibaba.druid.sql.parser.SymbolTable; import com.alibaba.druid.sql.parser.Token; import static com.alibaba.druid.sql.parser.CharTypes.isFirstIdentifierChar; -import static com.alibaba.druid.sql.parser.CharTypes.isIdentifierChar; import static com.alibaba.druid.sql.parser.LayoutCharacters.EOI; /** @@ -22,68 +23,123 @@ public ElasticLexer(char[] input, int inputLength, boolean skipComment) { } public void scanIdentifier() { + hash_lower = 0; + hash = 0; + final char first = ch; - if (ch == '`') { + if ((ch == 'b' || ch == 'B' ) + && charAt(pos + 1) == '\'') { + int i = 2; + int mark = pos + 2; + for (;;++i) { + char ch = charAt(pos + i); + if (ch == '0' || ch == '1') { + continue; + } else if (ch == '\'') { + bufPos += i; + pos += (i + 1); + stringVal = subString(mark, i - 2); + this.ch = charAt(pos); + token = Token.BITS; + return; + } else if (ch == EOI) { + throw new ParserException("illegal identifier. " + info()); + } else { + break; + } + } + } + if (ch == '`') { mark = pos; bufPos = 1; char ch; - for (;;) { - ch = charAt(++pos); - if (ch == '`') { - bufPos++; - ch = charAt(++pos); - break; - } else if (ch == EOI) { - throw new ParserException("illegal identifier"); - } - - bufPos++; - continue; + int startPos = pos + 1; + int quoteIndex = text.indexOf('`', startPos); + if (quoteIndex == -1) { + throw new ParserException("illegal identifier. " + info()); } - this.ch = charAt(pos); + hash_lower = 0xcbf29ce484222325L; + hash = 0xcbf29ce484222325L; - stringVal = subString(mark, bufPos); - Token tok = keywods.getKeyword(stringVal); - if (tok != null) { - token = tok; - } else { - token = Token.IDENTIFIER; + for (int i = startPos; i < quoteIndex; ++i) { + ch = text.charAt(i); + + hash_lower ^= ((ch >= 'A' && ch <= 'Z') ? (ch + 32) : ch); + hash_lower *= 0x100000001b3L; + + hash ^= ch; + hash *= 0x100000001b3L; } - } else { + stringVal = quoteTable.addSymbol(text, pos, quoteIndex + 1 - pos, hash); + //stringVal = text.substring(mark, pos); + pos = quoteIndex + 1; + this.ch = charAt(pos); + token = Token.IDENTIFIER; + } else { final boolean firstFlag = isFirstIdentifierChar(first); if (!firstFlag) { - throw new ParserException("illegal identifier"); + throw new ParserException("illegal identifier. " + info()); } + hash_lower = 0xcbf29ce484222325L; + hash = 0xcbf29ce484222325L; + + hash_lower ^= ((ch >= 'A' && ch <= 'Z') ? (ch + 32) : ch); + hash_lower *= 0x100000001b3L; + + hash ^= ch; + hash *= 0x100000001b3L; + mark = pos; bufPos = 1; - char ch; + char ch = '\0'; for (;;) { ch = charAt(++pos); - //zhongshu-comment 就这行和父类MySqlLexer的scanIdentifier()方法不一样,用了自己的实现isElasticIdentifierChar() if (!isElasticIdentifierChar(ch)) { break; } bufPos++; + + hash_lower ^= ((ch >= 'A' && ch <= 'Z') ? (ch + 32) : ch); + hash_lower *= 0x100000001b3L; + + hash ^= ch; + hash *= 0x100000001b3L; + continue; } this.ch = charAt(pos); - stringVal = addSymbol(); - Token tok = keywods.getKeyword(stringVal); + if (bufPos == 1) { + token = Token.IDENTIFIER; + stringVal = CharTypes.valueOf(first); + if (stringVal == null) { + stringVal = Character.toString(first); + } + return; + } + + Token tok = keywods.getKeyword(hash_lower); if (tok != null) { token = tok; + if (token == Token.IDENTIFIER) { + stringVal = SymbolTable.global.addSymbol(text, mark, bufPos, hash); + } else { + stringVal = null; + } } else { token = Token.IDENTIFIER; + stringVal = SymbolTable.global.addSymbol(text, mark, bufPos, hash); } + } } diff --git a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlDeleteStatement.java b/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlDeleteStatement.java deleted file mode 100644 index 9bfeca78..00000000 --- a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlDeleteStatement.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.nlpcn.es4sql.parse; - -import com.alibaba.druid.sql.ast.SQLCommentHint; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlDeleteStatement; - -import java.util.ArrayList; -import java.util.List; - -public class ElasticSqlDeleteStatement extends MySqlDeleteStatement { - - private List hints; - - public List getHints() { - if (hints == null) { - hints = new ArrayList(2); - } - - return hints; - } - -} diff --git a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlExprParser.java b/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlExprParser.java index 67759521..0a1eecf4 100644 --- a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlExprParser.java +++ b/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlExprParser.java @@ -1,81 +1,98 @@ package org.nlpcn.es4sql.parse; -import com.alibaba.druid.sql.ast.*; -import com.alibaba.druid.sql.ast.expr.*; +import com.alibaba.druid.sql.ast.SQLCommentHint; +import com.alibaba.druid.sql.ast.SQLDataType; +import com.alibaba.druid.sql.ast.SQLDataTypeImpl; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLName; +import com.alibaba.druid.sql.ast.SQLOrderBy; +import com.alibaba.druid.sql.ast.SQLOrderingSpecification; +import com.alibaba.druid.sql.ast.SQLPartition; +import com.alibaba.druid.sql.ast.SQLPartitionValue; +import com.alibaba.druid.sql.ast.SQLSubPartition; +import com.alibaba.druid.sql.ast.expr.SQLAggregateExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; +import com.alibaba.druid.sql.ast.expr.SQLCharExpr; +import com.alibaba.druid.sql.ast.expr.SQLExistsExpr; +import com.alibaba.druid.sql.ast.expr.SQLHexExpr; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr; +import com.alibaba.druid.sql.ast.expr.SQLIntervalExpr; +import com.alibaba.druid.sql.ast.expr.SQLIntervalUnit; +import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr; +import com.alibaba.druid.sql.ast.expr.SQLNotExpr; +import com.alibaba.druid.sql.ast.expr.SQLUnaryExpr; +import com.alibaba.druid.sql.ast.expr.SQLUnaryOperator; +import com.alibaba.druid.sql.ast.expr.SQLVariantRefExpr; import com.alibaba.druid.sql.ast.statement.SQLAssignItem; import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition; +import com.alibaba.druid.sql.ast.statement.SQLForeignKeyImpl; import com.alibaba.druid.sql.dialect.mysql.ast.MySqlPrimaryKey; import com.alibaba.druid.sql.dialect.mysql.ast.MySqlUnique; import com.alibaba.druid.sql.dialect.mysql.ast.MysqlForeignKey; -import com.alibaba.druid.sql.dialect.mysql.ast.expr.*; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSQLColumnDefinition; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; -import com.alibaba.druid.sql.parser.*; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlCharExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlExtractExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlMatchAgainstExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOrderingExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOutFileExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlUserName; +import com.alibaba.druid.sql.parser.Lexer; +import com.alibaba.druid.sql.parser.ParserException; +import com.alibaba.druid.sql.parser.SQLExprParser; +import com.alibaba.druid.sql.parser.SQLSelectParser; +import com.alibaba.druid.sql.parser.Token; +import com.alibaba.druid.util.FnvHash; import com.alibaba.druid.util.JdbcConstants; +import java.util.Arrays; import java.util.List; /** * Created by Eliran on 18/8/2015. */ public class ElasticSqlExprParser extends SQLExprParser { - public ElasticSqlExprParser(Lexer lexer) { - super(lexer); + + public final static String[] AGGREGATE_FUNCTIONS; + + public final static long[] AGGREGATE_FUNCTIONS_CODES; + + static { + String[] strings = { "AVG", "COUNT", "GROUP_CONCAT", "MAX", "MIN", "STDDEV", "SUM" }; + AGGREGATE_FUNCTIONS_CODES = FnvHash.fnv1a_64_lower(strings, true); + AGGREGATE_FUNCTIONS = new String[AGGREGATE_FUNCTIONS_CODES.length]; + for (String str : strings) { + long hash = FnvHash.fnv1a_64_lower(str); + int index = Arrays.binarySearch(AGGREGATE_FUNCTIONS_CODES, hash); + AGGREGATE_FUNCTIONS[index] = str; + } + } + + public ElasticSqlExprParser(Lexer lexer){ + super(lexer, JdbcConstants.MYSQL); this.aggregateFunctions = AGGREGATE_FUNCTIONS; + this.aggregateFunctionHashCodes = AGGREGATE_FUNCTIONS_CODES; } - public ElasticSqlExprParser(String sql) { + public ElasticSqlExprParser(String sql){ this(new ElasticLexer(sql)); this.lexer.nextToken(); } @SuppressWarnings({"unchecked", "rawtypes"}) - @Override public void parseHints(List hints) { while (lexer.token() == Token.HINT) { - hints.add(new SQLCommentHint(lexer.stringVal())); - lexer.nextToken(); - } - } - - @Override - protected SQLExpr methodRest(SQLExpr expr, boolean acceptLPAREN) { - if (acceptLPAREN) { - accept(Token.LPAREN); - } - - if (expr instanceof SQLName || expr instanceof SQLDefaultExpr) { - String methodName; + SQLCommentHint hint = new SQLCommentHint(lexer.stringVal()); - SQLMethodInvokeExpr methodInvokeExpr; - if (expr instanceof SQLPropertyExpr) { - methodName = ((SQLPropertyExpr) expr).getName(); - methodInvokeExpr = new SQLMethodInvokeExpr(methodName); - methodInvokeExpr.setOwner(((SQLPropertyExpr) expr).getOwner()); - } else { - methodName = expr.toString(); - methodInvokeExpr = new SQLMethodInvokeExpr(methodName); + if (lexer.getCommentCount() > 0) { + hint.addBeforeComment(lexer.readAndResetComments()); } - if (isAggreateFunction(methodName)) { - SQLAggregateExpr aggregateExpr = parseAggregateExpr(methodName); - - return aggregateExpr; - } - - if (lexer.token() != Token.RPAREN) { - exprList(methodInvokeExpr.getParameters(), methodInvokeExpr); - } - - accept(Token.RPAREN); - - return primaryRest(methodInvokeExpr); + hints.add(hint); + lexer.nextToken(); } - - throw new ParserException("not support token:" + lexer.token()); } - public SQLExpr primary() { if (lexer.token() == Token.LBRACE) { @@ -84,7 +101,7 @@ public SQLExpr primary() { if (lexer.stringVal().equals("ts")) { String current = lexer.stringVal(); do { - if (current.equals(lexer.token().RBRACE.name())) { + if (current.equals(Token.RBRACE.name())) { foundRBrace = true; break; } @@ -155,53 +172,11 @@ public SQLExpr primary() { return expr; } - //zhongshu-comment 比父类的AGGREGATE_FUNCTIONS 多了 GROUP_CONCAT 这个function - public static String[] AGGREGATE_FUNCTIONS = {"AVG", "COUNT", "GROUP_CONCAT", "MAX", "MIN", "STDDEV", "SUM"}; - - - public SQLExpr relationalRest(SQLExpr expr) { - if (identifierEquals("REGEXP")) { - lexer.nextToken(); - SQLExpr rightExp = equality(); - - rightExp = relationalRest(rightExp); - - return new SQLBinaryOpExpr(expr, SQLBinaryOperator.RegExp, rightExp, JdbcConstants.MYSQL); - } - - return super.relationalRest(expr); - } - - public SQLExpr multiplicativeRest(SQLExpr expr) { - if (lexer.token() == Token.IDENTIFIER && "MOD".equalsIgnoreCase(lexer.stringVal())) { - lexer.nextToken(); - SQLExpr rightExp = primary(); - - rightExp = relationalRest(rightExp); - - return new SQLBinaryOpExpr(expr, SQLBinaryOperator.Modulus, rightExp, JdbcConstants.MYSQL); - } - - return super.multiplicativeRest(expr); - } - - public SQLExpr notRationalRest(SQLExpr expr) { - if (identifierEquals("REGEXP")) { - lexer.nextToken(); - SQLExpr rightExp = primary(); - - rightExp = relationalRest(rightExp); - - return new SQLBinaryOpExpr(expr, SQLBinaryOperator.NotRegExp, rightExp, JdbcConstants.MYSQL); - } - - return super.notRationalRest(expr); - } public SQLExpr primary2() { final Token tok = lexer.token(); - if (identifierEquals("outfile")) { + if (lexer.identifierEquals(FnvHash.Constants.OUTFILE)) { lexer.nextToken(); SQLExpr file = primary(); SQLExpr expr = new MySqlOutFileExpr(file); @@ -211,10 +186,6 @@ public SQLExpr primary2() { } switch (tok) { - case LITERAL_ALIAS: - String aliasValue = lexer.stringVal(); - lexer.nextToken(); - return primaryRest(new SQLCharExpr(aliasValue)); case VARIANT: SQLVariantRefExpr varRefExpr = new SQLVariantRefExpr(lexer.stringVal()); lexer.nextToken(); @@ -233,7 +204,7 @@ public SQLExpr primary2() { case VALUES: lexer.nextToken(); if (lexer.token() != Token.LPAREN) { - throw new ParserException("syntax error, illegal values clause"); + throw new ParserException("syntax error, illegal values clause. " + lexer.info()); } return this.methodRest(new SQLIdentifierExpr("VALUES"), true); case BINARY: @@ -244,17 +215,12 @@ public SQLExpr primary2() { SQLUnaryExpr binaryExpr = new SQLUnaryExpr(SQLUnaryOperator.BINARY, expr()); return primaryRest(binaryExpr); } - case CACHE: - case GROUP: - lexer.nextToken(); - return primaryRest(new SQLIdentifierExpr(lexer.stringVal())); default: return super.primary(); } } - public final SQLExpr primaryRest(SQLExpr expr) { if (expr == null) { throw new IllegalArgumentException("expr"); @@ -271,19 +237,19 @@ public final SQLExpr primaryRest(SQLExpr expr) { expr = new SQLHexExpr(charValue); return primaryRest(expr); - } else if (ident.equalsIgnoreCase("b")) { - String charValue = lexer.stringVal(); - lexer.nextToken(); - expr = new SQLBinaryExpr(charValue); - - return primaryRest(expr); +// } else if (ident.equalsIgnoreCase("b")) { +// String charValue = lexer.stringVal(); +// lexer.nextToken(); +// expr = new SQLBinaryExpr(charValue); +// +// return primaryRest(expr); } else if (ident.startsWith("_")) { String charValue = lexer.stringVal(); lexer.nextToken(); MySqlCharExpr mysqlCharExpr = new MySqlCharExpr(charValue); mysqlCharExpr.setCharset(identExpr.getName()); - if (identifierEquals("COLLATE")) { + if (lexer.identifierEquals(FnvHash.Constants.COLLATE)) { lexer.nextToken(); String collate = lexer.stringVal(); @@ -296,21 +262,27 @@ public final SQLExpr primaryRest(SQLExpr expr) { return primaryRest(expr); } } else if (expr instanceof SQLCharExpr) { - SQLMethodInvokeExpr concat = new SQLMethodInvokeExpr("CONCAT"); - concat.addParameter(expr); + String text2 = ((SQLCharExpr) expr).getText(); do { String chars = lexer.stringVal(); - concat.addParameter(new SQLCharExpr(chars)); + text2 += chars; lexer.nextToken(); } while (lexer.token() == Token.LITERAL_CHARS || lexer.token() == Token.LITERAL_ALIAS); + expr = new SQLCharExpr(text2); + } else if (expr instanceof SQLVariantRefExpr) { + SQLMethodInvokeExpr concat = new SQLMethodInvokeExpr("CONCAT"); + concat.addArgument(expr); + concat.addArgument(this.primary()); expr = concat; + + return primaryRest(expr); } } else if (lexer.token() == Token.IDENTIFIER) { if (expr instanceof SQLHexExpr) { if ("USING".equalsIgnoreCase(lexer.stringVal())) { lexer.nextToken(); if (lexer.token() != Token.IDENTIFIER) { - throw new ParserException("syntax error, illegal hex"); + throw new ParserException("syntax error, illegal hex. " + lexer.info()); } String charSet = lexer.stringVal(); lexer.nextToken(); @@ -318,15 +290,16 @@ public final SQLExpr primaryRest(SQLExpr expr) { return primaryRest(expr); } - } else if ("COLLATE".equalsIgnoreCase(lexer.stringVal())) { + } else if (lexer.identifierEquals(FnvHash.Constants.COLLATE)) { lexer.nextToken(); if (lexer.token() == Token.EQ) { lexer.nextToken(); } - if (lexer.token() != Token.IDENTIFIER) { - throw new ParserException("syntax error"); + if (lexer.token() != Token.IDENTIFIER + && lexer.token() != Token.LITERAL_CHARS) { + throw new ParserException("syntax error. " + lexer.info()); } String collate = lexer.stringVal(); @@ -339,11 +312,12 @@ public final SQLExpr primaryRest(SQLExpr expr) { return primaryRest(expr); } else if (expr instanceof SQLVariantRefExpr) { - if ("COLLATE".equalsIgnoreCase(lexer.stringVal())) { + if (lexer.identifierEquals(FnvHash.Constants.COLLATE)) { lexer.nextToken(); - if (lexer.token() != Token.IDENTIFIER) { - throw new ParserException("syntax error"); + if (lexer.token() != Token.IDENTIFIER + && lexer.token() != Token.LITERAL_CHARS) { + throw new ParserException("syntax error. " + lexer.info()); } String collate = lexer.stringVal(); @@ -351,246 +325,199 @@ public final SQLExpr primaryRest(SQLExpr expr) { expr.putAttribute("COLLATE", collate); - return primaryRest(expr); - } - } else if (expr instanceof SQLIntegerExpr) { - SQLIntegerExpr intExpr = (SQLIntegerExpr) expr; - String binaryString = lexer.stringVal(); - if (intExpr.getNumber().intValue() == 0 && binaryString.startsWith("b")) { - lexer.nextToken(); - expr = new SQLBinaryExpr(binaryString.substring(1)); - return primaryRest(expr); } } } - if (lexer.token() == Token.LPAREN && expr instanceof SQLIdentifierExpr) { - SQLIdentifierExpr identExpr = (SQLIdentifierExpr) expr; - String ident = identExpr.getName(); +// if (lexer.token() == Token.LPAREN && expr instanceof SQLIdentifierExpr) { +// SQLIdentifierExpr identExpr = (SQLIdentifierExpr) expr; +// String ident = identExpr.getName(); +// +// if ("POSITION".equalsIgnoreCase(ident)) { +// return parsePosition(); +// } +// } - if ("EXTRACT".equalsIgnoreCase(ident)) { - lexer.nextToken(); + if (lexer.token() == Token.VARIANT && "@".equals(lexer.stringVal())) { + return userNameRest(expr); + } - if (lexer.token() != Token.IDENTIFIER) { - throw new ParserException("syntax error"); - } + // + if (expr instanceof SQLMethodInvokeExpr && lexer.token() == Token.LBRACKET) { + lexer.nextToken(); + expr = bracketRest(expr); + return primaryRest(expr); + } - String unitVal = lexer.stringVal(); - MySqlIntervalUnit unit = MySqlIntervalUnit.valueOf(unitVal.toUpperCase()); - lexer.nextToken(); + if (lexer.token() == Token.ERROR) { + throw new ParserException("syntax error. " + lexer.info()); + } - accept(Token.FROM); + return super.primaryRest(expr); + } - SQLExpr value = expr(); + protected SQLExpr bracketRest(SQLExpr expr) { + Number index; - MySqlExtractExpr extract = new MySqlExtractExpr(); - extract.setValue(value); - extract.setUnit(unit); - accept(Token.RPAREN); + if (lexer.token() == Token.LITERAL_INT) { + index = lexer.integerValue(); + lexer.nextToken(); + } else { + throw new ParserException("error : " + lexer.stringVal()); + } - expr = extract; + if (expr instanceof SQLMethodInvokeExpr) { + SQLMethodInvokeExpr methodInvokeExpr = (SQLMethodInvokeExpr) expr; + methodInvokeExpr.getParameters().add(new SQLIntegerExpr(index)); + } + lexer.nextToken(); + expr = primaryRest(expr); + return expr; + } - return primaryRest(expr); - } else if ("SUBSTRING".equalsIgnoreCase(ident)) { - lexer.nextToken(); - SQLMethodInvokeExpr methodInvokeExpr = new SQLMethodInvokeExpr(ident); - for (; ; ) { - SQLExpr param = expr(); - methodInvokeExpr.addParameter(param); + public SQLName userName() { + SQLName name = this.name(); + if (lexer.token() == Token.LPAREN && name.hashCode64() == FnvHash.Constants.CURRENT_USER) { + lexer.nextToken(); + accept(Token.RPAREN); + return name; + } - if (lexer.token() == Token.COMMA) { - lexer.nextToken(); - continue; - } else if (lexer.token() == Token.FROM) { - lexer.nextToken(); - SQLExpr from = expr(); - methodInvokeExpr.addParameter(from); - - if (lexer.token() == Token.FOR) { - lexer.nextToken(); - SQLExpr forExpr = expr(); - methodInvokeExpr.addParameter(forExpr); - } - break; - } else if (lexer.token() == Token.RPAREN) { - break; - } else { - throw new ParserException("syntax error"); - } - } + return (SQLName) userNameRest(name); + } - accept(Token.RPAREN); - expr = methodInvokeExpr; + private SQLExpr userNameRest(SQLExpr expr) { + if (lexer.token() != Token.VARIANT || !lexer.stringVal().startsWith("@")) { + return expr; + } - return primaryRest(expr); - } else if ("TRIM".equalsIgnoreCase(ident)) { - lexer.nextToken(); - SQLMethodInvokeExpr methodInvokeExpr = new SQLMethodInvokeExpr(ident); + MySqlUserName userName = new MySqlUserName(); + if (expr instanceof SQLCharExpr) { + userName.setUserName(((SQLCharExpr) expr).toString()); + } else { + userName.setUserName(((SQLIdentifierExpr) expr).getName()); + } - if (lexer.token() == Token.IDENTIFIER) { - String flagVal = lexer.stringVal(); - if ("LEADING".equalsIgnoreCase(flagVal)) { - lexer.nextToken(); - methodInvokeExpr.getAttributes().put("TRIM_TYPE", "LEADING"); - } else if ("BOTH".equalsIgnoreCase(flagVal)) { - lexer.nextToken(); - methodInvokeExpr.getAttributes().put("TRIM_TYPE", "BOTH"); - } else if ("TRAILING".equalsIgnoreCase(flagVal)) { - lexer.nextToken(); - methodInvokeExpr.putAttribute("TRIM_TYPE", "TRAILING"); - } - } - SQLExpr param = expr(); - methodInvokeExpr.addParameter(param); + String strVal = lexer.stringVal(); + lexer.nextToken(); - if (lexer.token() == Token.FROM) { - lexer.nextToken(); - SQLExpr from = expr(); - methodInvokeExpr.putAttribute("FROM", from); - } + if (strVal.length() > 1) { + userName.setHost(strVal.substring(1)); + return userName; + } - accept(Token.RPAREN); - expr = methodInvokeExpr; + if (lexer.token() == Token.LITERAL_CHARS) { + userName.setHost("'" + lexer.stringVal() + "'"); + } else { + userName.setHost(lexer.stringVal()); + } + lexer.nextToken(); - return primaryRest(expr); - } else if ("MATCH".equalsIgnoreCase(ident)) { - lexer.nextToken(); - MySqlMatchAgainstExpr matchAgainstExpr = new MySqlMatchAgainstExpr(); + if (lexer.token() == Token.IDENTIFIED) { + Lexer.SavePoint mark = lexer.mark(); - if (lexer.token() == Token.RPAREN) { - lexer.nextToken(); + lexer.nextToken(); + if (lexer.token() == Token.BY) { + lexer.nextToken(); + if (lexer.identifierEquals(FnvHash.Constants.PASSWORD)) { + lexer.reset(mark); } else { - exprList(matchAgainstExpr.getColumns(), matchAgainstExpr); - accept(Token.RPAREN); + userName.setIdentifiedBy(lexer.stringVal()); + lexer.nextToken(); } + } else { + lexer.reset(mark); + } + } - acceptIdentifier("AGAINST"); - - accept(Token.LPAREN); - SQLExpr against = primary(); - matchAgainstExpr.setAgainst(against); + return userName; + } - if (lexer.token() == Token.IN) { - lexer.nextToken(); - if (identifierEquals("NATURAL")) { - lexer.nextToken(); - acceptIdentifier("LANGUAGE"); - acceptIdentifier("MODE"); - if (lexer.token() == Token.WITH) { - lexer.nextToken(); - acceptIdentifier("QUERY"); - acceptIdentifier("EXPANSION"); - matchAgainstExpr.setSearchModifier(MySqlMatchAgainstExpr.SearchModifier.IN_NATURAL_LANGUAGE_MODE_WITH_QUERY_EXPANSION); - } else { - matchAgainstExpr.setSearchModifier(MySqlMatchAgainstExpr.SearchModifier.IN_NATURAL_LANGUAGE_MODE); - } - } else if (identifierEquals("BOOLEAN")) { - lexer.nextToken(); - acceptIdentifier("MODE"); - matchAgainstExpr.setSearchModifier(MySqlMatchAgainstExpr.SearchModifier.IN_BOOLEAN_MODE); - } else { - throw new ParserException("TODO"); - } - } else if (lexer.token() == Token.WITH) { - throw new ParserException("TODO"); - } + protected SQLExpr parsePosition() { - accept(Token.RPAREN); + SQLExpr subStr = this.primary(); + accept(Token.IN); + SQLExpr str = this.expr(); + accept(Token.RPAREN); - expr = matchAgainstExpr; + SQLMethodInvokeExpr locate = new SQLMethodInvokeExpr("LOCATE"); + locate.addParameter(subStr); + locate.addParameter(str); - return primaryRest(expr); - } else if ("CONVERT".equalsIgnoreCase(ident)) { - lexer.nextToken(); - SQLMethodInvokeExpr methodInvokeExpr = new SQLMethodInvokeExpr(ident); + return primaryRest(locate); + } - if (lexer.token() != Token.RPAREN) { - exprList(methodInvokeExpr.getParameters(), methodInvokeExpr); - } + protected SQLExpr parseExtract() { + SQLExpr expr; + if (lexer.token() != Token.IDENTIFIER) { + throw new ParserException("syntax error. " + lexer.info()); + } - if (identifierEquals("USING")) { - lexer.nextToken(); - if (lexer.token() != Token.IDENTIFIER) { - throw new ParserException("syntax error"); - } - String charset = lexer.stringVal(); - lexer.nextToken(); - methodInvokeExpr.putAttribute("USING", charset); - } + String unitVal = lexer.stringVal(); + SQLIntervalUnit unit = SQLIntervalUnit.valueOf(unitVal.toUpperCase()); + lexer.nextToken(); - accept(Token.RPAREN); + accept(Token.FROM); - expr = methodInvokeExpr; + SQLExpr value = expr(); - return primaryRest(expr); - } else if ("POSITION".equalsIgnoreCase(ident)) { - accept(Token.LPAREN); - SQLExpr subStr = this.primary(); - accept(Token.IN); - SQLExpr str = this.expr(); - accept(Token.RPAREN); + MySqlExtractExpr extract = new MySqlExtractExpr(); + extract.setValue(value); + extract.setUnit(unit); + accept(Token.RPAREN); - SQLMethodInvokeExpr locate = new SQLMethodInvokeExpr("LOCATE"); - locate.addParameter(subStr); - locate.addParameter(str); + expr = extract; - expr = locate; - return primaryRest(expr); - } - } + return primaryRest(expr); + } - if (lexer.token() == Token.VARIANT && "@".equals(lexer.stringVal())) { - lexer.nextToken(); - MySqlUserName userName = new MySqlUserName(); - if (expr instanceof SQLCharExpr) { - userName.setUserName(((SQLCharExpr) expr).toString()); - } else { - userName.setUserName(((SQLIdentifierExpr) expr).getName()); - } + protected SQLExpr parseMatch() { - if (lexer.token() == Token.LITERAL_CHARS) { - userName.setHost("'" + lexer.stringVal() + "'"); - } else { - userName.setHost(lexer.stringVal()); - } - lexer.nextToken(); - return userName; - } + MySqlMatchAgainstExpr matchAgainstExpr = new MySqlMatchAgainstExpr(); - // - if (expr instanceof SQLMethodInvokeExpr && lexer.token() == Token.LBRACKET) { + if (lexer.token() == Token.RPAREN) { lexer.nextToken(); - expr = bracketRest(expr); - return primaryRest(expr); + } else { + exprList(matchAgainstExpr.getColumns(), matchAgainstExpr); + accept(Token.RPAREN); } - if (lexer.token() == Token.ERROR) { - throw new ParserException("syntax error, token: " + lexer.token() + " " + lexer.stringVal() + ", pos : " - + lexer.pos()); - } + acceptIdentifier("AGAINST"); - return super.primaryRest(expr); - } - - protected SQLExpr bracketRest(SQLExpr expr) { - Number index; + accept(Token.LPAREN); + SQLExpr against = primary(); + matchAgainstExpr.setAgainst(against); - if (lexer.token() == Token.LITERAL_INT) { - index = lexer.integerValue(); + if (lexer.token() == Token.IN) { lexer.nextToken(); - } else { - throw new ParserException("error : " + lexer.stringVal()); + if (lexer.identifierEquals(FnvHash.Constants.NATURAL)) { + lexer.nextToken(); + acceptIdentifier("LANGUAGE"); + acceptIdentifier("MODE"); + if (lexer.token() == Token.WITH) { + lexer.nextToken(); + acceptIdentifier("QUERY"); + acceptIdentifier("EXPANSION"); + matchAgainstExpr.setSearchModifier(MySqlMatchAgainstExpr.SearchModifier.IN_NATURAL_LANGUAGE_MODE_WITH_QUERY_EXPANSION); + } else { + matchAgainstExpr.setSearchModifier(MySqlMatchAgainstExpr.SearchModifier.IN_NATURAL_LANGUAGE_MODE); + } + } else if (lexer.identifierEquals(FnvHash.Constants.BOOLEAN)) { + lexer.nextToken(); + acceptIdentifier("MODE"); + matchAgainstExpr.setSearchModifier(MySqlMatchAgainstExpr.SearchModifier.IN_BOOLEAN_MODE); + } else { + throw new ParserException("syntax error. " + lexer.info()); + } + } else if (lexer.token() == Token.WITH) { + throw new ParserException("TODO. " + lexer.info()); } - if (expr instanceof SQLMethodInvokeExpr) { - SQLMethodInvokeExpr methodInvokeExpr = (SQLMethodInvokeExpr) expr; - methodInvokeExpr.getParameters().add(new SQLIntegerExpr(index)); - } - lexer.nextToken(); - expr = primaryRest(expr); - return expr; + accept(Token.RPAREN); + + return primaryRest(matchAgainstExpr); } public SQLSelectParser createSelectParser() { @@ -610,30 +537,55 @@ protected SQLExpr parseInterval() { accept(Token.RPAREN); - return primaryRest(methodInvokeExpr); + // + + if (methodInvokeExpr.getParameters().size() == 1 // + && lexer.token() == Token.IDENTIFIER) { + SQLExpr value = methodInvokeExpr.getParameters().get(0); + String unit = lexer.stringVal(); + lexer.nextToken(); + + SQLIntervalExpr intervalExpr = new SQLIntervalExpr(); + intervalExpr.setValue(value); + intervalExpr.setUnit(SQLIntervalUnit.valueOf(unit.toUpperCase())); + return intervalExpr; + } else { + return primaryRest(methodInvokeExpr); + } } else { SQLExpr value = expr(); if (lexer.token() != Token.IDENTIFIER) { - throw new ParserException("Syntax error"); + throw new ParserException("Syntax error. " + lexer.info()); } String unit = lexer.stringVal(); lexer.nextToken(); - MySqlIntervalExpr intervalExpr = new MySqlIntervalExpr(); + SQLIntervalExpr intervalExpr = new SQLIntervalExpr(); intervalExpr.setValue(value); - intervalExpr.setUnit(MySqlIntervalUnit.valueOf(unit.toUpperCase())); + intervalExpr.setUnit(SQLIntervalUnit.valueOf(unit.toUpperCase())); return intervalExpr; } } public SQLColumnDefinition parseColumn() { - MySqlSQLColumnDefinition column = new MySqlSQLColumnDefinition(); + SQLColumnDefinition column = new SQLColumnDefinition(); + column.setDbType(dbType); column.setName(name()); column.setDataType(parseDataType()); + if (lexer.identifierEquals(FnvHash.Constants.GENERATED)) { + lexer.nextToken(); + acceptIdentifier("ALWAYS"); + accept(Token.AS); + accept(Token.LPAREN); + SQLExpr expr = this.expr(); + accept(Token.RPAREN); + column.setGeneratedAlawsAs(expr); + } + return parseColumnRest(column); } @@ -642,91 +594,97 @@ public SQLColumnDefinition parseColumnRest(SQLColumnDefinition column) { lexer.nextToken(); accept(Token.UPDATE); SQLExpr expr = this.expr(); - ((MySqlSQLColumnDefinition) column).setOnUpdate(expr); + column.setOnUpdate(expr); } - if (identifierEquals("AUTO_INCREMENT")) { + if (lexer.identifierEquals(FnvHash.Constants.CHARACTER)) { + lexer.nextToken(); + accept(Token.SET); + MySqlCharExpr charSetCollateExpr=new MySqlCharExpr(); + charSetCollateExpr.setCharset(lexer.stringVal()); lexer.nextToken(); - if (column instanceof MySqlSQLColumnDefinition) { - ((MySqlSQLColumnDefinition) column).setAutoIncrement(true); + if (lexer.identifierEquals(FnvHash.Constants.COLLATE)) { + lexer.nextToken(); + charSetCollateExpr.setCollate(lexer.stringVal()); + lexer.nextToken(); } + column.setCharsetExpr(charSetCollateExpr); return parseColumnRest(column); } - if (identifierEquals("precision") && column.getDataType().getName().equalsIgnoreCase("double")) { + if (lexer.identifierEquals(FnvHash.Constants.CHARSET)) { lexer.nextToken(); + MySqlCharExpr charSetCollateExpr=new MySqlCharExpr(); + charSetCollateExpr.setCharset(lexer.stringVal()); + lexer.nextToken(); + if (lexer.identifierEquals(FnvHash.Constants.COLLATE)) { + lexer.nextToken(); + charSetCollateExpr.setCollate(lexer.stringVal()); + lexer.nextToken(); + } + column.setCharsetExpr(charSetCollateExpr); + return parseColumnRest(column); } - - if (identifierEquals("PARTITION")) { - throw new ParserException("syntax error " + lexer.token() + " " + lexer.stringVal()); + if (lexer.identifierEquals(FnvHash.Constants.AUTO_INCREMENT)) { + lexer.nextToken(); + column.setAutoIncrement(true); + return parseColumnRest(column); } - if (identifierEquals("STORAGE")) { + if (lexer.identifierEquals(FnvHash.Constants.PRECISION) + && column.getDataType().nameHashCode64() ==FnvHash.Constants.DOUBLE) { lexer.nextToken(); - SQLExpr expr = expr(); - if (column instanceof MySqlSQLColumnDefinition) { - ((MySqlSQLColumnDefinition) column).setStorage(expr); - } } - super.parseColumnRest(column); + if (lexer.token() == Token.PARTITION) { + throw new ParserException("syntax error " + lexer.info()); + } - return column; - } + if (lexer.identifierEquals(FnvHash.Constants.STORAGE)) { + lexer.nextToken(); + SQLExpr expr = expr(); + column.setStorage(expr); + } - protected SQLDataType parseDataTypeRest(SQLDataType dataType) { - super.parseDataTypeRest(dataType); + if (lexer.token() == Token.AS) { + lexer.nextToken(); + accept(Token.LPAREN); + SQLExpr expr = expr(); + column.setAsExpr(expr); + accept(Token.RPAREN); + } - if (identifierEquals("UNSIGNED")) { + if (lexer.identifierEquals(FnvHash.Constants.STORED)) { lexer.nextToken(); - dataType.getAttributes().put("UNSIGNED", true); + column.setStored(true); } - if (identifierEquals("ZEROFILL")) { + if (lexer.identifierEquals(FnvHash.Constants.VIRTUAL)) { lexer.nextToken(); - dataType.getAttributes().put("ZEROFILL", true); + column.setVirtual(true); } - return dataType; + super.parseColumnRest(column); + + return column; } - public SQLExpr orRest(SQLExpr expr) { + protected SQLDataType parseDataTypeRest(SQLDataType dataType) { + super.parseDataTypeRest(dataType); - for (; ; ) { - if (lexer.token() == Token.OR || lexer.token() == Token.BARBAR) { + for (;;) { + if (lexer.identifierEquals(FnvHash.Constants.UNSIGNED)) { lexer.nextToken(); - SQLExpr rightExp = and(); - - expr = new SQLBinaryOpExpr(expr, SQLBinaryOperator.BooleanOr, rightExp, JdbcConstants.MYSQL); - } else if (lexer.token() == Token.XOR) { + ((SQLDataTypeImpl) dataType).setUnsigned(true); + } else if (lexer.identifierEquals(FnvHash.Constants.ZEROFILL)) { lexer.nextToken(); - SQLExpr rightExp = and(); - - expr = new SQLBinaryOpExpr(expr, SQLBinaryOperator.BooleanXor, rightExp, JdbcConstants.MYSQL); + ((SQLDataTypeImpl) dataType).setZerofill(true); } else { break; } } - return expr; - } - - public SQLExpr additiveRest(SQLExpr expr) { - if (lexer.token() == Token.PLUS) { - lexer.nextToken(); - SQLExpr rightExp = multiplicative(); - - expr = new SQLBinaryOpExpr(expr, SQLBinaryOperator.Add, rightExp, JdbcConstants.MYSQL); - expr = additiveRest(expr); - } else if (lexer.token() == Token.SUB) { - lexer.nextToken(); - SQLExpr rightExp = multiplicative(); - - expr = new SQLBinaryOpExpr(expr, SQLBinaryOperator.Subtract, rightExp, JdbcConstants.MYSQL); - expr = additiveRest(expr); - } - - return expr; + return dataType; } public SQLAssignItem parseAssignItem() { @@ -735,25 +693,67 @@ public SQLAssignItem parseAssignItem() { SQLExpr var = primary(); String ident = null; + long identHash = 0; if (var instanceof SQLIdentifierExpr) { - ident = ((SQLIdentifierExpr) var).getName(); + SQLIdentifierExpr identExpr = (SQLIdentifierExpr) var; + ident = identExpr.getName(); + identHash = identExpr.hashCode64(); - if ("GLOBAL".equalsIgnoreCase(ident)) { + if (identHash == FnvHash.Constants.GLOBAL) { ident = lexer.stringVal(); lexer.nextToken(); var = new SQLVariantRefExpr(ident, true); - } else if ("SESSION".equalsIgnoreCase(ident)) { + } else if (identHash == FnvHash.Constants.SESSION) { ident = lexer.stringVal(); lexer.nextToken(); - var = new SQLVariantRefExpr(ident, false); + var = new SQLVariantRefExpr(ident, false, true); } else { var = new SQLVariantRefExpr(ident); } } - if ("NAMES".equalsIgnoreCase(ident)) { - // skip - } else if ("CHARACTER".equalsIgnoreCase(ident)) { + if (identHash == FnvHash.Constants.NAMES) { + String charset = lexer.stringVal(); + + SQLExpr varExpr = null; + boolean chars = false; + final Token token = lexer.token(); + if (token == Token.IDENTIFIER) { + lexer.nextToken(); + } else if (token == Token.DEFAULT) { + charset = "DEFAULT"; + lexer.nextToken(); + } else if (token == Token.QUES) { + varExpr = new SQLVariantRefExpr("?"); + lexer.nextToken(); + } else { + chars = true; + accept(Token.LITERAL_CHARS); + } + + if (lexer.identifierEquals(FnvHash.Constants.COLLATE)) { + MySqlCharExpr charsetExpr = new MySqlCharExpr(charset); + lexer.nextToken(); + + String collate = lexer.stringVal(); + lexer.nextToken(); + charsetExpr.setCollate(collate); + + item.setValue(charsetExpr); + } else { + if (varExpr != null) { + item.setValue(varExpr); + } else { + item.setValue(chars + ? new SQLCharExpr(charset) + : new SQLIdentifierExpr(charset) + ); + } + } + + item.setTarget(var); + return item; + } else if (identHash == FnvHash.Constants.CHARACTER) { var = new SQLIdentifierExpr("CHARACTER SET"); accept(Token.SET); if (lexer.token() == Token.EQ) { @@ -767,7 +767,12 @@ public SQLAssignItem parseAssignItem() { } } - item.setValue(this.expr()); + if (lexer.token() == Token.ON) { + lexer.nextToken(); + item.setValue(new SQLIdentifierExpr("ON")); + } else { + item.setValue(this.expr()); + } item.setTarget(var); return item; @@ -785,33 +790,17 @@ public SQLName nameRest(SQLName name) { userName.setHost(lexer.stringVal()); } lexer.nextToken(); - return userName; - } - return super.nameRest(name); - } - - public MySqlSelectQueryBlock.Limit parseLimit() { - if (lexer.token() == Token.LIMIT) { - lexer.nextToken(); - - MySqlSelectQueryBlock.Limit limit = new MySqlSelectQueryBlock.Limit(); - SQLExpr temp = this.expr(); - if (lexer.token() == (Token.COMMA)) { - limit.setOffset(temp); + if (lexer.token() == Token.IDENTIFIED) { lexer.nextToken(); - limit.setRowCount(this.expr()); - } else if (identifierEquals("OFFSET")) { - limit.setRowCount(temp); + accept(Token.BY); + userName.setIdentifiedBy(lexer.stringVal()); lexer.nextToken(); - limit.setOffset(this.expr()); - } else { - limit.setRowCount(temp); } - return limit; - } - return null; + return userName; + } + return super.nameRest(name); } @Override @@ -821,15 +810,26 @@ public MySqlPrimaryKey parsePrimaryKey() { MySqlPrimaryKey primaryKey = new MySqlPrimaryKey(); - if (identifierEquals("USING")) { + if (lexer.identifierEquals(FnvHash.Constants.USING)) { lexer.nextToken(); primaryKey.setIndexType(lexer.stringVal()); lexer.nextToken(); } + if (lexer.token() != Token.LPAREN) { + SQLName name = this.name(); + primaryKey.setName(name); + } + accept(Token.LPAREN); - for (; ; ) { - primaryKey.getColumns().add(this.expr()); + for (;;) { + SQLExpr expr; + if (lexer.token() == Token.LITERAL_ALIAS) { + expr = this.name(); + } else { + expr = this.expr(); + } + primaryKey.addColumn(expr); if (!(lexer.token() == (Token.COMMA))) { break; } else { @@ -838,6 +838,12 @@ public MySqlPrimaryKey parsePrimaryKey() { } accept(Token.RPAREN); + if (lexer.identifierEquals(FnvHash.Constants.USING)) { + lexer.nextToken(); + primaryKey.setIndexType(lexer.stringVal()); + lexer.nextToken(); + } + return primaryKey; } @@ -856,12 +862,27 @@ public MySqlUnique parseUnique() { if (lexer.token() != Token.LPAREN) { SQLName indexName = name(); - unique.setIndexName(indexName); + unique.setName(indexName); + } + + //5.5语法 USING BTREE 放在index 名字后 + if (lexer.identifierEquals(FnvHash.Constants.USING)) { + lexer.nextToken(); + unique.setIndexType(lexer.stringVal()); + lexer.nextToken(); } accept(Token.LPAREN); - for (; ; ) { - unique.getColumns().add(this.expr()); + for (;;) { + SQLExpr column = this.expr(); + if (lexer.token() == Token.ASC) { + column = new MySqlOrderingExpr(column, SQLOrderingSpecification.ASC); + lexer.nextToken(); + } else if (lexer.token() == Token.DESC) { + column = new MySqlOrderingExpr(column, SQLOrderingSpecification.DESC); + lexer.nextToken(); + } + unique.addColumn(column); if (!(lexer.token() == (Token.COMMA))) { break; } else { @@ -870,12 +891,21 @@ public MySqlUnique parseUnique() { } accept(Token.RPAREN); - if (identifierEquals("USING")) { + if (lexer.identifierEquals(FnvHash.Constants.USING)) { lexer.nextToken(); unique.setIndexType(lexer.stringVal()); lexer.nextToken(); } + if (lexer.identifierEquals(FnvHash.Constants.KEY_BLOCK_SIZE)) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + SQLExpr value = this.primary(); + unique.setKeyBlockSize(value); + } + return unique; } @@ -891,7 +921,7 @@ public MysqlForeignKey parseForeignKey() { } accept(Token.LPAREN); - this.names(fk.getReferencingColumns()); + this.names(fk.getReferencingColumns(), fk); accept(Token.RPAREN); accept(Token.REFERENCES); @@ -902,45 +932,39 @@ public MysqlForeignKey parseForeignKey() { this.names(fk.getReferencedColumns()); accept(Token.RPAREN); - if (identifierEquals("MATCH")) { - if (identifierEquals("FULL")) { - fk.setReferenceMatch(MysqlForeignKey.Match.FULL); - } else if (identifierEquals("PARTIAL")) { - fk.setReferenceMatch(MysqlForeignKey.Match.PARTIAL); - } else if (identifierEquals("SIMPLE")) { - fk.setReferenceMatch(MysqlForeignKey.Match.SIMPLE); + if (lexer.identifierEquals(FnvHash.Constants.MATCH)) { + lexer.nextToken(); + if (lexer.identifierEquals("FULL") || lexer.token() == Token.FULL) { + fk.setReferenceMatch(SQLForeignKeyImpl.Match.FULL); + lexer.nextToken(); + } else if (lexer.identifierEquals(FnvHash.Constants.PARTIAL)) { + fk.setReferenceMatch(SQLForeignKeyImpl.Match.PARTIAL); + lexer.nextToken(); + } else if (lexer.identifierEquals(FnvHash.Constants.SIMPLE)) { + fk.setReferenceMatch(SQLForeignKeyImpl.Match.SIMPLE); + lexer.nextToken(); + } else { + throw new ParserException("TODO : " + lexer.info()); } } - if (lexer.token() == Token.ON) { + while (lexer.token() == Token.ON) { lexer.nextToken(); + if (lexer.token() == Token.DELETE) { - fk.setReferenceOn(MysqlForeignKey.On.DELETE); + lexer.nextToken(); + + SQLForeignKeyImpl.Option option = parseReferenceOption(); + fk.setOnDelete(option); } else if (lexer.token() == Token.UPDATE) { - fk.setReferenceOn(MysqlForeignKey.On.UPDATE); + lexer.nextToken(); + + SQLForeignKeyImpl.Option option = parseReferenceOption(); + fk.setOnUpdate(option); } else { throw new ParserException("syntax error, expect DELETE or UPDATE, actual " + lexer.token() + " " - + lexer.stringVal()); + + lexer.info()); } - lexer.nextToken(); - - if (lexer.token() == Token.RESTRICT) { - fk.setReferenceOption(MysqlForeignKey.Option.RESTRICT); - } else if (identifierEquals("CASCADE")) { - fk.setReferenceOption(MysqlForeignKey.Option.CASCADE); - } else if (lexer.token() == Token.SET) { - accept(Token.NULL); - fk.setReferenceOption(MysqlForeignKey.Option.SET_NULL); - } else if (identifierEquals("ON")) { - lexer.nextToken(); - if (identifierEquals("ACTION")) { - fk.setReferenceOption(MysqlForeignKey.Option.NO_ACTION); - } else { - throw new ParserException("syntax error, expect ACTION, actual " + lexer.token() + " " - + lexer.stringVal()); - } - } - lexer.nextToken(); } return fk; } @@ -950,18 +974,19 @@ protected SQLAggregateExpr parseAggregateExprRest(SQLAggregateExpr aggregateExpr SQLOrderBy orderBy = this.parseOrderBy(); aggregateExpr.putAttribute("ORDER BY", orderBy); } - if (identifierEquals("SEPARATOR")) { + if (lexer.identifierEquals(FnvHash.Constants.SEPARATOR)) { lexer.nextToken(); SQLExpr seperator = this.primary(); + seperator.setParent(aggregateExpr); aggregateExpr.putAttribute("SEPARATOR", seperator); } return aggregateExpr; } - public MySqlSelectGroupByExpr parseSelectGroupByItem() { - MySqlSelectGroupByExpr item = new MySqlSelectGroupByExpr(); + public MySqlOrderingExpr parseSelectGroupByItem() { + MySqlOrderingExpr item = new MySqlOrderingExpr(); item.setExpr(expr()); @@ -976,4 +1001,107 @@ public MySqlSelectGroupByExpr parseSelectGroupByItem() { return item; } + public SQLPartition parsePartition() { + accept(Token.PARTITION); + + SQLPartition partitionDef = new SQLPartition(); + + partitionDef.setName(this.name()); + + SQLPartitionValue values = this.parsePartitionValues(); + if (values != null) { + partitionDef.setValues(values); + } + + for (;;) { + boolean storage = false; + if (lexer.identifierEquals(FnvHash.Constants.DATA)) { + lexer.nextToken(); + acceptIdentifier("DIRECTORY"); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + partitionDef.setDataDirectory(this.expr()); + } else if (lexer.token() == Token.TABLESPACE) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + SQLName tableSpace = this.name(); + partitionDef.setTablespace(tableSpace); + } else if (lexer.token() == Token.INDEX) { + lexer.nextToken(); + acceptIdentifier("DIRECTORY"); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + partitionDef.setIndexDirectory(this.expr()); + } else if (lexer.identifierEquals(FnvHash.Constants.MAX_ROWS)) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + SQLExpr maxRows = this.primary(); + partitionDef.setMaxRows(maxRows); + } else if (lexer.identifierEquals(FnvHash.Constants.MIN_ROWS)) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + SQLExpr minRows = this.primary(); + partitionDef.setMaxRows(minRows); + } else if (lexer.identifierEquals(FnvHash.Constants.ENGINE) || // + (storage = (lexer.token() == Token.STORAGE || lexer.identifierEquals(FnvHash.Constants.STORAGE)))) { + if (storage) { + lexer.nextToken(); + } + acceptIdentifier("ENGINE"); + + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + + SQLName engine = this.name(); + partitionDef.setEngine(engine); + } else if (lexer.token() == Token.COMMENT) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + SQLExpr comment = this.primary(); + partitionDef.setComment(comment); + } else { + break; + } + } + + if (lexer.token() == Token.LPAREN) { + lexer.nextToken(); + + for (;;) { + acceptIdentifier("SUBPARTITION"); + + SQLName subPartitionName = this.name(); + SQLSubPartition subPartition = new SQLSubPartition(); + subPartition.setName(subPartitionName); + + partitionDef.addSubPartition(subPartition); + + if (lexer.token() == Token.COMMA) { + lexer.nextToken(); + continue; + } + break; + } + + accept(Token.RPAREN); + } + return partitionDef; + } + + protected SQLExpr parseAliasExpr(String alias) { + String chars = alias.substring(1, alias.length() - 1); + return new SQLCharExpr(chars); + } + } diff --git a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlSelectParser.java b/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlSelectParser.java index cc0d190d..5e97476e 100644 --- a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlSelectParser.java +++ b/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlSelectParser.java @@ -1,109 +1,180 @@ package org.nlpcn.es4sql.parse; import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLName; +import com.alibaba.druid.sql.ast.SQLObject; import com.alibaba.druid.sql.ast.SQLSetQuantifier; import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLListExpr; import com.alibaba.druid.sql.ast.expr.SQLLiteralExpr; -import com.alibaba.druid.sql.ast.statement.*; -import com.alibaba.druid.sql.dialect.mysql.ast.*; +import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQuery; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSubqueryTableSource; +import com.alibaba.druid.sql.ast.statement.SQLTableSource; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.ast.statement.SQLUnionQueryTableSource; +import com.alibaba.druid.sql.ast.statement.SQLUpdateSetItem; +import com.alibaba.druid.sql.dialect.mysql.ast.MySqlForceIndexHint; +import com.alibaba.druid.sql.dialect.mysql.ast.MySqlIgnoreIndexHint; +import com.alibaba.druid.sql.dialect.mysql.ast.MySqlIndexHint; +import com.alibaba.druid.sql.dialect.mysql.ast.MySqlIndexHintImpl; +import com.alibaba.druid.sql.dialect.mysql.ast.MySqlUseIndexHint; import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOutFileExpr; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectGroupBy; import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUnionQuery; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateTableSource; import com.alibaba.druid.sql.parser.ParserException; import com.alibaba.druid.sql.parser.SQLExprParser; +import com.alibaba.druid.sql.parser.SQLSelectListCache; import com.alibaba.druid.sql.parser.SQLSelectParser; import com.alibaba.druid.sql.parser.Token; +import com.alibaba.druid.util.FnvHash; + +import java.util.List; /** * Created by allwefantasy on 8/19/16. */ public class ElasticSqlSelectParser extends SQLSelectParser { + + protected boolean returningFlag = false; + protected MySqlUpdateStatement updateStmt; + public ElasticSqlSelectParser(SQLExprParser exprParser) { super(exprParser); } + public ElasticSqlSelectParser(SQLExprParser exprParser, SQLSelectListCache selectListCache){ + super(exprParser, selectListCache); + } + public ElasticSqlSelectParser(String sql) { this(new ElasticSqlExprParser(sql)); } @Override - public SQLSelectQuery query() { - if (lexer.token() == (Token.LPAREN)) { + public void parseFrom(SQLSelectQueryBlock queryBlock) { + if (lexer.token() != Token.FROM) { + return; + } + + lexer.nextTokenIdent(); + + if (lexer.token() == Token.UPDATE) { // taobao returning to urgly syntax + updateStmt = this.parseUpdateStatment(); + List returnning = updateStmt.getReturning(); + for (SQLSelectItem item : queryBlock.getSelectList()) { + SQLExpr itemExpr = item.getExpr(); + itemExpr.setParent(updateStmt); + returnning.add(itemExpr); + } + returningFlag = true; + return; + } + + queryBlock.setFrom(parseTableSource()); + } + + @Override + public SQLSelectQuery query(SQLObject parent, boolean acceptUnion) { + if (lexer.token() == Token.LPAREN) { lexer.nextToken(); SQLSelectQuery select = query(); + select.setBracket(true); accept(Token.RPAREN); - return queryRest(select); + return queryRest(select, acceptUnion); } MySqlSelectQueryBlock queryBlock = new MySqlSelectQueryBlock(); + queryBlock.setParent(parent); - if (lexer.token() == Token.SELECT) { - lexer.nextToken(); + if (lexer.hasComment() && lexer.isKeepComments()) { + queryBlock.addBeforeComment(lexer.readAndResetComments()); + } - if (lexer.token() == Token.HINT) { - this.exprParser.parseHints(queryBlock.getHints()); + if (lexer.token() == Token.SELECT) { + if (selectListCache != null) { + selectListCache.match(lexer, queryBlock); } + } - if (lexer.token() == Token.COMMENT) { - lexer.nextToken(); + if (lexer.token() == Token.SELECT) { + lexer.nextTokenValue(); + + for(;;) { + if (lexer.token() == Token.HINT) { + this.exprParser.parseHints(queryBlock.getHints()); + } else { + break; + } } - if (lexer.token() == (Token.DISTINCT)) { + Token token = lexer.token(); + if (token == (Token.DISTINCT)) { queryBlock.setDistionOption(SQLSetQuantifier.DISTINCT); lexer.nextToken(); - } else if (identifierEquals("DISTINCTROW")) { + } else if (lexer.identifierEquals(FnvHash.Constants.DISTINCTROW)) { queryBlock.setDistionOption(SQLSetQuantifier.DISTINCTROW); lexer.nextToken(); - } else if (lexer.token() == (Token.ALL)) { + } else if (token == (Token.ALL)) { queryBlock.setDistionOption(SQLSetQuantifier.ALL); lexer.nextToken(); } - if (identifierEquals("HIGH_PRIORITY")) { + if (lexer.identifierEquals(FnvHash.Constants.HIGH_PRIORITY)) { queryBlock.setHignPriority(true); lexer.nextToken(); } - if (identifierEquals("STRAIGHT_JOIN")) { + if (lexer.identifierEquals(FnvHash.Constants.STRAIGHT_JOIN)) { queryBlock.setStraightJoin(true); lexer.nextToken(); } - if (identifierEquals("SQL_SMALL_RESULT")) { + if (lexer.identifierEquals(FnvHash.Constants.SQL_SMALL_RESULT)) { queryBlock.setSmallResult(true); lexer.nextToken(); } - if (identifierEquals("SQL_BIG_RESULT")) { + if (lexer.identifierEquals(FnvHash.Constants.SQL_BIG_RESULT)) { queryBlock.setBigResult(true); lexer.nextToken(); } - if (identifierEquals("SQL_BUFFER_RESULT")) { + if (lexer.identifierEquals(FnvHash.Constants.SQL_BUFFER_RESULT)) { queryBlock.setBufferResult(true); lexer.nextToken(); } - if (identifierEquals("SQL_CACHE")) { + if (lexer.identifierEquals(FnvHash.Constants.SQL_CACHE)) { queryBlock.setCache(true); lexer.nextToken(); } - if (identifierEquals("SQL_NO_CACHE")) { + if (lexer.identifierEquals(FnvHash.Constants.SQL_NO_CACHE)) { queryBlock.setCache(false); lexer.nextToken(); } - if (identifierEquals("SQL_CALC_FOUND_ROWS")) { + if (lexer.identifierEquals(FnvHash.Constants.SQL_CALC_FOUND_ROWS)) { queryBlock.setCalcFoundRows(true); lexer.nextToken(); } parseSelectList(queryBlock); + if (lexer.identifierEquals(FnvHash.Constants.FORCE)) { + lexer.nextToken(); + accept(Token.PARTITION); + SQLName partition = this.exprParser.name(); + queryBlock.setForcePartition(partition); + } + parseInto(queryBlock); } @@ -111,17 +182,19 @@ public SQLSelectQuery query() { parseWhere(queryBlock); + parseHierachical(queryBlock); + parseGroupBy(queryBlock); queryBlock.setOrderBy(this.exprParser.parseOrderBy()); if (lexer.token() == Token.LIMIT) { - queryBlock.setLimit(parseLimit()); + queryBlock.setLimit(this.exprParser.parseLimit()); } if (lexer.token() == Token.PROCEDURE) { lexer.nextToken(); - throw new ParserException("TODO"); + throw new ParserException("TODO. " + lexer.info()); } parseInto(queryBlock); @@ -131,6 +204,15 @@ public SQLSelectQuery query() { accept(Token.UPDATE); queryBlock.setForUpdate(true); + + if (lexer.identifierEquals(FnvHash.Constants.NO_WAIT) || lexer.identifierEquals(FnvHash.Constants.NOWAIT)) { + lexer.nextToken(); + queryBlock.setNoWait(true); + } else if (lexer.identifierEquals(FnvHash.Constants.WAIT)) { + lexer.nextToken(); + SQLExpr waitTime = this.exprParser.primary(); + queryBlock.setWaitTime(waitTime); + } } if (lexer.token() == Token.LOCK) { @@ -141,14 +223,151 @@ public SQLSelectQuery query() { queryBlock.setLockInShareMode(true); } - return queryRest(queryBlock); + return queryRest(queryBlock, acceptUnion); + } + + @Override + public SQLTableSource parseTableSource() { + if (lexer.token() == Token.LPAREN) { + lexer.nextToken(); + SQLTableSource tableSource; + if (lexer.token() == Token.SELECT || lexer.token() == Token.WITH) { + SQLSelect select = select(); + + accept(Token.RPAREN); + + SQLSelectQuery query = queryRest(select.getQuery()); + if (query instanceof SQLUnionQuery && select.getWithSubQuery() == null) { + select.getQuery().setBracket(true); + tableSource = new SQLUnionQueryTableSource((SQLUnionQuery) query); + } else { + tableSource = new SQLSubqueryTableSource(select); + } + } else if (lexer.token() == Token.LPAREN) { + tableSource = parseTableSource(); + accept(Token.RPAREN); + } else { + tableSource = parseTableSource(); + accept(Token.RPAREN); + } + + return parseTableSourceRest(tableSource); + } + + if(lexer.token() == Token.UPDATE) { + SQLTableSource tableSource = new MySqlUpdateTableSource(parseUpdateStatment()); + return parseTableSourceRest(tableSource); + } + + if (lexer.token() == Token.SELECT) { + throw new ParserException("TODO. " + lexer.info()); + } + + SQLExprTableSource tableReference = new SQLExprTableSource(); + + parseTableSourceQueryTableExpr(tableReference); + + SQLTableSource tableSrc = parseTableSourceRest(tableReference); + + if (lexer.hasComment() && lexer.isKeepComments()) { + tableSrc.addAfterComment(lexer.readAndResetComments()); + } + + return tableSrc; + } + + protected MySqlUpdateStatement parseUpdateStatment() { + MySqlUpdateStatement update = new MySqlUpdateStatement(); + + lexer.nextToken(); + + if (lexer.identifierEquals(FnvHash.Constants.LOW_PRIORITY)) { + lexer.nextToken(); + update.setLowPriority(true); + } + + if (lexer.identifierEquals(FnvHash.Constants.IGNORE)) { + lexer.nextToken(); + update.setIgnore(true); + } + + if (lexer.identifierEquals(FnvHash.Constants.COMMIT_ON_SUCCESS)) { + lexer.nextToken(); + update.setCommitOnSuccess(true); + } + + if (lexer.identifierEquals(FnvHash.Constants.ROLLBACK_ON_FAIL)) { + lexer.nextToken(); + update.setRollBackOnFail(true); + } + + if (lexer.identifierEquals(FnvHash.Constants.QUEUE_ON_PK)) { + lexer.nextToken(); + update.setQueryOnPk(true); + } + + if (lexer.identifierEquals(FnvHash.Constants.TARGET_AFFECT_ROW)) { + lexer.nextToken(); + SQLExpr targetAffectRow = this.exprParser.expr(); + update.setTargetAffectRow(targetAffectRow); + } + + if (lexer.identifierEquals(FnvHash.Constants.FORCE)) { + lexer.nextToken(); + + if (lexer.token() == Token.ALL) { + lexer.nextToken(); + acceptIdentifier("PARTITIONS"); + update.setForceAllPartitions(true); + } else if (lexer.identifierEquals(FnvHash.Constants.PARTITIONS)){ + lexer.nextToken(); + update.setForceAllPartitions(true); + } else if (lexer.token() == Token.PARTITION) { + lexer.nextToken(); + SQLName partition = this.exprParser.name(); + update.setForcePartition(partition); + } else { + throw new ParserException("TODO. " + lexer.info()); + } + } + + while (lexer.token() == Token.HINT) { + this.exprParser.parseHints(update.getHints()); + } + + SQLSelectParser selectParser = this.exprParser.createSelectParser(); + SQLTableSource updateTableSource = selectParser.parseTableSource(); + update.setTableSource(updateTableSource); + + accept(Token.SET); + + for (;;) { + SQLUpdateSetItem item = this.exprParser.parseUpdateSetItem(); + update.addItem(item); + + if (lexer.token() != Token.COMMA) { + break; + } + + lexer.nextToken(); + } + + if (lexer.token() == (Token.WHERE)) { + lexer.nextToken(); + update.setWhere(this.exprParser.expr()); + } + + update.setOrderBy(this.exprParser.parseOrderBy()); + update.setLimit(this.exprParser.parseLimit()); + + return update; } protected void parseInto(SQLSelectQueryBlock queryBlock) { if (lexer.token() == (Token.INTO)) { lexer.nextToken(); - if (identifierEquals("OUTFILE")) { + if (lexer.identifierEquals("OUTFILE")) { lexer.nextToken(); MySqlOutFileExpr outFile = new MySqlOutFileExpr(); @@ -156,123 +375,122 @@ protected void parseInto(SQLSelectQueryBlock queryBlock) { queryBlock.setInto(outFile); - if (identifierEquals("FIELDS") || identifierEquals("COLUMNS")) { + if (lexer.identifierEquals("FIELDS") || lexer.identifierEquals("COLUMNS")) { lexer.nextToken(); - if (identifierEquals("TERMINATED")) { + if (lexer.identifierEquals("TERMINATED")) { lexer.nextToken(); accept(Token.BY); } - outFile.setColumnsTerminatedBy((SQLLiteralExpr) expr()); + outFile.setColumnsTerminatedBy(expr()); - if (identifierEquals("OPTIONALLY")) { + if (lexer.identifierEquals("OPTIONALLY")) { lexer.nextToken(); outFile.setColumnsEnclosedOptionally(true); } - if (identifierEquals("ENCLOSED")) { + if (lexer.identifierEquals("ENCLOSED")) { lexer.nextToken(); accept(Token.BY); outFile.setColumnsEnclosedBy((SQLLiteralExpr) expr()); } - if (identifierEquals("ESCAPED")) { + if (lexer.identifierEquals("ESCAPED")) { lexer.nextToken(); accept(Token.BY); outFile.setColumnsEscaped((SQLLiteralExpr) expr()); } } - if (identifierEquals("LINES")) { + if (lexer.identifierEquals("LINES")) { lexer.nextToken(); - if (identifierEquals("STARTING")) { + if (lexer.identifierEquals("STARTING")) { lexer.nextToken(); accept(Token.BY); outFile.setLinesStartingBy((SQLLiteralExpr) expr()); } else { - identifierEquals("TERMINATED"); + lexer.identifierEquals("TERMINATED"); lexer.nextToken(); accept(Token.BY); outFile.setLinesTerminatedBy((SQLLiteralExpr) expr()); } } } else { - queryBlock.setInto(this.exprParser.name()); - } - } - } - - protected void parseGroupBy(SQLSelectQueryBlock queryBlock) { - SQLSelectGroupByClause groupBy = null; + SQLExpr intoExpr = this.exprParser.name(); + if (lexer.token() == Token.COMMA) { + SQLListExpr list = new SQLListExpr(); + list.addItem(intoExpr); - if (lexer.token() == Token.GROUP) { - groupBy = new SQLSelectGroupByClause(); - - lexer.nextToken(); - accept(Token.BY); - - while (true) { - groupBy.addItem(this.getExprParser().parseSelectGroupByItem()); - if (!(lexer.token() == (Token.COMMA))) { - break; - } - lexer.nextToken(); - } - - if (lexer.token() == Token.WITH) { - lexer.nextToken(); - acceptIdentifier("ROLLUP"); + while (lexer.token() == Token.COMMA) { + lexer.nextToken(); + SQLName name = this.exprParser.name(); + list.addItem(name); + } - MySqlSelectGroupBy mySqlGroupBy = new MySqlSelectGroupBy(); - for (SQLExpr sqlExpr : groupBy.getItems()) { - mySqlGroupBy.addItem(sqlExpr); + intoExpr = list; } - mySqlGroupBy.setRollUp(true); - - groupBy = mySqlGroupBy; + queryBlock.setInto(intoExpr); } } + } - if (lexer.token() == Token.HAVING) { - lexer.nextToken(); + @Override + protected SQLTableSource primaryTableSourceRest(SQLTableSource tableSource) { + parseIndexHintList(tableSource); - if (groupBy == null) { - groupBy = new SQLSelectGroupByClause(); - } - groupBy.setHaving(this.exprParser.expr()); + if (lexer.token() == Token.PARTITION) { + lexer.nextToken(); + accept(Token.LPAREN); + this.exprParser.names(((SQLExprTableSource) tableSource).getPartitions(), tableSource); + accept(Token.RPAREN); } - queryBlock.setGroupBy(groupBy); + return tableSource; } + @Override protected SQLTableSource parseTableSourceRest(SQLTableSource tableSource) { - if (identifierEquals("USING")) { + if (lexer.identifierEquals(FnvHash.Constants.USING)) { return tableSource; } + parseIndexHintList(tableSource); + + if (lexer.token() == Token.PARTITION) { + lexer.nextToken(); + accept(Token.LPAREN); + this.exprParser.names(((SQLExprTableSource) tableSource).getPartitions(), tableSource); + accept(Token.RPAREN); + } + + return super.parseTableSourceRest(tableSource); + } + + private void parseIndexHintList(SQLTableSource tableSource) { if (lexer.token() == Token.USE) { lexer.nextToken(); MySqlUseIndexHint hint = new MySqlUseIndexHint(); parseIndexHint(hint); tableSource.getHints().add(hint); + parseIndexHintList(tableSource); } - if (identifierEquals("IGNORE")) { + if (lexer.identifierEquals(FnvHash.Constants.IGNORE)) { lexer.nextToken(); MySqlIgnoreIndexHint hint = new MySqlIgnoreIndexHint(); parseIndexHint(hint); tableSource.getHints().add(hint); + parseIndexHintList(tableSource); } - if (identifierEquals("FORCE")) { + if (lexer.identifierEquals(FnvHash.Constants.FORCE)) { lexer.nextToken(); MySqlForceIndexHint hint = new MySqlForceIndexHint(); parseIndexHint(hint); tableSource.getHints().add(hint); + parseIndexHintList(tableSource); } - - return super.parseTableSourceRest(tableSource); } private void parseIndexHint(MySqlIndexHintImpl hint) { @@ -309,23 +527,16 @@ private void parseIndexHint(MySqlIndexHintImpl hint) { accept(Token.RPAREN); } - protected MySqlUnionQuery createSQLUnionQuery() { - return new MySqlUnionQuery(); - } - + @Override public SQLUnionQuery unionRest(SQLUnionQuery union) { if (lexer.token() == Token.LIMIT) { - MySqlUnionQuery mysqlUnionQuery = (MySqlUnionQuery) union; - mysqlUnionQuery.setLimit(parseLimit()); + union.setLimit(this.exprParser.parseLimit()); } return super.unionRest(union); } - public MySqlSelectQueryBlock.Limit parseLimit() { - return ((ElasticSqlExprParser) this.exprParser).parseLimit(); + public ElasticSqlSelectParser getExprParser() { + return ElasticSqlSelectParser.class.cast(exprParser); } - public ElasticSqlExprParser getExprParser() { - return (ElasticSqlExprParser) exprParser; - } } diff --git a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlStatementParser.java b/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlStatementParser.java deleted file mode 100644 index 58ff40ea..00000000 --- a/src/main/java/org/nlpcn/es4sql/parse/ElasticSqlStatementParser.java +++ /dev/null @@ -1,93 +0,0 @@ -package org.nlpcn.es4sql.parse; - -import com.alibaba.druid.sql.ast.SQLExpr; -import com.alibaba.druid.sql.ast.SQLOrderBy; -import com.alibaba.druid.sql.ast.statement.SQLTableSource; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlDeleteStatement; -import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; -import com.alibaba.druid.sql.parser.Lexer; -import com.alibaba.druid.sql.parser.ParserException; -import com.alibaba.druid.sql.parser.Token; - -public class ElasticSqlStatementParser extends MySqlStatementParser { - - private static final String LOW_PRIORITY = "LOW_PRIORITY"; - private static final String QUICK = "QUICK"; - private static final String IGNORE = "IGNORE"; - private static final String USING = "USING"; - - public ElasticSqlStatementParser(String sql) { - super(sql); - } - - public ElasticSqlStatementParser(Lexer lexer) { - super(lexer); - } - - @Override - public MySqlDeleteStatement parseDeleteStatement() { - ElasticSqlDeleteStatement deleteStatement = new ElasticSqlDeleteStatement(); - - if (lexer.token() == Token.DELETE) { - lexer.nextToken(); - - if (lexer.token() == Token.COMMENT) { - lexer.nextToken(); - } - - getExprParser().parseHints(deleteStatement.getHints()); - - if (identifierEquals(LOW_PRIORITY)) { - deleteStatement.setLowPriority(true); - lexer.nextToken(); - } - - if (identifierEquals(QUICK)) { - deleteStatement.setQuick(true); - lexer.nextToken(); - } - - if (identifierEquals(IGNORE)) { - deleteStatement.setIgnore(true); - lexer.nextToken(); - } - - if (lexer.token() == Token.IDENTIFIER) { - deleteStatement.setTableSource(createSQLSelectParser().parseTableSource()); - - if (lexer.token() == Token.FROM) { - lexer.nextToken(); - SQLTableSource tableSource = createSQLSelectParser().parseTableSource(); - deleteStatement.setFrom(tableSource); - } - } else if (lexer.token() == Token.FROM) { - lexer.nextToken(); - deleteStatement.setTableSource(createSQLSelectParser().parseTableSource()); - } else { - throw new ParserException("syntax error"); - } - - if (identifierEquals(USING)) { - lexer.nextToken(); - - SQLTableSource tableSource = createSQLSelectParser().parseTableSource(); - deleteStatement.setUsing(tableSource); - } - } - - if (lexer.token() == (Token.WHERE)) { - lexer.nextToken(); - SQLExpr where = this.exprParser.expr(); - deleteStatement.setWhere(where); - } - - if (lexer.token() == (Token.ORDER)) { - SQLOrderBy orderBy = exprParser.parseOrderBy(); - deleteStatement.setOrderBy(orderBy); - } - - deleteStatement.setLimit(parseLimit()); - - return deleteStatement; - } -} diff --git a/src/main/java/org/nlpcn/es4sql/parse/SQLParensIdentifierExpr.java b/src/main/java/org/nlpcn/es4sql/parse/SQLParensIdentifierExpr.java index 56d33a02..a02aafa3 100644 --- a/src/main/java/org/nlpcn/es4sql/parse/SQLParensIdentifierExpr.java +++ b/src/main/java/org/nlpcn/es4sql/parse/SQLParensIdentifierExpr.java @@ -1,6 +1,8 @@ package org.nlpcn.es4sql.parse; +import com.alibaba.druid.sql.ast.SQLExprImpl; import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.visitor.SQLASTVisitor; /* @@ -24,16 +26,30 @@ * This is for tracking in group bys the difference between "group by state, age" and "group by (state), (age)". * For non group by identifiers, it acts as a normal SQLIdentifierExpr. */ -public class SQLParensIdentifierExpr extends SQLIdentifierExpr { +public class SQLParensIdentifierExpr extends SQLExprImpl { - public SQLParensIdentifierExpr() { + private SQLIdentifierExpr expr; + + public SQLParensIdentifierExpr(SQLIdentifierExpr expr) { + this.expr = new SQLIdentifierExpr(expr.getName()); } - public SQLParensIdentifierExpr(String name) { - super(name); + public SQLIdentifierExpr getExpr() { + return expr; } - public SQLParensIdentifierExpr(SQLIdentifierExpr expr) { - super(expr.getName()); + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + + @Override + protected void accept0(SQLASTVisitor visitor) { + throw new UnsupportedOperationException(); } } diff --git a/src/main/java/org/nlpcn/es4sql/parse/SqlParser.java b/src/main/java/org/nlpcn/es4sql/parse/SqlParser.java index facfd251..780e7b87 100644 --- a/src/main/java/org/nlpcn/es4sql/parse/SqlParser.java +++ b/src/main/java/org/nlpcn/es4sql/parse/SqlParser.java @@ -1,21 +1,48 @@ package org.nlpcn.es4sql.parse; -import java.util.*; - -import com.alibaba.druid.sql.ast.expr.*; -import com.alibaba.druid.sql.ast.statement.*; -import com.alibaba.druid.sql.ast.*; -import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlSelectGroupByExpr; +import com.alibaba.druid.sql.ast.SQLCommentHint; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLLimit; +import com.alibaba.druid.sql.ast.SQLOrderBy; +import com.alibaba.druid.sql.ast.SQLOrderingSpecification; +import com.alibaba.druid.sql.ast.expr.SQLCaseExpr; +import com.alibaba.druid.sql.ast.expr.SQLCharExpr; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLListExpr; +import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.expr.SQLQueryExpr; +import com.alibaba.druid.sql.ast.statement.SQLDeleteStatement; +import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; +import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource; +import com.alibaba.druid.sql.ast.statement.SQLSelectGroupByClause; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem; +import com.alibaba.druid.sql.ast.statement.SQLTableSource; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOrderingExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlDeleteStatement; import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; - - import org.elasticsearch.search.sort.ScriptSortBuilder; -import org.nlpcn.es4sql.domain.*; +import org.nlpcn.es4sql.domain.Condition; +import org.nlpcn.es4sql.domain.Delete; +import org.nlpcn.es4sql.domain.Field; +import org.nlpcn.es4sql.domain.From; +import org.nlpcn.es4sql.domain.JoinSelect; +import org.nlpcn.es4sql.domain.Query; +import org.nlpcn.es4sql.domain.Select; +import org.nlpcn.es4sql.domain.TableOnJoinSelect; +import org.nlpcn.es4sql.domain.Where; import org.nlpcn.es4sql.domain.hints.Hint; import org.nlpcn.es4sql.domain.hints.HintFactory; import org.nlpcn.es4sql.exception.SqlParseException; import org.nlpcn.es4sql.query.multi.MultiQuerySelect; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * es sql support @@ -93,9 +120,9 @@ public Delete parseDelete(SQLDeleteStatement deleteStatement) throws SqlParseExc delete.setWhere(whereParser.findWhere()); - delete.getHints().addAll(parseHints(((ElasticSqlDeleteStatement) deleteStatement).getHints())); + delete.getHints().addAll(parseHints(((MySqlDeleteStatement) deleteStatement).getHints())); - findLimit(((ElasticSqlDeleteStatement) deleteStatement).getLimit(), delete); + findLimit(((MySqlDeleteStatement) deleteStatement).getLimit(), delete); return delete; } @@ -125,8 +152,8 @@ private void findGroupBy(MySqlSelectQueryBlock query, Select select) throws SqlP List standardGroupBys = new ArrayList<>(); for (SQLExpr sqlExpr : items) { //todo: mysql expr patch - if (sqlExpr instanceof MySqlSelectGroupByExpr) { - MySqlSelectGroupByExpr sqlSelectGroupByExpr = (MySqlSelectGroupByExpr) sqlExpr; + if (sqlExpr instanceof MySqlOrderingExpr) { + MySqlOrderingExpr sqlSelectGroupByExpr = (MySqlOrderingExpr) sqlExpr; sqlExpr = sqlSelectGroupByExpr.getExpr(); } if ((sqlExpr instanceof SQLParensIdentifierExpr || !(sqlExpr instanceof SQLIdentifierExpr || sqlExpr instanceof SQLMethodInvokeExpr)) && !standardGroupBys.isEmpty()) { @@ -138,7 +165,7 @@ private void findGroupBy(MySqlSelectQueryBlock query, Select select) throws SqlP if (sqlExpr instanceof SQLParensIdentifierExpr) { // single item with parens (should get its own aggregation) - select.addGroupBy(FieldMaker.makeField(sqlExpr, null, sqlTableSource.getAlias())); + select.addGroupBy(FieldMaker.makeField(((SQLParensIdentifierExpr) sqlExpr).getExpr(), null, sqlTableSource.getAlias())); } else if (sqlExpr instanceof SQLListExpr) { // multiple items in their own list SQLListExpr listExpr = (SQLListExpr) sqlExpr; @@ -235,7 +262,7 @@ private ScriptSortBuilder.ScriptSortType judgeIsStringSort(SQLExpr expr) { return ScriptSortBuilder.ScriptSortType.NUMBER; } - private void findLimit(MySqlSelectQueryBlock.Limit limit, Query query) { + private void findLimit(SQLLimit limit, Query query) { if (limit == null) { return; @@ -320,7 +347,7 @@ private Map> splitAndFindOrder(SQLOrderBy ord return aliasToOrderBys; } - private void updateJoinLimit(MySqlSelectQueryBlock.Limit limit, JoinSelect joinSelect) { + private void updateJoinLimit(SQLLimit limit, JoinSelect joinSelect) { if (limit != null && limit.getRowCount() != null) { int sizeLimit = Integer.parseInt(limit.getRowCount().toString()); joinSelect.setTotalLimit(sizeLimit); diff --git a/src/main/java/org/nlpcn/es4sql/query/ESActionFactory.java b/src/main/java/org/nlpcn/es4sql/query/ESActionFactory.java index 2d64ecb0..1265bb89 100644 --- a/src/main/java/org/nlpcn/es4sql/query/ESActionFactory.java +++ b/src/main/java/org/nlpcn/es4sql/query/ESActionFactory.java @@ -1,13 +1,16 @@ package org.nlpcn.es4sql.query; - import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLQueryExpr; import com.alibaba.druid.sql.ast.statement.SQLDeleteStatement; import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource; import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; -import com.alibaba.druid.sql.parser.*; +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.parser.ParserException; +import com.alibaba.druid.sql.parser.SQLExprParser; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.druid.sql.parser.Token; import org.elasticsearch.client.Client; import org.elasticsearch.plugin.nlpcn.ElasticResultHandler; import org.elasticsearch.plugin.nlpcn.QueryActionElasticExecutor; @@ -19,7 +22,6 @@ import org.nlpcn.es4sql.exception.SqlParseException; import org.nlpcn.es4sql.parse.ElasticLexer; import org.nlpcn.es4sql.parse.ElasticSqlExprParser; -import org.nlpcn.es4sql.parse.ElasticSqlStatementParser; import org.nlpcn.es4sql.parse.SqlParser; import org.nlpcn.es4sql.parse.SubQueryExpression; import org.nlpcn.es4sql.query.join.ESJoinQueryActionFactory; @@ -123,7 +125,7 @@ private static QueryAction handleSelect(Client client, Select select) { private static SQLStatementParser createSqlStatementParser(String sql) { ElasticLexer lexer = new ElasticLexer(sql); lexer.nextToken(); - return new ElasticSqlStatementParser(lexer); + return new MySqlStatementParser(lexer); } private static boolean isJoin(SQLQueryExpr sqlExpr,String sql) {