diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java index 81b7cef5dd83e..2343ff513ed14 100644 --- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java +++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java @@ -83,12 +83,8 @@ private void buildDataSourceState(final String schemaName, final Collection> { + + private Boolean retryEnable; + + private Integer retryMaximum; + + private Integer retryInterval; + + public AbstractHeartbeatDetect(final Boolean retryEnable, final Integer retryMaximum, final Integer retryInterval) { + this.retryEnable = retryEnable; + this.retryMaximum = retryMaximum; + this.retryInterval = retryInterval; + } + + /** + * Detect heart beat. + * + * @return heart beat result. + */ + protected abstract Boolean detect(); + + /** + * Build heart beat result. + * + * @param result heart beat result + * @return heart beat result + */ + protected abstract Map buildResult(Boolean result); + + @Override + public Map call() { + if (retryEnable && retryMaximum > 0) { + Boolean result = Boolean.FALSE; + for (int i = 0; i < retryMaximum; i++) { + result = detect(); + if (result) { + break; + } + try { + Thread.sleep(retryInterval * 1000); + } catch (InterruptedException ex) { + log.warn("Retry heart beat detect sleep error", ex); + } + } + return buildResult(result); + } else { + return buildResult(detect()); + } + } +} diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/cluster/HeartbeatDetect.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/cluster/HeartbeatDetect.java new file mode 100644 index 0000000000000..a9b90dabf04d3 --- /dev/null +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/cluster/HeartbeatDetect.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.backend.cluster; + +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration; +import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult; + +import javax.sql.DataSource; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Heart beat detect. + */ +@Slf4j +public final class HeartbeatDetect extends AbstractHeartbeatDetect { + + private String sql; + + private String schemaName; + + private String dataSourceName; + + private DataSource dataSource; + + public HeartbeatDetect(final String schemaName, final String dataSourceName, final DataSource dataSource, final HeartbeatConfiguration configuration) { + super(configuration.getRetryEnable(), configuration.getRetryMaximum(), configuration.getRetryInterval()); + this.sql = configuration.getSql(); + this.schemaName = schemaName; + this.dataSourceName = dataSourceName; + this.dataSource = dataSource; + } + + @Override + protected Boolean detect() { + try { + PreparedStatement preparedStatement = dataSource.getConnection().prepareStatement(sql); + ResultSet result = preparedStatement.executeQuery(); + return Objects.nonNull(result) && result.next(); + } catch (SQLException ex) { + log.error("Heart beat detect error", ex); + } + return Boolean.FALSE; + } + + @Override + protected Map buildResult(final Boolean result) { + Map heartBeatResultMap = new HashMap<>(1, 1); + heartBeatResultMap.put(schemaName, new HeartbeatResult(dataSourceName, result, System.currentTimeMillis())); + return heartBeatResultMap; + } +} diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/cluster/HeartbeatHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/cluster/HeartbeatHandler.java new file mode 100644 index 0000000000000..5a546de740f86 --- /dev/null +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/cluster/HeartbeatHandler.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.backend.cluster; + +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration; + +import org.apache.shardingsphere.cluster.facade.ClusterFacade; +import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse; +import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult; +import org.apache.shardingsphere.proxy.backend.schema.ShardingSphereSchema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Arrays; +import java.util.Objects; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * Heart beat handler. + */ +@Slf4j +public final class HeartbeatHandler { + + private HeartbeatConfiguration configuration; + + /** + * Init heart beat handler. + * + * @param configuration heart beat configuration + */ + public void init(final HeartbeatConfiguration configuration) { + Preconditions.checkNotNull(configuration, "heart beat configuration can not be null."); + this.configuration = configuration; + } + + /** + * Get heart beat handler instance. + * + * @return heart beat handler instance + */ + public static HeartbeatHandler getInstance() { + return HeartbeatHandlerHolder.INSTANCE; + } + + /** + * Handle heart beat detect event. + * + * @param schemas ShardingSphere schemas + */ + public void handle(final Map schemas) { + ExecutorService executorService = Executors.newFixedThreadPool(countDataSource(schemas)); + List>> futureTasks = new ArrayList<>(); + schemas.values().forEach(value -> value.getBackendDataSource().getDataSources().entrySet().forEach(entry -> { + FutureTask> futureTask = new FutureTask<>(new HeartbeatDetect(value.getName(), entry.getKey(), + entry.getValue(), configuration)); + futureTasks.add(futureTask); + executorService.submit(futureTask); + })); + reportHeartbeat(futureTasks); + closeExecutor(executorService); + } + + private Integer countDataSource(final Map schemas) { + return Long.valueOf(schemas.values().stream() + .collect(Collectors.summarizingInt(entry -> entry.getBackendDataSource() + .getDataSources().keySet().size())).getSum()).intValue(); + } + + private void reportHeartbeat(final List>> futureTasks) { + Map> heartbeatResultMap = new HashMap<>(); + futureTasks.stream().forEach(each -> { + try { + each.get().entrySet().forEach(entry -> { + if (Objects.isNull(heartbeatResultMap.get(entry.getKey()))) { + heartbeatResultMap.put(entry.getKey(), new ArrayList<>(Arrays.asList(entry.getValue()))); + } else { + heartbeatResultMap.get(entry.getKey()).add(entry.getValue()); + } + }); + } catch (InterruptedException | ExecutionException ex) { + log.error("Heart beat report error", ex); + } + }); + ClusterFacade.getInstance().reportHeartbeat(new HeartbeatResponse(heartbeatResultMap)); + } + + private void closeExecutor(final ExecutorService executorService) { + if (null != executorService && !executorService.isShutdown()) { + executorService.shutdown(); + } + } + + private static final class HeartbeatHandlerHolder { + + public static final HeartbeatHandler INSTANCE = new HeartbeatHandler(); + } +} diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/schema/ShardingSphereSchemas.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/schema/ShardingSphereSchemas.java index 0eed39e644495..77669dccf9fe8 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/schema/ShardingSphereSchemas.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/schema/ShardingSphereSchemas.java @@ -20,6 +20,8 @@ import com.google.common.base.Strings; import com.google.common.eventbus.Subscribe; import lombok.Getter; +import org.apache.shardingsphere.cluster.heartbeat.event.HeartbeatDetectNoticeEvent; +import org.apache.shardingsphere.cluster.heartbeat.eventbus.HeartbeatEventBus; import org.apache.shardingsphere.infra.config.RuleConfiguration; import org.apache.shardingsphere.infra.database.type.DatabaseType; import org.apache.shardingsphere.infra.database.type.DatabaseTypes; @@ -27,6 +29,7 @@ import org.apache.shardingsphere.orchestration.core.common.event.SchemaAddedEvent; import org.apache.shardingsphere.orchestration.core.common.event.SchemaDeletedEvent; import org.apache.shardingsphere.orchestration.core.common.eventbus.ShardingOrchestrationEventBus; +import org.apache.shardingsphere.proxy.backend.cluster.HeartbeatHandler; import org.apache.shardingsphere.proxy.backend.communication.jdbc.recognizer.JDBCDriverURLRecognizerEngine; import org.apache.shardingsphere.proxy.backend.util.DataSourceConverter; @@ -53,6 +56,7 @@ public final class ShardingSphereSchemas { private ShardingSphereSchemas() { ShardingOrchestrationEventBus.getInstance().register(this); + HeartbeatEventBus.getInstance().register(this); } /** @@ -142,4 +146,14 @@ public synchronized void renew(final SchemaAddedEvent schemaAddedEvent) throws S public synchronized void renew(final SchemaDeletedEvent schemaDeletedEvent) { schemas.remove(schemaDeletedEvent.getShardingSchemaName()); } + + /** + * Heart beat detect. + * + * @param event heart beat detect notice event + */ + @Subscribe + public synchronized void heartbeat(final HeartbeatDetectNoticeEvent event) { + HeartbeatHandler.getInstance().handle(schemas); + } } diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java index 58fa8e01da17d..00b3115419167 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java +++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java @@ -21,6 +21,9 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.commons.collections4.CollectionUtils; +import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper; +import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration; +import org.apache.shardingsphere.cluster.facade.ClusterFacade; import org.apache.shardingsphere.infra.auth.Authentication; import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration; import org.apache.shardingsphere.infra.auth.yaml.swapper.AuthenticationYamlSwapper; @@ -39,6 +42,7 @@ import org.apache.shardingsphere.orchestration.center.yaml.config.YamlOrchestrationConfiguration; import org.apache.shardingsphere.orchestration.center.yaml.swapper.OrchestrationConfigurationYamlSwapper; import org.apache.shardingsphere.orchestration.core.facade.ShardingOrchestrationFacade; +import org.apache.shardingsphere.proxy.backend.cluster.HeartbeatHandler; import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts; import org.apache.shardingsphere.proxy.backend.schema.ShardingSphereSchemas; import org.apache.shardingsphere.proxy.backend.util.DataSourceConverter; @@ -132,6 +136,7 @@ private static void startWithRegistryCenter(final YamlProxyServerConfiguration s ProxySchemaContexts.getInstance().init(schemaDataSources, schemaRules, authentication, properties); log(authentication, properties); initMetrics(serverConfig.getMetrics()); + initCluster(serverConfig.getCluster()); startProxy(shardingSchemaNames, port, schemaDataSources, schemaRules); } } @@ -186,6 +191,13 @@ private static void initMetrics(final YamlMetricsConfiguration metricsConfigurat } } + private static void initCluster(final YamlClusterConfiguration clusterConfiguration) { + if (ProxySchemaContexts.getInstance().getSchemaContexts().getProperties().getValue(ConfigurationPropertyKey.PROXY_CLUSTER_ENABLED)) { + ClusterFacade.getInstance().init(new ClusterConfigurationYamlSwapper().swap(clusterConfiguration)); + HeartbeatHandler.getInstance().init(new ClusterConfigurationYamlSwapper().swap(clusterConfiguration).getHeartbeat()); + } + } + private static Map> getDataSourceConfigurationMap(final Map ruleConfigs) { Map> result = new LinkedHashMap<>(); for (Entry entry : ruleConfigs.entrySet()) {