Permalink
Browse files

[SM-2067] Added a Redis Store

git-svn-id: https://svn.apache.org/repos/asf/servicemix/utils/trunk@1089268 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent df1d1bd commit 5d2e03ad2f2756de61412fa4133903be8f4a3cf4 @iocanel iocanel committed Apr 5, 2011
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2011 iocanel.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+package org.apache.servicemix.store.redis;
+
+
+import org.apache.servicemix.store.Entry;
+import org.apache.servicemix.store.Store;
+import org.idevlab.rjc.RedisNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.BASE64Decoder;
+import sun.misc.BASE64Encoder;
+
+import java.io.*;
+
+public class RedisStore implements Store {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedisStore.class);
+
+ private RedisNode redisNode;
+ private String storeName;
+ private String idgenName;
+ private Long timeout = 0L;
+
+ private BASE64Encoder encoder = new BASE64Encoder();
+ private BASE64Decoder decoder = new BASE64Decoder();
+
+ /**
+ * Constructor
+ *
+ * @param redisNode
+ * @param storeName
+ */
+ public RedisStore(RedisNode redisNode, String storeName) {
+ this.redisNode = redisNode;
+ this.storeName = storeName;
+ this.idgenName = storeName + ".idgen";
+ }
+
+ public RedisStore(RedisNode redisNode, String storeName, Long timeout) {
+ this.redisNode = redisNode;
+ this.storeName = storeName;
+ this.idgenName = storeName + ".idgen";
+ this.timeout = timeout;
+ }
+
+ /**
+ * <p>
+ * Returns true if feature is provided by the store (clustered), false else.
+ * </p>
+ *
+ * @param feature the feature.
+ * @return true if the given feature is provided by the store, false else.
+ */
+ public boolean hasFeature(String feature) {
+ if (CLUSTERED.equals(feature) || PERSISTENT.equals(feature))
+ return true;
+ return false;
+ }
+
+ /**
+ * <p>
+ * Put an object in the store under the given id.
+ * This method must be used with caution and the behavior is
+ * unspecified if an object already exist for the same id.
+ * </p>
+ * @param id the id of the object to store
+ * @param data the object to store
+ * @throws IOException if an error occurs
+ */
+ public void store(String id, Object data) throws IOException {
+ LOG.debug("Storing object with id: " + id);
+ try {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+ out.writeObject(new Entry(data));
+ out.close();
+ redisNode.set(id, encoder.encode(buffer.toByteArray()));
+ } catch (Exception e) {
+ throw (IOException) new IOException("Error storing object").initCause(e);
+ }
+ }
+
+ /**
+ * <p>
+ * Put an object into the store and return the unique id that
+ * may be used at a later time to retrieve the object.
+ * </p>
+ * @param data the object to store
+ * @return the id of the object stored
+ * @throws IOException if an error occurs
+ */
+ public String store(Object data) throws IOException {
+ Long id = redisNode.incr(idgenName);
+ store(String.valueOf(id), data);
+ return null;
+ }
+
+ /**
+ * <p>
+ * Loads an object that has been previously stored under the specified key.
+ * The object is removed from the store.
+ * </p>
+ * @param id the id of the object
+ * @return the object, or <code>null></code> if the object could not be found
+ * @throws IOException if an error occurs
+ */
+ public Object load(String id) throws IOException {
+ LOG.debug("Loading/Removing object with id: " + id);
+ Object result = null;
+ if (timeout > 0) {
+ evict();
+ }
+ try {
+ result = parseEntry(redisNode.get(id));
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not load object from store", e);
+ }
+ redisNode.del(id);
+ return result;
+ }
+
+ /**
+ * <p>
+ * Loads an object that has been previously stored under the specified key.
+ * The object is not removed from the store.
+ * </p>
+ * @param id the id of the object
+ * @return the object, or <code>null</code> if the object could not be found
+ * @throws IOException if an error occurs
+ */
+ public Object peek(String id) throws IOException {
+ LOG.debug("Peeking object with id: " + id);
+ Object result = null;
+ try {
+ result = parseEntry(redisNode.get(id)).getData();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not load object from store", e);
+ }
+ return result;
+ }
+
+
+ /**
+ * Decodes a String to an Entry.
+ * @param string
+ * @return
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ private Entry parseEntry(String string) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream buffer = new ByteArrayInputStream(decoder.decodeBuffer(string));
+ ObjectInputStream out = new ObjectInputStream(buffer);
+ return (Entry) out.readObject();
+ }
+
+ /**
+ * Check for Entries that have timed out.
+ * @throws IOException
+ */
+ private void evict() throws IOException {
+ long now = System.currentTimeMillis();
+ for (String key : redisNode.keys("*")) {
+ long age = 0;
+ try {
+ age = now - parseEntry(redisNode.get(key)).getTime();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not load object from store", e);
+ }
+ if (age > timeout) {
+ LOG.debug("Removing object with id " + key + " from store after " + age + " ms");
+ load(key);
+ }
+ }
+ }
+}
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2011 iocanel.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+package org.apache.servicemix.store.redis;
+
+
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.idevlab.rjc.RedisNode;
+import org.idevlab.rjc.SingleRedisOperations;
+import org.idevlab.rjc.ds.DataSource;
+import org.idevlab.rjc.ds.SimpleDataSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RedisStoreFactory implements StoreFactory {
+
+ private Map<String, RedisStore> stores = new HashMap<String, RedisStore>();
+
+ private RedisNode redisNode;
+ private long timeout = -1;
+
+ public static final String STORE_PREFIX = "org.apache.servicemix.stores";
+
+ public RedisStoreFactory(RedisNode redisNode) {
+ this.redisNode = redisNode;
+ }
+
+ public synchronized Store open(String name) throws IOException {
+ RedisStore store = stores.get(name);
+ String storeName = STORE_PREFIX + "." + name;
+ if (store == null) {
+ if (timeout <= 0) {
+ store = new RedisStore(redisNode, storeName);
+ } else {
+ store = new RedisStore(redisNode, storeName, timeout);
+ }
+ stores.put(name, store);
+ }
+ return store;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.store.ExchangeStoreFactory#release(org.apache.servicemix.store.ExchangeStore)
+ */
+ public synchronized void close(Store store) throws IOException {
+ stores.remove(store);
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+}

0 comments on commit 5d2e03a

Please sign in to comment.