Skip to content
Permalink
Browse files

Redis cache extension (#4615)

* Redis cache extension

* Fix some trival and optimize code

* Add Override annotation in RedisCacheTest
  • Loading branch information...
QiuMM authored and fjy committed Aug 8, 2017
1 parent 4dd1e2b commit f18cc5df97e5826c2dd8ffafba9fcb69d10a4d44
@@ -230,6 +230,8 @@
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-rabbitmq</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-redis-cache</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:scan-query</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:sqlserver-metadata-storage</argument>
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-redis-cache</artifactId>
<name>druid-redis-cache</name>

<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fiftyonred</groupId>
<artifactId>mock-jedis</artifactId>
<version>0.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,190 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.logger.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class RedisCache implements Cache
{
private static final Logger log = new Logger(RedisCache.class);

private JedisPool pool;
private RedisCacheConfig config;

private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);

private final AtomicLong priorRequestCount = new AtomicLong(0);
// both get、put and getBulk will increase request count by 1
private final AtomicLong totalRequestCount = new AtomicLong(0);

private RedisCache(JedisPool pool, RedisCacheConfig config)
{
this.pool = pool;
this.config = config;
}

public static RedisCache create(final RedisCacheConfig config)
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());

JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
return new RedisCache(pool, config);
}

@Override
public byte[] get(NamedKey key)
{
totalRequestCount.incrementAndGet();

try (Jedis jedis = pool.getResource()) {
byte[] bytes = jedis.get(key.toByteArray());
if (bytes == null) {
missCount.incrementAndGet();
return null;
} else {
hitCount.incrementAndGet();
return bytes;
}
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling item from cache");
return null;
}
}

@Override
public void put(NamedKey key, byte[] value)
{
totalRequestCount.incrementAndGet();

try (Jedis jedis = pool.getResource()) {
jedis.psetex(key.toByteArray(), config.getExpiration(), value);
}
catch (JedisException e) {
errorCount.incrementAndGet();
log.warn(e, "Exception pushing item to cache");
}
}

@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
totalRequestCount.incrementAndGet();

Map<NamedKey, byte[]> results = new HashMap<>();

try (Jedis jedis = pool.getResource()) {
List<NamedKey> namedKeys = Lists.newArrayList(keys);
List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);

List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));

for (int i = 0; i < byteValues.size(); ++i) {
if (byteValues.get(i) != null) {
results.put(namedKeys.get(i), byteValues.get(i));
}
}

hitCount.addAndGet(results.size());
missCount.addAndGet(namedKeys.size() - results.size());
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling items from cache");
}

return results;
}

@Override
public void close(String namespace)
{
// no resources to cleanup
}

@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
0,
0,
0,
timeoutCount.get(),
errorCount.get()
);
}

@Override
public boolean isLocal()
{
return false;
}

@Override
public void doMonitor(ServiceEmitter emitter)
{
final long priorCount = priorRequestCount.get();
final long totalCount = totalRequestCount.get();
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
emitter.emit(builder.build("query/cache/redis/total/requests", totalCount));
emitter.emit(builder.build("query/cache/redis/delta/requests", totalCount - priorCount));
if (!priorRequestCount.compareAndSet(priorCount, totalCount)) {
log.error("Prior value changed while I was reporting! updating anyways");
priorRequestCount.set(totalCount);
}
}

@VisibleForTesting
static RedisCache create(final JedisPool pool, final RedisCacheConfig config)
{
return new RedisCache(pool, config);
}
}
@@ -0,0 +1,86 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RedisCacheConfig
{
@JsonProperty
private String host;

@JsonProperty
private int port;

// milliseconds, default to one day
@JsonProperty
private long expiration = 24 * 3600 * 1000;

// milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout
@JsonProperty
private int timeout = 2000;

// max connections of redis connection pool
@JsonProperty
private int maxTotalConnections = 8;

// max idle connections of redis connection pool
@JsonProperty
private int maxIdleConnections = 8;

// min idle connections of redis connection pool
@JsonProperty
private int minIdleConnections = 0;

public String getHost()
{
return host;
}

public int getPort()
{
return port;
}

public long getExpiration()
{
return expiration;
}

public int getTimeout()
{
return timeout;
}

public int getMaxTotalConnections()
{
return maxTotalConnections;
}

public int getMaxIdleConnections()
{
return maxIdleConnections;
}

public int getMinIdleConnections()
{
return minIdleConnections;
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("redis")
public class RedisCacheProvider extends RedisCacheConfig implements CacheProvider
{
@Override
public Cache get()
{
return RedisCache.create(this);
}
}

0 comments on commit f18cc5d

Please sign in to comment.
You can’t perform that action at this time.