Skip to content

Commit

Permalink
增加异步消费以及事件修复 fix: 44
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Mar 29, 2022
1 parent 8c2a976 commit 564ae4f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 5 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/publish-beta.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ name: publish-beta

on:
push:
branches: [ master ]
branches:
- master
paths:
- 'NewLife.RocketMQ/**'

jobs:
build-publish:
Expand All @@ -24,5 +27,5 @@ jobs:
dotnet pack --no-restore --version-suffix $(date "+%Y.%m%d-beta%H%M") -c Release -o out NewLife.RocketMQ/NewLife.RocketMQ.csproj
- name: Publish
run: |
dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }}
# dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }}
dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.nugetKey }}
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ jobs:
dotnet pack --no-restore --version-suffix $(date "+%Y.%m%d") -c Release -o out NewLife.RocketMQ/NewLife.RocketMQ.csproj
- name: Publish
run: |
dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }}
# dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }}
dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.nugetKey }}
2 changes: 2 additions & 0 deletions NewLife.RocketMQ.sln
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{AF932CB1-8F4D-4317-8028-62F2CACEA9BD}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
.github\workflows\publish-beta.yml = .github\workflows\publish-beta.yml
.github\workflows\publish.yml = .github\workflows\publish.yml
Readme.MD = Readme.MD
EndProjectSection
EndProject
Expand Down
14 changes: 12 additions & 2 deletions NewLife.RocketMQ/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using NewLife.Log;
using NewLife.Reflection;
using NewLife.RocketMQ.Client;
using NewLife.RocketMQ.Models;
using NewLife.RocketMQ.Protocol;
using NewLife.Serialization;
using NewLife.Threading;
Expand Down Expand Up @@ -48,6 +49,12 @@ public class Consumer : MqBase

/// <summary>消费委托</summary>
public Func<MessageQueue, MessageExt[], Boolean> OnConsume;

/// <summary>异步消费委托</summary>
public Func<MessageQueue, MessageExt[], Task<Boolean>> OnConsumeAsync;

/// <summary>消费事件</summary>
public event EventHandler<ConsumeEventArgs> Consumed;
#endregion

#region 构造
Expand Down Expand Up @@ -409,7 +416,7 @@ private async void DoPull(Object state)
if (pr.Messages != null && pr.Messages.Length > 0)
{
// 触发消费
var rs = Consume(mq, pr);
var rs = await Consume(mq, pr);

// 更新偏移
if (rs)
Expand Down Expand Up @@ -459,11 +466,14 @@ private async void DoPull(Object state)
/// <param name="queue"></param>
/// <param name="result"></param>
/// <returns></returns>
protected virtual Boolean Consume(MessageQueue queue, PullResult result)
protected virtual async Task<Boolean> Consume(MessageQueue queue, PullResult result)
{
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("{0}", result);

Consumed?.Invoke(this, new ConsumeEventArgs { Queue = queue, Messages = result.Messages, Result = result });

if (OnConsume != null) return OnConsume(queue, result.Messages);
if (OnConsumeAsync != null) return await OnConsumeAsync(queue, result.Messages);

return true;
}
Expand Down
17 changes: 17 additions & 0 deletions NewLife.RocketMQ/Models/ConsumeEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using NewLife.RocketMQ.Protocol;

namespace NewLife.RocketMQ.Models
{
/// <summary>消费事件参数</summary>
public class ConsumeEventArgs : EventArgs
{
/// <summary>队列</summary>
public MessageQueue Queue { get; set; }

/// <summary>消息集合</summary>
public MessageExt[] Messages { get; set; }

/// <summary>结果</summary>
public PullResult Result { get; set; }
}
}

0 comments on commit 564ae4f

Please sign in to comment.