Skip to content

Commit

Permalink
Input stream with range query to improve close performance when block…
Browse files Browse the repository at this point in the history
… is read.
  • Loading branch information
madanadit committed Aug 24, 2016
1 parent f63e357 commit c630102
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
Expand Up @@ -11,11 +11,17 @@

package alluxio.underfs.swift;

import org.javaswift.joss.headers.object.range.ExcludeStartRange;
import alluxio.Configuration;
import alluxio.Constants;
import alluxio.PropertyKey;

import org.javaswift.joss.instructions.DownloadInstructions;
import org.javaswift.joss.model.Account;
import org.javaswift.joss.model.StoredObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -25,6 +31,8 @@
*/
@NotThreadSafe
public class SwiftInputStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

/** JOSS Swift account. */
private final Account mAccount;
/** Name of container the object resides in. */
Expand All @@ -35,11 +43,10 @@ public class SwiftInputStream extends InputStream {
/** The backing input stream. */
private InputStream mStream;
/** The current position of the stream. */
private int mPos;
private long mPos;

/**
* Constructor for an input stream to an object in a Swift API based store.
*
* @param account JOSS account with authentication credentials
* @param container the name of container where the object resides
* @param object path of the object in the container
Expand Down Expand Up @@ -92,33 +99,45 @@ public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}

LOG.info("<<SWIFTPERF {} skipping {} bytes", this.hashCode(), n);
closeStream();
mPos += n;
openStream();
LOG.info(">>SWIFTPERF {} done skipping", this.hashCode());
return n;
}

/**
* Opens a new stream at mPos if the wrapped stream mIn is null.
*/
private void openStream() {
LOG.info("<<SWIFTPERF {} open stream at pos {}", this.hashCode(), mPos);

if (mStream != null) { // stream is already open
LOG.info("SWIFTPERF {} stream is already open", this.hashCode());
return;
}
StoredObject storedObject = mAccount.getContainer(mContainerName).getObject(mObjectPath);
DownloadInstructions downloadInstructions = new DownloadInstructions();
downloadInstructions.setRange(new ExcludeStartRange(mPos));
long blockSize = Configuration.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT);
long endPos = mPos + blockSize;
downloadInstructions.setRange(new SwiftRange(mPos, endPos));
mStream = storedObject.downloadObjectAsInputStream(downloadInstructions);
LOG.info(">>SWIFTPERF {} stream open till end pos {}", this.hashCode(), endPos);
}

/**
* Closes the current stream.
*/
private void closeStream() throws IOException {
LOG.info("<<SWIFTPERF {} closing stream with pos {}", this.hashCode(), mPos);
if (mStream == null) {
LOG.info("SWIFTPERF {} stream already closed", this.hashCode());
return;
}
mStream.close();
mStream = null;
LOG.info(">>SWIFTPERF {} stream closed", this.hashCode());
}
}
38 changes: 38 additions & 0 deletions underfs/swift/src/main/java/alluxio/underfs/swift/SwiftRange.java
@@ -0,0 +1,38 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.swift;

import org.javaswift.joss.headers.object.range.AbstractRange;

/**
* A range of a Swift object.
*/
public class SwiftRange extends AbstractRange {
/**
* Constructor for a range in a Swift object.
* @param startPos starting position in bytes
* @param endPos ending position in bytes
*/
public SwiftRange(long startPos, long endPos) {
super(startPos, endPos);
}

@Override
public long getFrom(int byteArrayLength) {
return this.offset;
}

@Override
public long getTo(int byteArrayLength) {
return this.length;
}
}

0 comments on commit c630102

Please sign in to comment.