Skip to content

Commit

Permalink
Implement update features
Browse files Browse the repository at this point in the history
Improve maps serialization
Implement deleteByQuery
Implement deleteSchema
  • Loading branch information
cuent committed Jul 21, 2019
1 parent 943ee63 commit 06c394a
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,9 @@ public String getSchemaName() {
public void createSchema() throws GoraException {
}

/**
* Redis, being a schemaless database does not support explicit schema
* creation. When the records are added to the database, the schema is created
* on the fly. Thus, schema operations are unavailable in gora-redis module.
*/
@Override
public void deleteSchema() throws GoraException {
redisInstance.getKeys().deleteByPattern(mapping.getPrefix() + ".*");
}

/**
Expand Down Expand Up @@ -341,29 +337,26 @@ public void put(K key, T obj) throws GoraException {
switch (redisType) {
case "String":
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(redisField, key));
bucket.deleteAsync();
if (fieldValue != null) {
fieldValue = handler.serializeFieldValue(field.schema(), fieldValue);
bucket.setAsync(fieldValue);
} else {
bucket.deleteAsync();
}
break;
case "List":
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(redisField, key));
rlist.deleteAsync();
if (fieldValue != null) {
List<Object> list = handler.serializeFieldList(field.schema(), fieldValue);
rlist.addAllAsync(list);
} else {
rlist.deleteAsync();
}
break;
case "Hash":
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(redisField, key));
map.deleteAsync();
if (fieldValue != null) {
Map<Object, Object> mp = handler.serializeFieldMap(field.schema(), fieldValue);
map.putAllAsync(mp);
} else {
map.deleteAsync();
}
break;
}
Expand Down Expand Up @@ -402,26 +395,64 @@ public boolean delete(K key) throws GoraException {

@Override
public long deleteByQuery(Query<K, T> query) throws GoraException {
return 0;
Collection<String> range = runQuery(query);
RBatch batchInstance = redisInstance.createBatch();
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
if (query.getFields() != null && query.getFields().length < mapping.getFields().size()) {
List<String> dbFields = new ArrayList<>();
List<String> dbTypes = new ArrayList<>();
for (String af : query.getFields()) {
dbFields.add(mapping.getFields().get(af));
dbTypes.add(mapping.getTypes().get(af));
}
for (String key : range) {
if (mode == StorageMode.HASH) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
for (String field : dbFields) {
map.removeAsync(field);
}
} else {
for (int i = 0; i < dbFields.size(); i++) {
String field = dbFields.get(i);
String type = dbTypes.get(i);
switch (type) {
case "String":
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(field, key));
bucket.deleteAsync();
break;
case "List":
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(field, key));
rlist.deleteAsync();
break;
case "Hash":
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(field, key));
map.deleteAsync();
break;
}
}
}
}
} else {
for (String key : range) {
secundaryIndex.removeAsync(key);
if (mode == StorageMode.HASH) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
map.deleteAsync();
} else {
redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + "*");
}
}
}
batchInstance.execute();
return range.size();
}

/**
* Execute the query and return the result.
*/
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
RLexSortedSet index = redisInstance.getLexSortedSet(generateIndexKey());
Collection<String> range;
int limit = query.getLimit() > -1 ? (int) query.getLimit() : Integer.MAX_VALUE;
if (query.getStartKey() != null && query.getEndKey() != null) {
range = index.range(query.getStartKey().toString(), true, query.getEndKey().toString(), true, 0, limit);
} else if (query.getStartKey() != null && query.getEndKey() == null) {
range = index.rangeTail(query.getStartKey().toString(), true, 0, limit);
} else if (query.getStartKey() == null && query.getEndKey() != null) {
range = index.rangeHead(query.getEndKey().toString(), true, 0, limit);
} else {
range = index.stream().limit(limit).collect(Collectors.toList());
}
Collection<String> range = runQuery(query);
RedisResult<K, T> redisResult = new RedisResult<>(this, query, range);
return redisResult;
}
Expand Down Expand Up @@ -461,4 +492,20 @@ public boolean exists(K key) throws GoraException {
return respKeys.hasNext();
}
}

private Collection<String> runQuery(Query<K, T> query) {
RLexSortedSet index = redisInstance.getLexSortedSet(generateIndexKey());
Collection<String> range;
int limit = query.getLimit() > -1 ? (int) query.getLimit() : Integer.MAX_VALUE;
if (query.getStartKey() != null && query.getEndKey() != null) {
range = index.range(query.getStartKey().toString(), true, query.getEndKey().toString(), true, 0, limit);
} else if (query.getStartKey() != null && query.getEndKey() == null) {
range = index.rangeTail(query.getStartKey().toString(), true, 0, limit);
} else if (query.getStartKey() == null && query.getEndKey() != null) {
range = index.rangeHead(query.getEndKey().toString(), true, 0, limit);
} else {
range = index.stream().limit(limit).collect(Collectors.toList());
}
return range;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ public Object serializeFieldValue(Schema fieldSchema, Object fieldValue) {
public Map<Object, Object> serializeFieldMap(Schema fieldSchema, Object fieldValue) {
Map<Object, Object> map = new HashMap();
switch (fieldSchema.getType()) {
case UNION:
for (Schema sc : fieldSchema.getTypes()) {
if (sc.getType() == Schema.Type.MAP) {
map = serializeFieldMap(sc, fieldValue);
}
}
break;
case MAP:
Map<CharSequence, ?> mp = (Map<CharSequence, ?>) fieldValue;
for (Entry<CharSequence, ?> e : mp.entrySet()) {
Expand Down Expand Up @@ -145,15 +152,15 @@ public List<Object> serializeFieldList(Schema fieldSchema, Object fieldValue) {

@SuppressWarnings("unchecked")
public Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
Object redisValue, T persistent) throws IOException {
Object redisValue, T persistent) throws IOException {
Object fieldValue = null;
switch (fieldSchema.getType()) {
case MAP:
case ARRAY:
case RECORD:
@SuppressWarnings("rawtypes") SpecificDatumReader reader = getDatumReader(fieldSchema);
fieldValue = IOUtils.deserialize((byte[]) redisValue, reader,
persistent.get(field.pos()));
persistent.get(field.pos()));
break;
case ENUM:
fieldValue = AvroUtils.getEnumValue(fieldSchema, redisValue.toString());
Expand All @@ -174,7 +181,7 @@ public Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
} else {
reader = getDatumReader(fieldSchema);
fieldValue = IOUtils.deserialize((byte[]) redisValue, reader,
persistent.get(field.pos()));
persistent.get(field.pos()));
}
break;
default:
Expand All @@ -185,9 +192,16 @@ public Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,

@SuppressWarnings("unchecked")
public Object deserializeFieldMap(Schema.Field field, Schema fieldSchema,
RMap<Object, Object> redisMap, T persistent) throws IOException {
RMap<Object, Object> redisMap, T persistent) throws IOException {
Map<Utf8, Object> fieldValue = new HashMap<>();
switch (fieldSchema.getType()) {
case UNION:
for (Schema sc : fieldSchema.getTypes()) {
if (sc.getType() == Schema.Type.MAP) {
return deserializeFieldMap(field, sc, redisMap, persistent);
}
}
break;
case MAP:
for (Entry<Object, Object> aEntry : redisMap.entrySet()) {
String key = aEntry.getKey().toString();
Expand All @@ -203,7 +217,7 @@ public Object deserializeFieldMap(Schema.Field field, Schema fieldSchema,

@SuppressWarnings("unchecked")
public Object deserializeFieldList(Schema.Field field, Schema fieldSchema,
RList<Object> redisList, T persistent) throws IOException {
RList<Object> redisList, T persistent) throws IOException {
List<Object> fieldValue = new ArrayList<>();
switch (fieldSchema.getType()) {
case ARRAY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,6 @@ public void testPutMap() throws Exception {
public void testPutMixedMaps() throws Exception {
}

@Test
@Ignore
@Override
public void testUpdate() throws Exception {
}

@Test
@Ignore
@Override
public void testEmptyUpdate() throws Exception {
}

@Test
@Ignore
@Override
Expand Down Expand Up @@ -145,24 +133,6 @@ public void testGetWebPageDefaultFields() throws Exception {
public void testGetNonExisting() throws Exception {
}

@Test
@Ignore
@Override
public void testQueryStartKey() throws Exception {
}

@Test
@Ignore
@Override
public void testQueryEndKey() throws Exception {
}

@Test
@Ignore
@Override
public void testQueryKeyRange() throws Exception {
}

@Test
@Ignore
@Override
Expand All @@ -181,22 +151,10 @@ public void testQueryWebPageSingleKeyDefaultFields() throws Exception {
public void testQueryWebPageQueryEmptyResults() throws Exception {
}

@Test
@Ignore
@Override
public void testDeleteByQuery() throws Exception {
}

@Test
@Ignore
@Override
public void testDeleteByQueryFields() throws Exception {
}

@Test
@Ignore
@Override
public void testGetPartitions() throws Exception {
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,6 @@ public void testPutMap() throws Exception {
public void testPutMixedMaps() throws Exception {
}

@Test
@Ignore
@Override
public void testUpdate() throws Exception {
}

@Test
@Ignore
@Override
public void testEmptyUpdate() throws Exception {
}

@Test
@Ignore
@Override
Expand Down Expand Up @@ -145,23 +133,6 @@ public void testGetWebPageDefaultFields() throws Exception {
public void testGetNonExisting() throws Exception {
}

@Test
@Ignore
@Override
public void testQueryStartKey() throws Exception {
}

@Test
@Ignore
@Override
public void testQueryEndKey() throws Exception {
}

@Test
@Ignore
@Override
public void testQueryKeyRange() throws Exception {
}

@Test
@Ignore
Expand All @@ -181,22 +152,11 @@ public void testQueryWebPageSingleKeyDefaultFields() throws Exception {
public void testQueryWebPageQueryEmptyResults() throws Exception {
}

@Test
@Ignore
@Override
public void testDeleteByQuery() throws Exception {
}

@Test
@Ignore
@Override
public void testDeleteByQueryFields() throws Exception {
}

@Test
@Ignore
@Override
public void testGetPartitions() throws Exception {
}

}
}

0 comments on commit 06c394a

Please sign in to comment.