pool: Fix contention in MetaDataCache
Quite some time ago I changed the pool to use several threads for
message processing. The point, in particular, was that we could
benefit from added concurrency in meta data operations. Since meta
data operations tend to become a bottleneck when the pool is under
high load (in particular when the meta data is stored on the same
disks as the data itself), this parallelization seemed like a good
idea.

Transfers, of course, already run in different threads, and any meta
data update issued as part of a transfer will also be done in
parallel.

Unfortunately, the MetaDataCache class destroys all this nice
concurrency. The class was designed to achieve concurrency in the
read operation, so that files can be read even while the pool is
still starting. The create and remove methods, on the other hand, are
synchronized, and thus at most one thread will ever be able to create
or remove entries at a time. I recently observed this to be a major
contention point on one of our ALICE tape pools.
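
For illustration, the old locking scheme is roughly of the following
shape (a reduced, hypothetical sketch in plain Java, not the actual
dCache code, which appears in the diff below): every create or remove
call takes the lock on the cache object itself, so those operations
are serialized across all entries.

// Reduced sketch of the coarse, class-wide locking described above;
// class and member names are hypothetical.
class CoarselyLockedCache<K, V>
{
    private final java.util.Map<K, V> entries = new java.util.HashMap<K, V>();

    // One lock (the cache instance) guards every entry, so at most one
    // thread can create or remove at any time, regardless of the key.
    synchronized V create(K key, V value)
    {
        entries.put(key, value);
        return value;
    }

    synchronized void remove(K key)
    {
        entries.remove(key);
    }
}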

This patch pretty much rewrites MetaDataCache. It introduces a
monitor (in the Ada sense of the word) for each entry, thus
simplifying synchronization significantly. This also made it very
easy to allow create and remove operations to be performed
concurrently.
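
As a rough illustration of the per-entry monitor idea (a simplified,
stand-alone sketch with hypothetical names and generic types, not the
dCache code itself, which appears in the diff below): every key maps
to a small monitor object, the monitor's synchronized methods
serialize operations on that single entry, and operations on
different entries proceed in parallel.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified per-entry monitor pattern; names and types are hypothetical.
class MonitorCache<K, V>
{
    private final ConcurrentMap<K, Monitor> entries =
        new ConcurrentHashMap<K, Monitor>();

    // One Monitor per key; its synchronized methods serialize access to
    // that key only, while other keys remain unaffected by the lock.
    private class Monitor
    {
        private final K key;
        private V value;

        Monitor(K key)
        {
            this.key = key;
        }

        synchronized V get()
        {
            // Only meaningful while this monitor is still the one
            // published in the map for its key.
            return (entries.get(key) == this) ? value : null;
        }

        synchronized boolean create(V newValue)
        {
            // Publish this monitor first; losing the race means the
            // entry already exists.
            if (entries.putIfAbsent(key, this) != null) {
                return false;
            }
            value = newValue;
            return true;
        }

        synchronized void remove()
        {
            // Remove only if this monitor is still the published one.
            entries.remove(key, this);
        }
    }

    public V get(K key)
    {
        Monitor monitor = entries.get(key);
        return (monitor != null) ? monitor.get() : null;
    }

    public boolean create(K key, V value)
    {
        return new Monitor(key).create(value);
    }

    public void remove(K key)
    {
        Monitor monitor = entries.get(key);
        if (monitor != null) {
            monitor.remove();
        }
    }
}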

Given the severity of the contention, I kindly ask for a thorough
review followed by a merge to the recent stable branches. I would
skip 1.9.12 due to its age.

Target: trunk
Require-notes: yes
Require-book: no
Request: 2.3
Request: 2.2
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: http://rb.dcache.org/r/4750/
Gerd Behrmann committed Sep 5, 2012
1 parent 5a5642b commit 84fb7b0
Showing 1 changed file with 133 additions and 95 deletions.

MetaDataCache.java before this commit:

package org.dcache.pool.repository;

import java.util.Set;
import java.util.Map;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;

import diskCacheV111.util.PnfsId;
import diskCacheV111.util.CacheException;

/**
 * Cache of MetaDataRecords.
 * ...
 */
public class MetaDataCache
    implements MetaDataStore
{
    /** Map of cached MetaDataRecords.
     */
    private final Map<PnfsId,MetaDataRecord> _entries =
        new ConcurrentHashMap<PnfsId,MetaDataRecord>();

    /**
     * Ids of entries not yet cached. Invariantly all ids of entries
     * in the inner MetaDataStore are in the union of this set and the
     * key set of the _entries map.
     */
    private final Set<PnfsId> _unread;

    /**
     * Ids of entries currently being read.
     */
    private final Set<PnfsId> _reading = new HashSet<PnfsId>();

    private final MetaDataStore _inner;

    /**
     * Constructs a new cache.
     * ...
     */
    public MetaDataCache(MetaDataStore inner)
    {
        _inner = inner;
        _unread = new HashSet(inner.list());
    }

    @Override
    public synchronized Collection<PnfsId> list()
    {
        if (_unread.size() == 0) {
            return Collections.unmodifiableCollection(_entries.keySet());
        } else {
            Collection<PnfsId> ids = new HashSet<PnfsId>(_unread);
            ids.addAll(_entries.keySet());
            return ids;
        }
    }

    @Override
    public MetaDataRecord get(PnfsId id)
        throws CacheException, InterruptedException
    {
        synchronized (this) {
            while (_reading.contains(id)) {
                wait();
            }
            if (!_unread.contains(id)) {
                return _entries.get(id);
            }
            _reading.add(id);
        }

        try {
            MetaDataRecord entry = _inner.get(id);
            synchronized (this) {
                if (entry != null) {
                    _entries.put(id, entry);
                }
                _unread.remove(id);
            }
            return entry;
        } finally {
            synchronized (this) {
                _reading.remove(id);
                notifyAll();
            }
        }
    }

    @Override
    public synchronized MetaDataRecord create(PnfsId id)
        throws DuplicateEntryException, CacheException
    {
        if (_entries.containsKey(id) || _unread.contains(id)) {
            throw new DuplicateEntryException(id);
        }
        MetaDataRecord entry = _inner.create(id);
        _entries.put(id, entry);
        return entry;
    }

    @Override
    public synchronized MetaDataRecord create(MetaDataRecord entry)
        throws DuplicateEntryException, CacheException
    {
        PnfsId id = entry.getPnfsId();
        if (_entries.containsKey(id) || _unread.contains(id)) {
            throw new DuplicateEntryException(id);
        }
        entry = _inner.create(entry);
        _entries.put(id, entry);
        return entry;
    }

    @Override
    public synchronized void remove(PnfsId id)
    {
        boolean interrupted = false;
        while (_reading.contains(id)) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        _unread.remove(id);
        _inner.remove(id);
        _entries.remove(id);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public boolean isOk()
    {
        return _inner.isOk();
    }

    @Override
    public synchronized void close()
    {
        boolean interrupted = false;
        while (!_reading.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        _unread.clear();
        _inner.close();
        _entries.clear();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    // ... (remaining methods not shown in the diff)
}

MetaDataCache.java after this commit:

package org.dcache.pool.repository;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;

import static com.google.common.base.Preconditions.*;

/**
 * Cache of MetaDataRecords.
 * ...
 */
public class MetaDataCache
    implements MetaDataStore
{
    private static final float LOAD_FACTOR = 0.75f;

    /** Map of cached MetaDataRecords.
     */
    private final ConcurrentMap<PnfsId,Monitor> _entries;

    private final MetaDataStore _inner;

    private volatile boolean _isClosed;

    /**
     * Constructs a new cache.
     * ...
     */
    public MetaDataCache(MetaDataStore inner)
    {
        _inner = inner;

        Collection<PnfsId> list = inner.list();
        _entries = new ConcurrentHashMap<PnfsId,Monitor>(
                (int)(list.size() / LOAD_FACTOR + 1), LOAD_FACTOR);
        for (PnfsId id: list) {
            _entries.put(id, new Monitor(id));
        }
    }

    /**
     * Encapsulates operations on meta data records, ensuring sequential
     * access to any particular record.
     *
     * Correctness follows by observing that
     *
     * 1. after an initial check it is guaranteed that
     *
     *        _entries[this._id] == this
     *
     *    it follows that for any given id, the condition can only be true
     *    for one Monitor at a time;
     *
     * 2. all the monitor methods are synchronized, thus ensuring that for any
     *    given id only one thread at a time will get past the above check;
     *
     * 3. all modifications of _entries or the Monitor happen from within
     *    the Monitor itself, only after the above condition has been
     *    established, and only the entry of this._id is modified, thus
     *    ensuring that the condition stays true until the end of the method
     *    or until the Monitor removes itself from _entries.
     *
     * The point from which the condition in item 1 is true is marked by
     * assertions in the code.
     */
    private class Monitor
    {
        private final PnfsId _id;
        private MetaDataRecord _record;

        private Monitor(PnfsId id)
        {
            _id = id;
        }

        private synchronized MetaDataRecord get()
            throws InterruptedException, CacheException
        {
            if (_entries.get(_id) != this) {
                return null;
            }
            assert _entries.get(_id) == this;
            if (_record == null) {
                _record = _inner.get(_id);
                if (_record == null) {
                    _entries.remove(_id, this);
                }
            }
            return _record;
        }

        private synchronized MetaDataRecord create()
            throws CacheException
        {
            if (_entries.putIfAbsent(_id, this) != null) {
                throw new DuplicateEntryException(_id);
            }
            assert _entries.get(_id) == this;
            try {
                checkState(!_isClosed);
                _record = _inner.create(_id);
            } finally {
                if (_record == null) {
                    _entries.remove(_id);
                }
            }
            return _record;
        }

        private synchronized MetaDataRecord create(MetaDataRecord entry)
            throws CacheException
        {
            if (_entries.putIfAbsent(_id, this) != null) {
                throw new DuplicateEntryException(_id);
            }
            assert _entries.get(_id) == this;
            try {
                checkState(!_isClosed);
                _record = _inner.create(entry);
            } finally {
                if (_record == null) {
                    _entries.remove(_id);
                }
            }
            return _record;
        }

        private synchronized void remove()
        {
            if (_entries.get(_id) == this) {
                assert _entries.get(_id) == this;
                _inner.remove(_id);
                _entries.remove(_id);
            }
        }

        private synchronized void close()
        {
            _entries.remove(_id, this);
        }
    }

    public MetaDataRecord get(PnfsId id)
        throws CacheException, InterruptedException
    {
        Monitor monitor = _entries.get(id);
        return (monitor != null) ? monitor.get() : null;
    }

    @Override
    public MetaDataRecord create(PnfsId id) throws CacheException
    {
        return new Monitor(id).create();
    }

    @Override
    public MetaDataRecord create(MetaDataRecord entry)
        throws CacheException
    {
        return new Monitor(entry.getPnfsId()).create(entry);
    }

    @Override
    public void remove(PnfsId id)
    {
        Monitor monitor = _entries.get(id);
        if (monitor != null) {
            monitor.remove();
        }
    }

    @Override
    public Collection<PnfsId> list()
    {
        return Collections.unmodifiableCollection(_entries.keySet());
    }

    @Override
    public boolean isOk()
    {
        return _inner.isOk();
    }

    @Override
    public void close()
    {
        _isClosed = true;
        for (Monitor monitor : _entries.values()) {
            monitor.close();
        }
        _inner.close();
        _entries.clear();
    }

    // ... (remaining methods not shown in the diff)
}
