Skip to content

Commit

Permalink
Fix key name differences and setup local cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed May 6, 2015
1 parent 5760f83 commit 396edbe
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 21 deletions.
4 changes: 2 additions & 2 deletions core/src/test/java/tachyon/client/BlockInStreamTest.java
Expand Up @@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
Expand Down
Expand Up @@ -34,9 +34,9 @@ public class S3OutputStream extends OutputStream {

public S3OutputStream(String bucketName, String key, S3Service client) throws IOException {
mBucketName = bucketName;
mKey = key;
mKey = key.substring(6 + mBucketName.length() + 1);
mClient = client;
mFile = new File("/tmp/" + key);
mFile = new File("/tmp/" + Math.random() * 100);
mOut = new FileOutputStream(mFile);
}

Expand Down
57 changes: 41 additions & 16 deletions underfs/s3/src/main/java/tachyon/underfs/s3/S3UnderFileSystem.java
Expand Up @@ -27,7 +27,6 @@
import org.jets3t.service.S3Service;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.jets3t.service.security.AWSCredentials;
Expand All @@ -47,6 +46,7 @@ public class S3UnderFileSystem extends UnderFileSystem {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private static final String FOLDER_SUFFIX = "_$folder$";
private static final String PATH_SEPARATOR = "/";
private static final String SCHEME = "s3n://";

private final S3Service mClient;
private final String mBucketName;
Expand Down Expand Up @@ -76,7 +76,10 @@ public void connectFromWorker(TachyonConf conf, String hostname) {

@Override
public OutputStream create(String path) throws IOException {
return new S3OutputStream(mBucketName, path, mClient);
if (mkdirs(getParentKey(path), true)) {
return new S3OutputStream(mBucketName, path, mClient);
}
return null;
}

// Same as create(path)
Expand All @@ -101,8 +104,10 @@ public boolean delete(String path, boolean recursive) throws IOException {
// Get all relevant files
String[] pathsToDelete = list(path);
for (String pathToDelete : pathsToDelete) {
String toDelete = path + pathToDelete;
// If we fail to deleteInternal one file, stop
if (!deleteInternal(pathToDelete)) {
if (!deleteInternal(path + pathToDelete)) {
System.out.println("Failed to delete: " + toDelete);
return false;
}
}
Expand Down Expand Up @@ -177,10 +182,11 @@ public boolean isFile(String path) throws IOException {
@Override
public String[] list(String path) throws IOException {
try {
path = path.substring(SCHEME.length() + mBucketName.length() + 1);
S3Object[] objs = mClient.listObjects(mBucketName, path, PATH_SEPARATOR);
String[] ret = new String[objs.length];
for (int i = 0; i < objs.length; i ++) {
ret[i] = objs[i].getKey();
ret[i] = objs[i].getKey().substring(path.length());
}
return ret;
} catch (ServiceException se) {
Expand All @@ -191,17 +197,24 @@ public String[] list(String path) throws IOException {

@Override
public boolean mkdirs(String path, boolean createParent) throws IOException {
if (isFolder(path)) {
return true;
}
if (exists(path)) {
LOG.error("Cannot create directory " + path + " because it is already a file.");
return false;
}
if (!createParent) {
if (checkParent(path)) {
if (parentExists(path)) {
// Parent directory exists
return mkdir(path);
} else {
LOG.error("Cannot create path " + path + " because parent does not exist");
LOG.error("Cannot create directory " + path + " because parent does not exist");
return false;
}
}
// Parent directories should be created
if (checkParent(path)) {
if (parentExists(path)) {
// Parent directory exists
return mkdir(path);
} else {
Expand All @@ -214,6 +227,7 @@ public boolean mkdirs(String path, boolean createParent) throws IOException {
@Override
public InputStream open(String path) throws IOException {
try {
path = path.substring(SCHEME.length() + mBucketName.length() + 1);
return mClient.getObject(mBucketName, path).getDataInputStream();
} catch (ServiceException se) {
LOG.error("Failed to open file: " + path, se);
Expand All @@ -237,6 +251,7 @@ public boolean rename(String src, String dst) throws IOException {
if (!isFolder(src)) {
// Source is a file
// Copy to destination
System.out.println("SRC is a file");
if (copy(src, CommonUtils.concatPath(dst, srcName))) {
// Delete original
return deleteInternal(src);
Expand Down Expand Up @@ -279,6 +294,7 @@ public boolean rename(String src, String dst) throws IOException {
return delete(src, true);
}
// Source is a file and Destination does not exist
System.out.println("Dest does not exist and source is a file");
return copy(src, dst) && deleteInternal(src);
}

Expand All @@ -297,7 +313,7 @@ public void setPermission(String path, String posixPerm) throws IOException {
* @param key
* @return
*/
private boolean checkParent(String key) {
private boolean parentExists(String key) {
// Root does not have a parent
if (isRoot(key)) {
return true;
Expand All @@ -324,10 +340,12 @@ private String convertToFolderName(String key) {
private boolean deleteInternal(String key) throws IOException {
try {
if (isFolder(key)) {
String keyAsFolder = convertToFolderName(key);
String keyAsFolder = convertToFolderName(key).substring(SCHEME.length()
+ mBucketName.length() + 1);
mClient.deleteObject(mBucketName, keyAsFolder);
} else {
mClient.deleteObject(mBucketName, key);
mClient.deleteObject(mBucketName, key.substring(SCHEME.length()
+ mBucketName.length() + 1));
}
} catch (ServiceException se) {
LOG.error("Failed to delete " + key, se);
Expand All @@ -354,10 +372,12 @@ private String getKeyName(String key) {
private StorageObject getObjectDetails(String key) {
try {
if (isFolder(key)) {
String keyAsFolder = convertToFolderName(key);
String keyAsFolder = convertToFolderName(key).substring(SCHEME.length()
+ mBucketName.length() + 1);
return mClient.getObjectDetails(mBucketName, keyAsFolder);
} else {
return mClient.getObjectDetails(mBucketName, key);
return mClient.getObjectDetails(mBucketName, key.substring(SCHEME.length()
+ mBucketName.length() + 1));
}
} catch (ServiceException se) {
return null;
Expand Down Expand Up @@ -386,8 +406,9 @@ private boolean isFolder(String key) {
return true;
}
try {
String keyAsFolder = convertToFolderName(key);
mClient.getObjectDetails(mBucketName, keyAsFolder);
String keyAsFolder = convertToFolderName(key).substring(SCHEME.length()
+ mBucketName.length() + 1);
StorageObject details = mClient.getObjectDetails(mBucketName, keyAsFolder);
// If no exception is thrown, the key exists as a folder
return true;
} catch (ServiceException se) {
Expand All @@ -401,7 +422,7 @@ private boolean isFolder(String key) {
* @return
*/
private boolean isRoot(String key) {
return key.equals(mBucketName);
return key.equals(SCHEME + mBucketName);
}

/**
Expand All @@ -411,7 +432,8 @@ private boolean isRoot(String key) {
*/
private boolean mkdir(String key) {
try {
String keyAsFolder = convertToFolderName(key);
String keyAsFolder =
convertToFolderName(key).substring(SCHEME.length() + mBucketName.length() + 1);
S3Object obj = new S3Object(keyAsFolder);
obj.setDataInputStream(new ByteArrayInputStream(new byte[0]));
obj.setContentLength(0);
Expand All @@ -426,6 +448,9 @@ private boolean mkdir(String key) {

private boolean copy(String src, String dst) {
try {
src = src.substring(SCHEME.length() + mBucketName.length() + 1);
dst = dst.substring(SCHEME.length() + mBucketName.length() + 1);
LOG.info("Copying " + src + " to " + dst);
S3Object obj = new S3Object(dst);
mClient.copyObject(mBucketName, src, mBucketName, obj, false);
return true;
Expand Down
Expand Up @@ -18,15 +18,21 @@
import java.io.IOException;

import tachyon.conf.TachyonConf;
import tachyon.underfs.UnderFileSystem;
import tachyon.underfs.UnderFileSystemCluster;

/**
* This class will use Amazon S3 as the backing store.
*/
public class S3UnderStorageCluster extends UnderFileSystemCluster {

private final String awsAccessKey = "";
private final String awsSecretKey = "";

public S3UnderStorageCluster(String baseDir, TachyonConf tachyonConf) {
super(baseDir, tachyonConf);
System.setProperty("fs.s3n.awsAccessKeyId", awsAccessKey);
System.setProperty("fs.s3n.awsSecretAccessKey", awsSecretKey);
mBaseDir = "s3n://calvin-s3-test/testdir";
}

Expand All @@ -41,7 +47,10 @@ public boolean isStarted() {
}

@Override
public void shutdown() throws IOException {}
public void shutdown() throws IOException {
UnderFileSystem ufs = UnderFileSystem.get(mBaseDir, mTachyonConf);
ufs.delete(mBaseDir, true);
}

@Override
public void start() throws IOException {}
Expand Down

0 comments on commit 396edbe

Please sign in to comment.