Skip to content

Commit

Permalink
Move MetaDataPersistService from MetaDataContexts to PersistServiceFa…
Browse files Browse the repository at this point in the history
…cade (#31376)

* Move MetaDataPersistService from MetaDataContexts to PersistServiceFacade

* Move MetaDataPersistService from MetaDataContexts to PersistServiceFacade

* Move MetaDataPersistService from MetaDataContexts to PersistServiceFacade
  • Loading branch information
menghaoranss committed May 24, 2024
1 parent 8c116fd commit 1daa1fa
Show file tree
Hide file tree
Showing 43 changed files with 190 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
Expand Down Expand Up @@ -84,6 +85,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
mock(LockContext.class), new EventBusContext());
return new ContextManager(metaDataContexts, computeNodeInstanceContext);
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
Expand Down Expand Up @@ -73,6 +74,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
mock(LockContext.class), new EventBusContext());
return new ContextManager(metaDataContexts, computeNodeInstanceContext);
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void checkBeforeUpdate(final AlterReadwriteSplittingStorageUnitStatusSta

private void updateStatus(final ContextManager contextManager, final AlterReadwriteSplittingStorageUnitStatusStatement sqlStatement) {
DataSourceState status = sqlStatement.isEnable() ? DataSourceState.ENABLED : DataSourceState.DISABLED;
new QualifiedDataSourceStatusService(contextManager.getMetaDataContexts().getPersistService().getRepository())
new QualifiedDataSourceStatusService(contextManager.getRepository())
.changeStatus(database.getName(), sqlStatement.getRuleName(), sqlStatement.getStorageUnitName(), status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void executeUpdate() {
checkBeforeUpdate();
RuleConfiguration currentRuleConfig = rule.map(ShardingSphereRule::getConfiguration).orElse(null);
if (getRefreshStatus(rule.isPresent())) {
contextManager.getMetaDataContexts().getPersistService().getMetaDataVersionPersistService()
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService()
.switchActiveVersion(DatabaseRuleOperatorFactory.newInstance(contextManager, executor).operate(sqlStatement, database, currentRuleConfig));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private Map<String, DataSource> getTrafficDataSourceMap(final String databaseNam
if (rule.getStrategyRules().isEmpty()) {
return Collections.emptyMap();
}
MetaDataPersistService persistService = contextManager.getMetaDataContexts().getPersistService();
MetaDataPersistService persistService = contextManager.getPersistServiceFacade().getMetaDataPersistService();
String actualDatabaseName = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getName();
Map<String, DataSourcePoolProperties> propsMap = persistService.getDataSourceUnitService().load(actualDatabaseName);
Preconditions.checkState(!propsMap.isEmpty(), "Can not get data source properties from meta data.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private ContextManager mockContextManager() throws SQLException {
Map<String, StorageUnit> storageUnits = mockStorageUnits();
when(result.getStorageUnits(DefaultDatabase.LOGIC_NAME)).thenReturn(storageUnits);
MetaDataPersistService persistService = mockMetaDataPersistService();
when(result.getMetaDataContexts().getPersistService()).thenReturn(persistService);
when(result.getPersistServiceFacade().getMetaDataPersistService()).thenReturn(persistService);
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(
new RuleMetaData(Arrays.asList(mock(AuthorityRule.class, RETURNS_DEEP_STUBS), mock(TransactionRule.class, RETURNS_DEEP_STUBS),
mock(TrafficRule.class, RETURNS_DEEP_STUBS))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static PipelineGovernanceFacade getPipelineGovernanceFacade(final Pipelin
@Override
protected PipelineGovernanceFacade initialize() {
ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
return new PipelineGovernanceFacade((ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository());
return new PipelineGovernanceFacade((ClusterPersistRepository) contextManager.getPersistServiceFacade().getMetaDataPersistService().getRepository());
}
}).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private final class PersistRepositoryLazyInitializer extends LazyInitializer<Clu

@Override
protected ClusterPersistRepository initialize() {
return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getPersistService().getRepository();
return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getMetaDataPersistService().getRepository();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
Expand All @@ -55,8 +55,10 @@ public void persist(final String databaseName, final String schemaName, final Ma
String tableName = entry.getKey().toLowerCase();
List<String> versions = repository.getChildrenKeys(TableMetaDataNode.getTableVersionsNode(databaseName, schemaName, tableName));
String nextActiveVersion = versions.isEmpty() ? DEFAULT_VERSION : String.valueOf(Integer.parseInt(versions.get(0)) + 1);
repository.persist(TableMetaDataNode.getTableVersionNode(databaseName, schemaName, tableName, nextActiveVersion),
YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(entry.getValue())));
if (entry.getValue() != null) {
repository.persist(TableMetaDataNode.getTableVersionNode(databaseName, schemaName, tableName, nextActiveVersion),
YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(entry.getValue())));
}
if (Strings.isNullOrEmpty(getActiveVersion(databaseName, schemaName, tableName))) {
repository.persist(TableMetaDataNode.getTableActiveVersionNode(databaseName, schemaName, tableName), DEFAULT_VERSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.service.PersistServiceFacade;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.state.StateContext;

import java.sql.SQLException;
Expand Down Expand Up @@ -77,15 +78,18 @@ public final class ContextManager implements AutoCloseable {

private final PersistServiceFacade persistServiceFacade;

public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext) {
private final PersistRepository repository;

public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
this.metaDataContexts = new AtomicReference<>(metaDataContexts);
this.computeNodeInstanceContext = computeNodeInstanceContext;
shardingSphereDatabaseContextManager = new ShardingSphereDatabaseContextManager(this.metaDataContexts);
configurationContextManager = new ConfigurationContextManager(this.metaDataContexts, computeNodeInstanceContext);
resourceMetaDataContextManager = new ResourceMetaDataContextManager(this.metaDataContexts);
this.repository = repository;
persistServiceFacade = new PersistServiceFacade(repository, computeNodeInstanceContext.getModeConfiguration(), this);
shardingSphereDatabaseContextManager = new ShardingSphereDatabaseContextManager(this.metaDataContexts, persistServiceFacade);
configurationContextManager = new ConfigurationContextManager(this.metaDataContexts, computeNodeInstanceContext, persistServiceFacade);
resourceMetaDataContextManager = new ResourceMetaDataContextManager(this.metaDataContexts, persistServiceFacade);
executorEngine = ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
stateContext = new StateContext();
persistServiceFacade = new PersistServiceFacade(metaDataContexts.getPersistService().getRepository(), computeNodeInstanceContext.getModeConfiguration(), this);
}

/**
Expand Down Expand Up @@ -138,7 +142,7 @@ public Map<String, StorageUnit> getStorageUnits(final String databaseName) {
public void refreshDatabaseMetaData(final ShardingSphereDatabase database, final boolean force) {
try {
MetaDataContexts reloadedMetaDataContexts = createMetaDataContexts(database);
MetaDataPersistService persistService = metaDataContexts.get().getPersistService();
MetaDataPersistService persistService = persistServiceFacade.getMetaDataPersistService();
if (force) {
metaDataContexts.set(reloadedMetaDataContexts);
metaDataContexts.get().getMetaData().getDatabase(database.getName()).getSchemas()
Expand All @@ -165,14 +169,14 @@ public void refreshTableMetaData(final ShardingSphereDatabase database) {
deletedSchemaNames(database.getName(), reloadedMetaDataContexts.getMetaData().getDatabase(database.getName()), database);
metaDataContexts.set(reloadedMetaDataContexts);
metaDataContexts.get().getMetaData().getDatabase(database.getName()).getSchemas()
.forEach((schemaName, schema) -> metaDataContexts.get().getPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, schema));
.forEach((schemaName, schema) -> persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, schema));
} catch (final SQLException ex) {
log.error("Refresh table meta data: {} failed", database.getName(), ex);
}
}

private MetaDataContexts createMetaDataContexts(final ShardingSphereDatabase database) throws SQLException {
MetaDataPersistService metaDataPersistService = metaDataContexts.get().getPersistService();
MetaDataPersistService metaDataPersistService = persistServiceFacade.getMetaDataPersistService();
Map<String, DataSourcePoolProperties> dataSourcePoolPropsFromRegCenter = metaDataPersistService.getDataSourceUnitService().load(database.getName());
SwitchingResource switchingResource = new ResourceSwitchManager().alterStorageUnit(database.getResourceMetaData(), dataSourcePoolPropsFromRegCenter);
metaDataContexts.get().getMetaData().getDatabases().putAll(configurationContextManager.renewDatabase(database, switchingResource));
Expand All @@ -196,7 +200,7 @@ private MetaDataContexts createMetaDataContexts(final ShardingSphereDatabase dat
*/
public void deletedSchemaNames(final String databaseName, final ShardingSphereDatabase reloadDatabase, final ShardingSphereDatabase currentDatabase) {
GenericSchemaManager.getToBeDeletedSchemaNames(reloadDatabase.getSchemas(), currentDatabase.getSchemas()).keySet()
.forEach(each -> metaDataContexts.get().getPersistService().getDatabaseMetaDataService().dropSchema(databaseName, each));
.forEach(each -> persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().dropSchema(databaseName, each));
}

/**
Expand All @@ -211,11 +215,11 @@ public void reloadSchema(final ShardingSphereDatabase database, final String sch
ShardingSphereSchema reloadedSchema = loadSchema(database, schemaName, dataSourceName);
if (reloadedSchema.getTables().isEmpty()) {
database.dropSchema(schemaName);
metaDataContexts.get().getPersistService().getDatabaseMetaDataService().dropSchema(database.getName(),
persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().dropSchema(database.getName(),
schemaName);
} else {
database.addSchema(schemaName, reloadedSchema);
metaDataContexts.get().getPersistService().getDatabaseMetaDataService()
persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService()
.compareAndPersist(database.getName(), schemaName, reloadedSchema);
}
} catch (final SQLException ex) {
Expand All @@ -230,7 +234,7 @@ private ShardingSphereSchema loadSchema(final ShardingSphereDatabase database, f
Collections.singletonMap(dataSourceName, database.getResourceMetaData().getStorageUnits().get(dataSourceName).getDataSource()),
database.getRuleMetaData().getRules(), metaDataContexts.get().getMetaData().getProps(), schemaName);
ShardingSphereSchema result = GenericSchemaBuilder.build(material).get(schemaName);
result.getViews().putAll(metaDataContexts.get().getPersistService().getDatabaseMetaDataService().getViewMetaDataPersistService().load(database.getName(), schemaName));
result.getViews().putAll(persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().getViewMetaDataPersistService().load(database.getName(), schemaName));
return result;
}

Expand Down Expand Up @@ -273,13 +277,14 @@ public void reloadTable(final ShardingSphereDatabase database, final String sche

private void persistTable(final ShardingSphereDatabase database, final String schemaName, final String tableName, final GenericSchemaBuilderMaterial material) throws SQLException {
ShardingSphereSchema schema = GenericSchemaBuilder.build(Collections.singleton(tableName), material).getOrDefault(schemaName, new ShardingSphereSchema());
metaDataContexts.get().getPersistService().getDatabaseMetaDataService().getTableMetaDataPersistService()
persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().getTableMetaDataPersistService()
.persist(database.getName(), schemaName, Collections.singletonMap(tableName, schema.getTable(tableName)));
}

@Override
public void close() {
executorEngine.close();
metaDataContexts.get().close();
repository.close();
}
}
Loading

0 comments on commit 1daa1fa

Please sign in to comment.