Skip to content

Commit

Permalink
Proxy handle heart beat event #control-panel-cluster (#5795)
Browse files Browse the repository at this point in the history
* proxy detect heart beat

* revise log

* revise class name

* optimize code

* for checkstyle
  • Loading branch information
menghaoranss committed May 26, 2020
1 parent 61e8068 commit 4b192eb
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 9 deletions.
Expand Up @@ -83,12 +83,8 @@ private void buildDataSourceState(final String schemaName, final Collection<Hear
DataSourceState dataSourceState = null == instanceState.getDataSources()
|| null == instanceState.getDataSources().get(dataSourceName) ? new DataSourceState()
: instanceState.getDataSources().get(dataSourceName);
if (each.getEnable()) {
dataSourceState.setState(NodeState.ONLINE);
dataSourceState.setLastConnect(each.getDetectTimeStamp());
} else {
dataSourceState.setState(NodeState.OFFLINE);
}
dataSourceState.setState(each.getEnable() ? NodeState.ONLINE : NodeState.OFFLINE);
dataSourceState.setLastConnect(each.getDetectTimeStamp());
dataSourceStateMap.put(dataSourceName, dataSourceState);
});
}
Expand Down
Expand Up @@ -35,7 +35,7 @@ public HeartbeatTask(final HeartbeatEvent heartbeatEvent) {

@Override
public void run() {
HeartbeatEventBus.getInstance().post(heartbeatEvent);
log.info("heart beat detect running");
HeartbeatEventBus.getInstance().post(heartbeatEvent);
}
}
Expand Up @@ -41,7 +41,7 @@ public RegistryCenter(final String name, final RegistryCenterRepository registry
* Persist instance online.
*/
public void persistInstanceOnline() {
repository.persistEphemeral(node.getInstancesNodeFullPath(instance.getInstanceId()), "");
repository.persistEphemeral(node.getInstancesNodeFullPath(instance.getInstanceId()), "state: " + RegistryCenterNodeStatus.ONLINE);
}

/**
Expand Down
Expand Up @@ -25,5 +25,10 @@ public enum RegistryCenterNodeStatus {
/**
* Disabled state.
*/
DISABLED
DISABLED,

/**
* Online state.
*/
ONLINE;
}
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.cluster;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult;

import java.util.Map;
import java.util.concurrent.Callable;

/**
* Abstract heart beat detect.
*/
@Slf4j
public abstract class AbstractHeartbeatDetect implements Callable<Map<String, HeartbeatResult>> {

private Boolean retryEnable;

private Integer retryMaximum;

private Integer retryInterval;

public AbstractHeartbeatDetect(final Boolean retryEnable, final Integer retryMaximum, final Integer retryInterval) {
this.retryEnable = retryEnable;
this.retryMaximum = retryMaximum;
this.retryInterval = retryInterval;
}

/**
* Detect heart beat.
*
* @return heart beat result.
*/
protected abstract Boolean detect();

/**
* Build heart beat result.
*
* @param result heart beat result
* @return heart beat result
*/
protected abstract Map<String, HeartbeatResult> buildResult(Boolean result);

@Override
public Map<String, HeartbeatResult> call() {
if (retryEnable && retryMaximum > 0) {
Boolean result = Boolean.FALSE;
for (int i = 0; i < retryMaximum; i++) {
result = detect();
if (result) {
break;
}
try {
Thread.sleep(retryInterval * 1000);
} catch (InterruptedException ex) {
log.warn("Retry heart beat detect sleep error", ex);
}
}
return buildResult(result);
} else {
return buildResult(detect());
}
}
}
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.cluster;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult;

import javax.sql.DataSource;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Heart beat detect.
*/
@Slf4j
public final class HeartbeatDetect extends AbstractHeartbeatDetect {

private String sql;

private String schemaName;

private String dataSourceName;

private DataSource dataSource;

public HeartbeatDetect(final String schemaName, final String dataSourceName, final DataSource dataSource, final HeartbeatConfiguration configuration) {
super(configuration.getRetryEnable(), configuration.getRetryMaximum(), configuration.getRetryInterval());
this.sql = configuration.getSql();
this.schemaName = schemaName;
this.dataSourceName = dataSourceName;
this.dataSource = dataSource;
}

@Override
protected Boolean detect() {
try {
PreparedStatement preparedStatement = dataSource.getConnection().prepareStatement(sql);
ResultSet result = preparedStatement.executeQuery();
return Objects.nonNull(result) && result.next();
} catch (SQLException ex) {
log.error("Heart beat detect error", ex);
}
return Boolean.FALSE;
}

@Override
protected Map<String, HeartbeatResult> buildResult(final Boolean result) {
Map<String, HeartbeatResult> heartBeatResultMap = new HashMap<>(1, 1);
heartBeatResultMap.put(schemaName, new HeartbeatResult(dataSourceName, result, System.currentTimeMillis()));
return heartBeatResultMap;
}
}
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.cluster;

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.cluster.configuration.config.HeartbeatConfiguration;

import org.apache.shardingsphere.cluster.facade.ClusterFacade;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResult;
import org.apache.shardingsphere.proxy.backend.schema.ShardingSphereSchema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.Objects;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
* Heart beat handler.
*/
@Slf4j
public final class HeartbeatHandler {

private HeartbeatConfiguration configuration;

/**
* Init heart beat handler.
*
* @param configuration heart beat configuration
*/
public void init(final HeartbeatConfiguration configuration) {
Preconditions.checkNotNull(configuration, "heart beat configuration can not be null.");
this.configuration = configuration;
}

/**
* Get heart beat handler instance.
*
* @return heart beat handler instance
*/
public static HeartbeatHandler getInstance() {
return HeartbeatHandlerHolder.INSTANCE;
}

/**
* Handle heart beat detect event.
*
* @param schemas ShardingSphere schemas
*/
public void handle(final Map<String, ShardingSphereSchema> schemas) {
ExecutorService executorService = Executors.newFixedThreadPool(countDataSource(schemas));
List<FutureTask<Map<String, HeartbeatResult>>> futureTasks = new ArrayList<>();
schemas.values().forEach(value -> value.getBackendDataSource().getDataSources().entrySet().forEach(entry -> {
FutureTask<Map<String, HeartbeatResult>> futureTask = new FutureTask<>(new HeartbeatDetect(value.getName(), entry.getKey(),
entry.getValue(), configuration));
futureTasks.add(futureTask);
executorService.submit(futureTask);
}));
reportHeartbeat(futureTasks);
closeExecutor(executorService);
}

private Integer countDataSource(final Map<String, ShardingSphereSchema> schemas) {
return Long.valueOf(schemas.values().stream()
.collect(Collectors.summarizingInt(entry -> entry.getBackendDataSource()
.getDataSources().keySet().size())).getSum()).intValue();
}

private void reportHeartbeat(final List<FutureTask<Map<String, HeartbeatResult>>> futureTasks) {
Map<String, Collection<HeartbeatResult>> heartbeatResultMap = new HashMap<>();
futureTasks.stream().forEach(each -> {
try {
each.get().entrySet().forEach(entry -> {
if (Objects.isNull(heartbeatResultMap.get(entry.getKey()))) {
heartbeatResultMap.put(entry.getKey(), new ArrayList<>(Arrays.asList(entry.getValue())));
} else {
heartbeatResultMap.get(entry.getKey()).add(entry.getValue());
}
});
} catch (InterruptedException | ExecutionException ex) {
log.error("Heart beat report error", ex);
}
});
ClusterFacade.getInstance().reportHeartbeat(new HeartbeatResponse(heartbeatResultMap));
}

private void closeExecutor(final ExecutorService executorService) {
if (null != executorService && !executorService.isShutdown()) {
executorService.shutdown();
}
}

private static final class HeartbeatHandlerHolder {

public static final HeartbeatHandler INSTANCE = new HeartbeatHandler();
}
}
Expand Up @@ -20,13 +20,16 @@
import com.google.common.base.Strings;
import com.google.common.eventbus.Subscribe;
import lombok.Getter;
import org.apache.shardingsphere.cluster.heartbeat.event.HeartbeatDetectNoticeEvent;
import org.apache.shardingsphere.cluster.heartbeat.eventbus.HeartbeatEventBus;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypes;
import org.apache.shardingsphere.kernal.context.schema.DataSourceParameter;
import org.apache.shardingsphere.orchestration.core.common.event.SchemaAddedEvent;
import org.apache.shardingsphere.orchestration.core.common.event.SchemaDeletedEvent;
import org.apache.shardingsphere.orchestration.core.common.eventbus.ShardingOrchestrationEventBus;
import org.apache.shardingsphere.proxy.backend.cluster.HeartbeatHandler;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.recognizer.JDBCDriverURLRecognizerEngine;
import org.apache.shardingsphere.proxy.backend.util.DataSourceConverter;

Expand All @@ -53,6 +56,7 @@ public final class ShardingSphereSchemas {

private ShardingSphereSchemas() {
ShardingOrchestrationEventBus.getInstance().register(this);
HeartbeatEventBus.getInstance().register(this);
}

/**
Expand Down Expand Up @@ -142,4 +146,14 @@ public synchronized void renew(final SchemaAddedEvent schemaAddedEvent) throws S
public synchronized void renew(final SchemaDeletedEvent schemaDeletedEvent) {
schemas.remove(schemaDeletedEvent.getShardingSchemaName());
}

/**
* Heart beat detect.
*
* @param event heart beat detect notice event
*/
@Subscribe
public synchronized void heartbeat(final HeartbeatDetectNoticeEvent event) {
HeartbeatHandler.getInstance().handle(schemas);
}
}
Expand Up @@ -21,6 +21,9 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper;
import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
import org.apache.shardingsphere.cluster.facade.ClusterFacade;
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration;
import org.apache.shardingsphere.infra.auth.yaml.swapper.AuthenticationYamlSwapper;
Expand All @@ -39,6 +42,7 @@
import org.apache.shardingsphere.orchestration.center.yaml.config.YamlOrchestrationConfiguration;
import org.apache.shardingsphere.orchestration.center.yaml.swapper.OrchestrationConfigurationYamlSwapper;
import org.apache.shardingsphere.orchestration.core.facade.ShardingOrchestrationFacade;
import org.apache.shardingsphere.proxy.backend.cluster.HeartbeatHandler;
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
import org.apache.shardingsphere.proxy.backend.schema.ShardingSphereSchemas;
import org.apache.shardingsphere.proxy.backend.util.DataSourceConverter;
Expand Down Expand Up @@ -132,6 +136,7 @@ private static void startWithRegistryCenter(final YamlProxyServerConfiguration s
ProxySchemaContexts.getInstance().init(schemaDataSources, schemaRules, authentication, properties);
log(authentication, properties);
initMetrics(serverConfig.getMetrics());
initCluster(serverConfig.getCluster());
startProxy(shardingSchemaNames, port, schemaDataSources, schemaRules);
}
}
Expand Down Expand Up @@ -186,6 +191,13 @@ private static void initMetrics(final YamlMetricsConfiguration metricsConfigurat
}
}

private static void initCluster(final YamlClusterConfiguration clusterConfiguration) {
if (ProxySchemaContexts.getInstance().getSchemaContexts().getProperties().<Boolean>getValue(ConfigurationPropertyKey.PROXY_CLUSTER_ENABLED)) {
ClusterFacade.getInstance().init(new ClusterConfigurationYamlSwapper().swap(clusterConfiguration));
HeartbeatHandler.getInstance().init(new ClusterConfigurationYamlSwapper().swap(clusterConfiguration).getHeartbeat());
}
}

private static Map<String, Map<String, DataSourceConfiguration>> getDataSourceConfigurationMap(final Map<String, YamlProxyRuleConfiguration> ruleConfigs) {
Map<String, Map<String, DataSourceConfiguration>> result = new LinkedHashMap<>();
for (Entry<String, YamlProxyRuleConfiguration> entry : ruleConfigs.entrySet()) {
Expand Down

0 comments on commit 4b192eb

Please sign in to comment.