Permalink
Browse files

started rroxy implementatoin for SwiftRestClient, several bug fixes a…

…nd HTTP call optimizations in SwiftRestClient
  • Loading branch information...
1 parent 5ca4610 commit 9befd068a02ff35b1b34424315f4a4069e28e3b2 dmezhenskiy committed Feb 3, 2013
View
24 swift-file-system-locality-test/src/main/java/com/mirantis/swift/fs/TestJob.java
@@ -17,8 +17,11 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.PrintWriter;
import java.net.InetAddress;
+import java.net.URI;
import java.net.UnknownHostException;
/**
@@ -69,18 +72,20 @@ public int run(String[] args) throws Exception {
"-username login -pass password -input input-path -output output-path");
//Swift auth properties
- conf.set("swift.auth.url", getParam(args, "host"));
- conf.set("swift.tenant", getParam(args, "tenant"));
- conf.set("swift.username", getParam(args, "username"));
- conf.set("swift.password", getParam(args, "pass"));
- conf.setInt("swift.http.port", 8080);
- conf.setInt("swift.https.port", 443);
+ conf.set("fs.swift.service.rs.auth.url", getParam(args, "host"));
+ conf.set("fs.swift.service.rs.tenant", getParam(args, "tenant"));
+ conf.set("fs.swift.service.rs.username", getParam(args, "username"));
+ conf.set("fs.swift.service.rs.password", getParam(args, "pass"));
+ conf.set("fs.swift.service.rs.public", "true");
+ //conf.set("fs.swift.service.rs.region", "region-a.geo-1");
+ conf.setInt("fs.swift.service.rs.http.port", 8080);
+ conf.setInt("fs.swift.service.rs.https.port", 443);
+
+ conf.setBoolean("mapred.output.compress", false);
final Job job = new Job(conf, "Test Job");
job.setJarByClass(TestJob.class);
- FileInputFormat.setInputPaths(job, getParam(args, "input"));
- FileOutputFormat.setOutputPath(job, new Path(getParam(args, "output")));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
@@ -91,6 +96,9 @@ public int run(String[] args) throws Exception {
job.setMapperClass(TestMapper.class);
job.setReducerClass(TestReducer.class);
+ FileInputFormat.setInputPaths(job, getParam(args, "input"));
+ FileOutputFormat.setOutputPath(job, new Path(getParam(args, "output")));
+
return job.waitForCompletion(true) ? 0 : 1;
}
View
2 swift-file-system/pom.xml
@@ -16,7 +16,7 @@
<commons.lang.version>2.6</commons.lang.version>
<commons.io.version>2.3</commons.io.version>
<hadoop.version>3.0.0-SNAPSHOT</hadoop.version>
- <httpcomponents.version>4.2.1</httpcomponents.version>
+ <httpcomponents.version>4.2.3</httpcomponents.version>
<jackson.version>1.9.7</jackson.version>
<junit.version>4.10</junit.version>
<mockito.version>1.9.0</mockito.version>
View
9 swift-file-system/src/main/java/org/apache/hadoop/fs/swift/block/SwiftBlockFileSystem.java
@@ -75,7 +75,8 @@ public void initialize(URI uri, Configuration conf) throws IOException {
}
store.initialize(uri, conf);
this.uri = URI.create(String.format("bswift://%s:%d", uri.getHost(), uri.getPort()));
- this.workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this);
+ this.workingDir = new Path("/user", System.getProperty("user.name")).
+ makeQualified(uri, new Path(System.getProperty("user.name")));
}
/**
@@ -140,12 +141,12 @@ public boolean isFile(Path path) throws IOException {
return new FileStatus[]{};
}
if (inode.isFile()) {
- return new FileStatus[]{getFileStatus(f.makeQualified(this), inode)
+ return new FileStatus[]{getFileStatus(f.makeQualified(uri, workingDir), inode)
};
}
ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
for (Path p : store.listSubPaths(absolutePath)) {
- ret.add(getFileStatus(p.makeQualified(this)));
+ ret.add(getFileStatus(p.makeQualified(uri, workingDir)));
}
return ret.toArray(new FileStatus[ret.size()]);
}
@@ -354,7 +355,7 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
@Override
- public long getDefaultBlockSize() {
+ public long getDefaultBlockSize(Path path) {
//64 mb
return 64 * 1024 * 1024;
}
View
6 swift-file-system/src/main/java/org/apache/hadoop/fs/swift/http/SwiftProtocolConstants.java
@@ -39,7 +39,7 @@
public static final String SERVICE_CATALOG_CLOUD_FILES = "cloudFiles";
public static final String SERVICE_CATALOG_OBJECT_STORE = "object-store";
public static final String SWIFT_OBJECT_AUTH_ENDPOINT =
- "/object_endpoint/AUTH_";
+ "/object_endpoint/";
public static final String X_OBJECT_MANIFEST = "X-Object-Manifest";
public static final String X_CONTAINER_OBJECT_COUNT =
"X-Container-Object-Count";
@@ -87,6 +87,8 @@
public static final String DOT_HTTP_PORT = ".http.port";
public static final String DOT_HTTPS_PORT = ".https.port";
public static final String DOT_REGION = ".region";
+ public static final String DOT_PROXY_HOST = ".proxy.host";
+ public static final String DOT_PROXY_PORT = ".proxy.port";
/**
* flag to say use public URL
*/
@@ -105,4 +107,6 @@
public static final String SWIFT_REGION_PROPERTY = FS_SWIFT + DOT_REGION;
public static final String SWIFT_PUBLIC_PROPERTY = FS_SWIFT + DOT_PUBLIC;
+ public static final String SWIFT_PROXY_HOST_PROPERTY = FS_SWIFT + DOT_PROXY_HOST;
+ public static final String SWIFT_PROXY_PORT_PROPERTY = FS_SWIFT + DOT_PROXY_PORT;
}
View
52 swift-file-system/src/main/java/org/apache/hadoop/fs/swift/http/SwiftRestClient.java
@@ -18,12 +18,8 @@
package org.apache.hadoop.fs.swift.http;
-import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
-import org.apache.commons.httpclient.Header;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpMethod;
-import org.apache.commons.httpclient.HttpMethodBase;
-import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.*;
+
import static org.apache.commons.httpclient.HttpStatus.*;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
@@ -62,6 +58,7 @@
import static org.apache.hadoop.fs.swift.http.SwiftProtocolConstants.*;
import org.apache.hadoop.fs.swift.util.SwiftUtils;
+import org.apache.http.conn.params.ConnRoutePNames;
import org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream;
import java.io.FileNotFoundException;
@@ -140,6 +137,9 @@
private final int retryCount;
private final int connectTimeout;
+ private String proxyHost;
+ private int proxyPort;
+
/**
* objects query endpoint. This is synchronized
* to handle a simultaneous update of all auth data in one
@@ -339,6 +339,9 @@ private SwiftRestClient(URI filesystemURI,
connectTimeout = getIntOption(props, SWIFT_CONNECTION_TIMEOUT,
DEFAULT_CONNECT_TIMEOUT);
+ proxyHost = props.getProperty(SWIFT_PROXY_HOST_PROPERTY, null);
+ proxyPort = getIntOption(props, SWIFT_PROXY_PORT_PROPERTY, 8080);
+
if (LOG.isDebugEnabled()) {
//everything you need for diagnostics. The password is omitted.
LOG.debug(String.format(
@@ -493,17 +496,21 @@ public InputStream getDataAsInputStream(SwiftObjectPath path,
public byte[] getObjectLocation(SwiftObjectPath path,
final Header... requestHeaders) throws IOException {
preRemoteCommand("getObjectLocation");
- return perform(pathToURI(path),
+ return perform(pathToObjectLocation(path),
new GetMethodProcessor<byte[]>() {
@Override
public byte[] extractResult(GetMethod method) throws
IOException {
//TODO: remove SC_NO_CONTENT if it depends on Swift versions
- if (method.getStatusCode() == SC_NOT_FOUND || method.getStatusCode() == SC_NO_CONTENT) {
+ if (method.getStatusCode() == SC_NOT_FOUND || method.getStatusCode() == SC_NO_CONTENT ||
+ method.getResponseBodyAsStream() == null) {
return null;
}
- return method.getResponseBody();
+ final InputStream responseBodyAsStream = method.getResponseBodyAsStream();
+ final byte[] locationData = new byte[1024];
+
+ return responseBodyAsStream.read(locationData) > 0 ? locationData : null;
}
@Override
@@ -513,6 +520,23 @@ protected void setup(GetMethod method) {
});
}
+ private URI pathToObjectLocation(SwiftObjectPath path) {
+ URI uri;
+ String dataLocationURI = objectLocationURI.toString();
+ try {
+ if (path.toString().startsWith("/")) {
+ dataLocationURI = dataLocationURI.concat(path.toUriPath());
+ } else {
+ dataLocationURI = dataLocationURI.concat("/").concat(path.toUriPath());
+ }
+
+ uri = new URI(dataLocationURI);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ return uri;
+ }
+
/**
* Find objects under a prefix
* @param path path prefix
@@ -760,6 +784,7 @@ public AccessToken extractResult(PostMethod method) throws IOException {
//these fields are all set together at the end of the operation
URI endpointURI = null;
URI objectLocation;
+ Endpoint swiftEndpoint = null;
AccessToken accessToken;
for (Catalog catalog : serviceCatalog) {
@@ -792,6 +817,7 @@ public AccessToken extractResult(PostMethod method) throws IOException {
}
if (region == null || endpointRegion.equals(region)) {
endpointURI = usePublicURL ?publicURL: internalURL;
+ swiftEndpoint = endpoint;
break;
}
}
@@ -812,7 +838,7 @@ public AccessToken extractResult(PostMethod method) throws IOException {
accessToken = access.getToken();
String path = SWIFT_OBJECT_AUTH_ENDPOINT
- + accessToken.getTenant().getId();
+ + swiftEndpoint.getTenantId();
String host = endpointURI.getHost();
try {
objectLocation = new URI(endpointURI.getScheme(),
@@ -932,7 +958,7 @@ private void preRemoteCommand(String operation) throws IOException {
final M method = processor.createMethod(uri.toString());
-
+
//retry policy
HttpMethodParams methodParams = method.getParams();
methodParams.setParameter(HttpMethodParams.RETRY_HANDLER,
@@ -1138,6 +1164,10 @@ private void setAuthToken(HttpMethodBase method, AccessToken accessToken) {
*/
private <M extends HttpMethod> int exec(M method) throws IOException {
final HttpClient client = new HttpClient();
+ if (proxyHost != null) {
+ client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, new HttpHost(proxyHost, proxyPort));
+ }
+
int statusCode = execWithDebugOutput(method, client);
if (method.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
//unauthed -look at what raised the response
View
72 ...t-file-system/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java
@@ -21,13 +21,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BufferedFSInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.swift.exceptions.SwiftException;
import org.apache.hadoop.fs.swift.exceptions.SwiftNotDirectoryException;
@@ -49,13 +43,9 @@
private static final Log LOG =
- LogFactory.getLog(SwiftNativeFileSystem.class);
+ LogFactory.getLog(SwiftNativeFileSystem.class);
/**
- * URI constant for this filesystem: {@value}
- */
- public static final String SWIFT_FS = "swift";
- /**
* path to user work directory for storing temporary files
*/
private Path workingDir;
@@ -81,30 +71,31 @@ public SwiftNativeFileSystem() {
* This constructor used for testing purposes
*/
public SwiftNativeFileSystem(SwiftNativeFileSystemStore store)
- throws IOException {
+ throws IOException {
this.store = store;
}
/**
* default class initialization
*
- * @param fsuri path to Swift
- * @param conf Hadoop configuration
+ * @param fsuri path to Swift
+ * @param conf Hadoop configuration
* @throws IOException
*/
@Override
public void initialize(URI fsuri, Configuration conf) throws IOException {
super.initialize(fsuri, conf);
+
setConf(conf);
if (store == null) {
store = new SwiftNativeFileSystemStore();
}
this.uri = fsuri;
this.workingDir = new Path("/user",
- System.getProperty("user.name")).makeQualified(this);
+ System.getProperty("user.name")).makeQualified(uri, new Path(System.getProperty("user.name")));
if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing SwiftNativeFileSystem against URI "+ uri
- + " and working dir " + workingDir);
+ LOG.debug("Initializing SwiftNativeFileSystem against URI " + uri
+ + " and working dir " + workingDir);
}
store.initialize(uri, conf);
LOG.debug("SwiftFileSystem initialized");
@@ -115,6 +106,7 @@ public void initialize(URI fsuri, Configuration conf) throws IOException {
*/
@Override
public URI getUri() {
+
return uri;
}
@@ -148,13 +140,14 @@ public void setWorkingDirectory(Path dir) {
*/
@Override
public FileStatus getFileStatus(Path f) throws IOException {
- final FileStatus objectMetadata = store.getObjectMetadata(f);
- return objectMetadata;
+
+ return store.getObjectMetadata(f);
}
@Override
public boolean isFile(Path f) throws IOException {
+
try {
FileStatus fileStatus = getFileStatus(f);
return !SwiftUtils.isDirectory(fileStatus);
@@ -165,6 +158,7 @@ public boolean isFile(Path f) throws IOException {
@Override
public boolean isDirectory(Path f) throws IOException {
+
try {
FileStatus fileStatus = getFileStatus(f);
return SwiftUtils.isDirectory(fileStatus);
@@ -196,7 +190,7 @@ public boolean isDirectory(Path f) throws IOException {
if (listOfFileBlocks.length > 1) {
for (FileStatus fileStatus : listOfFileBlocks) {
if (SwiftObjectPath.fromPath(uri, fileStatus.getPath())
- .equals(SwiftObjectPath.fromPath(uri, file.getPath()))) {
+ .equals(SwiftObjectPath.fromPath(uri, file.getPath()))) {
continue;
}
locations.addAll(store.getObjectLocation(fileStatus.getPath()));
@@ -214,13 +208,12 @@ public boolean isDirectory(Path f) throws IOException {
i++;
}
return new BlockLocation[]{
- new BlockLocation(names, hosts, 0, file.getLen())
+ new BlockLocation(names, hosts, 0, file.getLen())
};
}
@Override
public boolean mkdirs(Path path, FsPermission permission) throws IOException {
-
if (LOG.isDebugEnabled()) {
LOG.debug("SwiftFileSystem.mkdirs: " + path);
}
@@ -264,8 +257,8 @@ private boolean mkdir(Path path) throws IOException {
fileStatus = getFileStatus(absolutePath);
if (!SwiftUtils.isDirectory(fileStatus)) {
throw new SwiftNotDirectoryException(path,
- String.format(": can't mkdir since it is not a directory: %s",
- fileStatus));
+ String.format(": can't mkdir since it is not a directory: %s",
+ fileStatus));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("skipping mkdir(" + path + ") as it exists already");
@@ -301,7 +294,7 @@ private boolean mkdir(Path path) throws IOException {
* This optional operation is not supported yet
*/
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws
- IOException {
+ IOException {
LOG.debug("SwiftFileSystem.append");
throw new SwiftUnsupportedFeatureException("Not supported: append()");
}
@@ -313,9 +306,9 @@ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
public FSDataOutputStream create(Path file, FsPermission permission,
boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress)
- throws IOException {
-
+ throws IOException {
LOG.debug("SwiftFileSystem.create");
+
FileStatus fileStatus = null;
try {
fileStatus = getFileStatus(makeAbsolute(file));
@@ -338,9 +331,9 @@ public FSDataOutputStream create(Path file, FsPermission permission,
}
SwiftNativeOutputStream out = new SwiftNativeOutputStream(getConf(),
- store,
- file.toUri()
- .toString());
+ store,
+ file.toUri()
+ .toString());
return new FSDataOutputStream(out, statistics);
}
@@ -353,8 +346,8 @@ public FSDataOutputStream create(Path file, FsPermission permission,
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
return new FSDataInputStream(
- new BufferedFSInputStream(
- new SwiftNativeInputStream(store, statistics, path), bufferSize));
+ new BufferedFSInputStream(
+ new SwiftNativeInputStream(store, statistics, path), bufferSize));
}
/**
@@ -400,7 +393,7 @@ public boolean delete(Path path, boolean recursive) throws IOException {
* raised. This lets the caller distinguish a file not found with
* other reasons for failure, so handles race conditions in recursive
* directory deletes better.
- *
+ * <p/>
* The problem being addressed is: caller A requests a recursive directory
* of directory /dir ; caller B requests a delete of a file /dir/file,
* between caller A enumerating the files contents, and requesting a delete
@@ -413,10 +406,10 @@ public boolean delete(Path path, boolean recursive) throws IOException {
* directory is not empty
* case of a file the recursive can be set to either true or false.
* @return true if the object was deleted
- * @throws IOException IO problems
+ * @throws IOException IO problems
* @throws FileNotFoundException if a file/dir being deleted is not there -
- * this includes entries below the specified path, (if the path is a dir
- * and recursive is true)
+ * this includes entries below the specified path, (if the path is a dir
+ * and recursive is true)
*/
private boolean innerDelete(Path path, boolean recursive) throws IOException {
Path absolutePath = makeAbsolute(path);
@@ -431,8 +424,7 @@ private boolean innerDelete(Path path, boolean recursive) throws IOException {
LOG.debug("Deleting simple file '" + path + "'");
}
store.deleteObject(absolutePath);
- }
- else {
+ } else {
//it's a directory
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting directory '" + path + "'");
@@ -452,7 +444,7 @@ private boolean innerDelete(Path path, boolean recursive) throws IOException {
LOG.debug("Found " + contents.length + " child entries under " + dirPath);
}
ArrayList<FileStatus> children =
- new ArrayList<FileStatus>(contents.length);
+ new ArrayList<FileStatus>(contents.length);
for (FileStatus child : contents) {
if (!(child.getPath().equals(dirPath))) {
if (LOG.isDebugEnabled()) {
View
107 ...e-system/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java
@@ -10,7 +10,6 @@
import org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException;
import org.apache.hadoop.fs.swift.exceptions.SwiftException;
import org.apache.hadoop.fs.swift.exceptions.SwiftInvalidResponseException;
-import org.apache.hadoop.fs.swift.exceptions.SwiftNotDirectoryException;
import org.apache.hadoop.fs.swift.http.SwiftProtocolConstants;
import org.apache.hadoop.fs.swift.http.SwiftRestClient;
import org.apache.hadoop.fs.swift.util.SwiftObjectPath;
@@ -25,11 +24,7 @@
import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.StringTokenizer;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -41,14 +36,15 @@
private static final Pattern URI_PATTERN = Pattern.compile("\"\\S+?\"");
private static final String PATTERN = "EEE, d MMM yyyy hh:mm:ss zzz";
private static final Log LOG =
- LogFactory.getLog(SwiftNativeFileSystemStore.class);
+ LogFactory.getLog(SwiftNativeFileSystemStore.class);
private URI uri;
private SwiftRestClient swiftRestClient;
/**
* Initalize the filesystem store -this creates the REST client binding.
- * @param fsURI URI of the filesystem, which is used to map to the filesystem-specific
- * options in the configuration file
+ *
+ * @param fsURI URI of the filesystem, which is used to map to the filesystem-specific
+ * options in the configuration file
* @param configuration configuration
* @throws IOException on any failure.
*/
@@ -60,14 +56,15 @@ public void initialize(URI fsURI, Configuration configuration) throws IOExceptio
@Override
public String toString() {
return "SwiftNativeFileSystemStore with "
- + swiftRestClient;
+ + swiftRestClient;
}
/**
* Upload a file
- * @param path destination path in the swift filesystem
+ *
+ * @param path destination path in the swift filesystem
* @param inputStream input data
- * @param length length of the data
+ * @param length length of the data
* @throws IOException on a problem
*/
public void uploadFile(Path path, InputStream inputStream, long length) throws IOException {
@@ -76,10 +73,11 @@ public void uploadFile(Path path, InputStream inputStream, long length) throws I
/**
* Upload part of a larger file.
- * @param path destination path
- * @param partNumber item number in the path
+ *
+ * @param path destination path
+ * @param partNumber item number in the path
* @param inputStream input data
- * @param length length of the data
+ * @param length length of the data
* @throws IOException on a problem
*/
public void uploadFilePart(Path path, int partNumber, InputStream inputStream, long length) throws IOException {
@@ -96,6 +94,7 @@ public void uploadFilePart(Path path, int partNumber, InputStream inputStream, l
/**
* Tell the Swift server to expect a multi-part upload by submitting
* a 0-byte file with the X-Object-Manifest header
+ *
* @param path path of final final
* @throws IOException
*/
@@ -109,22 +108,23 @@ public void createManifestForPartUpload(Path path) throws IOException {
}
swiftRestClient.upload(toObjectPath(path),
- new ByteArrayInputStream(new byte[0]),
- 0,
- new Header(SwiftProtocolConstants.X_OBJECT_MANIFEST, pathString));
+ new ByteArrayInputStream(new byte[0]),
+ 0,
+ new Header(SwiftProtocolConstants.X_OBJECT_MANIFEST, pathString));
}
/**
* Get the metadata of an object
+ *
* @param path path
* @return file metadata. -or null if no headers were received back from the server.
- * @throws IOException on a problem
+ * @throws IOException on a problem
* @throws FileNotFoundException if there is nothing at the end
*/
public FileStatus getObjectMetadata(Path path) throws IOException {
final Header[] headers;
headers = swiftRestClient.headRequest(toObjectPath(path),
- SwiftRestClient.NEWEST);
+ SwiftRestClient.NEWEST);
//no headers is treated as a missing file
if (headers.length == 0) {
throw new FileNotFoundException("Not Found " + path.toUri());
@@ -136,7 +136,7 @@ public FileStatus getObjectMetadata(Path path) throws IOException {
for (Header header : headers) {
String headerName = header.getName();
if (headerName.equals(SwiftProtocolConstants.X_CONTAINER_OBJECT_COUNT) ||
- headerName.equals(SwiftProtocolConstants.X_CONTAINER_BYTES_USED)) {
+ headerName.equals(SwiftProtocolConstants.X_CONTAINER_BYTES_USED)) {
length = 0;
isDir = true;
}
@@ -165,28 +165,30 @@ public FileStatus getObjectMetadata(Path path) throws IOException {
/**
* Get the object as an input stream
+ *
* @param path object path
* @return the input stream -this must be closed to terminate the connection
- * @throws IOException IO problems
+ * @throws IOException IO problems
* @throws FileNotFoundException path doesn't resolve to an object
*/
public InputStream getObject(Path path) throws IOException {
return swiftRestClient.getDataAsInputStream(toObjectPath(path),
- SwiftRestClient.NEWEST);
+ SwiftRestClient.NEWEST);
}
/**
* Get the input stream starting from a specific point.
- * @param path path to object
+ *
+ * @param path path to object
* @param byteRangeStart starting point
- * @param length no. of bytes
+ * @param length no. of bytes
* @return an input stream that must be closed
* @throws IOException IO problems
*/
public InputStream getObject(Path path, long byteRangeStart, long length)
- throws IOException {
+ throws IOException {
return swiftRestClient.getDataAsInputStream(
- toObjectPath(path), byteRangeStart, length);
+ toObjectPath(path), byteRangeStart, length);
}
public FileStatus[] listSubPaths(Path path) throws IOException {
@@ -197,6 +199,7 @@ public InputStream getObject(Path path, long byteRangeStart, long length)
/**
* Create a directory
+ *
* @param path path
* @throws IOException
*/
@@ -205,12 +208,12 @@ public void createDirectory(Path path) throws IOException {
}
private SwiftObjectPath toDirPath(Path path) throws
- SwiftConfigurationException {
+ SwiftConfigurationException {
return SwiftObjectPath.fromPath(uri, path, false);
}
private SwiftObjectPath toObjectPath(Path path) throws
- SwiftConfigurationException {
+ SwiftConfigurationException {
return SwiftObjectPath.fromPath(uri, path);
}
@@ -230,6 +233,7 @@ private SwiftObjectPath toObjectPath(Path path) throws
public boolean deleteObject(Path path) throws IOException {
return swiftRestClient.delete(toObjectPath(path));
}
+
/**
* deletes a directory from Swift
*
@@ -243,6 +247,7 @@ public boolean rmdir(Path path) throws IOException {
/**
* Does the object exist
+ *
* @param path object path
* @return true if the metadata of an object could be retrieved
* @throws IOException IO problems
@@ -260,16 +265,17 @@ public boolean objectExists(Path path) throws IOException {
* Rename through copy-and-delete. this is clearly very inefficient, and
* is a consequence of the classic Swift filesystem using the path as the hash
* into the Distributed Hash Table, "the ring" of filenames.
- *
+ * <p/>
* Because of the nature of the operation, it is not atomic.
+ *
* @param src source file/dir
* @param dst destination
* @return true if the entire rename was successful.
* @throws IOException
*/
public boolean renameDirectory(Path src, Path dst) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("mv " + src +" " + dst);
+ LOG.debug("mv " + src + " " + dst);
}
if (src.equals(dst)) {
LOG.debug("Destination==source -failing");
@@ -308,14 +314,13 @@ public boolean renameDirectory(Path src, Path dst) throws IOException {
//check to see if the parent exists
Path destParent = dst.getParent();
- FileStatus destParentStat;
try {
- destParentStat = destParent != null
- ? getObjectMetadata(destParent)
- : null;
+ FileStatus destParentStat = destParent != null
+ ? getObjectMetadata(destParent)
+ : null;
} catch (FileNotFoundException e) {
//destination parent doesn't exist; bail out
- LOG.debug("destination parent directory "+ destParent + " doesn't exist");
+ LOG.debug("destination parent directory " + destParent + " doesn't exist");
return false;
}
@@ -391,11 +396,14 @@ public boolean renameDirectory(Path src, Path dst) throws IOException {
for (FileStatus fileStatus : fileStatuses) {
if (!fileStatus.isDir()) {
boolean copied =
- swiftRestClient.copyObject(toObjectPath(fileStatus.getPath()),
- targetObjectPath);
+ swiftRestClient.copyObject(toObjectPath(fileStatus.getPath()),
+ targetObjectPath);
result &= copied;
- swiftRestClient.delete(toObjectPath(fileStatus.getPath()));
+ //if client couldn't copy, data will be lost
+ if (copied) {
+ swiftRestClient.delete(toObjectPath(fileStatus.getPath()));
+ }
}
}
@@ -405,9 +413,10 @@ public boolean renameDirectory(Path src, Path dst) throws IOException {
/**
* List a directory
+ *
* @param path path to list
* @return the filestats of all the entities in the directory -or
- * an empty list if no objects were found listed under that prefix
+ * an empty list if no objects were found listed under that prefix
* @throws IOException IO problems
*/
private List<FileStatus> listDirectory(SwiftObjectPath path) throws IOException {
@@ -434,17 +443,15 @@ public boolean renameDirectory(Path src, Path dst) throws IOException {
//this can come back on a root list if the container is empty
if (LOG.isDebugEnabled()) {
LOG.debug("lsdir " + path +
- " status code says NO_CONTENT; "
- + e.toString());
+ " status code says NO_CONTENT; "
+ + e.toString());
}
if (SwiftUtils.isRootDir(path)) {
return Collections.emptyList();
- }
- else {
+ } else {
throw new FileNotFoundException("Not found: " + path);
}
- }
- else {
+ } else {
throw e;
}
}
@@ -469,10 +476,10 @@ public boolean renameDirectory(Path src, Path dst) throws IOException {
private Path getCorrectSwiftPath(Path path) throws URISyntaxException {
final URI fullUri = new URI(uri.getScheme(),
- uri.getAuthority(),
- path.toUri().getPath(),
- null,
- null);
+ uri.getAuthority(),
+ path.toUri().getPath(),
+ null,
+ null);
return new Path(fullUri);
}
View
4 swift-file-system/src/test/java/org/apache/hadoop/fs/swift/TestSwiftFileSystemBasicOps.java
@@ -261,7 +261,7 @@ public void testFileStatus() throws Throwable {
writeTextFile(fs, path, text, false);
FileStatus fileStatus = fs.getFileStatus(path);
assertTrue("Not a file: " + fileStatus, fileStatus.isFile());
- assertFalse("A dir: " + fileStatus, fileStatus.isDirectory());
+ assertFalse("A dir: " + fileStatus, fileStatus.isDir());
} finally {
delete(fs, path);
}
@@ -345,7 +345,7 @@ private void assertDirectory(SwiftNativeFileSystem fs, Path path) throws
assertFalse("Should be a dir, but is a file: " + fileStatus,
fileStatus.isFile());
assertTrue("Should be a dir -but isn't: " + fileStatus,
- fileStatus.isDirectory());
+ fileStatus.isDir());
}
View
4 ...-system/src/test/java/org/apache/hadoop/fs/swift/TestSwiftFileSystemExtendedContract.java
@@ -100,7 +100,7 @@ protected boolean renameSupported() {
}
protected Path path(String pathString) {
- return new Path(pathString).makeQualified(fs);
+ return new Path(pathString).makeQualified(fs.getUri(), fs.getWorkingDirectory());
}
protected void createFile(Path path) throws IOException {
@@ -474,7 +474,7 @@ private void assertIsFile(Path filename) throws IOException {
assertFalse("File claims to be a symlink " + fileInfo,
status.isSymlink());
assertFalse("File claims to be a directory " + fileInfo,
- status.isDirectory());
+ status.isDir());
}
}
View
7 ...system/src/test/java/org/apache/hadoop/fs/swift/functional/tests/SwiftFileSystemTest.java
@@ -3,14 +3,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.swift.snative.SwiftFileSystemForFunctionalTests;
import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
import org.junit.Before;
import org.junit.Test;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
@@ -41,6 +41,7 @@ public void initialization() throws URISyntaxException {
this.uri = new URI("swift://data.rackspace");
}
+
/**
* tests functionality for big files ( > 5Gb) upload
*/
@@ -126,7 +127,7 @@ public void testRenameDirWithSubDis() throws IOException {
final String message = "message";
final Path filePath = new Path("/home/user/documents/file.txt");
- final Path newFilePath = new Path("/home/user/files/file.txt");
+ final Path newFilePath = new Path("/home/user/documents/file2.txt");
final FSDataOutputStream fsDataOutputStream = fileSystem.create(filePath);
fsDataOutputStream.write(message.getBytes());

0 comments on commit 9befd06

Please sign in to comment.