Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

咨询问题pipeline #935

Closed
wseven10 opened this issue Jul 8, 2020 · 16 comments
Closed

咨询问题pipeline #935

wseven10 opened this issue Jul 8, 2020 · 16 comments

Comments

@wseven10
Copy link

wseven10 commented Jul 8, 2020

pipeline试了是可以支持的,但是没看到文档说明,不知道是不是有问题

@whoiami
Copy link
Contributor

whoiami commented Jul 9, 2020

没有问题

@ownthink
Copy link

ownthink commented Aug 26, 2021

pipeline别用,有bug,例如以下语句,在并发执行的时候redis中正确执行,在pika中数据执行错乱。
go版本

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

python版本

with handle.pipeline(transaction=False) as p:
        p.lrange('key', 0, 200-1)
        p.ltrim('key', 200, -1)
        res = p.execute()
        items = res[0]

@AlexStocks
Copy link
Collaborator

@ForestLH 负责验证该功能

@ForestLH
Copy link
Contributor

pipeline别用,有bug,例如以下语句,在并发执行的时候redis中正确执行,在pika中数据执行错乱。 go版本

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

python版本

with handle.pipeline(transaction=False) as p:
        p.lrange('key', 0, 200-1)
        p.ltrim('key', 200, -1)
        res = p.execute()
        items = res[0]

可以详细一点儿吗,是不是使用的客户端的问题,我用这个"github.com/go-redis/redis/v8"测试没有发现问题呀

@AlexStocks
Copy link
Collaborator

pipeline别用,有bug,例如以下语句,在并发执行的时候redis中正确执行,在pika中数据执行错乱。 go版本

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

python版本

with handle.pipeline(transaction=False) as p:
        p.lrange('key', 0, 200-1)
        p.ltrim('key', 200, -1)
        res = p.execute()
        items = res[0]

可以详细一点儿吗,是不是使用的客户端的问题,我用这个"github.com/go-redis/redis/v8"测试没有发现问题呀

So long time that you maybe can not get a reply.

@ownthink
Copy link

ownthink commented May 4, 2023

用的这个。go get github.com/go-redis/redis

@ownthink
Copy link

ownthink commented May 4, 2023

单个测试没问题,在多协程中同时执行以下语句会有问题,测试的时候可以往pika队列里面写入1-10000000的数字字符串,然后1000个协程同时执行以下语句将获取到的值存入txt,对比txt是否是1-10000000。

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

@ForestLH
Copy link
Contributor

ForestLH commented May 8, 2023

单个测试没问题,在多协程中同时执行以下语句会有问题,测试的时候可以往pika队列里面写入1-10000000的数字字符串,然后1000个协程同时执行以下语句将获取到的值存入txt,对比txt是否是1-10000000。

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

我这样测试还是没有测试出来问题,你可以看看这样测试对不对吗,谢谢您了
https://github.com/ForestLH/goredisclient

@ownthink
Copy link

ownthink commented Jun 25, 2023

哈喽不好意思,到现在才来提交。往redis里写数据参考put.py,从redis里读数据参考get.go。已经提交到https://github.com/ForestLH/goredisclient。redis写入和读取数据一致,pika会多数据,不知道这样编写是否有错误哈,仅供参考。

@AlexStocks
Copy link
Collaborator

哈喽不好意思,到现在才来提交。往 redis 里写数据参考 put.py,从 redis 里读数据参考 get.go。已经提交到 https://github.com/ForestLH/goredisclient。redis 写入和读取数据一致,pika 会多数据,不知道这样编写是否有错误哈,仅供参考。

你能否加下 Pika 微信助手 PikiwiDB【加的时候请注明暗号 pika pipeline】,然后它会把你拉进 Pika 微信群,我们在微信群里讨论下这个问题?

@tedli
Copy link
Contributor

tedli commented Jun 27, 2023

nread = read(fd(), rbuf_ + next_read_pos, remain);
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
nread = 0;
return kReadHalf; // HALF
} else {
// error happened, close client
return kReadError;
}
} else if (nread == 0) {
// client closed, close client
return kReadClose;
}
// assert(nread > 0);
last_read_pos_ += nread;
msg_peak_ = last_read_pos_;
command_len_ += nread;
if (command_len_ >= rbuf_max_len_) {
LOG(INFO) << "close conn command_len " << command_len_ << ", rbuf_max_len " << rbuf_max_len_;
return kFullError;
}
int processed_len = 0;
RedisParserStatus ret = redis_parser_.ProcessInputBuffer(rbuf_ + next_read_pos, nread, &processed_len);

89 行从网络 read 一次,得到 nread,112 行,把 nread 传给 redis parser。

RedisParserStatus RedisParser::ProcessInputBuffer(const char* input_buf, int length, int* parsed_len) {
if (status_code_ == kRedisParserInitDone || status_code_ == kRedisParserHalf || status_code_ == kRedisParserDone) {
// TODO(): AZ: avoid copy
std::string tmp_str(input_buf, length);
input_str_ = half_argv_ + tmp_str;
input_buf_ = input_str_.c_str();
length_ = length + half_argv_.size();
if (redis_parser_type_ == REDIS_PARSER_REQUEST) {
ProcessRequestBuffer();

parser 里逻辑,会判断上一次 read,有没有读“一半”的,有剩的一半,给拼到本次读的前面,然后进行对 request 进行 parse(解 cmd)

RedisParserStatus RedisParser::ProcessRequestBuffer() {
RedisParserStatus ret;
while (cur_pos_ <= length_ - 1) {

可以看到解的过程是个循环,也就是说会把读到的可完整解出来的,都解出来,然后就是给 worker 现成执行了。

但是问题是读到的,比如 pipeline 里命令很多的话,或者网络 io 并发高等等因素,导致一次 read 没有把所有 pipeline 里命令都读完(不然正常 read 函数也不承诺一口气都读完),则 pipeline 里的多个命令,可能会在不同的 worker 线程里执行,导致响应的顺序跟请求的顺序对不上。

@tedli
Copy link
Contributor

tedli commented Jun 27, 2023

还有一个点是 go-redis 的 pipeline 实现问题

https://github.com/redis/go-redis/blob/50f04c14dee344147e9cf061f752ee913fd6c3ee/redis.go#L497-L501

https://github.com/redis/go-redis/blob/50f04c14dee344147e9cf061f752ee913fd6c3ee/internal/proto/writer.go#L37-L53

https://github.com/redis/go-redis/blob/50f04c14dee344147e9cf061f752ee913fd6c3ee/command.go#L59-L61

可以看到,只是把 cmd 对象攒起来,然后循环 write 而已。

而看 redis 自己的 client pipeline 实现:

https://github.com/redis/redis/blob/22a29935ff7a354b8d90c110040f5bd0ea80068e/deps/hiredis/hiredis.c#L1106-L1117

可以看到 redis 自己的 client 的 pipeline 实现,攒 cmd 的逻辑是在拼 buffer,newbuf = hi_sdscatlen(c->obuf,cmd,len),这个 buffer 是最终一口气一次 write。

不过这个差别也不是硬伤,毕竟 write 也不承诺一口气都能写完。

@AlexStocks
Copy link
Collaborator

the oldest pika issue #19

@tedli
Copy link
Contributor

tedli commented Jun 28, 2023

问题分 2 个维度,1 是 pipeline 本身维度,2 是并发读写状态维度。

  1. pipeline 本身维度:
    少部分用户反馈的小概率遇到 pipeline 出错问题。
    原因是,redis 网络协议层面判断不出来是不是 pipeline,io 层面只是知道 read 到了若干 command。pika 成功解出来 command 就放 worker 线程里执行了,command 执行是并发的,会有几率导致 pipeline 里后执行的 command,先 response 了,如果 pipeline 里前后 command 的返回值类型不同,比如 lpushx 的响应是个 int,lrange 的响应是个 string[],这种先后顺序回乱了,反映到 client 端代码,就是报错(解响应失败)。这个情况,pipeline 串起来的命令越长越容易复现。

  2. 并发读写维度:
    @ownthink 反馈的问题。这个见 复现代码。这个可以认为是 by design。至于 redis 为啥不会有这个问题,因为 redis 是单线程实现的。

@AlexStocks
Copy link
Collaborator

AlexStocks commented Jul 7, 2023

        * 0429 李浩负责验证推进 
        * 0506 李浩继续跟进
        * 0520 提问者未回复,不再跟进
        * 0624 李浩继续跟进,预计两周后做完
        * 0701 转到tedli继续跟进
        * 0708 已提交 PR,待合并
        * 0715 需要在确认复现
        * 0805 金鸽跟进复现
        * 0813 跟李振对一下,继续推进复现

@wangshao1
Copy link
Collaborator

pika server解析出完整命令之后交由工作线程处理,然后会del对应connfd的读写事件,所以我理解在上一个请求处理完成向client回包之前,不会在收包,所以不会出现同一个连接上多个请求同时被工作线程池中的多个线程处理的情况。
但pika server不能保证多条conn的串行执行,所以可能跟客户端实现有关。即:多线程发起pipeline请求,最终每个线程的pipeline可能是由不同连接发送出去的。github.com/redis/go-redis/v9这里的客户端实现看起来就是这种情况。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

8 participants