Skip to content

Commit

Permalink
- Add support to List and Hash datatypes from Redis
Browse files Browse the repository at this point in the history
- Improve connfiguration of redis docker container
- Start implementing queries.
  • Loading branch information
cuent committed Jul 7, 2019
1 parent 21e92d0 commit 4b57c24
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@
package org.apache.gora.redis.query;

import java.io.IOException;
import java.util.Collection;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
import org.apache.gora.redis.store.RedisStore;
import org.apache.gora.store.DataStore;
import org.redisson.api.RedissonClient;

/**
* Redis specific implementation of the {@link org.apache.gora.query.Result}
* interface.
*/
public class RedisResult<K, T extends PersistentBase> extends ResultBase<K, T> {

// private RowIterator iterator;
private RedissonClient connection;
private Collection<String> range;
private String[] fields;

/**
* Gets the data store used
*/
Expand All @@ -42,13 +47,12 @@ public RedisStore<K, T> getDataStore() {
* @param query
* @param scanner
*/
public RedisResult(DataStore<K, T> dataStore, Query<K, T> query) {//, Scanner scanner) {
public RedisResult(DataStore<K, T> dataStore, Query<K, T> query, RedissonClient con, Collection<String> rg, String[] fls) {//, Scanner scanner) {
super(dataStore, query);

// if (this.limit > 0) {
// scanner.setBatchSize((int) this.limit);
// }
// iterator = new RowIterator(scanner.iterator());
this.connection = con;
this.range = rg;
this.fields = fls;
}

/**
Expand All @@ -65,30 +69,35 @@ public float getProgress() throws IOException {

@Override
public void close() throws IOException {

}

/**
* Gets the next item
*/
@Override
protected boolean nextInner() throws IOException {
//
// if (!iterator.hasNext()) {
// return false;
// }
//
// key = null;
//
// Iterator<Entry<Key, Value>> nextRow = iterator.next();
// ByteSequence row = getDataStore().populate(nextRow, persistent);
// key = ((RedisStore<K, T>) dataStore).fromBytes(getKeyClass(), row.toArray());

return true;
}

@Override
public int size() {
return (int) this.limit;
return this.range.size();
}

public RedissonClient getConnection() {
return connection;
}

public void setConnection(RedissonClient connection) {
this.connection = connection;
}

public Collection<String> getRange() {
return range;
}
}

public void setRange(Collection<String> range) {
this.range = range;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class RedisMapping {
private int datebase;
private String prefix;
private Map<String, String> fields;
private Map<String, String> types;

public int getDatebase() {
return datebase;
Expand All @@ -51,4 +52,12 @@ public void setFields(Map<String, String> fields) {
this.fields = fields;
}

public Map<String, String> getTypes() {
return types;
}

public void setTypes(Map<String, String> types) {
this.types = types;
}

}
111 changes: 85 additions & 26 deletions gora-redis/src/main/java/org/apache/gora/redis/store/RedisStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -32,18 +34,23 @@
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.redis.query.RedisQuery;
import org.apache.gora.redis.query.RedisResult;
import org.apache.gora.redis.util.DatumHandler;
import org.apache.gora.redis.util.ServerMode;
import org.apache.gora.redis.util.StorageMode;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.GoraException;
import org.redisson.Redisson;
import org.redisson.api.BatchOptions;
import org.redisson.api.RBatch;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RLexSortedSetAsync;
import org.redisson.api.RList;
import org.redisson.api.RListAsync;
import org.redisson.api.RMap;
import org.redisson.api.RMapAsync;
import org.redisson.api.RedissonClient;
Expand Down Expand Up @@ -134,8 +141,7 @@ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties p
config.useSentinelServers()
.setMasterName(name)
.setReadMode(ReadMode.valueOf(readm))
.addSentinelAddress(hosts)
.setDatabase(mapping.getDatebase());
.addSentinelAddress(hosts);
break;
}
redisInstance = Redisson.create(config);
Expand All @@ -161,14 +167,18 @@ protected RedisMapping readMapping(InputStream inputStream) throws IOException {
mapping.setDatebase(Integer.parseInt(classElement.getAttribute("database")));
mapping.setPrefix(classElement.getAttribute("prefix"));
NodeList elementsByTagName = classElement.getElementsByTagName("field");
Map<String, String> map = new HashMap<>();
Map<String, String> mapFields = new HashMap<>();
Map<String, String> mapTypes = new HashMap<>();
for (int j = 0; j < elementsByTagName.getLength(); j++) {
Element item = (Element) elementsByTagName.item(j);
String name = item.getAttribute("name");
String column = item.getAttribute("column");
map.put(name, column);
String type = item.getAttribute("type");
mapFields.put(name, column);
mapTypes.put(name, type);
}
mapping.setFields(map);
mapping.setTypes(mapTypes);
mapping.setFields(mapFields);
}
}
return mapping;
Expand Down Expand Up @@ -220,15 +230,15 @@ public boolean schemaExists() throws GoraException {
}

private String generateKeyHash(Object baseKey) {
return mapping.getPrefix() + ".key." + baseKey;
return mapping.getPrefix() + ".{" + baseKey + "}";
}

private String generateKeyString(String field, Object baseKey) {
return generateKeyStringBase(baseKey) + field;
}

private String generateKeyStringBase(Object baseKey) {
return mapping.getPrefix() + ".key." + baseKey + ".field.";
return mapping.getPrefix() + ".{" + baseKey + "}" + ".";
}

private String generateIndexKey() {
Expand All @@ -241,14 +251,28 @@ public T newInstanceFromString(K key, String[] fields) throws GoraException, IOE
int countRetrieved = 0;
for (String f : fields) {
Schema.Field field = fieldMap.get(f);
RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(mapping.getFields().get(field.name()), key));
Object val = bucket.get();
if (val == null) {
String redisField = mapping.getFields().get(field.name());
String redisType = mapping.getTypes().get(field.name());
Object redisVal = null;
switch (redisType) {
case "String":
RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(redisField, key));
redisVal = bucket.isExists() ? handler.deserializeFieldValue(field, field.schema(), bucket.get(), persistent) : null;
break;
case "List":
RList<Object> list = redisInstance.getList(generateKeyString(redisField, key));
redisVal = list.isExists() ? handler.deserializeFieldList(field, field.schema(), list, persistent) : null;
break;
case "Hash":
RMap<Object, Object> map = redisInstance.getMap(generateKeyString(redisField, key));
redisVal = map.isExists() ? handler.deserializeFieldMap(field, field.schema(), map, persistent) : null;
break;
}
if (redisVal == null) {
continue;
}
Object fieldValue = handler.deserializeFieldValue(field, field.schema(), val, persistent);
countRetrieved++;
persistent.put(field.pos(), fieldValue);
persistent.put(field.pos(), redisVal);
persistent.setDirty(field.pos());
}
return countRetrieved > 0 ? persistent : null;
Expand Down Expand Up @@ -294,7 +318,7 @@ public void put(K key, T obj) throws GoraException {
if (obj.isDirty()) {
Schema schema = obj.getSchema();
List<Schema.Field> fields = schema.getFields();
RBatch batchInstance = redisInstance.createBatch(BatchOptions.defaults());
RBatch batchInstance = redisInstance.createBatch();
//update secundary index
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
secundaryIndex.addAsync(key.toString());
Expand All @@ -303,15 +327,32 @@ public void put(K key, T obj) throws GoraException {
for (Schema.Field field : fields) {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
if (fieldValue != null) {
map.putAsync(mapping.getFields().get(field.name()), fieldValue);
map.fastPutAsync(mapping.getFields().get(field.name()), fieldValue);
}
}
} else {
for (Schema.Field field : fields) {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
Object fieldValue = obj.get(field.pos());
if (fieldValue != null) {
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(mapping.getFields().get(field.name()), key));
bucket.setAsync(fieldValue);
String redisField = mapping.getFields().get(field.name());
String redisType = mapping.getTypes().get(field.name());
switch (redisType) {
case "String":
fieldValue = handler.serializeFieldValue(field.schema(), fieldValue);
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(redisField, key));
bucket.setAsync(fieldValue);
break;
case "List":
List<Object> list = handler.serializeFieldList(schema, fieldValue);
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(redisField, key));
rlist.addAllAsync(list);
break;
case "Map":
Map<Object, Object> mp = handler.serializeFieldMap(schema, fieldValue);
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(redisField, key));
map.putAllAsync(mp);
break;
}
}
}
}
Expand All @@ -328,7 +369,7 @@ public void put(K key, T obj) throws GoraException {
@Override
public boolean delete(K key) throws GoraException {
try {
RBatch batchInstance = redisInstance.createBatch(BatchOptions.defaults());
RBatch batchInstance = redisInstance.createBatch();
//update secundary index
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
secundaryIndex.removeAsync(key.toString());
Expand All @@ -338,14 +379,12 @@ public boolean delete(K key) throws GoraException {
batchInstance.execute();
return deleteAsync.get();
} else {
RFuture<Long> deleteByPatternAsync = batchInstance.getKeys().deleteByPatternAsync(generateKeyStringBase(key) + "*");
batchInstance.execute();
return deleteByPatternAsync.get() > 0;
return redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + "*") > 0;
}
} catch (Exception ex) {
throw new GoraException(ex);
}

}

@Override
Expand All @@ -358,17 +397,37 @@ public long deleteByQuery(Query<K, T> query) throws GoraException {
*/
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
return null;
String[] fields = getFieldsToQuery(query.getFields());
RLexSortedSet index = redisInstance.getLexSortedSet(generateIndexKey());
Collection<String> range;
if (query.getStartKey() != null && query.getEndKey() != null) {
range = index.range(query.getStartKey().toString(), true, query.getEndKey().toString(), true, 0, (int) query.getLimit());
} else if (query.getStartKey() != null && query.getEndKey() == null) {
range = index.range(query.getStartKey().toString(), true, null, true, 0, (int) query.getLimit());
} else if (query.getStartKey() == null && query.getEndKey() != null) {
range = index.range(null, true, query.getEndKey().toString(), true, 0, (int) query.getLimit());
} else {
range = index.range(null, true, null, true, 0, (int) query.getLimit());
}
RedisResult<K, T> igniteResult = new RedisResult<>(this, query, redisInstance, range, fields);
return igniteResult;
}

@Override
public Query<K, T> newQuery() {
return null;
RedisQuery<K, T> query = new RedisQuery<>(this);
query.setFields(getFieldsToQuery(null));
return query;
}

@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws GoraException {
return null;
List<PartitionQuery<K, T>> partitions = new ArrayList<>();
PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
query);
partitionQuery.setConf(getConf());
partitions.add(partitionQuery);
return partitions;
}

@Override
Expand All @@ -389,4 +448,4 @@ public boolean exists(K key) throws GoraException {
return respKeys.hasNext();
}
}
}
}

0 comments on commit 4b57c24

Please sign in to comment.