From cbf5c3a90d4bc02ed45a9fcad48c225abffb8e88 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli - Diennea Date: Fri, 22 Mar 2019 15:13:54 +0100 Subject: [PATCH] Default to UnpooledByteBufAllocator Introduce a new CacheClient builder. This is the safest default Netty by default will keep thread local Pools if the application uses several threads in order to save values on the case but it does not perform frequent operations (ByteBuffer allocations) pooling ByteBuffers will lead to a large usage of direct memory which won't be reclaimed, because by default Netty reclaims memory per thread and every N allocations. --- .../java/blazingcache/client/CacheClient.java | 86 ++++++++++- .../client/CacheClientBuilderTest.java | 144 ++++++++++++++++++ 2 files changed, 226 insertions(+), 4 deletions(-) create mode 100644 blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java diff --git a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java index 04ed3d9..4dfe5c4 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java +++ b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java @@ -50,7 +50,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; -import java.util.Arrays; +import io.netty.buffer.UnpooledByteBufAllocator; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -220,7 +220,67 @@ public String getStatus() { } } + public final static class Builder { + + private Builder() { + } + + private boolean offHeap = true; + private boolean poolMemoryBuffers = false; + private String clientId = "localhost"; + private String sharedSecret = "changeit"; + private ServerLocator serverLocator; + + public Builder offHeap(boolean value) { + this.offHeap = value; + return this; + } + + public Builder poolMemoryBuffers(boolean value) { + this.poolMemoryBuffers = value; + return this; + } + + public Builder clientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder sharedSecret(String sharedSecret) { + this.sharedSecret = sharedSecret; + return this; + } + + public Builder serverLocator(ServerLocator serverLocator) { + this.serverLocator = serverLocator; + return this; + } + + public CacheClient build() { + if (serverLocator == null) { + throw new IllegalArgumentException("serverLocator must be set"); + } + return new CacheClient(clientId, sharedSecret, serverLocator, offHeap, poolMemoryBuffers); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Create a new CacheClient with the safest default + * @param clientId + * @param sharedSecret + * @param brokerLocator + */ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator) { + this(clientId, sharedSecret, brokerLocator, true, false /* poolMemoryBuffers = false is safer */); + } + + private CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator, + boolean offHeap, boolean poolMemoryBuffers) { + this.offHeap = offHeap; this.brokerLocator = brokerLocator; this.sharedSecret = sharedSecret; this.coreThread = new Thread(new ConnectionManager(), "cache-connection-manager-" + clientId); @@ -241,7 +301,19 @@ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLoc this.clientHits = new AtomicLong(); this.clientMissedGetsToSuccessfulFetches = new AtomicLong(); this.clientMissedGetsToMissedFetches = new AtomicLong(); - this.allocator = PooledByteBufAllocator.DEFAULT; + + if (poolMemoryBuffers) { + this.allocator = PooledByteBufAllocator.DEFAULT; + } else { + // This is the safest default + // Netty by default will keep thread local Pools + // if the application uses several threads in order to save values + // on the case but it does not perform frequent operations (ByteBuffer allocations) + // pooling ByteBuffers will lead to a large usage of direct memory + // which won't be reclaimed, because by default Netty reclaims memory + // per thread and every N allocations + this.allocator = UnpooledByteBufAllocator.DEFAULT; + } } /** @@ -260,6 +332,11 @@ public void clearStatistics() { this.clientMissedGetsToMissedFetches.set(0); } + // visible for testing + ByteBufAllocator getAllocator() { + return allocator; + } + public ServerLocator getBrokerLocator() { return brokerLocator; } @@ -753,7 +830,7 @@ public String getSharedSecret() { * @throws Exception */ @Override - public void close() throws Exception { + public void close() { stop(); } @@ -793,7 +870,8 @@ public EntryHandle fetch(String key) throws InterruptedException { * Returns an entry from the local cache, if not found asks the CacheServer * to find the entry on other clients. If you need to get the local * 'reference' to the object you can use the {@link #fetchObject(java.lang.String, blazingcache.client.KeyLock) ) - * } function.

+ * } function. + *

* The caller MUST explicitly call {@link EntryHandle#close() } * * @param key diff --git a/blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java b/blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java new file mode 100644 index 0000000..019ce40 --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java @@ -0,0 +1,144 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package blazingcache.client; + +import blazingcache.network.Channel; +import blazingcache.network.ChannelEventListener; +import blazingcache.network.ConnectionRequestInfo; +import blazingcache.network.ServerLocator; +import blazingcache.network.ServerNotAvailableException; +import blazingcache.network.ServerRejectedConnectionException; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +/** + * Tests around cache client builder + * + * @author enrico.olivelli + */ +public class CacheClientBuilderTest { + + private final ServerLocator serverLocator = new ServerLocator() { + @Override + public Channel connect(ChannelEventListener messageReceiver, ConnectionRequestInfo workerInfo) throws InterruptedException, ServerNotAvailableException, ServerRejectedConnectionException { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void brokerDisconnected() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + }; + + @Test + public void testDefault() { + try (CacheClient client = CacheClient.newBuilder() + .serverLocator(serverLocator) + .build();) { + assertTrue(client.isOffHeap()); + assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator); + assertTrue(client.getClientId().startsWith("localhost")); + } + } + + @Test + public void testDefaultLegacyConstructor() { + try (CacheClient client = new CacheClient("ff", "bar", serverLocator)) { + assertTrue(client.isOffHeap()); + assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator); + assertTrue(client.getClientId().startsWith("ff")); + } + } + + @Test + public void testOffHeap() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .offHeap(false) + .build();) { + assertFalse(client.isOffHeap()); + } + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .offHeap(true) + .build();) { + assertTrue(client.isOffHeap()); + } + } + + @Test + public void testUsePooledByteBuffers() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .poolMemoryBuffers(true) + .build();) { + assertTrue(client.getAllocator() instanceof PooledByteBufAllocator); + } + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .poolMemoryBuffers(false) + .build();) { + assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator); + } + } + + @Test + public void testClientId() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .clientId("foo") + .build();) { + assertTrue(client.getClientId().startsWith("foo")); + } + } + + @Test + public void testSharedSecret() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .sharedSecret("aaa") + .build();) { + assertEquals("aaa", client.getSharedSecret()); + } + } + + @Test + public void testServerLocator() { + + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .build();) { + assertSame(serverLocator, client.getBrokerLocator()); + } + } + +}