Skip to content

Commit

Permalink
Default to UnpooledByteBufAllocator
Browse files Browse the repository at this point in the history
Introduce a new CacheClient builder.

This is the safest default
Netty by default will keep thread local Pools
if the application uses several threads in order to save values
on the case but it does not perform frequent operations (ByteBuffer allocations)
pooling ByteBuffers will lead to a large usage of direct memory
which won't be reclaimed, because by default Netty reclaims memory
per thread and every N allocations.
  • Loading branch information
Enrico Olivelli - Diennea committed Mar 22, 2019
1 parent db9ef94 commit cbf5c3a
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Arrays;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -220,7 +220,67 @@ public String getStatus() {
}
}

public final static class Builder {

private Builder() {
}

private boolean offHeap = true;
private boolean poolMemoryBuffers = false;
private String clientId = "localhost";
private String sharedSecret = "changeit";
private ServerLocator serverLocator;

public Builder offHeap(boolean value) {
this.offHeap = value;
return this;
}

public Builder poolMemoryBuffers(boolean value) {
this.poolMemoryBuffers = value;
return this;
}

public Builder clientId(String clientId) {
this.clientId = clientId;
return this;
}

public Builder sharedSecret(String sharedSecret) {
this.sharedSecret = sharedSecret;
return this;
}

public Builder serverLocator(ServerLocator serverLocator) {
this.serverLocator = serverLocator;
return this;
}

public CacheClient build() {
if (serverLocator == null) {
throw new IllegalArgumentException("serverLocator must be set");
}
return new CacheClient(clientId, sharedSecret, serverLocator, offHeap, poolMemoryBuffers);
}
}

public static Builder newBuilder() {
return new Builder();
}

/**
* Create a new CacheClient with the safest default
* @param clientId
* @param sharedSecret
* @param brokerLocator
*/
public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator) {
this(clientId, sharedSecret, brokerLocator, true, false /* poolMemoryBuffers = false is safer */);
}

private CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator,
boolean offHeap, boolean poolMemoryBuffers) {
this.offHeap = offHeap;
this.brokerLocator = brokerLocator;
this.sharedSecret = sharedSecret;
this.coreThread = new Thread(new ConnectionManager(), "cache-connection-manager-" + clientId);
Expand All @@ -241,7 +301,19 @@ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLoc
this.clientHits = new AtomicLong();
this.clientMissedGetsToSuccessfulFetches = new AtomicLong();
this.clientMissedGetsToMissedFetches = new AtomicLong();
this.allocator = PooledByteBufAllocator.DEFAULT;

if (poolMemoryBuffers) {
this.allocator = PooledByteBufAllocator.DEFAULT;
} else {
// This is the safest default
// Netty by default will keep thread local Pools
// if the application uses several threads in order to save values
// on the case but it does not perform frequent operations (ByteBuffer allocations)
// pooling ByteBuffers will lead to a large usage of direct memory
// which won't be reclaimed, because by default Netty reclaims memory
// per thread and every N allocations
this.allocator = UnpooledByteBufAllocator.DEFAULT;
}
}

/**
Expand All @@ -260,6 +332,11 @@ public void clearStatistics() {
this.clientMissedGetsToMissedFetches.set(0);
}

// visible for testing
ByteBufAllocator getAllocator() {
return allocator;
}

public ServerLocator getBrokerLocator() {
return brokerLocator;
}
Expand Down Expand Up @@ -753,7 +830,7 @@ public String getSharedSecret() {
* @throws Exception
*/
@Override
public void close() throws Exception {
public void close() {
stop();
}

Expand Down Expand Up @@ -793,7 +870,8 @@ public EntryHandle fetch(String key) throws InterruptedException {
* Returns an entry from the local cache, if not found asks the CacheServer
* to find the entry on other clients. If you need to get the local
* 'reference' to the object you can use the {@link #fetchObject(java.lang.String, blazingcache.client.KeyLock) )
* } function. <p>
* } function.
* <p>
* The caller MUST explicitly call {@link EntryHandle#close() }
*
* @param key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package blazingcache.client;

import blazingcache.network.Channel;
import blazingcache.network.ChannelEventListener;
import blazingcache.network.ConnectionRequestInfo;
import blazingcache.network.ServerLocator;
import blazingcache.network.ServerNotAvailableException;
import blazingcache.network.ServerRejectedConnectionException;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/**
* Tests around cache client builder
*
* @author enrico.olivelli
*/
public class CacheClientBuilderTest {

private final ServerLocator serverLocator = new ServerLocator() {
@Override
public Channel connect(ChannelEventListener messageReceiver, ConnectionRequestInfo workerInfo) throws InterruptedException, ServerNotAvailableException, ServerRejectedConnectionException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}

@Override
public void brokerDisconnected() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
};

@Test
public void testDefault() {
try (CacheClient client = CacheClient.newBuilder()
.serverLocator(serverLocator)
.build();) {
assertTrue(client.isOffHeap());
assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator);
assertTrue(client.getClientId().startsWith("localhost"));
}
}

@Test
public void testDefaultLegacyConstructor() {
try (CacheClient client = new CacheClient("ff", "bar", serverLocator)) {
assertTrue(client.isOffHeap());
assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator);
assertTrue(client.getClientId().startsWith("ff"));
}
}

@Test
public void testOffHeap() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.offHeap(false)
.build();) {
assertFalse(client.isOffHeap());
}
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.offHeap(true)
.build();) {
assertTrue(client.isOffHeap());
}
}

@Test
public void testUsePooledByteBuffers() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.poolMemoryBuffers(true)
.build();) {
assertTrue(client.getAllocator() instanceof PooledByteBufAllocator);
}
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.poolMemoryBuffers(false)
.build();) {
assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator);
}
}

@Test
public void testClientId() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.clientId("foo")
.build();) {
assertTrue(client.getClientId().startsWith("foo"));
}
}

@Test
public void testSharedSecret() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.sharedSecret("aaa")
.build();) {
assertEquals("aaa", client.getSharedSecret());
}
}

@Test
public void testServerLocator() {

try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.build();) {
assertSame(serverLocator, client.getBrokerLocator());
}
}

}

0 comments on commit cbf5c3a

Please sign in to comment.