Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default to UnpooledByteBufAllocator #157

Merged
merged 5 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
125 changes: 120 additions & 5 deletions blazingcache-core/src/main/java/blazingcache/client/CacheClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Arrays;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -220,7 +220,117 @@ public String getStatus() {
}
}

/**
* Builds a {@link CacheClient}.
*/
public final static class Builder {

private Builder() {
}

private boolean offHeap = true;
private String clientId = "localhost";
private String sharedSecret = "changeit";
private ServerLocator serverLocator;
private ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;

/**
* Prefer storing data on direct memory. Defaults to 'true'.
*
* @param value
* @return the builder itself
*/
public Builder offHeap(boolean value) {
this.offHeap = value;
return this;
}

/**
* Set a shared ByteBufAllocator.
* A good choice will be to use a PooledByteBufAllocator with the
* useCacheForAllThreads = false option.
*
* @param allocator
* @return the builder itself
*/
public Builder allocator(ByteBufAllocator allocator) {
this.allocator = allocator;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you may want to do a null check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, sure

return this;
}

/**
* Set the clientId seed. Defaults to 'localhost'.
*
* @param clientId
* @return the builder itself
*/
public Builder clientId(String clientId) {
this.clientId = clientId;
return this;
}

/**
* Set the sharedSecret. Defaults to 'changeit'.
* This is a legacy configuration parameter, in order
* to implement real security please configure JAAS/Kerberos.
*
* @param sharedSecret
* @return the builder itself
*/
public Builder sharedSecret(String sharedSecret) {
this.sharedSecret = sharedSecret;
return this;
}

/**
* Set the callback used to discovery cache servers on the network.
* There is no default.
*
* @param serverLocator
* @return the builder itself
*/
public Builder serverLocator(ServerLocator serverLocator) {
this.serverLocator = serverLocator;
return this;
}

/**
* Builds the client.
* @return a new client, to be disposed with {@link CacheClient#close() }
* @throws IllegalArgumentException in case of invalid configuration.
*/
public CacheClient build() throws IllegalArgumentException {
if (serverLocator == null) {
throw new IllegalArgumentException("serverLocator must be set");
}
return new CacheClient(clientId, sharedSecret, serverLocator, offHeap, allocator);
}
}

/**
* Start creating a new CacheClient.
*
* @return a builder for a new client.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Create a new CacheClient with the safest default.
* Use {@link #newBuilder() } in order to have full control.
*
* @param clientId
* @param sharedSecret
* @param brokerLocator
*/
public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator) {
this(clientId, sharedSecret, brokerLocator, true, UnpooledByteBufAllocator.DEFAULT);
}

private CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator,
boolean offHeap, ByteBufAllocator allocator) {
this.offHeap = offHeap;
this.brokerLocator = brokerLocator;
this.sharedSecret = sharedSecret;
this.coreThread = new Thread(new ConnectionManager(), "cache-connection-manager-" + clientId);
Expand All @@ -241,7 +351,7 @@ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLoc
this.clientHits = new AtomicLong();
this.clientMissedGetsToSuccessfulFetches = new AtomicLong();
this.clientMissedGetsToMissedFetches = new AtomicLong();
this.allocator = PooledByteBufAllocator.DEFAULT;
this.allocator = allocator;
}

/**
Expand All @@ -260,6 +370,11 @@ public void clearStatistics() {
this.clientMissedGetsToMissedFetches.set(0);
}

// visible for testing
ByteBufAllocator getAllocator() {
return allocator;
}

public ServerLocator getBrokerLocator() {
return brokerLocator;
}
Expand Down Expand Up @@ -750,10 +865,9 @@ public String getSharedSecret() {
/**
* Closes the client. It will never try to reconnect again to the server
*
* @throws Exception
*/
@Override
public void close() throws Exception {
public void close() {
stop();
}

Expand Down Expand Up @@ -793,7 +907,8 @@ public EntryHandle fetch(String key) throws InterruptedException {
* Returns an entry from the local cache, if not found asks the CacheServer
* to find the entry on other clients. If you need to get the local
* 'reference' to the object you can use the {@link #fetchObject(java.lang.String, blazingcache.client.KeyLock) )
* } function. <p>
* } function.
* <p>
* The caller MUST explicitly call {@link EntryHandle#close() }
*
* @param key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

*/
package blazingcache.client;

import blazingcache.network.Channel;
import blazingcache.network.ChannelEventListener;
import blazingcache.network.ConnectionRequestInfo;
import blazingcache.network.ServerLocator;
import blazingcache.network.ServerNotAvailableException;
import blazingcache.network.ServerRejectedConnectionException;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/**
* Tests around cache client builder
*
* @author enrico.olivelli
*/
public class CacheClientBuilderTest {

private final ServerLocator serverLocator = new ServerLocator() {
@Override
public Channel connect(ChannelEventListener messageReceiver, ConnectionRequestInfo workerInfo) throws InterruptedException, ServerNotAvailableException, ServerRejectedConnectionException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}

@Override
public void brokerDisconnected() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
};

@Test
public void testDefault() {
try (CacheClient client = CacheClient.newBuilder()
.serverLocator(serverLocator)
.build();) {
assertTrue(client.isOffHeap());
assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator);
assertTrue(client.getClientId().startsWith("localhost"));
}
}

@Test
public void testDefaultLegacyConstructor() {
try (CacheClient client = new CacheClient("ff", "bar", serverLocator)) {
assertTrue(client.isOffHeap());
assertTrue(client.getAllocator() instanceof UnpooledByteBufAllocator);
assertTrue(client.getClientId().startsWith("ff"));
}
}

@Test
public void testOffHeap() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.offHeap(false)
.build();) {
assertFalse(client.isOffHeap());
}
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.offHeap(true)
.build();) {
assertTrue(client.isOffHeap());
}
}

@Test
public void testUsePooledByteBuffers() {
ByteBufAllocator alloc = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.allocator(alloc)
.build();) {
assertSame(alloc, client.getAllocator());
}
}

@Test
public void testClientId() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.clientId("foo")
.build();) {
assertTrue(client.getClientId().startsWith("foo"));
}
}

@Test
public void testSharedSecret() {
try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.sharedSecret("aaa")
.build();) {
assertEquals("aaa", client.getSharedSecret());
}
}

@Test
public void testServerLocator() {

try (CacheClient client = CacheClient
.newBuilder()
.serverLocator(serverLocator)
.build();) {
assertSame(serverLocator, client.getBrokerLocator());
}
}

}