diff --git a/src/java/voldemort/client/AbstractStoreClientFactory.java b/src/java/voldemort/client/AbstractStoreClientFactory.java index 965cf45849..ae124f43a4 100644 --- a/src/java/voldemort/client/AbstractStoreClientFactory.java +++ b/src/java/voldemort/client/AbstractStoreClientFactory.java @@ -20,6 +20,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -33,6 +35,7 @@ import voldemort.cluster.Cluster; import voldemort.cluster.Node; import voldemort.cluster.failuredetector.FailureDetector; +import voldemort.common.service.SchedulerService; import voldemort.serialization.ByteArraySerializer; import voldemort.serialization.IdentitySerializer; import voldemort.serialization.SerializationException; @@ -58,6 +61,7 @@ import voldemort.store.versioned.InconsistencyResolvingStore; import voldemort.utils.ByteArray; import voldemort.utils.JmxUtils; +import voldemort.utils.SystemTime; import voldemort.versioning.ChainedResolver; import voldemort.versioning.InconsistencyResolver; import voldemort.versioning.TimeBasedInconsistencyResolver; @@ -101,6 +105,7 @@ public abstract class AbstractStoreClientFactory implements StoreClientFactory { private final int clientZoneId; private final String clientContextName; private final AtomicInteger sequencer; + private final HashSet clientAsyncServiceRepo; public AbstractStoreClientFactory(ClientConfig config) { this.config = config; @@ -122,6 +127,7 @@ public AbstractStoreClientFactory(ClientConfig config) { config.getTimeoutConfig()); this.sequencer = new AtomicInteger(0); + this.clientAsyncServiceRepo = new HashSet(); if(this.isJmxEnabled) { JmxUtils.registerMbean(threadPool, @@ -151,13 +157,19 @@ public StoreClient getStoreClient(String storeName) { public StoreClient getStoreClient(String storeName, InconsistencyResolver> resolver) { + SchedulerService service = new SchedulerService(config.getAsyncJobThreadPoolSize(), + SystemTime.INSTANCE, + true); + clientAsyncServiceRepo.add(service); + return new DefaultStoreClient(storeName, resolver, this, 3, clientContextName, sequencer.getAndIncrement(), - config); + config, + service); } @SuppressWarnings("unchecked") @@ -457,6 +469,16 @@ public void close() { if(failureDetector != null) failureDetector.destroy(); + + stopClientAsyncSchedulers(); + } + + private void stopClientAsyncSchedulers() { + Iterator it = clientAsyncServiceRepo.iterator(); + while(it.hasNext()) { + it.next().stop(); + } + clientAsyncServiceRepo.clear(); } /* Give a unique id to avoid jmx clashes */ diff --git a/src/java/voldemort/client/DefaultStoreClient.java b/src/java/voldemort/client/DefaultStoreClient.java index 3d507e71f7..df26617e68 100644 --- a/src/java/voldemort/client/DefaultStoreClient.java +++ b/src/java/voldemort/client/DefaultStoreClient.java @@ -46,7 +46,6 @@ import voldemort.store.system.SystemStoreConstants; import voldemort.utils.JmxUtils; import voldemort.utils.ManifestFileReader; -import voldemort.utils.SystemTime; import voldemort.utils.Utils; import voldemort.versioning.InconsistencyResolver; import voldemort.versioning.InconsistentDataException; @@ -89,7 +88,7 @@ public DefaultStoreClient(String storeName, InconsistencyResolver> resolver, StoreClientFactory storeFactory, int maxMetadataRefreshAttempts) { - this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null); + this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null, null); } public DefaultStoreClient(String storeName, @@ -98,7 +97,8 @@ public DefaultStoreClient(String storeName, int maxMetadataRefreshAttempts, String clientContext, int clientSequence, - ClientConfig config) { + ClientConfig config, + SchedulerService scheduler) { this.storeName = Utils.notNull(storeName); this.resolver = resolver; @@ -112,9 +112,7 @@ public DefaultStoreClient(String storeName, this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo); this.config = config; this.sysRepository = new SystemStoreRepository(); - this.scheduler = new SchedulerService(config.getAsyncJobThreadPoolSize(), - SystemTime.INSTANCE, - true); + this.scheduler = scheduler; // Registering self to be able to bootstrap client dynamically via JMX JmxUtils.registerMbean(this, JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), @@ -147,12 +145,17 @@ private void registerClient(String jobId, int interval) { version); GregorianCalendar cal = new GregorianCalendar(); cal.add(Calendar.SECOND, interval); - scheduler.schedule(jobId + refresher.getClass().getName(), - refresher, - cal.getTime(), - TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS)); - logger.info("Client registry refresher thread started, refresh interval: " - + interval + " seconds"); + + if(scheduler != null) { + scheduler.schedule(jobId + refresher.getClass().getName(), + refresher, + cal.getTime(), + TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS)); + logger.info("Client registry refresher thread started, refresh interval: " + + interval + " seconds"); + } else { + logger.warn("Client registry won't run because scheduler service is not configured"); + } } catch(Exception e) { logger.warn("Unable to register with the cluster due to the following error:", e); } @@ -183,13 +186,16 @@ public Void call() throws Exception { // schedule the job to run every 'checkInterval' period, starting // now - scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(), - asyncCheckMetadata, - new Date(), - interval); - logger.info("Metadata version check thread started. Frequency = Every " + interval - + " ms"); - + if(scheduler != null) { + scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(), + asyncCheckMetadata, + new Date(), + interval); + logger.info("Metadata version check thread started. Frequency = Every " + interval + + " ms"); + } else { + logger.warn("Metadata version check thread won't start because the scheduler service is not configured."); + } } return asyncCheckMetadata; } @@ -224,10 +230,6 @@ public void bootStrap() { } } - public void close() { - scheduler.stopInner(); - } - public boolean delete(K key) { Versioned versioned = get(key); if(versioned == null) diff --git a/test/unit/voldemort/client/ClientRegistryTest.java b/test/unit/voldemort/client/ClientRegistryTest.java index 7fe43d17ee..1cca91df59 100644 --- a/test/unit/voldemort/client/ClientRegistryTest.java +++ b/test/unit/voldemort/client/ClientRegistryTest.java @@ -30,7 +30,7 @@ import com.google.common.collect.Lists; -@SuppressWarnings({ "unchecked" }) +@SuppressWarnings( { "unchecked" }) public class ClientRegistryTest { public static final String SERVER_LOCAL_URL = "tcp://localhost:"; @@ -143,7 +143,7 @@ public void testHappyPath() { infoList = getClientRegistryContent(it); assertTrue("Client registry not updated.", infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime()); - ((DefaultStoreClient) client1).close(); + socketFactory.close(); } @Test @@ -227,8 +227,7 @@ public void testTwoClients() { assertTrue("Client registry not updated.", infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); - ((DefaultStoreClient) client1).close(); - ((DefaultStoreClient) client2).close(); + socketFactory.close(); } @Test @@ -329,8 +328,7 @@ public void testTwoStores() { assertTrue("Client registry not updated.", infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); - ((DefaultStoreClient) client1).close(); - ((DefaultStoreClient) client2).close(); + socketFactory.close(); } @Test @@ -462,8 +460,8 @@ public void testTwoFactories() { assertTrue("Client registry not updated.", infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); - ((DefaultStoreClient) client1).close(); - ((DefaultStoreClient) client2).close(); + socketFactory1.close(); + socketFactory2.close(); } @Test @@ -556,8 +554,8 @@ public void testOneServerFailre() { assertTrue("Client registry not updated.", infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime()); - ((DefaultStoreClient) client1).close(); - ((DefaultStoreClient) client2).close(); + socketFactory1.close(); + socketFactory2.close(); } @Test @@ -592,8 +590,6 @@ public void testRepeatRegistrationSameFactory() { client1.put("k1", "v1"); client2.put("k2", "v2"); - ((DefaultStoreClient) client1).close(); - ((DefaultStoreClient) client2).close(); } Iterator>> it = adminClient.fetchEntries(1, @@ -604,6 +600,8 @@ public void testRepeatRegistrationSameFactory() { ArrayList infoList = getClientRegistryContent(it); assertEquals("Incrrect # of entries created in client registry", 6, infoList.size()); + socketFactory1.close(); + socketFactory2.close(); } @Test @@ -709,8 +707,8 @@ public void testRepeatRegistrationDifferentFactories() { client1LastBootstrapTime = infoList.get(0).getBootstrapTime(); client2LastBootstrapTime = infoList.get(0).getBootstrapTime(); - ((DefaultStoreClient) client1).close(); - ((DefaultStoreClient) client2).close(); + socketFactory1.close(); + socketFactory2.close(); } }