Skip to content

Commit

Permalink
[FLINK-2580] [runtime] Expose more methods from Hadoop output streams…
Browse files Browse the repository at this point in the history
… and exposes wrapped input and output streams.
  • Loading branch information
StephanEwen committed Sep 8, 2015
1 parent b18e410 commit 304139d
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 54 deletions.
Expand Up @@ -16,14 +16,17 @@
* limitations under the License.
*/


package org.apache.flink.core.fs;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Abstract base class for a data output stream to a file on a {@link FileSystem}.
 *
 * <p>On top of the regular {@link OutputStream} contract, every concrete subclass
 * has to supply its own {@link #flush()} and {@link #sync()}.
 */
public abstract class FSDataOutputStream extends OutputStream {

    /**
     * Flushes this stream.
     *
     * <p>Re-declared as abstract so that a subclass cannot silently inherit the
     * do-nothing {@link OutputStream#flush()}.
     *
     * @throws IOException if the flush fails
     */
    @Override
    public abstract void flush() throws IOException;

    /**
     * Syncs this stream. What exactly a sync entails is defined by the concrete subclass.
     *
     * @throws IOException if the sync fails
     */
    public abstract void sync() throws IOException;
}
Expand Up @@ -77,4 +77,15 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
public void close() throws IOException {
// Closing is delegated entirely to the wrapped stream held in 'fos'.
fos.close();
}


/**
 * Flushes the wrapped stream {@code fos}.
 *
 * @throws IOException if the wrapped stream fails to flush
 */
@Override
public void flush() throws IOException {
    this.fos.flush();
}

/**
 * Syncs by obtaining the file descriptor of the wrapped stream {@code fos}
 * and calling {@code sync()} on that descriptor.
 *
 * @throws IOException if the descriptor cannot be obtained
 *         or the sync on it fails
 */
@Override
public void sync() throws IOException {
    this.fos.getFD().sync();
}
}
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.runtime.fs.hdfs;

import java.io.IOException;
Expand All @@ -26,11 +25,10 @@
/**
* Concrete implementation of the {@link FSDataInputStream} for the
* Hadoop Distributed File System.
*
*/
public final class HadoopDataInputStream extends FSDataInputStream {

private org.apache.hadoop.fs.FSDataInputStream fsDataInputStream = null;
private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;

/**
* Creates a new data input stream from the given HDFS input stream
Expand All @@ -39,13 +37,15 @@ public final class HadoopDataInputStream extends FSDataInputStream {
* the HDFS input stream
*/
public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
    // Fail fast, and say which argument was null: a bare NullPointerException
    // forces the reader to map a line number back to the source to find out.
    if (fsDataInputStream == null) {
        throw new NullPointerException("fsDataInputStream must not be null");
    }
    this.fsDataInputStream = fsDataInputStream;
}


/**
 * Forwards the seek request to the wrapped Hadoop input stream while holding
 * this object's monitor.
 *
 * @param desired the value handed unchanged to the wrapped stream's {@code seek}
 * @throws IOException if the wrapped stream fails to seek
 */
@Override
public synchronized void seek(long desired) throws IOException {
    this.fsDataInputStream.seek(desired);
}

Expand All @@ -68,17 +68,22 @@ public void close() throws IOException {
public int read(byte[] buffer, int offset, int length) throws IOException {
// Pure delegation: the arguments and the return value pass through unchanged.
return fsDataInputStream.read(buffer, offset, length);
}



/**
 * Reports whatever the wrapped Hadoop input stream reports as available.
 *
 * @return the value returned by the wrapped stream's {@code available()}
 * @throws IOException if the wrapped stream throws
 */
@Override
public int available() throws IOException {
    return this.fsDataInputStream.available();
}


/**
 * Forwards the skip request to the wrapped Hadoop input stream.
 *
 * @param n the value handed unchanged to the wrapped stream's {@code skip}
 * @return the value returned by the wrapped stream's {@code skip}
 * @throws IOException if the wrapped stream throws
 */
@Override
public long skip(long n) throws IOException {
    return this.fsDataInputStream.skip(n);
}

/**
 * Exposes the Hadoop input stream that this class wraps.
 *
 * @return the wrapped {@code org.apache.hadoop.fs.FSDataInputStream}; the stored
 *         reference itself, not a copy
 */
public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
    return this.fsDataInputStream;
}
}
Expand Up @@ -16,24 +16,27 @@
* limitations under the License.
*/


package org.apache.flink.runtime.fs.hdfs;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.flink.core.fs.FSDataOutputStream;

public final class HadoopDataOutputStream extends FSDataOutputStream {
public class HadoopDataOutputStream extends FSDataOutputStream {

private org.apache.hadoop.fs.FSDataOutputStream fdos;
private final org.apache.hadoop.fs.FSDataOutputStream fdos;

public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
    // Fail fast, and say which argument was null: a bare NullPointerException
    // forces the reader to map a line number back to the source to find out.
    if (fdos == null) {
        throw new NullPointerException("fdos must not be null");
    }
    this.fdos = fdos;
}

/**
 * Writes by handing the argument unchanged to the wrapped Hadoop output stream.
 *
 * @param b the value passed to the wrapped stream's {@code write(int)}
 * @throws IOException if the wrapped stream fails to write
 */
@Override
public void write(int b) throws IOException {
    this.fdos.write(b);
}

Expand All @@ -47,4 +50,125 @@ public void close() throws IOException {
fdos.close();
}

/**
 * Flushes this stream by reflectively invoking {@code hflush()} on the wrapped
 * Hadoop output stream.
 *
 * @throws IOException if {@code hflush()} fails with an I/O error, cannot be invoked,
 *         or could not be looked up for a reason other than being absent
 * @throws UnsupportedOperationException if no {@code hflush()} method was found on the
 *         Hadoop stream class
 */
@Override
public void flush() throws IOException {
    invokeOnStream(HFLUSH_METHOD, HFLUSH_ERROR, "hflush");
}

/**
 * Syncs this stream by reflectively invoking {@code hsync()} on the wrapped
 * Hadoop output stream.
 *
 * @throws IOException if {@code hsync()} fails with an I/O error, cannot be invoked,
 *         or could not be looked up for a reason other than being absent
 * @throws UnsupportedOperationException if no {@code hsync()} method was found on the
 *         Hadoop stream class
 */
@Override
public void sync() throws IOException {
    invokeOnStream(HSYNC_METHOD, HSYNC_ERROR, "hsync");
}

/**
 * Invokes a reflectively looked-up, no-argument method on the wrapped stream and
 * translates every failure mode into the exception the caller should see.
 *
 * @param method the method handle, or {@code null} if the lookup did not succeed
 * @param lookupError the throwable recorded when the lookup failed, or {@code null}
 * @param methodName the plain method name, used only to build error messages
 * @throws IOException see {@link #flush()} and {@link #sync()}
 */
private void invokeOnStream(Method method, Throwable lookupError, String methodName) throws IOException {
    if (method != null) {
        try {
            method.invoke(fdos);
        }
        catch (InvocationTargetException e) {
            // Unwrap the reflection wrapper so callers see the real failure,
            // rethrowing it as-is whenever its type may legally be thrown from here.
            Throwable cause = e.getTargetException();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            else if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            else if (cause instanceof Error) {
                throw (Error) cause;
            }
            else {
                throw new IOException("Exception while invoking " + methodName + "()", cause);
            }
        }
        catch (IllegalAccessException e) {
            throw new IOException("Cannot invoke " + methodName + "()", e);
        }
    }
    else if (lookupError != null) {
        // A missing method means "not supported"; any other lookup failure is
        // reported as an I/O problem with the original throwable attached.
        if (lookupError instanceof NoSuchMethodException) {
            throw new UnsupportedOperationException(methodName + "() method is not available in this version of Hadoop.");
        }
        else {
            throw new IOException("Cannot access " + methodName + "() method", lookupError);
        }
    }
    else {
        // Neither a handle nor a recorded error: treated as "not supported".
        throw new UnsupportedOperationException(methodName + "() is not available in this version of Hadoop.");
    }
}

/**
 * Exposes the Hadoop output stream that this class wraps.
 *
 * @return the wrapped {@code org.apache.hadoop.fs.FSDataOutputStream}; the stored
 *         reference itself, not a copy
 */
public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream() {
    return this.fdos;
}

// ------------------------------------------------------------------------
// utilities to bridge hsync and hflush to Hadoop, even though they are not supported in Hadoop 1
// ------------------------------------------------------------------------

// Reflective handles to hflush() / hsync() on the Hadoop output stream class.
// Each one is null when its lookup in the static initializer threw.
private static final Method HFLUSH_METHOD;
private static final Method HSYNC_METHOD;

// The throwable caught while looking up the corresponding method;
// null when that lookup succeeded.
private static final Throwable HFLUSH_ERROR;
private static final Throwable HSYNC_ERROR;

static {
    // Look up hflush(): keep either the method handle or the throwable that
    // prevented us from getting it.
    Method flushHandle = null;
    Throwable flushFailure = null;
    try {
        flushHandle = org.apache.hadoop.fs.FSDataOutputStream.class.getMethod("hflush");
    }
    catch (Throwable t) {
        flushFailure = t;
    }
    HFLUSH_METHOD = flushHandle;
    HFLUSH_ERROR = flushFailure;

    // Same procedure for hsync().
    Method syncHandle = null;
    Throwable syncFailure = null;
    try {
        syncHandle = org.apache.hadoop.fs.FSDataOutputStream.class.getMethod("hsync");
    }
    catch (Throwable t) {
        syncFailure = t;
    }
    HSYNC_METHOD = syncHandle;
    HSYNC_ERROR = syncFailure;
}
}
Expand Up @@ -23,18 +23,18 @@
import java.net.URI;
import java.net.UnknownHostException;

import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;

import org.apache.hadoop.conf.Configuration;

/**
Expand Down Expand Up @@ -264,6 +264,14 @@ public URI getUri() {
return fs.getUri();
}

/**
 * Exposes the Hadoop file system that this class wraps.
 *
 * @return the wrapped {@code org.apache.hadoop.fs.FileSystem}; the stored
 *         reference itself, not a copy
 */
public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
    return fs;
}

@Override
public void initialize(URI path) throws IOException {

Expand Down Expand Up @@ -367,21 +375,21 @@ public BlockLocation[] getFileBlockLocations(final FileStatus file, final long s
}

@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
// Convert the given path into a Hadoop path via its string form.
final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
// Open on the wrapped Hadoop file system, passing the buffer size through unchanged.
final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
// Wrap the Hadoop stream before returning it.
return new HadoopDataInputStream(fdis);
}

@Override
public FSDataInputStream open(final Path f) throws IOException {
public HadoopDataInputStream open(final Path f) throws IOException {
// Convert the given path into a Hadoop path via its string form.
final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
// Open on the wrapped Hadoop file system without specifying a buffer size.
final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
// Wrap the Hadoop stream before returning it.
return new HadoopDataInputStream(fdis);
}

@Override
public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize)
throws IOException
{
Expand All @@ -392,7 +400,7 @@ public FSDataOutputStream create(final Path f, final boolean overwrite, final in


@Override
public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
public HadoopDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
return new HadoopDataOutputStream(fsDataOutputStream);
Expand Down

0 comments on commit 304139d

Please sign in to comment.