diff --git a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java index 04ed3d9..32a40e2 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java +++ b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java @@ -50,7 +50,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; -import java.util.Arrays; +import io.netty.buffer.UnpooledByteBufAllocator; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -220,7 +221,120 @@ public String getStatus() { } } + /** + * Builds a {@link CacheClient}. + */ + public final static class Builder { + + private Builder() { + } + + private boolean offHeap = true; + private String clientId = "localhost"; + private String sharedSecret = "changeit"; + private ServerLocator serverLocator; + private ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT; + + /** + * Prefer storing data on direct memory. Defaults to 'true'. + * + * @param value + * @return the builder itself + */ + public Builder offHeap(boolean value) { + this.offHeap = value; + return this; + } + + /** + * Set a shared ByteBufAllocator. + * A good choice will be to use a PooledByteBufAllocator with the + * useCacheForAllThreads = false option. + * + * @param allocator + * @return the builder itself + */ + public Builder allocator(ByteBufAllocator allocator) { + Objects.requireNonNull(allocator); + this.allocator = allocator; + return this; + } + + /** + * Set the clientId seed. Defaults to 'localhost'. + * + * @param clientId + * @return the builder itself + */ + public Builder clientId(String clientId) { + Objects.requireNonNull(clientId); + this.clientId = clientId; + return this; + } + + /** + * Set the sharedSecret. Defaults to 'changeit'. + * This is a legacy configuration parameter, in order + * to implement real security please configure JAAS/Kerberos. + * + * @param sharedSecret + * @return the builder itself + */ + public Builder sharedSecret(String sharedSecret) { + this.sharedSecret = sharedSecret; + return this; + } + + /** + * Set the callback used to discovery cache servers on the network. + * There is no default. + * + * @param serverLocator + * @return the builder itself + */ + public Builder serverLocator(ServerLocator serverLocator) { + Objects.requireNonNull(serverLocator); + this.serverLocator = serverLocator; + return this; + } + + /** + * Builds the client. + * @return a new client, to be disposed with {@link CacheClient#close() } + * @throws IllegalArgumentException in case of invalid configuration. + */ + public CacheClient build() throws IllegalArgumentException { + if (serverLocator == null) { + throw new IllegalArgumentException("serverLocator must be set"); + } + return new CacheClient(clientId, sharedSecret, serverLocator, offHeap, allocator); + } + } + + /** + * Start creating a new CacheClient. + * + * @return a builder for a new client. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Create a new CacheClient with the safest default. + * Use {@link #newBuilder() } in order to have full control. + * + * @param clientId + * @param sharedSecret + * @param brokerLocator + */ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator) { + this(clientId, sharedSecret, brokerLocator, true, UnpooledByteBufAllocator.DEFAULT); + } + + private CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator, + boolean offHeap, ByteBufAllocator allocator) { + this.offHeap = offHeap; this.brokerLocator = brokerLocator; this.sharedSecret = sharedSecret; this.coreThread = new Thread(new ConnectionManager(), "cache-connection-manager-" + clientId); @@ -241,7 +355,7 @@ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLoc this.clientHits = new AtomicLong(); this.clientMissedGetsToSuccessfulFetches = new AtomicLong(); this.clientMissedGetsToMissedFetches = new AtomicLong(); - this.allocator = PooledByteBufAllocator.DEFAULT; + this.allocator = allocator; } /** @@ -260,6 +374,11 @@ public void clearStatistics() { this.clientMissedGetsToMissedFetches.set(0); } + // visible for testing + ByteBufAllocator getAllocator() { + return allocator; + } + public ServerLocator getBrokerLocator() { return brokerLocator; } @@ -750,10 +869,9 @@ public String getSharedSecret() { /** * Closes the client. It will never try to reconnect again to the server * - * @throws Exception */ @Override - public void close() throws Exception { + public void close() { stop(); } @@ -793,7 +911,8 @@ public EntryHandle fetch(String key) throws InterruptedException { * Returns an entry from the local cache, if not found asks the CacheServer * to find the entry on other clients. If you need to get the local * 'reference' to the object you can use the {@link #fetchObject(java.lang.String, blazingcache.client.KeyLock) ) - * } function.

+ * } function. + *

* The caller MUST explicitly call {@link EntryHandle#close() } * * @param key diff --git a/blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java b/blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java new file mode 100644 index 0000000..3a2deb0 --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/CacheClientBuilderTest.java @@ -0,0 +1,140 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package blazingcache.client; + +import blazingcache.network.Channel; +import blazingcache.network.ChannelEventListener; +import blazingcache.network.ConnectionRequestInfo; +import blazingcache.network.ServerLocator; +import blazingcache.network.ServerNotAvailableException; +import blazingcache.network.ServerRejectedConnectionException; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +/** + * Tests around cache client builder + * + * @author enrico.olivelli + */ +public class CacheClientBuilderTest { + + private final ServerLocator serverLocator = new ServerLocator() { + @Override + public Channel connect(ChannelEventListener messageReceiver, ConnectionRequestInfo workerInfo) throws InterruptedException, ServerNotAvailableException, ServerRejectedConnectionException { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void brokerDisconnected() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + }; + + @Test + public void testDefault() { + try (CacheClient client = CacheClient.newBuilder() + .serverLocator(serverLocator) + .build();) { + assertTrue(client.isOffHeap()); + assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator); + assertTrue(client.getClientId().startsWith("localhost")); + } + } + + @Test + public void testDefaultLegacyConstructor() { + try (CacheClient client = new CacheClient("ff", "bar", serverLocator)) { + assertTrue(client.isOffHeap()); + assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator); + assertTrue(client.getClientId().startsWith("ff")); + } + } + + @Test + public void testOffHeap() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .offHeap(false) + .build();) { + assertFalse(client.isOffHeap()); + } + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .offHeap(true) + .build();) { + assertTrue(client.isOffHeap()); + } + } + + @Test + public void testUsePooledByteBuffers() { + ByteBufAllocator alloc = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .allocator(alloc) + .build();) { + assertSame(alloc, client.getAllocator()); + } + } + + @Test + public void testClientId() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .clientId("foo") + .build();) { + assertTrue(client.getClientId().startsWith("foo")); + } + } + + @Test + public void testSharedSecret() { + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .sharedSecret("aaa") + .build();) { + assertEquals("aaa", client.getSharedSecret()); + } + } + + @Test + public void testServerLocator() { + + try (CacheClient client = CacheClient + .newBuilder() + .serverLocator(serverLocator) + .build();) { + assertSame(serverLocator, client.getBrokerLocator()); + } + } + +}