diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java index eba47d35d61f7..f36e16e6f2d83 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java @@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper; /** @@ -32,6 +32,6 @@ public abstract class AbstractIncrementalDumper
<P>
extends AbstractLifecycleExecutor implements IncrementalDumper { public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P>
position, - final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { + final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java index 063941a68454a..b537ee1b62e7d 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java @@ -19,9 +19,11 @@ import lombok.AccessLevel; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.ConcurrentException; +import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; @@ -34,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType; import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; @@ -42,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper; import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm; +import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -62,29 +64,28 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor private final JobRateLimitAlgorithm rateLimitAlgorithm; - private final PipelineDataSourceManager dataSourceManager; - - private final PipelineTableMetaData tableMetaData; + private final LazyInitializer tableMetaDataLazyInitializer; private final PipelineChannel channel; - protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { + private final DataSource dataSource; + + protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel, + final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) { if 
(!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass())) { throw new UnsupportedOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration"); } this.inventoryDumperConfig = inventoryDumperConfig; this.batchSize = inventoryDumperConfig.getBatchSize(); this.rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm(); - this.dataSourceManager = dataSourceManager; + tableMetaDataLazyInitializer = new LazyInitializer() { + @Override + protected PipelineTableMetaData initialize() { + return metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName()); + } + }; this.channel = channel; - tableMetaData = createTableMetaData(); - } - - private PipelineTableMetaData createTableMetaData() { - PipelineDataSourceConfiguration dataSourceConfig = inventoryDumperConfig.getDataSourceConfig(); - // TODO share PipelineTableMetaDataLoader - PipelineTableMetaDataLoader metaDataManager = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dataSourceConfig)); - return metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName()); + this.dataSource = dataSource; } @Override @@ -96,7 +97,7 @@ private void dump() { String sql = getDumpSQL(); IngestPosition position = inventoryDumperConfig.getPosition(); log.info("inventory dump, sql={}, position={}", sql, position); - try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection()) { + try (Connection conn = dataSource.getConnection()) { int round = 1; Number startUniqueKeyValue = getPositionBeginValue(position) - 1; Optional maxUniqueKeyValue; @@ -123,10 +124,16 @@ private String getDumpSQL() { return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? AND " + primaryKey + " <= ? 
ORDER BY " + primaryKey + " ASC LIMIT ?"; } + @SneakyThrows(ConcurrentException.class) + private PipelineTableMetaData getTableMetaData() { + return tableMetaDataLazyInitializer.get(); + } + private Optional dump0(final Connection conn, final String sql, final Number startUniqueKeyValue, final int round) throws SQLException { if (null != rateLimitAlgorithm) { rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1); } + PipelineTableMetaData tableMetaData = getTableMetaData(); try (PreparedStatement preparedStatement = createPreparedStatement(conn, sql)) { preparedStatement.setObject(1, startUniqueKeyValue); preparedStatement.setObject(2, getPositionEndValue(inventoryDumperConfig.getPosition())); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java index f0a8c9b38f366..a40065efeb53e 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java @@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.spi.importer.Importer; import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory; import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper; @@ -47,7 +48,7 @@ * Incremental task. 
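// A minimal sketch of the lazy-loading pattern introduced in the AbstractInventoryDumper hunks above:
// table metadata is wrapped in a Commons Lang3 LazyInitializer so it is loaded on the first dump round
// instead of in the constructor, and the checked ConcurrentException from get() is rethrown via Lombok's
// @SneakyThrows. LazyTableMetaDataHolder, TableMetaData and loadTableMetaData are placeholder names for
// illustration only, not types from this patch.
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;

public final class LazyTableMetaDataHolder {
    
    private final LazyInitializer<TableMetaData> tableMetaDataLazyInitializer;
    
    public LazyTableMetaDataHolder(final String tableName) {
        tableMetaDataLazyInitializer = new LazyInitializer<TableMetaData>() {
            
            @Override
            protected TableMetaData initialize() {
                // Runs at most once, on the first get(); LazyInitializer handles the thread safety.
                return loadTableMetaData(tableName);
            }
        };
    }
    
    @SneakyThrows(ConcurrentException.class)
    public TableMetaData getTableMetaData() {
        return tableMetaDataLazyInitializer.get();
    }
    
    private static TableMetaData loadTableMetaData(final String tableName) {
        return new TableMetaData(tableName);
    }
    
    public static final class TableMetaData {
        
        private final String tableName;
        
        public TableMetaData(final String tableName) {
            this.tableName = tableName;
        }
    }
}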
*/ @Slf4j -@ToString(exclude = {"incrementalDumperExecuteEngine", "dataSourceManager", "dumper", "progress"}) +@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper", "importers", "progress"}) public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable { @Getter @@ -55,8 +56,6 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements private final ExecuteEngine incrementalDumperExecuteEngine; - private final PipelineDataSourceManager dataSourceManager; - private final PipelineChannel channel; private final Dumper dumper; @@ -68,15 +67,14 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig, final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager, - final ExecuteEngine incrementalDumperExecuteEngine) { + final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine) { this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine; - this.dataSourceManager = dataSourceManager; taskId = dumperConfig.getDataSourceName(); progress = new IncrementalTaskProgress(); IngestPosition position = dumperConfig.getPosition(); progress.setPosition(position); channel = createChannel(concurrency, pipelineChannelFactory, progress); - dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, dataSourceManager, channel); + dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader); importers = createImporters(concurrency, importerConfig, dataSourceManager, channel); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java index ffd0f97bfd441..b4d814b8334f8 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java @@ -32,12 +32,14 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.spi.importer.Importer; import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory; import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper; import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory; import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory; +import javax.sql.DataSource; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -46,7 +48,7 @@ * Inventory task. 
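// Why IncrementalTask above (and InventoryTask below) now receive one shared metadata loader instead of
// each dumper building its own: assuming the loader caches per-table metadata (a hypothetical sketch
// below; the real PipelineTableMetaDataLoader internals are not part of this patch), every dumper created
// for the same job reuses a single lookup per table instead of querying the source database again.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public final class CachingMetaDataLoaderSketch {
    
    private final Map<String, String> tableMetaDataCache = new ConcurrentHashMap<>();
    
    private final AtomicInteger databaseRoundTrips = new AtomicInteger();
    
    public String getTableMetaData(final String tableName) {
        // computeIfAbsent hits the "database" only for the first caller per table.
        return tableMetaDataCache.computeIfAbsent(tableName, this::loadFromDatabase);
    }
    
    private String loadFromDatabase(final String tableName) {
        databaseRoundTrips.incrementAndGet();
        return "metadata of " + tableName;
    }
    
    public static void main(final String[] args) {
        CachingMetaDataLoaderSketch sharedLoader = new CachingMetaDataLoaderSketch();
        // Two "dumpers" working on the same table share the loader, so only one load is executed.
        sharedLoader.getTableMetaData("t_order");
        sharedLoader.getTableMetaData("t_order");
        System.out.println("round trips: " + sharedLoader.databaseRoundTrips.get());
    }
}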
*/ @Slf4j -@ToString(exclude = {"importerExecuteEngine", "dataSourceManager", "dumper"}) +@ToString(exclude = {"importerExecuteEngine", "channel", "dumper", "importer"}) public final class InventoryTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable { @Getter @@ -54,8 +56,6 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi private final ExecuteEngine importerExecuteEngine; - private final PipelineDataSourceManager dataSourceManager; - private final PipelineChannel channel; private final Dumper dumper; @@ -66,12 +66,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig, final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager, + final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine importerExecuteEngine) { this.importerExecuteEngine = importerExecuteEngine; - this.dataSourceManager = dataSourceManager; taskId = generateTaskId(inventoryDumperConfig); channel = createChannel(pipelineChannelFactory); - dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, dataSourceManager, channel); + dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader); importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel); position = inventoryDumperConfig.getPosition(); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java index 4612f5cde534f..32562af86ab79 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java @@ -19,12 +19,17 @@ import lombok.Getter; import lombok.Setter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.ConcurrentException; +import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask; @@ -60,6 +65,20 @@ public final class RuleAlteredJobContext { private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager(); + 
private final LazyInitializer sourceDataSourceLazyInitializer = new LazyInitializer() { + @Override + protected PipelineDataSourceWrapper initialize() { + return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()); + } + }; + + private final LazyInitializer sourceMetaDataLoaderLazyInitializer = new LazyInitializer() { + @Override + protected PipelineTableMetaDataLoader initialize() throws ConcurrentException { + return new PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get()); + } + }; + private RuleAlteredJobPreparer jobPreparer; public RuleAlteredJobContext(final JobConfiguration jobConfig) { @@ -71,6 +90,26 @@ public RuleAlteredJobContext(final JobConfiguration jobConfig) { taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig.getPipelineConfig(), jobConfig.getHandleConfig(), ruleAlteredContext.getOnRuleAlteredActionConfig()); } + /** + * Get source data source. + * + * @return source data source + */ + @SneakyThrows(ConcurrentException.class) + public PipelineDataSourceWrapper getSourceDataSource() { + return sourceDataSourceLazyInitializer.get(); + } + + /** + * Get source metadata loader. + * + * @return source metadata loader + */ + @SneakyThrows(ConcurrentException.class) + public PipelineTableMetaDataLoader getSourceMetaDataLoader() { + return sourceMetaDataLoaderLazyInitializer.get(); + } + /** * Release resources. */ diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java index fa95de70ee191..e756f4be5856c 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer; import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; @@ -64,8 +65,8 @@ public void prepare(final RuleAlteredJobContext jobContext) { prepareTarget(jobContext.getJobConfig(), dataSourceManager); initAndCheckDataSource(jobContext); try { - initIncrementalTasks(jobContext, dataSourceManager); - initInventoryTasks(jobContext, dataSourceManager); + initIncrementalTasks(jobContext); + initInventoryTasks(jobContext); log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}", jobContext.getJobId(), jobContext.getShardingItem(), jobContext.getInventoryTasks(), jobContext.getIncrementalTasks()); } catch (final SQLException ex) { @@ -112,18 +113,20 @@ private void checkTargetDataSource(final 
RuleAlteredJobContext jobContext, final dataSourceChecker.checkTargetTable(targetDataSources, jobContext.getTaskConfig().getImporterConfig().getShardingColumnsMap().keySet()); } - private void initInventoryTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) { - List allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager); + private void initInventoryTasks(final RuleAlteredJobContext jobContext) { + List allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext); jobContext.getInventoryTasks().addAll(allInventoryTasks); } - private void initIncrementalTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) throws SQLException { + private void initIncrementalTasks(final RuleAlteredJobContext jobContext) throws SQLException { PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory(); ExecuteEngine incrementalDumperExecuteEngine = jobContext.getRuleAlteredContext().getIncrementalDumperExecuteEngine(); TaskConfiguration taskConfig = jobContext.getTaskConfig(); + PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager(); taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager)); + PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader(); IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getHandleConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), - pipelineChannelFactory, dataSourceManager, incrementalDumperExecuteEngine); + pipelineChannelFactory, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine); jobContext.getIncrementalTasks().add(incrementalTask); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java index 757b7547d8897..2a9ff69e87c74 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration; -import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition; @@ -65,25 +64,26 @@ public final class InventoryTaskSplitter { * Split inventory data to multi-tasks. 
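// The DumperFactory hunks further down look dumper constructors up reflectively with getConstructor(),
// so every dialect dumper (MySQL, PostgreSQL, openGauss) must expose a public constructor whose parameter
// list matches the new order exactly -- which is why all of their constructors change together in this
// patch. A self-contained illustration of that contract with placeholder types (not classes from the patch):
import java.lang.reflect.Constructor;

public final class ReflectiveFactorySketch {
    
    public interface Dumper {
    }
    
    public static final class Config {
    }
    
    public static final class Channel {
    }
    
    public static final class ExampleDumper implements Dumper {
        
        // Must match the getConstructor() parameter list below, otherwise NoSuchMethodException is thrown.
        public ExampleDumper(final Config config, final Channel channel) {
        }
    }
    
    public static Dumper createDumper(final Class<? extends Dumper> dumperClass, final Config config, final Channel channel) throws ReflectiveOperationException {
        Constructor<? extends Dumper> constructor = dumperClass.getConstructor(Config.class, Channel.class);
        return constructor.newInstance(config, channel);
    }
    
    public static void main(final String[] args) throws ReflectiveOperationException {
        Dumper dumper = createDumper(ExampleDumper.class, new Config(), new Channel());
        System.out.println(dumper.getClass().getSimpleName());
    }
}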
* * @param jobContext job context - * @param dataSourceManager data source manager * @return split inventory data task */ - public List splitInventoryData(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) { + public List splitInventoryData(final RuleAlteredJobContext jobContext) { List result = new LinkedList<>(); TaskConfiguration taskConfig = jobContext.getTaskConfig(); PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory(); + PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager(); + DataSource dataSource = jobContext.getSourceDataSource(); + PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader(); ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine(); - for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig(), dataSourceManager)) { - result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, dataSourceManager, importerExecuteEngine)); + for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) { + result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine)); } return result; } - private Collection splitDumperConfig( - final RuleAlteredJobContext jobContext, final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) { + private Collection splitDumperConfig(final RuleAlteredJobContext jobContext, final DumperConfiguration dumperConfig) { Collection result = new LinkedList<>(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); - PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource); + DataSource dataSource = jobContext.getSourceDataSource(); + PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader(); for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) { result.addAll(splitByPrimaryKey(jobContext, dataSource, metaDataLoader, each)); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java index eed54f62854da..3ecacf1e72eef 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java @@ -24,12 +24,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper; import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper; import org.apache.shardingsphere.scaling.core.spi.ScalingEntry; import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader; +import javax.sql.DataSource; import java.lang.reflect.Constructor; /** @@ -42,16 +43,18 @@ public final class DumperFactory { * Create inventory dumper. * * @param inventoryDumperConfig inventory dumper configuration - * @param dataSourceManager data source factory * @param channel channel + * @param sourceDataSource data source + * @param sourceMetaDataLoader metadata loader * @return inventory dumper */ @SneakyThrows(ReflectiveOperationException.class) - public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { + public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel, + final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) { ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getName()); Constructor constructor = scalingEntry.getInventoryDumperClass() - .getConstructor(InventoryDumperConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class); - return constructor.newInstance(inventoryDumperConfig, dataSourceManager, channel); + .getConstructor(InventoryDumperConfiguration.class, PipelineChannel.class, DataSource.class, PipelineTableMetaDataLoader.class); + return constructor.newInstance(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader); } /** @@ -59,17 +62,17 @@ public static InventoryDumper createInventoryDumper(final InventoryDumperConfigu * * @param dumperConfig dumper configuration * @param position position - * @param dataSourceManager data source manager * @param channel channel + * @param sourceMetaDataLoader metadata loader * @return incremental dumper */ @SneakyThrows(ReflectiveOperationException.class) public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position, - final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { + final PipelineChannel channel, final PipelineTableMetaDataLoader sourceMetaDataLoader) { String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getName(); ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType); Constructor constructor = scalingEntry.getIncrementalDumperClass() - .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineDataSourceManager.class, PipelineChannel.class); - return constructor.newInstance(dumperConfig, position, dataSourceManager, channel); + .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineChannel.class, PipelineTableMetaDataLoader.class); + return constructor.newInstance(dumperConfig, position, channel, sourceMetaDataLoader); } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 
60099401a40cc..d90e75d38012e 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; @@ -79,13 +78,13 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper binlogPosition, - final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { - super(dumperConfig, binlogPosition, dataSourceManager, channel); + final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { + super(dumperConfig, binlogPosition, channel, metaDataLoader); this.binlogPosition = (BinlogPosition) binlogPosition; this.dumperConfig = dumperConfig; Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration"); this.channel = channel; - metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())); + this.metaDataLoader = metaDataLoader; } @Override diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java index 809d5b980763b..5a22aec8962cd 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java @@ -19,9 +19,10 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; +import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -36,8 +37,9 @@ public final 
class MySQLInventoryDumper extends AbstractInventoryDumper { private static final String YEAR_DATA_TYPE = "YEAR"; - public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { - super(inventoryDumperConfig, dataSourceManager, channel); + public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel, + final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) { + super(inventoryDumperConfig, channel, dataSource, metaDataLoader); Properties queryProps = new Properties(); queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString()); inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 1b44d6a4fd4b0..173ee3925e45e 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent; @@ -34,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,12 +58,20 @@ public final class MySQLIncrementalDumperTest { private MultiplexMemoryPipelineChannel channel; + private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager(); + @Before public void setUp() { DumperConfiguration dumperConfig = mockDumperConfiguration(); initTableData(dumperConfig); channel = new MultiplexMemoryPipelineChannel(); - incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), new PipelineDataSourceManager(), channel); + PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())); + incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), 
channel, metaDataLoader); + } + + @After + public void tearDown() { + dataSourceManager.close(); } private DumperConfiguration mockDumperConfiguration() { diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java index d74c70d7efdab..5dbfcf37661ed 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java @@ -20,13 +20,14 @@ import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import javax.sql.DataSource; @@ -45,22 +46,20 @@ @RunWith(MockitoJUnitRunner.class) public final class MySQLJdbcDumperTest { - private PipelineDataSourceManager dataSourceManager; - private MySQLInventoryDumper mysqlJdbcDumper; - @Mock - private Connection connection; - @Before public void setUp() { - dataSourceManager = new PipelineDataSourceManager(); - mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), dataSourceManager, new SimpleMemoryPipelineChannel(100)); + PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager(); + InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration(); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), + dataSource, new PipelineTableMetaDataLoader(dataSource)); + initTableData(dataSource); } private InventoryDumperConfiguration mockInventoryDumperConfiguration() { DumperConfiguration dumperConfig = mockDumperConfiguration(); - initTableData(dumperConfig); InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig); result.setTableName("t_order"); return result; @@ -73,8 +72,7 @@ private DumperConfiguration mockDumperConfiguration() { } @SneakyThrows(SQLException.class) - private void initTableData(final DumperConfiguration dumperConfig) { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initTableData(final 
DataSource dataSource) { try (Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -98,6 +96,7 @@ public void assertReadValue() throws SQLException { @Test public void assertCreatePreparedStatement() throws SQLException { + Connection connection = mock(Connection.class); when(connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).thenReturn(mock(PreparedStatement.class)); PreparedStatement preparedStatement = mysqlJdbcDumper.createPreparedStatement(connection, ""); verify(preparedStatement).setFetchSize(Integer.MIN_VALUE); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java index a91ba155e972b..693d973ee600e 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java @@ -23,10 +23,10 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper; import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin; @@ -64,15 +64,15 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper position, - final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { - super(dumperConfig, position, dataSourceManager, channel); + final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { + super(dumperConfig, position, channel, metaDataLoader); walPosition = (WalPosition) position; if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) { throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"); } this.dumperConfig = dumperConfig; this.channel = channel; - walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager); + walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader); } @Override diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java index 4b9a0050a4669..2f6a38b5919a3 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java @@ -19,10 +19,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.postgresql.util.PGobject; +import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -38,8 +39,9 @@ public final class PostgreSQLInventoryDumper extends AbstractInventoryDumper { private static final String PG_BIT_TYPE = "bit"; - public PostgreSQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { - super(inventoryDumperConfig, dataSourceManager, channel); + public PostgreSQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel, + final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) { + super(inventoryDumperConfig, channel, dataSource, metaDataLoader); } @Override diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java index 4ae61bccaed24..effdcf15d4067 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java @@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper; import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter; @@ -60,15 +60,15 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper position, - final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) { - super(dumperConfig, position, dataSourceManager, channel); + final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { + super(dumperConfig, position, channel, metaDataLoader); walPosition = (WalPosition) position; if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) { throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"); } this.dumperConfig = dumperConfig; this.channel = channel; - walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager); + walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader); } @Override diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java index c65987fb8576e..57038e5ae10eb 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData; @@ -44,9 +43,9 @@ public final class WalEventConverter { private final PipelineTableMetaDataLoader metaDataLoader; - public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) { + public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) { this.dumperConfig = dumperConfig; - metaDataLoader = new 
PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        this.metaDataLoader = metaDataLoader;
     }
 
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 7733e00e2e20b..7c0b852b87f75 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -20,9 +20,11 @@
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -37,27 +39,29 @@ public final class PostgreSQLJdbcDumperTest {
 
-    private PipelineDataSourceManager dataSourceManager;
+    private PipelineDataSourceWrapper dataSource;
 
     private PostgreSQLInventoryDumper jdbcDumper;
 
     @Before
     public void setUp() {
-        dataSourceManager = new PipelineDataSourceManager();
-        jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), dataSourceManager, new SimpleMemoryPipelineChannel(100));
+        PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+        InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
+        dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+        jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
+                dataSource, new PipelineTableMetaDataLoader(dataSource));
+        initTableData(dataSource);
     }
 
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        initTableData(dumperConfig);
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
         result.setTableName("t_order");
         return result;
     }
 
     @SneakyThrows(SQLException.class)
-    private void initTableData(final DumperConfiguration dumperConfig) {
-        DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+    private void initTableData(final DataSource dataSource) {
         try (Connection connection = dataSource.getConnection();
              Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
@@ -68,7 +72,6 @@ private void initTableData(final DumperConfiguration dumperConfig) {
 
     @Test
     public void assertCreatePreparedStatement() throws SQLException {
-        DataSource dataSource = dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfig());
         try (Connection connection = dataSource.getConnection();
              PreparedStatement preparedStatement = jdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
             assertThat(preparedStatement.getFetchSize(), is(1));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index c54da70736443..dbaf5a609f1c4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -18,14 +18,17 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -61,17 +64,26 @@ public final class PostgreSQLWalDumperTest {
 
     private WalPosition position;
 
-    private PostgreSQLWalDumper walDumper;
+    private DumperConfiguration dumperConfig;
 
-    private StandardPipelineDataSourceConfiguration pipelineDataSourceConfig;
+    private PostgreSQLWalDumper walDumper;
 
     private MultiplexMemoryPipelineChannel channel;
 
+    private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+
     @Before
     public void setUp() {
         position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         channel = new MultiplexMemoryPipelineChannel();
-        walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, new PipelineDataSourceManager(), channel);
+        dumperConfig = mockDumperConfiguration();
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+    }
+
+    @After
+    public void tearDown() {
+        dataSourceManager.close();
     }
 
     private DumperConfiguration mockDumperConfiguration() {
@@ -86,9 +98,9 @@ private DumperConfiguration mockDumperConfiguration() {
         } catch (final SQLException e) {
             throw new RuntimeException("Init table failed", e);
         }
-        pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
+        PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
         DumperConfiguration result = new DumperConfiguration();
-        result.setDataSourceConfig(pipelineDataSourceConfig);
+        result.setDataSourceConfig(dataSourceConfig);
         Map<String, String> tableNameMap = new HashMap<>();
         tableNameMap.put("t_order_0", "t_order");
         result.setTableNameMap(tableNameMap);
@@ -97,9 +109,10 @@ private DumperConfiguration mockDumperConfiguration() {
 
     @Test
     public void assertStart() throws SQLException, NoSuchFieldException, IllegalAccessException {
+        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig();
         try {
             ReflectionUtil.setFieldValue(walDumper, "logicalReplication", logicalReplication);
-            when(logicalReplication.createConnection(pipelineDataSourceConfig)).thenReturn(pgConnection);
+            when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
             when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
             when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), position.getLogSequenceNumber()))
                     .thenReturn(pgReplicationStream);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 1a18864aebe27..df3f1ab553ade 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -25,11 +25,13 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,13 +50,20 @@ public final class WalEventConverterTest {
 
     private WalEventConverter walEventConverter;
 
+    private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        walEventConverter = new WalEventConverter(dumperConfig, new PipelineDataSourceManager());
+        walEventConverter = new WalEventConverter(dumperConfig, new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
         initTableData(dumperConfig);
     }
 
+    @After
+    public void tearDown() {
+        dataSourceManager.close();
+    }
+
     private DumperConfiguration mockDumperConfiguration() {
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 734961750a558..2ed943bdff23e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -20,6 +20,7 @@
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -28,6 +29,7 @@
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -52,6 +54,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public final class GovernanceRepositoryAPIImplTest {
 
@@ -130,14 +133,17 @@ private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
         dumperConfig.setTableName("t_order");
         dumperConfig.setPrimaryKey("order_id");
         dumperConfig.setShardingItem(0);
+        PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine());
     }
 
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 32bec41041f9a..b97eebb219a2a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,14 +21,14 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 public final class FixtureIncrementalDumper extends AbstractIncrementalDumper {
 
     public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
-                                    final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, position, dataSourceManager, channel);
+                                    final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, position, channel, metaDataLoader);
     }
 
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
index 7bc7a8c5ac0f6..3da47584451de 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
@@ -19,17 +19,19 @@
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
 public final class FixtureInventoryDumper extends AbstractInventoryDumper {
 
-    public FixtureInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, dataSourceManager, channel);
+    public FixtureInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel,
+                                  final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, channel, dataSource, metaDataLoader);
     }
 
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index bf1c14faa1b8b..52765c2adbd72 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -18,8 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -31,6 +33,7 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public final class IncrementalTaskTest {
 
@@ -45,9 +48,10 @@ public static void beforeClass() {
     public void setUp() {
         TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
     }
 
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 57320bc3f8f49..b80d20150f7e1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -26,9 +26,11 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,12 +44,19 @@ public final class InventoryTaskTest {
 
     private static TaskConfiguration taskConfig;
 
+    private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new PipelineDataSourceManager();
+
     @BeforeClass
     public static void beforeClass() {
         PipelineContextUtil.mockModeConfig();
         taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfig();
     }
 
+    @AfterClass
+    public static void afterClass() {
+        DATA_SOURCE_MANAGER.close();
+    }
+
     @Test(expected = IngestException.class)
     public void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
@@ -57,9 +66,11 @@ public void assertStartWithGetEstimatedRowsFailure() {
             position = new PrimaryKeyPosition(0, 1000);
         }
         inventoryDumperConfig.setPosition(position);
+        PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine())) {
+                DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
             inventoryTask.start();
         }
     }
@@ -74,9 +85,11 @@ public void assertGetProgress() throws SQLException {
             position = new PrimaryKeyPosition(0, 1000);
         }
         inventoryDumperConfig.setPosition(position);
+        PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine())) {
+                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
             inventoryTask.start();
             assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
         }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index b7ae201a35ea4..8a034270175b3 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -51,12 +51,12 @@ public final class InventoryTaskSplitterTest {
     @Before
     public void setUp() {
         initJobContext();
-        dataSourceManager = new PipelineDataSourceManager();
         inventoryTaskSplitter = new InventoryTaskSplitter();
     }
 
     private void initJobContext() {
         jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
+        dataSourceManager = jobContext.getDataSourceManager();
         taskConfig = jobContext.getTaskConfig();
     }
 
@@ -64,7 +64,7 @@ private void initJobContext() {
     public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
         taskConfig.getHandleConfig().setShardingSize(10);
         initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
         assertThat(((PrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L));
@@ -75,7 +75,7 @@ public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
         taskConfig.getHandleConfig().setShardingSize(10);
         initIntPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(10));
         assertThat(((PrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L));
@@ -85,7 +85,7 @@ public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
     @Test
     public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
         initCharPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -93,7 +93,7 @@ public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
     @Test
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -101,7 +101,7 @@ public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
     @Test
    public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
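For reference, a minimal sketch of how a caller is now expected to wire a dumper after this refactoring, mirroring the test setup changed above: the data source is resolved once through PipelineDataSourceManager, and the dumper receives that data source plus a PipelineTableMetaDataLoader built on it, instead of the manager itself. This is illustrative only, not part of the patch; the wrapper class, main method, H2 URL, and table name are assumptions, and the package of PostgreSQLInventoryDumper is inferred from its test.

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;

// Illustrative wiring only; class and constructor signatures are taken from the diff above.
public final class DumperWiringExample {
    
    public static void main(final String[] args) {
        PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
        // Assumed H2 in-memory configuration, copied from the test fixtures in this patch.
        DumperConfiguration dumperConfig = new DumperConfiguration();
        dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
        InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
        inventoryDumperConfig.setTableName("t_order");
        // The caller resolves the data source once and shares it with the dumper and the metadata loader.
        PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
        PostgreSQLInventoryDumper dumper = new PostgreSQLInventoryDumper(inventoryDumperConfig, new SimpleMemoryPipelineChannel(100), dataSource, metaDataLoader);
        // The dumper is normally driven by an InventoryTask and the pipeline execute engine (not shown here).
        // The data source manager still owns the pooled connections and is closed by its owner when the job ends.
        dataSourceManager.close();
    }
}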