Skip to content

Commit

Permalink
Merge 309b6b6 into b73918b
Browse files Browse the repository at this point in the history
  • Loading branch information
vikkyrk committed May 4, 2017
2 parents b73918b + 309b6b6 commit a2584c9
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 85 deletions.
Expand Up @@ -205,7 +205,6 @@ public String apply(String input) {
bigQueryServices,
jobIdTokenView,
schemasView,
stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
dynamicDestinations))
Expand Down Expand Up @@ -243,7 +242,6 @@ public String apply(String input) {
bigQueryServices,
jobIdTokenView,
schemasView,
stepUuid,
writeDisposition,
createDisposition,
dynamicDestinations))
Expand Down
Expand Up @@ -36,12 +36,12 @@
import java.util.regex.Matcher;
import javax.annotation.Nullable;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;

/** A set of helper functions and classes used by {@link BigQueryIO}. */
public class BigQueryHelpers {
Expand Down Expand Up @@ -309,14 +309,9 @@ static TableReference createTempTableReference(String projectId, String jobUuid)

/**
 * Resolves a per-step temporary location beneath {@code tempLocationDir}.
 *
 * <p>{@code bigQueryOperationName} is resolved against {@code tempLocationDir} first, and
 * {@code stepUuid} is then resolved against that result; the final location is returned
 * as a string.
 *
 * <p>NOTE(review): this block is shown as a rendered diff without +/- markers. The hunk
 * header (-309,14 +309,9) and the import changes in the same file indicate that the
 * {@code try}/{@code catch} body is the removed implementation and the
 * {@code FileSystems} chain is the added one; as literally written the second
 * {@code return} would be unreachable.
 *
 * @param tempLocationDir root temporary location
 * @param bigQueryOperationName first component resolved under the root
 * @param stepUuid second component, resolved under the operation component
 * @return the resolved location as a string
 */
static String resolveTempLocation(
String tempLocationDir, String bigQueryOperationName, String stepUuid) {
// Removed implementation: looks up an IOChannelFactory for the root and resolves the two
// components with it, rethrowing any IOException as an unchecked RuntimeException.
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir);
return factory.resolve(
factory.resolve(tempLocationDir, bigQueryOperationName), stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve temp destination directory in %s",
tempLocationDir), e);
}
// Added implementation: both components are resolved with RESOLVE_DIRECTORY and no checked
// exception is handled here. The literal `true` is presumably the isDirectory flag of
// matchNewResource -- TODO confirm against the FileSystems API.
return FileSystems.matchNewResource(tempLocationDir, true)
.resolve(bigQueryOperationName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve(stepUuid, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
.toString();
}
}
Expand Up @@ -38,6 +38,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
Expand All @@ -50,6 +52,10 @@
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
Expand All @@ -67,8 +73,6 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -520,18 +524,23 @@ void cleanup(PipelineOptions options) throws Exception {

Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);

Collection<String> extractFiles = null;
Collection<ResourceId> extractFiles = null;
if (extractJob != null) {
extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
} else {
IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
Collection<String> dirMatch = factory.match(extractDestinationDir);
if (!dirMatch.isEmpty()) {
extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
try {
ResourceId extractDestinationDirResource =
FileSystems.matchSingleFileSpec(extractDestinationDir).resourceId();
extractFiles = ImmutableList.of(
extractDestinationDirResource.resolve("*",
ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
} catch (FileNotFoundException e) {
// Ignore if not found.
}
}
if (extractFiles != null && !extractFiles.isEmpty()) {
IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles);
FileSystems.delete(extractFiles,
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}
}
};
Expand Down Expand Up @@ -583,7 +592,7 @@ static String getExtractDestinationUri(String extractDestinationDir) {
return String.format("%s/%s", extractDestinationDir, "*.avro");
}

static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
static List<ResourceId> getExtractFilePaths(String extractDestinationDir, Job extractJob)
throws IOException {
JobStatistics jobStats = extractJob.getStatistics();
List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
Expand All @@ -597,11 +606,13 @@ static List<String> getExtractFilePaths(String extractDestinationDir, Job extrac
}
long filesCount = counts.get(0);

ImmutableList.Builder<String> paths = ImmutableList.builder();
IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
ImmutableList.Builder<ResourceId> paths = ImmutableList.builder();
ResourceId extractDestinationDirResourceId =
FileSystems.matchSingleFileSpec(extractDestinationDir).resourceId();
for (long i = 0; i < filesCount; ++i) {
String filePath =
factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
ResourceId filePath = extractDestinationDirResourceId.resolve(
String.format("%012d%s", i, ".avro"),
ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
paths.add(filePath);
}
return paths.build();
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -90,7 +91,7 @@ public List<BoundedSource<TableRow>> split(
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);

String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
List<String> tempFiles = executeExtract(
List<ResourceId> tempFiles = executeExtract(
extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);

TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
Expand All @@ -116,7 +117,7 @@ public Coder<TableRow> getDefaultOutputCoder() {
return TableRowJsonCoder.of();
}

private List<String> executeExtract(
private List<ResourceId> executeExtract(
String jobId, TableReference table, JobService jobService, String executingProject,
String extractDestinationDir)
throws InterruptedException, IOException {
Expand All @@ -143,12 +144,11 @@ private List<String> executeExtract(

LOG.info("BigQuery extract job completed: {}", jobId);

List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
return ImmutableList.copyOf(tempFiles);
return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
}

private List<BoundedSource<TableRow>> createSources(
List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
List<ResourceId> files, TableSchema tableSchema) throws IOException, InterruptedException {
final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);

SerializableFunction<GenericRecord, TableRow> function =
Expand All @@ -160,9 +160,9 @@ public TableRow apply(GenericRecord input) {
}};

List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
for (String fileName : files) {
for (ResourceId file : files) {
avroSources.add(new TransformingSource<>(
AvroSource.from(fileName), function, getDefaultOutputCoder()));
AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
}
return ImmutableList.copyOf(avroSources);
}
Expand Down
Expand Up @@ -18,37 +18,30 @@

package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
Expand All @@ -74,7 +67,6 @@ class WriteTables<DestinationT>
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
private final PCollectionView<Map<DestinationT, String>> schemasView;
private final String stepUuid;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
Expand All @@ -84,15 +76,13 @@ public WriteTables(
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
PCollectionView<Map<DestinationT, String>> schemasView,
String stepUuid,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
this.singlePartition = singlePartition;
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.schemasView = schemasView;
this.stepUuid = stepUuid;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
Expand All @@ -114,8 +104,6 @@ public void processElement(ProcessContext c) throws Exception {
tableReference, tableDestination.getTableDescription());
}

String tempFilePrefix = resolveTempLocation(
c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
Integer partition = c.element().getKey().getShardNumber();
List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
String jobIdPrefix =
Expand All @@ -137,7 +125,7 @@ public void processElement(ProcessContext c) throws Exception {
tableDestination.getTableDescription());
c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));

removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
removeTemporaryFiles(partitionFiles);
}

private void load(
Expand Down Expand Up @@ -198,22 +186,11 @@ private void load(
BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
}

/**
 * Deletes the given temporary files.
 *
 * <p>NOTE(review): this block is shown as a rendered diff without +/- markers. The hunk
 * header (-198,22 +186,11) indicates that the three-argument signature and its
 * {@code IOChannelFactory} dispatch are the removed version, and the one-argument
 * signature using {@code FileSystems} is the added version. The trailing closing braces
 * are context lines shared by both versions.
 *
 * @param files paths of the files to delete
 * @throws IOException if deletion fails (the removed version also threw it for a
 *     factory that was neither GCS nor local-file)
 */
// Removed version: selects the deletion strategy from the factory that handles
// tempFilePrefix, not from the individual file paths.
static void removeTemporaryFiles(
PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException {
IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
if (factory instanceof GcsIOChannelFactory) {
// GCS: all files are handed to GcsUtil in a single remove call.
GcsUtil gcsUtil = new GcsUtilFactory().create(options);
gcsUtil.remove(files);
} else if (factory instanceof FileIOChannelFactory) {
// Local files: delete one by one; a file that is already absent is only logged.
for (String filename : files) {
LOG.debug("Removing file {}", filename);
boolean exists = Files.deleteIfExists(Paths.get(filename));
if (!exists) {
LOG.debug("{} does not exist.", filename);
}
}
} else {
throw new IOException("Unrecognized file system.");
// Added version: each path is turned into a non-directory ResourceId and the whole batch
// is deleted through FileSystems. IGNORE_MISSING_FILES presumably makes absent files a
// no-op rather than an error -- TODO confirm against MoveOptions docs. The per-file debug
// logging of the removed version is not reproduced here.
static void removeTemporaryFiles(Collection<String> files) throws IOException {
ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
for (String file: files) {
fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */));
}
FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}
}
Expand Up @@ -83,7 +83,9 @@
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
Expand Down Expand Up @@ -118,7 +120,6 @@
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -1820,7 +1821,9 @@ public void testWriteTables() throws Exception {
for (int k = 0; k < numFilesPerPartition; ++k) {
String filename = Paths.get(baseDir.toString(),
String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) {
ResourceId fileResource =
FileSystems.matchNewResource(filename, false /* isDirectory */);
try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) {
try (OutputStream output = Channels.newOutputStream(channel)) {
TableRow tableRow = new TableRow().set("name", tableName);
TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
Expand Down Expand Up @@ -1858,7 +1861,6 @@ public void testWriteTables() throws Exception {
fakeBqServices,
jobIdTokenView,
schemaMapView,
stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
new IdentityDynamicTables());
Expand Down Expand Up @@ -1904,14 +1906,9 @@ public void testRemoveTemporaryFiles() throws Exception {
File tempDir = new File(bqOptions.getTempLocation());
testNumFiles(tempDir, 10);

WriteTables.removeTemporaryFiles(bqOptions, tempFilePrefix, fileNames);
WriteTables.removeTemporaryFiles(fileNames);

testNumFiles(tempDir, 0);

for (String fileName : fileNames) {
loggedWriteTables.verifyDebug("Removing file " + fileName);
}
loggedWriteTables.verifyDebug(fileNames.get(numFiles) + " does not exist.");
}

@Test
Expand Down

0 comments on commit a2584c9

Please sign in to comment.