Skip to content

Commit

Permalink
Merge pull request #25 from sharding-sphere/dev
Browse files Browse the repository at this point in the history
update from origin
  • Loading branch information
beckhampu committed Aug 22, 2018
2 parents 2b0af72 + d4511ab commit ff44ebc
Show file tree
Hide file tree
Showing 121 changed files with 1,909 additions and 1,170 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ Please answer these questions before submitting your issue. Thanks!

### Reason analyze

### Steps to reproduce the behavior
### Steps to reproduce the behavior, such as: SQL to execute, sharding rule configuration, when exception occur etc

### For bug report, please *MUST* provide the reproduce example codes (such as a github link).
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
cache:
directories:
- "$HOME/.m2"
language: java
jdk:
- oraclejdk8
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# [Sharding-Sphere - Distributed Database Middleware Ecosphere](http://shardingsphere.io/)

Official website: http://shardingsphere.io/

[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![Gitter](https://badges.gitter.im/shardingsphere/shardingsphere.svg)](https://gitter.im/shardingsphere/Lobby)
[![GitHub release](https://img.shields.io/github/release/sharding-sphere/sharding-sphere.svg)](https://github.com/sharding-sphere/sharding-sphere/releases)
Expand Down
2 changes: 2 additions & 0 deletions README_ZH.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# [Sharding-Sphere - 分布式数据库中间层生态圈](http://shardingsphere.io/index_zh.html)

官方网站: http://shardingsphere.io/

[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![Gitter](https://badges.gitter.im/shardingsphere/shardingsphere.svg)](https://gitter.im/shardingsphere/Lobby)
[![GitHub release](https://img.shields.io/github/release/sharding-sphere/sharding-sphere.svg)](https://github.com/sharding-sphere/sharding-sphere/releases)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.shardingsphere.core.executor;

import java.sql.SQLException;

/**
* Sharding execute callback.
*
Expand All @@ -32,7 +34,7 @@ public interface ShardingExecuteCallback<I, O> {
*
* @param input input value
* @return execute result
* @throws Exception throw when execute failure
* @throws SQLException throw when execute failure
*/
O execute(I input) throws Exception;
O execute(I input) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.exception.ShardingException;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -61,16 +63,10 @@ public ShardingExecuteEngine(final int executorSize) {
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws Exception throw if execute failure
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws Exception {
if (inputs.isEmpty()) {
return Collections.emptyList();
}
Iterator<I> inputIterator = inputs.iterator();
I firstInput = inputIterator.next();
Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
return getResults(syncExecute(firstInput, callback), restFutures);
public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws SQLException {
return execute(inputs, null, callback);
}

/**
Expand All @@ -82,16 +78,16 @@ public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteC
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws Exception throw if execute failure
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> firstCallback, final ShardingExecuteCallback<I, O> callback) throws Exception {
public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> firstCallback, final ShardingExecuteCallback<I, O> callback) throws SQLException {
if (inputs.isEmpty()) {
return Collections.emptyList();
}
Iterator<I> inputIterator = inputs.iterator();
I firstInput = inputIterator.next();
Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
return getResults(syncExecute(firstInput, firstCallback), restFutures);
return getResults(syncExecute(firstInput, null == firstCallback ? callback : firstCallback), restFutures);
}

private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
Expand All @@ -100,23 +96,27 @@ private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I>
result.add(executorService.submit(new Callable<O>() {

@Override
public O call() throws Exception {
public O call() throws SQLException {
return callback.execute(each);
}
}));
}
return result;
}

private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws Exception {
private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(input);
}

private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws ExecutionException, InterruptedException {
private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws SQLException {
List<O> result = new LinkedList<>();
result.add(firstResult);
for (ListenableFuture<O> each : restFutures) {
result.add(each.get());
try {
result.add(each.get());
} catch (final InterruptedException | ExecutionException ex) {
return throwException(ex);
}
}
return result;
}
Expand All @@ -129,16 +129,10 @@ private <O> List<O> getResults(final O firstResult, final Collection<ListenableF
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws Exception throw if execute failure
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
if (inputs.isEmpty()) {
return Collections.emptyList();
}
String firstKey = inputs.keySet().iterator().next();
Collection<I> firstInputs = inputs.remove(firstKey);
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
return getGroupResults(syncGroupExecute(firstKey, firstInputs, callback), restResultFutures);
public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return groupExecute(inputs, null, callback);
}

/**
Expand All @@ -150,17 +144,17 @@ public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, fina
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws Exception throw if execute failure
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> groupExecute(
final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
if (inputs.isEmpty()) {
return Collections.emptyList();
}
String firstKey = inputs.keySet().iterator().next();
Collection<I> firstInputs = inputs.remove(firstKey);
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
return getGroupResults(syncGroupExecute(firstKey, firstInputs, firstCallback), restResultFutures);
return getGroupResults(syncGroupExecute(firstKey, firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
Expand All @@ -169,27 +163,38 @@ private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(fin
result.add(executorService.submit(new Callable<Collection<O>>() {

@Override
public Collection<O> call() throws Exception {
public Collection<O> call() throws SQLException {
return callback.execute(entry.getKey(), entry.getValue());
}
}));
}
return result;
}

private <I, O> Collection<O> syncGroupExecute(final String dataSourceName, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
private <I, O> Collection<O> syncGroupExecute(final String dataSourceName, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(dataSourceName, inputs);
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws ExecutionException, InterruptedException {
private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
List<O> result = new LinkedList<>();
result.addAll(firstResults);
for (ListenableFuture<Collection<O>> each : restFutures) {
result.addAll(each.get());
try {
result.addAll(each.get());
} catch (final InterruptedException | ExecutionException ex) {
return throwException(ex);
}
}
return result;
}

private <O> List<O> throwException(final Exception ex) throws SQLException {
if (ex.getCause() instanceof SQLException) {
throw (SQLException) ex.getCause();
}
throw new ShardingException(ex);
}

@Override
public void close() {
SHUTDOWN_EXECUTOR.execute(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.shardingsphere.core.executor;

import java.sql.SQLException;
import java.util.Collection;

/**
Expand All @@ -35,7 +36,7 @@ public interface ShardingGroupExecuteCallback<I, O> {
* @param key input key
* @param values input values
* @return execute result
* @throws Exception throw when execute failure
* @throws SQLException throw when execute failure
*/
Collection<O> execute(String key, Collection<I> values) throws Exception;
Collection<O> execute(String key, Collection<I> values) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
* </p>
*/

package io.shardingsphere.core.executor;
package io.shardingsphere.core.executor.sql;

import com.google.common.eventbus.EventBus;
import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEventFactory;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.ShardingExecuteCallback;
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.executor.sql.event.sql.SQLExecutionEvent;
import io.shardingsphere.core.executor.sql.event.sql.SQLExecutionEventFactory;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
Expand Down Expand Up @@ -52,20 +54,20 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S
private final EventBus shardingEventBus = ShardingEventBusInstance.getInstance();

@Override
public final T execute(final StatementExecuteUnit executeUnit) throws Exception {
return executeInternal(executeUnit);
public final T execute(final StatementExecuteUnit executeUnit) throws SQLException {
return execute0(executeUnit);
}

@Override
public final Collection<T> execute(final String dataSourceName, final Collection<StatementExecuteUnit> executeUnits) throws Exception {
public final Collection<T> execute(final String dataSourceName, final Collection<StatementExecuteUnit> executeUnits) throws SQLException {
Collection<T> result = new LinkedList<>();
for (StatementExecuteUnit each : executeUnits) {
result.add(executeInternal(each));
result.add(execute0(each));
}
return result;
}

private T executeInternal(final StatementExecuteUnit executeUnit) throws Exception {
private T execute0(final StatementExecuteUnit executeUnit) throws SQLException {
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<SQLExecutionEvent> events = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* </p>
*/

package io.shardingsphere.core.executor;
package io.shardingsphere.core.executor.sql;

import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.executor.sql.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.sql.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
Expand All @@ -42,7 +43,7 @@
@RequiredArgsConstructor
public final class SQLExecuteTemplate {

private final ShardingExecuteEngine shardingExecuteEngine;
private final ShardingExecuteEngine executeEngine;

private final ConnectionMode connectionMode;

Expand All @@ -56,11 +57,26 @@ public final class SQLExecuteTemplate {
* @throws SQLException SQL exception
*/
public <T> List<T> execute(final Collection<? extends StatementExecuteUnit> executeUnits, final SQLExecuteCallback<T> executeCallback) throws SQLException {
return execute(executeUnits, null, executeCallback);
}

/**
* Execute.
*
* @param executeUnits execute units
* @param firstExecuteCallback first execute callback
* @param executeCallback execute callback
* @param <T> class type of return value
* @return execute result
* @throws SQLException SQL exception
*/
public <T> List<T> execute(
final Collection<? extends StatementExecuteUnit> executeUnits, final SQLExecuteCallback<T> firstExecuteCallback, final SQLExecuteCallback<T> executeCallback) throws SQLException {
OverallExecutionEvent event = new OverallExecutionEvent(executeUnits.size() > 1);
ShardingEventBusInstance.getInstance().post(event);
try {
List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? shardingExecuteEngine.execute(new LinkedList<>(executeUnits), executeCallback)
: shardingExecuteEngine.groupExecute(getExecuteUnitGroups(executeUnits), executeCallback);
List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? executeEngine.execute(new LinkedList<>(executeUnits), firstExecuteCallback, executeCallback)
: executeEngine.groupExecute(getExecuteUnitGroups(executeUnits), firstExecuteCallback, executeCallback);
event.setExecuteSuccess();
return result;
// CHECKSTYLE:OFF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor;
package io.shardingsphere.core.executor.sql;

import io.shardingsphere.core.routing.SQLExecutionUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.event.overall;
package io.shardingsphere.core.executor.sql.event.overall;

import io.shardingsphere.core.event.ShardingEvent;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.event.sql;
package io.shardingsphere.core.executor.sql.event.sql;

import io.shardingsphere.core.routing.SQLUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.event.sql;
package io.shardingsphere.core.executor.sql.event.sql;

import io.shardingsphere.core.routing.SQLUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.event.sql;
package io.shardingsphere.core.executor.sql.event.sql;

import io.shardingsphere.core.event.ShardingEvent;
import io.shardingsphere.core.routing.SQLUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* </p>
*/

package io.shardingsphere.core.executor.event.sql;
package io.shardingsphere.core.executor.sql.event.sql;

import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.executor.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.StatementExecuteUnit;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

Expand Down
Loading

0 comments on commit ff44ebc

Please sign in to comment.