Skip to content

Commit

Permalink
Support multi replica route and exeute (#5748)
Browse files Browse the repository at this point in the history
* for code style

* refactor ExecuteGroupDecorator

* refactor ExecuteGroupEngine

* Add ReplicaRouteDecorator

* add ReplicaExecuteGroupDecorator

* refactor ReplicaExecuteGroupDecorator

* Add replica executor

* adjust order

* Merge branch 'master' into dev

# Conflicts:
#	shardingsphere-features/shardingsphere-master-slave/shardingsphere-master-slave-route/src/main/java/org/apache/shardingsphere/masterslave/route/engine/MasterSlaveRouteDecorator.java
  • Loading branch information
terrymanu committed May 22, 2020
1 parent 34476cd commit 0f6fe30
Show file tree
Hide file tree
Showing 35 changed files with 673 additions and 55 deletions.
Expand Up @@ -35,16 +35,19 @@ public final class MasterSlaveShadowDatabasesConfiguration implements ExampleCon

@Override
public DataSource getDataSource() throws SQLException {
ShadowRuleConfiguration shadowRuleConfiguration = new ShadowRuleConfiguration("shadow", new HashMap<String, String>(){{
put("ds_master", "shadow_ds_master");
put("ds_slave", "shadow_ds_slave");
}});
Map<String, DataSource> dataSourceMap = new HashMap<>();
dataSourceMap.put("ds_master", DataSourceUtil.createDataSource("demo_ds_master"));
dataSourceMap.put("ds_slave", DataSourceUtil.createDataSource("demo_ds_slave"));
dataSourceMap.put("shadow_ds_master", DataSourceUtil.createDataSource("demo_shadow_ds_master"));
dataSourceMap.put("shadow_ds_slave", DataSourceUtil.createDataSource("demo_shadow_ds_slave"));
return ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, Arrays.asList(shadowRuleConfiguration, getMasterSlaveRuleConfiguration()), null);
return ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, Arrays.asList(new ShadowRuleConfiguration("shadow", createShadowMappings()), getMasterSlaveRuleConfiguration()), null);
}

private Map<String, String> createShadowMappings() {
Map<String, String> result = new HashMap<>();
result.put("ds_master", "shadow_ds_master");
result.put("ds_slave", "shadow_ds_slave");
return result;
}

private MasterSlaveRuleConfiguration getMasterSlaveRuleConfiguration() {
Expand Down
Expand Up @@ -41,7 +41,7 @@ public final class MySQLBitBinlogProtocolValueTest {
private MySQLBitBinlogProtocolValue actual;

@Before
public void setUp() throws Exception {
public void setUp() {
actual = new MySQLBitBinlogProtocolValue();
}

Expand Down
Expand Up @@ -17,16 +17,17 @@

package org.apache.shardingsphere.masterslave.route.engine;

import org.apache.shardingsphere.masterslave.rule.MasterSlaveDataSourceRule;
import org.apache.shardingsphere.masterslave.rule.MasterSlaveRule;
import org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter;
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
import org.apache.shardingsphere.infra.route.context.RouteResult;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.route.decorator.RouteDecorator;
import org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter;
import org.apache.shardingsphere.masterslave.rule.MasterSlaveDataSourceRule;
import org.apache.shardingsphere.masterslave.rule.MasterSlaveRule;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -43,7 +44,7 @@ public RouteContext decorate(final RouteContext routeContext, final ShardingSphe
if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule.getSingleDataSourceRule()).route(routeContext.getSqlStatementContext().getSqlStatement());
RouteResult routeResult = new RouteResult();
routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));
routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(DefaultSchema.LOGIC_NAME, dataSourceName), Collections.emptyList()));
return new RouteContext(routeContext.getSqlStatementContext(), routeContext.getParameters(), routeResult);
}
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;

/**
Expand All @@ -50,15 +51,30 @@ public Collection<String> getSingleReplicaDataSources() {
}

/**
* Find data replica data sources.
* Find replica data sources.
*
* @param dataSourceName data source name
* @return replica data source names
*/
public Optional<Collection<String>> findDataSourceRule(final String dataSourceName) {
public Optional<Collection<String>> findReplicaDataSources(final String dataSourceName) {
return Optional.ofNullable(dataSourceRules.get(dataSourceName));
}

/**
* Find logic data source name.
*
* @param replicaDataSourceName replica data source name
* @return logic data source name
*/
public Optional<String> findLogicDataSource(final String replicaDataSourceName) {
for (Entry<String, Collection<String>> entry : dataSourceRules.entrySet()) {
if (entry.getValue().contains(replicaDataSourceName)) {
return Optional.of(entry.getKey());
}
}
return Optional.empty();
}

@Override
public Map<String, Collection<String>> getDataSourceMapper() {
return dataSourceRules;
Expand Down
Expand Up @@ -34,7 +34,7 @@ public ReplicaRule build(final ReplicaRuleConfiguration ruleConfiguration, final

@Override
public int getOrder() {
return 5;
return 7;
}

@Override
Expand Down
Expand Up @@ -73,6 +73,6 @@ public String getRuleTagName() {

@Override
public int getOrder() {
return 10;
return 12;
}
}
Expand Up @@ -27,4 +27,17 @@
</parent>
<artifactId>shardingsphere-replica-execute</artifactId>
<name>${project.artifactId}</name>

<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-replica-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-infra-executor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.replica.execute.executor;

import org.apache.shardingsphere.infra.executor.sql.execute.jdbc.executor.SQLExecutorCallback;
import org.apache.shardingsphere.infra.spi.order.OrderedSPI;
import org.apache.shardingsphere.replica.rule.ReplicaRule;

/**
* SQL executor callback for replica.
*
* @param <T> class type of return value
*/
public abstract class ReplicaSQLExecutorCallback<T> implements SQLExecutorCallback<T>, OrderedSPI<ReplicaRule> {

@Override
public final int getOrder() {
return 5;
}

@Override
public final Class<ReplicaRule> getTypeClass() {
return ReplicaRule.class;
}
}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.replica.execute.group;

import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
import org.apache.shardingsphere.infra.executor.sql.StorageResourceExecuteUnit;
import org.apache.shardingsphere.infra.executor.sql.group.ExecuteGroupDecorator;
import org.apache.shardingsphere.replica.rule.ReplicaRule;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
* Execute group decorator for replica.
*
* @param <T> type of input value
*/
public final class ReplicaExecuteGroupDecorator<T extends StorageResourceExecuteUnit> implements ExecuteGroupDecorator<T, ReplicaRule> {

@Override
public Collection<InputGroup<T>> decorate(final ReplicaRule rule, final Collection<InputGroup<T>> inputGroups) {
Map<String, InputGroup<T>> result = new LinkedHashMap<>(inputGroups.size(), 1);
for (InputGroup<T> each : inputGroups) {
T sample = each.getInputs().get(0);
String dataSourceName = sample.getExecutionUnit().getDataSourceName();
Optional<String> logicDataSource = rule.findLogicDataSource(dataSourceName);
if (logicDataSource.isPresent() && result.containsKey(dataSourceName)) {
result.get(dataSourceName).getInputs().addAll(each.getInputs());
} else {
result.put(dataSourceName, each);
}
}
return result.values();
}

@Override
public int getOrder() {
return 5;
}

@Override
public Class<ReplicaRule> getTypeClass() {
return ReplicaRule.class;
}
}
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.replica.execute.group.ReplicaExecuteGroupDecorator
Expand Up @@ -27,4 +27,17 @@
</parent>
<artifactId>shardingsphere-replica-route</artifactId>
<name>${project.artifactId}</name>

<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-infra-route</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-replica-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.replica.route.engine;

import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
import org.apache.shardingsphere.infra.route.context.RouteResult;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.route.decorator.RouteDecorator;
import org.apache.shardingsphere.replica.rule.ReplicaRule;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Optional;

/**
* Route decorator for replica.
*/
public final class ReplicaRouteDecorator implements RouteDecorator<ReplicaRule> {

@Override
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ReplicaRule replicaRule, final ConfigurationProperties properties) {
if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
RouteResult routeResult = new RouteResult();
for (String each : replicaRule.getSingleReplicaDataSources()) {
routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(DefaultSchema.LOGIC_NAME, each), Collections.emptyList()));
}
return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);
}
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Collection<RouteUnit> toBeAdded = new LinkedList<>();
for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
String dataSourceName = each.getDataSourceMapper().getLogicName();
Optional<Collection<String>> replicaDataSources = replicaRule.findReplicaDataSources(dataSourceName);
if (!replicaDataSources.isPresent()) {
continue;
}
toBeRemoved.add(each);
for (String replicaDataSource : replicaDataSources.get()) {
toBeAdded.add(new RouteUnit(new RouteMapper(dataSourceName, replicaDataSource), each.getTableMappers()));
}
}
routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);
routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);
return routeContext;
}

@Override
public int getOrder() {
return 12;
}

@Override
public Class<ReplicaRule> getTypeClass() {
return ReplicaRule.class;
}
}
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.replica.route.engine.ReplicaRouteDecorator

0 comments on commit 0f6fe30

Please sign in to comment.