Pluggable file I/O submodule in TableOperations #14

Merged Dec 11, 2018 (16 commits)
Changes from 7 commits
core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.hadoop.HadoopFileIO;
 import com.netflix.iceberg.hadoop.HadoopOutputFile;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
@@ -53,6 +54,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
   private static final String HIVE_LOCATION_FOLDER_NAME = "empty";

   private final Configuration conf;
+  private final FileIO fileIo;

   private TableMetadata currentMetadata = null;
   private String currentMetadataLocation = null;
@@ -62,6 +64,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {

   protected BaseMetastoreTableOperations(Configuration conf) {
     this.conf = conf;
+    this.fileIo = new HadoopFileIO(conf);
   }

   @Override
@@ -98,7 +101,7 @@ protected String writeNewMetadata(TableMetadata metadata, int version) {
     }

     String newFilename = newTableMetadataFilename(baseLocation, version);
-    OutputFile newMetadataLocation = HadoopOutputFile.fromPath(new Path(newFilename), conf);
+    OutputFile newMetadataLocation = fileIo.newOutputFile(new Path(newFilename).toString());

     // write the new metadata
     TableMetadataParser.write(metadata, newMetadataLocation);
@@ -129,24 +132,13 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
   }

   @Override
-  public InputFile newInputFile(String path) {
-    return fromLocation(path, conf);
+  public String resolveMetadataPath(String fileName) {
+    return newMetadataLocation(baseLocation, fileName);
   }

   @Override
-  public OutputFile newMetadataFile(String filename) {
-    return HadoopOutputFile.fromPath(
-        new Path(newMetadataLocation(baseLocation, filename)), conf);
-  }
-
-  @Override
-  public void deleteFile(String file) {
-    Path path = new Path(file);
-    try {
-      getFS(path, conf).delete(path, false /* should be a file, not recursive */ );
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+  public FileIO fileIo() {
+    return fileIo;
   }

   @Override
@@ -167,18 +159,6 @@ private static String newMetadataLocation(String baseLocation, String filename)
     return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, filename);
   }

-  private static String parseBaseLocation(String metadataLocation) {
-    int lastSlash = metadataLocation.lastIndexOf('/');
-    int secondToLastSlash = metadataLocation.lastIndexOf('/', lastSlash);
-
-    // verify that the metadata file was contained in a "metadata" folder
-    String parentFolderName = metadataLocation.substring(secondToLastSlash + 1, lastSlash);
-    Preconditions.checkArgument(METADATA_FOLDER_NAME.equals(parentFolderName),
-        "Invalid metadata location, not in metadata/ folder: %s", metadataLocation);
-
-    return metadataLocation.substring(0, secondToLastSlash);
-  }
-
   private static int parseVersion(String metadataLocation) {
     int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
     int versionEnd = metadataLocation.indexOf('-', versionStart);
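For orientation, a worked example of the path resolution these methods share: resolveMetadataPath delegates to newMetadataLocation, which joins the base location, the metadata folder, and the file name. The base location below is illustrative, not from the PR:

// METADATA_FOLDER_NAME is "metadata" in this class; the base location is made up
String base = "hdfs://nn/warehouse/db/tbl";
String resolved = String.format("%s/%s/%s", base, "metadata", "v1.metadata.json");
// resolveMetadataPath("v1.metadata.json") -> "hdfs://nn/warehouse/db/tbl/metadata/v1.metadata.json"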
4 changes: 2 additions & 2 deletions core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -106,7 +106,7 @@ public Iterator<DataFile> iterator(Expression partFilter,
                                      Collection<String> columns) {
     return Iterables.concat(Iterables.transform(manifestFiles,
         (Function<String, Iterable<DataFile>>) path -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(path));
+          ManifestReader reader = ManifestReader.read(ops.fileIo().newInputFile(path));
           addCloseable(reader);
           return reader.filterPartitions(partFilter)
               .filterRows(rowFilter)
@@ -142,7 +142,7 @@ private void cacheChanges() {
     // accumulate adds and deletes from all manifests.
     // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
     for (String manifest : manifestFiles) {
-      try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+      try (ManifestReader reader = ManifestReader.read(ops.fileIo().newInputFile(manifest))) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
             adds.add(add.file().copy());
2 changes: 1 addition & 1 deletion core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -159,7 +159,7 @@ public CloseableIterable<FileScanTask> planFiles() {
     Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
         snapshot.manifests(),
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+          ManifestReader reader = ManifestReader.read(ops.fileIo().newInputFile(manifest));
           toClose.add(reader);
           String schemaString = SchemaParser.toJson(reader.spec().schema());
           String specString = PartitionSpecParser.toJson(reader.spec());
13 changes: 4 additions & 9 deletions core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -263,18 +263,13 @@ public void commit(TableMetadata base, TableMetadata metadata) {
     }

     @Override
-    public InputFile newInputFile(String path) {
-      return ops.newInputFile(path);
+    public FileIO fileIo() {
+      return ops.fileIo();
     }

     @Override
-    public OutputFile newMetadataFile(String filename) {
-      return ops.newMetadataFile(filename);
-    }
-
-    @Override
-    public void deleteFile(String path) {
-      ops.deleteFile(path);
+    public String resolveMetadataPath(String fileName) {
+      return ops.resolveMetadataPath(fileName);
     }

     @Override
34 changes: 34 additions & 0 deletions core/src/main/java/com/netflix/iceberg/FileIO.java
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.netflix.iceberg;

import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;

import java.io.Serializable;

public interface FileIO extends Serializable {

  InputFile newInputFile(String path);

  OutputFile newOutputFile(String path);

  void deleteFile(String path);
}
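Because the interface is this small, a custom FileIO can be layered over an existing one. A minimal sketch, assuming only the interface above; the class name and counters are hypothetical, not part of this PR:

package com.netflix.iceberg;

import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;

import java.util.concurrent.atomic.AtomicLong;

// Counts operations, then delegates to any underlying FileIO (e.g. HadoopFileIO).
public class CountingFileIO implements FileIO {
  private final FileIO delegate;
  private final AtomicLong reads = new AtomicLong();
  private final AtomicLong writes = new AtomicLong();
  private final AtomicLong deletes = new AtomicLong();

  public CountingFileIO(FileIO delegate) {
    this.delegate = delegate;
  }

  @Override
  public InputFile newInputFile(String path) {
    reads.incrementAndGet();
    return delegate.newInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    writes.incrementAndGet();
    return delegate.newOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    deletes.incrementAndGet();
    delegate.deleteFile(path);
  }
}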
2 changes: 1 addition & 1 deletion core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -97,7 +97,7 @@ public CloseableIterable<ManifestEntry> entries() {
     Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
         manifests,
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+          ManifestReader reader = ManifestReader.read(ops.fileIo().newInputFile(manifest));
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
           toClose.add(reader);
           return Iterables.filter(
core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -209,7 +209,7 @@ public List<String> apply(TableMetadata base) {
         .executeWith(getWorkerPool())
         .run(index -> {
           ManifestReader manifest = filterManifest(
-              deleteExpression, metricsEvaluator, ops.newInputFile(manifests.get(index)));
+              deleteExpression, metricsEvaluator, ops.fileIo().newInputFile(manifests.get(index)));
           readers[index] = manifest;
           toClose.add(manifest);
         });
@@ -411,7 +411,7 @@ private ManifestReader filterManifest(Expression deleteExpression,
     reader.close();

     // return the filtered manifest as a reader
-    ManifestReader filtered = ManifestReader.read(ops.newInputFile(filteredCopy.location()));
+    ManifestReader filtered = ManifestReader.read(ops.fileIo().newInputFile(filteredCopy.location()));

     // update caches
     filteredManifests.put(manifest.location(), filtered);
4 changes: 2 additions & 2 deletions core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
@@ -50,7 +50,7 @@ class RemoveSnapshots implements ExpireSnapshots {
   private final Consumer<String> defaultDelete = new Consumer<String>() {
     @Override
     public void accept(String file) {
-      ops.deleteFile(file);
+      ops.fileIo().deleteFile(file);
     }
   };

@@ -161,7 +161,7 @@ public void commit() {
     ).run(manifest -> {
       // even if the manifest is still used, it may contain files that can be deleted
       // TODO: eliminate manifests with no deletes without scanning
-      try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+      try (ManifestReader reader = ManifestReader.read(ops.fileIo().newInputFile(manifest))) {
        for (ManifestEntry entry : reader.entries()) {
          // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
          if (entry.status() == ManifestEntry.Status.DELETED &&
2 changes: 1 addition & 1 deletion core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -132,7 +132,7 @@ protected void cleanAll() {
   }

   protected void deleteFile(String path) {
-    ops.deleteFile(path);
+    ops.fileIo().deleteFile(path);
   }

   protected OutputFile manifestPath(int i) {
25 changes: 6 additions & 19 deletions core/src/main/java/com/netflix/iceberg/TableOperations.java
@@ -19,7 +19,6 @@

 package com.netflix.iceberg;

-import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;

 /**
@@ -56,27 +55,15 @@ public interface TableOperations {
   void commit(TableMetadata base, TableMetadata metadata);

   /**
-   * Create a new {@link InputFile} for a path.
-   *
-   * @param path a string file path
-   * @return an InputFile instance for the path
+   * Obtain a handler to read and write files.
    */
-  InputFile newInputFile(String path);
+  FileIO fileIo();

-  /**
-   * Create a new {@link OutputFile} in the table's metadata store.
-   *
-   * @param filename a string file name, not a full path
-   * @return an OutputFile instance for the path
-   */
-  OutputFile newMetadataFile(String filename);
+  String resolveMetadataPath(String fileName);

-  /**
-   * Delete a file.
-   *
-   * @param path path to the file
-   */
-  void deleteFile(String path);
+  default OutputFile newMetadataFile(String fileName) {
+    return fileIo().newOutputFile(resolveMetadataPath(fileName));
+  }

   /**
    * Create a new ID for a Snapshot
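With this interface shape, call sites obtain files through the FileIO handle, while the default method keeps the old newMetadataFile entry point. An illustrative sketch of the resulting call pattern; the wrapper class and file names are made up, only fileIo(), resolveMetadataPath(), and newMetadataFile() come from the diff:

package com.netflix.iceberg;

import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;

class FileIoUsageSketch {
  static void example(TableOperations ops) {
    // reads and deletes now go through the pluggable FileIO handle
    InputFile manifest = ops.fileIo().newInputFile("/warehouse/db/tbl/metadata/manifest-1.avro");
    ops.fileIo().deleteFile("/warehouse/db/tbl/metadata/manifest-0.avro");

    // the default method composes the two new primitives:
    // fileIo().newOutputFile(resolveMetadataPath("v2.metadata.json"))
    OutputFile nextMetadata = ops.newMetadataFile("v2.metadata.json");
  }
}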
55 changes: 55 additions & 0 deletions core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
@@ -0,0 +1,55 @@
package com.netflix.iceberg.hadoop;

import com.netflix.iceberg.FileIO;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class HadoopFileIO implements FileIO {
  private static final long serialVersionUID = 1L;

  private transient Configuration hadoopConf;

  public HadoopFileIO(Configuration hadoopConf) {
    this.hadoopConf = hadoopConf;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    hadoopConf.write(out);
  }

  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
    in.defaultReadObject();
    hadoopConf = new Configuration(false);
    hadoopConf.readFields(in);
  }

  @Override
  public InputFile newInputFile(String path) {
    return HadoopInputFile.fromLocation(path, hadoopConf);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return HadoopOutputFile.fromPath(new Path(path), hadoopConf);
  }

  @Override
  public void deleteFile(String path) {
    Path toDelete = new Path(path);
    FileSystem fs = Util.getFS(toDelete, hadoopConf);
    try {
      fs.delete(toDelete, false /* not recursive */);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
    }
  }
}
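One note on the serialization hooks above: Hadoop's Configuration is not java.io.Serializable, which is why the field is transient and round-tripped through Hadoop's Writable read/write methods instead. A minimal sketch of that round trip, assuming the same package; the check class and config entry are hypothetical:

package com.netflix.iceberg.hadoop;

import org.apache.hadoop.conf.Configuration;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class HadoopFileIOSerializationCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    conf.set("fs.defaultFS", "file:///"); // illustrative entry to round-trip

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new HadoopFileIO(conf)); // invokes the custom writeObject
    }

    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bytes.toByteArray()))) {
      HadoopFileIO copy = (HadoopFileIO) in.readObject(); // invokes readObject
      // copy now holds a rebuilt Configuration with fs.defaultFS preserved
    }
  }
}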