Skip to content

Commit

Permalink
HDFS-12337. Ozone: Concurrent RocksDB open calls fail because of "No …
Browse files Browse the repository at this point in the history
…locks available". Contributed by Mukul Kumar Singh.
  • Loading branch information
anuengineer authored and omalley committed Apr 26, 2018
1 parent 9e73321 commit f352b10
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 48 deletions.
Expand Up @@ -64,18 +64,11 @@ public static MetadataStore getDB(ContainerData container,
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
try {
MetadataStore db = cache.getDB(container.getContainerName());
if (db == null) {
db = MetadataStoreBuilder.newBuilder()
.setDbFile(new File(container.getDBPath()))
.setCreateIfMissing(false)
.build();
cache.putDB(container.getContainerName(), db);
}
return db;
return cache.getDB(container.getContainerName(), container.getDBPath());
} catch (IOException ex) {
String message = "Unable to open DB. DB Name: %s, Path: %s. ex: %s"
.format(container.getContainerName(), container.getDBPath(), ex);
String message =
String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s",
container.getContainerName(), container.getDBPath(), ex.getMessage());
throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB);
}
}
Expand Down
Expand Up @@ -41,7 +41,9 @@

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Objects;

Expand Down Expand Up @@ -113,8 +115,10 @@ public static XceiverServerRatis newXceiverServerRatis(String datanodeID,
if (ozoneConf.getBoolean(OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket(0)) {
try (ServerSocket socket = new ServerSocket()) {
socket.setReuseAddress(true);
SocketAddress address = new InetSocketAddress(0);
socket.bind(address);
localPort = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", localPort);
// If we have random local ports configured this means that it
Expand Down
Expand Up @@ -21,12 +21,14 @@
import com.google.common.base.Preconditions;
import org.apache.commons.collections.MapIterator;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -35,7 +37,8 @@
* container cache is a LRUMap that maintains the DB handles.
*/
public final class ContainerCache extends LRUMap {
static final Log LOG = LogFactory.getLog(ContainerCache.class);
private static final Logger LOG =
LoggerFactory.getLogger(ContainerCache.class);
private final Lock lock = new ReentrantLock();
private static ContainerCache cache;
private static final float LOAD_FACTOR = 0.75f;
Expand Down Expand Up @@ -115,17 +118,31 @@ protected boolean removeLRU(LinkEntry entry) {
}

/**
* Returns a DB handle if available, null otherwise.
* Returns the cached DB handle if available; otherwise creates the handle
* and adds it to the cache.
*
* @param containerName - Name of the container.
* @param containerDBPath - Path of the container's DB file.
* @return MetadataStore.
*/
public MetadataStore getDB(String containerName) {
public MetadataStore getDB(String containerName, String containerDBPath)
throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
return (MetadataStore) this.get(containerName);
MetadataStore db = (MetadataStore) this.get(containerName);

if (db == null) {
db = MetadataStoreBuilder.newBuilder()
.setDbFile(new File(containerDBPath))
.setCreateIfMissing(false)
.build();
this.put(containerName, db);
}
return db;
} catch (Exception e) {
LOG.error("Error opening DB. Container:{} ContainerPath:{}",
containerName, containerDBPath, e);
throw e;
} finally {
lock.unlock();
}
Expand All @@ -141,28 +158,11 @@ public void removeDB(String containerName) {
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
MetadataStore db = this.getDB(containerName);
MetadataStore db = (MetadataStore)this.get(containerName);
closeDB(containerName, db);
this.remove(containerName);
} finally {
lock.unlock();
}
}

/**
* Add a new DB to the cache.
*
* @param containerName - Name of the container
* @param db - DB handle
*/
public void putDB(String containerName, MetadataStore db) {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
this.put(containerName, db);
} finally {
lock.unlock();
}
}
}
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
Expand Down Expand Up @@ -57,6 +58,11 @@
.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;

import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
.HEALTHY;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -148,14 +154,25 @@ public boolean restartDataNode(int i) throws IOException {
public boolean restartDataNode(int i, boolean keepPort) throws IOException {
if (keepPort) {
DataNodeProperties dnProp = dataNodes.get(i);
int currentPort = dnProp.getDatanode().getOzoneContainerManager()
.getContainerServerPort();
OzoneContainer container =
dnProp.getDatanode().getOzoneContainerManager();
Configuration config = dnProp.getConf();
int currentPort = container.getContainerServerPort();
config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = container.getRatisContainerServerPort();
config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
}
boolean status = super.restartDataNode(i, keepPort);

try {
this.waitActive();
this.waitForHeartbeatProcessed();
this.waitOzoneReady();
} catch (TimeoutException | InterruptedException e) {
Thread.interrupted();
}
boolean status = super.restartDataNode(i, true);
this.waitActive();
return status;
}

Expand Down
Expand Up @@ -71,21 +71,13 @@ public class TestKeys {

/**
* Create a MiniOzoneCluster for testing.
*
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "local" , which uses a local
* directory to emulate Ozone backend.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();

path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);

ozoneCluster = new MiniOzoneCluster.Builder(conf)
Expand Down

0 comments on commit f352b10

Please sign in to comment.