Comments

Issue 620: Close the fileChannels for read when they are idle #832

Closed
ArvinDevel wants to merge 21 commits into apache:master from ArvinDevel:issue620

Conversation

@ArvinDevel
Contributor

@ArvinDevel ArvinDevel commented Dec 12, 2017

Descriptions of the changes in this PR:

Use a Guava cache to replace the ConcurrentMap for logid2FileChannel. Since a Guava cache supports eviction, we can use it to close idle FileChannels.

Master Issue: #620


Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

If this PR is a BookKeeper Proposal (BP):

  • Make sure the PR title is formatted like:
    <BP-#>: Description of bookkeeper proposal
    e.g. BP-1: 64 bits ledger support
  • Attach the master issue link in the description of this PR.
  • Attach the google doc link if the BP is written in Google Doc.

Otherwise:

  • Make sure the PR title is formatted like:
    <Issue # or BOOKKEEPER-#>: Description of pull request
    e.g. Issue 123: Description ...
    e.g. BOOKKEEPER-1234: Description ...
  • Make sure tests pass via mvn clean apache-rat:check install findbugs:check.
  • Replace <Issue # or BOOKKEEPER-#> in the title with the actual Issue/JIRA number.

Contributor

@eolivelli eolivelli left a comment

Nice improvement, left some comments.

};

private final CacheLoader<Long, FileChannel> loader = new CacheLoader<Long, FileChannel> () {
public FileChannel load(Long entryLogId) throws Exception {
Contributor

maybe you can use lambda

Member

I am not sure if you can use lambda for CacheLoader here, it is not an interface. CacheLoader is an abstract class

Contributor Author

@eolivelli Just as @sijie said, we can't use a lambda because CacheLoader is an abstract class.
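For reference, both styles compile against Guava's cache API. This is a minimal sketch (the String values stand in for FileChannels): `CacheLoader.from` does accept a lambda, but the lambda cannot throw checked exceptions such as IOException, which is why the anonymous subclass fits a file-opening loader better.

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoaderStyles {
    // Anonymous subclass: load() may throw checked exceptions (e.g. IOException
    // when opening a FileChannel), which a lambda cannot.
    static LoadingCache<Long, String> anonymousStyle() {
        return CacheBuilder.newBuilder().build(new CacheLoader<Long, String>() {
            @Override
            public String load(Long entryLogId) throws Exception {
                return "channel-" + entryLogId;
            }
        });
    }

    // CacheLoader.from accepts a lambda, but only for loaders that throw
    // no checked exceptions.
    static LoadingCache<Long, String> lambdaStyle() {
        return CacheBuilder.newBuilder().build(CacheLoader.from(id -> "channel-" + id));
    }

    public static void main(String[] args) {
        System.out.println(anonymousStyle().getUnchecked(620L)); // channel-620
        System.out.println(lambdaStyle().getUnchecked(620L));    // channel-620
    }
}
```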

};
// close the file channel, when it was removed from cache
private final RemovalListener<Long, FileChannel> removalListener = new RemovalListener<Long, FileChannel>() {
public void onRemoval(RemovalNotification<Long, FileChannel> removal) {
Contributor

maybe you can use lambda

}
};

private final ExecutorService removeExecutor = Executors.newSingleThreadExecutor();
Contributor

can we give a name to this thread ?

*/
private final ConcurrentMap<Long, FileChannel> logid2FileChannel = new ConcurrentHashMap<Long, FileChannel>();
private LoadingCache<Long, FileChannel> logid2FileChannel = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.HOURS)
Contributor

@sijie should this value be configurable? Having a configuration value would also let us write some tests.

Member

+1 for making this configurable.

Member

@sijie sijie left a comment

@ArvinDevel

I think there is one issue in this change: the FileChannel in logid2FileChannel is shared by multiple threads through logid2Channel. It becomes problematic when a FileChannel is evicted from logid2FileChannel while logid2Channel still references the underlying FileChannel.

So I would suggest:

  1. Have a reference-counted structure wrapping FileChannel, e.g. ReferenceCountedFileChannel.
  2. Keep logid2FileChannel as a concurrent map holding the reference-counted file channels:
     ConcurrentMap<Long, ReferenceCountedFileChannel> logid2FileChannel = new ConcurrentHashMap<>();
  3. Change logid2Channel to ThreadLocal<Cache<Long, BufferedReadChannel>> with concurrencyLevel(1).
  4. When a BufferedReadChannel is created, it gets the FileChannel from logid2FileChannel and increments the reference count of that file channel.
  5. When a BufferedReadChannel is evicted or closed, decrement the reference count of that file channel. If a file channel is no longer referenced, close it.

Also it would be good to add test cases for this change.
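The suggested wrapper could be sketched along these lines (an illustrative sketch following the suggestion above, not the PR's final code; the channel is allowed to be null so the counting logic can be exercised in a test):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the suggested ReferenceCountedFileChannel wrapper.
class ReferenceCountedFileChannel {
    final FileChannel fc; // may be null in a unit test
    // The map that caches the wrapper owns the initial reference.
    private final AtomicInteger refCnt = new AtomicInteger(1);

    ReferenceCountedFileChannel(FileChannel fc) {
        this.fc = fc;
    }

    int refCnt() {
        return refCnt.get();
    }

    // Called when a BufferedReadChannel starts using this file channel.
    void retain() {
        refCnt.incrementAndGet();
    }

    // Called when a BufferedReadChannel is evicted or closed; the channel
    // is closed once nobody references it any more.
    void release() {
        if (refCnt.decrementAndGet() == 0 && fc != null) {
            try {
                fc.close();
            } catch (IOException ioe) {
                System.err.println("Failed to close file channel: " + ioe);
            }
        }
    }
}
```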

} catch (ExecutionException e){
LOG.error("ExecutionException found in get fileChannel for log {} in logid2FileChannel cache", entryLogId);
// throw exception to avoid pass null to BufferedReadChannel
throw new IOException(e);
Member

ExecutionException wraps the actual cause thrown while loading the file channel, so you need to unwrap it:

if (e.getCause() instanceof IOException) {
    throw (IOException) e.getCause();
} else {
    throw new IOException("Encountered unknown exception on opening read channel for entry log " + entryLogId, e.getCause());
}

}
};

private final CacheLoader<Long, FileChannel> loader = new CacheLoader<Long, FileChannel> () {
Member

I would suggest renaming this to a more meaningful name, like readonlyFileChannelLoader.

}
};
// close the file channel, when it was removed from cache
private final RemovalListener<Long, FileChannel> removalListener = new RemovalListener<Long, FileChannel>() {
Member

rename this to 'readonlyFileChannelRemovalListener'

private final ConcurrentMap<Long, FileChannel> logid2FileChannel = new ConcurrentHashMap<Long, FileChannel>();
private LoadingCache<Long, FileChannel> logid2FileChannel = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.HOURS)
.removalListener(RemovalListeners.asynchronous(removalListener, removeExecutor))
Member

I don't think you need an asynchronous removal listener here, because that would change the behavior of close and removeFromChannelsAndClose. I would suggest using a synchronous removalListener, so the behavior stays the same as before when a file channel is removed and closed.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


Member

remove these 3 blank lines

return CacheBuilder.newBuilder().concurrencyLevel(1)
.expireAfterAccess(expireReadChannelCacheInHour, TimeUnit.HOURS)
//decrease the refCnt
.removalListener(removal -> logid2FileChannel.get(removal.getKey()).release())
Member

it might be good to check if the returned value is null or not.

ReferenceCountUtils.release(logid2FileChannel.get(removal.getKey()));

Contributor Author

I could not find that class in the project. AbstractReferenceCounted checks that every release is valid, and only after refCnt reaches 0 is the object deallocated.

Member

LOG.warn("Exception while closing channel for log file:" + logId);
}
}
//remove the fileChannel from logId2Channel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to invalidate the thread-local cache. The original behavior is to close the file channel directly; the thread-local cache might still reference the file channel, but that is okay since the file channel is closed anyway.

I would suggest:

  • add a method called forceCloseFileChannel() in ReferenceCountedFileChannel
  • have deallocate() call this forceCloseFileChannel()
  • in here, just remove it from logid2FileChannel and forceCloseFileChannel() it:

ReferenceCountedFileChannel fileChannel = logid2FileChannel.remove(logId);
if (null != fileChannel) {
    fileChannel.forceCloseFileChannel();
}

Contributor Author

I think the new method is not necessary; keeping just deallocate is simpler.

}
}


Member

remove blank line

newFc = oldFc;
}
FileChannel fc = new RandomAccessFile(file, "r").getChannel();
logid2FileChannel.put(entryLogId, new ReferenceCountedFileChannel(fc));
Member

The logic should be similar to before: you need to use putIfAbsent for concurrent operations.

Contributor Author

fixed

fc.close();
//close corresponding fileChannel
for (ReferenceCountedFileChannel rfc : logid2FileChannel.values()) {
rfc.deallocate();
Member

forceClose

@sijie
Member

sijie commented Dec 13, 2017

@ArvinDevel left a few more comments on your latest changes. can you also add a test case for it?

Contributor Author

@ArvinDevel ArvinDevel left a comment

There is a question: how do we verify the fileChannel is closed? Currently I check the refCnt in the test.

@ArvinDevel
Contributor Author

@eolivelli @sijie Any other suggestions?

assertEquals(0, logid2FileChannel.get(2L).refCnt());
assertEquals(1, logid2FileChannel.get(3L).refCnt());
assertEquals(1, logid2FileChannel.get(4L).refCnt());
// assertNull(logid2FileChannel.get(2L).getFc());
Member

remove this line if it is not needed

* expire time.
* @return server configuration object.
*/
public ServerConfiguration setExpireReadChannelCache(long millis) {
Member

It is better to call out "time" and "ms" in the setting name, like READ_CHANNEL_CACHE_EXPIRE_TIME_MS.

@sijie
Member

sijie commented Dec 27, 2017

there is a question, how to verify the fileChannel is closed? currently I checked the refCnt in test.

A tricky solution is to attempt to write data to the file channel. If the file channel is closed, the write will throw a ClosedChannelException.
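A minimal sketch of that check (since the cached channels are opened read-only, a read is attempted here instead of a write; the class and helper names are made up for illustration):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelCheck {
    // Returns true if the channel rejects I/O because it is closed.
    static boolean isClosed(FileChannel fc) throws IOException {
        try {
            fc.read(ByteBuffer.allocate(1), 0); // any I/O operation will do
            return false;
        } catch (ClosedChannelException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("entrylog", ".log");
        FileChannel fc = FileChannel.open(tmp, StandardOpenOption.READ);
        System.out.println(isClosed(fc)); // false
        fc.close();
        System.out.println(isClosed(fc)); // true
        Files.delete(tmp);
    }
}
```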

@ArvinDevel
Contributor Author

@sijie @eolivelli I have addressed the comments.

Member

@sijie sijie left a comment

@ArvinDevel one last comment about the configuration parameter.

protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
protected static final String EXPIRE_READ_CHANNEL_CACHE = "expireReadChannelCache";
protected static final String READ_CHANNEL_CACHE_EXPIRE_TIME_MS = "expireReadChannelCache";
Member

You need to change the setting key to "readChannelCacheExpireTimeMs".

@sijie
Member

sijie commented Dec 28, 2017

@eolivelli please also review it since you are involved in this pull request before.

Contributor

@eolivelli eolivelli left a comment

@ArvinDevel @sijie I left some other minor comments. We are on the right track.

};

// only used for test.
ThreadLocal<Cache<Long, BufferedReadChannel>> getLogid2Channel() {
Contributor

@sijie I wonder if in the future we will drop this way of accessing internal state in favour of something like PowerMock's Whitebox.

Member

Accessing internal state is okay if you are just testing a specific case. I don't see a reason to change it in the future.

@ArvinDevel can you add @VisibleForTesting?

private ConcurrentMap<Long, ReferenceCountedFileChannel>
logid2FileChannel = new ConcurrentHashMap<>();
// only for test.
ConcurrentMap<Long, ReferenceCountedFileChannel> getLogid2FileChannel() {
Contributor

add VisibleForTesting

return fc;
brc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
putInReadChannels(entryLogId, brc);
LOG.info("put readChannel: {}, corresponding to: {} ", brc, entryLogId);
Contributor

drop this debug log?

Contributor Author

thanks, fixed

long[][] positions = new long[numLogs][];
for (int i = 0; i < numLogs; i++) {
positions[i] = new long[numEntries];
EntryLogger logger = new EntryLogger(conf,
Contributor

should we shutdown this EntryLogger ?

Contributor Author

that would be good.

// the cache has readChannel for 2.log
assertNotNull(cacheThreadLocal.get().getIfPresent(2L));
// expire time
Thread.sleep(1000);
Contributor

Is there a way not to use a blind sleep, but to wait in a loop until a condition is met or a timeout fires?

Contributor Author

The expire-time semantic is about a period; I think a timeout is just like a sleep here.

Member

@ArvinDevel: the Guava cache supports a mock Ticker that allows you to advance time to trigger expiration.

@ArvinDevel
Contributor Author

@sijie @eolivelli please review the latest changes.

Contributor

@eolivelli eolivelli left a comment

+1

Contributor

@ivankelly ivankelly left a comment

There's a race condition. We dealt with similar in #913 .

It shouldn't be so hard to fix here though, since the resources being cached are read only.

Why do we need a shared map of the file channels at all? The only issue with having multiple file channels for the same file is that it increases the number of open files, but we could control that by setting the cache's max entries to maxfd/num_read_threads. If we allow these duplicates, then there's no need for reference counting and we can let the Cache deal with all loading and releasing.
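The alternative described above could look roughly like this (a sketch only: the String values stand in for FileChannels, and MAX_FDS_PER_THREAD stands for maxfd/num_read_threads):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalNotification;

public class PerThreadChannels {
    static final int MAX_FDS_PER_THREAD = 2; // e.g. maxfd / num_read_threads

    // Each thread owns its cache; duplicate channels across threads are fine
    // because entry log files are read-only once written. No shared map and
    // no reference counting: the cache does all loading and releasing.
    static final ThreadLocal<LoadingCache<Long, String>> channels =
            ThreadLocal.withInitial(() -> CacheBuilder.newBuilder()
                    .concurrencyLevel(1)
                    .maximumSize(MAX_FDS_PER_THREAD)
                    .removalListener((RemovalNotification<Long, String> n) ->
                            System.out.println("closing " + n.getValue()))
                    .build(CacheLoader.from(id -> "channel-for-" + id + ".log")));

    public static void main(String[] args) {
        channels.get().getUnchecked(1L);
        channels.get().getUnchecked(2L);
        channels.get().getUnchecked(3L); // evicts one entry, triggering a close
        System.out.println(channels.get().size());
    }
}
```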

newFc = oldFc;
newFc = oldFc.fc;
// increment the refCnt
oldFc.retain();
Contributor

It's possible that oldFc could have been released between getting it from logid2FileChannel and calling retain, so the channel returned could be closed.

Contributor Author

You're right, I'll consider this later, thanks.

.expireAfterAccess(readChannelCacheExpireTimeMs, TimeUnit.MILLISECONDS)
//decrease the refCnt
.removalListener(removal -> logid2FileChannel.get(removal.getKey()).release())
.build(readChannelLoader);
Contributor

build(this::getChannelForLogId) should work here, I think, so you don't have to define the loader at all.

Contributor Author

The loader is not an interface, so I can't use this functional style.

Contributor

It's an abstract class with only one abstract method, so it should be usable.

Member

@ivankelly I don't think it is worth getting stuck here on whether to use a lambda or not. Thoughts?

Contributor

It's a minor thing. The important thing is that there are no races.

@Override
protected void deallocate() {
try {
fc.close();
Contributor

We close when the refcount hits 0. When is this removed from the map, though?

Contributor Author

Currently the ReferenceCountedFileChannel is not removed from the map; using a Guava cache to handle this would be better.

Contributor Author

@ivankelly I have addressed your comment. I'm not sure whether it still has a race condition, please have a look. If it does, I'll try to use your design from #913. As a last resort we can remove the hash map and use multiple file channels directly.

Contributor Author

I'll describe the design briefly: refCnt in AbstractReferenceCounted is volatile and dead is an AtomicBoolean. Assume the current refCnt is one, thread A is calling release() and thread B is calling retain(). If thread A executes before thread B, then as soon as dead is set, thread B will use a new fileChannel instead of the closed one.

@sijie
Member

sijie commented Jan 17, 2018

@ArvinDevel did you have time to address @ivankelly 's comments?

@ArvinDevel
Contributor Author

I'll check #913 and try to finish it this week.

Contributor

@ivankelly ivankelly left a comment

There are still races here :/ I'd suggest doing something very similar to FileInfoBackingCache, as it's a very similar use case. This should be even simpler, as there's no harm in having multiple copies of a file channel for a single file, as long as they are eventually closed.


Map<Long, BufferedReadChannel> threadMap = logid2Channel.get();
return threadMap.put(logId, bc);
public void putInReadChannels(long logId, BufferedReadChannel bc) {
Cache<Long, BufferedReadChannel> threadCahe = logid2Channel.get();
Contributor

typo

newFc = oldFc;
// increment the refCnt
// double check to ensure the fileChannel is not closed due to refCnt down to 0.
if (oldFc.refCnt() > 0 && !oldFc.dead.get()) {
Contributor

A cache on another thread could have released between L1176 and L1177, invalidating the oldFc object.

FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
ReferenceCountedFileChannel oldFc =
logid2FileChannel.putIfAbsent(entryLogId, new ReferenceCountedFileChannel(newFc));
Contributor

I don't see anywhere that entries are removed from logid2FileChannel.

@sijie
Member

sijie commented Jan 31, 2018

@ArvinDevel can you address @ivankelly 's comments?

@ArvinDevel
Contributor Author

@sijie, I found that @ivankelly's FileInfoBackingCache design addresses his comments; I'll adopt that elegant design.

@ArvinDevel
Contributor Author

@ivankelly thanks for your design; I've adopted it, please review it when you have time.

Contributor

@ivankelly ivankelly left a comment

There are still concurrency issues. This stuff also needs a lot more unit tests, to shake out any concurrency issues there may be.


/**
* Attempt to retain the file info.
* When a client obtains a fileinfo from a container object,
Contributor

This is the FileInfo comment; it needs to be changed for this usage.

* @see FileInfoBackingCache
*/
private final ConcurrentMap<Long, FileChannel> logid2FileChannel = new ConcurrentHashMap<Long, FileChannel>();
class FileChannelBackingCache {
Contributor

Move it into its own file.

*/
private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel =
new ThreadLocal<Map<Long, BufferedReadChannel>>() {
private final ThreadLocal<Cache<Long, BufferedReadChannel>> logid2Channel =
Contributor

You need to rename logid2Channel or logid2FileChannel. There are no semantic differences between the names, but the fields are very different; the similarity makes the source very confusing to read.

may I suggest logId2ReadChannel and fileChannelBackingCache

return CacheBuilder.newBuilder().concurrencyLevel(1)
.expireAfterAccess(readChannelCacheExpireTimeMs, TimeUnit.MILLISECONDS)
//decrease the refCnt
.removalListener(removal -> logid2FileChannel.get((Long) removal.getKey()).release())
Contributor

The BufferedReadChannel should have a reference to the CachedFileChannel so you can do removal.getValue().release(). Looking up the value again does not guarantee that you'll get the same instance; logically it should give you it, but the logic is tangled.

Contributor Author

I think adding a readLock when getting a value from FileChannelBackingCache has the same effect, and it needs few changes to BufferedReadChannel. I prefer that approach; which one is better in your opinion?

Contributor

A lock would just tangle it up more.

You don't have to modify BufferedReadChannel. There's a couple of options.

  1. You could create a specialization of BufferedReadChannel local to this class which has the CachedFileChannel as a member and use that in the tls cache.
  2. You could create a container class which references both the bufferedReadChannel and the CachedFileChannel and use that in the TLS cache.

I think 1 is the better option, as you'll only have to change a few signatures.

CachedFileChannel loadFileChannel(long logId) throws IOException {
CachedFileChannel cachedFileChannel = fileChannels.get(logId);
if (cachedFileChannel != null) {
boolean retained = cachedFileChannel.tryRetain();
Contributor

If cachedFileChannel already existed in fileChannels, it's quite possible that another reference to it exists from a previous call to loadFileChannel. The holder of that other reference could release it, so there's no guarantee that tryRetain will return true.

Contributor Author

Um, that's right. The ReadWriteLock is necessary; I'll refactor again. Until now I thought that lock was only used to guard FileInfo's file open/write operations.

Contributor

Yes, you'll need some locking here. Again, FileInfoBackingCache is an example to use.
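The FileInfoBackingCache pattern being referred to is, roughly, a compare-and-swap retain that can fail once the count has hit zero, forcing the caller to reload a fresh instance. An illustrative sketch (not the actual BookKeeper class):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a CAS-based tryRetain, modelled on FileInfoBackingCache.
// Once the count reaches 0 the object is dead and can never be revived,
// so a caller whose tryRetain fails must retry loadFileChannel.
class CachedFileChannelSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1); // held by the cache

    boolean tryRetain() {
        while (true) {
            int count = refCnt.get();
            if (count <= 0) {
                return false; // already dead; caller must reload
            }
            if (refCnt.compareAndSet(count, count + 1)) {
                return true;  // safely took a reference before death
            }
            // lost a CAS race with another retain/release; retry
        }
    }

    void release() {
        if (refCnt.decrementAndGet() == 0) {
            // close the underlying FileChannel here
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}
```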

// it would be better to open using read mode
FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
cachedFileChannel = new CachedFileChannel(logId, newFc);
fileChannels.put(logId, cachedFileChannel);
Contributor

What if another thread has added a cachedFileChannel for the same logId since L385?

} finally {
IOUtils.close(LOG, fc.fc);
}
fileChannels.remove(logId);
Contributor

No guarantee that the CachedFileChannel being removed from fileChannels is actually the one referenced by fc.

Contributor Author

I'll use a lock to avoid concurrent write/delete operations on fileChannels.

Contributor

It shouldn't need locking; there's a remove method that takes both the key and the value, and only removes the entry if both match.
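The two-argument ConcurrentMap.remove(key, value) only removes the entry if it still maps to the value you hold, which is what makes a lock unnecessary for the removal itself (the values here are illustrative stand-ins for channels):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TwoArgRemove {
    public static void main(String[] args) {
        ConcurrentMap<Long, String> fileChannels = new ConcurrentHashMap<>();
        String mine = "fc-A";
        fileChannels.put(1L, mine);

        // Another thread replaces the entry for the same log id.
        fileChannels.put(1L, "fc-B");

        // Two-arg remove only succeeds when the mapping still matches the
        // value we hold, so we never remove someone else's channel.
        boolean removed = fileChannels.remove(1L, mine);
        System.out.println(removed);              // false
        System.out.println(fileChannels.get(1L)); // fc-B
    }
}
```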

Contributor Author

Okay, then I wonder why you still use the writeLock in the releaseFileInfo method (see the code). Can we reduce the locking there, or is the lock a must when changing the state of the ConcurrentHashMap?

Contributor

The write lock is needed because marking a fileinfo as dead, flushing its contents to disk, and removing it from the map needs to be an atomic operation. If the lock weren't there, we could mark it as dead, and then before removing it from the map another thread could call loadFileInfo() and end up getting back a dead fileinfo (tryRetain would fail), which would put the caller into a tight loop. We could, I think, remove the lock entirely from this class, but I'm very reluctant to mess with it, since this stuff is hard to get right (and it's often very hard to tell that you got it wrong).

} while (!cachedFileChannel.tryRetain());
} finally {
if (null != cachedFileChannel) {
cachedFileChannel.release();
Contributor

?
You're releasing the reference straight away after getting it. By the time cachedFileChannel is used on L1278, the reference could easily be dead.

Contributor Author

loadFileChannel and tryRetain each add one reference count, so releasing one is necessary.

Contributor

the refcount for loadFileChannel belongs to the cache.
the refcount for tryRetain belongs to the created BufferedReadChannel.

If the CachedFileChannel is invalidated from the cache, you don't want it to be closed immediately, you want it to be closed when the BufferedReadChannel releases it. Similarly, if the BufferedReadChannel releases it, and it is still in the cache, you don't want to close the file channel, as the cache may hand it out to another caller. So you need both of them to hold a refcount.
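The protocol being described can be sketched as follows (illustrative only, not the PR's actual CachedFileChannel; a boolean stands in for closing the FileChannel). The cache's constructor-time reference plus one reference per reader means the resource closes only when both have released, and tryRetain refuses to resurrect an entry whose count already hit zero:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative refcounted wrapper mirroring the retain/release protocol above.
class RefCountedChannel {
    private final AtomicInteger refCount = new AtomicInteger(1); // held by the cache
    volatile boolean closed = false;

    // A reader takes its own reference; fails if the count already reached zero.
    boolean tryRetain() {
        while (true) {
            int count = refCount.get();
            if (count <= 0) {
                return false; // dead: the underlying channel is (being) closed
            }
            if (refCount.compareAndSet(count, count + 1)) {
                return true;
            }
        }
    }

    // Dropping the last reference closes the underlying resource.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            closed = true; // stand-in for fileChannel.close()
        }
    }

    // Cache eviction just drops the cache's reference; readers may still hold theirs.
    void markDead() {
        release();
    }
}

public class RefCountDemo {
    static String run() {
        RefCountedChannel ch = new RefCountedChannel();
        boolean readerGotIt = ch.tryRetain();     // reader ref: count = 2
        ch.markDead();                            // cache evicts: count = 1, still open
        boolean openAfterEvict = !ch.closed;
        ch.release();                             // reader done: count = 0, now closed
        boolean retainAfterDead = ch.tryRetain(); // too late: fails
        return readerGotIt + " " + openAfterEvict + " " + ch.closed + " " + retainAfterDead;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true true true false
    }
}
```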

@asfgit

asfgit commented Feb 16, 2018

Can one of the admins verify this patch?

Member

@sijie sijie left a comment

I have lost track of why we are switching to FileChannelBackingCache here. As far as I can see, there are a couple of issues in FileChannelBackingCache itself. The problem might also exist in FileInfoBackingCache.

I think the original implementation is a much simpler approach. The difference between this problem and the file info cache is that the file info cache needs to handle both writes and reads, while the problem here only needs to handle read-only channels; they don't modify persisted state. We should handle it in a much simpler way than FileInfoCache. I think we are making things complicated here. But I need to go through the whole history to understand why you guys want to do that; that's a separate topic.

For the approach we are using here, I would suggest:

  • Can you guys answer my question regarding these XYZBackingCache things? I see quite a bunch of issues there: e.g. poor performance (locking while doing I/O) and race conditions.

  • If we are going to take the BackingCache approach, let's do a refactor to make it a generic class. Duplicating the logic isn't the right approach.

CacheBuilder.newBuilder().concurrencyLevel(1)
.expireAfterAccess(readChannelCacheExpireTimeMs, TimeUnit.MILLISECONDS)
//decrease the refCnt
.removalListener(( RemovalListener<Long, EntryLogBufferedReadChannel>) removal
Member

nit:

It is a bit hard to read. Can you write it in a better format? Also remove the space before RemovalListener.

.removalListener((RemovalListener<Long, EntryLogBufferedReadChannel>) notification ->
    notification.getValue().release()
)
.ticker(getTicker())
.build();

public BufferedReadChannel getFromChannels(long logId) {
return logid2Channel.get().get(logId);
}
FileChannelBackingCache fileChannelBackingCache = new FileChannelBackingCache(this::findFile);
Member

nit: make it final


// entrySize does not include the ledgerId
if (entrySize > maxSaneEntrySize) {
LOG.warn("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
Member

nit:

it can be written with '{}'
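To illustrate the nit, here is a toy stand-in for the `{}` placeholder substitution that SLF4J's parameterized logging performs (the real call would be something like `LOG.warn("Sanity check failed for entry size of {} at location {}", entrySize, pos)`; the class and formatter here are hypothetical, written only to show the substitution):

```java
// Toy re-implementation of {}-style substitution; illustrative only,
// not SLF4J's actual formatter.
public class PlaceholderDemo {
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIdx = 0, from = 0, at;
        // Replace each "{}" with the next argument, left to right.
        while ((at = template.indexOf("{}", from)) >= 0 && argIdx < args.length) {
            sb.append(template, from, at).append(args[argIdx++]);
            from = at + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    static String run() {
        return format("Sanity check failed for entry size of {} at location {}",
                1048576, 4096L);
    }

    public static void main(String[] args) {
        System.out.println(run()); // Sanity check failed for entry size of 1048576 at location 4096
    }
}
```

The win over string concatenation is that with a real logger, the message is only assembled when the log level is actually enabled.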

private EntryLogBufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
try {
EntryLogBufferedReadChannel brc;
Callable<EntryLogBufferedReadChannel> loader = () -> {
Member

Why do we create the loader every time? This generates a lot of garbage on the JVM. I remember the first time I reviewed this, I don't think I saw a loader here...

why can't we use LoadingCache instead?
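The allocation concern can be sketched in plain JDK terms (Guava-free; class and names are illustrative): build the loader once at construction and share it across lookups, which is exactly what a LoadingCache's CacheLoader gives you, instead of allocating a fresh Callable on every `get(key, callable)`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative: one loader instance shared by all lookups, as with Guava's
// LoadingCache + CacheLoader, instead of a new Callable per get() call.
public class SharedLoaderCache {
    private final ConcurrentHashMap<Long, String> cache = new ConcurrentHashMap<>();
    private final Function<Long, String> loader; // built once; no per-call garbage

    SharedLoaderCache(Function<Long, String> loader) {
        this.loader = loader;
    }

    String get(long logId) {
        // Runs the shared loader only on a miss.
        return cache.computeIfAbsent(logId, loader);
    }

    static String run() {
        int[] loads = {0};
        SharedLoaderCache c = new SharedLoaderCache(id -> {
            loads[0]++;
            return "channel-" + id;
        });
        c.get(7L);
        c.get(7L); // cached: loader not invoked again
        return c.get(7L) + ":" + loads[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // channel-7:1
    }
}
```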

Contributor Author

A good improvement; I have fixed it.

LOG.error("Dead fileChannel({}) forced out of cache."
+ "It must have been double-released somewhere.", brc.cachedFileChannel);
}
brc = null;
Member

need 4 more spaces indent.

}
}

class CachedFileChannel {
Member

can be static

Contributor Author

It uses FileChannelBackingCache's non-static method releaseFileChannel, so it can't be static.

CachedFileChannel cachedFileChannel = fileChannels.get(logId);
if (cachedFileChannel != null) {
boolean retained = cachedFileChannel.tryRetain();
checkArgument(retained);
Member

Who is going to catch the IllegalArgumentException?

If we fail to retain, shouldn't we throw an IOException rather than a runtime exception?


lock.writeLock().lock();
try {
File file = fileLoader.load(logId);
Member

why do we need to lock while loading a file?

don't we just need to lock when putting the channel back?

Member

If this is borrowed from FileInfoBackingCache, same question is applied there.

Contributor Author

This guarantees the refCnt's correctness.
Imagine the scenario where thread A and thread B both want to load the same fileChannel without the lock:
1. A and B both reach the line after creating the fileChannel;
2. A executes fileChannels.put(logId, cachedFileChannel); boolean retained = cachedFileChannel.tryRetain();
3. A is switched out and B executes those same lines again.
Then the FileChannelBackingCache will hold the last fileChannel with one refCnt, but the refCnt should actually be two.

Contributor

@sijie for FileInfoBackingCache, we need to load under a lock to have mutual exclusion with another thread that is flushing. That isn't the case here, since there's no harm in having two channels to the same file open.

So this can probably be changed to remove the locking completely. When you load a new entry, do a putIfAbsent, and if it fails, clean up what you just opened. Alternatively, computeIfAbsent could be used.

// it would be better to open using read mode
FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
CachedFileChannel cachedFileChannel = new CachedFileChannel(logId, newFc);
fileChannels.put(logId, cachedFileChannel);
Member

You need to check the return value; there can be a race condition where a file channel is added after the get.

Contributor Author

Yes, just as mentioned above, the writeLock guarantees it.

* @param cachedFileChannel
*/
private void releaseFileChannel(long logId, CachedFileChannel cachedFileChannel) {
lock.writeLock().lock();
Member

Same here: I am not sure why we are closing a file channel under a write lock.

So the question is what the lock is actually protecting.

Contributor Author

The lock guarantees that a fileChannel being loaded is not closed by another thread.
Below is a race condition:
Thread A holds one refCnt for the specific fileChannel and releases it, so the refCnt drops to 0 and the channel is being closed.
Thread B loads the fileChannel under the readLock, so it can't continue if thread A calls releaseFileChannel first, since thread A holds the writeLock. This case is fine.
If thread B adds a refCnt before thread A gets the writeLock, then releaseFileChannel's logic guarantees the fileChannel is not closed, by checking markDead() under the writeLock. So it still works fine in this case.

Contributor

Only the removal from the map needs to be under the lock, to prevent another thread from handing out a file channel that is being closed. Once the channel has been removed from the map, it can be closed outside of the lock. This is different from fileinfo, because fileinfo needs to flush its contents out to disk, ensuring no one else reads them before they are fully flushed.

Contributor Author

@ivankelly I think markDead() needs to be under the lock to prevent another thread from handing out a file channel that is being closed. The close operation can be moved out completely.

Contributor

markDead doesn't need to be under a lock. The caller should call tryRetain() after receiving the channel: if tryRetain runs first, it prevents markDead from having any effect; if markDead runs first, tryRetain fails and the caller retries the load.

Contributor Author

Then we need a do-while check in loadFileChannel.
I've updated again.

@ivankelly
Contributor

I think the original implementation is much simpler approach.

The original implementation had race conditions, where dead file channels would be handed out, which is why I suggested Arvin take a look at the refcounting stuff from the fileinfobackingcache, which dealt with a similar problem. The locking from the fileinfobackingcache is probably not needed though.

FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
cfc = new CachedFileChannel(logFileId, newFc);
} catch (IOException ioe){
throw new UncheckedIOException(ioe);
Contributor

This will end up throwing a RuntimeException. Rather than using a call to computeIfAbsent(), do:

CachedFileChannel cachedFileChannel = null;
do {
    CachedFileChannel c = fileChannels.get(logId);
    if (c != null) {
        if (c.tryRetain()) {
            cachedFileChannel = c;
        } else {
            // this isn't strictly necessary, but it's good defensively to avoid infinite loop
            fileChannels.remove(logId, c);
        }
    } else {
        c = // the construction stuff
        CachedFileChannel existing = fileChannels.putIfAbsent(logId, c);
        CachedFileChannel existing = fileChannels.putIfAbsent(logId, c);
        if (existing != null) {
            // cleanup c
            if (existing.tryRetain()) {
                cachedFileChannel = existing;
            } else {
                fileChannels.remove(logId, existing);
            }
        } else if (c.tryRetain()) {
            cachedFileChannel = c; // our fresh channel won the race
        }
    }
} while (cachedFileChannel == null);
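A runnable condensation of the load path above (illustrative names only; a String stands in for the freshly opened FileChannel, refcounting is elided, and a hook simulates a thread racing in between the get and the putIfAbsent):

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the lock-free putIfAbsent pattern suggested above.
public class PutIfAbsentDemo {
    static final ConcurrentHashMap<Long, String> fileChannels = new ConcurrentHashMap<>();
    static int closedAfterLosingRace = 0;

    static String loadFileChannel(long logId, Runnable racingThread) {
        String c = fileChannels.get(logId);
        if (c != null) {
            return c; // real code would tryRetain() here, retrying if the entry died
        }
        racingThread.run();                       // another thread sneaks in here
        String fresh = "channel-" + logId;        // stand-in for opening the file
        String existing = fileChannels.putIfAbsent(logId, fresh);
        if (existing != null) {
            closedAfterLosingRace++;              // lost the race: close what we opened
            return existing;
        }
        return fresh;
    }

    static String run() {
        String noRace = loadFileChannel(1L, () -> { });
        String raced = loadFileChannel(2L,
                () -> fileChannels.put(2L, "channel-from-other-thread"));
        return noRace + "|" + raced + "|" + closedAfterLosingRace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // channel-1|channel-from-other-thread|1
    }
}
```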

@@ -52,36 +53,33 @@ class FileChannelBackingCache {
final ConcurrentHashMap<Long, CachedFileChannel> fileChannels = new ConcurrentHashMap<>();

CachedFileChannel loadFileChannel(long logId) throws IOException {
Contributor

The lock field should no longer be used, so it can be removed from the class.

fileChannels.remove(logId, cachedFileChannel);
// close corresponding fileChannel
try {
cachedFileChannel.fileChannel.close();
Contributor

It would be better if cachedFileChannel had a close method rather than callers directly accessing its members.

sijie added a commit that referenced this pull request Mar 24, 2018
Descriptions of the changes in this PR:

There are a couple of issues noticed in FileInfoBackingCache:

1) There is a race condition in loadFileInfo between get-check and put. If concurrent loading happens, there might be a FileInfo loaded into the map after get-check. This can cause incorrect reference count on FileInfo.

2) FileLoader is doing I/O operation which happens under a giant write lock.

3) assert is typically not recommended since it is disabled at production runtime.

*Changes*

- Check whether fileinfo exists or not after getting write lock and before put
- Move any I/O operations out of write lock
- release the new FileInfo if concurrent puts happen
- remove the usage of assert

Beside that, switch to use ConcurrentLongHashMap to avoid boxing and unboxing.

Related Issues:

#913 #832

Author: Sijie Guo <sijie@apache.org>

Reviewers: Ivan Kelly <ivank@apache.org>

This closes #1284 from sijie/improve_fileinfo_backing_cache
@jvrao
Contributor

jvrao commented May 29, 2019

retest

@eolivelli
Contributor

closing for inactivity. feel free to reopen

@eolivelli eolivelli closed this May 17, 2020

6 participants