diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index fa6bbab411..6adb155877 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -18,10 +18,17 @@ Configuration options that modify JanusGraph's caching behavior | Name | Description | Datatype | Default Value | Mutability | | ---- | ---- | ---- | ---- | ---- | +| cache.cache-type | Enable or disable Redis cache (redis/inmemory) | String | redis | MASKABLE | | cache.db-cache | Whether to enable JanusGraph's database-level cache, which is shared across all transactions. Enabling this option speeds up traversals by holding hot graph elements in memory, but also increases the likelihood of reading stale data. Disabling it forces each transaction to independently fetch graph elements from storage before reading/writing them. | Boolean | false | MASKABLE | | cache.db-cache-clean-wait | How long, in milliseconds, database-level cache will keep entries after flushing them. This option is only useful on distributed storage backends that are capable of acknowledging writes without necessarily making them immediately visible. | Integer | 50 | MASKABLE | | cache.db-cache-size | Size of JanusGraph's database level cache. Values between 0 and 1 are interpreted as a percentage of VM heap, while larger values are interpreted as an absolute size in bytes. | Double | 0.3 | MASKABLE | | cache.db-cache-time | Default expiration time, in milliseconds, for entries in the database-level cache. Entries are evicted when they reach this age even if the cache has room to spare. Set to 0 to disable expiration (cache entries live forever or until memory pressure triggers eviction when set to 0). | Long | 10000 | MASKABLE | +| cache.redis-cache-connectTimeout | Timeout during connecting to any Redis server. | Integer | 1000 | MASKABLE | +| cache.redis-cache-connectionMinimumIdleSize | Minimum idle Redis connection amount. | Integer | 24 | MASKABLE | +| cache.redis-cache-connectionPoolSize | Redis connection maximum pool size. | Integer | 64 | MASKABLE | +| cache.redis-cache-host | Redis host name | String | localhost | MASKABLE | +| cache.redis-cache-keepAlive | Enables TCP keepAlive for connection. | Boolean | true | MASKABLE | +| cache.redis-cache-port | Redis host port | Integer | 6379 | MASKABLE | | cache.tx-cache-size | Maximum size of the transaction-level cache of recently-used vertices. | Integer | 20000 | MASKABLE | | cache.tx-dirty-size | Initial size of the transaction-level cache of uncommitted dirty vertices. This is a performance hint for write-heavy, performance-sensitive transactional workloads. If set, it should roughly match the median vertices modified per transaction. | Integer | (no default value) | MASKABLE | diff --git a/janusgraph-core/pom.xml b/janusgraph-core/pom.xml index fe58327baa..bff97e9f23 100644 --- a/janusgraph-core/pom.xml +++ b/janusgraph-core/pom.xml @@ -20,6 +20,54 @@ janusgraph-driver ${project.version} + + de.ruedigermoeller + fst + 2.56 + + + org.redisson + redisson + 3.16.8 + + + io.netty + netty-transport + + + io.netty + netty-resolver + + + io.netty + netty-handler + + + io.netty + netty-codec + + + io.netty + netty-buffer + + + io.netty + netty-common + + + io.netty + netty-resolver-dns + + + net.bytebuddy + byte-buddy + + + org.slf4j + slf4j-api + + + org.apache.tinkerpop diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java index 42dad3bb09..1e055b0243 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java @@ -43,6 +43,7 @@ import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; import org.janusgraph.diskstorage.keycolumnvalue.cache.CacheTransaction; import org.janusgraph.diskstorage.keycolumnvalue.cache.ExpirationKCVSCache; +import org.janusgraph.diskstorage.keycolumnvalue.cache.ExpirationKCVSRedisCache; import org.janusgraph.diskstorage.keycolumnvalue.cache.KCVSCache; import org.janusgraph.diskstorage.keycolumnvalue.cache.NoKCVSCache; import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.OrderedKeyValueStoreManager; @@ -86,6 +87,7 @@ import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.ALLOW_CUSTOM_VERTEX_ID_TYPES; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.BASIC_METRICS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.BUFFER_SIZE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CACHE_TYPE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE_CLEAN_WAIT; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DB_CACHE_SIZE; @@ -156,6 +158,7 @@ public class Backend implements LockerProvider, AutoCloseable { public static final String SYSTEM_TX_LOG_NAME = "txlog"; public static final String SYSTEM_MGMT_LOG_NAME = "systemlog"; + public static final String REDIS_TAG = "redis"; public static final double EDGESTORE_CACHE_PERCENT = 0.8; public static final double INDEXSTORE_CACHE_PERCENT = 0.2; @@ -362,8 +365,22 @@ public void initialize(Configuration config) { long edgeStoreCacheSize = Math.round(cacheSizeBytes * EDGESTORE_CACHE_PERCENT); long indexStoreCacheSize = Math.round(cacheSizeBytes * INDEXSTORE_CACHE_PERCENT); - edgeStore = new ExpirationKCVSCache(edgeStoreRaw,getMetricsCacheName(EDGESTORE_NAME),expirationTime,cleanWaitTime,edgeStoreCacheSize); - indexStore = new ExpirationKCVSCache(indexStoreRaw,getMetricsCacheName(INDEXSTORE_NAME),expirationTime,cleanWaitTime,indexStoreCacheSize); + String cacheType = configuration.get(CACHE_TYPE); + + if(REDIS_TAG.equals(cacheType)){ + log.info("======== Configuring redis cache ========"); + edgeStore = new ExpirationKCVSRedisCache(edgeStoreRaw,getMetricsCacheName(EDGESTORE_NAME)!=null?getMetricsCacheName(EDGESTORE_NAME) + :"edgeStore",expirationTime,cleanWaitTime, + edgeStoreCacheSize, configuration); + indexStore = new ExpirationKCVSRedisCache(indexStoreRaw,getMetricsCacheName(INDEXSTORE_NAME)!=null? + getMetricsCacheName(INDEXSTORE_NAME):"indexStore",expirationTime,cleanWaitTime, + indexStoreCacheSize, configuration); + }else{ + log.info("======== Configuring inmemory cache ========"); + edgeStore = new ExpirationKCVSCache(edgeStoreRaw,getMetricsCacheName(EDGESTORE_NAME),expirationTime,cleanWaitTime,edgeStoreCacheSize); + indexStore = new ExpirationKCVSCache(indexStoreRaw,getMetricsCacheName(INDEXSTORE_NAME),expirationTime,cleanWaitTime,indexStoreCacheSize); + } + } else { edgeStore = new NoKCVSCache(edgeStoreRaw); indexStore = new NoKCVSCache(indexStoreRaw); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryList.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryList.java index d75f6fb409..ecaa4063b1 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryList.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/EntryList.java @@ -14,6 +14,7 @@ package org.janusgraph.diskstorage; +import java.io.Serializable; import java.util.AbstractList; import java.util.Iterator; import java.util.List; @@ -45,7 +46,7 @@ public interface EntryList extends List { EmptyList EMPTY_LIST = new EmptyList(); - class EmptyList extends AbstractList implements EntryList { + class EmptyList extends AbstractList implements EntryList, Serializable { @Override public Entry get(int index) { diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeySliceQuery.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeySliceQuery.java index 692edb8e1b..bf967d4b12 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeySliceQuery.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeySliceQuery.java @@ -17,6 +17,7 @@ import com.google.common.base.Preconditions; import org.janusgraph.diskstorage.StaticBuffer; +import java.io.Serializable; import java.util.Objects; /** @@ -24,7 +25,7 @@ * @author Matthias Broecheler (me@matthiasb.com) */ -public class KeySliceQuery extends SliceQuery { +public class KeySliceQuery extends SliceQuery implements Serializable { private final StaticBuffer key; diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/SliceQuery.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/SliceQuery.java index 61e31e7afe..801fddc2ac 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/SliceQuery.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/SliceQuery.java @@ -23,6 +23,7 @@ import org.janusgraph.graphdb.query.BackendQuery; import org.janusgraph.graphdb.query.BaseQuery; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -37,7 +38,7 @@ * @author Matthias Broecheler (me@matthiasb.com) */ -public class SliceQuery extends BaseQuery implements BackendQuery { +public class SliceQuery extends BaseQuery implements BackendQuery, Serializable { private final StaticBuffer sliceStart; private final StaticBuffer sliceEnd; diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSRedisCache.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSRedisCache.java new file mode 100644 index 0000000000..9789ba4728 --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/ExpirationKCVSRedisCache.java @@ -0,0 +1,308 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.keycolumnvalue.cache; + + +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheLoader; +import org.janusgraph.core.JanusGraphException; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.CacheMetricsAction; +import org.nustaq.serialization.FSTConfiguration; +import org.redisson.api.LocalCachedMapOptions; +import org.redisson.api.RLocalCachedMap; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.janusgraph.util.datastructures.ByteSize.OBJECT_HEADER; +import static org.janusgraph.util.datastructures.ByteSize.OBJECT_REFERENCE; +import static org.janusgraph.util.datastructures.ByteSize.STATICARRAYBUFFER_RAW_SIZE; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +public class ExpirationKCVSRedisCache extends KCVSCache { + + //Weight estimation + private static final int STATIC_ARRAY_BUFFER_SIZE = STATICARRAYBUFFER_RAW_SIZE + 10; // 10 = last number is average length + private static final int KEY_QUERY_SIZE = OBJECT_HEADER + 4 + 1 + 3 * (OBJECT_REFERENCE + STATIC_ARRAY_BUFFER_SIZE); // object_size + int + boolean + 3 static buffers + + private static final int INVALIDATE_KEY_FRACTION_PENALTY = 1000; + private static final int PENALTY_THRESHOLD = 5; + public static final String REDIS_CACHE_PREFIX = "redis-cache-"; + public static final String REDIS_INDEX_CACHE_PREFIX = "redis-index-cache-"; + + private volatile CountDownLatch penaltyCountdown; + + private final ConcurrentHashMap expiredKeys; + + private final long cacheTimeMS; + private final long invalidationGracePeriodMS; + private final CleanupThread cleanupThread; + private RedissonClient redissonClient; + private RLocalCachedMap redisCache; + private RLocalCachedMap> redisIndexKeys; + private static Logger logger = Logger.getLogger("redis-logger"); + private static FSTConfiguration fastConf = FSTConfiguration.createDefaultConfiguration(); + + public ExpirationKCVSRedisCache(final KeyColumnValueStore store, String metricsName, final long cacheTimeMS, + final long invalidationGracePeriodMS, final long maximumByteSize, Configuration configuration) { + super(store, metricsName); + Preconditions.checkArgument(cacheTimeMS > 0, "Cache expiration must be positive: %s", cacheTimeMS); + Preconditions.checkArgument(System.currentTimeMillis() + 1000L * 3600 * 24 * 365 * 100 + cacheTimeMS > 0, "Cache expiration time too large, overflow may occur: %s", cacheTimeMS); + this.cacheTimeMS = cacheTimeMS; + final int concurrencyLevel = Runtime.getRuntime().availableProcessors(); + Preconditions.checkArgument(invalidationGracePeriodMS >= 0, "Invalid expiration grace period: %s", invalidationGracePeriodMS); + this.invalidationGracePeriodMS = invalidationGracePeriodMS; + + redissonClient = RedissonCache.getRedissonClient(configuration); + redisCache = redissonClient.getLocalCachedMap(REDIS_CACHE_PREFIX + metricsName, LocalCachedMapOptions.defaults()); + redisIndexKeys = redissonClient.getLocalCachedMap(REDIS_INDEX_CACHE_PREFIX + metricsName, LocalCachedMapOptions.defaults()); + expiredKeys = new ConcurrentHashMap<>(50, 0.75f, concurrencyLevel); + penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD); + + cleanupThread = new CleanupThread(); + cleanupThread.start(); + logger.info("********************** Configurations are loaded **********************"); + } + + @Override + public EntryList getSlice(final KeySliceQuery query, final StoreTransaction txh) throws BackendException { + incActionBy(1, CacheMetricsAction.RETRIEVAL, txh); + if (isExpired(query)) { + incActionBy(1, CacheMetricsAction.MISS, txh); + return store.getSlice(query, unwrapTx(txh)); + } + + try { + return get(query, () -> { + incActionBy(1, CacheMetricsAction.MISS, txh); + return store.getSlice(query, unwrapTx(txh)); + }); + } catch (Exception e) { + if (e instanceof JanusGraphException) throw (JanusGraphException) e; + else if (e.getCause() instanceof JanusGraphException) throw (JanusGraphException) e.getCause(); + else throw new JanusGraphException(e); + } + } + + private EntryList get(KeySliceQuery query, Callable valueLoader) { + byte[] bytQuery = redisCache.get(query); + EntryList entries = bytQuery != null ? (EntryList) fastConf.asObject(bytQuery) : null; + if (entries == null) { + logger.log(Level.INFO, "reading from the store................."); + try { + entries = valueLoader.call(); + if (entries == null) { + throw new CacheLoader.InvalidCacheLoadException("valueLoader must not return null, key=" + query); + } else { + redisCache.fastPutAsync(query, fastConf.asByteArray(entries)); + RLock lock = redisIndexKeys.getLock(query.getKey()); + try { + lock.tryLock(1, 2, TimeUnit.SECONDS); + ArrayList queryList = redisIndexKeys.get(query.getKey()); + if (queryList == null) + queryList = new ArrayList<>(); + queryList.add(query); + redisIndexKeys.fastPutAsync(query.getKey(), queryList); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + return entries; + } + + @Override + public Map getSlice(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + final Map results = new HashMap<>(keys.size()); + final List remainingKeys = new ArrayList<>(keys.size()); + KeySliceQuery[] ksqs = new KeySliceQuery[keys.size()]; + incActionBy(keys.size(), CacheMetricsAction.RETRIEVAL, txh); + byte[] bytResult = null; + //Find all cached queries + for (int i = 0; i < keys.size(); i++) { + final StaticBuffer key = keys.get(i); + ksqs[i] = new KeySliceQuery(key, query); + EntryList result = null; + if (!isExpired(ksqs[i])) { + bytResult = redisCache.get(ksqs[i]); + result = bytResult != null ? (EntryList) fastConf.asObject(bytResult) : null; + } else ksqs[i] = null; + if (result != null) results.put(key, result); + else remainingKeys.add(key); + } + //Request remaining ones from backend + if (!remainingKeys.isEmpty()) { + incActionBy(remainingKeys.size(), CacheMetricsAction.MISS, txh); + Map subresults = store.getSlice(remainingKeys, query, unwrapTx(txh)); + + for (int i = 0; i < keys.size(); i++) { + StaticBuffer key = keys.get(i); + EntryList subresult = subresults.get(key); + if (subresult != null) { + results.put(key, subresult); + if (ksqs[i] != null) { + logger.info("adding to cache subresult " + subresult); + redisCache.fastPutAsync(ksqs[i], fastConf.asByteArray(subresult)); + RLock lock = redisIndexKeys.getLock(ksqs[i].getKey()); + try { + lock.tryLock(1, 2, TimeUnit.SECONDS); + ArrayList queryList = redisIndexKeys.get(ksqs[i].getKey()); + if (queryList == null) + queryList = new ArrayList<>(); + queryList.add(ksqs[i]); + redisIndexKeys.fastPut(ksqs[i].getKey(), queryList); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + } + } + } + } + return results; + } + + @Override + public void clearCache() { + redisCache.clearExpire(); + expiredKeys.clear(); + penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD); + } + + @Override + public void invalidate(StaticBuffer key, List entries) { + List keySliceQueryList = redisIndexKeys.get(key); + if (keySliceQueryList != null) { + for (KeySliceQuery keySliceQuery : keySliceQueryList) { + if (key.equals(keySliceQuery.getKey())) { + redisCache.fastRemove(keySliceQuery); + } + } + + Preconditions.checkArgument(!hasValidateKeysOnly() || entries.isEmpty()); + expiredKeys.put(key, getExpirationTime()); + if (Math.random() < 1.0 / INVALIDATE_KEY_FRACTION_PENALTY) penaltyCountdown.countDown(); + } + } + + @Override + public void forceClearExpiredCache() { + + } + + @Override + public void close() throws BackendException { + cleanupThread.stopThread(); + super.close(); + } + + private boolean isExpired(final KeySliceQuery query) { + Long until = expiredKeys.get(query.getKey()); + if (until == null) return false; + if (isBeyondExpirationTime(until)) { + expiredKeys.remove(query.getKey(), until); + return false; + } + //We suffer a cache miss, hence decrease the count down + penaltyCountdown.countDown(); + return true; + } + + private long getExpirationTime() { + return System.currentTimeMillis() + cacheTimeMS; + } + + private boolean isBeyondExpirationTime(long until) { + return until < System.currentTimeMillis(); + } + + private long getAge(long until) { + long age = System.currentTimeMillis() - (until - cacheTimeMS); + assert age >= 0; + return age; + } + +private class CleanupThread extends Thread { + + private boolean stop = false; + + public CleanupThread() { + this.setDaemon(true); + this.setName("ExpirationStoreCache-" + getId()); + } + + @Override + public void run() { + while (true) { + if (stop) return; + try { + + penaltyCountdown.await(); + } catch (InterruptedException e) { + if (stop) return; + else throw new RuntimeException("Cleanup thread got interrupted", e); + } + //Do clean up work by invalidating all entries for expired keys + final Map expiredKeysCopy = new HashMap<>(expiredKeys.size()); + for (Map.Entry expKey : expiredKeys.entrySet()) { + if (isBeyondExpirationTime(expKey.getValue())) + expiredKeys.remove(expKey.getKey(), expKey.getValue()); + else if (getAge(expKey.getValue()) >= invalidationGracePeriodMS) + expiredKeysCopy.put(expKey.getKey(), expKey.getValue()); + } + for (KeySliceQuery ksq : redisCache.keySet()) { + if (expiredKeysCopy.containsKey(ksq.getKey())) redisCache.remove(ksq); + } + penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD); + for (Map.Entry expKey : expiredKeysCopy.entrySet()) { + expiredKeys.remove(expKey.getKey(), expKey.getValue()); + } + } + } + + void stopThread() { + stop = true; + this.interrupt(); + } +} + + +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/RedissonCache.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/RedissonCache.java new file mode 100644 index 0000000000..b156595cfb --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/RedissonCache.java @@ -0,0 +1,55 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +package org.janusgraph.diskstorage.keycolumnvalue.cache; + +import org.janusgraph.diskstorage.configuration.Configuration; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; + +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REDIS_CACHE_CONNECTION_MIN_IDLE_SIZE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REDIS_CACHE_CONNECTION_POOL_SIZE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REDIS_CACHE_CONNECTION_TIME_OUT; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REDIS_CACHE_HOST; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REDIS_CACHE_KEEP_ALIVE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REDIS_CACHE_PORT; + +public class RedissonCache { + + private static String redisCacheHost; + private static int redisCachePort; + private static int connectionPoolSize; + private static int connectionMinimumIdleSize; + private static int connectTimeout; + private static boolean keepAlive; + + public static RedissonClient getRedissonClient(Configuration configuration) { + + redisCacheHost = configuration.get(REDIS_CACHE_HOST); + redisCachePort = configuration.get(REDIS_CACHE_PORT); + connectionPoolSize = configuration.get(REDIS_CACHE_CONNECTION_POOL_SIZE); + connectionMinimumIdleSize = configuration.get(REDIS_CACHE_CONNECTION_MIN_IDLE_SIZE); + connectTimeout = configuration.get(REDIS_CACHE_CONNECTION_TIME_OUT); + keepAlive = configuration.get(REDIS_CACHE_KEEP_ALIVE); + + Config config = new Config(); + config.useSingleServer().setAddress("redis://" + redisCacheHost + ":" + redisCachePort).setConnectionPoolSize(connectionPoolSize) + .setConnectionMinimumIdleSize(connectionMinimumIdleSize) + .setConnectTimeout(connectTimeout) + .setKeepAlive(keepAlive); + return Redisson.create(config); + } +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryArrayList.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryArrayList.java index 778a865bc9..2bfc451873 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryArrayList.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryArrayList.java @@ -17,11 +17,12 @@ import org.janusgraph.diskstorage.Entry; import org.janusgraph.diskstorage.EntryList; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; -public class EntryArrayList extends ArrayList implements EntryList { +public class EntryArrayList extends ArrayList implements EntryList, Serializable { public static final int ENTRY_SIZE_ESTIMATE = 256; diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayBuffer.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayBuffer.java index cce6d40bab..9968b77400 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayBuffer.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayBuffer.java @@ -18,6 +18,7 @@ import org.janusgraph.diskstorage.ReadBuffer; import org.janusgraph.diskstorage.StaticBuffer; +import java.io.Serializable; import java.nio.ByteBuffer; /** @@ -29,7 +30,7 @@ * @author Matthias Broecheler (me@matthiasb.com) */ -public class StaticArrayBuffer implements StaticBuffer { +public class StaticArrayBuffer implements StaticBuffer, Serializable { private final byte[] array; private int offset; diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java index 0700c00509..b77f8e987a 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java @@ -23,6 +23,7 @@ import org.janusgraph.graphdb.relations.RelationCache; import org.janusgraph.util.encoding.StringEncoding; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.AbstractList; import java.util.Arrays; @@ -38,7 +39,7 @@ /** * @author Matthias Broecheler (me@matthiasb.com) */ -public class StaticArrayEntryList extends AbstractList implements EntryList { +public class StaticArrayEntryList extends AbstractList implements EntryList, Serializable { /** * All of the entries are stored sequentially in this byte array. The limitAndValuePos array contains the offset and diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java index e2dde93a50..79d3d057e5 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java @@ -449,6 +449,37 @@ public boolean apply(@Nullable String s) { "transaction to independently fetch graph elements from storage before reading/writing them.", ConfigOption.Type.MASKABLE, false); + public static final ConfigOption CACHE_TYPE = new ConfigOption<>(CACHE_NS,"cache-type", + "Enable or disable Redis cache (redis/inmemory)", + ConfigOption.Type.MASKABLE, "redis"); + + public static final ConfigOption REDIS_CACHE_HOST = new ConfigOption<>(CACHE_NS,"redis-cache-host", + "Redis host name", + ConfigOption.Type.MASKABLE, "localhost"); + + public static final ConfigOption REDIS_CACHE_PORT = new ConfigOption<>(CACHE_NS,"redis-cache-port", + "Redis host port", + ConfigOption.Type.MASKABLE, 6379); + + public static final ConfigOption REDIS_CACHE_CONNECTION_POOL_SIZE = new ConfigOption<>(CACHE_NS,"redis-cache-connectionPoolSize", + "Redis connection maximum pool size.", + ConfigOption.Type.MASKABLE, 64); + + public static final ConfigOption REDIS_CACHE_CONNECTION_MIN_IDLE_SIZE = new ConfigOption<>(CACHE_NS,"redis-cache" + + "-connectionMinimumIdleSize", + "Minimum idle Redis connection amount.", + ConfigOption.Type.MASKABLE, 24); + + public static final ConfigOption REDIS_CACHE_CONNECTION_TIME_OUT = new ConfigOption<>(CACHE_NS,"redis-cache" + + "-connectTimeout", + "Timeout during connecting to any Redis server.", + ConfigOption.Type.MASKABLE, 1000); + + public static final ConfigOption REDIS_CACHE_KEEP_ALIVE = new ConfigOption<>(CACHE_NS,"redis-cache" + + "-keepAlive", + "Enables TCP keepAlive for connection.", + ConfigOption.Type.MASKABLE, true); + /** * The size of the database level cache. * If this value is between 0.0 (strictly bigger) and 1.0 (strictly smaller), then it is interpreted as a diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/query/BaseQuery.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/query/BaseQuery.java index 1559c5b88b..43cd89e148 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/query/BaseQuery.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/query/BaseQuery.java @@ -14,12 +14,14 @@ package org.janusgraph.graphdb.query; +import java.io.Serializable; + /** * Standard implementation of {@link Query}. * * @author Matthias Broecheler (me@matthiasb.com) */ -public class BaseQuery implements Query { +public class BaseQuery implements Query, Serializable { private int limit;