Skip to content

Commit

Permalink
Issue GoogleCloudDataproc#310: Allowing to configure the expiration t…
Browse files Browse the repository at this point in the history
…ime of materialized data
  • Loading branch information
davidrabinowitz committed Feb 24, 2021
1 parent 8d99265 commit afe3292
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public TableInfo getReadTable(ReadTableOptions options) {
// in this case, let's materialize it and use it as the table
validateViewsEnabled(options);
String sql = query.get();
return materializeQueryToTable(sql, options.viewExpirationTimeInHours());
return materializeQueryToTable(sql, options.expirationTimeInMinutes());
}

TableInfo table = getTable(options.tableId());
Expand Down Expand Up @@ -254,13 +254,14 @@ public long calculateTableSize(TableInfo tableInfo, Optional<String> filter) {

/**
* Runs the provided query on BigQuery and saves the result in a temporary table.
*
* @param querySql the query to be run
* @param tableExpirationTimeInHours the time in hours until the table is expired and auto-deleted
* @param expirationTimeInMinutes the time in minutes until the table is expired and auto-deleted
* @return a reference to the table
*/
public TableInfo materializeQueryToTable(String querySql, int tableExpirationTimeInHours) {
public TableInfo materializeQueryToTable(String querySql, int expirationTimeInMinutes) {
TableId tableId = createDestinationTable(Optional.empty(), Optional.empty());
return materializeTable(querySql, tableId, tableExpirationTimeInHours);
return materializeTable(querySql, tableId, expirationTimeInMinutes);
}

/**
Expand All @@ -271,28 +272,29 @@ public TableInfo materializeQueryToTable(String querySql, int tableExpirationTim
*
* @param querySql the query to be run
* @param viewId the view the query came from
* @param viewExpirationTimeInHours the time in hours until the table is expired and auto-deleted
* @param expirationTimeInMinutes the time in minutes until the table is expired and auto-deleted
* @return a reference to the table
*/
public TableInfo materializeViewToTable(
String querySql, TableId viewId, int viewExpirationTimeInHours) {
String querySql, TableId viewId, int expirationTimeInMinutes) {
TableId tableId =
createDestinationTable(
Optional.ofNullable(viewId.getProject()), Optional.ofNullable(viewId.getDataset()));
return materializeTable(querySql, tableId, viewExpirationTimeInHours);
return materializeTable(querySql, tableId, expirationTimeInMinutes);
}

private TableInfo materializeTable(
String querySql, TableId destinationTableId, int viewExpirationTimeInHours) {
String querySql, TableId destinationTableId, int expirationTimeInMinutes) {
try {
return destinationTableCache.get(
querySql,
new DestinationTableBuilder(
this, querySql, destinationTableId, viewExpirationTimeInHours));
new DestinationTableBuilder(this, querySql, destinationTableId, expirationTimeInMinutes));
} catch (Exception e) {
throw new BigQueryConnectorException(
BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED, String.format(
"Error creating destination table using the following query: [%s]", querySql), e);
BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED,
String.format(
"Error creating destination table using the following query: [%s]", querySql),
e);
}
}

Expand All @@ -305,24 +307,24 @@ public interface ReadTableOptions {

String viewEnabledParamName();

int viewExpirationTimeInHours();
int expirationTimeInMinutes();
}

static class DestinationTableBuilder implements Callable<TableInfo> {
final BigQueryClient bigQueryClient;
final String querySql;
final TableId destinationTable;
final int viewExpirationTimeInHours;
final int expirationTimeInMinutes;

DestinationTableBuilder(
BigQueryClient bigQueryClient,
String querySql,
TableId destinationTable,
int viewExpirationTimeInHours) {
int expirationTimeInMinutes) {
this.bigQueryClient = bigQueryClient;
this.querySql = querySql;
this.destinationTable = destinationTable;
this.viewExpirationTimeInHours = viewExpirationTimeInHours;
this.expirationTimeInMinutes = expirationTimeInMinutes;
}

@Override
Expand All @@ -346,7 +348,7 @@ TableInfo createTableFromQuery() {
// add expiration time to the table
TableInfo createdTable = bigQueryClient.getTable(destinationTable);
long expirationTime =
createdTable.getCreationTime() + TimeUnit.HOURS.toMillis(viewExpirationTimeInHours);
createdTable.getCreationTime() + TimeUnit.MINUTES.toMillis(expirationTimeInMinutes);
Table updatedTable =
bigQueryClient.update(createdTable.toBuilder().setExpirationTime(expirationTime).build());
return updatedTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ TableInfo getActualTable(
String querySql = bigQueryClient.createSql(table.getTableId(), requiredColumns, filters);
log.debug("querySql is %s", querySql);
return bigQueryClient.materializeViewToTable(
querySql, table.getTableId(), config.getViewExpirationTimeInHours());
querySql, table.getTableId(), config.getMaterializationExpirationTimeInMinutes());
} else {
// not regular table or a view
throw new BigQueryConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ReadSessionCreatorConfig {
final Optional<String> materializationProject;
final Optional<String> materializationDataset;
final String viewEnabledParamName;
final int viewExpirationTimeInHours;
final int materializationExpirationTimeInMinutes;
final DataFormat readDataFormat;
final int maxReadRowsRetries;
final OptionalInt maxParallelism;
Expand All @@ -35,7 +35,7 @@ public ReadSessionCreatorConfig(
boolean viewsEnabled,
Optional<String> materializationProject,
Optional<String> materializationDataset,
int viewExpirationTimeInHours,
int materializationExpirationTimeInMinutes,
DataFormat readDataFormat,
int maxReadRowsRetries,
String viewEnabledParamName,
Expand All @@ -45,7 +45,7 @@ public ReadSessionCreatorConfig(
this.materializationProject = materializationProject;
this.materializationDataset = materializationDataset;
this.viewEnabledParamName = viewEnabledParamName;
this.viewExpirationTimeInHours = viewExpirationTimeInHours;
this.materializationExpirationTimeInMinutes = materializationExpirationTimeInMinutes;
this.readDataFormat = readDataFormat;
this.maxReadRowsRetries = maxReadRowsRetries;
this.maxParallelism = maxParallelism;
Expand All @@ -68,8 +68,8 @@ public String getViewEnabledParamName() {
return viewEnabledParamName;
}

public int getViewExpirationTimeInHours() {
return viewExpirationTimeInHours;
public int getMaterializationExpirationTimeInMinutes() {
return materializationExpirationTimeInMinutes;
}

public DataFormat getReadDataFormat() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class SparkBigQueryConfig implements BigQueryConfig, Serializable {
public static final String DATE_PARTITION_PARAM = "datePartition";
public static final String VALIDATE_SPARK_AVRO_PARAM = "validateSparkAvroInternalParam";
public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat";
public static final int DEFAULT_MATERIALIZATION_EXPRIRATION_TIME_IN_MINUTES = 24 * 60;
@VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW;

@VisibleForTesting
Expand Down Expand Up @@ -109,7 +110,7 @@ public class SparkBigQueryConfig implements BigQueryConfig, Serializable {
boolean optimizedEmptyProjection = true;
boolean useAvroLogicalTypes = false;
ImmutableList<JobInfo.SchemaUpdateOption> loadSchemaUpdateOptions = ImmutableList.of();
int viewExpirationTimeInHours = 24;
int materializationExpirationTimeInMinutes = DEFAULT_MATERIALIZATION_EXPRIRATION_TIME_IN_MINUTES;
int maxReadRowsRetries = 3;

@VisibleForTesting
Expand Down Expand Up @@ -143,6 +144,15 @@ public static SparkBigQueryConfig from(
globalOptions,
options,
ImmutableList.of("materializationDataset", "viewMaterializationDataset"));
config.materializationExpirationTimeInMinutes =
getAnyOption(globalOptions, options, "materializationExpirationTimeInMinutes")
.transform(Integer::parseInt)
.or(DEFAULT_MATERIALIZATION_EXPRIRATION_TIME_IN_MINUTES);
if (config.materializationExpirationTimeInMinutes < 1) {
throw new IllegalArgumentException(
"materializationExpirationTimeInMinutes must have a positive value, the configured value is "
+ config.materializationExpirationTimeInMinutes);
}
// get the table details
String tableParam =
getOptionFromMultipleParams(options, ImmutableList.of("table", "path"), DEFAULT_FALLBACK)
Expand Down Expand Up @@ -485,8 +495,8 @@ public ImmutableList<JobInfo.SchemaUpdateOption> getLoadSchemaUpdateOptions() {
return loadSchemaUpdateOptions;
}

public int getViewExpirationTimeInHours() {
return viewExpirationTimeInHours;
public int getMaterializationExpirationTimeInMinutes() {
return materializationExpirationTimeInMinutes;
}

public int getMaxReadRowsRetries() {
Expand Down Expand Up @@ -524,7 +534,7 @@ public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
viewsEnabled,
materializationProject.toJavaUtil(),
materializationDataset.toJavaUtil(),
viewExpirationTimeInHours,
materializationExpirationTimeInMinutes,
readDataFormat,
maxReadRowsRetries,
VIEWS_ENABLED_OPTION,
Expand Down Expand Up @@ -555,8 +565,8 @@ public String viewEnabledParamName() {
}

@Override
public int viewExpirationTimeInHours() {
return SparkBigQueryConfig.this.getViewExpirationTimeInHours();
public int expirationTimeInMinutes() {
return SparkBigQueryConfig.this.getMaterializationExpirationTimeInMinutes();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ public static void main(String[] args) {
wordCountDF.printSchema();

// Saving the data to BigQuery
wordCountDF
.write()
.format("bigquery")
.option("table", outputBigqueryTable)
.save();
wordCountDF.write().format("bigquery").option("table", outputBigqueryTable).save();
}

private static void usage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private[bigquery] class DirectBigQueryRelation(
// add expiration time to the table
val createdTable = bigQuery.getTable(destinationTable)
val expirationTime = createdTable.getCreationTime +
TimeUnit.HOURS.toMillis(options.getViewExpirationTimeInHours)
TimeUnit.MINUTES.toMillis(options.getMaterializationExpirationTimeInMinutes)
val updatedTable = bigQuery.update(createdTable.toBuilder
.setExpirationTime(expirationTime)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testDefaults() {
assertThat(config.getClusteredFields()).isEqualTo(Optional.empty());
assertThat(config.getCreateDisposition()).isEqualTo(Optional.empty());
assertThat(config.getLoadSchemaUpdateOptions()).isEqualTo(ImmutableList.of());
assertThat(config.getViewExpirationTimeInHours()).isEqualTo(24);
assertThat(config.getMaterializationExpirationTimeInMinutes()).isEqualTo(24 * 60);
assertThat(config.getMaxReadRowsRetries()).isEqualTo(3);
assertThat(config.isUseAvroLogicalTypes()).isFalse();
}
Expand All @@ -89,6 +89,7 @@ public void testConfigFromOptions() {
.put("viewsEnabled", "true")
.put("viewMaterializationProject", "vmp")
.put("viewMaterializationDataset", "vmd")
.put("materializationExpirationTimeInMinutes", "100")
.put("readDataFormat", "ARROW")
.put("optimizedEmptyProjection", "false")
.put("createDisposition", "CREATE_NEVER")
Expand Down Expand Up @@ -134,7 +135,7 @@ public void testConfigFromOptions() {
ImmutableList.of(
JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
JobInfo.SchemaUpdateOption.ALLOW_FIELD_RELAXATION));
assertThat(config.getViewExpirationTimeInHours()).isEqualTo(24);
assertThat(config.getMaterializationExpirationTimeInMinutes()).isEqualTo(100);
assertThat(config.getMaxReadRowsRetries()).isEqualTo(3);
assertThat(config.isUseAvroLogicalTypes()).isTrue();
}
Expand Down

0 comments on commit afe3292

Please sign in to comment.