Skip to content

Commit

Permalink
some fixes for crdt support (#303)
Browse files Browse the repository at this point in the history
* some fixes for crdt support

* review fixes

* changed supported version to 6.0.0

* changed supported version to 5.0.7 until we fix circle to work with 6
  • Loading branch information
MeirShpilraien committed May 4, 2020
1 parent 0223be6 commit bfd978a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 16 deletions.
2 changes: 1 addition & 1 deletion ramp.yml
Expand Up @@ -10,7 +10,7 @@ min_redis_pack_version: '5.4.11'
config_command: "RG.CONFIGSET"
capabilities:
- types
- replica_of
- crdb
- failover_migrate
- persistence_aof
- persistence_rdb
Expand Down
47 changes: 42 additions & 5 deletions src/common.c
Expand Up @@ -15,15 +15,41 @@

static char* shardUniqueId = NULL;

int redisMajorVersion;
int redisMinorVersion;
int redisPatchVersion;
RedisVersion currVesion;

RedisVersion supportedVersion = {
.redisMajorVersion = 5,
.redisMinorVersion = 0,
.redisPatchVersion = 7,
};

int rlecMajorVersion;
int rlecMinorVersion;
int rlecPatchVersion;
int rlecBuild;

bool isCrdt;


int CheckSupportedVestion(){
if(currVesion.redisMajorVersion < supportedVersion.redisMajorVersion){
return REDISMODULE_ERR;
}

if(currVesion.redisMajorVersion == supportedVersion.redisMajorVersion){
if(currVesion.redisMinorVersion < supportedVersion.redisMinorVersion){
return REDISMODULE_ERR;
}

if(currVesion.redisMinorVersion == supportedVersion.redisMinorVersion){
if(currVesion.redisPatchVersion < supportedVersion.redisPatchVersion){
return REDISMODULE_ERR;
}
}
}

return REDISMODULE_OK;
}

static uint64_t idHashFunction(const void *key){
return Gears_dictGenHashFunction(key, ID_LEN);
Expand Down Expand Up @@ -130,8 +156,8 @@ void getRedisVersion() {
size_t len;
const char *replyStr = RedisModule_CallReplyStringPtr(reply, &len);

int n = sscanf(replyStr, "# Server\nredis_version:%d.%d.%d", &redisMajorVersion,
&redisMinorVersion, &redisPatchVersion);
int n = sscanf(replyStr, "# Server\nredis_version:%d.%d.%d", &currVesion.redisMajorVersion,
&currVesion.redisMinorVersion, &currVesion.redisPatchVersion);

assert(n == 3);

Expand All @@ -149,6 +175,17 @@ void getRedisVersion() {
}

RedisModule_FreeCallReply(reply);

isCrdt = true;
reply = RedisModule_Call(ctx, "CRDT.CONFIG", "cc", "GET", "active-gc");
if(!reply || RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR){
isCrdt = false;
}

if(reply){
RedisModule_FreeCallReply(reply);
}

RedisModule_FreeThreadSafeContext(ctx);
}

Expand Down
14 changes: 11 additions & 3 deletions src/common.h
Expand Up @@ -24,19 +24,27 @@

extern Gears_dictType* dictTypeHeapIdsPtr;

extern int redisMajorVersion;
extern int redisMinorVersion;
extern int redisPatchVersion;
typedef struct RedisVersion{
int redisMajorVersion;
int redisMinorVersion;
int redisPatchVersion;
}RedisVersion;

extern RedisVersion currVesion;
extern RedisVersion supportedVersion;

extern int rlecMajorVersion;
extern int rlecMinorVersion;
extern int rlecPatchVersion;
extern int rlecBuild;

extern bool isCrdt;

static inline int IsEnterprise() {
return rlecMajorVersion != -1;
}

int CheckSupportedVestion();
void getRedisVersion();
void SetId(char* finalId, char* idBuf, char* idStrBuf, long long* lastID);
int rg_vasprintf(char **__restrict __ptr, const char *__restrict __fmt, va_list __arg);
Expand Down
11 changes: 9 additions & 2 deletions src/module.c
Expand Up @@ -710,13 +710,20 @@ int RedisGears_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {

getRedisVersion();
RedisModule_Log(ctx, "notice", "Redis version found by RedisGears : %d.%d.%d - %s",
redisMajorVersion, redisMinorVersion, redisPatchVersion,
IsEnterprise() ? "enterprise" : "oss");
currVesion.redisMajorVersion, currVesion.redisMinorVersion, currVesion.redisPatchVersion,
IsEnterprise() ? (isCrdt ? "enterprise-crdt" : "enterprise") : "oss");
if (IsEnterprise()) {
RedisModule_Log(ctx, "notice", "Redis Enterprise version found by RedisGears : %d.%d.%d-%d",
rlecMajorVersion, rlecMinorVersion, rlecPatchVersion, rlecBuild);
}

if(CheckSupportedVestion() != REDISMODULE_OK){
RedisModule_Log(ctx, "warning", "Redis version is to old, please upgrade to redis %d.%d.%d and above.", supportedVersion.redisMajorVersion,
supportedVersion.redisMinorVersion,
supportedVersion.redisPatchVersion);
return REDISMODULE_ERR;
}

if(LockHandler_Initialize() != REDISMODULE_OK){
RedisModule_Log(ctx, "warning", "could not initialize lock handler");
return REDISMODULE_ERR;
Expand Down
20 changes: 16 additions & 4 deletions src/readers/keys_reader.c
Expand Up @@ -199,15 +199,27 @@ static void RG_KeysReaderCtxDeserialize(FlatExecutionPlan* fep, void* ctx, Gears
krctx->noScan = RedisGears_BRReadLong(br);
}

static Record* GetStringValueRecord(RedisModuleKey* handler){
static Record* GetStringValueRecord(RedisModuleKey* handler, RedisModuleCtx* ctx, const char* keyStr){
size_t len;
char* val = RedisModule_StringDMA(handler, &len, REDISMODULE_READ);
char* val = NULL;
RedisModuleCallReply *r = NULL;
if(!isCrdt){
val = RedisModule_StringDMA(handler, &len, REDISMODULE_READ);
} else {
// on crdt the string llapi is not supported so we need to use rm_call
r = RedisModule_Call(ctx, "GET", "c", keyStr);
val = (char*)RedisModule_CallReplyStringPtr(r, &len);
}
char* strVal = RG_ALLOC(len + 1);
memcpy(strVal, val, len);
strVal[len] = '\0';

Record* strRecord = RedisGears_StringRecordCreate(strVal, len);

if(r){
RedisModule_FreeCallReply(r);
}

return strRecord;
}

Expand Down Expand Up @@ -262,7 +274,7 @@ static Record* ValueToListMapper(const char* keyStr, RedisModuleCtx* ctx){
static Record* GetValueRecord(RedisModuleCtx* rctx, const char* keyStr, RedisModuleKey* handler){
switch(RedisModule_KeyType(handler)){
case REDISMODULE_KEYTYPE_STRING:
return GetStringValueRecord(handler);
return GetStringValueRecord(handler, rctx, keyStr);
break;
case REDISMODULE_KEYTYPE_LIST:
return ValueToListMapper(keyStr, rctx);
Expand Down Expand Up @@ -757,7 +769,7 @@ static void KeysReader_RegisterKeySpaceEvent(){
RedisModuleCtx * ctx = RedisModule_GetThreadSafeContext(NULL);
keysReaderRegistration = Gears_listCreate();
int event = REDISMODULE_NOTIFY_ALL;
if(redisMajorVersion >= 6 && IsEnterprise()){
if(currVesion.redisMajorVersion >= 6 && IsEnterprise()){
// we get the trimmed notification on enterprise only from redis v6 and above
event |= REDISMODULE_NOTIFY_TRIMMED;
}
Expand Down
2 changes: 1 addition & 1 deletion src/readers/streams_reader.c
Expand Up @@ -858,7 +858,7 @@ static void StreamReader_UnregisrterTrigger(FlatExecutionPlan* fep, bool abortPe
}

static bool StreamReader_IsStream(RedisModuleKey *kp){
if(redisMajorVersion <= 5){
if(currVesion.redisMajorVersion <= 5){
return RedisModule_KeyType(kp) == 0 || RedisModule_KeyType(kp) == 6;
}else{
return RedisModule_KeyType(kp) == 7;
Expand Down

0 comments on commit bfd978a

Please sign in to comment.