Skip to content

Commit

Permalink
fix for issue 183 (sftp.RemoteFile.ReadAheadRemoteFileInputStream)
Browse files Browse the repository at this point in the history
  • Loading branch information
Björn Karge committed Jun 3, 2015
1 parent a18d623 commit 8e74330
Showing 1 changed file with 97 additions and 30 deletions.
127 changes: 97 additions & 30 deletions src/main/java/net/schmizz/sshj/sftp/RemoteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.sftp.Response.StatusCode;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -88,8 +89,7 @@ protected Promise<Response, SFTPException> asyncWrite(long fileOffset, byte[] da
throws IOException {
return requester.request(newRequest(PacketType.WRITE)
.putUInt64(fileOffset)
// TODO The SFTP spec claims this field is unneeded...? See #187
.putUInt32(len)
.putUInt32(len - off)

This comment has been minimized.

Copy link
@dkocher

dkocher Jun 3, 2015

This would revert changeset 8638091 from hierynomus#181.

This comment has been minimized.

Copy link
@bkarge

bkarge Jun 4, 2015

Owner

oops -- not sure I meant to touch those lines, apologies!

.putRawBytes(data, off, len)
);
}
Expand Down Expand Up @@ -238,27 +238,56 @@ public class ReadAheadRemoteFileInputStream
private final byte[] b = new byte[1];

private final int maxUnconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>();

private long fileOffset;
private long requestOffset;
private long responseOffset;
private boolean eof;

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
assert 0 <= maxUnconfirmedReads;

this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
this.fileOffset = 0;
}

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) {
assert 0 <= maxUnconfirmedReads;
assert 0 <= fileOffset;

this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
this.fileOffset = fileOffset;
this.requestOffset = this.responseOffset = fileOffset;
}

@Override
public long skip(long n)
throws IOException {
throw new IOException("skip is not supported by ReadAheadFileInputStream, use RemoteFileInputStream instead");
private ByteArrayInputStream pending = new ByteArrayInputStream( new byte[0] );

private boolean retrieveUnconfirmedRead(boolean blocking) throws IOException {

if (unconfirmedReads.size() <= 0)
return false;

if (!blocking && !unconfirmedReads.peek().isDelivered())
return false;

unconfirmedReadOffsets.remove();
final Response res = unconfirmedReads.remove().retrieve( requester.getTimeoutMs(), TimeUnit.MILLISECONDS );
switch (res.getType()) {
case DATA:
int recvLen = res.readUInt32AsInt();

responseOffset += recvLen;
pending = new ByteArrayInputStream( res.array(), res.rpos(), recvLen );
break;

case STATUS:
res.ensureStatusIs( Response.StatusCode.EOF );
eof = true;
break;

default:
throw new SFTPException( "Unexpected packet: " + res.getType() );
}
return true;
}

@Override
Expand All @@ -268,26 +297,64 @@ public int read()
}

@Override
public int read(byte[] into, int off, int len)
throws IOException {
while (!eof && unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
unconfirmedReads.add(asyncRead(fileOffset, len));
fileOffset += len;
}
if (unconfirmedReads.isEmpty()) {
assert eof;
return -1;
}
// Retrieve first in
final Response res = unconfirmedReads.remove().retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
final int recvLen = checkReadResponse(res, into, off);
if (recvLen == -1) {
eof = true;
public int read(byte[] into, int off, int len) throws IOException {

while (!eof && pending.available() <= 0) {

// we also need to go here for len <= 0, because pending may be at
// EOF in which case it would return -1 instead of 0

while (unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
int reqLen = Math.max( 1024, len ); // don't be shy!
unconfirmedReads.add( RemoteFile.this.asyncRead( requestOffset, reqLen ) );
unconfirmedReadOffsets.add( requestOffset );
requestOffset += reqLen;
}

long nextOffset = unconfirmedReadOffsets.peek();
if (responseOffset != nextOffset) {

// the server could not give us all the data we needed, so
// we try to fill the gap synchronously

assert responseOffset < nextOffset;
assert 0 < (nextOffset - responseOffset);
assert (nextOffset - responseOffset) <= Integer.MAX_VALUE;

byte[] buf = new byte[(int) (nextOffset - responseOffset)];
int recvLen = RemoteFile.this.read( responseOffset, buf, 0, buf.length );

if (recvLen < 0) {
eof = true;
return -1;
}

if (0 == recvLen) // avoid infinite loops
throw new SFTPException( "Unexpected response size (0), bailing out" );

responseOffset += recvLen;
pending = new ByteArrayInputStream( buf, 0, recvLen );
}
else
if (!retrieveUnconfirmedRead( true /*blocking*/ )) {

// this may happen if we change prefetch strategy
// currently, we should never get here...

throw new IllegalStateException( "Could not retrieve data for pending read request" );
}
}
return recvLen;

return pending.read( into, off, len );
}

@Override
public int available() throws IOException {
while (!eof && (pending.available() <= 0) && retrieveUnconfirmedRead( false /*blocking*/ ))
/*loop*/;
return pending.available();
}
}
}

}

0 comments on commit 8e74330

Please sign in to comment.