[ALLUXIO-2743] Update gluster fs under fs
apc999 committed Apr 25, 2017
1 parent 95de0ba commit 75e9eed
Showing 9 changed files with 102 additions and 66 deletions.
@@ -182,7 +182,8 @@ public static String getValue(PropertyKey key, Map<String, String> ufsConf) {
    * @return true if the key is contained in the given UFS configuration or the global
    *         configuration
    */
   public static boolean containsKey(PropertyKey key, Map<String, String> ufsConf) {
-    return ufsConf.containsKey(key.toString()) || Configuration.containsKey(key);
+    return (ufsConf != null && ufsConf.containsKey(key.toString())) || Configuration
+        .containsKey(key);
   }
 
   private UnderFileSystemUtils() {} // prevent instantiation
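The new null guard matters because callers may now pass no UFS-specific configuration at all; later in this commit, HdfsUnderFileSystem.createInstance(new AlluxioURI("file:///"), null) does exactly that. A minimal sketch of the guarded lookup (the key chosen below is arbitrary; any PropertyKey works):

import alluxio.PropertyKey;
import alluxio.util.UnderFileSystemUtils;

import java.util.Map;

public class ContainsKeyNullGuardExample {
  public static void main(String[] args) {
    Map<String, String> ufsConf = null; // no per-mount --option values supplied
    // Before this change, the call below would throw NullPointerException on
    // ufsConf.containsKey(...); with the guard it falls through to the global
    // Alluxio Configuration lookup instead.
    boolean present = UnderFileSystemUtils.containsKey(
        PropertyKey.UNDERFS_HDFS_CONFIGURATION, ufsConf);
    System.out.println("key present: " + present);
  }
}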
@@ -15,7 +15,6 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -50,6 +49,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -147,8 +147,8 @@ private void registerFileSystemMock() throws IOException {
         .thenReturn(UFS_SPACE_TOTAL);
     when(underFileSystemMock.getSpace(TEST_PATH, UnderFileSystem.SpaceType.SPACE_USED)).thenReturn(
         UFS_SPACE_USED);
-    when(underFileSystemFactoryMock.create(eq(TEST_PATH), anyObject())).thenReturn(
-        underFileSystemMock);
+    when(underFileSystemFactoryMock.create(eq(TEST_PATH), Matchers.<Map<String, String>>any()))
+        .thenReturn(underFileSystemMock);
     UnderFileSystemRegistry.register(underFileSystemFactoryMock);
   }
 
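This test and FileDataManagerTest below make the same substitution: anyObject() is replaced with the explicitly typed Matchers.<Map<String, String>>any(), which matches the new Map<String, String> parameter of UnderFileSystemFactory#create without an ambiguous raw match. A self-contained sketch of the pattern, using a hypothetical Factory interface in place of the real one:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.mockito.Matchers;

import java.util.Map;

public class TypedMatcherExample {
  // Hypothetical stand-in for UnderFileSystemFactory.
  interface Factory {
    Object create(String path, Map<String, String> conf);
  }

  public static void main(String[] args) {
    Factory factory = mock(Factory.class);
    Object ufs = new Object();
    // The explicit type witness keeps the stub type-safe now that the second
    // parameter is a Map<String, String> rather than Object.
    when(factory.create(Matchers.eq("/test"), Matchers.<Map<String, String>>any()))
        .thenReturn(ufs);
    System.out.println(factory.create("/test", null) == ufs); // prints true
  }
}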
@@ -41,6 +41,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -53,6 +54,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Tests {@link FileDataManager}.
@@ -83,7 +85,7 @@ public void before() throws Exception {
 
     mUfsFactory = Mockito.mock(UnderFileSystemFactory.class);
     Mockito.when(mUfsFactory.supportsPath(Mockito.anyString())).thenReturn(true);
-    Mockito.when(mUfsFactory.create(Mockito.anyString(), Mockito.anyObject()))
+    Mockito.when(mUfsFactory.create(Mockito.anyString(), Matchers.<Map<String, String>>any()))
         .thenReturn(mUfs);
     UnderFileSystemRegistry.register(mUfsFactory);
   }
@@ -12,10 +12,14 @@
 package alluxio.underfs.glusterfs;
 
 import alluxio.AlluxioURI;
-import alluxio.Configuration;
 import alluxio.PropertyKey;
 import alluxio.underfs.UnderFileSystem;
 import alluxio.underfs.hdfs.HdfsUnderFileSystem;
+import alluxio.util.UnderFileSystemUtils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -35,37 +39,57 @@ public class GlusterFSUnderFileSystem extends HdfsUnderFileSystem {
   public static final String SCHEME = "glusterfs://";
 
   /**
-   * Constructs a new Gluster FS {@link UnderFileSystem}.
+   * Prepares the necessary Hadoop configuration.
    *
-   * @param uri the {@link AlluxioURI} for this UFS
-   * @param conf the configuration for Hadoop or GlusterFS
+   * @param path the file system path
+   * @param ufsConf the UFS configuration
    */
-  public GlusterFSUnderFileSystem(AlluxioURI uri, Object conf) {
-    super(uri, conf);
-  }
-
-  @Override
-  public String getUnderFSType() {
-    return "glusterfs";
-  }
-
-  @Override
-  protected void prepareConfiguration(String path,
-      org.apache.hadoop.conf.Configuration hadoopConf) {
+  public static Configuration prepareConfiguration(String path, Map<String, String> ufsConf) {
     if (path.startsWith(SCHEME)) {
+      Configuration hadoopConf = new Configuration();
       // Configure for Gluster FS
-      hadoopConf.set("fs.glusterfs.impl", Configuration.get(PropertyKey.UNDERFS_GLUSTERFS_IMPL));
-      hadoopConf.set("mapred.system.dir", Configuration.get(PropertyKey.UNDERFS_GLUSTERFS_MR_DIR));
-      hadoopConf
-          .set("fs.glusterfs.volumes", Configuration.get(PropertyKey.UNDERFS_GLUSTERFS_VOLUMES));
-      hadoopConf.set(
-          "fs.glusterfs.volume.fuse." + Configuration.get(PropertyKey.UNDERFS_GLUSTERFS_VOLUMES),
-          Configuration.get(PropertyKey.UNDERFS_GLUSTERFS_MOUNTS));
+      hadoopConf.set("fs.glusterfs.impl",
+          UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_GLUSTERFS_IMPL, ufsConf));
+      hadoopConf.set("mapred.system.dir",
+          UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_GLUSTERFS_MR_DIR, ufsConf));
+      hadoopConf.set("fs.glusterfs.volumes",
+          UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_GLUSTERFS_VOLUMES, ufsConf));
+      hadoopConf.set("fs.glusterfs.volume.fuse."
+          + UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_GLUSTERFS_VOLUMES, ufsConf),
+          UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_GLUSTERFS_MOUNTS, ufsConf));
+      return hadoopConf;
     } else {
-      // If not Gluster FS, fall back to the default HDFS behavior.
-      // This should only happen if someone creates an instance of this class directly rather
-      // than via the registry and factory, which enforce the GlusterFS prefix being present.
-      super.prepareConfiguration(path, hadoopConf);
+      // If not Gluster FS, fall back to the default HDFS behavior.
+      // This should only happen if someone creates an instance of this class directly rather
+      // than via the registry and factory, which enforce the GlusterFS prefix being present.
+      return HdfsUnderFileSystem.createConfiguration(ufsConf);
     }
   }
+
+  /**
+   * Factory method to construct a new Gluster FS {@link UnderFileSystem}.
+   *
+   * @param uri the {@link AlluxioURI} for this UFS
+   * @param conf the configuration for Hadoop or GlusterFS
+   */
+  public static GlusterFSUnderFileSystem createInstance(AlluxioURI uri, Map<String, String> conf) {
+    Configuration hadoopConf = prepareConfiguration(uri.toString(), conf);
+    return new GlusterFSUnderFileSystem(uri, conf, hadoopConf);
+  }
+
+  /**
+   * Constructs a new Gluster FS {@link UnderFileSystem}.
+   *
+   * @param ufsUri the {@link AlluxioURI} for this UFS
+   * @param ufsConf the configuration for the UFS
+   * @param hadoopConf the Hadoop configuration
+   */
+  private GlusterFSUnderFileSystem(AlluxioURI ufsUri, Map<String, String> ufsConf,
+      Configuration hadoopConf) {
+    super(ufsUri, ufsConf, hadoopConf);
+  }
+
+  @Override
+  public String getUnderFSType() {
+    return "glusterfs";
+  }
 }
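With the constructor now private, construction goes through createInstance, which funnels the UFS option map through prepareConfiguration before handing a ready Hadoop Configuration to the superclass. A hedged usage sketch (the volume value is illustrative, and it assumes the GlusterFS Hadoop plugin is on the classpath; getValue looks keys up by their string names, hence PropertyKey#toString for the map key):

import alluxio.AlluxioURI;
import alluxio.PropertyKey;
import alluxio.underfs.glusterfs.GlusterFSUnderFileSystem;

import java.util.HashMap;
import java.util.Map;

public class GlusterFsCreateExample {
  public static void main(String[] args) {
    // Hypothetical per-mount options, e.g. supplied via --option at mount time.
    Map<String, String> ufsConf = new HashMap<>();
    ufsConf.put(PropertyKey.UNDERFS_GLUSTERFS_VOLUMES.toString(), "gv0");

    // The factory method builds the Hadoop configuration for the glusterfs://
    // scheme and then invokes the now-private constructor.
    GlusterFSUnderFileSystem ufs = GlusterFSUnderFileSystem.createInstance(
        new AlluxioURI("glusterfs:///"), ufsConf);
    System.out.println(ufs.getUnderFSType()); // prints "glusterfs"
  }
}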
@@ -35,7 +35,7 @@ public GlusterFSUnderFileSystemFactory() {}
   @Override
   public UnderFileSystem create(String path, Map<String, String> conf) {
     Preconditions.checkArgument(path != null, "path may not be null");
-    return new GlusterFSUnderFileSystem(new AlluxioURI(path), conf);
+    return GlusterFSUnderFileSystem.createInstance(new AlluxioURI(path), conf);
   }
 
   @Override
Expand Up @@ -62,39 +62,35 @@ public class HdfsUnderFileSystem extends BaseUnderFileSystem
private static final int MAX_TRY = 5;

private FileSystem mFileSystem;
private Configuration mHadoopConf;
private Map<String, String> mUfsConf;

/**
* Constructs a new HDFS {@link UnderFileSystem}.
* Factory method to constructs a new HDFS {@link UnderFileSystem} instance.
*
* @param uri the {@link AlluxioURI} for this UFS
* @param ufsUri the {@link AlluxioURI} for this UFS
* @param conf the configuration for Hadoop
*/
public HdfsUnderFileSystem(AlluxioURI uri, Map<String, String> conf) {
super(uri);
final String ufsPrefix = uri.toString();
mUfsConf = conf;
mHadoopConf = prepareConfiguration(ufsPrefix, conf);
public static HdfsUnderFileSystem createInstance(AlluxioURI ufsUri, Map<String, String> conf) {
Configuration hadoopConf = createConfiguration(conf);
return new HdfsUnderFileSystem(ufsUri, conf, hadoopConf);
}

// Load hdfs site properties from the given file and overwrite the default hdfs conf,
// the path of this file can be passed through --option
mHadoopConf.addResource(
new Path(UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_HDFS_CONFIGURATION, conf)));
// NOTE, adding s3 credentials in system properties to hadoop conf for backward compatibility.
// TODO(binfan): remove this as it can be set in mount options through --option
HdfsUnderFileSystemUtils.addS3Credentials(mHadoopConf);
// Set all parameters passed through --option
if (conf != null) {
for (Map.Entry<String, String> entry : conf.entrySet()) {
mHadoopConf.set(entry.getKey(), entry.getValue());
}
}
Path path = new Path(ufsPrefix);
/**
* Constructs a new HDFS {@link UnderFileSystem}.
*
* @param ufsUri the {@link AlluxioURI} for this UFS
* @param ufsConf the configuration for ufs
* @param hadoopConf the hadoop configuration
*/
protected HdfsUnderFileSystem(AlluxioURI ufsUri, Map<String, String> ufsConf,
Configuration hadoopConf) {
super(ufsUri);
mUfsConf = ufsConf;
Path path = new Path(ufsUri.toString());
try {
mFileSystem = path.getFileSystem(mHadoopConf);
mFileSystem = path.getFileSystem(hadoopConf);
} catch (IOException e) {
LOG.warn("Exception thrown when trying to get FileSystem for {} : {}", ufsPrefix,
LOG.warn("Exception thrown when trying to get FileSystem for {} : {}", ufsUri,
e.getMessage());
throw new RuntimeException("Failed to create Hadoop FileSystem", e);
}
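The same split is applied here: createInstance is the public entry point, and the constructor becomes protected so subclasses such as GlusterFSUnderFileSystem can inject an already prepared Hadoop Configuration. A sketch of the new call path; the namenode address below is a placeholder:

import alluxio.AlluxioURI;
import alluxio.underfs.hdfs.HdfsUnderFileSystem;

import com.google.common.collect.ImmutableMap;

public class HdfsCreateInstanceExample {
  public static void main(String[] args) {
    // Placeholder namenode address. The map entries are applied on top of the
    // generated Hadoop configuration inside createConfiguration, so they
    // override both the defaults and any loaded hdfs-site file.
    HdfsUnderFileSystem ufs = HdfsUnderFileSystem.createInstance(
        new AlluxioURI("hdfs://namenode:9000/"),
        ImmutableMap.of("dfs.replication", "2"));
    System.out.println(ufs.getUnderFSType()); // prints "hdfs"
  }
}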
@@ -114,10 +110,9 @@ public String getUnderFSType() {
    * configuration necessary for obtaining a usable {@linkplain FileSystem} instance.
    * </p>
    *
-   * @param path file system path
-   * @param ufsConf Hadoop configuration
+   * @param ufsConf the UFS configuration
    */
-  public static Configuration prepareConfiguration(String path, Map<String, String> ufsConf) {
+  public static Configuration createConfiguration(Map<String, String> ufsConf) {
     Configuration hadoopConf = new Configuration();
 
     // On Hadoop 2.x this is strictly unnecessary since it uses ServiceLoader to automatically
@@ -133,6 +128,20 @@ public static Configuration prepareConfiguration(String path, Map<String, String> ufsConf) {
     // system property
     hadoopConf.set("fs.hdfs.impl.disable.cache",
         System.getProperty("fs.hdfs.impl.disable.cache", "true"));
+
+    // Load hdfs site properties from the given file and overwrite the default hdfs conf;
+    // the path of this file can be passed through --option
+    hadoopConf.addResource(
+        new Path(UnderFileSystemUtils.getValue(PropertyKey.UNDERFS_HDFS_CONFIGURATION, ufsConf)));
+    // NOTE: adding s3 credentials in system properties to hadoop conf for backward compatibility.
+    // TODO(binfan): remove this as it can be set in mount options through --option
+    HdfsUnderFileSystemUtils.addS3Credentials(hadoopConf);
+    // Set all parameters passed through --option
+    if (ufsConf != null) {
+      for (Map.Entry<String, String> entry : ufsConf.entrySet()) {
+        hadoopConf.set(entry.getKey(), entry.getValue());
+      }
+    }
     return hadoopConf;
   }
 
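Moving this block out of the constructor makes the precedence explicit: built-in defaults are set first, the hdfs-site file named by PropertyKey.UNDERFS_HDFS_CONFIGURATION is layered on top, and the per-mount --option entries are applied last, so they win because later Configuration#set calls overwrite earlier ones. A small sketch of that last-writer-wins behavior:

import alluxio.underfs.hdfs.HdfsUnderFileSystem;

import com.google.common.collect.ImmutableMap;

public class CreateConfigurationExample {
  public static void main(String[] args) {
    // The map entry overrides the default "true" set earlier in
    // createConfiguration, because the --option loop runs last.
    org.apache.hadoop.conf.Configuration conf = HdfsUnderFileSystem.createConfiguration(
        ImmutableMap.of("fs.hdfs.impl.disable.cache", "false"));
    System.out.println(conf.get("fs.hdfs.impl.disable.cache")); // prints "false"
  }
}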
@@ -38,7 +38,7 @@ public HdfsUnderFileSystemFactory() {}
   @Override
   public UnderFileSystem create(String path, Map<String, String> conf) {
     Preconditions.checkNotNull(path);
-    return new HdfsUnderFileSystem(new AlluxioURI(path), conf);
+    return HdfsUnderFileSystem.createInstance(new AlluxioURI(path), conf);
   }
 
   @Override
@@ -14,12 +14,14 @@
 import alluxio.AlluxioURI;
 import alluxio.PropertyKey;
 
+import com.google.common.collect.ImmutableMap;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import java.net.URI;
+import java.util.Map;
 
 /**
  * Tests {@link HdfsUnderFileSystem}.
@@ -30,7 +32,7 @@ public final class HdfsUnderFileSystemTest {
 
   @Before
   public final void before() throws Exception {
-    mHdfsUnderFileSystem = new HdfsUnderFileSystem(new AlluxioURI("file:///"), null);
+    mHdfsUnderFileSystem = HdfsUnderFileSystem.createInstance(new AlluxioURI("file:///"), null);
   }
 
   /**
@@ -43,14 +45,13 @@ public void getUnderFSType() throws Exception {
   }
 
   /**
-   * Tests the {@link HdfsUnderFileSystem#prepareConfiguration} method.
+   * Tests the {@link HdfsUnderFileSystem#createConfiguration} method.
    *
    * Checks the hdfs implementation class and the alluxio underfs config settings.
    */
   @Test
   public void prepareConfiguration() throws Exception {
-    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-    mHdfsUnderFileSystem.prepareConfiguration("", conf);
+    org.apache.hadoop.conf.Configuration conf = HdfsUnderFileSystem.createConfiguration(null);
     Assert.assertEquals("org.apache.hadoop.hdfs.DistributedFileSystem", conf.get("fs.hdfs.impl"));
     Assert.assertTrue(conf.getBoolean("fs.hdfs.impl.disable.cache", false));
     Assert.assertNotNull(conf.get(PropertyKey.UNDERFS_HDFS_CONFIGURATION.toString()));
@@ -71,10 +72,9 @@ public void disableHdfsCache() throws Exception {
     Assert.assertEquals("3", hadoopFs.getConf().get("dfs.replication"));
 
     // create a new configuration with updated dfs replication value
-    org.apache.hadoop.conf.Configuration hadoopConf1 = new org.apache.hadoop.conf.Configuration();
-    hadoopConf1.set("dfs.replication", "1");
+    Map<String, String> hadoopConf1 = ImmutableMap.of("dfs.replication", "1");
     HdfsUnderFileSystem hdfs =
-        new HdfsUnderFileSystem(new AlluxioURI(underfsAddress), hadoopConf1);
+        HdfsUnderFileSystem.createInstance(new AlluxioURI(underfsAddress), hadoopConf1);
     Assert.assertEquals("1",
         ((org.apache.hadoop.conf.Configuration) hdfs.getConf()).get("dfs.replication"));
   }
@@ -50,7 +50,7 @@ public void before() throws InterruptedException, AmazonClientException {
     mClient = Mockito.mock(AmazonS3Client.class);
     mManager = Mockito.mock(TransferManager.class);
     mS3UnderFileSystem = new S3AUnderFileSystem(new AlluxioURI(""), mClient, BUCKET_NAME,
-        BUCKET_MODE, ACCOUNT_OWNER, mManager);
+        BUCKET_MODE, ACCOUNT_OWNER, mManager, null);
   }
 
   /**
