Skip to content

Commit

Permalink
Merge branch 'GEODE-2469' of https://github.com/ggreen/geode into GEO…
Browse files Browse the repository at this point in the history
…DE-2469
  • Loading branch information
Gregory Green authored and Gregory Green committed Mar 18, 2017
2 parents 6412412 + 3f8c590 commit d3b2642
Show file tree
Hide file tree
Showing 17 changed files with 268 additions and 156 deletions.
@@ -0,0 +1,22 @@
package org.apache.geode.redis.internal;

import org.apache.geode.cache.Region;

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;

/**
* Created by gosullivan on 3/10/17.
*/
public class AutoCloseableLock implements AutoCloseable {
Runnable unlock;

public AutoCloseableLock(Runnable unlockFunction) {
unlock = unlockFunction;
}

@Override
public void close() {
this.unlock.run();
}
}
Expand Up @@ -37,6 +37,9 @@
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.redis.internal.executor.transactions.TransactionExecutor;
import org.apache.geode.redis.GeodeRedisServer;

Expand All @@ -57,7 +60,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private static final int WAIT_REGION_DSTRYD_MILLIS = 100;
private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) / WAIT_REGION_DSTRYD_MILLIS; // 60
// seconds
// total
private DistributedLockService hashLockService;
private DistributedLockService setLockService;

private final Cache cache;
private final GeodeRedisServer server;
Expand Down Expand Up @@ -115,6 +119,26 @@ public void run() {
this.regionProvider = regionProvider;
this.authPwd = pwd;
this.isAuthenticated = pwd != null ? false : true;

this.setLockService = getOrCreateDLockService(cache, "__redis_set");
this.hashLockService = getOrCreateDLockService(cache, "__redis_hash");
}

public DistributedLockService getHashLockService() {
return this.hashLockService;
}

public DistributedLockService getSetLockService() {
return this.setLockService;
}

private synchronized DistributedLockService getOrCreateDLockService(Cache cache, String name) {
DistributedSystem distributedSystem = cache.getDistributedSystem();
DistributedLockService service = DistributedLockService.getServiceNamed(name);
if (service == null) {
service = DistributedLockService.create("__redis_set", cache.getDistributedSystem());
}
return service;
}

private void flushChannel() {
Expand Down
Expand Up @@ -16,6 +16,8 @@

import java.util.List;
import java.util.Map;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
Expand Down Expand Up @@ -56,21 +58,23 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {

ByteArrayWrapper key = command.getKey();

Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

if (map == null || map.isEmpty()) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), numDeleted));
return;
}
if (map == null || map.isEmpty()) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), numDeleted));
return;
}

for (int i = START_FIELDS_INDEX; i < commandElems.size(); i++) {
ByteArrayWrapper field = new ByteArrayWrapper(commandElems.get(i));
Object oldValue = map.remove(field);
if (oldValue != null)
numDeleted++;
for (int i = START_FIELDS_INDEX; i < commandElems.size(); i++) {
ByteArrayWrapper field = new ByteArrayWrapper(commandElems.get(i));
Object oldValue = map.remove(field);
if (oldValue != null)
numDeleted++;
}
// save map
saveMap(map, context, key);
}
// save map
saveMap(map, context, key);

command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), numDeleted));
}
Expand Down
Expand Up @@ -16,6 +16,8 @@

import java.util.List;
import java.util.Map;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
Expand Down Expand Up @@ -54,18 +56,19 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {
return;
}


Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, command.getKey());

if (map == null || map.isEmpty()) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS));
return;
}

boolean hasField;
byte[] byteField = commandElems.get(FIELD_INDEX);
ByteArrayWrapper field = new ByteArrayWrapper(byteField);
ByteArrayWrapper key = command.getKey();
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

boolean hasField = map.containsKey(field);
if (map == null || map.isEmpty()) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS));
return;
}
hasField = map.containsKey(field);
}

if (hasField)
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), EXISTS));
Expand Down
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
Expand Down Expand Up @@ -54,24 +56,27 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.HGETALL));
return;
}
Collection<Entry<ByteArrayWrapper, ByteArrayWrapper>> entries;
ByteArrayWrapper key = command.getKey();
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> results = getMap(context, key);

Map<ByteArrayWrapper, ByteArrayWrapper> results = getMap(context, command.getKey());
if (results == null || results.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
return;
}

if (results == null || results.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
return;
}
entries = results.entrySet();

Collection<Entry<ByteArrayWrapper, ByteArrayWrapper>> entries = results.entrySet();
if (entries == null || entries.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
return;
}

if (entries == null || entries.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
return;
// create a copy
entries = new ArrayList<Entry<ByteArrayWrapper, ByteArrayWrapper>>(entries);
}

// create a copy
entries = new ArrayList<Entry<ByteArrayWrapper, ByteArrayWrapper>>(entries);

command.setResponse(Coder.getKeyValArrayResponse(context.getByteBufAllocator(), entries));
}

Expand Down
Expand Up @@ -16,6 +16,8 @@

import java.util.List;
import java.util.Map;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
Expand Down Expand Up @@ -52,21 +54,22 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {

ByteArrayWrapper key = command.getKey();

Map<ByteArrayWrapper, ByteArrayWrapper> entry = getMap(context, key);
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> entry = getMap(context, key);

if (entry == null) {
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
return;
}
if (entry == null) {
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
return;
}

ByteArrayWrapper valueWrapper = entry.get(field);

if (valueWrapper != null) {
command.setResponse(
Coder.getBulkStringResponse(context.getByteBufAllocator(), valueWrapper.toBytes()));
} else
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
ByteArrayWrapper valueWrapper = entry.get(field);

if (valueWrapper != null) {
command.setResponse(
Coder.getBulkStringResponse(context.getByteBufAllocator(), valueWrapper.toBytes()));
} else
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
}
}

}
Expand Up @@ -77,6 +77,7 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {

ByteArrayWrapper key = command.getKey();


Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

byte[] byteField = commandElems.get(FIELD_INDEX);
Expand Down
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
Expand Down Expand Up @@ -54,16 +56,18 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {
}

ByteArrayWrapper key = command.getKey();
Set<ByteArrayWrapper> keys;
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> keyMap = getMap(context, key);

Map<ByteArrayWrapper, ByteArrayWrapper> keyMap = getMap(context, key);

if (keyMap == null || keyMap.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
return;
}
if (keyMap == null || keyMap.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
return;
}


Set<ByteArrayWrapper> keys = new HashSet<ByteArrayWrapper>(keyMap.keySet());
keys = new HashSet<ByteArrayWrapper>(keyMap.keySet());
}

if (keys.isEmpty()) {
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
Expand Down
Expand Up @@ -16,6 +16,8 @@

import java.util.List;
import java.util.Map;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
Expand Down Expand Up @@ -50,15 +52,18 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {
}

ByteArrayWrapper key = command.getKey();
final int size;

Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

if (map == null) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS));
return;
}
if (map == null) {
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS));
return;
}

final int size = map.size();
size = map.size();
}

command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), size));
}
Expand Down
Expand Up @@ -57,6 +57,7 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {

ByteArrayWrapper key = command.getKey();


Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

checkDataType(key, RedisDataType.REDIS_HASH, context);
Expand Down
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.Map;

import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
Expand Down Expand Up @@ -59,27 +60,27 @@ public void executeCommand(Command command, ExecutionHandlerContext context) {
ByteArrayWrapper key = command.getKey();


Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

Object oldValue;
byte[] byteField = commandElems.get(FIELD_INDEX);
byte[] value = commandElems.get(VALUE_INDEX);
ByteArrayWrapper field = new ByteArrayWrapper(byteField);
ByteArrayWrapper putValue = new ByteArrayWrapper(value);

byte[] value = commandElems.get(VALUE_INDEX);
try (AutoCloseableLock regionLock = withRegionLock(context, key)) {
Map<ByteArrayWrapper, ByteArrayWrapper> map = getMap(context, key);

Object oldValue;
if (onlySetOnAbsent())
oldValue = map.putIfAbsent(field, putValue);
else
oldValue = map.put(field, putValue);

if (onlySetOnAbsent())
oldValue = map.putIfAbsent(field, new ByteArrayWrapper(value));
else
oldValue = map.put(field, new ByteArrayWrapper(value));

this.saveMap(map, context, key);
this.saveMap(map, context, key);
}

if (oldValue == null)
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NEW_FIELD));
else
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), EXISTING_FIELD));

}

protected boolean onlySetOnAbsent() {
Expand Down

0 comments on commit d3b2642

Please sign in to comment.