diff --git a/.github/workflows/publish-beta.yml b/.github/workflows/publish-beta.yml index 35d0cda..144e4c7 100644 --- a/.github/workflows/publish-beta.yml +++ b/.github/workflows/publish-beta.yml @@ -2,7 +2,10 @@ name: publish-beta on: push: - branches: [ master ] + branches: + - master + paths: + - 'NewLife.RocketMQ/**' jobs: build-publish: @@ -24,5 +27,5 @@ jobs: dotnet pack --no-restore --version-suffix $(date "+%Y.%m%d-beta%H%M") -c Release -o out NewLife.RocketMQ/NewLife.RocketMQ.csproj - name: Publish run: | - dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }} + # dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }} dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.nugetKey }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index d70bea2..8aeb9e6 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -24,5 +24,5 @@ jobs: dotnet pack --no-restore --version-suffix $(date "+%Y.%m%d") -c Release -o out NewLife.RocketMQ/NewLife.RocketMQ.csproj - name: Publish run: | - dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }} + # dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://nuget.pkg.github.com/NewLifeX/index.json --api-key ${{ github.token }} dotnet nuget push ./out/*.nupkg --skip-duplicate --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.nugetKey }} diff --git a/NewLife.RocketMQ.sln b/NewLife.RocketMQ.sln index 3abc838..c9e1807 100644 --- a/NewLife.RocketMQ.sln +++ b/NewLife.RocketMQ.sln @@ -12,6 +12,8 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{AF932CB1-8F4D-4317-8028-62F2CACEA9BD}" ProjectSection(SolutionItems) = preProject .editorconfig = .editorconfig + .github\workflows\publish-beta.yml = .github\workflows\publish-beta.yml + .github\workflows\publish.yml = .github\workflows\publish.yml Readme.MD = Readme.MD EndProjectSection EndProject diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs index f2f678d..7529890 100644 --- a/NewLife.RocketMQ/Consumer.cs +++ b/NewLife.RocketMQ/Consumer.cs @@ -10,6 +10,7 @@ using NewLife.Log; using NewLife.Reflection; using NewLife.RocketMQ.Client; +using NewLife.RocketMQ.Models; using NewLife.RocketMQ.Protocol; using NewLife.Serialization; using NewLife.Threading; @@ -48,6 +49,12 @@ public class Consumer : MqBase /// 消费委托 public Func OnConsume; + + /// 异步消费委托 + public Func> OnConsumeAsync; + + /// 消费事件 + public event EventHandler Consumed; #endregion #region 构造 @@ -409,7 +416,7 @@ private async void DoPull(Object state) if (pr.Messages != null && pr.Messages.Length > 0) { // 触发消费 - var rs = Consume(mq, pr); + var rs = await Consume(mq, pr); // 更新偏移 if (rs) @@ -459,11 +466,14 @@ private async void DoPull(Object state) /// /// /// - protected virtual Boolean Consume(MessageQueue queue, PullResult result) + protected virtual async Task Consume(MessageQueue queue, PullResult result) { if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("{0}", result); + Consumed?.Invoke(this, new ConsumeEventArgs { Queue = queue, Messages = result.Messages, Result = result }); + if (OnConsume != null) return OnConsume(queue, result.Messages); + if (OnConsumeAsync != null) return await OnConsumeAsync(queue, result.Messages); return true; } diff --git a/NewLife.RocketMQ/Models/ConsumeEventArgs.cs b/NewLife.RocketMQ/Models/ConsumeEventArgs.cs new file mode 100644 index 0000000..a82b1cd --- /dev/null +++ b/NewLife.RocketMQ/Models/ConsumeEventArgs.cs @@ -0,0 +1,17 @@ +using NewLife.RocketMQ.Protocol; + +namespace NewLife.RocketMQ.Models +{ + /// 消费事件参数 + public class ConsumeEventArgs : EventArgs + { + /// 队列 + public MessageQueue Queue { get; set; } + + /// 消息集合 + public MessageExt[] Messages { get; set; } + + /// 结果 + public PullResult Result { get; set; } + } +} \ No newline at end of file