Skip to content

Commit

Permalink
Merge branch 'unstable' into jsonnumop
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Nov 12, 2023
2 parents 2ad17b0 + 21f7997 commit c90d6ed
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 40 deletions.
13 changes: 5 additions & 8 deletions src/cli/daemon_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,20 @@ inline bool SupervisedSystemd() {
return false;
}

sockaddr_un su;
memset(&su, 0, sizeof(su));
sockaddr_un su = {};
su.sun_family = AF_UNIX;
strncpy(su.sun_path, notify_socket, sizeof(su.sun_path) - 1);
su.sun_path[sizeof(su.sun_path) - 1] = '\0';
if (notify_socket[0] == '@') su.sun_path[0] = '\0';

iovec iov;
memset(&iov, 0, sizeof(iov));
iovec iov = {};
std::string ready = "READY=1";
iov.iov_base = &ready[0];
iov.iov_base = ready.data();
iov.iov_len = ready.size();

msghdr hdr;
memset(&hdr, 0, sizeof(hdr));
msghdr hdr = {};
hdr.msg_name = &su;
hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) + strlen(notify_socket);
hdr.msg_namelen = offsetof(sockaddr_un, sun_path) + strlen(su.sun_path);
hdr.msg_iov = &iov;
hdr.msg_iovlen = 1;

Expand Down
92 changes: 89 additions & 3 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "error_constants.h"
#include "event_util.h"
#include "server/server.h"
#include "time_util.h"
#include "types/redis_stream.h"

namespace redis {
Expand Down Expand Up @@ -386,7 +387,7 @@ class CommandXInfo : public Commander {
Status Parse(const std::vector<std::string> &args) override {
auto val = util::ToLower(args[1]);
if (val == "stream" && args.size() >= 2) {
stream_ = true;
subcommand_ = "stream";

if (args.size() > 3 && util::ToLower(args[3]) == "full") {
full_ = true;
Expand All @@ -400,20 +401,35 @@ class CommandXInfo : public Commander {

count_ = *parse_result;
}
} else if (val == "groups" && args.size() == 3) {
subcommand_ = "groups";
} else if (val == "consumers" && args.size() == 4) {
subcommand_ = "consumers";
} else {
return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (stream_) {
if (subcommand_ == "stream") {
return getStreamInfo(srv, conn, output);
}

if (subcommand_ == "groups") {
return getGroupInfo(srv, conn, output);
}

if (subcommand_ == "consumers") {
return getConsumerInfo(srv, conn, output);
}
return Status::OK();
}

private:
uint64_t count_ = 10; // default Redis value
bool stream_ = false;
std::string subcommand_;
bool full_ = false;

Status getStreamInfo(Server *srv, Connection *conn, std::string *output) {
Expand Down Expand Up @@ -472,6 +488,76 @@ class CommandXInfo : public Commander {

return Status::OK();
}

Status getGroupInfo(Server *srv, Connection *conn, std::string *output) {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<std::pair<std::string, StreamConsumerGroupMetadata>> result_vector;
auto s = stream_db.GetGroupInfo(args_[2], result_vector);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

if (s.IsNotFound()) {
return {Status::RedisExecErr, errNoSuchKey};
}

output->append(redis::MultiLen(result_vector.size()));
for (auto const &it : result_vector) {
output->append(redis::MultiLen(12));
output->append(redis::BulkString("name"));
output->append(redis::BulkString(it.first));
output->append(redis::BulkString("consumers"));
output->append(redis::Integer(it.second.consumer_number));
output->append(redis::BulkString("pending"));
output->append(redis::Integer(it.second.pending_number));
output->append(redis::BulkString("last-delivered-id"));
output->append(redis::BulkString(it.second.last_delivered_id.ToString()));
output->append(redis::BulkString("entries-read"));
if (it.second.entries_read == -1) {
output->append(redis::NilString());
} else {
output->append(redis::Integer(it.second.entries_read));
}
output->append(redis::BulkString("lag"));
if (it.second.lag == UINT64_MAX) {
output->append(redis::NilString());
} else {
output->append(redis::Integer(it.second.lag));
}
}

return Status::OK();
}

Status getConsumerInfo(Server *srv, Connection *conn, std::string *output) {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<std::pair<std::string, redis::StreamConsumerMetadata>> result_vector;
auto s = stream_db.GetConsumerInfo(args_[2], args_[3], result_vector);

if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

if (s.IsNotFound()) {
return {Status::RedisExecErr, errNoSuchKey};
}

output->append(redis::MultiLen(result_vector.size()));
auto now = util::GetTimeStampMS();
for (auto const &it : result_vector) {
output->append(redis::MultiLen(8));
output->append(redis::BulkString("name"));
output->append(redis::BulkString(it.first));
output->append(redis::BulkString("pending"));
output->append(redis::Integer(it.second.pending_number));
output->append(redis::BulkString("idle"));
output->append(redis::Integer(now - it.second.last_idle));
output->append(redis::BulkString("inactive"));
output->append(redis::Integer(now - it.second.last_active));
}

return Status::OK();
}
};

class CommandXRange : public Commander {
Expand Down
2 changes: 1 addition & 1 deletion src/common/cron.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Status Cron::SetScheduleTime(const std::vector<std::string> &args) {
return Status::OK();
}

bool Cron::IsTimeMatch(struct tm *tm) {
bool Cron::IsTimeMatch(tm *tm) {
if (tm->tm_min == last_tm_.tm_min && tm->tm_hour == last_tm_.tm_hour && tm->tm_mday == last_tm_.tm_mday &&
tm->tm_mon == last_tm_.tm_mon && tm->tm_wday == last_tm_.tm_wday) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/common/cron.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Cron {
~Cron() = default;

Status SetScheduleTime(const std::vector<std::string> &args);
bool IsTimeMatch(struct tm *tm);
bool IsTimeMatch(tm *tm);
std::string ToString() const;
bool IsEnabled() const;

Expand Down
6 changes: 3 additions & 3 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ StatusOr<std::vector<std::string>> LookupHostByName(const std::string &host) {
for (auto p = servinfo; p != nullptr; p = p->ai_next) {
char ip[INET6_ADDRSTRLEN] = {};
if (p->ai_family == AF_INET) {
inet_ntop(p->ai_family, &((struct sockaddr_in *)p->ai_addr)->sin_addr, ip, sizeof(ip));
inet_ntop(p->ai_family, &((sockaddr_in *)p->ai_addr)->sin_addr, ip, sizeof(ip));
} else {
inet_ntop(p->ai_family, &((struct sockaddr_in6 *)p->ai_addr)->sin6_addr, ip, sizeof(ip));
inet_ntop(p->ai_family, &((sockaddr_in6 *)p->ai_addr)->sin6_addr, ip, sizeof(ip));
}
ips.emplace_back(ip);
}
Expand Down Expand Up @@ -350,7 +350,7 @@ StatusOr<std::tuple<std::string, uint32_t>> GetPeerAddr(int fd) {
int GetLocalPort(int fd) {
sockaddr_in6 address;
socklen_t len = sizeof(address);
if (getsockname(fd, (struct sockaddr *)&address, &len) == -1) {
if (getsockname(fd, (sockaddr *)&address, &len) == -1) {
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void Connection::Close() {

void Connection::Detach() { owner_->DetachConnection(this); }

void Connection::OnRead(struct bufferevent *bev) {
void Connection::OnRead(bufferevent *bev) {
DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);

SetLastInteraction();
Expand All @@ -93,7 +93,7 @@ void Connection::OnRead(struct bufferevent *bev) {
}
}

void Connection::OnWrite(struct bufferevent *bev) {
void Connection::OnWrite(bufferevent *bev) {
if (IsFlagEnabled(kCloseAfterReply) || IsFlagEnabled(kCloseAsync)) {
Close();
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class Connection : public EvbufCallbackBase<Connection> {

void Close();
void Detach();
void OnRead(struct bufferevent *bev);
void OnWrite(struct bufferevent *bev);
void OnRead(bufferevent *bev);
void OnWrite(bufferevent *bev);
void OnEvent(bufferevent *bev, int16_t events);
void Reply(const std::string &msg);
void SendFile(int fd);
Expand Down
2 changes: 1 addition & 1 deletion src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog)
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}

if (bind(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
if (bind(fd, (sockaddr *)&sa, sizeof(sa)) < 0) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}

Expand Down
2 changes: 1 addition & 1 deletion src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Stats {
std::atomic<uint64_t> out_bytes = {0};

mutable std::shared_mutex inst_metrics_mutex;
std::vector<struct InstMetric> inst_metrics;
std::vector<InstMetric> inst_metrics;

std::atomic<uint64_t> fullsync_counter = {0};
std::atomic<uint64_t> psync_err_counter = {0};
Expand Down
12 changes: 6 additions & 6 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,20 +756,20 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
}

Status Storage::ShiftReplId() {
const char *charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
const int charset_len = static_cast<int>(strlen(charset));
static constexpr std::string_view charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";

// Do nothing if rsid psync is not enabled
if (!config_->use_rsid_psync) return Status::OK();

std::random_device rd;
std::mt19937 gen(rd() + getpid());
std::uniform_int_distribution<> distrib(0, charset_len - 1);
std::string rand_str;
std::uniform_int_distribution<size_t> distrib(0, charset.size() - 1);

std::string rand_str(kReplIdLength, 0);
for (int i = 0; i < kReplIdLength; i++) {
rand_str.push_back(charset[distrib(gen)]);
rand_str[i] = charset[distrib(gen)];
}
replid_ = rand_str;
replid_ = std::move(rand_str);
LOG(INFO) << "[replication] New replication id: " << replid_;

// Write new replication id into db engine
Expand Down

0 comments on commit c90d6ed

Please sign in to comment.