Skip to content

Commit

Permalink
Use AbstractCASReferenceCounted to ensure entry.retain() is valid (#2995
Browse files Browse the repository at this point in the history
)
  • Loading branch information
merlimat authored and massakam committed Nov 19, 2018
1 parent feb848c commit 5e446a6
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 2 deletions.
Expand Up @@ -19,16 +19,18 @@
package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.ComparisonChain;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;

import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;

public final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {

private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
Expand Down
@@ -0,0 +1,116 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.util;

/*
* Imported from Netty at https://github.com/netty/netty/blob/netty-4.1.16.Final/common/src/main/java/io/netty/util/AbstractReferenceCounted.java
*
* This is to ensure strict semantic in "increase()" method: if it succeeds, the object is always valid
*
* The semantic was changed in https://github.com/netty/netty/commit/83a19d565064ee36998eb94f946e5a4264001065#diff-b9443e2689a46b3647fe6a8de0fdf3b2
*/

import static io.netty.util.internal.ObjectUtil.checkPositive;

import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* Abstract base class for classes wants to implement {@link ReferenceCounted}.
*/
public abstract class AbstractCASReferenceCounted implements ReferenceCounted {
private static final AtomicIntegerFieldUpdater<AbstractCASReferenceCounted> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractCASReferenceCounted.class, "refCnt");

private volatile int refCnt = 1;

@Override
public final int refCnt() {
return refCnt;
}

/**
* An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
*/
protected final void setRefCnt(int refCnt) {
refCntUpdater.set(this, refCnt);
}

@Override
public ReferenceCounted retain() {
return retain0(1);
}

@Override
public ReferenceCounted retain(int increment) {
return retain0(checkPositive(increment, "increment"));
}

private ReferenceCounted retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;

// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
}
return this;
}

@Override
public ReferenceCounted touch() {
return touch(null);
}

@Override
public boolean release() {
return release0(1);
}

@Override
public boolean release(int decrement) {
return release0(checkPositive(decrement, "decrement"));
}

private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}

if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}

/**
* Called once {@link #refCnt()} is equals 0.
*/
protected abstract void deallocate();
}
4 changes: 4 additions & 0 deletions pom.xml
Expand Up @@ -1013,6 +1013,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>
<exclude>src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java</exclude>
<exclude>**/ByteBufCodedInputStream.java</exclude>
<exclude>**/ByteBufCodedOutputStream.java</exclude>
<exclude>bin/proto/*</exclude>
Expand Down Expand Up @@ -1131,6 +1132,9 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java</exclude>

<!-- Imported from Netty - Apache License v2 -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java</exclude>

<!-- This is generated during maven build -->
<exclude>dependency-reduced-pom.xml</exclude>

Expand Down

0 comments on commit 5e446a6

Please sign in to comment.