Skip to content

Commit

Permalink
Enable all tests for the DataStore
Browse files Browse the repository at this point in the history
Fix suggestions of Carlos
Enable map reduce tests
  • Loading branch information
cuent committed Jul 29, 2019
1 parent 06c394a commit 82bd99a
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class RedisMapping {
private int datebase;
private String prefix;
private Map<String, String> fields;
private Map<String, String> types;
private Map<String, RedisType> types;

public int getDatebase() {
return datebase;
Expand All @@ -52,11 +52,11 @@ public void setFields(Map<String, String> fields) {
this.fields = fields;
}

public Map<String, String> getTypes() {
public Map<String, RedisType> getTypes() {
return types;
}

public void setTypes(Map<String, String> types) {
public void setTypes(Map<String, RedisType> types) {
this.types = types;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@
*/
public class RedisStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {

//Redis constants
private static final String FIELD_SEPARATOR = ".";
private static final String WILDCARD = "*";
private static final String INDEX = "index";
private static final String START_TAG = "{";
private static final String END_TAG = "}";
private static final String PREFIX = "redis://";

protected static final String MOCK_PROPERTY = "redis.mock";
protected static final String INSTANCE_NAME_PROPERTY = "redis.instance";
protected static final String ZOOKEEPERS_NAME_PROPERTY = "redis.zookeepers";
Expand Down Expand Up @@ -121,7 +129,7 @@ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties p
//Override address in tests
String[] hosts = getConf().get("gora.datastore.redis.address", properties.getProperty("gora.datastore.redis.address")).split(",");
for (int i = 0; i < hosts.length; i++) {
hosts[i] = "redis://" + hosts[i];
hosts[i] = PREFIX + hosts[i];
}
switch (mode) {
case SINGLE:
Expand Down Expand Up @@ -169,14 +177,14 @@ protected RedisMapping readMapping(InputStream inputStream) throws IOException {
mapping.setPrefix(classElement.getAttribute("prefix"));
NodeList elementsByTagName = classElement.getElementsByTagName("field");
Map<String, String> mapFields = new HashMap<>();
Map<String, String> mapTypes = new HashMap<>();
Map<String, RedisType> mapTypes = new HashMap<>();
for (int j = 0; j < elementsByTagName.getLength(); j++) {
Element item = (Element) elementsByTagName.item(j);
String name = item.getAttribute("name");
String column = item.getAttribute("column");
String type = item.getAttribute("type");
mapFields.put(name, column);
mapTypes.put(name, type);
mapTypes.put(name, RedisType.valueOf(type));
}
mapping.setTypes(mapTypes);
mapping.setFields(mapFields);
Expand Down Expand Up @@ -211,7 +219,7 @@ public void createSchema() throws GoraException {

@Override
public void deleteSchema() throws GoraException {
redisInstance.getKeys().deleteByPattern(mapping.getPrefix() + ".*");
redisInstance.getKeys().deleteByPattern(mapping.getPrefix() + FIELD_SEPARATOR + WILDCARD);
}

/**
Expand All @@ -227,19 +235,19 @@ public boolean schemaExists() throws GoraException {
}

private String generateKeyHash(Object baseKey) {
return mapping.getPrefix() + ".{" + baseKey + "}";
return mapping.getPrefix() + FIELD_SEPARATOR + START_TAG + baseKey + END_TAG;
}

private String generateKeyString(String field, Object baseKey) {
return generateKeyStringBase(baseKey) + field;
}

private String generateKeyStringBase(Object baseKey) {
return mapping.getPrefix() + ".{" + baseKey + "}" + ".";
return mapping.getPrefix() + FIELD_SEPARATOR + START_TAG + baseKey + END_TAG + FIELD_SEPARATOR;
}

private String generateIndexKey() {
return mapping.getPrefix() + ".index";
return mapping.getPrefix() + FIELD_SEPARATOR + INDEX;
}

public T newInstanceFromString(K key, String[] fields) throws GoraException, IOException {
Expand All @@ -249,18 +257,18 @@ public T newInstanceFromString(K key, String[] fields) throws GoraException, IOE
for (String f : fields) {
Schema.Field field = fieldMap.get(f);
String redisField = mapping.getFields().get(field.name());
String redisType = mapping.getTypes().get(field.name());
RedisType redisType = mapping.getTypes().get(field.name());
Object redisVal = null;
switch (redisType) {
case "String":
case STRING:
RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(redisField, key));
redisVal = bucket.isExists() ? handler.deserializeFieldValue(field, field.schema(), bucket.get(), persistent) : null;
break;
case "List":
case LIST:
RList<Object> list = redisInstance.getList(generateKeyString(redisField, key));
redisVal = list.isExists() ? handler.deserializeFieldList(field, field.schema(), list, persistent) : null;
break;
case "Hash":
case HASH:
RMap<Object, Object> map = redisInstance.getMap(generateKeyString(redisField, key));
redisVal = map.isExists() ? handler.deserializeFieldMap(field, field.schema(), map, persistent) : null;
break;
Expand Down Expand Up @@ -294,7 +302,7 @@ public T newInstanceFromHash(RMap<String, Object> map, String[] fields) throws G
@Override
public T get(K key, String[] fields) throws GoraException {
try {
if (mode == StorageMode.HASH) {
if (mode == StorageMode.SINGLEKEY) {
RMap<String, Object> map = redisInstance.getMap(generateKeyHash(key));
if (!map.isEmpty()) {
return newInstanceFromHash(map, fields);
Expand All @@ -319,7 +327,7 @@ public void put(K key, T obj) throws GoraException {
//update secundary index
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
secundaryIndex.addAsync(key.toString());
if (mode == StorageMode.HASH) {
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
for (Schema.Field field : fields) {
Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos()));
Expand All @@ -333,25 +341,25 @@ public void put(K key, T obj) throws GoraException {
for (Schema.Field field : fields) {
Object fieldValue = obj.get(field.pos());
String redisField = mapping.getFields().get(field.name());
String redisType = mapping.getTypes().get(field.name());
RedisType redisType = mapping.getTypes().get(field.name());
switch (redisType) {
case "String":
case STRING:
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(redisField, key));
bucket.deleteAsync();
if (fieldValue != null) {
fieldValue = handler.serializeFieldValue(field.schema(), fieldValue);
bucket.setAsync(fieldValue);
}
break;
case "List":
case LIST:
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(redisField, key));
rlist.deleteAsync();
if (fieldValue != null) {
List<Object> list = handler.serializeFieldList(field.schema(), fieldValue);
rlist.addAllAsync(list);
}
break;
case "Hash":
case HASH:
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(redisField, key));
map.deleteAsync();
if (fieldValue != null) {
Expand Down Expand Up @@ -379,14 +387,14 @@ public boolean delete(K key) throws GoraException {
//update secundary index
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
secundaryIndex.removeAsync(key.toString());
if (mode == StorageMode.HASH) {
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
RFuture<Boolean> deleteAsync = map.deleteAsync();
batchInstance.execute();
return deleteAsync.get();
} else {
batchInstance.execute();
return redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + "*") > 0;
return redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + WILDCARD) > 0;
}
} catch (Exception ex) {
throw new GoraException(ex);
Expand All @@ -400,31 +408,31 @@ public long deleteByQuery(Query<K, T> query) throws GoraException {
RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey());
if (query.getFields() != null && query.getFields().length < mapping.getFields().size()) {
List<String> dbFields = new ArrayList<>();
List<String> dbTypes = new ArrayList<>();
List<RedisType> dbTypes = new ArrayList<>();
for (String af : query.getFields()) {
dbFields.add(mapping.getFields().get(af));
dbTypes.add(mapping.getTypes().get(af));
}
for (String key : range) {
if (mode == StorageMode.HASH) {
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
for (String field : dbFields) {
map.removeAsync(field);
}
} else {
for (int i = 0; i < dbFields.size(); i++) {
String field = dbFields.get(i);
String type = dbTypes.get(i);
RedisType type = dbTypes.get(i);
switch (type) {
case "String":
case STRING:
RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(field, key));
bucket.deleteAsync();
break;
case "List":
case LIST:
RListAsync<Object> rlist = batchInstance.getList(generateKeyString(field, key));
rlist.deleteAsync();
break;
case "Hash":
case HASH:
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(field, key));
map.deleteAsync();
break;
Expand All @@ -435,11 +443,11 @@ public long deleteByQuery(Query<K, T> query) throws GoraException {
} else {
for (String key : range) {
secundaryIndex.removeAsync(key);
if (mode == StorageMode.HASH) {
if (mode == StorageMode.SINGLEKEY) {
RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key));
map.deleteAsync();
} else {
redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + "*");
redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + WILDCARD);
}
}
}
Expand Down Expand Up @@ -485,10 +493,10 @@ public void close() {

@Override
public boolean exists(K key) throws GoraException {
if (mode == StorageMode.HASH) {
if (mode == StorageMode.SINGLEKEY) {
return redisInstance.getKeys().countExists(generateKeyHash(key)) != 0;
} else {
Iterator<String> respKeys = redisInstance.getKeys().getKeysByPattern(generateKeyStringBase(key) + "*", 1).iterator();
Iterator<String> respKeys = redisInstance.getKeys().getKeysByPattern(generateKeyStringBase(key) + WILDCARD, 1).iterator();
return respKeys.hasNext();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.redis.store;

/**
* Supported data types for Redis.
*/
public enum RedisType {
STRING, LIST, HASH
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package org.apache.gora.redis.util;

public enum StorageMode {
HASH, STRING
}
SINGLEKEY, MULTIKEY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.redis.mapreduce;

import java.io.IOException;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
import org.apache.gora.redis.GoraRedisTestDriver;
import org.apache.gora.redis.util.ServerMode;
import org.apache.gora.redis.util.StorageMode;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.junit.After;
import org.junit.Before;

/**
* Executes tests for MR jobs over Redis dataStore.
*/
public class RedisStoreMapReduceTest extends DataStoreMapReduceTestBase{

private GoraRedisTestDriver driver;

public RedisStoreMapReduceTest() throws IOException {
super();
driver = new GoraRedisTestDriver(StorageMode.SINGLEKEY, ServerMode.SINGLE);
}

@Override
@Before
public void setUp() throws Exception {
driver.setUpClass();
super.setUp();
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
driver.tearDownClass();
}

@Override
protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
try {
return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gora.redis.mapreduce;

0 comments on commit 82bd99a

Please sign in to comment.