Skip to content

Commit

Permalink
Copier context (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
massdosage committed Aug 25, 2020
1 parent a2c254c commit 6c45be9
Show file tree
Hide file tree
Showing 41 changed files with 441 additions and 217 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [16.3.0] - TBD
### Added
* Added method `newInstance(CopierContext)` to `com.hotels.bdp.circustrain.api.copier.CopierFactory`. This provides Copiers with more configuration information in a future proof manner. See [#195](https://github.com/HotelsDotCom/circus-train/issues/195).
### Deprecated
* Deprecated other `newInstance()` methods on `com.hotels.bdp.circustrain.api.copier.CopierFactory`.

## [16.2.0] - 2020-07-01
### Changed
* Changed version of `hive.version` to `2.3.7` (was `2.3.2`). This allows Circus Train to be used on JDK>=9.
Expand Down
2 changes: 1 addition & 1 deletion circus-train-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.hotels</groupId>
<artifactId>circus-train-parent</artifactId>
<version>16.2.1-SNAPSHOT</version>
<version>16.3.0-SNAPSHOT</version>
</parent>

<artifactId>circus-train-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,20 @@ public boolean supportsSchemes(String sourceScheme, String replicaScheme) {
}

@Override
public Copier newInstance(
String eventId,
Path sourceBaseLocation,
List<Path> sourceSubLocations,
Path replicaLocation,
Map<String, Object> copierOptions) {
public Copier newInstance(CopierContext copierContext) {
List<Copier> copiers = new ArrayList<>(delegates.size());
int i = 0;
for (CopierFactory delegate : delegates) {
CopierPathGeneratorParams copierPathGeneratorParams = CopierPathGeneratorParams.newParams(i++, eventId,
sourceBaseLocation, sourceSubLocations, replicaLocation, copierOptions);
CopierPathGeneratorParams copierPathGeneratorParams = CopierPathGeneratorParams
.newParams(i++, copierContext.getEventId(), copierContext.getSourceBaseLocation(),
copierContext.getSourceSubLocations(), copierContext.getReplicaLocation(),
copierContext.getCopierOptions());
Path newSourceBaseLocation = pathGenerator.generateSourceBaseLocation(copierPathGeneratorParams);
Path newReplicaLocation = pathGenerator.generateReplicaLocation(copierPathGeneratorParams);
Copier copier = delegate.newInstance(eventId, newSourceBaseLocation, sourceSubLocations, newReplicaLocation,
copierOptions);

CopierContext delegateContext = new CopierContext(copierContext.getEventId(), newSourceBaseLocation,
copierContext.getSourceSubLocations(), newReplicaLocation, copierContext.getCopierOptions());
Copier copier = delegate.newInstance(delegateContext);
copiers.add(copier);
}
return new CompositeCopier(copiers, metricsMerger);
Expand All @@ -108,17 +107,20 @@ public Copier newInstance(
Path sourceBaseLocation,
Path replicaLocation,
Map<String, Object> copierOptions) {
List<Copier> copiers = new ArrayList<>(delegates.size());
int i = 0;
for (CopierFactory delegatee : delegates) {
CopierPathGeneratorParams copierPathGeneratorParams = CopierPathGeneratorParams.newParams(i++, eventId,
sourceBaseLocation, null, replicaLocation, copierOptions);
Path newReplicaLocation = pathGenerator.generateReplicaLocation(copierPathGeneratorParams);
Path newSourceBaseLocation = pathGenerator.generateSourceBaseLocation(copierPathGeneratorParams);
Copier copier = delegatee.newInstance(eventId, newSourceBaseLocation, newReplicaLocation, copierOptions);
copiers.add(copier);
}
return new CompositeCopier(copiers, metricsMerger);
CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, replicaLocation, copierOptions);
return newInstance(copierContext);
}

@Override
public Copier newInstance(
String eventId,
Path sourceBaseLocation,
List<Path> sourceSubLocations,
Path replicaLocation,
Map<String, Object> copierOptions) {
CopierContext copierContext = new CopierContext(eventId, sourceBaseLocation, sourceSubLocations, replicaLocation,
copierOptions);
return newInstance(copierContext);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.api.copier;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.hotels.bdp.circustrain.api.conf.TableReplication;

public final class CopierContext {

private String eventId;
private Path sourceBaseLocation;
private List<Path> sourceSubLocations = ImmutableList.copyOf(Collections.<Path>emptyList());
private Path replicaLocation;
private Map<String, Object> copierOptions;
private TableReplication tableReplication;

public CopierContext(
TableReplication tableReplication,
String eventId,
Path sourceBaseLocation,
List<Path> sourceSubLocations,
Path replicaLocation,
Map<String, Object> copierOptions) {
this.tableReplication = tableReplication;
this.eventId = eventId;
this.sourceBaseLocation = sourceBaseLocation;
if (sourceSubLocations != null) {
this.sourceSubLocations = ImmutableList.copyOf(sourceSubLocations);
}
this.replicaLocation = replicaLocation;
this.copierOptions = ImmutableMap.copyOf(copierOptions);
}

public CopierContext(
TableReplication tableReplication,
String eventId,
Path sourceLocation,
Path replicaLocation,
Map<String, Object> copierOptions) {
this(tableReplication, eventId, sourceLocation, null, replicaLocation, copierOptions);
}

public CopierContext(
String eventId,
Path sourceBaseLocation,
List<Path> sourceSubLocations,
Path replicaLocation,
Map<String, Object> copierOptions) {
this(null, eventId, sourceBaseLocation, sourceSubLocations, replicaLocation, copierOptions);
}

public CopierContext(
String eventId,
Path sourceBaseLocation,
Path replicaLocation,
Map<String, Object> copierOptions) {
this(null, eventId, sourceBaseLocation, null, replicaLocation, copierOptions);
}

public String getEventId() {
return eventId;
}

public Path getSourceBaseLocation() {
return sourceBaseLocation;
}

public List<Path> getSourceSubLocations() {
return sourceSubLocations;
}

public Path getReplicaLocation() {
return replicaLocation;
}

public Map<String, Object> getCopierOptions() {
return copierOptions;
}

public TableReplication getTableReplication() {
return tableReplication;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,13 +25,28 @@ public interface CopierFactory {
boolean supportsSchemes(String sourceScheme, String replicaScheme);

/**
* Creates a new Copier.
*
* @param copierContext Context object containing configuration values for the Copier.
* @return
*/
default Copier newInstance(CopierContext copierContext) {
//TODO: this is only here for backwards compatibility with CopierFactorys using older versions of Circus Train, when the below
//deprecated methods are removed so should this default implementation
return newInstance(copierContext.getEventId(), copierContext.getSourceBaseLocation(), copierContext.getSourceSubLocations(), copierContext.getReplicaLocation(),
copierContext.getCopierOptions());
}

/**
* @deprecated As of release 16.3.0, replaced by {@link #newInstance(CopierContext)}.
* @param eventId
* @param sourceBaseLocation
* @param sourceSubLocations
* @param replicaLocation
* @param copierOptions, contains both global and per table override configured options
* @return
*/
@Deprecated
Copier newInstance(
String eventId,
Path sourceBaseLocation,
Expand All @@ -40,12 +55,15 @@ Copier newInstance(
Map<String, Object> copierOptions);

/**
* @deprecated As of release 16.3.0, replaced by {@link #newInstance(CopierContext)}.
*
* @param eventId
* @param sourceBaseLocation
* @param replicaLocation
* @param copierOptions, contains both global and per table override configured options
* @return
*/
@Deprecated
Copier newInstance(String eventId, Path sourceBaseLocation, Path replicaLocation, Map<String, Object> copierOptions);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 6c45be9

Please sign in to comment.