Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blpop/brpop #1548

Merged
merged 57 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
acb8ee9
working on BlpopCmd::Doinitial
May 15, 2023
ab8caf4
working on : making the maps and new monitoring thread
May 16, 2023
fc6f771
added bLRPop_blocking_info_ in pika_server.cc
May 16, 2023
e2558d5
adding functions
May 17, 2023
35273ee
adding functions02
May 17, 2023
324fe89
temp save
May 17, 2023
c67cdfe
Merge branch 'OpenAtomFoundation:unstable' into add_blpop_brpop
cheniujh May 18, 2023
6a47103
data structure done
May 18, 2023
934c3bb
working
May 20, 2023
fe79658
revised data structure
May 20, 2023
2ab0ad4
working on moving
May 20, 2023
1c9a278
working
May 20, 2023
a1fd89f
Merge branch 'OpenAtomFoundation:unstable' into add_blpop_brpop
cheniujh May 21, 2023
abebaa3
next step is to add a timeout scan
May 21, 2023
2f0d057
added TimerTaskManager
May 21, 2023
5cf9461
modified TimerTaskManager(v2)
May 21, 2023
c867955
scan added
May 21, 2023
24cd349
tiny fix
May 22, 2023
bbfc0d0
changed the task of unblocking conn from sync to async
May 23, 2023
b3399e5
Merge branch 'back_up_add_blpop_brpop' of github.com:cheniujh/pika in…
May 23, 2023
3f6b702
removed some code for testing
May 23, 2023
5a271b3
pull from unstable and solve conflicts
Jun 4, 2023
6ab355a
change from partition to db
Jun 4, 2023
c8b9ad6
temp save for testing
Jun 5, 2023
ac7a75d
handle binlog of blr/pop(when conn get served, write a binlog of lpop…
Jun 6, 2023
8be5a32
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 6, 2023
3bb995a
improved sanitizer options in CMakeLists
Jun 6, 2023
e2dbb57
temp save
Jun 8, 2023
33de9fc
add unit test
Jun 8, 2023
ae9b951
modified cmakelists
Jun 8, 2023
9693eab
temp save
Jun 10, 2023
5742588
improved code based on reviwer's opinion
Jun 10, 2023
c0e0155
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 13, 2023
9581675
renamed some variables
Jun 13, 2023
8362eac
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 14, 2023
3f3376d
Merge remote-tracking branch 'origin/new_add_blpop_brpop' into new_ad…
Jun 14, 2023
819576f
1. added record lock in ServeAndUnblockConns(void* args)
Jun 14, 2023
a511054
added multi-db concurrency test
Jun 14, 2023
590faf4
renamed a flag from kCmdFlagsMayDfferWrite to kCmdFlagsMayDfferWriteB…
Jun 14, 2023
a440420
1. added record lock in ServeAndUnblockConns(void* args)
Jun 14, 2023
c34e997
Merge remote-tracking branch 'origin/new_add_blpop_brpop' into new_ad…
Jun 16, 2023
b600811
removed unsed code
Jun 16, 2023
6163b42
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 19, 2023
3cfa0dd
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 19, 2023
96f72f7
revised code based on opinion of reviewer
Jun 19, 2023
5ddb1d5
Merge branch 'unstable_new' of github.com:cheniujh/pika into new_add_…
Jun 20, 2023
b9e55f8
Merge branch 'unstable' of github.com:cheniujh/pika into new_add_blpo…
Jul 5, 2023
4dbe71f
add the unit test to github workflow
Jul 5, 2023
0ba21ee
revised unit test file
Jul 5, 2023
24d123e
removed time counting
Jul 6, 2023
f324669
clear github action cache
Jul 13, 2023
5e6ad36
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jul 13, 2023
8619909
revised based on reviwer's opinions
Jul 13, 2023
70d19f9
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jul 18, 2023
5d061da
Modify to adapt to Mac.
Jul 18, 2023
100d07a
Merge remote-tracking branch 'origin/new_add_blpop_brpop' into new_ad…
Jul 18, 2023
c409cd0
Modify to adapt to Mac 2.
Jul 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: cache dependencies
uses: actions/cache@v2
id: cache
with:
path: |
${{ github.workspace }}/${{ env.INSTALL_LOCATION }}
~/.cache/pip
key: ${{ runner.os }}-dependencies
# - name: cache dependencies
# uses: actions/cache@v2
# id: cache
# with:
# path: |
# ${{ github.workspace }}/${{ env.INSTALL_LOCATION }}
# ~/.cache/pip
# key: ${{ runner.os }}-dependencies

- name: install Deps
if: ${{ steps.cache.output.cache-hit != 'true' }}
# if: ${{ steps.cache.output.cache-hit != 'true' }}
run: |
sudo apt install autoconf libprotobuf-dev protobuf-compiler -y
sudo apt-get install -y clang-tidy-12
Expand Down Expand Up @@ -69,7 +69,7 @@ jobs:
working-directory: ${{github.workspace}}/build
run: |
python3 ../tests/integration/pika_replication_test.py

python3 ../tests/unit/Blpop_Brpop_test.py

build_on_centos:
# The CMake configure and build commands are platform agnostic and should work equally well on Windows or Mac.
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
14 changes: 9 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
endif()
endif()

#Sanitizer for testing stage
#CMAKE_BUILD_TYPE must be "Debug" if you wanna use sanitizer.

############# You should enable sanitizer if you are developing pika #############
# Uncomment the following two lines to enable AddressSanitizer to detect memory leaks and other memory-related bugs.
#set(CMAKE_BUILD_TYPE "Debug")
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address -O0 -fno-omit-frame-pointer -fno-optimize-sibling-calls")

# [Notice] AddressSanitizer and ThreadSanitizer can not be enabled at the same time.

#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
# [Notice] address sanitizer and thread sanitizer can not be enabled at the same time.
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=thread")
# Uncomment the following two lines to enable ThreadSanitizer to detect data race and other thread-related issue.
#set(CMAKE_BUILD_TYPE "Debug")
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=thread -O0 -fno-omit-frame-pointer -fno-optimize-sibling-calls")

execute_process(COMMAND uname -p OUTPUT_VARIABLE HOST_ARCH)
string(TOLOWER ${HOST_ARCH} HOST_ARCH)
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 1 addition & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class PikaClientConn : public net::RedisConn {
bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentTable(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override{ return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

net::ServerThread* server_thread() { return server_thread_; }
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
16 changes: 16 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "pstd/include/pstd_string.h"

#include "include/pika_slot.h"
#include "net/src/dispatch_thread.h"

class SyncMasterSlot;
class SyncSlaveSlot;
Expand Down Expand Up @@ -131,13 +132,15 @@ const std::string kCmdNamePKHRScanRange = "pkhrscanrange";
const std::string kCmdNameLIndex = "lindex";
const std::string kCmdNameLInsert = "linsert";
const std::string kCmdNameLLen = "llen";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameLPush = "lpush";
const std::string kCmdNameLPushx = "lpushx";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLSet = "lset";
const std::string kCmdNameLTrim = "ltrim";
const std::string kCmdNameBRpop = "brpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameRPopLPush = "rpoplpush";
const std::string kCmdNameRPush = "rpush";
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -398,6 +401,18 @@ class CmdRes {
CmdRet ret_ = kNone;
};

/**
* Current used by:
* blpop,brpop
*/
struct UnblockTaskArgs {
std::string key;
std::shared_ptr<Slot> slot;
net::DispatchThread* dispatchThread;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
UnblockTaskArgs(std::string key_, std::shared_ptr<Slot> slot_, net::DispatchThread* dispatchThread_)
: key(std::move(key_)), slot(slot_), dispatchThread(dispatchThread_) {}
};

class Cmd : public std::enable_shared_from_this<Cmd> {
public:
enum CmdStage { kNone, kBinlogStage, kExecuteStage };
Expand Down Expand Up @@ -449,6 +464,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {

bool is_read() const;
bool is_write() const;

bool is_local() const;
bool is_suspend() const;
bool is_admin_require() const;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
79 changes: 72 additions & 7 deletions include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,50 @@ class LLenCmd : public Cmd {
void DoInitial() override;
};

class BlockingBaseCmd : public Cmd {
public:
BlockingBaseCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}

//blpop/brpop used start
struct WriteBinlogOfPopArgs{
BlockKeyType block_type;
std::string key;
std::shared_ptr<Slot> slot;
std::shared_ptr<net::NetConn> conn;
WriteBinlogOfPopArgs() = default;
WriteBinlogOfPopArgs(BlockKeyType block_type_, const std::string& key_,
std::shared_ptr<Slot> slot_, std::shared_ptr<net::NetConn> conn_)
: block_type(block_type_), key(key_), slot(slot_), conn(conn_){}
};
void BlockThisClientToWaitLRPush(BlockKeyType block_pop_type, std::vector<std::string>& keys, int64_t expire_time);
void TryToServeBLrPopWithThisKey(const std::string& key, std::shared_ptr<Slot> slot);
static void ServeAndUnblockConns(void* args);
static void WriteBinlogOfPop(std::vector<WriteBinlogOfPopArgs>& pop_args);
void removeDuplicates(std::vector<std::string> & keys_);
//blpop/brpop used functions end
};

class BLPopCmd final : public BlockingBaseCmd {
public:
BLPopCmd(const std::string& name, int arity, uint16_t flag) : BlockingBaseCmd(name, arity, flag){};
virtual std::vector<std::string> current_key() const {
std::vector<std::string> res = keys_;
return res;
}
virtual void Do(std::shared_ptr<Slot> slot = nullptr);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys){};
virtual void Merge(){};
virtual Cmd* Clone() override { return new BLPopCmd(*this); }
void DoInitial() override;
void DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) override;

private:
std::vector<std::string> keys_;
int64_t expire_time_{0};
WriteBinlogOfPopArgs binlog_args_;
bool is_binlog_deferred_{false};
};

class LPopCmd : public Cmd {
public:
LPopCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
Expand All @@ -90,9 +134,10 @@ class LPopCmd : public Cmd {
void DoInitial() override;
};

class LPushCmd : public Cmd {

class LPushCmd : public BlockingBaseCmd {
public:
LPushCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
LPushCmd(const std::string& name, int arity, uint16_t flag) : BlockingBaseCmd(name, arity, flag){};
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
Expand Down Expand Up @@ -209,6 +254,26 @@ class LTrimCmd : public Cmd {
void DoInitial() override;
};

class BRPopCmd final : public BlockingBaseCmd {
public:
BRPopCmd(const std::string& name, int arity, uint16_t flag) : BlockingBaseCmd(name, arity, flag){};
virtual std::vector<std::string> current_key() const {
std::vector<std::string> res = keys_;
return res;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
}
virtual void Do(std::shared_ptr<Slot> slot = nullptr);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys){};
virtual void Merge(){};
virtual Cmd* Clone() override { return new BRPopCmd(*this); }
void DoInitial() override;
void DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) override;
private:
std::vector<std::string> keys_;
int64_t expire_time_{0};
WriteBinlogOfPopArgs binlog_args_;
bool is_binlog_deferred_{false};
};

class RPopCmd : public Cmd {
public:
RPopCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
Expand All @@ -227,14 +292,14 @@ class RPopCmd : public Cmd {
void DoInitial() override;
};

class RPopLPushCmd : public Cmd {
class RPopLPushCmd : public BlockingBaseCmd {
public:
RPopLPushCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {
RPopLPushCmd(const std::string& name, int arity, uint16_t flag) : BlockingBaseCmd(name, arity, flag) {
rpop_cmd_ = std::make_shared<RPopCmd>(kCmdNameRPop, 2, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsList);
lpush_cmd_ = std::make_shared<LPushCmd>(kCmdNameLPush, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsList);
};
RPopLPushCmd(const RPopLPushCmd& other)
: Cmd(other),
: BlockingBaseCmd(other),
source_(other.source_),
receiver_(other.receiver_),
value_poped_from_source_(other.value_poped_from_source_),
Expand Down Expand Up @@ -265,9 +330,9 @@ class RPopLPushCmd : public Cmd {
void DoInitial() override;
};

class RPushCmd : public Cmd {
class RPushCmd : public BlockingBaseCmd {
public:
RPushCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
RPushCmd(const std::string& name, int arity, uint16_t flag) : BlockingBaseCmd(name, arity, flag){};
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 1 addition & 0 deletions src/net/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RedisConn : public NetConn {
void NotifyEpoll(bool success);

virtual int DealMessage(const RedisCmdArgsType& argv, std::string* response) = 0;
virtual const std::string& GetCurrentTable() = 0;

private:
static int ParserDealMessageCb(RedisParser* parser, const RedisCmdArgsType& argv);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
12 changes: 9 additions & 3 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "net/include/net_define.h"
#include "net/include/net_thread.h"
#include "net/src/net_multiplexer.h"
#include "pstd/include/env.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"

Expand Down Expand Up @@ -111,9 +112,11 @@ const int kDefaultKeepAliveTime = 60; // (s)

class ServerThread : public Thread {
public:
ServerThread(int port, int cron_interval, const ServerHandle* handle);
ServerThread(const std::string& bind_ip, int port, int cron_interval, const ServerHandle* handle);
ServerThread(const std::set<std::string>& bind_ips, int port, int cron_interval, const ServerHandle* handle);
ServerThread(int port, int cron_interval, const ServerHandle* handle, ServerThread* dispatcher = nullptr);
ServerThread(const std::string& bind_ip, int port, int cron_interval, const ServerHandle* handle,
ServerThread* dispatcher = nullptr);
ServerThread(const std::set<std::string>& bind_ips, int port, int cron_interval, const ServerHandle* handle,
ServerThread* dispatcher = nullptr);

#ifdef __ENABLE_SSL
/*
Expand Down Expand Up @@ -175,6 +178,9 @@ class ServerThread : public Thread {
// process events in notify_queue
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);

//dispatcher's life time should not be managed by this serverThread,
//so, use raw pointer instead of smart pointer
ServerThread* dispatcher_;
const ServerHandle* handle_;
bool own_handle_ = false;

cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading
Loading