Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
vikkyrk committed May 3, 2017
1 parent 3749acb commit 60726e4
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
Expand Up @@ -297,11 +297,11 @@ public TableReference apply(String from) {
}
}

static String getJobIdToken(String jobName, String stepUuid) {
return "beam_job_" + getJobUuid(jobName, stepUuid);
static String createJobIdToken(String jobName, String stepUuid) {
return "beam_job_" + createJobUuid(jobName, stepUuid);
}

static String getJobUuid(String jobName, String stepUuid) {
static String createJobUuid(String jobName, String stepUuid) {
return stepUuid + "_" + jobName.replaceAll("-", "");
}

Expand Down
Expand Up @@ -19,8 +19,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getJobIdToken;

import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.Job;
Expand Down Expand Up @@ -507,7 +507,8 @@ void cleanup(PipelineOptions options) throws Exception {
JobReference jobRef =
new JobReference()
.setProjectId(bqOptions.getProject())
.setJobId(getExtractJobId(getJobIdToken(bqOptions.getJobName(), stepUuid)));
.setJobId(
getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid)));

Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);

Expand Down
Expand Up @@ -19,9 +19,9 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobUuid;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getJobUuid;

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
Expand Down Expand Up @@ -112,7 +112,7 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions)

// 2. Create the temporary dataset in the query location.
TableReference tableToExtract = createTempTableReference(
bqOptions.getProject(), getJobUuid(bqOptions.getJobName(), stepUuid));
bqOptions.getProject(), createJobUuid(bqOptions.getJobName(), stepUuid));

tableService.createDataset(
tableToExtract.getProjectId(),
Expand All @@ -121,7 +121,7 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions)
"Dataset for BigQuery query job temporary table");

// 3. Execute the query.
String queryJobId = getJobIdToken(bqOptions.getJobName(), stepUuid) + "-query";
String queryJobId = createJobIdToken(bqOptions.getJobName(), stepUuid) + "-query";
executeQuery(
bqOptions.getProject(),
queryJobId,
Expand All @@ -133,7 +133,7 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions)
@Override
protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
TableReference tableToRemove = createTempTableReference(
bqOptions.getProject(), getJobUuid(bqOptions.getJobName(), stepUuid));
bqOptions.getProject(), createJobUuid(bqOptions.getJobName(), stepUuid));

DatasetService tableService = bqServices.getDatasetService(bqOptions);
tableService.deleteTable(tableToRemove);
Expand Down
Expand Up @@ -19,8 +19,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getJobIdToken;

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
Expand Down Expand Up @@ -86,7 +86,7 @@ public List<BoundedSource<TableRow>> split(
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
JobService jobService = bqServices.getJobService(bqOptions);
String extractJobId = getExtractJobId(getJobIdToken(options.getJobName(), stepUuid));
String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
List<String> tempFiles = executeExtract(
extractJobId, tableToExtract, jobService, bqOptions.getProject());

Expand Down
Expand Up @@ -20,8 +20,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobUuid;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getJobUuid;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand Down Expand Up @@ -1320,7 +1320,7 @@ public void testBigQueryQuerySourceInitSplit() throws Exception {
String stepUuid = "testStepUuid";

TableReference tempTableReference = createTempTableReference(
bqOptions.getProject(), getJobUuid(bqOptions.getJobName(), stepUuid));
bqOptions.getProject(), createJobUuid(bqOptions.getJobName(), stepUuid));
fakeDatasetService.createDataset(
bqOptions.getProject(), tempTableReference.getDatasetId(), "", "");
fakeDatasetService.createTable(new Table()
Expand Down Expand Up @@ -1395,7 +1395,7 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception {
String stepUuid = "testStepUuid";

TableReference tempTableReference = createTempTableReference(
bqOptions.getProject(), getJobUuid(bqOptions.getJobName(), stepUuid));
bqOptions.getProject(), createJobUuid(bqOptions.getJobName(), stepUuid));
List<TableRow> expected = ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
Expand Down

0 comments on commit 60726e4

Please sign in to comment.