Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added new camel-redis component

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1416896 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 71b7339f47893b61e845d064c97a42509a372aa8 1 parent 4ad810e
@bibryam bibryam authored
Showing with 3,998 additions and 0 deletions.
  1. +4 −0 apache-camel/pom.xml
  2. +1 −0  apache-camel/src/main/descriptors/common-bin.xml
  3. +92 −0 components/camel-redis/pom.xml
  4. +35 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/Command.java
  5. +483 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java
  6. +477 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java
  7. +47 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java
  8. +154 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisConfiguration.java
  9. +46 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisConstants.java
  10. +94 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java
  11. +55 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java
  12. +51 −0 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java
  13. +94 −0 ...el-redis/src/main/java/org/apache/camel/component/redis/processor/idempotent/RedisIdempotentRepository.java
  14. +18 −0 components/camel-redis/src/main/resources/META-INF/services/org/apache/camel/component/redis
  15. +35 −0 components/camel-redis/src/main/resources/log4j.properties
  16. +77 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisComponentSpringIntegrationTest.java
  17. +96 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisConnectionTest.java
  18. +78 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisConsumerIntegrationTest.java
  19. +94 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisConsumerTest.java
  20. +220 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java
  21. +248 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java
  22. +285 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java
  23. +61 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisProducerIntegrationTest.java
  24. +261 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java
  25. +306 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java
  26. +319 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java
  27. +56 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisTestSupport.java
  28. +82 −0 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java
  29. +68 −0 ...edis/src/test/java/org/apache/camel/component/redis/processor/idempotent/RedisIdempotentRepositoryTest.java
  30. +44 −0 components/camel-redis/src/test/resources/RedisComponentSpringTest-context.xml
  31. +1 −0  components/pom.xml
  32. +6 −0 parent/pom.xml
  33. +5 −0 platforms/karaf/features/src/main/resources/features.xml
  34. +5 −0 tests/camel-itest/pom.xml
View
4 apache-camel/pom.xml
@@ -400,6 +400,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-redis</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-restlet</artifactId>
</dependency>
<dependency>
View
1  apache-camel/src/main/descriptors/common-bin.xml
@@ -121,6 +121,7 @@
<include>org.apache.camel:camel-protobuf</include>
<include>org.apache.camel:camel-quartz</include>
<include>org.apache.camel:camel-quickfix</include>
+ <include>org.apache.camel:camel-redis</include>
<include>org.apache.camel:camel-restlet</include>
<include>org.apache.camel:camel-rmi</include>
<include>org.apache.camel:camel-routebox</include>
View
92 components/camel-redis/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.11-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-redis</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: Redis</name>
+ <description>Camel Redis Component</description>
+
+ <properties>
+ <camel.osgi.export.pkg>
+ org.apache.camel.component.redis.*
+ </camel.osgi.export.pkg>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.data</groupId>
+ <artifactId>spring-data-redis</artifactId>
+ <version>${spring-data-redis-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
View
35 components/camel-redis/src/main/java/org/apache/camel/component/redis/Command.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+public enum Command {
+ PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY,
+ RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT,
+ MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY,
+ DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET,
+ HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH,
+ LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD,
+ SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE,
+ SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE,
+ ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD,
+ EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH,
+ UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE,
+ ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE,
+ BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN,
+ SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT,
+ GETBIT, SETRANGE, GETRANGE, PEXPIRE, PEXPIREAT;
+}
View
483 components/camel-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.RuntimeExchangeException;
+
+public class CommandDispatcher {
+ private final RedisConfiguration configuration;
+ private final Exchange exchange;
+
+ public CommandDispatcher(RedisConfiguration configuration, Exchange exchange) {
+ this.configuration = configuration;
+ this.exchange = exchange;
+ }
+
+ public void execute(final RedisClient redisClient) {
+ switch (determineCommand()) {
+
+ case PING:
+ setResult(redisClient.ping());
+ break;
+ case SET:
+ redisClient.set(getKey(), getValue());
+ break;
+ case GET:
+ setResult(redisClient.get(getKey()));
+ break;
+ case QUIT:
+ redisClient.quit();
+ break;
+ case EXISTS:
+ setResult(redisClient.exists(getKey()));
+ break;
+ case DEL:
+ redisClient.del(getKeys());
+ break;
+ case TYPE:
+ setResult(redisClient.type(getKey()));
+ break;
+ case KEYS:
+ setResult(redisClient.keys(getPattern()));
+ break;
+ case RANDOMKEY:
+ setResult(redisClient.randomkey());
+ break;
+ case RENAME:
+ redisClient.rename(getKey(), getStringValue());
+ break;
+ case RENAMENX:
+ setResult(redisClient.renamenx(getKey(), getStringValue()));
+ break;
+ case EXPIRE:
+ setResult(redisClient.expire(getKey(), getTimeout()));
+ break;
+ case EXPIREAT:
+ setResult(redisClient.expireat(getKey(), getTimestamp()));
+ break;
+ case PEXPIRE:
+ setResult(redisClient.pexpire(getKey(), getTimeout()));
+ break;
+ case PEXPIREAT:
+ setResult(redisClient.pexpireat(getKey(), getTimestamp()));
+ break;
+ case TTL:
+ setResult(redisClient.ttl(getKey()));
+ break;
+ case MOVE:
+ setResult(redisClient.move(getKey(), getDb()));
+ break;
+ case GETSET:
+ setResult(redisClient.getset(getKey(), getValue()));
+ break;
+ case MGET:
+ setResult(redisClient.mget(getFields()));
+ break;
+ case SETNX:
+ setResult(redisClient.setnx(getKey(), getValue()));
+ break;
+ case SETEX:
+ redisClient.setex(getKey(), getValue(), getTimeout(), TimeUnit.SECONDS);
+ break;
+ case MSET:
+ redisClient.mset(getValuesAsMap());
+ break;
+ case MSETNX:
+ redisClient.msetnx(getValuesAsMap());
+ break;
+ case DECRBY:
+ setResult(redisClient.decrby(getKey(), getLongValue()));
+ break;
+ case DECR:
+ setResult(redisClient.decr(getKey()));
+ break;
+ case INCRBY:
+ setResult(redisClient.incrby(getKey(), getLongValue()));
+ break;
+ case INCR:
+ setResult(redisClient.incr(getKey()));
+ break;
+ case APPEND:
+ setResult(redisClient.append(getKey(), getStringValue()));
+ break;
+ case HSET:
+ redisClient.hset(getKey(), getField(), getValue());
+ break;
+ case HGET:
+ setResult(redisClient.hget(getKey(), getField()));
+ break;
+ case HSETNX:
+ setResult(redisClient.hsetnx(getKey(), getField(), getValue()));
+ break;
+ case HMSET:
+ redisClient.hmset(getKey(), getValuesAsMap());
+ break;
+ case HMGET:
+ setResult(redisClient.hmget(getKey(), getFields()));
+ break;
+ case HINCRBY:
+ setResult(redisClient.hincrBy(getKey(), getField(), getValueAsLong()));
+ break;
+ case HEXISTS:
+ setResult(redisClient.hexists(getKey(), getField()));
+ break;
+ case HDEL:
+ redisClient.hdel(getKey(), getField());
+ break;
+ case HLEN:
+ setResult(redisClient.hlen(getKey()));
+ break;
+ case HKEYS:
+ setResult(redisClient.hkeys(getKey()));
+ break;
+ case HVALS:
+ setResult(redisClient.hvals(getKey()));
+ break;
+ case HGETALL:
+ setResult(redisClient.hgetAll(getKey()));
+ break;
+ case RPUSH:
+ setResult(redisClient.rpush(getKey(), getValue()));
+ break;
+ case LPUSH:
+ setResult(redisClient.lpush(getKey(), getValue()));
+ break;
+ case LLEN:
+ setResult(redisClient.llen(getKey()));
+ break;
+ case LRANGE:
+ setResult(redisClient.lrange(getKey(), getStart(), getEnd()));
+ break;
+ case LTRIM:
+ redisClient.ltrim(getKey(), getStart(), getEnd());
+ break;
+ case LINDEX:
+ setResult(redisClient.lindex(getKey(), getIndex()));
+ break;
+ case LSET:
+ redisClient.lset(getKey(), getValue(), getIndex());
+ break;
+ case LREM:
+ setResult(redisClient.lrem(getKey(), getValue(), getCount()));
+ break;
+ case LPOP:
+ setResult(redisClient.lpop(getKey()));
+ break;
+ case RPOP:
+ setResult(redisClient.rpop(getKey()));
+ break;
+ case RPOPLPUSH:
+ setResult(redisClient.rpoplpush(getKey(), getDestination()));
+ break;
+ case SADD:
+ setResult(redisClient.sadd(getKey(), getValue()));
+ break;
+ case SMEMBERS:
+ setResult(redisClient.smembers(getKey()));
+ break;
+ case SREM:
+ setResult(redisClient.srem(getKey(), getValue()));
+ break;
+ case SPOP:
+ setResult(redisClient.spop(getKey()));
+ break;
+ case SMOVE:
+ setResult(redisClient.smove(getKey(), getValue(), getDestination()));
+ break;
+ case SCARD:
+ setResult(redisClient.scard(getKey()));
+ break;
+ case SISMEMBER:
+ setResult(redisClient.sismember(getKey(), getValue()));
+ break;
+ case SINTER:
+ setResult(redisClient.sinter(getKey(), getKeys()));
+ break;
+ case SINTERSTORE:
+ redisClient.sinterstore(getKey(), getKeys(), getDestination());
+ break;
+ case SUNION:
+ setResult(redisClient.sunion(getKey(), getKeys()));
+ break;
+ case SUNIONSTORE:
+ redisClient.sunionstore(getKey(), getKeys(), getDestination());
+ break;
+ case SDIFF:
+ setResult(redisClient.sdiff(getKey(), getKeys()));
+ break;
+ case SDIFFSTORE:
+ redisClient.sdiffstore(getKey(), getKeys(), getDestination());
+ break;
+ case SRANDMEMBER:
+ setResult(redisClient.srandmember(getKey()));
+ break;
+ case ZADD:
+ setResult(redisClient.zadd(getKey(), getValue(), getScore()));
+ break;
+ case ZRANGE:
+ setResult(redisClient.zrange(getKey(), getStart(), getEnd(), getWithScore()));
+ break;
+ case ZREM:
+ setResult(redisClient.zrem(getKey(), getValue()));
+ break;
+ case ZINCRBY:
+ setResult(redisClient.zincrby(getKey(), getValue(), getIncrement()));
+ break;
+ case ZRANK:
+ setResult(redisClient.zrank(getKey(), getValue()));
+ break;
+ case ZREVRANK:
+ setResult(redisClient.zrevrank(getKey(), getValue()));
+ break;
+ case ZREVRANGE:
+ setResult(redisClient.zrevrange(getKey(), getStart(), getEnd(), getWithScore()));
+ break;
+ case ZCARD:
+ setResult(redisClient.zcard(getKey()));
+ break;
+ case MULTI:
+ redisClient.multi();
+ break;
+ case DISCARD:
+ redisClient.discard();
+ break;
+ case EXEC:
+ redisClient.exec();
+ break;
+ case WATCH:
+ redisClient.watch(getKeys());
+ break;
+ case UNWATCH:
+ redisClient.unwatch();
+ break;
+ case SORT:
+ setResult(redisClient.sort(getKey()));
+ break;
+ case BLPOP:
+ setResult(redisClient.blpop(getKey(), getTimeout()));
+ break;
+ case BRPOP:
+ setResult(redisClient.brpop(getKey(), getTimeout()));
+ break;
+ case PUBLISH:
+ redisClient.publish(getChannel(), getMessage());
+ break;
+ case ZCOUNT:
+ setResult(redisClient.zcount(getKey(), getMin(), getMax()));
+ break;
+ case ZRANGEBYSCORE:
+ setResult(redisClient.zrangebyscore(getKey(), getMin(), getMax()));
+ break;
+ case ZREVRANGEBYSCORE:
+ setResult(redisClient.zrevrangebyscore(getKey(), getMin(), getMax()));
+ break;
+ case ZREMRANGEBYRANK:
+ redisClient.zremrangebyrank(getKey(), getStart(), getEnd());
+ break;
+ case ZREMRANGEBYSCORE:
+ redisClient.zremrangebyscore(getKey(), getStart(), getEnd());
+ break;
+ case ZUNIONSTORE:
+ redisClient.zunionstore(getKey(), getKeys(), getDestination());
+ break;
+ case ZINTERSTORE:
+ redisClient.zinterstore(getKey(), getKeys(), getDestination());
+ break;
+ case STRLEN:
+ setResult(redisClient.strlen(getKey()));
+ break;
+ case PERSIST:
+ setResult(redisClient.persist(getKey()));
+ break;
+ case RPUSHX:
+ setResult(redisClient.rpushx(getKey(), getValue()));
+ break;
+ case ECHO:
+ setResult(redisClient.echo(getStringValue()));
+ break;
+ case LINSERT:
+ setResult(redisClient.linsert(getKey(), getValue(), getPivot(), getPosition()));
+ break;
+ case BRPOPLPUSH:
+ setResult(redisClient.brpoplpush(getKey(), getDestination(), getTimeout()));
+ break;
+ case SETBIT:
+ redisClient.setbit(getKey(), getOffset(), getBooleanValue());
+ break;
+ case GETBIT:
+ setResult(redisClient.getbit(getKey(), getOffset()));
+ break;
+ case SETRANGE:
+ redisClient.setex(getKey(), getValue(), getOffset());
+ break;
+ case GETRANGE:
+ setResult(redisClient.getrange(getKey(), getStart(), getEnd()));
+ break;
+ default:
+ throw new RuntimeExchangeException("Unsupported command", exchange);
+ }
+ }
+
+ private Command determineCommand() {
+ String command = exchange.getIn().getHeader(RedisConstants.COMMAND, String.class);
+ if (command == null) {
+ command = configuration.getCommand();
+ }
+ if (command == null) {
+ return Command.SET;
+ }
+ return Command.valueOf(command);
+ }
+
+ private static <T> T getInHeaderValue(Exchange exchange, String key, Class<T> aClass) {
+ return exchange.getIn().getHeader(key, aClass);
+ }
+
+ private void setResult(Object result) {
+ Message message;
+ if (exchange.getPattern().isOutCapable()) {
+ message = exchange.getOut();
+ message.copyFrom(exchange.getIn());
+ } else {
+ message = exchange.getIn();
+ }
+ message.setBody(result);
+ }
+
+ public String getDestination() {
+ return getInHeaderValue(exchange, RedisConstants.DESTINATION, String.class);
+ }
+
+ private String getChannel() {
+ return getInHeaderValue(exchange, RedisConstants.CHANNEL, String.class);
+ }
+
+ private Object getMessage() {
+ return getInHeaderValue(exchange, RedisConstants.MESSAGE, Object.class);
+ }
+
+ public Long getIndex() {
+ return getInHeaderValue(exchange, RedisConstants.INDEX, Long.class);
+ }
+
+ public String getPivot() {
+ return getInHeaderValue(exchange, RedisConstants.PIVOT, String.class);
+ }
+
+ public String getPosition() {
+ return getInHeaderValue(exchange, RedisConstants.POSITION, String.class);
+ }
+
+ public Long getCount() {
+ return getInHeaderValue(exchange, RedisConstants.COUNT, Long.class);
+ }
+
+ private Long getStart() {
+ return getInHeaderValue(exchange, RedisConstants.START, Long.class);
+ }
+
+ private Long getEnd() {
+ return getInHeaderValue(exchange, RedisConstants.END, Long.class);
+ }
+
+ private Long getTimeout() {
+ return getInHeaderValue(exchange, RedisConstants.TIMEOUT, Long.class);
+ }
+
+ private Long getOffset() {
+ return getInHeaderValue(exchange, RedisConstants.OFFSET, Long.class);
+ }
+
+ private Long getValueAsLong() {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class);
+ }
+
+ private Collection<String> getFields() {
+ return getInHeaderValue(exchange, RedisConstants.FIELDS, Collection.class);
+ }
+
+ private Map<String, Object> getValuesAsMap() {
+ return getInHeaderValue(exchange, RedisConstants.VALUES, new HashMap<String, Object>().getClass());
+ }
+
+ private String getKey() {
+ return getInHeaderValue(exchange, RedisConstants.KEY, String.class);
+ }
+
+ public Collection<String> getKeys() {
+ return getInHeaderValue(exchange, RedisConstants.KEYS, Collection.class);
+ }
+
+ private Object getValue() {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Object.class);
+ }
+
+ private String getStringValue() {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, String.class);
+ }
+
+ private Long getLongValue() {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class);
+ }
+
+ private Boolean getBooleanValue() {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Boolean.class);
+ }
+
+ private String getField() {
+ return getInHeaderValue(exchange, RedisConstants.FIELD, String.class);
+ }
+
+ public Long getTimestamp() {
+ return getInHeaderValue(exchange, RedisConstants.TIMESTAMP, Long.class);
+ }
+
+ public String getPattern() {
+ return getInHeaderValue(exchange, RedisConstants.PATTERN, String.class);
+ }
+
+ public Integer getDb() {
+ return getInHeaderValue(exchange, RedisConstants.DB, Integer.class);
+ }
+
+ public Double getScore() {
+ return getInHeaderValue(exchange, RedisConstants.SCORE, Double.class);
+ }
+
+ public Double getMin() {
+ return getInHeaderValue(exchange, RedisConstants.MIN, Double.class);
+ }
+
+ public Double getMax() {
+ return getInHeaderValue(exchange, RedisConstants.MAX, Double.class);
+ }
+
+ public Double getIncrement() {
+ return getInHeaderValue(exchange, RedisConstants.INCREMENT, Double.class);
+ }
+
+ public Boolean getWithScore() {
+ return getInHeaderValue(exchange, RedisConstants.WITHSCORE, Boolean.class);
+ }
+}
View
477 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.connection.DataType;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.query.SortQuery;
+import org.springframework.data.redis.core.query.SortQueryBuilder;
+
+public class RedisClient {
+ private final RedisTemplate<String, Object> redisTemplate;
+
+ public RedisClient(RedisTemplate<String, Object> redisTemplate) {
+ this.redisTemplate = redisTemplate;
+ }
+
+ public void set(String key, Object value) {
+ redisTemplate.opsForValue().set(key, value);
+ }
+
+ public void hmset(String key, Map<String, Object> param) {
+ redisTemplate.opsForHash().putAll(key, param);
+ }
+
+ public Collection<Object> hmget(String key, Collection<String> fields) {
+ return redisTemplate.<String, Object>opsForHash().multiGet(key, fields);
+ }
+
+ public Set<String> hkeys(String key) {
+ return redisTemplate.<String, Object>opsForHash().keys(key);
+ }
+
+ public Long hlen(String key) {
+ return redisTemplate.<String, Object>opsForHash().size(key);
+ }
+
+ public Long hincrBy(String key, String field, Long value) {
+ return redisTemplate.<String, Object>opsForHash().increment(key, field, value);
+ }
+
+ public Map<String, Object> hgetAll(String key) {
+ return redisTemplate.<String, Object>opsForHash().entries(key);
+ }
+
+ public Boolean hexists(String key, String field) {
+ return redisTemplate.<String, Object>opsForHash().hasKey(key, field);
+ }
+
+ public Object hget(String key, String field) {
+ return redisTemplate.<String, Object>opsForHash().get(key, field);
+ }
+
+ public void hdel(String key, String field) {
+ redisTemplate.<String, Object>opsForHash().delete(key, field);
+ }
+
+ public void hset(String key, String field, Object value) {
+ redisTemplate.<String, Object>opsForHash().put(key, field, value);
+ }
+
+ public void quit() {
+ redisTemplate.execute(new RedisCallback<Object>() {
+ @Override
+ public Object doInRedis(RedisConnection connection) throws DataAccessException {
+ connection.close();
+ return null;
+ }
+ });
+ }
+
+ public Object get(String key) {
+ return redisTemplate.opsForValue().get(key);
+ }
+
+ public Collection<Object> hvals(String key) {
+ return redisTemplate.<String, Object>opsForHash().values(key);
+ }
+
+ public Boolean hsetnx(String key, String field, Object value) {
+ return redisTemplate.<String, Object>opsForHash().putIfAbsent(key, field, value);
+ }
+
+ public Long decr(String key) {
+ return redisTemplate.opsForValue().increment(key, -1L);
+ }
+
+ public Long decrby(String key, Long value) {
+ return redisTemplate.opsForValue().increment(key, -value);
+ }
+
+ public Long incr(String key) {
+ return redisTemplate.opsForValue().increment(key, 1L);
+ }
+
+ public Long incrby(String key, Long value) {
+ return redisTemplate.opsForValue().increment(key, value);
+ }
+
+ public String getrange(String key, Long start, Long end) {
+ return redisTemplate.opsForValue().get(key, start, end);
+ }
+
+ public Long strlen(String key) {
+ return redisTemplate.opsForValue().size(key);
+ }
+
+ public List<Object> mget(Collection<String> fields) {
+ return redisTemplate.opsForValue().multiGet(fields);
+ }
+
+ public void mset(Map<String, Object> map) {
+ redisTemplate.opsForValue().multiSet(map);
+ }
+
+ public void msetnx(Map<String, Object> map) {
+ redisTemplate.opsForValue().multiSetIfAbsent(map);
+ }
+
+ public Object getset(String key, Object value) {
+ return redisTemplate.opsForValue().getAndSet(key, value);
+ }
+
+ public Boolean setnx(String key, Object value) {
+ return redisTemplate.opsForValue().setIfAbsent(key, value);
+ }
+
+ public void setex(String key, Object value, Long timeout, TimeUnit timeUnit) {
+ redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
+ }
+
+ public void setex(String key, Object value, Long offset) {
+ redisTemplate.opsForValue().set(key, value, offset);
+ }
+
+ public void setbit(final String key, final Long offset, final Boolean value) {
+ redisTemplate.execute(new RedisCallback<Object>() {
+ @Override
+ public Object doInRedis(RedisConnection connection) throws DataAccessException {
+ connection.setBit(key.getBytes(), offset, value);
+ return null;
+ }
+ });
+ }
+
+ public Boolean getbit(final String key, final Long offset) {
+ return redisTemplate.execute(new RedisCallback<Boolean>() {
+ @Override
+ public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
+ return connection.getBit(key.getBytes(), offset);
+ }
+ });
+ }
+
+ public Integer append(String key, String value) {
+ return redisTemplate.opsForValue().append(key, value);
+ }
+
+ public void multi() {
+ redisTemplate.multi();
+ }
+
+ public void unwatch() {
+ redisTemplate.unwatch();
+ }
+
+ public void discard() {
+ redisTemplate.discard();
+ }
+
+ public void exec() {
+ redisTemplate.exec();
+ }
+
+ public void watch(Collection<String> keys) {
+ redisTemplate.watch(keys);
+ }
+
+ public Boolean sadd(String key, Object value) {
+ return redisTemplate.opsForSet().add(key, value);
+ }
+
+ public Long scard(String key) {
+ return redisTemplate.opsForSet().size(key);
+ }
+
+ public Set<Object> sdiff(String key, Collection<String> keys) {
+ return redisTemplate.opsForSet().difference(key, keys);
+ }
+
+ public void sdiffstore(String key, Collection<String> keys, String destinations) {
+ redisTemplate.opsForSet().differenceAndStore(key, keys, destinations);
+ }
+
+ public Set<Object> sinter(String key, Collection<String> keys) {
+ return redisTemplate.opsForSet().intersect(key, keys);
+ }
+
+ public void sinterstore(String key, Collection<String> keys, String destination) {
+ redisTemplate.opsForSet().intersectAndStore(key, keys, destination);
+ }
+
+ public Boolean sismember(String key, Object value) {
+ return redisTemplate.opsForSet().isMember(key, value);
+ }
+
+ public Set<Object> smembers(String key) {
+ return redisTemplate.opsForSet().members(key);
+ }
+
+ public Boolean smove(String key, Object value, String destination) {
+ return redisTemplate.opsForSet().move(key, value, destination);
+ }
+
+ public Object spop(String key) {
+ return redisTemplate.opsForSet().pop(key);
+ }
+
+ public Object srandmember(String key) {
+ return redisTemplate.opsForSet().randomMember(key);
+ }
+
+ public Boolean srem(String key, Object value) {
+ return redisTemplate.opsForSet().remove(key, value);
+ }
+
+ public Set<Object> sunion(String key, Collection<String> keys) {
+ return redisTemplate.opsForSet().union(key, keys);
+ }
+
+ public void sunionstore(String key, Collection<String> keys, String destination) {
+ redisTemplate.opsForSet().unionAndStore(key, keys, destination);
+ }
+
+ public String echo(final String value) {
+ return redisTemplate.execute(new RedisCallback<String>() {
+ @Override
+ public String doInRedis(RedisConnection connection) throws DataAccessException {
+ return new String(connection.echo(value.getBytes()));
+ }
+ });
+ }
+
+ public String ping() {
+ return redisTemplate.execute(new RedisCallback<String>() {
+ @Override
+ public String doInRedis(RedisConnection connection) throws DataAccessException {
+ return connection.ping();
+ }
+ });
+ }
+
+ public void publish(String channel, Object message) {
+ redisTemplate.convertAndSend(channel, message);
+ }
+
+ public Object lpop(String key) {
+ return redisTemplate.opsForList().leftPop(key);
+ }
+
+ public Object blpop(String key, Long timeout) {
+ return redisTemplate.opsForList().leftPop(key, timeout, TimeUnit.SECONDS);
+ }
+
+ public Object brpoplpush(String key, String destination, Long timeout) {
+ return redisTemplate.opsForList().rightPopAndLeftPush(key, destination, timeout, TimeUnit.SECONDS);
+ }
+
+ public Object rpoplpush(String key, String destination) {
+ return redisTemplate.opsForList().rightPopAndLeftPush(key, destination);
+ }
+
+ public Object lindex(String key, Long index) {
+ return redisTemplate.opsForList().index(key, index);
+ }
+
+ public Long linsert(String key, Object value, String pivot, String position) {
+ if ("BEFORE".equals(position)) {
+ return redisTemplate.opsForList().leftPush(key, pivot, value);
+ } else if ("AFTER".equals(position)) {
+ return redisTemplate.opsForList().rightPush(key, pivot, value);
+ } else {
+ throw new IllegalArgumentException("Wrong position: " + position);
+ }
+ }
+
+ public Object rpop(String key) {
+ return redisTemplate.opsForList().rightPop(key);
+ }
+
+ public Object brpop(String key, Long timeout) {
+ return redisTemplate.opsForList().rightPop(key, timeout, TimeUnit.SECONDS);
+ }
+
+ public Long llen(String key) {
+ return redisTemplate.opsForList().size(key);
+ }
+
+ public List<Object> lrange(String key, Long start, Long end) {
+ return redisTemplate.opsForList().range(key, start, end);
+ }
+
+ public Long lrem(String key, Object value, Long count) {
+ return redisTemplate.opsForList().remove(key, count, value);
+ }
+
+ public void lset(String key, Object value, Long index) {
+ redisTemplate.opsForList().set(key, index, value);
+ }
+
+ public void ltrim(String key, Long start, Long end) {
+ redisTemplate.opsForList().trim(key, start, end);
+ }
+
+ public Long rpush(String key, Object value) {
+ return redisTemplate.opsForList().rightPush(key, value);
+ }
+
+ public Long rpushx(String key, Object value) {
+ return redisTemplate.opsForList().rightPushIfPresent(key, value);
+ }
+
+ public Long lpush(String key, Object value) {
+ return redisTemplate.opsForList().leftPush(key, value);
+ }
+
+ public void del(Collection<String> keys) {
+ redisTemplate.delete(keys);
+ }
+
+ public Boolean exists(String key) {
+ return redisTemplate.hasKey(key);
+ }
+
+ public Boolean expire(String key, Long timeout) {
+ return redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
+ }
+
+ public Boolean expireat(String key, Long seconds) {
+ return redisTemplate.expireAt(key, new Date(seconds * 1000L));
+ }
+
+ public Collection<String> keys(String pattern) {
+ return redisTemplate.keys(pattern);
+ }
+
+ public Boolean move(String key, Integer db) {
+ return redisTemplate.move(key, db);
+ }
+
+ public Boolean persist(String key) {
+ return redisTemplate.persist(key);
+ }
+
+ public Boolean pexpire(String key, Long timeout) {
+ return redisTemplate.expire(key, timeout, TimeUnit.MILLISECONDS);
+ }
+
+ public Boolean pexpireat(String key, Long millis) {
+ return redisTemplate.expireAt(key, new Date(millis));
+ }
+
+ public String randomkey() {
+ return redisTemplate.randomKey();
+ }
+
+ public void rename(String key, String value) {
+ redisTemplate.rename(key, value);
+ }
+
+ public Boolean renamenx(String key, String value) {
+ return redisTemplate.renameIfAbsent(key, value);
+ }
+
+ public Long ttl(String key) {
+ return redisTemplate.getExpire(key);
+ }
+
+ public DataType type(String key) {
+ return redisTemplate.type(key);
+ }
+
+ public List<Object> sort(String key) {
+ SortQuery<String> sortQuery = SortQueryBuilder.sort(key).build();
+ return redisTemplate.sort(sortQuery);
+ }
+
+ public Boolean zadd(String key, Object value, Double score) {
+ return redisTemplate.opsForZSet().add(key, value, score);
+ }
+
+ public Long zcard(String key) {
+ return redisTemplate.opsForZSet().size(key);
+ }
+
+ public Long zcount(String key, Double min, Double max) {
+ return redisTemplate.opsForZSet().count(key, min, max);
+ }
+
+ public Double zincrby(String key, Object value, Double increment) {
+ return redisTemplate.opsForZSet().incrementScore(key, value, increment);
+ }
+
+ public void zinterstore(String key, Collection<String> keys, String destination) {
+ redisTemplate.opsForZSet().intersectAndStore(key, keys, destination);
+ }
+
+ public Object zrange(String key, Long start, Long end, Boolean withScore) {
+ if (withScore != null && withScore) {
+ return redisTemplate.opsForZSet().rangeWithScores(key, start, end);
+ }
+ return redisTemplate.opsForZSet().range(key, start, end);
+ }
+
+ public Set<Object> zrangebyscore(String key, Double min, Double max) {
+ return redisTemplate.opsForZSet().rangeByScore(key, min, max);
+ }
+
+ public Long zrank(String key, Object value) {
+ return redisTemplate.opsForZSet().rank(key, value);
+ }
+
+ public Boolean zrem(String key, Object value) {
+ return redisTemplate.opsForZSet().remove(key, value);
+ }
+
+ public void zremrangebyrank(String key, Long start, Long end) {
+ redisTemplate.opsForZSet().removeRange(key, start, end);
+ }
+
+ public void zremrangebyscore(String key, Long start, Long end) {
+ redisTemplate.opsForZSet().removeRangeByScore(key, start, end);
+ }
+
+ public Object zrevrange(String key, Long start, Long end, Boolean withScore) {
+ if (withScore != null && withScore) {
+ return redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end);
+ }
+
+ return redisTemplate.opsForZSet().reverseRange(key, start, end);
+ }
+
+ public Set<Object> zrevrangebyscore(String key, Double min, Double max) {
+ return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
+ }
+
+ public Long zrevrank(String key, Object value) {
+ return redisTemplate.opsForZSet().reverseRank(key, value);
+ }
+
+ public void zunionstore(String key, Collection<String> keys, String destination) {
+ redisTemplate.opsForZSet().unionAndStore(key, keys, destination);
+ }
+}
View
47 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Represents the component that manages {@link RedisEndpoint}.
+ */
+public class RedisComponent extends DefaultComponent {
+
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+ throws Exception {
+ RedisConfiguration configuration = new RedisConfiguration();
+ setHostAndPort(configuration, remaining);
+ setProperties(configuration, parameters);
+ RedisEndpoint endpoint = new RedisEndpoint(uri, this, configuration);
+ return endpoint;
+ }
+
+ private void setHostAndPort(RedisConfiguration configuration, String remaining) {
+ String[] hostAndPort = remaining.split(":");
+ if (hostAndPort.length > 0 && hostAndPort[0].length() > 0) {
+ configuration.setHost(hostAndPort[0]);
+ }
+ if (hostAndPort.length > 1 && hostAndPort[1].length() > 0) {
+ configuration.setPort(Integer.parseInt(hostAndPort[1]));
+ }
+ }
+}
View
154 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisConfiguration.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
+import org.springframework.data.redis.serializer.RedisSerializer;
+
+public class RedisConfiguration {
+ private String command;
+ private String channels;
+ private Integer timeout;
+ private String host;
+ private Integer port;
+ private RedisTemplate redisTemplate;
+ private RedisMessageListenerContainer listenerContainer;
+ private RedisConnectionFactory connectionFactory;
+ private RedisSerializer serializer;
+ private boolean managedListenerContainer;
+ private boolean managedConnectionFactory;
+
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public Integer getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(Integer timeout) {
+ this.timeout = timeout;
+ }
+
+ public RedisTemplate getRedisTemplate() {
+ return redisTemplate != null ? redisTemplate : createDefaultTemplate();
+ }
+
+ public void setRedisTemplate(RedisTemplate redisTemplate) {
+ this.redisTemplate = redisTemplate;
+ }
+
+ public RedisMessageListenerContainer getListenerContainer() {
+ return listenerContainer != null ? listenerContainer : createDefaultListenerContainer();
+ }
+
+ public void setListenerContainer(RedisMessageListenerContainer listenerContainer) {
+ this.listenerContainer = listenerContainer;
+ }
+
+ public String getChannels() {
+ return channels;
+ }
+
+ public void setChannels(String channels) {
+ this.channels = channels;
+ }
+
+ public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public RedisConnectionFactory getConnectionFactory() {
+ return connectionFactory != null ? connectionFactory : createDefaultConnectionFactory();
+ }
+
+ public RedisSerializer getSerializer() {
+ return serializer != null ? serializer : createDefaultSerializer();
+ }
+
+ public void setSerializer(RedisSerializer serializer) {
+ this.serializer = serializer;
+ }
+
+ private RedisConnectionFactory createDefaultConnectionFactory() {
+ JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
+ managedConnectionFactory = true;
+
+ if (host != null) {
+ jedisConnectionFactory.setHostName(host);
+ }
+ if (port != null) {
+ jedisConnectionFactory.setPort(port);
+ }
+ jedisConnectionFactory.afterPropertiesSet();
+ connectionFactory = jedisConnectionFactory;
+ return jedisConnectionFactory;
+ }
+
+ private RedisTemplate createDefaultTemplate() {
+ redisTemplate = new RedisTemplate();
+ redisTemplate.setConnectionFactory(getConnectionFactory());
+ redisTemplate.afterPropertiesSet();
+ return redisTemplate;
+ }
+
+ private RedisMessageListenerContainer createDefaultListenerContainer() {
+ listenerContainer = new RedisMessageListenerContainer();
+ managedListenerContainer = true;
+ listenerContainer.setConnectionFactory(getConnectionFactory());
+ listenerContainer.afterPropertiesSet();
+ return listenerContainer;
+ }
+
+ private RedisSerializer createDefaultSerializer() {
+ serializer = new JdkSerializationRedisSerializer();
+ return serializer;
+ }
+
+ public void stop() throws Exception {
+ if (managedConnectionFactory) {
+ ((JedisConnectionFactory)connectionFactory).destroy();
+ }
+ if (managedListenerContainer) {
+ listenerContainer.destroy();
+ }
+ }
+}
View
46 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisConstants.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+public interface RedisConstants {
+ String COMMAND = "CamelRedis.Command";
+ String KEY = "CamelRedis.Key";
+ String KEYS = "CamelRedis.Keys";
+ String FIELD = "CamelRedis.Field";
+ String FIELDS = "CamelRedis.Fields";
+ String VALUE = "CamelRedis.Value";
+ String VALUES = "CamelRedis.Values";
+ String START = "CamelRedis.Start";
+ String END = "CamelRedis.End";
+ String TIMEOUT = "CamelRedis.Timeout";
+ String OFFSET = "CamelRedis.Offset";
+ String DESTINATION = "CamelRedis.Destination";
+ String CHANNEL = "CamelRedis.Channel";
+ String MESSAGE = "CamelRedis.Message";
+ String INDEX = "CamelRedis.Index";
+ String POSITION = "CamelRedis.Position";
+ String PIVOT = "CamelRedis.Pivot";
+ String COUNT = "CamelRedis.Count";
+ String TIMESTAMP = "CamelRedis.Timestamp";
+ String PATTERN = "CamelRedis.Pattern";
+ String DB = "CamelRedis.Db";
+ String SCORE = "CamelRedis.Score";
+ String MIN = "CamelRedis.Min";
+ String MAX = "CamelRedis.Max";
+ String INCREMENT = "CamelRedis.Increment";
+ String WITHSCORE = "CamelRedis.WithScore";
+}
View
94 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.direct.DirectConsumer;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.Topic;
+
+public class RedisConsumer extends DirectConsumer implements MessageListener {
+ private final RedisConfiguration redisConfiguration;
+
+ public RedisConsumer(RedisEndpoint redisEndpoint, Processor processor,
+ RedisConfiguration redisConfiguration) {
+ super(redisEndpoint, processor);
+ this.redisConfiguration = redisConfiguration;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ Collection<Topic> topics = toTopics(redisConfiguration.getChannels());
+ redisConfiguration.getListenerContainer().addMessageListener(this, topics);
+ }
+
+ private Collection<Topic> toTopics(String channels) {
+ String[] channelsArrays = channels.split(",");
+ List<Topic> topics = new ArrayList<Topic>();
+ for (String channel : channelsArrays) {
+ if (Command.PSUBSCRIBE.toString().equals(redisConfiguration.getCommand())) {
+ topics.add(new PatternTopic(channel));
+ } else if (Command.SUBSCRIBE.toString().equals(redisConfiguration.getCommand())) {
+ topics.add(new ChannelTopic(channel));
+ } else {
+ throw new RuntimeException("Unsupported Command");
+ }
+ }
+ return topics;
+ }
+
+ @Override
+ public void onMessage(Message message, byte[] pattern) {
+ try {
+ Exchange exchange = getEndpoint().createExchange();
+ setChannel(exchange, message.getChannel());
+ setPattern(exchange, pattern);
+ setBody(exchange, message.getBody());
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void setBody(Exchange exchange, byte[] body) {
+ if (body != null) {
+ exchange.getIn().setBody(redisConfiguration.getSerializer().deserialize(body));
+ }
+ }
+
+ private void setPattern(Exchange exchange, byte[] pattern) {
+ if (pattern != null) {
+ exchange.getIn().setHeader(RedisConstants.PATTERN, pattern);
+ }
+ }
+
+ private void setChannel(Exchange exchange, byte[] message) throws UnsupportedEncodingException {
+ if (message != null) {
+ exchange.getIn().setHeader(RedisConstants.CHANNEL, new String(message, "UTF8"));
+ }
+ }
+}
View
55 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.direct.DirectEndpoint;
+
+/**
+ * Represents a Redis endpoint.
+ */
+public class RedisEndpoint extends DirectEndpoint {
+ private RedisConfiguration configuration;
+
+ public RedisEndpoint(String uri, RedisComponent component, RedisConfiguration configuration) {
+ super(uri, component);
+ this.configuration = configuration;
+ }
+
+ public Producer createProducer() throws Exception {
+ return new RedisProducer(this, configuration);
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new RedisConsumer(this, processor, configuration);
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ configuration.stop();
+ }
+
+ public RedisConfiguration getConfiguration() {
+ return configuration;
+ }
+}
View
51 components/camel-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.URISupport;
+
+/**
+ * The Redis producer.
+ */
+public class RedisProducer extends DefaultProducer {
+ private final RedisClient redisClient;
+
+ public RedisProducer(RedisEndpoint endpoint, RedisConfiguration configuration) {
+ super(endpoint);
+ redisClient = new RedisClient(configuration.getRedisTemplate());
+ }
+
+ public void process(final Exchange exchange) throws Exception {
+ new CommandDispatcher(getConfiguration(), exchange).execute(redisClient);
+ }
+
+ protected RedisConfiguration getConfiguration() {
+ return getEndpoint().getConfiguration();
+ }
+
+ @Override
+ public RedisEndpoint getEndpoint() {
+ return (RedisEndpoint)super.getEndpoint();
+ }
+
+ @Override
+ public String toString() {
+ return "RedisProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+}
View
94 ...redis/src/main/java/org/apache/camel/component/redis/processor/idempotent/RedisIdempotentRepository.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis.processor.idempotent;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.redis.RedisConfiguration;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.SetOperations;
+
+@ManagedResource(description = "Redis based message id repository")
+public class RedisIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
+ private final SetOperations<String, String> setOperations;
+ private final String processorName;
+ private RedisConfiguration redisConfiguration;
+
+ public RedisIdempotentRepository(RedisTemplate<String, String> redisTemplate, String processorName) {
+ this.setOperations = redisTemplate.opsForSet();
+ this.processorName = processorName;
+ }
+
+ public RedisIdempotentRepository(String processorName) {
+ redisConfiguration = new RedisConfiguration();
+ RedisTemplate<String, String> redisTemplate = redisConfiguration.getRedisTemplate();
+ this.setOperations = redisTemplate.opsForSet();
+ this.processorName = processorName;
+ }
+
+ public static RedisIdempotentRepository redisIdempotentRepository(String processorName) {
+ return new RedisIdempotentRepository(processorName);
+ }
+
+ public static RedisIdempotentRepository redisIdempotentRepository(
+ RedisTemplate<String, String> redisTemplate, String processorName) {
+ return new RedisIdempotentRepository(redisTemplate, processorName);
+ }
+
+ @ManagedOperation(description = "Adds the key to the store")
+ public boolean add(String key) {
+ return setOperations.add(processorName, key);
+ }
+
+ @ManagedOperation(description = "Does the store contain the given key")
+ public boolean contains(String key) {
+ return setOperations.isMember(processorName, key);
+ }
+
+ @ManagedOperation(description = "Remove the key from the store")
+ public boolean remove(String key) {
+ return setOperations.remove(processorName, key);
+ }
+
+ @ManagedAttribute(description = "The processor name")
+ public String getProcessorName() {
+ return processorName;
+ }
+
+ public boolean confirm(String key) {
+ return true;
+ }
+
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ if (redisConfiguration != null) {
+ redisConfiguration.stop();
+ }
+ }
+
+ protected void doStart() throws Exception {
+ }
+
+ protected void doStop() throws Exception {
+ }
+}
+
View
18 components/camel-redis/src/main/resources/META-INF/services/org/apache/camel/component/redis
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.redis.RedisComponent
View
35 components/camel-redis/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=INFO, file
+
+#log4j.logger.org.apache.camel.component.redis=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-redis-test.log
+log4j.appender.file.append=false
View
77 ...ents/camel-redis/src/test/java/org/apache/camel/component/redis/RedisComponentSpringIntegrationTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+@Ignore
+public class RedisComponentSpringIntegrationTest extends CamelSpringTestSupport {
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void shouldFilterDuplicateMessagesUsingIdempotentRepository() throws Exception {
+ result.expectedMessageCount(2);
+
+ template.send("direct:start", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(RedisConstants.COMMAND, "PUBLISH");
+ exchange.getIn().setHeader(RedisConstants.CHANNEL, "testChannel");
+ exchange.getIn().setHeader(RedisConstants.MESSAGE, "Message one");
+ }
+ });
+
+ template.send("direct:start", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(RedisConstants.COMMAND, "PUBLISH");
+ exchange.getIn().setHeader(RedisConstants.CHANNEL, "testChannel");
+ exchange.getIn().setHeader(RedisConstants.MESSAGE, "Message one");
+ }
+ });
+
+ template.send("direct:start", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(RedisConstants.COMMAND, "PUBLISH");
+ exchange.getIn().setHeader(RedisConstants.CHANNEL, "testChannel");
+ exchange.getIn().setHeader(RedisConstants.MESSAGE, "Message two");
+ }
+ });
+ assertMockEndpointsSatisfied();
+
+ Exchange resultExchangeOne = result.getExchanges().get(0);
+ Exchange resultExchangeTwo = result.getExchanges().get(1);
+ assertEquals("Message one", resultExchangeOne.getIn().getBody());
+ assertEquals("Message two", resultExchangeTwo.getIn().getBody());
+ }
+
+ @Override
+ protected ClassPathXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("RedisComponentSpringTest-context.xml");
+ }
+}
View
96 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisConnectionTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RedisConnectionTest extends RedisTestSupport {
+
+ private RedisTemplate redisTemplate;
+ private RedisConnection redisConnection;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("redisTemplate", redisTemplate);
+ return registry;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ redisTemplate = mock(RedisTemplate.class);
+ redisConnection = mock(RedisConnection.class);
+ super.setUp();
+ }
+
+ private void getMocketConnection() {
+ ArgumentCaptor<RedisCallback> argument = ArgumentCaptor.forClass(RedisCallback.class);
+ verify(redisTemplate).execute(argument.capture());
+ RedisCallback redisCallback = argument.getValue();
+ redisCallback.doInRedis(redisConnection);
+ }
+
+ @Test
+ public void shouldExecuteECHO() throws Exception {
+ when(redisTemplate.execute(any(RedisCallback.class))).thenReturn("value");
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "ECHO",
+ RedisConstants.VALUE, "value");
+
+ assertEquals("value", result);
+ }
+
+ @Test
+ public void shouldExecutePING() throws Exception {
+ when(redisTemplate.execute(any(RedisCallback.class))).thenReturn("PONG");
+
+ Object result = sendHeaders(RedisConstants.COMMAND, "PING");
+
+ assertEquals("PONG", result);
+ }
+
+ @Test
+ public void shouldExecuteQUIT() throws Exception {
+ sendHeaders(RedisConstants.COMMAND, "QUIT");
+
+ verify(redisTemplate).execute(any(RedisCallback.class));
+ }
+
+
+ @Test
+ public void shouldExecutePUBLISH() throws Exception {
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "PUBLISH",
+ RedisConstants.CHANNEL, "channel",
+ RedisConstants.MESSAGE, "a message");
+
+ verify(redisTemplate).convertAndSend("channel", "a message");
+ }
+
+}
View
78 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisConsumerIntegrationTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+
+@Ignore
+public class RedisConsumerIntegrationTest extends RedisTestSupport {
+ private static final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory();
+ private static final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
+
+ static {
+ CONNECTION_FACTORY.afterPropertiesSet();
+ LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
+ LISTENER_CONTAINER.afterPropertiesSet();
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ redisTemplate = new RedisTemplate();
+ redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
+ redisTemplate.afterPropertiesSet();
+
+ registry.bind("redisTemplate", redisTemplate);
+ registry.bind("listenerContainer", LISTENER_CONTAINER);
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("redis://localhost:6379?command=SUBSCRIBE&channels=one,two&listenerContainer=#listenerContainer&redisTemplate=#redisTemplate")
+ .to("mock:result");
+
+ from("direct:start")
+ .to("redis://localhost:6379?redisTemplate=#redisTemplate");
+ }
+ };
+ }
+
+ @Test
+ public void consumerReceivesMessages() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+ mock.expectedBodiesReceived("message");
+
+ sendHeaders(
+ RedisConstants.COMMAND, "PUBLISH",
+ RedisConstants.CHANNEL, "two",
+ RedisConstants.MESSAGE, "message");
+ mock.assertIsSatisfied();
+ }
+}
+
View
94 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisConsumerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.springframework.data.redis.connection.DefaultMessage;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.Topic;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class RedisConsumerTest extends CamelTestSupport {
+ private RedisMessageListenerContainer listenerContainer;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("listenerContainer", listenerContainer);
+ return registry;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ listenerContainer = mock(RedisMessageListenerContainer.class);
+ super.setUp();
+ }
+
+ @Test
+ public void registerConsumerForTwoChannelTopics() throws Exception {
+ ArgumentCaptor<Collection> collectionCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(listenerContainer).addMessageListener(any(MessageListener.class), collectionCaptor.capture());
+
+ Collection<ChannelTopic> topics = collectionCaptor.getValue();
+ Iterator<ChannelTopic> topicIterator = topics.iterator();
+
+ Topic firstTopic = topicIterator.next();
+ Topic twoTopic = topicIterator.next();
+ assertThat(firstTopic.getTopic(), is("one"));
+ assertThat(twoTopic.getTopic(), is("two"));
+ }
+
+ @Test
+ public void consumerReceivesMessages() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(2);
+
+ ArgumentCaptor<MessageListener> messageListenerCaptor = ArgumentCaptor
+ .forClass(MessageListener.class);
+ verify(listenerContainer).addMessageListener(messageListenerCaptor.capture(), any(Collection.class));
+
+ MessageListener messageListener = messageListenerCaptor.getValue();
+ messageListener.onMessage(new DefaultMessage(null, null), null);
+ messageListener.onMessage(new DefaultMessage(null, null), null);
+
+ mock.assertIsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("redis://localhost:6379?command=SUBSCRIBE&channels=one,two&listenerContainer=#listenerContainer")
+ .to("mock:result");
+ }
+ };
+ }
+}
View
220 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.data.redis.core.HashOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RedisHashTest extends RedisTestSupport {
+ private RedisTemplate redisTemplate;
+ private HashOperations hashOperations;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ when(redisTemplate.opsForHash()).thenReturn(hashOperations);
+
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("redisTemplate", redisTemplate);
+ return registry;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ redisTemplate = mock(RedisTemplate.class);
+ hashOperations = mock(HashOperations.class);
+ super.setUp();
+ }
+
+ @Test
+ public void shouldExecuteHDEL() throws Exception {
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HDEL",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELD, "field");
+
+ verify(hashOperations).delete("key", "field");
+ }
+
+ @Test
+ public void shouldExecuteHEXISTS() throws Exception {
+ when(hashOperations.hasKey(anyString(), anyString())).thenReturn(true);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HEXISTS",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELD, "field");
+
+ verify(hashOperations).hasKey("key", "field");
+ assertEquals(true, result);
+ }
+
+ @Test
+ public void shouldExecuteHINCRBY() throws Exception {
+ when(hashOperations.increment(anyString(), anyString(), anyLong())).thenReturn(1L);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HINCRBY",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELD, "field",
+ RedisConstants.VALUE, "1");
+
+ verify(hashOperations).increment("key", "field", 1L);
+ assertEquals(1L, result);
+ }
+
+ @Test
+ public void shouldExecuteHKEYS() throws Exception {
+ Set<String> fields = new HashSet<String>(Arrays.asList(new String[] {"field1, field2"}));
+ when(hashOperations.keys(anyString())).thenReturn(fields);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HKEYS",
+ RedisConstants.KEY, "key");
+
+ verify(hashOperations).keys("key");
+ assertEquals(fields, result);
+ }
+
+
+ @Test
+ public void shouldExecuteHMSET() throws Exception {
+ HashMap<String, String> values = new HashMap<String, String>();
+ values.put("field1", "value1");
+ values.put("field2", "value");
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HMSET",
+ RedisConstants.KEY, "key",
+ RedisConstants.VALUES, values);
+
+ verify(hashOperations).putAll("key", values);
+ }
+
+ @Test
+ public void shouldExecuteHVALS() throws Exception {
+ List<String> values = new ArrayList<String>();
+ values.add("val1");
+ values.add("val2");
+
+ when(hashOperations.values(anyString())).thenReturn(values);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HVALS",
+ RedisConstants.KEY, "key",
+ RedisConstants.VALUES, values);
+
+ verify(hashOperations).values("key");
+ assertEquals(values, result);
+ }
+
+ @Test
+ public void shouldExecuteHLEN() throws Exception {
+ when(hashOperations.size(anyString())).thenReturn(2L);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HLEN",
+ RedisConstants.KEY, "key");
+
+ verify(hashOperations).size("key");
+ assertEquals(2L, result);
+ }
+
+ @Test
+ public void shouldSetHashValue() throws Exception {
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HSET",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELD, "field",
+ RedisConstants.VALUE, "value");
+
+ verify(hashOperations).put("key", "field", "value");
+ }
+
+ @Test
+ public void shouldExecuteHSETNX() throws Exception {
+ when(hashOperations.putIfAbsent(anyString(), anyString(), anyString())).thenReturn(true);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HSETNX",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELD, "field",
+ RedisConstants.VALUE, "value");
+
+ verify(hashOperations).putIfAbsent("key", "field", "value");
+ assertEquals(true, result);
+ }
+
+
+ @Test
+ public void shouldExecuteHGET() throws Exception {
+ when(hashOperations.get(anyString(), anyString())).thenReturn("value");
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HGET",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELD, "field");
+
+ verify(hashOperations).get("key", "field");
+ assertEquals("value", result);
+ }
+
+ @Test
+ public void shouldExecuteHGETALL() throws Exception {
+ HashMap<String, String> values = new HashMap<String, String>();
+ values.put("field1", "valu1");
+ when(hashOperations.entries(anyString())).thenReturn(values);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HGETALL",
+ RedisConstants.KEY, "key");
+
+ verify(hashOperations).entries("key");
+ assertEquals(values, result);
+ }
+
+ @Test
+ public void shouldExecuteHMGET() throws Exception {
+ List<String> fields = new ArrayList<String>();
+ fields.add("field1");
+ when(hashOperations.multiGet(anyString(), anyCollection())).thenReturn(fields);
+
+ Object result = sendHeaders(
+ RedisConstants.COMMAND, "HMGET",
+ RedisConstants.KEY, "key",
+ RedisConstants.FIELDS, fields);
+
+ verify(hashOperations).multiGet("key", fields);
+ assertEquals(fields, result);
+ }
+}
View
248 components/camel-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+