Skip to content

Commit

Permalink
Merge 83641af into 582fbd8
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Duin committed Jan 23, 2020
2 parents 582fbd8 + 83641af commit 48c1893
Show file tree
Hide file tree
Showing 36 changed files with 675 additions and 433 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## [15.0.0] - 2019-11-12
## TBD - TBD
### Changed
* AVRO Schema Copier now re-uses the normal 'data' copier instead of its own. See [#162](https://github.com/HotelsDotCom/circus-train/issues/162).

## [15.0.0] - 2019-11-12
### Changed
* Default `avro-serde-options` must now be included within `transform-options`. This is a backwards incompatible change to the configuration file. Please see [Avro Schema Replication](https://github.com/HotelsDotCom/circus-train/blob/master/circus-train-avro/README.md) for more information.
* Updated `jackson` version to 2.10.0 (was 2.9.10).
Expand All @@ -12,7 +15,6 @@
* Added `copier-options.assume-role` to assume a role when using the S3S3 copier.

## [14.1.0] - 2019-10-04

### Added
* Table transformation to add custom properties to tables during a replication.
* If a user doesn't specify `avro-serde-options`, Circus Train will still copy the external schema over to the target table. See [#131](https://github.com/HotelsDotCom/circus-train/issues/131).
Expand Down Expand Up @@ -371,4 +373,3 @@ _New configuration file_
# 1.5.1

* `DistCP` temporary path is now set per task.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,8 @@
import javax.validation.Valid;
import javax.validation.constraints.NotNull;

import com.google.common.collect.ImmutableMap;

import com.hotels.bdp.circustrain.api.validation.constraints.TableReplicationFullReplicationModeConstraint;

@TableReplicationFullReplicationModeConstraint
Expand Down Expand Up @@ -59,6 +61,23 @@ public Map<String, Object> getCopierOptions() {
return copierOptions;
}

public Map<String, Object> getMergedCopierOptions(Map<String, Object> baseCopierOptions) {
return getMergedCopierOptions(baseCopierOptions, getCopierOptions());
}

public static Map<String, Object> getMergedCopierOptions(
Map<String, Object> baseCopierOptions,
Map<String, Object> overrideCopierOptions) {
Map<String, Object> mergedCopierOptions = new HashMap<>();
if (baseCopierOptions != null) {
mergedCopierOptions.putAll(baseCopierOptions);
}
if (overrideCopierOptions != null) {
mergedCopierOptions.putAll(overrideCopierOptions);
}
return ImmutableMap.copyOf(mergedCopierOptions);
}

public void setCopierOptions(Map<String, Object> copierOptions) {
this.copierOptions = copierOptions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.api.copier;

import java.util.Map;

import org.apache.hadoop.fs.Path;

public interface CopierFactoryManager {

CopierFactory getCopierFactory(Path sourceLocation, Path replicaLocation, Map<String, Object> copierOptions);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,10 @@ public interface CopierOptions {

String IGNORE_MISSING_PARTITION_FOLDER_ERRORS = "ignore-missing-partition-folder-errors";

// internal option used to track if the destination of the replication should be treated as a folder or file. Value
// can be parsed with Boolean.parseValue. If not set a folder is assumed.
String COPY_DESTINATION_IS_FILE = "copy-destination-is-file";

Map<String, Object> getCopierOptions();

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,12 @@
package com.hotels.bdp.circustrain.api.conf;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import javax.validation.ConstraintViolation;
Expand Down Expand Up @@ -89,4 +93,26 @@ public void nullTableLocation() throws Exception {
assertThat(violations.size(), is(1));
}

@Test
public void mergeNullOptions() {
Map<String, Object> mergedCopierOptions = tableReplication.getMergedCopierOptions(null);
assertThat(mergedCopierOptions, is(not(nullValue())));
assertThat(mergedCopierOptions.isEmpty(), is(true));
}

@Test
public void mergeOptions() {
Map<String, Object> globalOptions = new HashMap<>();
globalOptions.put("one", Integer.valueOf(1));
globalOptions.put("two", Integer.valueOf(2));
Map<String, Object> overrideOptions = new HashMap<>();
overrideOptions.put("two", "two");
overrideOptions.put("three", "three");
tableReplication.setCopierOptions(overrideOptions);
Map<String, Object> mergedCopierOptions = tableReplication.getMergedCopierOptions(globalOptions);
assertThat((Integer) mergedCopierOptions.get("one"), is(Integer.valueOf(1)));
assertThat((String) mergedCopierOptions.get("two"), is("two"));
assertThat((String) mergedCopierOptions.get("three"), is("three"));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@ public abstract class AbstractAvroSerDeTransformation implements TableReplicatio
private String eventId;
private String tableLocation;
private Map<String, Object> avroSerdeConfigOverride = Collections.emptyMap();
private EventTableReplication tableReplication;
static final String AVRO_SCHEMA_URL_PARAMETER = "avro.schema.url";

protected AbstractAvroSerDeTransformation(TransformOptions transformOptions) {
Expand All @@ -52,6 +53,10 @@ protected String getEventId() {
return eventId;
}

protected EventTableReplication getTableReplication() {
return tableReplication;
}

protected String getTableLocation() {
return tableLocation;
}
Expand All @@ -75,6 +80,7 @@ protected String getAvroSchemaDestinationFolder() {
@Override
public void tableReplicationStart(EventTableReplication tableReplication, String eventId) {
this.eventId = eventId;
this.tableReplication = tableReplication;
tableLocation = tableReplication.getReplicaTable().getTableLocation();
avroSerdeConfigOverride = Collections.emptyMap();
Map<String, Object> transformOptions = tableReplication.getTransformOptions();
Expand All @@ -85,8 +91,15 @@ public void tableReplicationStart(EventTableReplication tableReplication, String
}

@Override
public void tableReplicationSuccess(EventTableReplication tableReplication, String eventId) {}
public void tableReplicationSuccess(EventTableReplication tableReplication, String eventId) {
this.eventId = null;
this.tableReplication = null;
}

@Override
public void tableReplicationFailure(EventTableReplication tableReplication, String eventId, Throwable t) {}
public void tableReplicationFailure(EventTableReplication tableReplication, String eventId, Throwable t) {
this.eventId = null;
this.tableReplication = null;

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,9 @@ public Partition transform(Partition partition) {
private Partition apply(Partition partition, String avroSchemaDestination) {
String source = HiveObjectUtils.getParameter(partition, AVRO_SCHEMA_URL_PARAMETER);
if (argsPresent(source, avroSchemaDestination)) {
String destinationPath = copier.copy(source, avroSchemaDestination).toString();
String destinationPath = copier
.copy(source, avroSchemaDestination, getTableReplication(), getEventId())
.toString();
HiveObjectUtils.updateSerDeUrl(partition, AVRO_SCHEMA_URL_PARAMETER, destinationPath);
}
return partition;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,9 @@ public Table transform(Table table) {
private Table apply(Table table, String avroSchemaDestination) {
String source = HiveObjectUtils.getParameter(table, AVRO_SCHEMA_URL_PARAMETER);
if (argsPresent(source, avroSchemaDestination)) {
String destinationPath = copier.copy(source, avroSchemaDestination).toString();
String destinationPath = copier
.copy(source, avroSchemaDestination, getTableReplication(), getEventId())
.toString();
HiveObjectUtils.updateSerDeUrl(table, AVRO_SCHEMA_URL_PARAMETER, destinationPath);
}
return table;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,30 +39,6 @@ public static String avroDestination(String pathToDestinationFolder, String even
return pathToDestinationFolder + eventId;
}

public static String fileName(String pathToFile) {
checkArgument(isNotBlank(pathToFile), "There must be a pathToFile provided");
if (pathToFile.charAt(pathToFile.length() - 1) == '/') {
pathToFile = pathToFile.substring(0, pathToFile.length() - 1);
}

String fileName = pathToFile.substring(pathToFile.lastIndexOf("/") + 1, pathToFile.length());
if (countPeriods(fileName) != 1) {
throw new IllegalArgumentException("Incorrect fileName " + fileName);
}

return fileName;
}

private static int countPeriods(String string) {
int periodCount = 0;
for (int i = 0; i < string.length(); i++) {
if (string.charAt(i) == '.') {
periodCount++;
}
}
return periodCount;
}

public static boolean argsPresent(String... args) {
for (String arg : args) {
if (isBlank(arg)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,8 @@
public class FileSystemPathResolver {
private static final Logger LOG = LoggerFactory.getLogger(FileSystemPathResolver.class);

private static final String HDFS_SCHEME = "hdfs";

private final Configuration configuration;

public FileSystemPathResolver(Configuration configuration) {
Expand All @@ -58,21 +60,22 @@ public Path resolveScheme(Path path) {
}

public Path resolveNameServices(Path path) {
String nameService = configuration.get(DFSConfigKeys.DFS_NAMESERVICES);
if (isNotBlank(nameService)) {
URI uri = path.toUri();
String scheme = uri.getScheme();
String url = uri.getPath();
final String original = path.toString();
if (isBlank(scheme)) {
url = String.format("/%s%s", nameService, path);
path = new Path(url);
} else {
path = new Path(scheme, nameService, url);
if (HDFS_SCHEME.equalsIgnoreCase(path.toUri().getScheme())) {
String nameService = configuration.get(DFSConfigKeys.DFS_NAMESERVICES);
if (isNotBlank(nameService)) {
URI uri = path.toUri();
String scheme = uri.getScheme();
String url = uri.getPath();
final String original = path.toString();
if (isBlank(scheme)) {
url = String.format("/%s%s", nameService, path);
path = new Path(url);
} else {
path = new Path(scheme, nameService, url);
}
LOG.info("Added nameservice to path. {} became {}", original, path);
}
LOG.info("Added nameservice to path. {} became {}", original, path);
}

return path;
}

Expand Down

0 comments on commit 48c1893

Please sign in to comment.