Skip to content

Commit

Permalink
Remove JobInfo hierarchy, add JobConfiguration hierarchy
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jan 26, 2016
1 parent e11a5ae commit 84402ac
Show file tree
Hide file tree
Showing 31 changed files with 1,739 additions and 1,519 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ public static JobListOption startPageToken(String pageToken) {
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()}, {@link JobStatus#state()},
* {@link JobStatus#error()} as well as type-specific configuration (e.g.
* {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if not specified.
* {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when listing jobs.
* {@link QueryJobConfiguration#query()} for Query Jobs) are always returned, even if not
* specified. {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when
* listing jobs.
*/
public static JobListOption fields(JobField... fields) {
String selector = JobField.selector(fields);
Expand All @@ -397,8 +398,8 @@ private JobOption(BigQueryRpc.Option option, Object value) {
* Returns an option to specify the job's fields to be returned by the RPC call. If this option
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()} as well as type-specific
* configuration (e.g. {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if
* not specified.
* configuration (e.g. {@link QueryJobConfiguration#query()} for Query Jobs) are always
* returned, even if not specified.
*/
public static JobOption fields(JobField... fields) {
return new JobOption(BigQueryRpc.Option.FIELDS, JobField.selector(fields));
Expand Down Expand Up @@ -470,7 +471,7 @@ public static QueryResultsOption maxWaitTime(long maxWaitTime) {
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException;
JobInfo create(JobInfo job, JobOption... options) throws BigQueryException;

/**
* Returns the requested dataset or {@code null} if not found.
Expand Down Expand Up @@ -611,14 +612,14 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(String jobId, JobOption... options) throws BigQueryException;

/**
* Returns the requested job or {@code null} if not found.
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(JobId jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(JobId jobId, JobOption... options) throws BigQueryException;

/**
* Lists the jobs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public Table call() {
}

@Override
public <T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException {
public JobInfo create(JobInfo job, JobOption... options) throws BigQueryException {
final Job jobPb = setProjectId(job).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
Expand Down Expand Up @@ -442,12 +442,12 @@ public List<FieldValue> apply(TableRow rowPb) {
}

@Override
public <T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException {
public JobInfo getJob(String jobId, JobOption... options) throws BigQueryException {
return getJob(JobId.of(jobId), options);
}

@Override
public <T extends JobInfo> T getJob(final JobId jobId, JobOption... options)
public JobInfo getJob(final JobId jobId, JobOption... options)
throws BigQueryException {
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
Expand All @@ -457,7 +457,7 @@ public Job call() {
return bigQueryRpc.getJob(jobId.job(), optionsMap);
}
}, options().retryParams(), EXCEPTION_HANDLER);
return answer == null ? null : JobInfo.<T>fromPb(answer);
return answer == null ? null : JobInfo.fromPb(answer);
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -646,42 +646,48 @@ private TableId setProjectId(TableId table) {
}

private JobInfo setProjectId(JobInfo job) {
if (job instanceof CopyJobInfo) {
CopyJobInfo copyJob = (CopyJobInfo) job;
CopyJobInfo.Builder copyBuilder = copyJob.toBuilder();
copyBuilder.destinationTable(setProjectId(copyJob.destinationTable()));
copyBuilder.sourceTables(
Lists.transform(copyJob.sourceTables(), new Function<TableId, TableId>() {
@Override
public TableId apply(TableId tableId) {
return setProjectId(tableId);
}
}));
return copyBuilder.build();
}
if (job instanceof QueryJobInfo) {
QueryJobInfo queryJob = (QueryJobInfo) job;
QueryJobInfo.Builder queryBuilder = queryJob.toBuilder();
if (queryJob.destinationTable() != null) {
queryBuilder.destinationTable(setProjectId(queryJob.destinationTable()));
}
if (queryJob.defaultDataset() != null) {
queryBuilder.defaultDataset(setProjectId(queryJob.defaultDataset()));
}
return queryBuilder.build();
}
if (job instanceof ExtractJobInfo) {
ExtractJobInfo extractJob = (ExtractJobInfo) job;
ExtractJobInfo.Builder extractBuilder = extractJob.toBuilder();
extractBuilder.sourceTable(setProjectId(extractJob.sourceTable()));
return extractBuilder.build();
}
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
JobConfiguration configuration = job.configuration();
JobInfo.Builder jobBuilder = job.toBuilder();
switch (configuration.type()) {
case COPY:
CopyJobConfiguration copyConfiguration = (CopyJobConfiguration) configuration;
CopyJobConfiguration.Builder copyBuilder = copyConfiguration.toBuilder();
copyBuilder.sourceTables(
Lists.transform(copyConfiguration.sourceTables(), new Function<TableId, TableId>() {
@Override
public TableId apply(TableId tableId) {
return setProjectId(tableId);
}
}));
copyBuilder.destinationTable(setProjectId(copyConfiguration.destinationTable()));
jobBuilder.configuration(copyBuilder.build());
break;
case QUERY:
QueryJobConfiguration queryConfiguration = (QueryJobConfiguration) configuration;
QueryJobConfiguration.Builder queryBuilder = queryConfiguration.toBuilder();
if (queryConfiguration.destinationTable() != null) {
queryBuilder.destinationTable(setProjectId(queryConfiguration.destinationTable()));
}
if (queryConfiguration.defaultDataset() != null) {
queryBuilder.defaultDataset(setProjectId(queryConfiguration.defaultDataset()));
}
jobBuilder.configuration(queryBuilder.build());
break;
case EXTRACT:
ExtractJobConfiguration extractConfiguration = (ExtractJobConfiguration) configuration;
ExtractJobConfiguration.Builder extractBuilder = extractConfiguration.toBuilder();
extractBuilder.sourceTable(setProjectId(extractConfiguration.sourceTable()));
jobBuilder.configuration(extractBuilder.build());
break;
case LOAD:
LoadJobConfiguration loadConfiguration = (LoadJobConfiguration) configuration;
jobBuilder.configuration((LoadJobConfiguration) setProjectId(loadConfiguration));
break;
default:
// never reached
throw new IllegalArgumentException("Job configuration is not supported");
}
return jobBuilder.build();
}

private QueryRequest setProjectId(QueryRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package com.google.gcloud.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

/**
* Google BigQuery Copy Job configuration. A Copy Job copies an existing table to another new or
* existing table.
*/
public final class CopyJobConfiguration implements JobConfiguration, Serializable {

private static final long serialVersionUID = 1140509641399762967L;

private final List<TableId> sourceTables;
private final TableId destinationTable;
private final JobInfo.CreateDisposition createDisposition;
private final JobInfo.WriteDisposition writeDisposition;

public static final class Builder {

private List<TableId> sourceTables;
private TableId destinationTable;
private JobInfo.CreateDisposition createDisposition;
private JobInfo.WriteDisposition writeDisposition;

private Builder() {}

private Builder(CopyJobConfiguration jobConfiguration) {
this.sourceTables = jobConfiguration.sourceTables;
this.destinationTable = jobConfiguration.destinationTable;
this.createDisposition = jobConfiguration.createDisposition;
this.writeDisposition = jobConfiguration.writeDisposition;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
JobConfigurationTableCopy copyConfigurationPb = configurationPb.getCopy();
this.destinationTable = TableId.fromPb(copyConfigurationPb.getDestinationTable());
if (copyConfigurationPb.getSourceTables() != null) {
this.sourceTables =
Lists.transform(copyConfigurationPb.getSourceTables(), TableId.FROM_PB_FUNCTION);
} else {
this.sourceTables = ImmutableList.of(TableId.fromPb(copyConfigurationPb.getSourceTable()));
}
if (copyConfigurationPb.getCreateDisposition() != null) {
this.createDisposition =
JobInfo.CreateDisposition.valueOf(copyConfigurationPb.getCreateDisposition());
}
if (copyConfigurationPb.getWriteDisposition() != null) {
this.writeDisposition = JobInfo.WriteDisposition.valueOf(
copyConfigurationPb.getWriteDisposition());
}
}

/**
* Sets the source tables to copy.
*/
public Builder sourceTables(List<TableId> sourceTables) {
this.sourceTables = sourceTables != null ? ImmutableList.copyOf(sourceTables) : null;
return this;
}

/**
* Sets the destination table of the copy job.
*/
public Builder destinationTable(TableId destinationTable) {
this.destinationTable = destinationTable;
return this;
}

/**
* Sets whether the job is allowed to create new tables.
*
* @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition">
* Create Disposition</a>
*/
public Builder createDisposition(JobInfo.CreateDisposition createDisposition) {
this.createDisposition = createDisposition;
return this;
}

/**
* Sets the action that should occur if the destination table already exists.
*
* @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition">
* Write Disposition</a>
*/
public Builder writeDisposition(JobInfo.WriteDisposition writeDisposition) {
this.writeDisposition = writeDisposition;
return this;
}

public CopyJobConfiguration build() {
return new CopyJobConfiguration(this);
}
}

private CopyJobConfiguration(Builder builder) {
this.sourceTables = checkNotNull(builder.sourceTables);
this.destinationTable = checkNotNull(builder.destinationTable);
this.createDisposition = builder.createDisposition;
this.writeDisposition = builder.writeDisposition;
}

@Override
public Type type() {
return Type.COPY;
}

/**
* Returns the source tables to copy.
*/
public List<TableId> sourceTables() {
return sourceTables;
}

/**
* Returns the destination table to load the data into.
*/
public TableId destinationTable() {
return destinationTable;
}

/**
* Returns whether the job is allowed to create new tables.
*
* @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition">
* Create Disposition</a>
*/
public JobInfo.CreateDisposition createDisposition() {
return this.createDisposition;
}

/**
* Returns the action that should occur if the destination table already exists.
*
* @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition">
* Write Disposition</a>
*/
public JobInfo.WriteDisposition writeDisposition() {
return writeDisposition;
}

public Builder toBuilder() {
return new Builder(this);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("sourceTables", sourceTables)
.add("destinationTable", destinationTable)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)
.toString();
}

@Override
public boolean equals(Object obj) {
return obj instanceof CopyJobConfiguration
&& Objects.equals(toPb(), ((CopyJobConfiguration) obj).toPb());
}

@Override
public int hashCode() {
return Objects.hash(sourceTables, destinationTable, createDisposition, writeDisposition);
}

com.google.api.services.bigquery.model.JobConfiguration toPb() {
JobConfigurationTableCopy configurationPb = new JobConfigurationTableCopy();
configurationPb.setDestinationTable(destinationTable.toPb());
if (sourceTables.size() == 1) {
configurationPb.setSourceTable(sourceTables.get(0).toPb());
} else {
configurationPb.setSourceTables(Lists.transform(sourceTables, TableId.TO_PB_FUNCTION));
}
if (createDisposition != null) {
configurationPb.setCreateDisposition(createDisposition.toString());
}
if (writeDisposition != null) {
configurationPb.setWriteDisposition(writeDisposition.toString());
}
return new com.google.api.services.bigquery.model.JobConfiguration().setCopy(configurationPb);
}

/**
* Creates a builder for a BigQuery Copy Job configuration given destination and source table.
*/
public static Builder builder(TableId destinationTable, TableId sourceTable) {
return builder(destinationTable, ImmutableList.of(checkNotNull(sourceTable)));
}

/**
* Creates a builder for a BigQuery Copy Job configuration given destination and source tables.
*/
public static Builder builder(TableId destinationTable, List<TableId> sourceTables) {
return new Builder().destinationTable(destinationTable).sourceTables(sourceTables);
}

/**
* Returns a BigQuery Copy Job configuration for the given destination and source table.
*/
public static CopyJobConfiguration of(TableId destinationTable, TableId sourceTable) {
return builder(destinationTable, sourceTable).build();
}

/**
* Returns a BigQuery Copy Job configuration for the given destination and source tables.
*/
public static CopyJobConfiguration of(TableId destinationTable, List<TableId> sourceTables) {
return builder(destinationTable, sourceTables).build();
}

static CopyJobConfiguration fromPb(
com.google.api.services.bigquery.model.JobConfiguration jobPb) {
return new Builder(jobPb).build();
}
}
Loading

0 comments on commit 84402ac

Please sign in to comment.