Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URLDecoder;
Expand Down Expand Up @@ -516,20 +515,21 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
disconnect(channel);
throw new IOException(String.format(E_PATH_DIR, f));
}
InputStream is;
try {
// the path could be a symbolic link, so get the real path
absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));

is = channel.get(absolute.toUri().getPath());
} catch (SftpException e) {
throw new IOException(e);
}
return new FSDataInputStream(new SFTPInputStream(is, statistics)){
return new FSDataInputStream(
new SFTPInputStream(channel, absolute, statistics)){
@Override
public void close() throws IOException {
super.close();
disconnect(channel);
try {
super.close();
} finally {
disconnect(channel);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,86 +15,114 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.sftp;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** SFTP FileSystem input stream. */
class SFTPInputStream extends FSInputStream {

public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
public static final String E_NULL_INPUTSTREAM = "Null InputStream";
public static final String E_STREAM_CLOSED = "Stream closed";

private final ChannelSftp channel;
private final Path path;
private InputStream wrappedStream;
private FileSystem.Statistics stats;
private boolean closed;
private long pos;
private long nextPos;
private long contentLength;

SFTPInputStream(InputStream stream, FileSystem.Statistics stats) {

if (stream == null) {
throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
throws IOException {
try {
this.channel = channel;
this.path = path;
this.stats = stats;
this.wrappedStream = channel.get(path.toUri().getPath());
SftpATTRS stat = channel.lstat(path.toString());
this.contentLength = stat.getSize();
} catch (SftpException e) {
throw new IOException(e);
}
this.wrappedStream = stream;
this.stats = stats;
}

this.pos = 0;
this.closed = false;
@Override
public synchronized void seek(long position) throws IOException {
checkNotClosed();
if (position < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
nextPos = position;
}

@Override
public void seek(long position) throws IOException {
throw new IOException(E_SEEK_NOTSUPPORTED);
public synchronized int available() throws IOException {
checkNotClosed();
long remaining = contentLength - nextPos;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int) remaining;
}

private void seekInternal() throws IOException {
if (pos == nextPos) {
return;
}
if (nextPos > pos) {
long skipped = wrappedStream.skip(nextPos - pos);
pos = pos + skipped;
}
if (nextPos < pos) {
wrappedStream.close();
try {
wrappedStream = channel.get(path.toUri().getPath());
pos = wrappedStream.skip(nextPos);
} catch (SftpException e) {
throw new IOException(e);
}
}
}

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
throw new IOException(E_SEEK_NOTSUPPORTED);
return false;
}

@Override
public long getPos() throws IOException {
return pos;
public synchronized long getPos() throws IOException {
return nextPos;
}

@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException(E_STREAM_CLOSED);
checkNotClosed();
if (this.contentLength == 0 || (nextPos >= contentLength)) {
return -1;
}

seekInternal();
int byteRead = wrappedStream.read();
if (byteRead >= 0) {
pos++;
nextPos++;
}
if (stats != null & byteRead >= 0) {
stats.incrementBytesRead(1);
}
return byteRead;
}

public synchronized int read(byte[] buf, int off, int len)
throws IOException {
if (closed) {
throw new IOException(E_STREAM_CLOSED);
}

int result = wrappedStream.read(buf, off, len);
if (result > 0) {
pos += result;
}
if (stats != null & result > 0) {
stats.incrementBytesRead(result);
}

return result;
}

public synchronized void close() throws IOException {
if (closed) {
return;
Expand All @@ -103,4 +131,12 @@ public synchronized void close() throws IOException {
wrappedStream.close();
closed = true;
}

private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public void init() throws IOException {

}

/**
* Any teardown logic can go here.
* @throws IOException IO problems
*/
public void teardown() throws IOException {

}

/**
* Add a configuration resource to this instance's configuration
* @param resource resource reference
Expand Down Expand Up @@ -113,7 +121,7 @@ public FileSystem getFileSystem(URI uri) throws IOException {
public abstract FileSystem getTestFileSystem() throws IOException;

/**
* Get the scheme of this FS
* Get the scheme of this FS.
* @return the scheme this FS supports
*/
public abstract String getScheme();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ public void teardown() throws Exception {
Thread.currentThread().setName("teardown");
LOG.debug("== Teardown ==");
deleteTestDirInTeardown();
if (contract != null) {
contract.teardown();
}
LOG.debug("== Teardown complete ==");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.contract.sftp;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.sftp.SFTPFileSystem;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.auth.UserAuth;
import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;

public class SFTPContract extends AbstractFSContract {

private static final String CONTRACT_XML = "contract/sftp.xml";
private static final URI TEST_URI =
URI.create("sftp://user:password@localhost");
private final String testDataDir =
new FileSystemTestHelper().getTestRootDir();
private final Configuration conf;
private SshServer sshd;

public SFTPContract(Configuration conf) {
super(conf);
addConfResource(CONTRACT_XML);
this.conf = conf;
}

@Override
public void init() throws IOException {
sshd = SshServer.setUpDefaultServer();
// ask OS to assign a port
sshd.setPort(0);
sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());

List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
userAuthFactories.add(new UserAuthPasswordFactory());

sshd.setUserAuthFactories(userAuthFactories);
sshd.setPasswordAuthenticator((username, password, session) ->
username.equals("user") && password.equals("password")
);

sshd.setSubsystemFactories(
Collections.singletonList(new SftpSubsystemFactory()));

sshd.start();
int port = sshd.getPort();

conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be set in core-default.xml already?
if not, sftp:// urls would break. (yes, i know every stack overflow spark example does this, but that is just superstition)

Copy link
Contributor Author

@mpryahin mpryahin May 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not set in core-default.xml, and if not specified here sftp urls won't be resolved by sftp schema. Could you please clarify a bit what exactly you mean here? Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double checked, it's quite strange. Is it a candidate for an improvement issue?

conf.setInt("fs.sftp.host.port", port);
conf.setBoolean("fs.sftp.impl.disable.cache", true);
}

@Override
public void teardown() throws IOException {
if (sshd != null) {
sshd.stop();
}
}

@Override
public FileSystem getTestFileSystem() throws IOException {
return FileSystem.get(TEST_URI, conf);
}

@Override
public String getScheme() {
return "sftp";
}

@Override
public Path getTestPath() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shame we didn't declare this as raising an ioe; probably too late now

Copy link
Contributor Author

@mpryahin mpryahin May 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not too late I suppose, would you like me to try to declare it as throwing IOE? I can issue a new Jira and fix it there.

try {
FileSystem fs = FileSystem.get(
URI.create("sftp://user:password@localhost"), conf
);
return fs.makeQualified(new Path(testDataDir));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.contract.sftp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

public class TestSFTPContractSeek extends AbstractContractSeekTest {

@Override
protected AbstractFSContract createContract(Configuration conf) {
return new SFTPContract(conf);
}
}
Loading