Skip to content

Commit

Permalink
add checks for released MappedFiles
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed May 10, 2017
1 parent 0cf647e commit 0925471
Show file tree
Hide file tree
Showing 19 changed files with 705 additions and 596 deletions.
5 changes: 3 additions & 2 deletions pom.xml
Expand Up @@ -15,7 +15,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<additionalparam>-Xdoclint:none</additionalparam>
Expand Down Expand Up @@ -50,7 +51,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.14.6</version>
<version>1.14.7-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
39 changes: 25 additions & 14 deletions src/main/java/net/openhft/chronicle/queue/impl/WireStorePool.java
Expand Up @@ -20,16 +20,19 @@
import net.openhft.chronicle.queue.TailerDirection;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.text.ParseException;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class WireStorePool {
@NotNull
private final WireStoreSupplier supplier;
@NotNull
private final Map<RollDetails, WireStore> stores;
private final Map<RollDetails, WeakReference<WireStore>> stores;
private final StoreFileListener storeFileListener;
private boolean isClosed = false;

Expand All @@ -49,26 +52,33 @@ public synchronized void close() {
return;
isClosed = true;

stores.values().forEach(this::release);
stores.values().stream()
.map(Reference::get)
.filter(Objects::nonNull)
.forEach(this::release);
}

@Nullable
public synchronized WireStore acquire(final int cycle, final long epoch, boolean createIfAbsent) {
RollDetails rollDetails = new RollDetails(cycle, epoch);
WireStore store = stores.get(rollDetails);
if (store != null) {
if (store.tryReserve())
return store;
else
/// this should never happen,
// this method is synchronized
// and this remove below, is only any use if the acquire method below that fails
stores.remove(rollDetails);
WeakReference<WireStore> reference = stores.get(rollDetails);
WireStore store;
if (reference != null) {
store = reference.get();
if (store != null) {
if (store.tryReserve())
return store;
else
/// this should never happen,
// this method is synchronized
// and this remove below, is only any use if the acquire method below that fails
stores.remove(rollDetails);
}
}

store = this.supplier.acquire(cycle, createIfAbsent);
if (store != null) {
stores.put(rollDetails, store);
stores.put(rollDetails, new WeakReference<>(store));
storeFileListener.onAcquired(cycle, store.file());
}
return store;
Expand All @@ -84,8 +94,9 @@ public synchronized void release(@NotNull WireStore store) {
long refCount = store.refCount();
assert refCount >= 0;
if (refCount == 0) {
for (Map.Entry<RollDetails, WireStore> entry : stores.entrySet()) {
if (entry.getValue() == store) {
for (Map.Entry<RollDetails, WeakReference<WireStore>> entry : stores.entrySet()) {
WeakReference<WireStore> ref = entry.getValue();
if (ref != null && ref.get() == store) {
stores.remove(entry.getKey());
storeFileListener.onReleased(entry.getKey().cycle(), store.file());
break;
Expand Down
Expand Up @@ -311,7 +311,7 @@ public long exceptsPerCycle(int cycle) {
try {
long index = rollCycle.toIndex(cycle, 0);
if (tailer.moveToIndex(index)) {
assert tailer.store.refCount() > 0;
assert tailer.store.refCount() > 1;
return tailer.store.lastSequenceNumber(tailer) + 1;
} else {
return -1;
Expand Down Expand Up @@ -547,7 +547,7 @@ public long bufferCapacity() {
//
// *************************************************************************

private MappedBytes mappedBytes(File cycleFile) throws FileNotFoundException {
MappedBytes mappedBytes(File cycleFile) throws FileNotFoundException {
long chunkSize = OS.pageAlign(blockSize);
long overlapSize = OS.pageAlign(blockSize / 4);
return MappedBytes.mappedBytes(cycleFile, chunkSize, overlapSize, readOnly);
Expand Down
Expand Up @@ -1421,6 +1421,7 @@ private boolean cycle(final int cycle, boolean createIfAbsent) {
return true;

context.wire(null);
nextStore.reserve();
this.store = nextStore;
this.state = FOUND_CYCLE;
this.setCycle(cycle);
Expand Down

0 comments on commit 0925471

Please sign in to comment.