Skip to content

Commit

Permalink
-Implement two storage strategies HASH, STRING (Madhawa's suggestion)
Browse files Browse the repository at this point in the history
-Create two DataStoreTest classes one for each strategy
-Ignore schema-related methods getSchemaName createSchema deleteSchema schemaExists
-Add secondary index using ScoredSortedSet for querying (put, delete methods)
-Enable tests for methods: get, put, exists (passing).
  • Loading branch information
cuent committed Jun 22, 2019
1 parent c2b4b8f commit 93c19a2
Show file tree
Hide file tree
Showing 11 changed files with 1,942 additions and 1,483 deletions.
28 changes: 11 additions & 17 deletions gora-redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -125,10 +125,10 @@
</dependency>

<!-- <dependency>
<groupId>org.apache.redis</groupId>
<artifactId>redis-minicluster</artifactId>
<version>${redis.version}</version>
<scope>test</scope>
<groupId>org.apache.redis</groupId>
<artifactId>redis-minicluster</artifactId>
<version>${redis.version}</version>
<scope>test</scope>
</dependency>-->

<dependency>
Expand Down Expand Up @@ -177,12 +177,6 @@
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>com.github.fppt</groupId>
<artifactId>jedis-mock</artifactId>
<version>0.1.13</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand All @@ -196,4 +190,4 @@

</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.gora.redis.store;

import java.util.Map;

/**
* Mapping definitions for Redis.
*/
public class RedisMapping {

private int datebase;
private String prefix;
private Map<String, String> fields;

public int getDatebase() {
return datebase;
Expand All @@ -40,6 +43,12 @@ public void setPrefix(String prefix) {
this.prefix = prefix;
}



public Map<String, String> getFields() {
return fields;
}

public void setFields(Map<String, String> fields) {
this.fields = fields;
}

}
159 changes: 129 additions & 30 deletions gora-redis/src/main/java/org/apache/gora/redis/store/RedisStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
Expand All @@ -30,10 +33,13 @@
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.redis.util.DatumHandler;
import org.apache.gora.redis.util.StorageMode;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.GoraException;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
Expand Down Expand Up @@ -64,6 +70,7 @@ public class RedisStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
public static final Logger LOG = LoggerFactory.getLogger(RedisStore.class);

private static final DatumHandler handler = new DatumHandler();
private StorageMode mode;

/**
* Initialize the data store by reading the credentials, setting the client's
Expand Down Expand Up @@ -91,6 +98,8 @@ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties p
}
mapping = readMapping(mappingStream);
Config config = new Config();
String storage = getConf().get("gora.datastore.redis.storage", properties.getProperty("gora.datastore.redis.storage"));
mode = StorageMode.valueOf(storage);
String mode = properties.getProperty("gora.datastore.redis.mode");
String name = properties.getProperty("gora.datastore.redis.masterName");
String readm = properties.getProperty("gora.datastore.redis.readMode");
Expand All @@ -99,24 +108,24 @@ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties p
switch (mode) {
case "single":
config.useSingleServer()
.setAddress("redis://" + hosts[0])
.setDatabase(mapping.getDatebase());
.setAddress("redis://" + hosts[0])
.setDatabase(mapping.getDatebase());
break;
case "cluster":
config.useClusterServers()
.addNodeAddress(hosts);
.addNodeAddress(hosts);
break;
case "replicated":
config.useReplicatedServers()
.addNodeAddress(hosts)
.setDatabase(mapping.getDatebase());
.addNodeAddress(hosts)
.setDatabase(mapping.getDatebase());
break;
case "sentinel":
config.useSentinelServers()
.setMasterName(name)
.setReadMode(ReadMode.valueOf(readm))
.addSentinelAddress(hosts)
.setDatabase(mapping.getDatebase());
.setMasterName(name)
.setReadMode(ReadMode.valueOf(readm))
.addSentinelAddress(hosts)
.setDatabase(mapping.getDatebase());
break;
}
redisInstance = Redisson.create(config);
Expand All @@ -138,9 +147,18 @@ protected RedisMapping readMapping(InputStream inputStream) throws IOException {
for (int i = 0; i < nl.getLength(); i++) {
Element classElement = (Element) nl.item(i);
if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
&& classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
&& classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
mapping.setDatebase(Integer.parseInt(classElement.getAttribute("database")));
mapping.setPrefix(classElement.getAttribute("prefix"));
NodeList elementsByTagName = classElement.getElementsByTagName("field");
Map<String, String> map = new HashMap<>();
for (int j = 0; j < elementsByTagName.getLength(); j++) {
Element item = (Element) elementsByTagName.item(j);
String name = item.getAttribute("name");
String column = item.getAttribute("column");
map.put(name, column);
}
mapping.setFields(map);
}
}
return mapping;
Expand All @@ -149,42 +167,93 @@ protected RedisMapping readMapping(InputStream inputStream) throws IOException {
}
}

/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*
* @return null
*/
@Override
public String getSchemaName() {
return mapping.getDatebase() + "";
return null;
}

/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*/
@Override
public void createSchema() throws GoraException {
}

/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*/
@Override
public void deleteSchema() throws GoraException {
redisInstance.getKeys().flushdb();
}

/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*
* @return true
*/
@Override
public boolean schemaExists() throws GoraException {
return true;
}

private String generateKey(Object baseKey) {
private String generateKeyHash(Object baseKey) {
return mapping.getPrefix() + ".key." + baseKey;
}

private String generateKeyString(String field, Object baseKey) {
return generateKeyStringBase(baseKey) + field;
}

private String generateKeyStringBase(Object baseKey) {
return mapping.getPrefix() + ".key." + baseKey + ".field.";
}

private String generateIndexKey() {
return mapping.getPrefix() + ".index";
}

public T newInstance(RMap<String, Object> map, String[] fields) throws GoraException, IOException {
public T newInstanceFromString(K key, String[] fields) throws GoraException, IOException {
fields = getFieldsToQuery(fields);
T persistent = newPersistent();
int countRetrieved = 0;
for (String f : fields) {
Schema.Field field = fieldMap.get(f);
Object fieldValue = handler.deserializeFieldValue(field, field.schema(), map.get(field.name()), persistent);
if (fieldValue == null) {
RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(mapping.getFields().get(field.name()), key));
Object val = bucket.get();
if (val == null) {
continue;
}
Object fieldValue = handler.deserializeFieldValue(field, field.schema(), val, persistent);
countRetrieved++;
persistent.put(field.pos(), fieldValue);
persistent.setDirty(field.pos());
}
return countRetrieved > 0 ? persistent : null;
}

public T newInstanceFromHash(RMap<String, Object> map, String[] fields) throws GoraException, IOException {
fields = getFieldsToQuery(fields);
T persistent = newPersistent();
for (String f : fields) {
Schema.Field field = fieldMap.get(f);
Object val = map.get(mapping.getFields().get(field.name()));
if (val == null) {
continue;
}
Object fieldValue = handler.deserializeFieldValue(field, field.schema(), val, persistent);
persistent.put(field.pos(), fieldValue);
persistent.setDirty(field.pos());
}
Expand All @@ -193,12 +262,16 @@ public T newInstance(RMap<String, Object> map, String[] fields) throws GoraExcep

@Override
public T get(K key, String[] fields) throws GoraException {
RMap<String, Object> map = redisInstance.getMap(generateKey(key));
try {
if (map.size() != 0) {
return newInstance(map, fields);
if (mode == StorageMode.HASH) {
RMap<String, Object> map = redisInstance.getMap(generateKeyHash(key));
if (!map.isEmpty()) {
return newInstanceFromHash(map, fields);
} else {
return null;
}
} else {
return null;
return newInstanceFromString(key, fields);
}
} catch (IOException ex) {
throw new GoraException(ex);
Expand All @@ -211,16 +284,29 @@ public void put(K key, T obj) throws GoraException {
if (obj.isDirty()) {
Schema schema = obj.getSchema();
List<Schema.Field> fields = schema.getFields();
RMap<String, Object> map = redisInstance.getMap(generateKey(key));
for (Schema.Field field : fields) {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
if (fieldValue != null) {
map.put(field.name(), fieldValue);
//update secundary index
RScoredSortedSet<K> secundaryIndex = redisInstance.getScoredSortedSet(generateIndexKey());
secundaryIndex.add(0, key);
if (mode == StorageMode.HASH) {
RMap<String, Object> map = redisInstance.getMap(generateKeyHash(key));
for (Schema.Field field : fields) {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
if (fieldValue != null) {
map.put(mapping.getFields().get(field.name()), fieldValue);
}
}
} else {
for (Schema.Field field : fields) {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
if (fieldValue != null) {
RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(mapping.getFields().get(field.name()), key));
bucket.set(fieldValue);
}
}
}
} else {
LOG.info("Ignored putting object {} in the store as it is neither "
+ "new, neither dirty.", new Object[]{obj});
+ "new, neither dirty.", new Object[]{obj});
}
} catch (Exception e) {
throw new GoraException(e);
Expand All @@ -229,8 +315,15 @@ public void put(K key, T obj) throws GoraException {

@Override
public boolean delete(K key) throws GoraException {
RMap<String, Object> map = redisInstance.getMap(generateKey(key));
return map.delete();
//update secundary index
RScoredSortedSet<K> secundaryIndex = redisInstance.getScoredSortedSet(generateIndexKey());
secundaryIndex.remove(key);
if (mode == StorageMode.HASH) {
RMap<String, Object> map = redisInstance.getMap(generateKeyHash(key));
return map.delete();
} else {
return redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + "*") > 0;
}
}

@Override
Expand Down Expand Up @@ -265,7 +358,13 @@ public void close() {
redisInstance.shutdown();
}

@Override
public boolean exists(K key) throws GoraException {
return redisInstance.getKeys().countExists(generateKey(key)) != 0;
if (mode == StorageMode.HASH) {
return redisInstance.getKeys().countExists(generateKeyHash(key)) != 0;
} else {
Iterator<String> respKeys = redisInstance.getKeys().getKeysByPattern(generateKeyStringBase(key) + "*", 1).iterator();
return respKeys.hasNext();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,4 @@ private int getUnionSchema(Object instanceValue, Schema unionSchema) {
return 0;
}

}
}

0 comments on commit 93c19a2

Please sign in to comment.