Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Team notifications with the power of RX #442

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions Fritz.Chatbot/Commands/TeamCommand.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Fritz.StreamLib.Core;
using Fritz.StreamTools.Hubs;
using Fritz.Chatbot.Helpers;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
Expand All @@ -11,6 +12,8 @@
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Fritz.Chatbot.Commands
{
Expand All @@ -23,6 +26,11 @@ public class TeamCommand : IExtendedCommand
private HttpClient _HttpClient;
private readonly IHubContext<AttentionHub> _Context;
private ILogger _Logger;
private Subject<string> _TeammateNotifications = new Subject<string>();
/// <summary>
/// Handle to the teammates notification subscription. Dispose to stop the subscription
/// </summary>
private IDisposable _subscriptionHandle;

public string Name { get; } = "Team Detection";
public string Description { get; } = "Alert when a teammate joins the stream and starts chatting";
Expand All @@ -31,8 +39,7 @@ public class TeamCommand : IExtendedCommand
public TimeSpan? Cooldown { get; } = TimeSpan.FromSeconds(5);
public TimeSpan ShoutoutCooldown;
public string ShoutoutFormat;
public Queue<string> _TeammateNotifications = new Queue<string>();


public TeamCommand(IConfiguration configuration, ILoggerFactory loggerFactory, IHubContext<AttentionHub> context, IHttpClientFactory httpClientFactory)
{
_TeamName = configuration["StreamServices:Twitch:Team"];
Expand All @@ -56,24 +63,14 @@ public TeamCommand(IConfiguration configuration, ILoggerFactory loggerFactory, I

GetTeammates().GetAwaiter().GetResult();

Task.Run(SendNotificationsToWidget);

SubscribeToTeamNotifications();
}

private void SendNotificationsToWidget()
{

while (true) {

if (_TeammateNotifications.TryPeek(out var _)) {

_Context.Clients.All.SendAsync("Teammate", _TeammateNotifications.Dequeue());
Task.Delay(5000);

}

}

private void SubscribeToTeamNotifications() {
//when notifications appear, throttle them to 1 item per 5 seconds
_subscriptionHandle = _TeammateNotifications.Throttle(1, TimeSpan.FromSeconds(5)).Subscribe(notification => {
_Context.Clients.All.SendAsync("Teammate", notification);
});
}

public bool CanExecute(string userName, string fullCommandText)
Expand All @@ -95,7 +92,7 @@ public async Task Execute(IChatService chatService, string userName, string full
await chatService.SendMessageAsync(ShoutoutFormat.Replace("{teammate}", userName));
}

_TeammateNotifications.Enqueue(userName);
_TeammateNotifications.OnNext(userName);

}

Expand Down
1 change: 1 addition & 0 deletions Fritz.Chatbot/Fritz.Chatbot.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.1" />
<PackageReference Include="NetCoreAudio" Version="1.5.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="system.reactive.linq" Version="4.4.1" />
</ItemGroup>

<ItemGroup>
Expand Down
97 changes: 97 additions & 0 deletions Fritz.Chatbot/Helpers/ObservableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Collections.Concurrent;
using System.Threading;

namespace Fritz.Chatbot.Helpers
{
public static class ObservableExtensions
{
/// <summary>
/// Pass through up to <paramref name="count"/> items downstream within given <paramref name="interval"/>.
/// Once more elements are about to get through they will become buffered, until interval resets.
/// </summary>
public static IObservable<T> Throttle<T>(this IObservable<T> source, int count, TimeSpan interval) =>
new Throttle<T>(source, count, interval);
}

/// <summary>
/// Custom Throttle implementation, because the default provided with rx.net does not work like we want.
/// code is by @Horusiath Who confirmed that i was indeed not crazy, and that rx does not has an operator OOB
/// that behaves like this.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Throttle<T> : IObservable<T>
{
private readonly IObservable<T> _source;
private readonly int _count;
private readonly TimeSpan _interval;

public Throttle(IObservable<T> source, int count, TimeSpan interval)
{
_source = source;
_count = count;
_interval = interval;
}

public IDisposable Subscribe(IObserver<T> observer) =>
_source.SubscribeSafe(new Observer(observer, _count, _interval));

private sealed class Observer : IObserver<T>
{
private readonly IObserver<T> _observer;
private readonly int _count;
private readonly Timer _timer;
private readonly ConcurrentQueue<T> _buffer;
private int _remaining;

public Observer(IObserver<T> observer, int count, TimeSpan interval)
{
_observer = observer;
_remaining = _count = count;
_buffer = new ConcurrentQueue<T>();
_timer = new Timer(_ =>
{
// first, try to dequeue up to `_count` buffered items
// after that is done, reset `_remaining` quota to what's left
var i = _count;
while (i > 0 && _buffer.TryDequeue(out var value))
{
i--;
_observer.OnNext(value);
}

// reset remaining count at the end of the interval
Interlocked.Exchange(ref _remaining, i);
}, null, interval, interval);
}

public void OnCompleted()
{
// what to do with buffered items? Up to you.
_timer.Dispose();
_observer.OnCompleted();
}

public void OnError(Exception error)
{
_observer.OnError(error);
}

public void OnNext(T value)
{
if (Interlocked.Decrement(ref _remaining) >= 0)
{
// if we have free quota to spare in this interval, emit value downstream
_observer.OnNext(value);
}
else
{
// otherwise buffer value until timer will reset it
_buffer.Enqueue(value);
}
}
}
}
}