diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/ConversationHookBase.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/ConversationHookBase.cs index 0a5244f89..855bd79e0 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/ConversationHookBase.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/ConversationHookBase.cs @@ -49,10 +49,10 @@ public virtual Task OnTaskCompleted(RoleDialogModel message) public virtual Task OnHumanInterventionNeeded(RoleDialogModel message) => Task.CompletedTask; - public virtual Task OnFunctionExecuting(RoleDialogModel message, string from = InvokeSource.Manual) + public virtual Task OnFunctionExecuting(RoleDialogModel message, InvokeFunctionOptions? options = null) => Task.CompletedTask; - public virtual Task OnFunctionExecuted(RoleDialogModel message, string from = InvokeSource.Manual) + public virtual Task OnFunctionExecuted(RoleDialogModel message, InvokeFunctionOptions? options = null) => Task.CompletedTask; public virtual Task OnMessageReceived(RoleDialogModel message) diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs index 19b5e8b28..9e4fcb417 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs @@ -32,6 +32,10 @@ public class ChatResponseDto : InstructResult [JsonPropertyName("payload")] public string? Payload { get; set; } + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("indication")] + public string? Indication { get; set; } + [JsonPropertyName("has_message_files")] public bool HasMessageFiles { get; set; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Enums/ChatEvent.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Enums/ChatEvent.cs new file mode 100644 index 000000000..0d68fef7d --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Enums/ChatEvent.cs @@ -0,0 +1,22 @@ +namespace BotSharp.Abstraction.Conversations.Enums; + +public static class ChatEvent +{ + public const string OnConversationInitFromClient = nameof(OnConversationInitFromClient); + public const string OnMessageReceivedFromClient = nameof(OnMessageReceivedFromClient); + public const string OnMessageReceivedFromAssistant = nameof(OnMessageReceivedFromAssistant); + + public const string OnMessageDeleted = nameof(OnMessageDeleted); + public const string OnNotificationGenerated = nameof(OnNotificationGenerated); + public const string OnIndicationReceived = nameof(OnIndicationReceived); + + public const string OnConversationContentLogGenerated = nameof(OnConversationContentLogGenerated); + public const string OnConversateStateLogGenerated = nameof(OnConversateStateLogGenerated); + public const string OnAgentQueueChanged = nameof(OnAgentQueueChanged); + public const string OnStateChangeGenerated = nameof(OnStateChangeGenerated); + + public const string BeforeReceiveLlmStreamMessage = nameof(BeforeReceiveLlmStreamMessage); + public const string OnReceiveLlmStreamMessage = nameof(OnReceiveLlmStreamMessage); + public const string AfterReceiveLlmStreamMessage = nameof(AfterReceiveLlmStreamMessage); + public const string OnSenderActionGenerated = nameof(OnSenderActionGenerated); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationHook.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationHook.cs index d1b6f83c8..5a1d8aa28 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationHook.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationHook.cs @@ -61,7 +61,7 @@ public interface IConversationHook : IHookBase /// /// /// - Task OnFunctionExecuting(RoleDialogModel message, string from = InvokeSource.Manual); + Task OnFunctionExecuting(RoleDialogModel message, InvokeFunctionOptions? options = null); /// /// Triggered when the function calling completed. @@ -69,7 +69,7 @@ public interface IConversationHook : IHookBase /// /// /// - Task OnFunctionExecuted(RoleDialogModel message, string from = InvokeSource.Manual); + Task OnFunctionExecuted(RoleDialogModel message, InvokeFunctionOptions? options = null); Task OnResponseGenerated(RoleDialogModel message); diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationProgressService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationProgressService.cs deleted file mode 100644 index 8958cab53..000000000 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationProgressService.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace BotSharp.Abstraction.Conversations; - -public delegate Task FunctionExecuting(RoleDialogModel msg); -public delegate Task FunctionExecuted(RoleDialogModel msg); - -public interface IConversationProgressService -{ - FunctionExecuted OnFunctionExecuted { get; set; } - FunctionExecuting OnFunctionExecuting { get; set; } -} diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/ConversationSenderActionModel.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/ConversationSenderActionModel.cs index b49fc55db..502cca4c2 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/ConversationSenderActionModel.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/ConversationSenderActionModel.cs @@ -1,5 +1,3 @@ -using BotSharp.Abstraction.Messaging.Enums; - namespace BotSharp.Abstraction.Conversations.Models; public class ConversationSenderActionModel diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs index 6a8af2d62..6bc82f81e 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs @@ -120,7 +120,7 @@ public class RoleDialogModel : ITrackableMessage [JsonIgnore(Condition = JsonIgnoreCondition.Always)] public bool IsStreaming { get; set; } - private RoleDialogModel() + public RoleDialogModel() { } diff --git a/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/HubObserveData.cs b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/HubObserveData.cs new file mode 100644 index 000000000..d345ab86d --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/HubObserveData.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Abstraction.MessageHub.Models; + +public class HubObserveData : ObserveDataBase where TData : class, new() +{ + public TData Data { get; set; } = null!; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/ObserveDataBase.cs b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/ObserveDataBase.cs new file mode 100644 index 000000000..c7d5c0f6e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/ObserveDataBase.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.MessageHub.Models; + +public class ObserveDataBase +{ + public string EventName { get; set; } = null!; + public string RefId { get; set; } = null!; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/ObserverSubscription.cs b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/ObserverSubscription.cs new file mode 100644 index 000000000..0ba755313 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Models/ObserverSubscription.cs @@ -0,0 +1,28 @@ +using BotSharp.Abstraction.MessageHub.Observers; + +namespace BotSharp.Abstraction.MessageHub.Models; + +public class ObserverSubscription +{ + public IBotSharpObserver Observer { get; set; } + public IDisposable Subscription { get; set; } + + public ObserverSubscription() + { + + } + + public ObserverSubscription( + IBotSharpObserver observer, + IDisposable subscription) + { + Observer = observer; + Subscription = subscription; + } + + public void UnSubscribe() + { + Observer.Deactivate(); + Subscription.Dispose(); + } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/MessageHub/Observers/BotSharpObserverBase.cs b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Observers/BotSharpObserverBase.cs new file mode 100644 index 000000000..1381ea030 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Observers/BotSharpObserverBase.cs @@ -0,0 +1,48 @@ + +namespace BotSharp.Abstraction.MessageHub.Observers; + +public abstract class BotSharpObserverBase : IBotSharpObserver +{ + private bool _active = false; + protected Dictionary> _listeners = []; + + protected BotSharpObserverBase() + { + + } + + public virtual string Name => string.Empty; + + public virtual bool Active => _active; + + public virtual void Activate() + { + _active = true; + } + + public virtual void Deactivate() + { + _active = false; + _listeners = []; + } + + public virtual void SetEventListeners(Dictionary> listeners) + { + _listeners = listeners; + } + + public virtual void OnCompleted() + { + + } + + public virtual void OnError(Exception error) + { + + } + + public virtual void OnNext(T value) + { + + } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/MessageHub/Observers/IBotSharpObserver.cs b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Observers/IBotSharpObserver.cs new file mode 100644 index 000000000..158ef4a6a --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Observers/IBotSharpObserver.cs @@ -0,0 +1,11 @@ +namespace BotSharp.Abstraction.MessageHub.Observers; + +public interface IBotSharpObserver : IObserver +{ + string Name { get; } + bool Active { get; } + + void SetEventListeners(Dictionary> listeners); + void Activate(); + void Deactivate(); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/MessageHub/Services/IObserverService.cs b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Services/IObserverService.cs new file mode 100644 index 000000000..582849aa8 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MessageHub/Services/IObserverService.cs @@ -0,0 +1,13 @@ +using BotSharp.Abstraction.MessageHub.Models; + +namespace BotSharp.Abstraction.MessageHub.Services; + +public interface IObserverService +{ + IDisposable SubscribeObservers( + string refId, + IEnumerable? names = null, + Dictionary>? listeners = null) where T : ObserveDataBase; + + void UnSubscribeObservers(IEnumerable? names = null) where T : ObserveDataBase; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs deleted file mode 100644 index bf771cb4d..000000000 --- a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace BotSharp.Abstraction.Observables.Models; - -public class HubObserveData : ObserveDataBase -{ - public string EventName { get; set; } = null!; - public RoleDialogModel Data { get; set; } = null!; -} diff --git a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs deleted file mode 100644 index 177732726..000000000 --- a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace BotSharp.Abstraction.Observables.Models; - -public abstract class ObserveDataBase -{ - public IServiceProvider ServiceProvider { get; set; } = null!; -} diff --git a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs index 56c98a824..d387fab16 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs @@ -26,8 +26,8 @@ public interface IRoutingService /// RoutingRule[] GetRulesByAgentId(string id); - Task InvokeAgent(string agentId, List dialogs, string from = InvokeSource.Manual, bool useStream = false); - Task InvokeFunction(string name, RoleDialogModel messages, string from = InvokeSource.Manual); + Task InvokeAgent(string agentId, List dialogs, InvokeAgentOptions? options = null); + Task InvokeFunction(string name, RoleDialogModel messages, InvokeFunctionOptions? options = null); Task InstructLoop(Agent agent, RoleDialogModel message, List dialogs); /// diff --git a/src/Infrastructure/BotSharp.Abstraction/Routing/Models/InvokeOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Routing/Models/InvokeOptions.cs new file mode 100644 index 000000000..5e0317872 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Routing/Models/InvokeOptions.cs @@ -0,0 +1,31 @@ +namespace BotSharp.Abstraction.Routing.Models; + +public abstract class InvokeOptions +{ + public string From { get; set; } +} + +public class InvokeAgentOptions : InvokeOptions +{ + public bool UseStream { get; set; } + + public static InvokeAgentOptions Default() + { + return new() + { + From = InvokeSource.Manual, + UseStream = false + }; + } +} + +public class InvokeFunctionOptions : InvokeOptions +{ + public static InvokeFunctionOptions Default() + { + return new() + { + From = InvokeSource.Manual + }; + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/SideCar/Attributes/SideCarAttribute.cs b/src/Infrastructure/BotSharp.Abstraction/SideCar/Attributes/SideCarAttribute.cs index 4426d71f9..1d93dcbaa 100644 --- a/src/Infrastructure/BotSharp.Abstraction/SideCar/Attributes/SideCarAttribute.cs +++ b/src/Infrastructure/BotSharp.Abstraction/SideCar/Attributes/SideCarAttribute.cs @@ -23,7 +23,12 @@ public override async ValueTask OnEntryAsync(MethodContext context) var instance = context.Target; var retType = context.ReturnType; - var serviceProvider = ((IHaveServiceProvider)instance).ServiceProvider; + var serviceProvider = (instance as IHaveServiceProvider)?.ServiceProvider; + if (serviceProvider == null) + { + return; + } + var (sidecar, sidecarMethod) = GetSideCarMethod(serviceProvider, methodName, retType, methodArgs); if (sidecar == null || sidecarMethod == null) { diff --git a/src/Infrastructure/BotSharp.Abstraction/SideCar/Models/SideCarOptions.cs b/src/Infrastructure/BotSharp.Abstraction/SideCar/Models/SideCarOptions.cs index f221cab20..bc6609c09 100644 --- a/src/Infrastructure/BotSharp.Abstraction/SideCar/Models/SideCarOptions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/SideCar/Models/SideCarOptions.cs @@ -3,19 +3,23 @@ namespace BotSharp.Abstraction.SideCar.Models; public class SideCarOptions { public bool IsInheritStates { get; set; } - public IEnumerable? InheritStateKeys { get; set; } + public HashSet? InheritStateKeys { get; set; } + public HashSet? ExcludedStateKeys { get; set; } public static SideCarOptions Empty() { return new(); } - public static SideCarOptions InheritStates(IEnumerable? targetStates = null) + public static SideCarOptions InheritStates( + HashSet? includedStates = null, + HashSet? excludedStates = null) { return new() { IsInheritStates = true, - InheritStateKeys = targetStates + InheritStateKeys = includedStates, + ExcludedStateKeys = excludedStates }; } } diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Hooks/RealtimeConversationHook.cs b/src/Infrastructure/BotSharp.Core.Realtime/Hooks/RealtimeConversationHook.cs index 095ff2b8f..07b8646cb 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Hooks/RealtimeConversationHook.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Hooks/RealtimeConversationHook.cs @@ -11,7 +11,7 @@ public RealtimeConversationHook(IServiceProvider services) _services = services; } - public async Task OnFunctionExecuting(RoleDialogModel message, string from = InvokeSource.Manual) + public async Task OnFunctionExecuting(RoleDialogModel message, InvokeFunctionOptions? options = null) { var hub = _services.GetRequiredService(); if (hub.HubConn == null) @@ -32,10 +32,10 @@ public async Task OnFunctionExecuting(RoleDialogModel message, string from = Inv } } - public async Task OnFunctionExecuted(RoleDialogModel message, string from = InvokeSource.Manual) + public async Task OnFunctionExecuted(RoleDialogModel message, InvokeFunctionOptions? options = null) { var hub = _services.GetRequiredService(); - if (from != InvokeSource.Llm || hub.HubConn == null) + if (options?.From != InvokeSource.Llm || hub.HubConn == null) { return; } diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs index fe544b4d2..0ff601e1e 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs @@ -99,7 +99,7 @@ await HookEmitter.Emit(_services, async hook => await hook.OnRouti agent.Id); } - await routing.InvokeFunction(message.FunctionName, message, from: InvokeSource.Llm); + await routing.InvokeFunction(message.FunctionName, message, options: new() { From = InvokeSource.Llm }); } else { diff --git a/src/Infrastructure/BotSharp.Core.SideCar/Services/BotSharpConversationSideCar.cs b/src/Infrastructure/BotSharp.Core.SideCar/Services/BotSharpConversationSideCar.cs index d636fa718..3def01ef7 100644 --- a/src/Infrastructure/BotSharp.Core.SideCar/Services/BotSharpConversationSideCar.cs +++ b/src/Infrastructure/BotSharp.Core.SideCar/Services/BotSharpConversationSideCar.cs @@ -15,6 +15,9 @@ limitations under the License. ******************************************************************************/ using BotSharp.Core.Infrastructures; +using NetTopologySuite.Index.KdTree; +using Newtonsoft.Json.Linq; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace BotSharp.Core.SideCar.Services; @@ -189,24 +192,29 @@ private bool IsValid(string conversationId) private void RestoreStates(ConversationState prevStates) { - var innerStates = prevStates; + var preValues = prevStates.Values.ToList(); + var copy = JsonSerializer.Deserialize>(JsonSerializer.Serialize(preValues)); + var innerStates = new ConversationState(copy ?? []); var state = _services.GetRequiredService(); if (_sideCarOptions?.IsInheritStates == true) { + var hasIncludedStates = _sideCarOptions?.InheritStateKeys?.Any() == true; + var hasExcludedStates = _sideCarOptions?.ExcludedStateKeys?.Any() == true; var curStates = state.GetCurrentState(); + foreach (var pair in curStates) { var endNode = pair.Value.Values.LastOrDefault(); if (endNode == null) continue; - if (_sideCarOptions?.InheritStateKeys?.Any() == true - && !_sideCarOptions.InheritStateKeys.Contains(pair.Key)) + if ((hasIncludedStates && !_sideCarOptions.InheritStateKeys.Contains(pair.Key)) + || (hasExcludedStates && _sideCarOptions.ExcludedStateKeys.Contains(pair.Key))) { continue; } - if (innerStates.ContainsKey(pair.Key)) + if (innerStates.ContainsKey(pair.Key) && innerStates[pair.Key].Versioning) { innerStates[pair.Key].Values.Add(endNode); } @@ -223,6 +231,51 @@ private void RestoreStates(ConversationState prevStates) } } + AccumulateLlmStats(state, prevStates, innerStates); state.SetCurrentState(innerStates); } + + private void AccumulateLlmStats(IConversationStateService state, ConversationState prevState, ConversationState curState) + { + var dict = new Dictionary + { + { "prompt_total", typeof(int) }, + { "completion_total", typeof(int) }, + { "llm_total_cost", typeof(float) } + }; + + foreach (var pair in dict) + { + var preVal = prevState.GetValueOrDefault(pair.Key)?.Values?.LastOrDefault()?.Data; + var curVal = state.GetState(pair.Key); + + object data = pair.Value switch + { + Type t when t == typeof(int) => ParseNumber(preVal) + ParseNumber(curVal), + Type t when t == typeof(float) => ParseNumber(preVal) + ParseNumber(curVal), + _ => default + }; + + var cur = curState.GetValueOrDefault(pair.Key); + if (cur?.Values?.LastOrDefault() != null) + { + cur.Values.Last().Data = $"{data}"; + } + } + } + + private T ParseNumber(string? data) where T : struct + { + if (string.IsNullOrEmpty(data)) + { + return default; + } + + return typeof(T) switch + { + Type t when t == typeof(int) => (T)(object)(int.TryParse(data, out var i) ? i : 0), + Type t when t == typeof(float) => (T)(object)(float.TryParse(data, out var f) ? f : 0), + _ => default + }; + } } \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/BotSharp.Core.csproj b/src/Infrastructure/BotSharp.Core/BotSharp.Core.csproj index ddcf68cb4..ec10cb45e 100644 --- a/src/Infrastructure/BotSharp.Core/BotSharp.Core.csproj +++ b/src/Infrastructure/BotSharp.Core/BotSharp.Core.csproj @@ -98,6 +98,7 @@ + @@ -211,6 +212,9 @@ PreserveNewest + + PreserveNewest + @@ -242,5 +246,4 @@ true - diff --git a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs index a559877f8..61c23d1b7 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs @@ -1,18 +1,23 @@ using BotSharp.Abstraction.Google.Settings; using BotSharp.Abstraction.Instructs; +using BotSharp.Abstraction.MessageHub; +using BotSharp.Abstraction.MessageHub.Observers; +using BotSharp.Abstraction.MessageHub.Services; using BotSharp.Abstraction.Messaging; using BotSharp.Abstraction.Planning; using BotSharp.Abstraction.Plugins.Models; using BotSharp.Abstraction.Settings; using BotSharp.Abstraction.Templating; using BotSharp.Core.Instructs; +using BotSharp.Core.MessageHub; +using BotSharp.Core.MessageHub.Observers; +using BotSharp.Core.MessageHub.Services; using BotSharp.Core.Messaging; using BotSharp.Core.Routing.Reasoning; using BotSharp.Core.Templating; using BotSharp.Core.Translation; -using BotSharp.Core.Observables.Queues; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; -using BotSharp.Abstraction.Observables.Models; namespace BotSharp.Core.Conversations; @@ -43,11 +48,14 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) return settingService.Bind("GoogleApi"); }); - services.AddSingleton>(); + // Observer and observable + services.AddSingleton>>(); + services.AddScoped>>(); + services.AddScoped>, ConversationObserver>(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); - services.AddScoped(); services.AddScoped(); services.AddScoped(); diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationProgressService.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationProgressService.cs deleted file mode 100644 index d7e6440c0..000000000 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationProgressService.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace BotSharp.Core.Conversations.Services -{ - public class ConversationProgressService : IConversationProgressService - { - - public FunctionExecuting OnFunctionExecuting { get; set; } - - - public FunctionExecuted OnFunctionExecuted { get; set; } - } -} diff --git a/src/Infrastructure/BotSharp.Core/Demo/Functions/GetFunEventsFn.cs b/src/Infrastructure/BotSharp.Core/Demo/Functions/GetFunEventsFn.cs new file mode 100644 index 000000000..87ddb1eeb --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Demo/Functions/GetFunEventsFn.cs @@ -0,0 +1,51 @@ +using BotSharp.Abstraction.Functions; +using BotSharp.Core.MessageHub; +using System.Text.Json.Serialization; + +namespace BotSharp.Core.Demo.Functions; + +public class GetFunEventsFn : IFunctionCallback +{ + private readonly IServiceProvider _services; + + public GetFunEventsFn(IServiceProvider services) + { + _services = services; + } + + public string Name => "get_fun_events"; + public string Indication => "Searching fun events"; + + public async Task Execute(RoleDialogModel message) + { + var args = JsonSerializer.Deserialize(message.FunctionArgs); + var conv = _services.GetRequiredService(); + var messageHub = _services.GetRequiredService>>(); + + await Task.Delay(1000); + + message.Indication = $"Start querying event data in {args?.City}"; + messageHub.Push(new() + { + EventName = ChatEvent.OnIndicationReceived, + Data = message, + RefId = conv.ConversationId + }); + + await Task.Delay(1500); + + message.Indication = $"Still searching events in {args?.City}"; + messageHub.Push(new() + { + EventName = ChatEvent.OnIndicationReceived, + Data = message, + RefId = conv.ConversationId + }); + + await Task.Delay(1500); + + message.Content = $"Here in {args?.City}, there are a lot of fun events in summer."; + message.StopCompletion = true; + return true; + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs b/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs index cb6bcd886..6465b88e5 100644 --- a/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs +++ b/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs @@ -1,4 +1,8 @@ using BotSharp.Abstraction.Functions; +using BotSharp.Abstraction.Models; +using BotSharp.Abstraction.SideCar; +using BotSharp.Core.MessageHub; +using System.Text.Json.Serialization; namespace BotSharp.Core.Demo.Functions; @@ -16,8 +20,56 @@ public GetWeatherFn(IServiceProvider services) public async Task Execute(RoleDialogModel message) { + var args = JsonSerializer.Deserialize(message.FunctionArgs); + var conv = _services.GetRequiredService(); + var messageHub = _services.GetRequiredService>>(); + + await Task.Delay(1000); + + message.Indication = $"Start querying weather data in {args?.City}"; + messageHub.Push(new() + { + EventName = ChatEvent.OnIndicationReceived, + Data = message, + RefId = conv.ConversationId + }); + + await Task.Delay(1500); + + message.Indication = $"Still working on it... Hold on, {args?.City}"; + messageHub.Push(new() + { + EventName = ChatEvent.OnIndicationReceived, + Data = message, + RefId = conv.ConversationId + }); + + await Task.Delay(1500); + message.Content = $"It is a sunny day!"; message.StopCompletion = false; + +#if DEBUG + var sidecar = _services.GetService(); + if (sidecar != null) + { + var text = $"I want to know fun events in {args?.City}"; + var states = new List + { + new() { Key = "channel", Value = "email" } + }; + + var msg = await sidecar.SendMessage(message.CurrentAgentId, text, states: states); + message.Content = $"{message.Content} {msg.Content}"; + } +#endif + return true; } +} + +class WeatherLocation +{ + [JsonPropertyName("city")] + public string City { get; set; } } \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Evaluations/EvaluationConversationHook.cs b/src/Infrastructure/BotSharp.Core/Evaluations/EvaluationConversationHook.cs index b1b4541e6..80757219e 100644 --- a/src/Infrastructure/BotSharp.Core/Evaluations/EvaluationConversationHook.cs +++ b/src/Infrastructure/BotSharp.Core/Evaluations/EvaluationConversationHook.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Evaluations; +using BotSharp.Abstraction.Routing.Models; namespace BotSharp.Core.Evaluations; @@ -22,13 +23,13 @@ public override Task OnMessageReceived(RoleDialogModel message) return base.OnMessageReceived(message); } - public override Task OnFunctionExecuted(RoleDialogModel message, string from = InvokeSource.Manual) + public override Task OnFunctionExecuted(RoleDialogModel message, InvokeFunctionOptions? options = null) { if (Conversation != null && _convSettings.EnableExecutionLog) { _logger.Append(Conversation.Id, $"[{DateTime.Now}] {message.Role}: {message.FunctionName}({message.FunctionArgs}) => {message.Content}"); } - return base.OnFunctionExecuted(message, from: from); + return base.OnFunctionExecuted(message, options); } public override Task OnResponseGenerated(RoleDialogModel message) diff --git a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs b/src/Infrastructure/BotSharp.Core/MessageHub/MessageHub.cs similarity index 83% rename from src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs rename to src/Infrastructure/BotSharp.Core/MessageHub/MessageHub.cs index affb142da..8f01612df 100644 --- a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs +++ b/src/Infrastructure/BotSharp.Core/MessageHub/MessageHub.cs @@ -1,11 +1,11 @@ using System.Reactive.Subjects; -namespace BotSharp.Core.Observables.Queues; +namespace BotSharp.Core.MessageHub; -public class MessageHub where T : class +public class MessageHub where T : ObserveDataBase { private readonly ILogger> _logger; - private readonly ISubject _observable = new Subject(); + private readonly ISubject _observable = Subject.Synchronize(new Subject()); public IObservable Events => _observable; public MessageHub(ILogger> logger) diff --git a/src/Infrastructure/BotSharp.Core/MessageHub/Observers/ConversationObserver.cs b/src/Infrastructure/BotSharp.Core/MessageHub/Observers/ConversationObserver.cs new file mode 100644 index 000000000..e2588c6cf --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/MessageHub/Observers/ConversationObserver.cs @@ -0,0 +1,45 @@ +using BotSharp.Abstraction.MessageHub.Observers; + +namespace BotSharp.Core.MessageHub.Observers; + +public class ConversationObserver : BotSharpObserverBase> +{ + private readonly ILogger _logger; + private readonly IServiceProvider _services; + + public ConversationObserver( + IServiceProvider services, + ILogger logger) : base() + { + _services = services; + _logger = logger; + } + + public override string Name => nameof(ConversationObserver); + + public override void OnCompleted() + { + _logger.LogWarning($"{nameof(ConversationObserver)} receives complete notification."); + } + + public override void OnError(Exception error) + { + _logger.LogError(error, $"{nameof(ConversationObserver)} receives error notification: {error.Message}"); + } + + public override void OnNext(HubObserveData value) + { + var conv = _services.GetRequiredService(); + + if (value.EventName == ChatEvent.OnIndicationReceived) + { +#if DEBUG + _logger.LogCritical($"Receiving {value.EventName} ({value.Data.Indication}) in {nameof(ConversationObserver)} - {conv.ConversationId}"); +#endif + if (_listeners.TryGetValue(value.EventName, out var func) && func != null) + { + func(value).ConfigureAwait(false).GetAwaiter().GetResult(); + } + } + } +} diff --git a/src/Infrastructure/BotSharp.Core/MessageHub/Services/ObserverService.cs b/src/Infrastructure/BotSharp.Core/MessageHub/Services/ObserverService.cs new file mode 100644 index 000000000..4679235d8 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/MessageHub/Services/ObserverService.cs @@ -0,0 +1,86 @@ +using BotSharp.Abstraction.MessageHub.Observers; +using BotSharp.Abstraction.MessageHub.Services; +using System.Reactive.Linq; + +namespace BotSharp.Core.MessageHub.Services; + +public class ObserverService : IObserverService +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public ObserverService( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public IDisposable SubscribeObservers( + string refId, + IEnumerable? names = null, + Dictionary>? listeners = null) where T : ObserveDataBase + { + var container = _services.GetRequiredService>(); + var observers = _services.GetServices>() + .Where(x => !x.Active) + .ToList(); + + if (!names.IsNullOrEmpty()) + { + observers = observers.Where(x => names.Contains(x.Name)).ToList(); + } + + if (observers.IsNullOrEmpty()) + { + return container; + } + + if (!listeners.IsNullOrEmpty()) + { + foreach (var observer in observers) + { + observer.SetEventListeners(listeners ?? []); + } + } + +#if DEBUG + _logger.LogCritical($"Subscribe observers: {string.Join(",", observers.Select(x => x.Name))}"); +#endif + + var subscriptions = new List>(); + var messageHub = _services.GetRequiredService>(); + foreach (var observer in observers) + { + observer.Activate(); + var sub = messageHub.Events.Where(x => x.RefId == refId).Subscribe(observer); + subscriptions.Add(new() + { + Observer = observer, + Subscription = sub + }); + } + + container.Append(subscriptions); + return container; + } + + public void UnSubscribeObservers(IEnumerable? names = null) where T : ObserveDataBase + { + var container = _services.GetRequiredService>(); + var subscriptions = container.GetSubscriptions(names); + +#if DEBUG + _logger.LogCritical($"UnSubscribe observers: {string.Join(",", subscriptions.Select(x => x.Observer.Name))}"); +#endif + + foreach (var sub in subscriptions) + { + if (!sub.Observer.Active) continue; + + sub.UnSubscribe(); + } + container.Remove(names); + } +} diff --git a/src/Infrastructure/BotSharp.Core/MessageHub/Services/ObserverSubscriptionContainer.cs b/src/Infrastructure/BotSharp.Core/MessageHub/Services/ObserverSubscriptionContainer.cs new file mode 100644 index 000000000..59e33e2ee --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/MessageHub/Services/ObserverSubscriptionContainer.cs @@ -0,0 +1,64 @@ +namespace BotSharp.Core.MessageHub.Services; + +public class ObserverSubscriptionContainer : IDisposable +{ + private readonly ILogger> _logger; + + private List> _subscriptions = []; + private bool _disposed = false; + + public ObserverSubscriptionContainer( + ILogger> logger) + { + _logger = logger; + } + + public List> GetSubscriptions(IEnumerable? names = null) + { + if (!names.IsNullOrEmpty()) + { + return _subscriptions.Where(x => names.Contains(x.Observer.Name)).ToList(); + } + return _subscriptions; + } + + public void Append(List> subscriptions) + { + _subscriptions = _subscriptions.Concat(subscriptions).DistinctBy(x => x.Observer.Name).ToList(); + } + + public void Remove(IEnumerable? names = null) + { + if (!names.IsNullOrEmpty()) + { + _subscriptions = _subscriptions.Where(x => !names.Contains(x.Observer.Name)).ToList(); + return; + } + _subscriptions.Clear(); + } + + public void Clear() + { + _subscriptions.Clear(); + } + + public void Dispose() + { + if (!_disposed) + { +#if DEBUG + _logger.LogCritical($"Start disposing subscriptions..."); +#endif + // UnSubscribe all observers + foreach (var sub in _subscriptions) + { + sub.UnSubscribe(); + } + _subscriptions.Clear(); +#if DEBUG + _logger.LogCritical($"End disposing subscriptions..."); +#endif + _disposed = true; + } + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Conversation.cs b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Conversation.cs index f772acb21..81da40470 100644 --- a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Conversation.cs +++ b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Conversation.cs @@ -241,10 +241,9 @@ public void UpdateConversationBreakpoint(string conversationId, ConversationBrea if (!string.IsNullOrEmpty(convDir)) { var breakpointFile = Path.Combine(convDir, BREAKPOINT_FILE); - if (!File.Exists(breakpointFile)) { - File.Create(breakpointFile); + File.WriteAllText(breakpointFile, "[]"); } var content = File.ReadAllText(breakpointFile); @@ -285,7 +284,7 @@ public void UpdateConversationBreakpoint(string conversationId, ConversationBrea var breakpointFile = Path.Combine(convDir, BREAKPOINT_FILE); if (!File.Exists(breakpointFile)) { - File.Create(breakpointFile); + File.WriteAllText(breakpointFile, "[]"); } var content = File.ReadAllText(breakpointFile); @@ -920,7 +919,6 @@ private bool HandleTruncatedLogs(string convDir, DateTime refTime) private bool SaveTruncatedDialogs(string dialogDir, List dialogs) { if (string.IsNullOrEmpty(dialogDir) || dialogs == null) return false; - if (!File.Exists(dialogDir)) File.Create(dialogDir); var texts = ParseDialogElements(dialogs); File.WriteAllText(dialogDir, texts); @@ -930,7 +928,6 @@ private bool SaveTruncatedDialogs(string dialogDir, List dialogs) private bool SaveTruncatedStates(string stateDir, List states) { if (string.IsNullOrEmpty(stateDir) || states == null) return false; - if (!File.Exists(stateDir)) File.Create(stateDir); var stateStr = JsonSerializer.Serialize(states, _options); File.WriteAllText(stateDir, stateStr); @@ -940,7 +937,6 @@ private bool SaveTruncatedStates(string stateDir, List states) private bool SaveTruncatedLatestStates(string latestStateDir, List states) { if (string.IsNullOrEmpty(latestStateDir) || states == null) return false; - if (!File.Exists(latestStateDir)) File.Create(latestStateDir); var latestStates = BuildLatestStates(states); var stateStr = JsonSerializer.Serialize(latestStates, _options); @@ -951,7 +947,6 @@ private bool SaveTruncatedLatestStates(string latestStateDir, List breakpoints) { if (string.IsNullOrEmpty(breakpointDir) || breakpoints == null) return false; - if (!File.Exists(breakpointDir)) File.Create(breakpointDir); var breakpointStr = JsonSerializer.Serialize(breakpoints, _options); File.WriteAllText(breakpointDir, breakpointStr); diff --git a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.KnowledgeBase.cs b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.KnowledgeBase.cs index b0a950276..b7fa26ab3 100644 --- a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.KnowledgeBase.cs +++ b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.KnowledgeBase.cs @@ -15,15 +15,15 @@ public bool AddKnowledgeCollectionConfigs(List configs, } var configFile = Path.Combine(vectorDir, COLLECTION_CONFIG_FILE); - if (reset) + if (!File.Exists(configFile)) { - File.WriteAllText(configFile, JsonSerializer.Serialize(configs ?? new(), _options)); - return true; + File.WriteAllText(configFile, "[]"); } - if (!File.Exists(configFile)) + if (reset) { - File.Create(configFile); + File.WriteAllText(configFile, JsonSerializer.Serialize(configs ?? new(), _options)); + return true; } var str = File.ReadAllText(configFile); diff --git a/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs b/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs index e0ac69a28..9dd19c09b 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs @@ -1,5 +1,6 @@ using BotSharp.Abstraction.Infrastructures.Enums; using BotSharp.Abstraction.Planning; +using BotSharp.Abstraction.Routing.Models; namespace BotSharp.Core.Routing.Reasoning; @@ -33,7 +34,7 @@ await HookEmitter.Emit(_services, async hook => await hook.OnRouti if (message.FunctionName != null) { var msg = RoleDialogModel.From(message, role: AgentRole.Function); - await routing.InvokeFunction(message.FunctionName, msg, from: InvokeSource.Llm); + await routing.InvokeFunction(message.FunctionName, msg, options: new() { From = InvokeSource.Routing }); } var agentId = routing.Context.GetCurrentAgentId(); @@ -59,11 +60,12 @@ await HookEmitter.Emit(_services, async hook => await hook.OnRouti { var state = _services.GetRequiredService(); var useStreamMsg = state.GetState("use_stream_message"); - var ret = await routing.InvokeAgent( - agentId, - dialogs, - from: InvokeSource.Routing, - useStream: bool.TryParse(useStreamMsg, out var useStream) && useStream); + var options = new InvokeAgentOptions() + { + From = InvokeSource.Routing, + UseStream = bool.TryParse(useStreamMsg, out var useStream) && useStream + }; + var ret = await routing.InvokeAgent(agentId, dialogs, options); } var response = dialogs.Last(); diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs index 6d2813d9a..6acb58b2d 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs @@ -1,3 +1,4 @@ +using BotSharp.Abstraction.Routing.Models; using BotSharp.Abstraction.Templating; namespace BotSharp.Core.Routing; @@ -7,9 +8,9 @@ public partial class RoutingService public async Task InvokeAgent( string agentId, List dialogs, - string from = InvokeSource.Manual, - bool useStream = false) + InvokeAgentOptions? options = null) { + options ??= InvokeAgentOptions.Default(); var agentService = _services.GetRequiredService(); var agent = await agentService.LoadAgent(agentId); @@ -36,7 +37,7 @@ public async Task InvokeAgent( RoleDialogModel response; var message = dialogs.Last(); - if (useStream) + if (options?.UseStream == true) { response = await chatCompletion.GetChatCompletionsStreamingAsync(agent, dialogs); } @@ -59,7 +60,7 @@ public async Task InvokeAgent( message.CurrentAgentId = agent.Id; message.IsStreaming = response.IsStreaming; - await InvokeFunction(message, dialogs, from: from, useStream: useStream); + await InvokeFunction(message, dialogs, options); } else { @@ -83,8 +84,7 @@ public async Task InvokeAgent( private async Task InvokeFunction( RoleDialogModel message, List dialogs, - string from, - bool useStream) + InvokeAgentOptions? options = null) { // execute function // Save states @@ -93,7 +93,8 @@ private async Task InvokeFunction( var routing = _services.GetRequiredService(); // Call functions - await routing.InvokeFunction(message.FunctionName, message, from: from); + var funcOptions = options != null ? new InvokeFunctionOptions() { From = options.From } : null; + await routing.InvokeFunction(message.FunctionName, message, options: funcOptions); // Pass execution result to LLM to get response if (!message.StopCompletion) @@ -120,7 +121,7 @@ private async Task InvokeFunction( // Send to Next LLM var curAgentId = routing.Context.GetCurrentAgentId(); - await InvokeAgent(curAgentId, dialogs, from, useStream); + await InvokeAgent(curAgentId, dialogs, options); } } else diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeFunction.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeFunction.cs index 20c691f8d..dea0947d7 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeFunction.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeFunction.cs @@ -1,12 +1,14 @@ -using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Routing.Models; +using BotSharp.Core.MessageHub; using BotSharp.Core.Routing.Executor; namespace BotSharp.Core.Routing; public partial class RoutingService { - public async Task InvokeFunction(string name, RoleDialogModel message, string from = InvokeSource.Manual) + public async Task InvokeFunction(string name, RoleDialogModel message, InvokeFunctionOptions? options = null) { + options ??= InvokeFunctionOptions.Default(); var currentAgentId = message.CurrentAgentId; var agentService = _services.GetRequiredService(); var agent = await agentService.GetAgent(currentAgentId); @@ -23,20 +25,22 @@ public async Task InvokeFunction(string name, RoleDialogModel message, str // Clone message var clonedMessage = RoleDialogModel.From(message); clonedMessage.FunctionName = name; - - var progressService = _services.GetService(); clonedMessage.Indication = await funcExecutor.GetIndicatorAsync(message); - if (progressService?.OnFunctionExecuting != null) + var conv = _services.GetRequiredService(); + var messageHub = _services.GetRequiredService>>(); + messageHub.Push(new() { - await progressService.OnFunctionExecuting(clonedMessage); - } - + EventName = ChatEvent.OnIndicationReceived, + Data = clonedMessage, + RefId = conv.ConversationId + }); + var hooks = _services.GetHooksOrderByPriority(clonedMessage.CurrentAgentId); foreach (var hook in hooks) { hook.SetAgent(agent); - await hook.OnFunctionExecuting(clonedMessage, from: from); + await hook.OnFunctionExecuting(clonedMessage, options); } bool result = false; @@ -48,7 +52,7 @@ public async Task InvokeFunction(string name, RoleDialogModel message, str // After functions have been executed foreach (var hook in hooks) { - await hook.OnFunctionExecuted(clonedMessage, from: from); + await hook.OnFunctionExecuted(clonedMessage, options); } // Set result to original message @@ -64,7 +68,7 @@ public async Task InvokeFunction(string name, RoleDialogModel message, str } catch (JsonException ex) { - _logger.LogError($"The input does not contain any JSON tokens:\r\n{message.Content}\r\n{ex.Message}"); + _logger.LogError(ex, $"The input does not contain any JSON tokens:\r\n{message.Content}\r\n{ex.Message}"); message.StopCompletion = true; message.Content = ex.Message; } @@ -72,7 +76,7 @@ public async Task InvokeFunction(string name, RoleDialogModel message, str { message.StopCompletion = true; message.Content = ex.Message; - _logger.LogError(ex.ToString()); + _logger.LogError(ex, ex.ToString()); } // Make sure content has been populated diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs index c66ef4b21..4e43cbd52 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs @@ -53,11 +53,12 @@ public async Task InstructDirect(Agent agent, RoleDialogModel m { var state = _services.GetRequiredService(); var useStreamMsg = state.GetState("use_stream_message"); - var ret = await routing.InvokeAgent( - agentId, - dialogs, - from: InvokeSource.Routing, - useStream: bool.TryParse(useStreamMsg, out var useStream) && useStream); + var options = new InvokeAgentOptions() + { + From = InvokeSource.Routing, + UseStream = bool.TryParse(useStreamMsg, out var useStream) && useStream + }; + var ret = await routing.InvokeAgent(agentId, dialogs, options); } var response = dialogs.Last(); diff --git a/src/Infrastructure/BotSharp.Core/Using.cs b/src/Infrastructure/BotSharp.Core/Using.cs index 0e8e27622..f67baef04 100644 --- a/src/Infrastructure/BotSharp.Core/Using.cs +++ b/src/Infrastructure/BotSharp.Core/Using.cs @@ -36,6 +36,9 @@ global using BotSharp.Abstraction.Infrastructures.Events; global using BotSharp.Abstraction.Templating.Constants; global using BotSharp.Abstraction.Realtime.Models.Session; +global using BotSharp.Abstraction.Conversations.Enums; +global using BotSharp.Abstraction.Hooks; +global using BotSharp.Abstraction.MessageHub.Models; global using BotSharp.Core.Agents.Services; global using BotSharp.Core.Conversations.Services; global using BotSharp.Core.Infrastructures; diff --git a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_fun_events.json b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_fun_events.json new file mode 100644 index 000000000..94b6abbf1 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_fun_events.json @@ -0,0 +1,15 @@ +{ + "name": "get_fun_events", + "description": "Get fun events information for user.", + "visibility_expression": "{% if states.channel == 'email' %}visible{% endif %}", + "parameters": { + "type": "object", + "properties": { + "city": { + "type": "string", + "description": "The city where user wants to find fun events." + } + }, + "required": [ "city" ] + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_weather.json b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_weather.json index 0fd0a459b..ab73340f3 100644 --- a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_weather.json +++ b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/functions/get_weather.json @@ -1,6 +1,7 @@ { "name": "get_weather", "description": "Get weather information for user.", + "visibility_expression": "{% if states.channel != 'email' %}visible{% endif %}", "parameters": { "type": "object", "properties": { diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs index bd131ca1e..ec7dead59 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs @@ -1,6 +1,8 @@ using BotSharp.Abstraction.Files.Constants; using BotSharp.Abstraction.Files.Enums; using BotSharp.Abstraction.Files.Utilities; +using BotSharp.Abstraction.MessageHub.Models; +using BotSharp.Abstraction.MessageHub.Services; using BotSharp.Abstraction.Options; using BotSharp.Abstraction.Routing; using BotSharp.Abstraction.Users.Dtos; @@ -343,6 +345,9 @@ public async Task SendMessage( [FromRoute] string conversationId, [FromBody] NewMessageModel input) { + var observer = _services.GetRequiredService(); + using var container = observer.SubscribeObservers>(conversationId); + var conv = _services.GetRequiredService(); var inputMsg = new RoleDialogModel(AgentRole.User, input.Text) { @@ -357,7 +362,6 @@ public async Task SendMessage( SetStates(conv, input); var response = new ChatResponseModel(); - await conv.SendMessage(agentId, inputMsg, replyMessage: input.Postback, async msg => @@ -381,6 +385,12 @@ await conv.SendMessage(agentId, inputMsg, [HttpPost("/conversation/{agentId}/{conversationId}/sse")] public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string conversationId, [FromBody] NewMessageModel input) { + var observer = _services.GetRequiredService(); + using var container = observer.SubscribeObservers>(conversationId, listeners: new() + { + { ChatEvent.OnIndicationReceived, async data => await OnReceiveToolCallIndication(conversationId, data.Data) } + }); + var conv = _services.GetRequiredService(); var inputMsg = new RoleDialogModel(AgentRole.User, input.Text) { @@ -406,7 +416,6 @@ public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.ContentType, "text/event-stream"); Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.CacheControl, "no-cache"); Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.Connection, "keep-alive"); - InitProgressService(conversationId); await conv.SendMessage(agentId, inputMsg, replyMessage: input.Postback, @@ -430,23 +439,18 @@ await conv.SendMessage(agentId, inputMsg, // await OnEventCompleted(Response); } - private void InitProgressService(string conversationId) + private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg) { - var progressService = _services.GetService(); - progressService.OnFunctionExecuting = async msg => + var indicator = new ChatResponseModel { - var indicator = new ChatResponseModel - { - ConversationId = conversationId, - MessageId = msg.MessageId, - Text = msg.Indication, - Function = "indicating", - Instruction = msg.Instruction, - States = new Dictionary() - }; - await OnChunkReceived(Response, indicator); + ConversationId = conversationId, + MessageId = msg.MessageId, + Text = msg.Indication, + Function = "indicating", + Instruction = msg.Instruction, + States = new Dictionary() }; - progressService.OnFunctionExecuted = async msg => { }; + await OnChunkReceived(Response, indicator); } #endregion diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/RealtimeController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/RealtimeController.cs index 61ae77a7f..ff3aca461 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/RealtimeController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/RealtimeController.cs @@ -26,7 +26,7 @@ public async Task ExecuteFunction(string agentId, string functionName, [ FunctionName = functionName, FunctionArgs = JsonSerializer.Serialize(args) }; - await routing.InvokeFunction(functionName, message, from: InvokeSource.Llm); + await routing.InvokeFunction(functionName, message, options: new() { From = InvokeSource.Llm }); return message.Content; } } diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs index 605bceb85..ca2a283e4 100644 --- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,9 +1,9 @@ -using Azure; +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Files.Utilities; using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.MessageHub.Models; using BotSharp.Core.Infrastructures.Streams; -using BotSharp.Core.Observables.Queues; +using BotSharp.Core.MessageHub; using OpenAI.Chat; using System.ClientModel; @@ -212,7 +212,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, conversations); - var hub = _services.GetRequiredService>(); + var hub = _services.GetRequiredService>>(); + var conv = _services.GetRequiredService(); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; var contentHooks = _services.GetHooks(agent.Id); @@ -224,8 +225,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "BeforeReceiveLlmStreamMessage", + EventName = ChatEvent.BeforeReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) { CurrentAgentId = agent.Id, @@ -268,8 +269,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, }; hub.Push(new() { - ServiceProvider = _services, - EventName = "OnReceiveLlmStreamMessage", + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = content }); } @@ -311,8 +312,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "AfterReceiveLlmStreamMessage", + EventName = ChatEvent.AfterReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = responseMessage }); diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs b/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs index d3c3f2acb..514666926 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs @@ -1,6 +1,8 @@ using BotSharp.Abstraction.Crontab; -using BotSharp.Abstraction.Observables.Models; -using BotSharp.Core.Observables.Queues; +using BotSharp.Abstraction.MessageHub.Models; +using BotSharp.Abstraction.MessageHub.Observers; +using BotSharp.Core.MessageHub; +using BotSharp.Core.MessageHub.Observers; using BotSharp.Plugin.ChatHub.Hooks; using BotSharp.Plugin.ChatHub.Observers; using Microsoft.AspNetCore.Builder; @@ -11,7 +13,7 @@ namespace BotSharp.Plugin.ChatHub; /// /// The dialogue channel connects users, AI assistants and customer service representatives. /// -public class ChatHubPlugin : IBotSharpPlugin, IBotSharpAppPlugin +public class ChatHubPlugin : IBotSharpPlugin { public string Id => "6e52d42d-1e23-406b-8599-36af36c83209"; public string Name => "Chat Hub"; @@ -24,6 +26,8 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) config.Bind("ChatHub", settings); services.AddSingleton(x => settings); + services.AddScoped>, ChatHubObserver>(); + // Register hooks services.AddScoped(); services.AddScoped(); @@ -32,12 +36,4 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) services.AddScoped(); services.AddScoped(); } - - public void Configure(IApplicationBuilder app) - { - var services = app.ApplicationServices; - var queue = services.GetRequiredService>(); - var logger = services.GetRequiredService>>(); - queue.Events.Subscribe(new ChatHubObserver(logger)); - } } diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Helpers/EventEmitter.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Helpers/EventEmitter.cs new file mode 100644 index 000000000..821b173e3 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Helpers/EventEmitter.cs @@ -0,0 +1,39 @@ +using Microsoft.AspNetCore.SignalR; +using System.Runtime.CompilerServices; + +namespace BotSharp.Plugin.ChatHub.Helpers; + +internal class EventEmitter +{ + internal static async Task SendChatEvent( + IServiceProvider services, + ILogger logger, + string @event, + string conversationId, + string userId, + T data, + string callerClass = "", + [CallerMemberName] string callerMethod = "", + LogLevel logLevel = LogLevel.Warning) + { + try + { + var settings = services.GetRequiredService(); + var chatHub = services.GetRequiredService>(); + + switch (settings.EventDispatchBy) + { + case EventDispatchType.Group when !string.IsNullOrEmpty(conversationId): + await chatHub.Clients.Group(conversationId).SendAsync(@event, data); + break; + case EventDispatchType.User when !string.IsNullOrEmpty(userId): + await chatHub.Clients.User(userId).SendAsync(@event, data); + break; + } + } + catch (Exception ex) + { + logger.Log(logLevel, ex, $"Failed to send event '{@event}' in ({callerClass}-{callerMethod}) (conversation id: {conversationId})"); + } + } +} diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs index 1d641554d..e07f94d47 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs @@ -1,8 +1,11 @@ using BotSharp.Abstraction.Conversations.Dtos; +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Routing.Enums; +using BotSharp.Abstraction.Routing.Models; using BotSharp.Abstraction.SideCar; using BotSharp.Abstraction.Users.Dtos; using Microsoft.AspNetCore.SignalR; +using System.Runtime.CompilerServices; namespace BotSharp.Plugin.ChatHub.Hooks; @@ -15,15 +18,6 @@ public class ChatHubConversationHook : ConversationHookBase private readonly BotSharpOptions _options; private readonly ChatHubSettings _settings; - #region Events - private const string INIT_CLIENT_CONVERSATION = "OnConversationInitFromClient"; - private const string RECEIVE_CLIENT_MESSAGE = "OnMessageReceivedFromClient"; - private const string RECEIVE_ASSISTANT_MESSAGE = "OnMessageReceivedFromAssistant"; - private const string GENERATE_SENDER_ACTION = "OnSenderActionGenerated"; - private const string DELETE_MESSAGE = "OnMessageDeleted"; - private const string GENERATE_NOTIFICATION = "OnNotificationGenerated"; - #endregion - public ChatHubConversationHook( IServiceProvider services, IHubContext chatHub, @@ -51,7 +45,8 @@ public override async Task OnConversationInitialized(Conversation conversation) var user = await userService.GetUser(conv.User.Id); conv.User = UserDto.FromUser(user); - await InitClientConversation(conv.Id, conv); + //await InitClientConversation(conv.Id, conv); + await SendEvent(ChatEvent.OnConversationInitFromClient, conv.Id, conv); await base.OnConversationInitialized(conversation); } @@ -72,7 +67,7 @@ public override async Task OnMessageReceived(RoleDialogModel message) Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, Sender = UserDto.FromUser(sender) }; - await ReceiveClientMessage(conv.ConversationId, model); + await SendEvent(ChatEvent.OnMessageReceivedFromClient, conv.ConversationId, model); // Send typing-on to client var action = new ConversationSenderActionModel @@ -80,23 +75,13 @@ public override async Task OnMessageReceived(RoleDialogModel message) ConversationId = conv.ConversationId, SenderAction = SenderActionEnum.TypingOn }; - - await GenerateSenderAction(conv.ConversationId, action); + await SendEvent(ChatEvent.OnSenderActionGenerated, conv.ConversationId, action); await base.OnMessageReceived(message); } - public override async Task OnFunctionExecuting(RoleDialogModel message, string from = InvokeSource.Manual) + public override async Task OnFunctionExecuting(RoleDialogModel message, InvokeFunctionOptions? options = null) { - var conv = _services.GetRequiredService(); - var action = new ConversationSenderActionModel - { - ConversationId = conv.ConversationId, - SenderAction = SenderActionEnum.TypingOn, - Indication = message.Indication - }; - - await GenerateSenderAction(conv.ConversationId, action); - await base.OnFunctionExecuting(message, from: from); + await base.OnFunctionExecuting(message, options); } public override async Task OnPostbackMessageReceived(RoleDialogModel message, PostbackMessageModel replyMsg) @@ -110,7 +95,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) var conv = _services.GetRequiredService(); var state = _services.GetRequiredService(); - var json = JsonSerializer.Serialize(new ChatResponseDto() + var data = new ChatResponseDto() { ConversationId = conv.ConversationId, MessageId = message.MessageId, @@ -126,7 +111,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) LastName = "Assistant", Role = AgentRole.Assistant } - }, _options.JsonSerializerOptions); + }; // Send typing-off to client var action = new ConversationSenderActionModel @@ -134,13 +119,9 @@ public override async Task OnResponseGenerated(RoleDialogModel message) ConversationId = conv.ConversationId, SenderAction = SenderActionEnum.TypingOff }; + await SendEvent(ChatEvent.OnSenderActionGenerated, conv.ConversationId, action); - if (!message.IsStreaming) - { - await GenerateSenderAction(conv.ConversationId, action); - } - - await ReceiveAssistantMessage(conv.ConversationId, json); + await SendEvent(ChatEvent.OnMessageReceivedFromAssistant, conv.ConversationId, data); await base.OnResponseGenerated(message); } @@ -148,7 +129,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) public override async Task OnNotificationGenerated(RoleDialogModel message) { var conv = _services.GetRequiredService(); - var json = JsonSerializer.Serialize(new ChatResponseDto() + var data = new ChatResponseDto() { ConversationId = conv.ConversationId, MessageId = message.MessageId, @@ -162,9 +143,9 @@ public override async Task OnNotificationGenerated(RoleDialogModel message) LastName = "Assistant", Role = AgentRole.Assistant } - }, _options.JsonSerializerOptions); + }; - await GenerateNotification(conv.ConversationId, json); + await SendEvent(ChatEvent.OnNotificationGenerated, conv.ConversationId, data); await base.OnNotificationGenerated(message); } @@ -177,7 +158,7 @@ public override async Task OnMessageDeleted(string conversationId, string messag MessageId = messageId }; - await DeleteMessage(conversationId, model); + await SendEvent(ChatEvent.OnMessageDeleted, conversationId, model); await base.OnMessageDeleted(conversationId, messageId); } @@ -188,119 +169,10 @@ private bool AllowSendingMessage() return sidecar == null || !sidecar.IsEnabled; } - private async Task InitClientConversation(string conversationId, ConversationDto conversation) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(INIT_CLIENT_CONVERSATION, conversation); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(INIT_CLIENT_CONVERSATION, conversation); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to init client conversation in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } - } - - private async Task ReceiveClientMessage(string conversationId, ChatResponseDto model) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(RECEIVE_CLIENT_MESSAGE, model); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(RECEIVE_CLIENT_MESSAGE, model); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to receive assistant message in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } - } - - private async Task ReceiveAssistantMessage(string conversationId, string? json) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(RECEIVE_ASSISTANT_MESSAGE, json); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(RECEIVE_ASSISTANT_MESSAGE, json); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to receive assistant message in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } - - } - - private async Task GenerateSenderAction(string conversationId, ConversationSenderActionModel action) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(GENERATE_SENDER_ACTION, action); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(GENERATE_SENDER_ACTION, action); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to generate sender action in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } - } - - private async Task DeleteMessage(string conversationId, ChatResponseDto model) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(DELETE_MESSAGE, model); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(DELETE_MESSAGE, model); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to delete message in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } - } - - private async Task GenerateNotification(string conversationId, string? json) + private async Task SendEvent(string @event, string conversationId, T data, [CallerMemberName] string callerName = "") { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(GENERATE_NOTIFICATION, json); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(GENERATE_NOTIFICATION, json); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to generate notification in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } + var user = _services.GetRequiredService(); + await EventEmitter.SendChatEvent(_services, _logger, @event, conversationId, user?.Id, data, nameof(ChatHubConversationHook), callerName); } #endregion } diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubCrontabHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubCrontabHook.cs index ef9566703..f965d28ef 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubCrontabHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubCrontabHook.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Conversations.Dtos; +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Crontab; using BotSharp.Abstraction.Crontab.Models; using Microsoft.AspNetCore.SignalR; @@ -14,11 +15,8 @@ public class ChatHubCrontabHook : ICrontabHook private readonly BotSharpOptions _options; private readonly ChatHubSettings _settings; - #region Events - private const string GENERATE_NOTIFICATION = "OnNotificationGenerated"; - #endregion - - public ChatHubCrontabHook(IServiceProvider services, + public ChatHubCrontabHook( + IServiceProvider services, IHubContext chatHub, ILogger logger, IUserIdentity user, @@ -35,7 +33,7 @@ public ChatHubCrontabHook(IServiceProvider services, public async Task OnCronTriggered(CrontabItem item) { - var json = JsonSerializer.Serialize(new ChatResponseDto() + var data = new ChatResponseDto() { ConversationId = item.ConversationId, MessageId = Guid.NewGuid().ToString(), @@ -47,16 +45,16 @@ public async Task OnCronTriggered(CrontabItem item) LastName = "AI", Role = AgentRole.Assistant } - }, _options.JsonSerializerOptions); + }; - await SendEvent(item, json); + await SendEvent(item, data); } - private async Task SendEvent(CrontabItem item, string json) + private async Task SendEvent(CrontabItem item, ChatResponseDto data) { try { - await _chatHub.Clients.User(item.UserId).SendAsync(GENERATE_NOTIFICATION, json); + await _chatHub.Clients.User(item.UserId).SendAsync(ChatEvent.OnNotificationGenerated, data); } catch { } } diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs index 5dd89a2b4..86adaf2c5 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs @@ -1,5 +1,8 @@ +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Routing.Enums; +using BotSharp.Abstraction.Routing.Models; using Microsoft.AspNetCore.SignalR; +using System.Runtime.CompilerServices; using System.Text.Encodings.Web; using System.Text.Unicode; @@ -19,13 +22,6 @@ public class StreamingLogHook : ConversationHookBase, IContentGeneratingHook, IR private readonly IAgentService _agentService; private readonly IRoutingContext _routingCtx; - #region Events - private const string CONTENT_LOG_GENERATED = "OnConversationContentLogGenerated"; - private const string STATE_LOG_GENERATED = "OnConversateStateLogGenerated"; - private const string AGENT_QUEUE_CHANGED = "OnAgentQueueChanged"; - private const string STATE_CHANGED = "OnStateChangeGenerated"; - #endregion - public StreamingLogHook( ConversationSetting convSettings, BotSharpOptions options, @@ -65,7 +61,8 @@ public override async Task OnMessageReceived(RoleDialogModel message) Source = ContentLogSource.UserInput, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public override async Task OnPostbackMessageReceived(RoleDialogModel message, PostbackMessageModel replyMsg) @@ -83,7 +80,8 @@ public override async Task OnPostbackMessageReceived(RoleDialogModel message, Po Source = ContentLogSource.UserInput, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnSessionUpdated(Agent agent, string instruction, FunctionDef[] functions, bool isInit = false) @@ -112,7 +110,8 @@ public async Task OnSessionUpdated(Agent agent, string instruction, FunctionDef[ Source = ContentLogSource.Prompt, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnRenderingTemplate(Agent agent, string name, string content) @@ -134,7 +133,8 @@ public async Task OnRenderingTemplate(Agent agent, string name, string content) Source = ContentLogSource.HardRule, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task BeforeGenerating(Agent agent, List conversations) @@ -142,7 +142,7 @@ public async Task BeforeGenerating(Agent agent, List conversati if (!_convSettings.ShowVerboseLog) return; } - public override async Task OnFunctionExecuting(RoleDialogModel message, string from = InvokeSource.Manual) + public override async Task OnFunctionExecuting(RoleDialogModel message, InvokeFunctionOptions? options = null) { var conversationId = _state.GetConversationId(); if (string.IsNullOrEmpty(conversationId)) return; @@ -162,10 +162,11 @@ public override async Task OnFunctionExecuting(RoleDialogModel message, string f Source = ContentLogSource.FunctionCall, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } - public override async Task OnFunctionExecuted(RoleDialogModel message, string from = InvokeSource.Manual) + public override async Task OnFunctionExecuted(RoleDialogModel message, InvokeFunctionOptions? options = null) { var conversationId = _state.GetConversationId(); if (string.IsNullOrEmpty(conversationId)) return; @@ -183,7 +184,8 @@ public override async Task OnFunctionExecuted(RoleDialogModel message, string fr Source = ContentLogSource.FunctionCall, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } /// @@ -210,7 +212,8 @@ public async Task AfterGenerated(RoleDialogModel message, TokenStatsModel tokenS Source = ContentLogSource.Prompt, Log = log }; - await SendContentLog(conversationId, input); + //await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } /// @@ -225,7 +228,8 @@ public override async Task OnResponseGenerated(RoleDialogModel message) var conv = _services.GetRequiredService(); var routingCtx = _services.GetRequiredService(); - await SendStateLog(conv.ConversationId, routingCtx.EntryAgentId, _state.GetStates(), message); + var stateLog = BuildStateLog(conv.ConversationId, routingCtx.EntryAgentId, _state.GetStates(), message); + await SendEvent(ChatEvent.OnConversateStateLogGenerated, conv.ConversationId, stateLog); if (message.Role == AgentRole.Assistant) { @@ -244,7 +248,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) Source = ContentLogSource.AgentResponse, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } } @@ -262,7 +266,7 @@ public override async Task OnTaskCompleted(RoleDialogModel message) Source = ContentLogSource.FunctionCall, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public override async Task OnConversationEnding(RoleDialogModel message) @@ -279,7 +283,7 @@ public override async Task OnConversationEnding(RoleDialogModel message) Source = ContentLogSource.FunctionCall, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public override async Task OnBreakpointUpdated(string conversationId, bool resetStates) @@ -307,7 +311,7 @@ public override async Task OnBreakpointUpdated(string conversationId, bool reset }, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public override async Task OnStateChanged(StateChangeModel stateChange) @@ -317,7 +321,7 @@ public override async Task OnStateChanged(StateChangeModel stateChange) if (stateChange == null) return; - await SendStateChange(conversationId, stateChange); + await SendEvent(ChatEvent.OnStateChangeGenerated, conversationId, BuildStateChangeLog(stateChange)); } #endregion @@ -331,7 +335,7 @@ public async Task OnAgentEnqueued(string agentId, string preAgentId, string? rea // Agent queue log var log = $"{agent.Name} is enqueued"; - await SendAgentQueueLog(conversationId, log); + await SendEvent(ChatEvent.OnAgentQueueChanged, conversationId, BuildAgentQueueChangedLog(conversationId, log)); // Content log log = $"{agent.Name} is enqueued{(reason != null ? $" ({reason})" : "")}"; @@ -346,7 +350,7 @@ public async Task OnAgentEnqueued(string agentId, string preAgentId, string? rea Source = ContentLogSource.HardRule, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnAgentDequeued(string agentId, string currentAgentId, string? reason = null) @@ -359,7 +363,7 @@ public async Task OnAgentDequeued(string agentId, string currentAgentId, string? // Agent queue log var log = $"{agent.Name} is dequeued"; - await SendAgentQueueLog(conversationId, log); + await SendEvent(ChatEvent.OnAgentQueueChanged, conversationId, BuildAgentQueueChangedLog(conversationId, log)); // Content log log = $"{agent.Name} is dequeued{(reason != null ? $" ({reason})" : "")}, current agent is {currentAgent?.Name}"; @@ -374,7 +378,7 @@ public async Task OnAgentDequeued(string agentId, string currentAgentId, string? Source = ContentLogSource.HardRule, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnAgentReplaced(string fromAgentId, string toAgentId, string? reason = null) @@ -387,7 +391,7 @@ public async Task OnAgentReplaced(string fromAgentId, string toAgentId, string? // Agent queue log var log = $"Agent queue is replaced from {fromAgent.Name} to {toAgent.Name}"; - await SendAgentQueueLog(conversationId, log); + await SendEvent(ChatEvent.OnAgentQueueChanged, conversationId, BuildAgentQueueChangedLog(conversationId, log)); // Content log log = $"{fromAgent.Name} is replaced to {toAgent.Name}{(reason != null ? $" ({reason})" : "")}"; @@ -402,7 +406,7 @@ public async Task OnAgentReplaced(string fromAgentId, string toAgentId, string? Source = ContentLogSource.HardRule, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnAgentQueueEmptied(string agentId, string? reason = null) @@ -412,7 +416,7 @@ public async Task OnAgentQueueEmptied(string agentId, string? reason = null) // Agent queue log var log = $"Agent queue is empty"; - await SendAgentQueueLog(conversationId, log); + await SendEvent(ChatEvent.OnAgentQueueChanged, conversationId, BuildAgentQueueChangedLog(conversationId, log)); // Content log log = reason ?? "Agent queue is cleared"; @@ -427,7 +431,7 @@ public async Task OnAgentQueueEmptied(string agentId, string? reason = null) Source = ContentLogSource.HardRule, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnRoutingInstructionReceived(FunctionCallFromLlm instruct, RoleDialogModel message) @@ -446,7 +450,7 @@ public async Task OnRoutingInstructionReceived(FunctionCallFromLlm instruct, Rol Source = ContentLogSource.AgentResponse, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } public async Task OnRoutingInstructionRevised(FunctionCallFromLlm instruct, RoleDialogModel message) @@ -464,90 +468,19 @@ public async Task OnRoutingInstructionRevised(FunctionCallFromLlm instruct, Role Source = ContentLogSource.HardRule, Log = log }; - await SendContentLog(conversationId, input); + await SendEvent(ChatEvent.OnConversationContentLogGenerated, conversationId, BuildContentLog(input)); } #endregion #region Private methods - private async Task SendContentLog(string conversationId, ContentLogInputModel input) + private async Task SendEvent(string @event, string conversationId, T data, [CallerMemberName] string callerName = "") { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(CONTENT_LOG_GENERATED, BuildContentLog(input)); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(CONTENT_LOG_GENERATED, BuildContentLog(input)); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to send content log in {nameof(StreamingLogHook)} (conversation id: {conversationId})."); - } + var user = _services.GetRequiredService(); + await EventEmitter.SendChatEvent(_services, _logger, @event, conversationId, user?.Id, data, nameof(StreamingLogHook), callerName); } - private async Task SendStateLog(string conversationId, string agentId, Dictionary states, RoleDialogModel message) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(STATE_LOG_GENERATED, BuildStateLog(conversationId, agentId, states, message)); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(STATE_LOG_GENERATED, BuildStateLog(conversationId, agentId, states, message)); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to send state log in {nameof(StreamingLogHook)} (conversation id: {conversationId})."); - } - } - - private async Task SendAgentQueueLog(string conversationId, string log) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(AGENT_QUEUE_CHANGED, BuildAgentQueueChangedLog(conversationId, log)); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(AGENT_QUEUE_CHANGED, BuildAgentQueueChangedLog(conversationId, log)); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to send agent queue log in {nameof(StreamingLogHook)} (conversation id: {conversationId})."); - } - } - - private async Task SendStateChange(string conversationId, StateChangeModel stateChange) - { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(STATE_CHANGED, BuildStateChangeLog(stateChange)); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(STATE_CHANGED, BuildStateChangeLog(stateChange)); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to send state change in {nameof(StreamingLogHook)} (conversation id: {conversationId})."); - } - } - - - private string BuildContentLog(ContentLogInputModel input) + private ContentLogOutputModel BuildContentLog(ContentLogInputModel input) { var output = new ContentLogOutputModel { @@ -561,8 +494,6 @@ private string BuildContentLog(ContentLogInputModel input) CreatedTime = DateTime.UtcNow }; - var json = JsonSerializer.Serialize(output, _options.JsonSerializerOptions); - var convSettings = _services.GetRequiredService(); if (convSettings.EnableContentLog) { @@ -570,10 +501,10 @@ private string BuildContentLog(ContentLogInputModel input) db.SaveConversationContentLog(output); } - return json; + return output; } - private string BuildStateLog(string conversationId, string agentId, Dictionary states, RoleDialogModel message) + private ConversationStateLogModel BuildStateLog(string conversationId, string agentId, Dictionary states, RoleDialogModel message) { var log = new ConversationStateLogModel { @@ -591,10 +522,10 @@ private string BuildStateLog(string conversationId, string agentId, Dictionary chatHub, ILogger logger, IUserIdentity user, @@ -64,7 +63,7 @@ public override async Task OnUserAgentConnectedInitially(Conversation conversati RichContent = richContent }; - var json = JsonSerializer.Serialize(new ChatResponseDto() + var data = new ChatResponseDto() { ConversationId = conversation.Id, MessageId = dialog.MessageId, @@ -76,35 +75,20 @@ public override async Task OnUserAgentConnectedInitially(Conversation conversati LastName = "", Role = AgentRole.Assistant } - }, _options.JsonSerializerOptions); + }; await Task.Delay(300); - _storage.Append(conversation.Id, dialog); - - await SendEvent(conversation.Id, json); + await SendEvent(ChatEvent.OnMessageReceivedFromAssistant, conversation.Id, data); } } await base.OnUserAgentConnectedInitially(conversation); } - private async Task SendEvent(string conversationId, string json) + private async Task SendEvent(string @event, string conversationId, T data, [CallerMemberName] string callerName = "") { - try - { - if (_settings.EventDispatchBy == EventDispatchType.Group) - { - await _chatHub.Clients.Group(conversationId).SendAsync(RECEIVE_ASSISTANT_MESSAGE, json); - } - else - { - await _chatHub.Clients.User(_user.Id).SendAsync(RECEIVE_ASSISTANT_MESSAGE, json); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to send event in {nameof(WelcomeHook)} (conversation id: {conversationId})."); - } + var user = _services.GetRequiredService(); + await EventEmitter.SendChatEvent(_services, _logger, @event, conversationId, user?.Id, data, nameof(WelcomeHook), callerName); } } diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs index c15419ed1..9b5a97e54 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -1,113 +1,126 @@ using BotSharp.Abstraction.Conversations.Dtos; -using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.Conversations.Enums; +using BotSharp.Abstraction.MessageHub.Models; +using BotSharp.Abstraction.MessageHub.Observers; using BotSharp.Abstraction.SideCar; -using BotSharp.Plugin.ChatHub.Hooks; -using Microsoft.AspNetCore.SignalR; +using System.Runtime.CompilerServices; namespace BotSharp.Plugin.ChatHub.Observers; -public class ChatHubObserver : IObserver +public class ChatHubObserver : BotSharpObserverBase> { private readonly ILogger _logger; - private IServiceProvider _services; + private readonly IServiceProvider _services; - private const string BEFORE_RECEIVE_LLM_STREAM_MESSAGE = "BeforeReceiveLlmStreamMessage"; - private const string ON_RECEIVE_LLM_STREAM_MESSAGE = "OnReceiveLlmStreamMessage"; - private const string AFTER_RECEIVE_LLM_STREAM_MESSAGE = "AfterReceiveLlmStreamMessage"; - private const string GENERATE_SENDER_ACTION = "OnSenderActionGenerated"; - - public ChatHubObserver(ILogger logger) + public ChatHubObserver( + IServiceProvider services, + ILogger logger) : base() { + _services = services; _logger = logger; } - public void OnCompleted() + public override string Name => nameof(ChatHubObserver); + + public override void OnCompleted() { _logger.LogWarning($"{nameof(ChatHubObserver)} receives complete notification."); } - public void OnError(Exception error) + public override void OnError(Exception error) { _logger.LogError(error, $"{nameof(ChatHubObserver)} receives error notification: {error.Message}"); } - public void OnNext(HubObserveData value) + public override void OnNext(HubObserveData value) { - _services = value.ServiceProvider; - - if (!AllowSendingMessage()) return; - var message = value.Data; var model = new ChatResponseDto(); - if (value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE) + var action = new ConversationSenderActionModel(); + var conv = _services.GetRequiredService(); + + switch (value.EventName) { - var conv = _services.GetRequiredService(); - model = new ChatResponseDto() - { - ConversationId = conv.ConversationId, - MessageId = message.MessageId, - Text = string.Empty, - Sender = new() + case ChatEvent.BeforeReceiveLlmStreamMessage: + if (!AllowSendingMessage()) return; + + model = new ChatResponseDto() { - FirstName = "AI", - LastName = "Assistant", - Role = AgentRole.Assistant - } - }; - - var action = new ConversationSenderActionModel - { - ConversationId = conv.ConversationId, - SenderAction = SenderActionEnum.TypingOn - }; - - GenerateSenderAction(conv.ConversationId, action); - } - else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE && message.IsStreaming) - { - var conv = _services.GetRequiredService(); - model = new ChatResponseDto() - { - ConversationId = conv.ConversationId, - MessageId = message.MessageId, - Text = message.Content, - Sender = new() + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = string.Empty, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + + action = new ConversationSenderActionModel { - FirstName = "AI", - LastName = "Assistant", - Role = AgentRole.Assistant - } - }; - - var action = new ConversationSenderActionModel - { - ConversationId = conv.ConversationId, - SenderAction = SenderActionEnum.TypingOff - }; - - GenerateSenderAction(conv.ConversationId, action); - } - else if (value.EventName == ON_RECEIVE_LLM_STREAM_MESSAGE) - { - var conv = _services.GetRequiredService(); - model = new ChatResponseDto() - { - ConversationId = conv.ConversationId, - MessageId = message.MessageId, - Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, - Function = message.FunctionName, - RichContent = message.SecondaryRichContent ?? message.RichContent, - Data = message.Data, - Sender = new() + ConversationId = conv.ConversationId, + SenderAction = SenderActionEnum.TypingOn + }; + + SendEvent(ChatEvent.OnSenderActionGenerated, conv.ConversationId, action); + break; + case ChatEvent.OnReceiveLlmStreamMessage: + if (!AllowSendingMessage()) return; + + model = new ChatResponseDto() + { + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, + Function = message.FunctionName, + RichContent = message.SecondaryRichContent ?? message.RichContent, + Data = message.Data, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + break; + case ChatEvent.AfterReceiveLlmStreamMessage: + if (!AllowSendingMessage()) return; + + model = new ChatResponseDto() { - FirstName = "AI", - LastName = "Assistant", - Role = AgentRole.Assistant - } - }; + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = message.Content, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + break; + case ChatEvent.OnIndicationReceived: + model = new ChatResponseDto + { + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Indication = message.Indication, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + +#if DEBUG + _logger.LogCritical($"Receiving {value.EventName} ({value.Data.Indication}) in {nameof(ChatHubObserver)} - {conv.ConversationId}"); +#endif + break; } - OnReceiveAssistantMessage(value.EventName, model.ConversationId, model); + SendEvent(value.EventName, model.ConversationId, model); } private bool AllowSendingMessage() @@ -116,48 +129,12 @@ private bool AllowSendingMessage() return sidecar == null || !sidecar.IsEnabled; } - private void OnReceiveAssistantMessage(string @event, string conversationId, ChatResponseDto model) - { - try - { - var settings = _services.GetRequiredService(); - var chatHub = _services.GetRequiredService>(); - - if (settings.EventDispatchBy == EventDispatchType.Group) - { - chatHub.Clients.Group(conversationId).SendAsync(@event, model).ConfigureAwait(false).GetAwaiter().GetResult(); - } - else - { - var user = _services.GetRequiredService(); - chatHub.Clients.User(user.Id).SendAsync(@event, model).ConfigureAwait(false).GetAwaiter().GetResult(); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to receive assistant message in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } - } - - private void GenerateSenderAction(string conversationId, ConversationSenderActionModel action) + #region Private methods + private void SendEvent(string @event, string conversationId, T data, [CallerMemberName] string callerName = "") { - try - { - var settings = _services.GetRequiredService(); - var chatHub = _services.GetRequiredService>(); - if (settings.EventDispatchBy == EventDispatchType.Group) - { - chatHub.Clients.Group(conversationId).SendAsync(GENERATE_SENDER_ACTION, action).ConfigureAwait(false).GetAwaiter().GetResult(); - } - else - { - var user = _services.GetRequiredService(); - chatHub.Clients.User(user.Id).SendAsync(GENERATE_SENDER_ACTION, action).ConfigureAwait(false).GetAwaiter().GetResult(); - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"Failed to generate sender action in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); - } + var user = _services.GetRequiredService(); + EventEmitter.SendChatEvent(_services, _logger, @event, conversationId, user?.Id, data, nameof(ChatHubObserver), callerName) + .ConfigureAwait(false).GetAwaiter().GetResult(); } + #endregion } diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Using.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Using.cs index 4710509cc..3473bb6c7 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Using.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Using.cs @@ -34,4 +34,5 @@ global using BotSharp.Abstraction.Realtime.Models; global using BotSharp.Plugin.ChatHub.Settings; global using BotSharp.Plugin.ChatHub.Enums; -global using BotSharp.Plugin.ChatHub.Models.Stream; \ No newline at end of file +global using BotSharp.Plugin.ChatHub.Models.Stream; +global using BotSharp.Plugin.ChatHub.Helpers; \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs index 1335d4dc3..f29dd77ae 100644 --- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,8 +1,9 @@ +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Files; using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.MessageHub.Models; using BotSharp.Core.Infrastructures.Streams; -using BotSharp.Core.Observables.Queues; +using BotSharp.Core.MessageHub; using BotSharp.Plugin.DeepSeek.Providers; using Microsoft.Extensions.Logging; using OpenAI.Chat; @@ -179,7 +180,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, conversations); - var hub = _services.GetRequiredService>(); + var hub = _services.GetRequiredService>>(); + var conv = _services.GetRequiredService(); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; var contentHooks = _services.GetHooks(agent.Id); @@ -191,8 +193,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "BeforeReceiveLlmStreamMessage", + EventName = ChatEvent.BeforeReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) { CurrentAgentId = agent.Id, @@ -235,8 +237,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, }; hub.Push(new() { - ServiceProvider = _services, - EventName = "OnReceiveLlmStreamMessage", + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = content }); } @@ -278,8 +280,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "AfterReceiveLlmStreamMessage", + EventName = ChatEvent.AfterReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = responseMessage }); diff --git a/src/Plugins/BotSharp.Plugin.KnowledgeBase/Services/KnowledgeService.Vector.cs b/src/Plugins/BotSharp.Plugin.KnowledgeBase/Services/KnowledgeService.Vector.cs index c230078df..ce5fa43c4 100644 --- a/src/Plugins/BotSharp.Plugin.KnowledgeBase/Services/KnowledgeService.Vector.cs +++ b/src/Plugins/BotSharp.Plugin.KnowledgeBase/Services/KnowledgeService.Vector.cs @@ -33,31 +33,30 @@ public async Task CreateVectorCollection(string collectionName, string col return false; } - var vectorDb = GetVectorDb(); - var created = await vectorDb.CreateCollection(collectionName, dimension); - if (created) + var db = _services.GetRequiredService(); + var created = db.AddKnowledgeCollectionConfigs(new List { - var db = _services.GetRequiredService(); - var userId = await GetUserId(); - - db.AddKnowledgeCollectionConfigs(new List + new VectorCollectionConfig { - new VectorCollectionConfig + Name = collectionName, + Type = collectionType, + VectorStore = new VectorStoreConfig { - Name = collectionName, - Type = collectionType, - VectorStore = new VectorStoreConfig - { - Provider = _settings.VectorDb.Provider - }, - TextEmbedding = new KnowledgeEmbeddingConfig - { - Provider = provider, - Model = model, - Dimension = dimension - } + Provider = _settings.VectorDb.Provider + }, + TextEmbedding = new KnowledgeEmbeddingConfig + { + Provider = provider, + Model = model, + Dimension = dimension } - }); + } + }); + + if (created) + { + var vectorDb = GetVectorDb(); + created = await vectorDb.CreateCollection(collectionName, dimension); } return created; diff --git a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs index 321b9aee8..08b6c6761 100644 --- a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs @@ -1,9 +1,10 @@ using BotSharp.Abstraction.Agents; +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Hooks; using BotSharp.Abstraction.Loggers; -using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.MessageHub.Models; using BotSharp.Core.Infrastructures.Streams; -using BotSharp.Core.Observables.Queues; +using BotSharp.Core.MessageHub; using Microsoft.AspNetCore.SignalR; using static LLama.Common.ChatHistory; using static System.Net.Mime.MediaTypeNames; @@ -183,13 +184,14 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, _logger.LogInformation(agent.Instruction); } - var hub = _services.GetRequiredService>(); + var hub = _services.GetRequiredService>>(); + var conv = _services.GetRequiredService(); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; hub.Push(new() { - ServiceProvider = _services, - EventName = "BeforeReceiveLlmStreamMessage", + EventName = ChatEvent.BeforeReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) { CurrentAgentId = agent.Id, @@ -216,8 +218,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, }; hub.Push(new() { - ServiceProvider = _services, - EventName = "OnReceiveLlmStreamMessage", + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = content }); } @@ -231,8 +233,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "AfterReceiveLlmStreamMessage", + EventName = ChatEvent.AfterReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = responseMessage }); diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 57b442527..af7e7e0a7 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,10 +1,7 @@ -using Azure; using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.MessageHub.Models; using BotSharp.Core.Infrastructures.Streams; -using BotSharp.Core.Observables.Queues; -using BotSharp.Plugin.OpenAI.Models.Realtime; -using Fluid; +using BotSharp.Core.MessageHub; using OpenAI.Chat; namespace BotSharp.Plugin.OpenAI.Providers.Chat; @@ -191,7 +188,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, conversations); - var hub = _services.GetRequiredService>(); + var hub = _services.GetRequiredService>>(); + var conv = _services.GetRequiredService(); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; var contentHooks = _services.GetHooks(agent.Id); @@ -203,8 +201,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "BeforeReceiveLlmStreamMessage", + EventName = ChatEvent.BeforeReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) { CurrentAgentId = agent.Id, @@ -247,8 +245,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, }; hub.Push(new() { - ServiceProvider = _services, - EventName = "OnReceiveLlmStreamMessage", + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = content }); } @@ -290,8 +288,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "AfterReceiveLlmStreamMessage", + EventName = ChatEvent.AfterReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = responseMessage }); diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs index 035e03e0c..a1d45dcce 100644 --- a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs @@ -1,10 +1,11 @@ using BotSharp.Abstraction.Agents; using BotSharp.Abstraction.Agents.Enums; +using BotSharp.Abstraction.Conversations; +using BotSharp.Abstraction.Conversations.Enums; using BotSharp.Abstraction.Loggers; -using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.MessageHub.Models; using BotSharp.Core.Infrastructures.Streams; -using BotSharp.Core.Observables.Queues; -using Microsoft.AspNetCore.SignalR; +using BotSharp.Core.MessageHub; namespace BotSharp.Plugin.SparkDesk.Providers; @@ -152,12 +153,13 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, var client = new SparkDeskClient(appId: _settings.AppId, apiKey: _settings.ApiKey, apiSecret: _settings.ApiSecret); var (prompt, messages, funcall) = PrepareOptions(agent, conversations); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; - var hub = _services.GetRequiredService>(); + var hub = _services.GetRequiredService>>(); + var conv = _services.GetRequiredService(); hub.Push(new() { - ServiceProvider = _services, - EventName = "BeforeReceiveLlmStreamMessage", + EventName = ChatEvent.BeforeReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) { CurrentAgentId = agent.Id, @@ -197,8 +199,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "OnReceiveLlmStreamMessage", + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = responseMessage }); } @@ -212,8 +214,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, hub.Push(new() { - ServiceProvider = _services, - EventName = "AfterReceiveLlmStreamMessage", + EventName = ChatEvent.AfterReceiveLlmStreamMessage, + RefId = conv.ConversationId, Data = responseMessage }); diff --git a/src/Plugins/BotSharp.Plugin.Twilio/Hooks/TwilioConversationHook.cs b/src/Plugins/BotSharp.Plugin.Twilio/Hooks/TwilioConversationHook.cs index 11bd8d36c..bf856ee4f 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/Hooks/TwilioConversationHook.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/Hooks/TwilioConversationHook.cs @@ -1,6 +1,7 @@ using BotSharp.Abstraction.Hooks; using BotSharp.Abstraction.Routing; using BotSharp.Abstraction.Routing.Enums; +using BotSharp.Abstraction.Routing.Models; using BotSharp.Plugin.Twilio.Interfaces; using BotSharp.Plugin.Twilio.Models; using Twilio.Rest.Api.V2010.Account; @@ -23,7 +24,7 @@ public TwilioConversationHook(IServiceProvider services, _logger = logger; } - public override async Task OnFunctionExecuted(RoleDialogModel message, string from = InvokeSource.Manual) + public override async Task OnFunctionExecuted(RoleDialogModel message, InvokeFunctionOptions? options = null) { var hooks = _services.GetHooks(message.CurrentAgentId); diff --git a/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs b/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs index 84f88b5b3..28360518e 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs @@ -1,4 +1,6 @@ using BotSharp.Abstraction.Files; +using BotSharp.Abstraction.MessageHub.Models; +using BotSharp.Abstraction.MessageHub.Services; using BotSharp.Abstraction.Realtime; using BotSharp.Abstraction.Routing; using BotSharp.Core.Infrastructures; @@ -82,8 +84,12 @@ private async Task ProcessUserMessageAsync(CallerMessage message) var routing = sp.GetRequiredService(); var config = sp.GetRequiredService(); var sessionManager = sp.GetRequiredService(); - var progressService = sp.GetRequiredService(); - InitProgressService(message, sessionManager, progressService); + var observer = sp.GetRequiredService(); + + using var container = observer.SubscribeObservers>(message.ConversationId, listeners: new() + { + { ChatEvent.OnIndicationReceived, async data => await OnReceiveToolCallIndication(data.Data, message, sessionManager) } + }); InitConversation(message, inputMsg, conv, routing); // Need to consider Inbound and Outbound call @@ -185,15 +191,11 @@ private static string GetHints(string agentId, AssistantMessage reply, IServiceP return string.Join(", ", hints.Select(x => x.ToLower()).Distinct().Reverse()); } - private static void InitProgressService(CallerMessage message, ITwilioSessionManager sessionManager, IConversationProgressService progressService) + private static async Task OnReceiveToolCallIndication(RoleDialogModel msg, CallerMessage message, ITwilioSessionManager sessionManager) { - progressService.OnFunctionExecuting = async msg => + if (!string.IsNullOrEmpty(msg.Indication)) { - if (!string.IsNullOrEmpty(msg.Indication)) - { - await sessionManager.SetReplyIndicationAsync(message.ConversationId, message.SeqNumber, msg.Indication); - } - }; - progressService.OnFunctionExecuted = async msg => { }; + await sessionManager.SetReplyIndicationAsync(message.ConversationId, message.SeqNumber, msg.Indication); + } } }