Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAP Processor triggers the Command Timeout error (MySql) when the consumer takes time to deal with it #68

Closed
ajdwfnhaps opened this issue Dec 8, 2017 · 18 comments

Comments

@ajdwfnhaps
Copy link

commented Dec 8, 2017

重现方法:
1.CAP配置:

services.AddCap(x =>
            {
                x.QueueProcessorCount = 15;
                ……
            });

2.消费者代码:
```
[CapSubscribe("xxx.services.account.check")]
public async Task CheckReceivedMessage(Person person)
{
System.Threading.Thread.Sleep(5000);
return Task.CompletedTask;
}



异常信息:

[2017-12-08 10:05:37.0894] [] [WARN ] [DotNetCore.CAP.Processor.InfiniteRetryProcessor]:
Processor 'DotNetCore.CAP.Processor.DefaultDispatcher' failed. Retrying...
MySql.Data.MySqlClient.MySqlException (0x80004005): The Command Timeout expired before the operation completed. ---> MySql.Data.MySqlClient.MySqlException (0x80004005): The Command Timeout expired before the operation completed.
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable1.ConfiguredValueTaskAwaiter.GetResult() at MySqlConnector.Protocol.Serialization.BufferedByteReader.<ReadBytesAsync>d__2.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\BufferedByteReader.cs:line 37 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Threading.Tasks.ValueTask1.get_Result()
at MySqlConnector.Protocol.Serialization.ProtocolUtility.ReadPacketAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func1 getNextSequenceNumber, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\ProtocolUtility.cs:line 405 at MySqlConnector.Protocol.Serialization.ProtocolUtility.DoReadPayloadAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func1 getNextSequenceNumber, ArraySegmentHolder1 previousPayloads, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\ProtocolUtility.cs:line 459 at MySqlConnector.Protocol.Serialization.StandardPayloadHandler.ReadPayloadAsync(ArraySegmentHolder1 cache, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\StandardPayloadHandler.cs:line 37
at MySqlConnector.Core.ServerSession.ReceiveReplyAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in C:\projects\mysqlconnector\src\MySqlConnector\Core\ServerSession.cs:line 557
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MySqlConnector.Core.ServerSession.TryAsyncContinuation(Task1 task) in C:\projects\mysqlconnector\src\MySqlConnector\Core\ServerSession.cs:line 1014 at System.Threading.Tasks.ContinuationResultTaskFromResultTask2.InnerInvoke()
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable1.ConfiguredValueTaskAwaiter.GetResult() at MySqlConnector.Core.ResultSet.<ReadResultSetHeaderAsync>d__1.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\Core\ResultSet.cs:line 43 at MySql.Data.MySqlClient.MySqlDataReader.ActivateResultSet(ResultSet resultSet) in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlDataReader.cs:line 92 at MySql.Data.MySqlClient.MySqlDataReader.<ReadFirstResultSetAsync>d__65.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlDataReader.cs:line 297 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at MySql.Data.MySqlClient.MySqlDataReader.<CreateAsync>d__64.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlDataReader.cs:line 287 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at MySqlConnector.Core.TextCommandExecutor.<ExecuteReaderAsync>d__3.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\Core\TextCommandExecutor.cs:line 73 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at MySql.Data.MySqlClient.MySqlCommand.ExecuteDbDataReader(CommandBehavior behavior) in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlCommand.cs:line 168 at System.Data.Common.DbCommand.System.Data.IDbCommand.ExecuteReader(CommandBehavior behavior) at Dapper.SqlMapper.ExecuteReaderWithFlagsFallback(IDbCommand cmd, Boolean wasClosed, CommandBehavior behavior) at Dapper.SqlMapper.ExecuteReaderImpl(IDbConnection cnn, CommandDefinition& command, CommandBehavior commandBehavior, IDbCommand& cmd) at Dapper.SqlMapper.ExecuteReader(IDbConnection cnn, String sql, Object param, IDbTransaction transaction, Nullable1 commandTimeout, Nullable1 commandType) at DotNetCore.CAP.MySql.MySqlStorageConnection.<FetchNextMessageCoreAsync>d__19.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP.MySql\MySqlStorageConnection.cs:line 179 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter1.GetResult()
at DotNetCore.CAP.Processor.DefaultDispatcher.d__11.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Processor\IDispatcher.Default.cs:line 71
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at DotNetCore.CAP.Processor.DefaultDispatcher.d__9.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Processor\IDispatcher.Default.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at DotNetCore.CAP.Processor.InfiniteRetryProcessor.d__3.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Processor\IProcessor.InfiniteRetry.cs:line 25

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 8, 2017

SELECT MessageId,MessageTypeFROM{_prefix}.queue LIMIT 1 FOR UPDATE;

不确定是否这个sql语句在等待锁时导致的错误,但是按以上的消费者代码及cap配置的确抛出了以上的错误信息,不知道我有没有把问题描述清楚了?

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 8, 2017

PS: 前提条件-消息生产者生产了大量消息
for(var i=0;i<10000;i++)
{
publisher.Publish("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
}

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 8, 2017

消费者执行中时,查询这个语句会一直等待锁直到超时,是否这个语句导致表锁了,而不是行锁。

查询资料:

由于InnoDB 预设是Row-Level Lock,所以只有「明确」的指定主键,MySQL 才会执行Row lock (只锁住被选取的数据) ,否则MySQL 将会执行Table Lock (将整个数据表单给锁住)。

image

如果优化语句变为行锁,能否减低这种错误的发生?

@yang-xiaodong

This comment has been minimized.

Copy link
Member

commented Dec 8, 2017

@ajdwfnhaps

这个我不是很确定你的问题是这个原因导致的,你可以试试添加一个联合主键到 Queue 表看还会不会出现这个情况。

I'm not sure that your question is caused by this, but you can try create a composite primary keys to see if it's going to happen.

TRUNCATE TABLE `cap.queue`;   -- 清空表数据
ALTER TABLE `cap.queue` ADD PRIMARY KEY (MessageId, MessageType); -- 添加联合主键
@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 8, 2017

Hi @yuleyule66

我添加了联合主键,刚刚测试过了,还会报同样的错误。

我的理解是:
首先CAP有个Processor会不断轮循cap.queue表,去处理表里的消息,

当触发DotNetCore.CAP.MySql.MySqlStorageConnection.FetchNextMessageCoreAsync方法时,
会开始一个事务,执行了以下的语句

 var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
DELETE FROM `{_prefix}.queue` LIMIT 1;";

这时事务并未立即提交,是要等到消费者订阅方法处理成功后,才提交事务,才会释放锁表操作。

现在的疑问是:
当我设置QueueProcessorCount = 5,
第一个消费者线程在执行订阅方法时,由于太耗时,会一直锁住cap.queue表,
当后续线程陆续触发FetchNextMessageCoreAsync方法时,会一直等待表锁,直到db command timeout,
然后就一直不断抛出超时错误,程序没办法正常运行,除非手动关闭程序,再重新启动,然后这样不断循环。

想知道您觉得是否因为这样的原因导致这个错误发生的呢?

@yang-xiaodong

This comment has been minimized.

Copy link
Member

commented Dec 8, 2017

是这个原因导致的。
所以 MySQL 在消费消息的时候性能差,就是因为 MySql 不支持 Skip Locked 语法来跳过被锁定的行。SQL Server, PostgreSQL , Oracle 均有相关语法支持。

MySQL 直到 8.0+ 才提供 Skip Locked 的支持

@polsnet

This comment has been minimized.

Copy link

commented Dec 8, 2017

已经在用MySql8.0了,是否考虑升级下代码,或者做个兼容。
测试过程中同样遇到类似错误,有多个线程在检测cap.queue表数据时,出现超时错误。
还有时报这个https://github.com/dotnetcore/CAP/issues/63,但在MQ中取消绑定仍然会出现,清空cap.received表后不再报错。

@suxi

This comment has been minimized.

Copy link

commented Dec 11, 2017

mysql的for update会造成cap.queue表被锁定,下一句的for update需要等待,我觉得是不是可以有其他方法,绕过这个性能问题

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 11, 2017

for update获取得了排它锁,未释放前下一句for update一直在等待,使用Mysql旧版本(不支持skip locked)性能真的很有问题,不仅是消费时,发布消息同样有这个问题。

@yuleyule66 不知道能否在mysql 实现时,不使用排它锁,cap.queue增加个字段status, 如果处理中就更新为processing, 然后FetchNextMessageAsync时就查询不在状态processing的记录,

@yang-xiaodong

This comment has been minimized.

Copy link
Member

commented Dec 11, 2017

@ajdwfnhaps 可以考虑你的建议,待我衡量一下,这两天比较忙。

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 11, 2017

@yuleyule66 感谢大大

@suxi

This comment has been minimized.

Copy link

commented Dec 11, 2017

@ajdwfnhaps 只要queue的指令不commit,排他锁这个问题没有办法解决, limit 1也会触发.我在想,不用queue的话会有哪些问题?

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 11, 2017

@suxi  不加for update 的select ... limit 1应该不会获得排他锁的啊,没有X锁,FetchNextMessageAsync时就不用再等待了,问题是不用排他锁,能否有什么好的办法可以确保cap.queue里的消息不会被多线程实例重复消费的问题

@suxi

This comment has been minimized.

Copy link

commented Dec 11, 2017

@ajdwfnhaps 消息里面有messageid,可以一定程度避免多发,outbox模式本身没有这个queue的必要性. 关于锁,你可以试试看.我现在准备跳过这个queue,直接发送message

yang-xiaodong added a commit that referenced this issue Dec 17, 2017
Because of deadlock and performance issues, mysql does not use transa…
…ctions when consuming messages, and requeue the message to queue table when they exception. (#36,#68)
@yang-xiaodong

This comment has been minimized.

Copy link
Member

commented Dec 17, 2017

@ajdwfnhaps 这个问题解决了。

2.1.2-preview-30288174 版本取消了mysql消费时候的事务转而采用的是当失败的时候重新将消息写入队列,成功后删除队列中的消息。

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 17, 2017

收到,太好了,我明天再测试测试,感谢

@ajdwfnhaps

This comment has been minimized.

Copy link
Author

commented Dec 18, 2017

@yuleyule66 棒极了,刚测试了一遍,一切正常,没有了整个事务的锁,性能提升了一倍。感谢作者~~

@yang-xiaodong

This comment has been minimized.

Copy link
Member

commented Dec 19, 2017

Fixed in version 2.1.2

@yang-xiaodong yang-xiaodong changed the title 当消费者进行耗时处理时,CAP Processor 会引发the Command Timeout expired错误(MySql ) CAP Processor triggers the Command Timeout error (MySql) when the consumer takes time to deal with it Dec 21, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.