Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Geode 2469 #404

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -215,6 +217,19 @@ public class GeodeRedisServer {
*/
public static final String STRING_REGION = "ReDiS_StRiNgS";

/**
* TThe field that defines the name of the {@link Region} which holds non-named hash. The current
* value of this field is {@value #HASH_REGION}.
*/
public static final String HASH_REGION = "ReDiS_HASH";

/**
* TThe field that defines the name of the {@link Region} which holds sets. The current value of
* this field is {@value #SET_REGION}.
*/
public static final String SET_REGION = "ReDiS_SET";


/**
* The field that defines the name of the {@link Region} which holds all of the HyperLogLogs. The
* current value of this field is {@value #HLL_REGION}.
Expand Down Expand Up @@ -409,7 +424,10 @@ private void initializeRedis() {
Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;

Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
Region<ByteArrayWrapper, Map<ByteArrayWrapper, ByteArrayWrapper>> redisHash;
Region<String, RedisDataType> redisMetaData;
Region<ByteArrayWrapper, Set<ByteArrayWrapper>> redisSet;

GemFireCacheImpl gemFireCache = (GemFireCacheImpl) cache;
try {
if ((stringsRegion = cache.getRegion(STRING_REGION)) == null) {
Expand All @@ -422,6 +440,19 @@ private void initializeRedis() {
gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE);
hLLRegion = regionFactory.create(HLL_REGION);
}

if ((redisHash = cache.getRegion(HASH_REGION)) == null) {
RegionFactory<ByteArrayWrapper, Map<ByteArrayWrapper, ByteArrayWrapper>> regionFactory =
gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE);
redisHash = regionFactory.create(HASH_REGION);
}

if ((redisSet = cache.getRegion(SET_REGION)) == null) {
RegionFactory<ByteArrayWrapper, Set<ByteArrayWrapper>> regionFactory =
gemFireCache.createRegionFactory(this.DEFAULT_REGION_TYPE);
redisSet = regionFactory.create(SET_REGION);
}

if ((redisMetaData = cache.getRegion(REDIS_META_DATA_REGION)) == null) {
AttributesFactory af = new AttributesFactory();
af.addCacheListener(metaListener);
Expand All @@ -438,10 +469,13 @@ private void initializeRedis() {
throw assErr;
}
this.regionCache = new RegionProvider(stringsRegion, hLLRegion, redisMetaData,
expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE, redisHash, redisSet);
redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED);
redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED);
redisMetaData.put(SET_REGION, RedisDataType.REDIS_PROTECTED);
redisMetaData.put(HASH_REGION, RedisDataType.REDIS_PROTECTED);

}
checkForRegions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*/
public class Coder {


/*
* Take no chances on char to byte conversions with default charsets on jvms, so we'll hard code
* the UTF-8 symbol values as bytes here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ public Command(List<byte[]> commandElems) {
this.response = null;

RedisCommandType type;

String commandName = null;
try {
byte[] charCommand = commandElems.get(0);
String commandName = Coder.bytesToString(charCommand).toUpperCase();
commandName = Coder.bytesToString(charCommand).toUpperCase();
type = RedisCommandType.valueOf(commandName);
} catch (Exception e) {
type = RedisCommandType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,14 @@ private void writeToChannel(ByteBuf message) {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Command command = (Command) msg;
executeCommand(ctx, command);
try {
Command command = (Command) msg;
executeCommand(ctx, command);
} catch (Exception e) {
logger.error(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to be logging exceptions as they propagate, I'd like to think about the best place to catch all of them and also it would be nice to add a little error message so we know where we caught the exception.

throw e;
}

}

/**
Expand Down Expand Up @@ -198,6 +204,7 @@ private void executeCommand(ChannelHandlerContext ctx, Command command) throws E
else
executeWithoutTransaction(exec, command);


if (hasTransaction() && command.getCommandType() != RedisCommandType.MULTI) {
writeToChannel(
Coder.getSimpleStringResponse(this.byteBufAllocator, RedisConstants.COMMAND_QUEUED));
Expand Down Expand Up @@ -235,6 +242,8 @@ private void executeWithoutTransaction(final Executor exec, Command command) thr
exec.executeCommand(command, this);
return;
} catch (Exception e) {
logger.error(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see error logging comment above


cause = e;
if (e instanceof RegionDestroyedException || e instanceof RegionNotFoundException
|| e.getCause() instanceof QueryInvocationTargetException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.geode.redis.internal.executor.ExpirationExecutor;
import org.apache.geode.redis.internal.executor.ListQuery;
import org.apache.geode.redis.internal.executor.SortedSetQuery;
import org.apache.geode.redis.internal.executor.hash.HashExecutor;
import org.apache.geode.redis.internal.executor.set.SetExecutor;
import org.apache.geode.internal.hll.HyperLogLogPlus;
import org.apache.geode.management.cli.Result;
import org.apache.geode.management.cli.Result.Status;
Expand Down Expand Up @@ -80,6 +82,9 @@ public class RegionProvider implements Closeable {
*/
private final Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;

private final Region<ByteArrayWrapper, Map<ByteArrayWrapper, ByteArrayWrapper>> hashRegion;
private final Region<ByteArrayWrapper, Set<ByteArrayWrapper>> setRegion;

private final Cache cache;
private final QueryService queryService;
private final ConcurrentMap<ByteArrayWrapper, Map<Enum<?>, Query>> preparedQueries =
Expand All @@ -95,18 +100,38 @@ public RegionProvider(Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion,
Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion,
Region<String, RedisDataType> redisMetaRegion,
ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap,
ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut) {
ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut,
Region<ByteArrayWrapper, Map<ByteArrayWrapper, ByteArrayWrapper>> hashRegion,
Region<ByteArrayWrapper, Set<ByteArrayWrapper>> setRegion) {

this(stringsRegion, hLLRegion, redisMetaRegion, expirationsMap, expirationExecutor,
defaultShortcut, hashRegion, setRegion, GemFireCacheImpl.getInstance());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for making the cache an argument to the constructor (even if it's optional). Anything that makes things a little more decoupled.

}

public RegionProvider(Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion,
Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion,
Region<String, RedisDataType> redisMetaRegion,
ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap,
ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut,
Region<ByteArrayWrapper, Map<ByteArrayWrapper, ByteArrayWrapper>> hashRegion,
Region<ByteArrayWrapper, Set<ByteArrayWrapper>> setRegion, Cache cache) {
if (stringsRegion == null || hLLRegion == null || redisMetaRegion == null)
throw new NullPointerException();

this.hashRegion = hashRegion;
this.setRegion = setRegion;

this.regions = new ConcurrentHashMap<ByteArrayWrapper, Region<?, ?>>();
this.stringsRegion = stringsRegion;
this.hLLRegion = hLLRegion;
this.redisMetaRegion = redisMetaRegion;
this.cache = GemFireCacheImpl.getInstance();
this.cache = cache;

this.queryService = cache.getQueryService();
this.expirationsMap = expirationsMap;
this.expirationExecutor = expirationExecutor;
this.defaultRegionType = defaultShortcut;

this.locks = new ConcurrentHashMap<String, Lock>();
}

Expand Down Expand Up @@ -143,14 +168,19 @@ public RedisDataType metaGet(ByteArrayWrapper key) {
}

public Region<?, ?> getRegion(ByteArrayWrapper key) {
if (key == null)
return null;

return this.regions.get(key);

}

public void removeRegionReferenceLocally(ByteArrayWrapper key, RedisDataType type) {
Lock lock = this.locks.get(key.toString());
boolean locked = false;
try {
locked = lock.tryLock();
if (lock != null)
locked = lock.tryLock();
// If we cannot get the lock we ignore this remote event, this key has local event
// that started independently, ignore this event to prevent deadlock
if (locked) {
Expand All @@ -174,7 +204,7 @@ public boolean removeKey(ByteArrayWrapper key, RedisDataType type) {
}

public boolean removeKey(ByteArrayWrapper key, RedisDataType type, boolean cancelExpiration) {
if (type == null || type == RedisDataType.REDIS_PROTECTED)
if (type == RedisDataType.REDIS_PROTECTED)
return false;
Lock lock = this.locks.get(key.toString());
try {
Expand All @@ -187,8 +217,26 @@ public boolean removeKey(ByteArrayWrapper key, RedisDataType type, boolean cance
return this.stringsRegion.remove(key) != null;
} else if (type == RedisDataType.REDIS_HLL) {
return this.hLLRegion.remove(key) != null;
} else if (type == RedisDataType.REDIS_LIST) {
return this.destroyRegion(key, type);
} else {
return destroyRegion(key, type);


// Check hash
ByteArrayWrapper regionName = HashExecutor.toRegionNameByteArray(key);
Region<?, ?> region = this.getRegion(regionName);
if (region != null) {
region.remove(HashExecutor.toEntryKey(key));
}

// remove the set
region = this.getRegion(SetExecutor.SET_REGION_KEY);
if (region != null) {
region.remove(key);
}


return true;
}
} catch (Exception exc) {
return false;
Expand Down Expand Up @@ -259,6 +307,10 @@ public void createRemoteRegionReferenceLocally(ByteArrayWrapper key, RedisDataTy

private Region<?, ?> getOrCreateRegion0(ByteArrayWrapper key, RedisDataType type,
ExecutionHandlerContext context, boolean addToMeta) {

String regionName = key.toString();


checkDataType(key, type);
Region<?, ?> r = this.regions.get(key);
if (r != null && r.isDestroyed()) {
Expand Down Expand Up @@ -289,7 +341,9 @@ public void createRemoteRegionReferenceLocally(ByteArrayWrapper key, RedisDataTy
Exception concurrentCreateDestroyException = null;
do {
concurrentCreateDestroyException = null;
r = createRegionGlobally(stringKey);

r = createRegionGlobally(regionName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change stringKey to regionName, another variable with the same value?


try {
if (type == RedisDataType.REDIS_LIST) {
doInitializeList(key, r);
Expand Down Expand Up @@ -455,6 +509,20 @@ public Region<ByteArrayWrapper, ByteArrayWrapper> getStringsRegion() {
return this.stringsRegion;
}

/**
* @return the hashRegion
*/
public Region<ByteArrayWrapper, Map<ByteArrayWrapper, ByteArrayWrapper>> getHashRegion() {
return hashRegion;
}

/**
* @return the setRegion
*/
public Region<ByteArrayWrapper, Set<ByteArrayWrapper>> getSetRegion() {
return setRegion;
}

public Region<ByteArrayWrapper, HyperLogLogPlus> gethLLRegion() {
return this.hLLRegion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ protected Query getQuery(ByteArrayWrapper key, Enum<?> type, ExecutionHandlerCon

protected boolean removeEntry(ByteArrayWrapper key, RedisDataType type,
ExecutionHandlerContext context) {
if (type == null || type == RedisDataType.REDIS_PROTECTED)
return false;

RegionProvider rC = context.getRegionProvider();
return rC.removeKey(key, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.apache.geode.redis.internal.executor;

import java.util.Collection;

import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
Expand All @@ -23,8 +25,22 @@ public class UnkownExecutor extends AbstractExecutor {

@Override
public void executeCommand(Command command, ExecutionHandlerContext context) {
command.setResponse(
Coder.getErrorResponse(context.getByteBufAllocator(), RedisConstants.ERROR_UNKOWN_COMMAND));
}

StringBuilder commandProcessedText = new StringBuilder();

Collection<byte[]> processedCmds = command.getProcessedCommand();

if (processedCmds != null && !processedCmds.isEmpty()) {

for (byte[] bytes : processedCmds) {
if (bytes == null || bytes.length == 0)
continue; // skip blanks

commandProcessedText.append(Coder.bytesToString(bytes)).append(" ");
}
}

command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
RedisConstants.ERROR_UNKOWN_COMMAND + " " + commandProcessedText));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
package org.apache.geode.redis.internal.executor.hash;

import java.util.List;

import org.apache.geode.cache.Region;
import java.util.Map;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.RedisDataType;
import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.RedisConstants.ArityDef;

Expand All @@ -40,25 +38,22 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {
int numDeleted = 0;

ByteArrayWrapper key = command.getKey();
Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

checkDataType(key, RedisDataType.REDIS_HASH, context);
Region<ByteArrayWrapper, ByteArrayWrapper> keyRegion = getRegion(context, key);

if (keyRegion == null) {
if (map == null || map.isEmpty()) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), numDeleted));
return;
}


for (int i = START_FIELDS_INDEX; i < commandElems.size(); i++) {
ByteArrayWrapper field = new ByteArrayWrapper(commandElems.get(i));
Object oldValue = keyRegion.remove(field);
Object oldValue = map.remove(field);
if (oldValue != null)
numDeleted++;
}
if (keyRegion.isEmpty()) {
context.getRegionProvider().removeKey(key, RedisDataType.REDIS_HASH);
}
// save map
saveMap(map, context, key);

command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), numDeleted));
}

Expand Down