Skip to content

Commit

Permalink
Merge 9b3550b into 7bafbb3
Browse files Browse the repository at this point in the history
  • Loading branch information
AnanaMJ committed Nov 19, 2018
2 parents 7bafbb3 + 9b3550b commit 2a53d08
Show file tree
Hide file tree
Showing 35 changed files with 325 additions and 506 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# TBD
### Changed
* Partition columns are set to type `string`, regardless of the type of the column that is used to partition the table.
* If an error occurs while deleting an intermediate table during cleanup, this will no longer fail the replication. Instead, it will log the error and continue.

# [5.0.0] - 2018-11-16
### Changed
* Circus Train version upgraded to 13.0.0 (was 12.0.0). Note that this change is _not_ backwards compatible as this BigQuery extension now needs to be explicitly added to the Circus Train classpath using Circus Train's [standard extension loading mechanism](https://github.com/HotelsDotCom/circus-train#loading-extensions). See [#20](https://github.com/HotelsDotCom/circus-train-bigquery/issues/20).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import com.hotels.bdp.circustrain.bigquery.extraction.service.ExtractionService;
import com.hotels.bdp.circustrain.bigquery.table.service.TableServiceFactory;
import com.hotels.bdp.circustrain.bigquery.util.BigQueryMetastore;
import com.hotels.bdp.circustrain.bigquery.util.SchemaExtractor;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

class BigQueryMetastoreClient implements CloseableMetaStoreClient {
Expand All @@ -110,16 +111,19 @@ class BigQueryMetastoreClient implements CloseableMetaStoreClient {
private final ExtractionService extractionService;
private final TableServiceFactory tableServiceFactory;
private final HiveTableCache cache;
private final SchemaExtractor schemaExtractor;

/**
 * Creates a metastore client backed by BigQuery.
 *
 * @param bigQueryMetastore access to BigQuery datasets/tables
 * @param extractionService schedules table data extraction to Cloud Storage
 * @param cache cache of Hive {@code Table} objects already converted from BigQuery
 * @param tableServiceFactory creates per-table services
 * @param schemaExtractor reads the Avro schema of extracted data from storage
 */
BigQueryMetastoreClient(
    BigQueryMetastore bigQueryMetastore,
    ExtractionService extractionService,
    HiveTableCache cache,
    TableServiceFactory tableServiceFactory,
    SchemaExtractor schemaExtractor) {
  this.bigQueryMetastore = bigQueryMetastore;
  this.extractionService = extractionService;
  this.tableServiceFactory = tableServiceFactory;
  this.cache = cache;
  this.schemaExtractor = schemaExtractor;
}

@Override
Expand Down Expand Up @@ -152,8 +156,8 @@ public Table getTable(final String databaseName, final String tableName) throws
com.google.cloud.bigquery.Table bigQueryTable = bigQueryMetastore.getTable(databaseName, tableName);

ExtractionUri extractionUri = new ExtractionUri();
PostExtractionAction postExtractionAction = new UpdateTableSchemaAction(databaseName, tableName,
cache, extractionService.getStorage(), extractionUri);
PostExtractionAction postExtractionAction = new UpdateTableSchemaAction(databaseName, tableName, cache,
extractionService.getStorage(), extractionUri, schemaExtractor);
ExtractionContainer container = new ExtractionContainer(bigQueryTable, extractionUri,
Arrays.asList(postExtractionAction));
extractionService.register(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hotels.bdp.circustrain.bigquery.extraction.service.ExtractionService;
import com.hotels.bdp.circustrain.bigquery.table.service.TableServiceFactory;
import com.hotels.bdp.circustrain.bigquery.util.BigQueryMetastore;
import com.hotels.bdp.circustrain.bigquery.util.SchemaExtractor;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.client.api.ConditionalMetaStoreClientFactory;
import com.hotels.hcommon.hive.metastore.exception.MetaStoreClientException;
Expand All @@ -36,9 +37,10 @@ public class BigQueryMetastoreClientFactory implements ConditionalMetaStoreClien
/**
 * Builds the {@link BigQueryMetastoreClient} this factory hands out.
 *
 * @param bigQueryMetastore access to BigQuery datasets/tables
 * @param service schedules table data extraction to Cloud Storage
 * @param tableServiceFactory creates per-table services
 * @param schemaExtractor reads the Avro schema of extracted data from storage
 */
BigQueryMetastoreClientFactory(
    BigQueryMetastore bigQueryMetastore,
    ExtractionService service,
    TableServiceFactory tableServiceFactory,
    SchemaExtractor schemaExtractor) {
  metastoreClient = new BigQueryMetastoreClient(bigQueryMetastore, service, new HiveTableCache(), tableServiceFactory,
      schemaExtractor);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ public BigQueryToHiveTableConverter withTableName(String tableName) {
return this;
}

public BigQueryToHiveTableConverter withCols(List<FieldSchema> cols) {
table.getSd().setCols(cols);
return this;
}

public BigQueryToHiveTableConverter withLocation(String location) {
table.getSd().setLocation(location);
return this;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Table;

public class DeleteTableAction implements PostExtractionAction {
Expand All @@ -31,8 +32,12 @@ public DeleteTableAction(Table table) {

@Override
public void run() {
  // Best-effort cleanup of the intermediate BigQuery table: a failure here must
  // not fail the replication, so the exception is logged and swallowed (see
  // CHANGELOG "If an error occurs while deleting an intermediate table...").
  try {
    table.delete();
    log.debug("Deleted table '{}'", table.getTableId());
  } catch (BigQueryException e) {
    log.error("Could not delete BigQuery table '{}'", table.getTableId(), e);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,22 @@ public class UpdatePartitionSchemaAction implements PostExtractionAction {
private final Partition partition;
private final Storage storage;
private final ExtractionUri extractionUri;
private final SchemaExtractor schemaExtractor;

/**
 * Creates an action that stamps the extracted Avro schema onto a Hive partition.
 *
 * @param partition Hive partition whose serde parameters will receive the schema
 * @param storage Cloud Storage client used to read the extracted data
 * @param extractionUri location of the extracted partition data in storage
 * @param schemaExtractor reads the Avro schema from the extracted data
 */
public UpdatePartitionSchemaAction(
    Partition partition,
    Storage storage,
    ExtractionUri extractionUri,
    SchemaExtractor schemaExtractor) {
  this.partition = partition;
  this.storage = storage;
  this.extractionUri = extractionUri;
  this.schemaExtractor = schemaExtractor;
}

@Override
public void run() {
  // Read the Avro schema of the extracted data and record it in the partition's
  // serde parameters so Hive can deserialize the partition correctly.
  String schema = schemaExtractor.getSchemaFromStorage(storage, extractionUri);
  partition.getSd().getSerdeInfo().putToParameters(AvroConstants.SCHEMA_PARAMETER, schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ public class UpdateTableSchemaAction implements PostExtractionAction {
private final HiveTableCache cache;
private final Storage storage;
private final ExtractionUri extractionUri;
private final SchemaExtractor schemaExtractor;

/**
 * Creates an action that stamps the extracted Avro schema onto a cached Hive table.
 *
 * @param databaseName source database name (cache lookup key)
 * @param tableName source table name (cache lookup key)
 * @param cache cache of Hive tables populated earlier in the replication
 * @param storage Cloud Storage client used to read the extracted data
 * @param extractionUri location of the extracted table data in storage
 * @param schemaExtractor reads the Avro schema from the extracted data
 */
public UpdateTableSchemaAction(
    String databaseName,
    String tableName,
    HiveTableCache cache,
    Storage storage,
    ExtractionUri extractionUri,
    SchemaExtractor schemaExtractor) {
  this.databaseName = databaseName;
  this.tableName = tableName;
  this.cache = cache;
  this.storage = storage;
  this.extractionUri = extractionUri;
  this.schemaExtractor = schemaExtractor;
}

@Override
Expand All @@ -51,7 +54,7 @@ public void run() {
if (table == null) {
throw new CircusTrainException("Unable to find pre-cached table: " + databaseName + "." + tableName);
}
String schema = SchemaExtractor.getSchemaFromStorage(storage, extractionUri);
String schema = schemaExtractor.getSchemaFromStorage(storage, extractionUri);
table.getSd().getSerdeInfo().putToParameters(AvroConstants.SCHEMA_PARAMETER, schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
import static com.hotels.bdp.circustrain.bigquery.util.RandomStringGenerationUtils.randomUri;

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.bigquery.Table;

import com.hotels.bdp.circustrain.bigquery.extraction.container.DeleteTableAction;
import com.hotels.bdp.circustrain.bigquery.extraction.container.ExtractionContainer;
import com.hotels.bdp.circustrain.bigquery.extraction.container.ExtractionUri;
import com.hotels.bdp.circustrain.bigquery.extraction.container.PostExtractionAction;
import com.hotels.bdp.circustrain.bigquery.extraction.container.UpdatePartitionSchemaAction;
import com.hotels.bdp.circustrain.bigquery.extraction.service.ExtractionService;
import com.hotels.bdp.circustrain.bigquery.util.BigQueryMetastore;
import com.hotels.bdp.circustrain.bigquery.util.SchemaExtractor;

class BigQueryPartitionGenerator {

Expand All @@ -48,6 +49,8 @@ class BigQueryPartitionGenerator {
private final String destinationBucket;
private final String destinationFolder;

private final SchemaExtractor schemaExtractor;

BigQueryPartitionGenerator(
BigQueryMetastore bigQueryMetastore,
ExtractionService extractionService,
Expand All @@ -57,7 +60,7 @@ class BigQueryPartitionGenerator {
String partitionValue,
String destinationBucket,
String destinationFolder,
List<FieldSchema> cols) {
SchemaExtractor schemaExtractor) {
this.bigQueryMetastore = bigQueryMetastore;
this.extractionService = extractionService;
this.partitionValue = partitionValue;
Expand All @@ -66,14 +69,14 @@ class BigQueryPartitionGenerator {
this.partitionKey = partitionKey;
this.destinationBucket = destinationBucket;
this.destinationFolder = destinationFolder;
this.schemaExtractor = schemaExtractor;
}

ExtractionUri generatePartition(Partition partition) {
final String statement = getQueryStatement();
final String destinationTableName = randomTableName();

com.google.cloud.bigquery.Table bigQueryPartition = createPartitionInBigQuery(sourceDBName, destinationTableName,
statement);
Table bigQueryPartition = createPartitionInBigQuery(sourceDBName, destinationTableName, statement);
ExtractionUri extractionUri = scheduleForExtraction(bigQueryPartition, partition);
return extractionUri;

Expand All @@ -86,21 +89,21 @@ private String getQueryStatement() {
return query;
}

private com.google.cloud.bigquery.Table createPartitionInBigQuery(
private Table createPartitionInBigQuery(
String destinationDBName,
String destinationTableName,
String queryStatement) {
log.debug("Generating BigQuery partition using query {}", queryStatement);
bigQueryMetastore.executeIntoDestinationTable(destinationDBName, destinationTableName, queryStatement);
com.google.cloud.bigquery.Table part = bigQueryMetastore.getTable(destinationDBName, destinationTableName);
Table part = bigQueryMetastore.getTable(destinationDBName, destinationTableName);
return part;
}

private ExtractionUri scheduleForExtraction(com.google.cloud.bigquery.Table table, Partition partition) {
private ExtractionUri scheduleForExtraction(Table table, Partition partition) {
ExtractionUri extractionUri = new ExtractionUri(destinationBucket, generateFolderName(), generateFileName());
PostExtractionAction deleteTableAction = new DeleteTableAction(table);
PostExtractionAction updatePartitionSchemaAction = new UpdatePartitionSchemaAction(partition,
extractionService.getStorage(), extractionUri);
extractionService.getStorage(), extractionUri, schemaExtractor);
ExtractionContainer toRegister = new ExtractionContainer(table, extractionUri,
Arrays.asList(deleteTableAction, updatePartitionSchemaAction));
extractionService.register(toRegister);
Expand Down

0 comments on commit 2a53d08

Please sign in to comment.