Skip to content

Commit

Permalink
更新 KEYS、SELECT、SET 命令以 模拟支持 多DB功能
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoHeitu committed Mar 26, 2024
1 parent 2aea8fb commit db4d673
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 5 deletions.
66 changes: 62 additions & 4 deletions libs/server/Resp/ArrayCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using Garnet.common;
using Garnet.server.Custom;
Expand All @@ -22,6 +23,8 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
int opsDone = 0;
int keysDeleted = 0;

byte[] db_id = [(byte)'0'];

/// <summary>
/// MGET
/// </summary>
Expand Down Expand Up @@ -649,8 +652,10 @@ private bool NetworkSELECT(byte* ptr)

readHead = (int)(ptr - recvBufferPtr);

if (string.Equals(result, "0"))
if (!string.Equals(result, "16"))
{
this.db_id = Encoding.UTF8.GetBytes(result);

while (!RespWriteUtils.WriteResponse(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
Expand Down Expand Up @@ -684,7 +689,21 @@ private bool NetworkKEYS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
if (!RespReadUtils.ReadPtrWithLengthHeader(ref pattern, ref psize, ref ptr, recvBufferPtr + bytesRead))
return false;

var patternAS = new ArgSlice(pattern, psize);
ArgSlice patternAS = default;

if (this.db_id.Length == 1 && this.db_id[0] == '0')
{
patternAS.ptr = (byte*)pattern;
patternAS.length = psize;
}
else
{
this.AddKeyPrefix(pattern, psize, (p, s) =>
{
patternAS.ptr = (byte*)p;
patternAS.length = s;
});
}

var keys = storageApi.GetDbKeys(patternAS);

Expand All @@ -697,8 +716,16 @@ private bool NetworkKEYS<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
// Write the keys matching the pattern
foreach (var item in keys)
{
while (!RespWriteUtils.WriteBulkString(item, ref dcurr, dend))
SendAndReset();
if (this.db_id.Length == 1 && this.db_id[0] == '0')
{
while (!RespWriteUtils.WriteBulkString(item, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteBulkString(this.RemoveKeyPrefix(item), ref dcurr, dend))
SendAndReset();
}
}
}
else
Expand Down Expand Up @@ -881,5 +908,36 @@ private void WriteOutputForScan(long cursorValue, List<byte[]> keys, ref byte* c
SendAndReset();
}
}

private void AddKeyPrefix(byte* source, int sourceSize, Action<nint, int> action)
{
int newSize = sourceSize + db_id.Length + 1;
fixed (byte* newptr = new byte[newSize])
{

Marshal.Copy(db_id, 0, (nint)newptr, db_id.Length);
*(newptr + db_id.Length) = (byte)':';

byte* sourceStart = newptr + db_id.Length + 1;

for (int i = 0; i < sourceSize; i++)
{
*(sourceStart + i) = *(source + i);
}

action((nint)newptr, newSize);
}
}

private byte[] RemoveKeyPrefix(byte[] bytes)
{
byte[] result = new byte[bytes.Length - (db_id.Length + 1)];
Array.Copy(bytes, db_id.Length + 1, result, 0, result.Length);
return result;
}
//private void RemoveKeyPrefix(byte* ptr)
//{

//}
}
}
22 changes: 21 additions & 1 deletion libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Garnet.common;
using Tsavorite.core;

Expand Down Expand Up @@ -255,12 +256,31 @@ private bool NetworkSET<TGarnetApi>(byte* ptr, ref TGarnetApi storageApi)
if (NetworkSingleKeySlotVerify(keyPtr, ksize, false))
return true;


var temp = Marshal.AllocHGlobal(sizeof(int) + this.db_id.Length + 1 + ksize);

var newkeyptr = temp + sizeof(int);


Unsafe.CopyBlock((byte*)newkeyptr, (byte*)keyPtr, (uint)ksize);

this.AddKeyPrefix((byte*)newkeyptr, ksize, (p, s) =>
{
Unsafe.CopyBlock((byte*)newkeyptr, (byte*)p, (uint)s);
keyPtr = (byte*)newkeyptr;
ksize = s;
});

keyPtr -= sizeof(int);
valPtr -= sizeof(int);
*(int*)keyPtr = ksize;
*(int*)valPtr = vsize;

var status = storageApi.SET(ref Unsafe.AsRef<SpanByte>(keyPtr), ref Unsafe.AsRef<SpanByte>(valPtr));

Marshal.FreeHGlobal(temp);


while (!RespWriteUtils.WriteResponse(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();

Expand Down

0 comments on commit db4d673

Please sign in to comment.