diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e0ac85a..8151a4ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## [16.3.0] - TBD +### Added +* Added method `newInstance(CopierContext)` to `com.hotels.bdp.circustrain.api.copier.CopierFactory`. This provides Copiers with more configuration information in a future proof manner. See [#195](https://github.com/HotelsDotCom/circus-train/issues/195). +### Deprecated +* Deprecated other `newInstance()` methods on `com.hotels.bdp.circustrain.api.copier.CopierFactory`. + ## [16.2.0] - 2020-07-01 ### Changed * Changed version of `hive.version` to `2.3.7` (was `2.3.2`). This allows Circus Train to be used on JDK>=9. diff --git a/circus-train-api/pom.xml b/circus-train-api/pom.xml index 06582359..643f976f 100644 --- a/circus-train-api/pom.xml +++ b/circus-train-api/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-api diff --git a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactory.java b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactory.java index e8770fbf..d8488dcf 100644 --- a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactory.java +++ b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactory.java @@ -82,21 +82,20 @@ public boolean supportsSchemes(String sourceScheme, String replicaScheme) { } @Override - public Copier newInstance( - String eventId, - Path sourceBaseLocation, - List sourceSubLocations, - Path replicaLocation, - Map copierOptions) { + public Copier newInstance(CopierContext copierContext) { List copiers = new ArrayList<>(delegates.size()); int i = 0; for (CopierFactory delegate : delegates) { - CopierPathGeneratorParams copierPathGeneratorParams = CopierPathGeneratorParams.newParams(i++, eventId, - sourceBaseLocation, sourceSubLocations, replicaLocation, copierOptions); + CopierPathGeneratorParams copierPathGeneratorParams = CopierPathGeneratorParams + .newParams(i++, copierContext.getEventId(), copierContext.getSourceBaseLocation(), + copierContext.getSourceSubLocations(), copierContext.getReplicaLocation(), + copierContext.getCopierOptions()); Path newSourceBaseLocation = pathGenerator.generateSourceBaseLocation(copierPathGeneratorParams); Path newReplicaLocation = pathGenerator.generateReplicaLocation(copierPathGeneratorParams); - Copier copier = delegate.newInstance(eventId, newSourceBaseLocation, sourceSubLocations, newReplicaLocation, - copierOptions); + + CopierContext delegateContext = new CopierContext(copierContext.getEventId(), newSourceBaseLocation, + copierContext.getSourceSubLocations(), newReplicaLocation, copierContext.getCopierOptions()); + Copier copier = delegate.newInstance(delegateContext); copiers.add(copier); } return new CompositeCopier(copiers, metricsMerger); @@ -108,17 +107,20 @@ public Copier newInstance( Path sourceBaseLocation, Path replicaLocation, Map copierOptions) { - List copiers = new ArrayList<>(delegates.size()); - int i = 0; - for (CopierFactory delegatee : delegates) { - CopierPathGeneratorParams copierPathGeneratorParams = CopierPathGeneratorParams.newParams(i++, eventId, - sourceBaseLocation, null, replicaLocation, copierOptions); - Path newReplicaLocation = pathGenerator.generateReplicaLocation(copierPathGeneratorParams); - Path newSourceBaseLocation = pathGenerator.generateSourceBaseLocation(copierPathGeneratorParams); - Copier copier = delegatee.newInstance(eventId, newSourceBaseLocation, newReplicaLocation, copierOptions); - copiers.add(copier); - } - return new CompositeCopier(copiers, metricsMerger); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, replicaLocation, copierOptions); + return newInstance(copierContext); + } + + @Override + public Copier newInstance( + String eventId, + Path sourceBaseLocation, + List sourceSubLocations, + Path replicaLocation, + Map copierOptions) { + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, sourceSubLocations, replicaLocation, + copierOptions); + return newInstance(copierContext); } } diff --git a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierContext.java b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierContext.java new file mode 100644 index 00000000..d961af96 --- /dev/null +++ b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierContext.java @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2016-2020 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.circustrain.api.copier; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.hotels.bdp.circustrain.api.conf.TableReplication; + +public final class CopierContext { + + private String eventId; + private Path sourceBaseLocation; + private List sourceSubLocations = ImmutableList.copyOf(Collections.emptyList()); + private Path replicaLocation; + private Map copierOptions; + private TableReplication tableReplication; + + public CopierContext( + TableReplication tableReplication, + String eventId, + Path sourceBaseLocation, + List sourceSubLocations, + Path replicaLocation, + Map copierOptions) { + this.tableReplication = tableReplication; + this.eventId = eventId; + this.sourceBaseLocation = sourceBaseLocation; + if (sourceSubLocations != null) { + this.sourceSubLocations = ImmutableList.copyOf(sourceSubLocations); + } + this.replicaLocation = replicaLocation; + this.copierOptions = ImmutableMap.copyOf(copierOptions); + } + + public CopierContext( + TableReplication tableReplication, + String eventId, + Path sourceLocation, + Path replicaLocation, + Map copierOptions) { + this(tableReplication, eventId, sourceLocation, null, replicaLocation, copierOptions); + } + + public CopierContext( + String eventId, + Path sourceBaseLocation, + List sourceSubLocations, + Path replicaLocation, + Map copierOptions) { + this(null, eventId, sourceBaseLocation, sourceSubLocations, replicaLocation, copierOptions); + } + + public CopierContext( + String eventId, + Path sourceBaseLocation, + Path replicaLocation, + Map copierOptions) { + this(null, eventId, sourceBaseLocation, null, replicaLocation, copierOptions); + } + + public String getEventId() { + return eventId; + } + + public Path getSourceBaseLocation() { + return sourceBaseLocation; + } + + public List getSourceSubLocations() { + return sourceSubLocations; + } + + public Path getReplicaLocation() { + return replicaLocation; + } + + public Map getCopierOptions() { + return copierOptions; + } + + public TableReplication getTableReplication() { + return tableReplication; + } + +} diff --git a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierFactory.java b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierFactory.java index 1da54c80..b612cc3b 100644 --- a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierFactory.java +++ b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,20 @@ public interface CopierFactory { boolean supportsSchemes(String sourceScheme, String replicaScheme); /** + * Creates a new Copier. + * + * @param copierContext Context object containing configuration values for the Copier. + * @return + */ + default Copier newInstance(CopierContext copierContext) { + //TODO: this is only here for backwards compatibility with CopierFactorys using older versions of Circus Train, when the below + //deprecated methods are removed so should this default implementation + return newInstance(copierContext.getEventId(), copierContext.getSourceBaseLocation(), copierContext.getSourceSubLocations(), copierContext.getReplicaLocation(), + copierContext.getCopierOptions()); + } + + /** + * @deprecated As of release 16.3.0, replaced by {@link #newInstance(CopierContext)}. * @param eventId * @param sourceBaseLocation * @param sourceSubLocations @@ -32,6 +46,7 @@ public interface CopierFactory { * @param copierOptions, contains both global and per table override configured options * @return */ + @Deprecated Copier newInstance( String eventId, Path sourceBaseLocation, @@ -40,12 +55,15 @@ Copier newInstance( Map copierOptions); /** + * @deprecated As of release 16.3.0, replaced by {@link #newInstance(CopierContext)}. + * * @param eventId * @param sourceBaseLocation * @param replicaLocation * @param copierOptions, contains both global and per table override configured options * @return */ + @Deprecated Copier newInstance(String eventId, Path sourceBaseLocation, Path replicaLocation, Map copierOptions); } diff --git a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierPathGeneratorParams.java b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierPathGeneratorParams.java index e6dd85e0..a1597039 100644 --- a/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierPathGeneratorParams.java +++ b/circus-train-api/src/main/java/com/hotels/bdp/circustrain/api/copier/CopierPathGeneratorParams.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/circus-train-api/src/test/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactoryTest.java b/circus-train-api/src/test/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactoryTest.java index 2d5fd709..9a0f8c05 100644 --- a/circus-train-api/src/test/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactoryTest.java +++ b/circus-train-api/src/test/java/com/hotels/bdp/circustrain/api/copier/CompositeCopierFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,9 @@ import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; @@ -31,7 +32,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Matchers; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -66,14 +67,8 @@ public Path generateReplicaLocation(CopierPathGeneratorParams copierParams) { @Before public void init() { - doReturn(firstCopier).when(firstCopierFactory).newInstance(anyString(), any(Path.class), - Matchers.> any(), any(Path.class), Matchers.> any()); - doReturn(firstCopier).when(firstCopierFactory).newInstance(anyString(), any(Path.class), any(Path.class), - Matchers.> any()); - doReturn(secondCopier).when(secondCopierFactory).newInstance(anyString(), any(Path.class), - Matchers.> any(), any(Path.class), Matchers.> any()); - doReturn(secondCopier).when(secondCopierFactory).newInstance(anyString(), any(Path.class), any(Path.class), - Matchers.> any()); + doReturn(firstCopier).when(firstCopierFactory).newInstance(any(CopierContext.class)); + doReturn(secondCopier).when(secondCopierFactory).newInstance(any(CopierContext.class)); } @Test @@ -88,12 +83,22 @@ public void copyFromPreviousCopierReplicaLocation() { asList(firstReplicaLocation, secondReplicaLocation)), MetricsMerger.DEFAULT); - copierFactory.newInstance("eventId", sourceBaseLocation, sourceSubLocations, new Path("replicaLocation"), - overridingCopierOptions); - verify(firstCopierFactory).newInstance("eventId", sourceBaseLocation, sourceSubLocations, firstReplicaLocation, - overridingCopierOptions); - verify(secondCopierFactory).newInstance("eventId", firstReplicaLocation, sourceSubLocations, secondReplicaLocation, - overridingCopierOptions); + CopierContext copierContext = new CopierContext("eventId", sourceBaseLocation, sourceSubLocations, + new Path("replicaLocation"), overridingCopierOptions); + copierFactory.newInstance(copierContext); + + ArgumentCaptor argument = ArgumentCaptor.forClass(CopierContext.class); + verify(firstCopierFactory).newInstance(argument.capture()); + CopierContext captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(sourceBaseLocation)); + assertThat(captured.getSourceSubLocations(), is(sourceSubLocations)); + assertThat(captured.getReplicaLocation(), is(firstReplicaLocation)); + + verify(secondCopierFactory).newInstance(argument.capture()); + captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(firstReplicaLocation)); + assertThat(captured.getSourceSubLocations(), is(sourceSubLocations)); + assertThat(captured.getReplicaLocation(), is(secondReplicaLocation)); } @Test @@ -107,11 +112,19 @@ public void copyFromPreviousCopierReplicaLocationNoSubLocations() { asList(firstReplicaLocation, secondReplicaLocation)), MetricsMerger.DEFAULT); - copierFactory.newInstance("eventId", sourceBaseLocation, new Path("replicaLocation"), overridingCopierOptions); - verify(firstCopierFactory).newInstance("eventId", sourceBaseLocation, firstReplicaLocation, - overridingCopierOptions); - verify(secondCopierFactory).newInstance("eventId", firstReplicaLocation, secondReplicaLocation, - overridingCopierOptions); + CopierContext copierContext = new CopierContext("eventId", sourceBaseLocation, new Path("replicaLocation"), overridingCopierOptions); + copierFactory.newInstance(copierContext); + + ArgumentCaptor argument = ArgumentCaptor.forClass(CopierContext.class); + verify(firstCopierFactory).newInstance(argument.capture()); + CopierContext captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(sourceBaseLocation)); + assertThat(captured.getReplicaLocation(), is(firstReplicaLocation)); + + verify(secondCopierFactory).newInstance(argument.capture()); + captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(firstReplicaLocation)); + assertThat(captured.getReplicaLocation(), is(secondReplicaLocation)); } @Test @@ -125,13 +138,23 @@ public void copyFromOriginalSourceToMultipleReplicaLocations() { new DummyCopierPathGenerator(asList(sourceBaseLocation, sourceBaseLocation), asList(firstReplicaLocation, secondReplicaLocation)), MetricsMerger.DEFAULT); - - copierFactory.newInstance("eventId", sourceBaseLocation, sourceSubLocations, new Path("replicaLocation"), - overridingCopierOptions); - verify(firstCopierFactory).newInstance("eventId", sourceBaseLocation, sourceSubLocations, firstReplicaLocation, - overridingCopierOptions); - verify(secondCopierFactory).newInstance("eventId", sourceBaseLocation, sourceSubLocations, secondReplicaLocation, - overridingCopierOptions); + + CopierContext copierContext = new CopierContext("eventId", sourceBaseLocation, sourceSubLocations, + new Path("replicaLocation"), overridingCopierOptions); + copierFactory.newInstance(copierContext); + + ArgumentCaptor argument = ArgumentCaptor.forClass(CopierContext.class); + verify(firstCopierFactory).newInstance(argument.capture()); + CopierContext captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(sourceBaseLocation)); + assertThat(captured.getSourceSubLocations(), is(sourceSubLocations)); + assertThat(captured.getReplicaLocation(), is(firstReplicaLocation)); + + verify(secondCopierFactory).newInstance(argument.capture()); + captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(sourceBaseLocation)); + assertThat(captured.getSourceSubLocations(), is(sourceSubLocations)); + assertThat(captured.getReplicaLocation(), is(secondReplicaLocation)); } @Test @@ -144,12 +167,20 @@ public void copyFromOriginalSourceToMultipleReplicaLocationsNoSubLocations() { new DummyCopierPathGenerator(asList(sourceBaseLocation, sourceBaseLocation), asList(firstReplicaLocation, secondReplicaLocation)), MetricsMerger.DEFAULT); - - copierFactory.newInstance("eventId", sourceBaseLocation, new Path("replicaLocation"), overridingCopierOptions); - verify(firstCopierFactory).newInstance("eventId", sourceBaseLocation, firstReplicaLocation, - overridingCopierOptions); - verify(secondCopierFactory).newInstance("eventId", sourceBaseLocation, secondReplicaLocation, + CopierContext copierContext = new CopierContext("eventId", sourceBaseLocation, new Path("replicaLocation"), overridingCopierOptions); + copierFactory.newInstance(copierContext); + + ArgumentCaptor argument = ArgumentCaptor.forClass(CopierContext.class); + verify(firstCopierFactory).newInstance(argument.capture()); + CopierContext captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(sourceBaseLocation)); + assertThat(captured.getReplicaLocation(), is(firstReplicaLocation)); + + verify(secondCopierFactory).newInstance(argument.capture()); + captured = argument.getValue(); + assertThat(captured.getSourceBaseLocation(), is(sourceBaseLocation)); + assertThat(captured.getReplicaLocation(), is(secondReplicaLocation)); } } diff --git a/circus-train-avro/pom.xml b/circus-train-avro/pom.xml index 5d597fe0..f7a78bf6 100644 --- a/circus-train-avro/pom.xml +++ b/circus-train-avro/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-avro diff --git a/circus-train-avro/src/main/java/com/hotels/bdp/circustrain/avro/util/SchemaCopier.java b/circus-train-avro/src/main/java/com/hotels/bdp/circustrain/avro/util/SchemaCopier.java index 8718b269..8612870f 100644 --- a/circus-train-avro/src/main/java/com/hotels/bdp/circustrain/avro/util/SchemaCopier.java +++ b/circus-train-avro/src/main/java/com/hotels/bdp/circustrain/avro/util/SchemaCopier.java @@ -31,6 +31,7 @@ import com.hotels.bdp.circustrain.api.Modules; import com.hotels.bdp.circustrain.api.conf.TableReplication; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.api.copier.CopierFactoryManager; import com.hotels.bdp.circustrain.api.copier.CopierOptions; @@ -74,7 +75,8 @@ public Path copy(String source, String destination, EventTableReplication eventT CopierFactory copierFactory = copierFactoryManager .getCopierFactory(sourceLocation, destinationSchemaFile, mergedCopierOptions); LOG.info("Replicating Avro schema from '{}' to '{}'", sourceLocation, destinationSchemaFile); - Copier copier = copierFactory.newInstance(eventId, sourceLocation, destinationSchemaFile, mergedCopierOptions); + CopierContext copierContext = new CopierContext(eventId, sourceLocation, destinationSchemaFile, mergedCopierOptions); + Copier copier = copierFactory.newInstance(copierContext); Metrics metrics = copier.copy(); LOG diff --git a/circus-train-avro/src/test/java/com/hotels/bdp/circustrain/avro/util/SchemaCopierTest.java b/circus-train-avro/src/test/java/com/hotels/bdp/circustrain/avro/util/SchemaCopierTest.java index c8bb693c..c675c012 100644 --- a/circus-train-avro/src/test/java/com/hotels/bdp/circustrain/avro/util/SchemaCopierTest.java +++ b/circus-train-avro/src/test/java/com/hotels/bdp/circustrain/avro/util/SchemaCopierTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; @@ -36,6 +37,7 @@ import org.mockito.runners.MockitoJUnitRunner; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.api.copier.CopierFactoryManager; import com.hotels.bdp.circustrain.api.copier.CopierOptions; @@ -70,7 +72,7 @@ public void copiedToCorrectDestination() throws IOException { copierOptionsMap.put(CopierOptions.COPY_DESTINATION_IS_FILE, "true"); when(copierFactoryManager.getCopierFactory(eq(source), eq(targetFile), eq(copierOptionsMap))) .thenReturn(copierFactory); - when(copierFactory.newInstance(eq(eventId), eq(source), eq(targetFile), eq(copierOptionsMap))).thenReturn(copier); + when(copierFactory.newInstance(any(CopierContext.class))).thenReturn(copier); when(copier.copy()).thenReturn(metrics); when(metrics.getBytesReplicated()).thenReturn(123L); Path result = schemaCopier.copy(source.toString(), destination.toString(), eventTableReplication, eventId); diff --git a/circus-train-aws-sns/pom.xml b/circus-train-aws-sns/pom.xml index bf2e2650..9628622c 100644 --- a/circus-train-aws-sns/pom.xml +++ b/circus-train-aws-sns/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT diff --git a/circus-train-aws/pom.xml b/circus-train-aws/pom.xml index ee8dfd42..1391d4c3 100644 --- a/circus-train-aws/pom.xml +++ b/circus-train-aws/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-aws diff --git a/circus-train-common-test/pom.xml b/circus-train-common-test/pom.xml index 85441b3f..3d5b2965 100644 --- a/circus-train-common-test/pom.xml +++ b/circus-train-common-test/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-common-test diff --git a/circus-train-comparator/pom.xml b/circus-train-comparator/pom.xml index 5bcb9415..0b6771e7 100644 --- a/circus-train-comparator/pom.xml +++ b/circus-train-comparator/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-comparator diff --git a/circus-train-core/pom.xml b/circus-train-core/pom.xml index 1194897c..b55f75cd 100644 --- a/circus-train-core/pom.xml +++ b/circus-train-core/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-core diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/PartitionedTableReplication.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/PartitionedTableReplication.java index 78d7d670..d2a3b1d9 100644 --- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/PartitionedTableReplication.java +++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/PartitionedTableReplication.java @@ -28,7 +28,9 @@ import com.hotels.bdp.circustrain.api.ReplicaLocationManager; import com.hotels.bdp.circustrain.api.Replication; import com.hotels.bdp.circustrain.api.SourceLocationManager; +import com.hotels.bdp.circustrain.api.conf.TableReplication; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.api.copier.CopierFactoryManager; import com.hotels.bdp.circustrain.api.data.DataManipulator; @@ -52,37 +54,30 @@ class PartitionedTableReplication implements Replication { private final String eventId; private final CopierFactoryManager copierFactoryManager; private final PartitionPredicate partitionPredicate; - private final String targetTableLocation; - private final String replicaDatabaseName; - private final String replicaTableName; private Metrics metrics = Metrics.NULL_VALUE; private final Map copierOptions; private final CopierListener copierListener; private final DataManipulatorFactoryManager dataManipulatorFactoryManager; + private TableReplication tableReplication; + PartitionedTableReplication( - String database, - String table, + TableReplication tableReplication, PartitionPredicate partitionPredicate, Source source, Replica replica, CopierFactoryManager copierFactoryManager, EventIdFactory eventIdFactory, - String targetTableLocation, - String replicaDatabaseName, - String replicaTableName, Map copierOptions, CopierListener copierListener, DataManipulatorFactoryManager dataManipulatorFactoryManager) { - this.database = database; - this.table = table; + this.tableReplication = tableReplication; + this.database = tableReplication.getSourceTable().getDatabaseName(); + this.table = tableReplication.getSourceTable().getTableName(); this.partitionPredicate = partitionPredicate; this.source = source; this.replica = replica; this.copierFactoryManager = copierFactoryManager; - this.targetTableLocation = targetTableLocation; - this.replicaDatabaseName = replicaDatabaseName; - this.replicaTableName = replicaTableName; this.copierOptions = copierOptions; this.copierListener = copierListener; this.dataManipulatorFactoryManager = dataManipulatorFactoryManager; @@ -92,6 +87,9 @@ class PartitionedTableReplication implements Replication { @Override public void replicate() throws CircusTrainException { try { + String replicaDatabaseName = tableReplication.getReplicaDatabaseName(); + String replicaTableName = tableReplication.getReplicaTableName(); + TableAndStatistics sourceTableAndStatistics = source.getTableAndStatistics(database, table); Table sourceTable = sourceTableAndStatistics.getTable(); @@ -109,7 +107,8 @@ public void replicate() throws CircusTrainException { List sourceSubLocations = sourceLocationManager.getPartitionLocations(); ReplicaLocationManager replicaLocationManager = replica - .getLocationManager(TableType.PARTITIONED, targetTableLocation, eventId, sourceLocationManager); + .getLocationManager(TableType.PARTITIONED, tableReplication.getReplicaTable().getTableLocation(), eventId, + sourceLocationManager); Path replicaPartitionBaseLocation = replicaLocationManager.getPartitionBaseLocation(); DataManipulatorFactory dataManipulatorFactory = dataManipulatorFactoryManager @@ -128,8 +127,9 @@ public void replicate() throws CircusTrainException { } else { CopierFactory copierFactory = copierFactoryManager .getCopierFactory(sourceBaseLocation, replicaPartitionBaseLocation, copierOptions); - Copier copier = copierFactory - .newInstance(eventId, sourceBaseLocation, sourceSubLocations, replicaPartitionBaseLocation, copierOptions); + CopierContext copierContext = new CopierContext(tableReplication, eventId, sourceBaseLocation, sourceSubLocations, + replicaPartitionBaseLocation, copierOptions); + Copier copier = copierFactory.newInstance(copierContext); copierListener.copierStart(copier.getClass().getName()); try { metrics = copier.copy(); diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/ReplicationFactoryImpl.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/ReplicationFactoryImpl.java index 82bddd21..aaaf2acd 100644 --- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/ReplicationFactoryImpl.java +++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/ReplicationFactoryImpl.java @@ -75,10 +75,6 @@ public Replication newInstance(TableReplication tableReplication) { String sourceDatabaseName = sourceTable.getDatabaseName(); String sourceTableName = sourceTable.getTableName(); - String replicaDatabaseName = tableReplication.getReplicaDatabaseName(); - String replicaTableName = tableReplication.getReplicaTableName(); - String replicaTableLocation = tableReplication.getReplicaTable().getTableLocation(); - Source source = sourceFactory.newInstance(tableReplication); validate(tableReplication, source, replica); TableAndStatistics tableAndStatistics = source.getTableAndStatistics(sourceDatabaseName, sourceTableName); @@ -86,11 +82,9 @@ public Replication newInstance(TableReplication tableReplication) { Replication replication = null; if (partitionKeys == null || partitionKeys.isEmpty()) { - replication = createUnpartitionedTableReplication(tableReplication, source, replica, sourceDatabaseName, - sourceTableName, replicaDatabaseName, replicaTableName, replicaTableLocation); + replication = createUnpartitionedTableReplication(tableReplication, source, replica); } else { - replication = createPartitionedTableReplication(tableReplication, source, replica, sourceDatabaseName, - sourceTableName, replicaDatabaseName, replicaTableName, replicaTableLocation); + replication = createPartitionedTableReplication(tableReplication, source, replica); } return replication; } @@ -98,31 +92,27 @@ public Replication newInstance(TableReplication tableReplication) { private Replication createPartitionedTableReplication( TableReplication tableReplication, Source source, - Replica replica, - String sourceDatabaseName, - String sourceTableName, - String replicaDatabaseName, - String replicaTableName, - String replicaTableLocation) { + Replica replica) { Replication replication = null; PartitionPredicate partitionPredicate = partitionPredicateFactory.newInstance(tableReplication); switch (tableReplication.getReplicationMode()) { case METADATA_MIRROR: - replication = new PartitionedTableMetadataMirrorReplication(sourceDatabaseName, sourceTableName, - partitionPredicate, source, replica, eventIdFactory, replicaDatabaseName, replicaTableName); + replication = new PartitionedTableMetadataMirrorReplication(tableReplication.getSourceTable().getDatabaseName(), + tableReplication.getSourceTable().getTableName(), partitionPredicate, source, replica, eventIdFactory, + tableReplication.getReplicaDatabaseName(), tableReplication.getReplicaTableName()); break; case FULL_OVERWRITE: case FULL: Map mergedCopierOptions = tableReplication .getMergedCopierOptions(copierOptions.getCopierOptions()); - replication = new PartitionedTableReplication(sourceDatabaseName, sourceTableName, partitionPredicate, source, - replica, copierFactoryManager, eventIdFactory, replicaTableLocation, replicaDatabaseName, replicaTableName, - mergedCopierOptions, copierListener, dataManipulatorFactoryManager); + replication = new PartitionedTableReplication(tableReplication, partitionPredicate, source, replica, + copierFactoryManager, eventIdFactory, mergedCopierOptions, copierListener, dataManipulatorFactoryManager); break; case METADATA_UPDATE: - replication = new PartitionedTableMetadataUpdateReplication(sourceDatabaseName, sourceTableName, - partitionPredicate, source, replica, eventIdFactory, replicaTableLocation, replicaDatabaseName, - replicaTableName); + replication = new PartitionedTableMetadataUpdateReplication(tableReplication.getSourceTable().getDatabaseName(), + tableReplication.getSourceTable().getTableName(), partitionPredicate, source, replica, eventIdFactory, + tableReplication.getReplicaTable().getTableLocation(), tableReplication.getReplicaDatabaseName(), + tableReplication.getReplicaTableName()); break; default: throw new CircusTrainException( @@ -134,29 +124,25 @@ private Replication createPartitionedTableReplication( private Replication createUnpartitionedTableReplication( TableReplication tableReplication, Source source, - Replica replica, - String sourceDatabaseName, - String sourceTableName, - String replicaDatabaseName, - String replicaTableName, - String replicaTableLocation) { + Replica replica) { Replication replication = null; switch (tableReplication.getReplicationMode()) { case METADATA_MIRROR: - replication = new UnpartitionedTableMetadataMirrorReplication(sourceDatabaseName, sourceTableName, source, - replica, eventIdFactory, replicaDatabaseName, replicaTableName); + replication = new UnpartitionedTableMetadataMirrorReplication(tableReplication.getSourceTable().getDatabaseName(), + tableReplication.getSourceTable().getTableName(), source, replica, eventIdFactory, + tableReplication.getReplicaDatabaseName(), tableReplication.getReplicaTableName()); break; case FULL_OVERWRITE: case FULL: Map mergedCopierOptions = tableReplication .getMergedCopierOptions(copierOptions.getCopierOptions()); - replication = new UnpartitionedTableReplication(sourceDatabaseName, sourceTableName, source, replica, - copierFactoryManager, eventIdFactory, replicaTableLocation, replicaDatabaseName, replicaTableName, - mergedCopierOptions, copierListener, dataManipulatorFactoryManager); + replication = new UnpartitionedTableReplication(tableReplication, source, replica, copierFactoryManager, + eventIdFactory, mergedCopierOptions, copierListener, dataManipulatorFactoryManager); break; case METADATA_UPDATE: - replication = new UnpartitionedTableMetadataUpdateReplication(sourceDatabaseName, sourceTableName, source, - replica, eventIdFactory, replicaDatabaseName, replicaTableName); + replication = new UnpartitionedTableMetadataUpdateReplication(tableReplication.getSourceTable().getDatabaseName(), + tableReplication.getSourceTable().getTableName(), source, replica, eventIdFactory, + tableReplication.getReplicaDatabaseName(), tableReplication.getReplicaTableName()); break; default: throw new CircusTrainException( diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplication.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplication.java index 19cd4e75..644e8787 100644 --- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplication.java +++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplication.java @@ -26,7 +26,9 @@ import com.hotels.bdp.circustrain.api.ReplicaLocationManager; import com.hotels.bdp.circustrain.api.Replication; import com.hotels.bdp.circustrain.api.SourceLocationManager; +import com.hotels.bdp.circustrain.api.conf.TableReplication; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.api.copier.CopierFactoryManager; import com.hotels.bdp.circustrain.api.data.DataManipulator; @@ -49,35 +51,28 @@ class UnpartitionedTableReplication implements Replication { private final Replica replica; private final String eventId; private final CopierFactoryManager copierFactoryManager; - private final String targetTableLocation; - private final String replicaDatabaseName; - private final String replicaTableName; private Metrics metrics = Metrics.NULL_VALUE; private final Map copierOptions; private final CopierListener copierListener; private final DataManipulatorFactoryManager dataManipulatorFactoryManager; + private TableReplication tableReplication; + UnpartitionedTableReplication( - String database, - String table, + TableReplication tableReplication, Source source, Replica replica, CopierFactoryManager copierFactoryManager, EventIdFactory eventIdFactory, - String targetTableLocation, - String replicaDatabaseName, - String replicaTableName, Map copierOptions, CopierListener copierListener, DataManipulatorFactoryManager dataManipulatorFactoryManager) { - this.database = database; - this.table = table; + this.tableReplication = tableReplication; + this.database = tableReplication.getSourceTable().getDatabaseName(); + this.table = tableReplication.getSourceTable().getTableName(); this.source = source; this.replica = replica; this.copierFactoryManager = copierFactoryManager; - this.targetTableLocation = targetTableLocation; - this.replicaDatabaseName = replicaDatabaseName; - this.replicaTableName = replicaTableName; this.copierOptions = copierOptions; this.copierListener = copierListener; this.dataManipulatorFactoryManager = dataManipulatorFactoryManager; @@ -87,6 +82,8 @@ class UnpartitionedTableReplication implements Replication { @Override public void replicate() throws CircusTrainException { try { + String replicaDatabaseName = tableReplication.getReplicaDatabaseName(); + String replicaTableName = tableReplication.getReplicaTableName(); replica.validateReplicaTable(replicaDatabaseName, replicaTableName); TableAndStatistics sourceTableAndStatistics = source.getTableAndStatistics(database, table); Table sourceTable = sourceTableAndStatistics.getTable(); @@ -94,12 +91,14 @@ public void replicate() throws CircusTrainException { Path sourceLocation = sourceLocationManager.getTableLocation(); ReplicaLocationManager replicaLocationManager = replica - .getLocationManager(TableType.UNPARTITIONED, targetTableLocation, eventId, sourceLocationManager); + .getLocationManager(TableType.UNPARTITIONED, tableReplication.getReplicaTable().getTableLocation(), eventId, sourceLocationManager); Path replicaLocation = replicaLocationManager.getTableLocation(); CopierFactory copierFactory = copierFactoryManager .getCopierFactory(sourceLocation, replicaLocation, copierOptions); - Copier copier = copierFactory.newInstance(eventId, sourceLocation, replicaLocation, copierOptions); + + CopierContext copierContext = new CopierContext(tableReplication, eventId, sourceLocation, replicaLocation, copierOptions); + Copier copier = copierFactory.newInstance(copierContext); copierListener.copierStart(copier.getClass().getName()); try { metrics = copier.copy(); diff --git a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/PartitionedTableReplicationTest.java b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/PartitionedTableReplicationTest.java index 9976d9ce..405dd247 100644 --- a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/PartitionedTableReplicationTest.java +++ b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/PartitionedTableReplicationTest.java @@ -42,7 +42,9 @@ import com.hotels.bdp.circustrain.api.CircusTrainException; import com.hotels.bdp.circustrain.api.ReplicaLocationManager; import com.hotels.bdp.circustrain.api.SourceLocationManager; +import com.hotels.bdp.circustrain.api.conf.TableReplication; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.api.copier.CopierFactoryManager; import com.hotels.bdp.circustrain.api.data.DataManipulator; @@ -103,9 +105,7 @@ public void injectMocks() throws Exception { when(replicaLocationManager.getPartitionBaseLocation()).thenReturn(replicaTableLocation); when(copierFactoryManager.getCopierFactory(sourceTableLocation, replicaTableLocation, copierOptions)) .thenReturn(copierFactory); - when(copierFactory - .newInstance(EVENT_ID, sourceTableLocation, sourcePartitionLocations, replicaTableLocation, copierOptions)) - .thenReturn(copier); + when(copierFactory.newInstance(any(CopierContext.class))).thenReturn(copier); when(partitionsAndStatistics.getPartitions()).thenReturn(sourcePartitions); when(copierOptions.get("task-count")).thenReturn(Integer.valueOf(2)); when(partitionPredicate.getPartitionPredicate()).thenReturn(PARTITION_PREDICATE); @@ -125,9 +125,9 @@ public void noMatchingPartitions() throws Exception { when(source.getLocationManager(sourceTable, Collections.emptyList(), EVENT_ID, copierOptions)) .thenReturn(sourceLocationManager); - PartitionedTableReplication replication = new PartitionedTableReplication(DATABASE, TABLE, partitionPredicate, - source, replica, copierFactoryManager, eventIdFactory, targetTableLocation, DATABASE, TABLE, copierOptions, - listener, dataManipulatorFactoryManager); + TableReplication tableReplication = createTypicalTableReplication(); + PartitionedTableReplication replication = new PartitionedTableReplication(tableReplication, partitionPredicate, + source, replica, copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); replication.replicate(); verifyZeroInteractions(copier); @@ -144,9 +144,9 @@ public void typical() throws Exception { .thenReturn(replicaLocationManager); when(source.getPartitions(sourceTable, PARTITION_PREDICATE, MAX_PARTITIONS)).thenReturn(partitionsAndStatistics); - PartitionedTableReplication replication = new PartitionedTableReplication(DATABASE, TABLE, partitionPredicate, - source, replica, copierFactoryManager, eventIdFactory, targetTableLocation, DATABASE, TABLE, copierOptions, - listener, dataManipulatorFactoryManager); + TableReplication tableReplication = createTypicalTableReplication(); + PartitionedTableReplication replication = new PartitionedTableReplication(tableReplication, partitionPredicate, + source, replica, copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); replication.replicate(); InOrder replicationOrder = inOrder(copierFactoryManager, copierFactory, copier, sourceLocationManager, replica, @@ -155,9 +155,7 @@ public void typical() throws Exception { replicationOrder .verify(copierFactoryManager) .getCopierFactory(sourceTableLocation, replicaTableLocation, copierOptions); - replicationOrder - .verify(copierFactory) - .newInstance(EVENT_ID, sourceTableLocation, sourcePartitionLocations, replicaTableLocation, copierOptions); + replicationOrder.verify(copierFactory).newInstance(any(CopierContext.class)); replicationOrder.verify(listener).copierStart(anyString()); replicationOrder.verify(copier).copy(); replicationOrder.verify(listener).copierEnd(any(Metrics.class)); @@ -169,15 +167,20 @@ public void typical() throws Exception { replicationOrder.verify(replicaLocationManager).cleanUpLocations(); } + private TableReplication createTypicalTableReplication() { + return TableReplicationUtils.createTableReplication(DATABASE, TABLE, DATABASE, TABLE, targetTableLocation); + } + @Test public void mappedNames() throws Exception { when(replica.getLocationManager(TableType.PARTITIONED, targetTableLocation, EVENT_ID, sourceLocationManager)) .thenReturn(replicaLocationManager); when(source.getPartitions(sourceTable, PARTITION_PREDICATE, MAX_PARTITIONS)).thenReturn(partitionsAndStatistics); - PartitionedTableReplication replication = new PartitionedTableReplication(DATABASE, TABLE, partitionPredicate, - source, replica, copierFactoryManager, eventIdFactory, targetTableLocation, MAPPED_DATABASE, MAPPED_TABLE, - copierOptions, listener, dataManipulatorFactoryManager); + TableReplication tableReplication = TableReplicationUtils + .createTableReplication(DATABASE, TABLE, MAPPED_DATABASE, MAPPED_TABLE, targetTableLocation); + PartitionedTableReplication replication = new PartitionedTableReplication(tableReplication, partitionPredicate, + source, replica, copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); replication.replicate(); InOrder replicationOrder = inOrder(copierFactoryManager, copierFactory, copier, sourceLocationManager, replica, @@ -186,9 +189,7 @@ public void mappedNames() throws Exception { replicationOrder .verify(copierFactoryManager) .getCopierFactory(sourceTableLocation, replicaTableLocation, copierOptions); - replicationOrder - .verify(copierFactory) - .newInstance(EVENT_ID, sourceTableLocation, sourcePartitionLocations, replicaTableLocation, copierOptions); + replicationOrder.verify(copierFactory).newInstance(any(CopierContext.class)); replicationOrder.verify(copier).copy(); replicationOrder.verify(sourceLocationManager).cleanUpLocations(); replicationOrder @@ -205,9 +206,9 @@ public void copierListenerCalledWhenException() throws Exception { when(copier.copy()).thenThrow(new CircusTrainException("copy failed")); when(source.getPartitions(sourceTable, PARTITION_PREDICATE, MAX_PARTITIONS)).thenReturn(partitionsAndStatistics); - PartitionedTableReplication replication = new PartitionedTableReplication(DATABASE, TABLE, partitionPredicate, - source, replica, copierFactoryManager, eventIdFactory, targetTableLocation, DATABASE, TABLE, copierOptions, - listener, dataManipulatorFactoryManager); + TableReplication tableReplication = createTypicalTableReplication(); + PartitionedTableReplication replication = new PartitionedTableReplication(tableReplication, partitionPredicate, + source, replica, copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); try { replication.replicate(); fail("Copy exception should be caught and rethrown"); @@ -227,9 +228,9 @@ public void replicationFailsOnDeleteTableException() throws Exception { when(source.getPartitions(sourceTable, PARTITION_PREDICATE, MAX_PARTITIONS)).thenReturn(partitionsAndStatistics); doThrow(new Exception()).when(replica).cleanupReplicaTableIfRequired(DATABASE, TABLE, dataManipulator); - PartitionedTableReplication replication = new PartitionedTableReplication(DATABASE, TABLE, partitionPredicate, - source, replica, copierFactoryManager, eventIdFactory, targetTableLocation, DATABASE, TABLE, copierOptions, - listener, dataManipulatorFactoryManager); + TableReplication tableReplication = createTypicalTableReplication(); + PartitionedTableReplication replication = new PartitionedTableReplication(tableReplication, partitionPredicate, + source, replica, copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); try { replication.replicate(); fail("Copy exception should be caught and rethrown"); diff --git a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/TableReplicationUtils.java b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/TableReplicationUtils.java new file mode 100644 index 00000000..d41168fc --- /dev/null +++ b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/TableReplicationUtils.java @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2016-2020 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.circustrain.core; + +import com.hotels.bdp.circustrain.api.conf.ReplicaTable; +import com.hotels.bdp.circustrain.api.conf.SourceTable; +import com.hotels.bdp.circustrain.api.conf.TableReplication; + +public final class TableReplicationUtils { + + private TableReplicationUtils() {}; + + protected static TableReplication createTableReplication( + String sourceDatabaseName, + String sourceTableName, + String replicaDatabaseName, + String replicateTableName, + String targetTableLocation) { + TableReplication tableReplication = new TableReplication(); + ReplicaTable replicaTable = new ReplicaTable(); + replicaTable.setDatabaseName(replicaDatabaseName); + replicaTable.setTableName(replicateTableName); + replicaTable.setTableLocation(targetTableLocation); + tableReplication.setReplicaTable(replicaTable); + SourceTable sourceTable = new SourceTable(); + sourceTable.setDatabaseName(sourceDatabaseName); + sourceTable.setTableName(sourceTableName); + tableReplication.setSourceTable(sourceTable); + return tableReplication; + } + +} diff --git a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplicationTest.java b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplicationTest.java index 1feb4b00..1587af0d 100644 --- a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplicationTest.java +++ b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/UnpartitionedTableReplicationTest.java @@ -36,7 +36,9 @@ import com.hotels.bdp.circustrain.api.CircusTrainException; import com.hotels.bdp.circustrain.api.ReplicaLocationManager; import com.hotels.bdp.circustrain.api.SourceLocationManager; +import com.hotels.bdp.circustrain.api.conf.TableReplication; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.api.copier.CopierFactoryManager; import com.hotels.bdp.circustrain.api.data.DataManipulator; @@ -90,7 +92,7 @@ public class UnpartitionedTableReplicationTest { private final Path sourceTableLocation = new Path("sourceTableLocation"); private final Path replicaTableLocation = new Path("replicaTableLocation"); - private final String targetTableLoation = "targetTableLocation"; + private final String targetTableLocation = "targetTableLocation"; @Before public void injectMocks() throws Exception { @@ -101,21 +103,25 @@ public void injectMocks() throws Exception { when(sourceLocationManager.getTableLocation()).thenReturn(sourceTableLocation); when(copierFactoryManager.getCopierFactory(sourceTableLocation, replicaTableLocation, copierOptions)) .thenReturn(copierFactory); - when(copierFactory.newInstance(EVENT_ID, sourceTableLocation, replicaTableLocation, copierOptions)) - .thenReturn(copier); + when(copierFactory.newInstance(any(CopierContext.class))).thenReturn(copier); when(replicaLocationManager.getTableLocation()).thenReturn(replicaTableLocation); when(dataManipulatorFactoryManager.getFactory(sourceTableLocation, replicaTableLocation, copierOptions)) .thenReturn(dataManipulatorFactory); when(dataManipulatorFactory.newInstance(replicaTableLocation, copierOptions)).thenReturn(dataManipulator); } + private TableReplication createTypicalTableReplication() { + return TableReplicationUtils.createTableReplication(DATABASE, TABLE, DATABASE, TABLE, targetTableLocation); + } + @Test public void typical() throws Exception { - when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLoation, EVENT_ID, sourceLocationManager)) + when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLocation, EVENT_ID, sourceLocationManager)) .thenReturn(replicaLocationManager); - UnpartitionedTableReplication replication = new UnpartitionedTableReplication(DATABASE, TABLE, source, replica, - copierFactoryManager, eventIdFactory, targetTableLoation, DATABASE, TABLE, copierOptions, listener, - dataManipulatorFactoryManager); + + TableReplication tableReplication = createTypicalTableReplication(); + UnpartitionedTableReplication replication = new UnpartitionedTableReplication(tableReplication, source, replica, + copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); replication.replicate(); InOrder replicationOrder = inOrder(copierFactoryManager, copierFactory, copier, sourceLocationManager, replica, @@ -124,9 +130,7 @@ public void typical() throws Exception { replicationOrder .verify(copierFactoryManager) .getCopierFactory(sourceTableLocation, replicaTableLocation, copierOptions); - replicationOrder - .verify(copierFactory) - .newInstance(EVENT_ID, sourceTableLocation, replicaTableLocation, copierOptions); + replicationOrder.verify(copierFactory).newInstance(any(CopierContext.class)); replicationOrder.verify(listener).copierStart(anyString()); replicationOrder.verify(copier).copy(); replicationOrder.verify(listener).copierEnd(any(Metrics.class)); @@ -139,12 +143,13 @@ public void typical() throws Exception { @Test public void mappedNames() throws Exception { - when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLoation, EVENT_ID, sourceLocationManager)) + when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLocation, EVENT_ID, sourceLocationManager)) .thenReturn(replicaLocationManager); - UnpartitionedTableReplication replication = new UnpartitionedTableReplication(DATABASE, TABLE, source, replica, - copierFactoryManager, eventIdFactory, targetTableLoation, MAPPED_DATABASE, MAPPED_TABLE, copierOptions, - listener, dataManipulatorFactoryManager); + TableReplication tableReplication = TableReplicationUtils + .createTableReplication(DATABASE, TABLE, MAPPED_DATABASE, MAPPED_TABLE, targetTableLocation); + UnpartitionedTableReplication replication = new UnpartitionedTableReplication(tableReplication, source, replica, + copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); replication.replicate(); InOrder replicationOrder = inOrder(copierFactoryManager, copierFactory, copier, sourceLocationManager, replica, @@ -153,9 +158,7 @@ public void mappedNames() throws Exception { replicationOrder .verify(copierFactoryManager) .getCopierFactory(sourceTableLocation, replicaTableLocation, copierOptions); - replicationOrder - .verify(copierFactory) - .newInstance(EVENT_ID, sourceTableLocation, replicaTableLocation, copierOptions); + replicationOrder.verify(copierFactory).newInstance(any(CopierContext.class)); replicationOrder.verify(copier).copy(); replicationOrder.verify(sourceLocationManager).cleanUpLocations(); replicationOrder @@ -166,14 +169,14 @@ public void mappedNames() throws Exception { @Test public void copierListenerCalledWhenException() throws Exception { - when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLoation, EVENT_ID, sourceLocationManager)) + when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLocation, EVENT_ID, sourceLocationManager)) .thenReturn(replicaLocationManager); when(copier.copy()).thenThrow(new CircusTrainException("copy failed")); - UnpartitionedTableReplication replication = new UnpartitionedTableReplication(DATABASE, TABLE, source, replica, - copierFactoryManager, eventIdFactory, targetTableLoation, DATABASE, TABLE, copierOptions, listener, - dataManipulatorFactoryManager); + TableReplication tableReplication = createTypicalTableReplication(); + UnpartitionedTableReplication replication = new UnpartitionedTableReplication(tableReplication, source, replica, + copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); try { replication.replicate(); fail("Copy exception should be caught and rethrown"); @@ -188,13 +191,13 @@ public void copierListenerCalledWhenException() throws Exception { @Test public void replicationFailsOnDeleteTableException() throws Exception { - when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLoation, EVENT_ID, sourceLocationManager)) + when(replica.getLocationManager(TableType.UNPARTITIONED, targetTableLocation, EVENT_ID, sourceLocationManager)) .thenReturn(replicaLocationManager); doThrow(new Exception()).when(replica).cleanupReplicaTableIfRequired(DATABASE, TABLE, dataManipulator); - UnpartitionedTableReplication replication = new UnpartitionedTableReplication(DATABASE, TABLE, source, replica, - copierFactoryManager, eventIdFactory, targetTableLoation, DATABASE, TABLE, copierOptions, listener, - dataManipulatorFactoryManager); + TableReplication tableReplication = createTypicalTableReplication(); + UnpartitionedTableReplication replication = new UnpartitionedTableReplication(tableReplication, source, replica, + copierFactoryManager, eventIdFactory, copierOptions, listener, dataManipulatorFactoryManager); try { replication.replicate(); fail("Copy exception should be caught and rethrown"); diff --git a/circus-train-distcp-copier/pom.xml b/circus-train-distcp-copier/pom.xml index c3a261f1..c1997a7f 100644 --- a/circus-train-distcp-copier/pom.xml +++ b/circus-train-distcp-copier/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-distcp-copier diff --git a/circus-train-distcp-copier/src/main/java/com/hotels/bdp/circustrain/distcpcopier/DistCpCopierFactory.java b/circus-train-distcp-copier/src/main/java/com/hotels/bdp/circustrain/distcpcopier/DistCpCopierFactory.java index 21bb052d..7d11125c 100644 --- a/circus-train-distcp-copier/src/main/java/com/hotels/bdp/circustrain/distcpcopier/DistCpCopierFactory.java +++ b/circus-train-distcp-copier/src/main/java/com/hotels/bdp/circustrain/distcpcopier/DistCpCopierFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ package com.hotels.bdp.circustrain.distcpcopier; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,6 +31,7 @@ import com.hotels.bdp.circustrain.api.Modules; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; @Profile({ Modules.REPLICATION }) @@ -53,24 +53,32 @@ public boolean supportsSchemes(String sourceScheme, String replicaScheme) { return true; } + @Override + public Copier newInstance(CopierContext copierContext) { + return new DistCpCopier(conf, copierContext.getSourceBaseLocation(), copierContext.getSourceSubLocations(), + copierContext.getReplicaLocation(), copierContext.getCopierOptions(), runningMetricsRegistry); + } + @Override public Copier newInstance( String eventId, Path sourceBaseLocation, - List sourceSubLocations, Path replicaLocation, Map copierOptions) { - return new DistCpCopier(conf, sourceBaseLocation, sourceSubLocations, replicaLocation, copierOptions, - runningMetricsRegistry); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, replicaLocation, copierOptions); + return newInstance(copierContext); } @Override public Copier newInstance( String eventId, Path sourceBaseLocation, + List sourceSubLocations, Path replicaLocation, Map copierOptions) { - return newInstance(eventId, sourceBaseLocation, Collections.emptyList(), replicaLocation, copierOptions); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, sourceSubLocations, replicaLocation, + copierOptions); + return newInstance(copierContext); } } diff --git a/circus-train-gcp/pom.xml b/circus-train-gcp/pom.xml index 1e9e8de5..36839d3f 100644 --- a/circus-train-gcp/pom.xml +++ b/circus-train-gcp/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-gcp diff --git a/circus-train-hive-view/pom.xml b/circus-train-hive-view/pom.xml index 9b5eb7ac..ff6bd43f 100644 --- a/circus-train-hive-view/pom.xml +++ b/circus-train-hive-view/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-hive-view diff --git a/circus-train-hive/pom.xml b/circus-train-hive/pom.xml index 5a1346ec..c2f9a377 100644 --- a/circus-train-hive/pom.xml +++ b/circus-train-hive/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-hive diff --git a/circus-train-housekeeping/pom.xml b/circus-train-housekeeping/pom.xml index c9123fb6..5a75f18a 100644 --- a/circus-train-housekeeping/pom.xml +++ b/circus-train-housekeeping/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-housekeeping diff --git a/circus-train-integration-tests/pom.xml b/circus-train-integration-tests/pom.xml index d8977bf2..ab94f375 100644 --- a/circus-train-integration-tests/pom.xml +++ b/circus-train-integration-tests/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-integration-tests diff --git a/circus-train-metrics/pom.xml b/circus-train-metrics/pom.xml index 2fdffb24..fd7d519c 100644 --- a/circus-train-metrics/pom.xml +++ b/circus-train-metrics/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-metrics diff --git a/circus-train-package/pom.xml b/circus-train-package/pom.xml index 337c2a97..205357f7 100644 --- a/circus-train-package/pom.xml +++ b/circus-train-package/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train diff --git a/circus-train-s3-mapreduce-cp-copier/pom.xml b/circus-train-s3-mapreduce-cp-copier/pom.xml index 2725eed7..081ad4fe 100644 --- a/circus-train-s3-mapreduce-cp-copier/pom.xml +++ b/circus-train-s3-mapreduce-cp-copier/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-s3-mapreduce-cp-copier diff --git a/circus-train-s3-mapreduce-cp-copier/src/main/java/com/hotels/bdp/circustrain/s3mapreducecpcopier/S3MapReduceCpCopierFactory.java b/circus-train-s3-mapreduce-cp-copier/src/main/java/com/hotels/bdp/circustrain/s3mapreducecpcopier/S3MapReduceCpCopierFactory.java index d530b6e7..91738511 100644 --- a/circus-train-s3-mapreduce-cp-copier/src/main/java/com/hotels/bdp/circustrain/s3mapreducecpcopier/S3MapReduceCpCopierFactory.java +++ b/circus-train-s3-mapreduce-cp-copier/src/main/java/com/hotels/bdp/circustrain/s3mapreducecpcopier/S3MapReduceCpCopierFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ package com.hotels.bdp.circustrain.s3mapreducecpcopier; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,6 +31,7 @@ import com.hotels.bdp.circustrain.api.Modules; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.aws.S3Schemes; @@ -55,24 +55,32 @@ public boolean supportsSchemes(String sourceScheme, String replicaScheme) { return !S3Schemes.isS3Scheme(sourceScheme) && S3Schemes.isS3Scheme(replicaScheme); } + @Override + public Copier newInstance(CopierContext copierContext) { + return new S3MapReduceCpCopier(conf, copierContext.getSourceBaseLocation(), copierContext.getSourceSubLocations(), + copierContext.getReplicaLocation(), copierContext.getCopierOptions(), runningMetricsRegistry); + } + @Override public Copier newInstance( String eventId, Path sourceBaseLocation, - List sourceSubLocations, Path replicaLocation, Map copierOptions) { - return new S3MapReduceCpCopier(conf, sourceBaseLocation, sourceSubLocations, replicaLocation, copierOptions, - runningMetricsRegistry); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, replicaLocation, copierOptions); + return newInstance(copierContext); } @Override public Copier newInstance( String eventId, Path sourceBaseLocation, + List sourceSubLocations, Path replicaLocation, Map copierOptions) { - return newInstance(eventId, sourceBaseLocation, Collections.emptyList(), replicaLocation, copierOptions); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, sourceSubLocations, replicaLocation, + copierOptions); + return newInstance(copierContext); } } diff --git a/circus-train-s3-mapreduce-cp/pom.xml b/circus-train-s3-mapreduce-cp/pom.xml index d8fc5f68..f36722a9 100644 --- a/circus-train-s3-mapreduce-cp/pom.xml +++ b/circus-train-s3-mapreduce-cp/pom.xml @@ -18,7 +18,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-s3-mapreduce-cp diff --git a/circus-train-s3-s3-copier/pom.xml b/circus-train-s3-s3-copier/pom.xml index eec75ffd..ebc8a759 100644 --- a/circus-train-s3-s3-copier/pom.xml +++ b/circus-train-s3-s3-copier/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-s3-s3-copier diff --git a/circus-train-s3-s3-copier/src/main/java/com/hotels/bdp/circustrain/s3s3copier/S3S3CopierFactory.java b/circus-train-s3-s3-copier/src/main/java/com/hotels/bdp/circustrain/s3s3copier/S3S3CopierFactory.java index 6a233bfb..c16f55bd 100644 --- a/circus-train-s3-s3-copier/src/main/java/com/hotels/bdp/circustrain/s3s3copier/S3S3CopierFactory.java +++ b/circus-train-s3-s3-copier/src/main/java/com/hotels/bdp/circustrain/s3s3copier/S3S3CopierFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ package com.hotels.bdp.circustrain.s3s3copier; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -30,6 +29,7 @@ import com.hotels.bdp.circustrain.api.Modules; import com.hotels.bdp.circustrain.api.copier.Copier; +import com.hotels.bdp.circustrain.api.copier.CopierContext; import com.hotels.bdp.circustrain.api.copier.CopierFactory; import com.hotels.bdp.circustrain.aws.S3Schemes; import com.hotels.bdp.circustrain.s3s3copier.aws.AmazonS3ClientFactory; @@ -63,25 +63,33 @@ public boolean supportsSchemes(String sourceScheme, String replicaScheme) { return S3Schemes.isS3Scheme(sourceScheme) && S3Schemes.isS3Scheme(replicaScheme); } + @Override + public Copier newInstance(CopierContext copierContext) { + return new S3S3Copier(copierContext.getSourceBaseLocation(), copierContext.getSourceSubLocations(), + copierContext.getReplicaLocation(), clientFactory, transferManagerFactory, listObjectsRequestFactory, + runningMetricsRegistry, new S3S3CopierOptions(copierContext.getCopierOptions())); + } + @Override public Copier newInstance( String eventId, Path sourceBaseLocation, - List sourceSubLocations, Path replicaLocation, Map copierOptions) { - return new S3S3Copier(sourceBaseLocation, sourceSubLocations, replicaLocation, clientFactory, - transferManagerFactory, listObjectsRequestFactory, runningMetricsRegistry, - new S3S3CopierOptions(copierOptions)); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, replicaLocation, copierOptions); + return newInstance(copierContext); } @Override public Copier newInstance( String eventId, Path sourceBaseLocation, + List sourceSubLocations, Path replicaLocation, Map copierOptions) { - return newInstance(eventId, sourceBaseLocation, Collections.emptyList(), replicaLocation, copierOptions); + CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, sourceSubLocations, replicaLocation, + copierOptions); + return newInstance(copierContext); } } diff --git a/circus-train-tool-parent/circus-train-comparison-tool/pom.xml b/circus-train-tool-parent/circus-train-comparison-tool/pom.xml index 74646d82..d3c40280 100644 --- a/circus-train-tool-parent/circus-train-comparison-tool/pom.xml +++ b/circus-train-tool-parent/circus-train-comparison-tool/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-tool-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-comparison-tool diff --git a/circus-train-tool-parent/circus-train-filter-tool/pom.xml b/circus-train-tool-parent/circus-train-filter-tool/pom.xml index d4ee3346..fc4b5a90 100644 --- a/circus-train-tool-parent/circus-train-filter-tool/pom.xml +++ b/circus-train-tool-parent/circus-train-filter-tool/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-tool-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-filter-tool diff --git a/circus-train-tool-parent/circus-train-tool-core/pom.xml b/circus-train-tool-parent/circus-train-tool-core/pom.xml index 57bf75d9..9cebf017 100644 --- a/circus-train-tool-parent/circus-train-tool-core/pom.xml +++ b/circus-train-tool-parent/circus-train-tool-core/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-tool-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-tool-core diff --git a/circus-train-tool-parent/circus-train-tool/pom.xml b/circus-train-tool-parent/circus-train-tool/pom.xml index f8258041..34a88c14 100644 --- a/circus-train-tool-parent/circus-train-tool/pom.xml +++ b/circus-train-tool-parent/circus-train-tool/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-tool-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-tool diff --git a/circus-train-tool-parent/pom.xml b/circus-train-tool-parent/pom.xml index 06306d97..a49bd78c 100644 --- a/circus-train-tool-parent/pom.xml +++ b/circus-train-tool-parent/pom.xml @@ -4,7 +4,7 @@ com.hotels circus-train-parent - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT circus-train-tool-parent diff --git a/pom.xml b/pom.xml index ce1919c0..778f4449 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ circus-train-parent circus-train replicates data and hive metadata between various clusters - 16.2.1-SNAPSHOT + 16.3.0-SNAPSHOT pom Circus Train Parent 2016