Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
Workaround for aborted websocket issue added;
Browse files Browse the repository at this point in the history
  • Loading branch information
KonH committed Nov 29, 2018
1 parent 31ae917 commit 7b16dfe
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
43 changes: 43 additions & 0 deletions Server/Extensions/ReflectionExts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Data;
using System.Reflection;

namespace ReflectionExtensions {
public static class ReflectionExts {
static void ArgumentNotNull(object instance, string name) {
if ( instance == null ) {
throw new ArgumentNullException(name);
}
}

static void ValueNotNull(object instance, string message) {
if ( instance == null ) {
throw new InvalidExpressionException(message);
}
}

public static object GetPrivateField(this object instance, string name) {
ArgumentNotNull(instance, nameof(instance));
var type = instance.GetType();
var fieldInfo = type.GetField(name, BindingFlags.Instance | BindingFlags.NonPublic);
ValueNotNull(fieldInfo, $"Can't find private instance field '{name}' on object of type {type.FullName}");
var fieldValue = fieldInfo.GetValue(instance);
return fieldValue;
}

public static T GetPrivateField<T>(this object instance, string name) {
var rawFieldValue = GetPrivateField(instance, name);
var fieldValue = (T)rawFieldValue;
return fieldValue;
}

public static object InvokePrivateMethod(this object instance, string name, params object[] args) {
ArgumentNotNull(instance, nameof(instance));
var type = instance.GetType();
var methodInfo = type.GetMethod(name, BindingFlags.Instance | BindingFlags.NonPublic);
ValueNotNull(methodInfo, $"Can't find private instance method '{name}' on object of type {type.FullName}");
var result = methodInfo.Invoke(instance, args);
return result;
}
}
}
10 changes: 10 additions & 0 deletions Server/Services/SlackService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using SlackBotNet;
using Microsoft.Extensions.Logging;
using SlackBotNet.Messages;
using SlackBotNetExtensions;

namespace Server.Services {
public class SlackService : IService, IContextService {
Expand Down Expand Up @@ -34,6 +35,8 @@ public class SlackService : IService, IContextService {
string _hub;
SlackBot _bot;

WebsocketResurrector _resurrector;

bool IsValidSettings(string name, string token, string hub) {
return
!(string.IsNullOrEmpty(name) || string.IsNullOrEmpty(token) || string.IsNullOrEmpty(hub));
Expand Down Expand Up @@ -65,6 +68,7 @@ public class SlackService : IService, IContextService {
_logger.LogDebug(
$"InitBot: Start initialize with \"{name}\", token: \"{token}\", hub: \"{hub}\"");
InitBotAsync(token, loggerFactory).GetAwaiter().GetResult();
_resurrector = SetupResurrector();
} catch (Exception e) {
_logger.LogError($"InitBot: exception: \"{e}\"");
return false;
Expand All @@ -85,6 +89,12 @@ public class SlackService : IService, IContextService {
});
}

WebsocketResurrector SetupResurrector() {
var resurrector = new WebsocketResurrector(_bot, _loggerFactory);
resurrector.Start();
return resurrector;
}

async void OnSendMessageFailure(IThrottleQueue<IMessage> queue, IMessage message, ILogger logger, Exception exception) {
if ( message.SendAttempts <= 5 ) {
logger?.LogWarning($"OnSendMessageFailure. Failed to send message {message.Text}. Tried {message.SendAttempts} times (exception: {exception})");
Expand Down
135 changes: 135 additions & 0 deletions Server/Utils/WebsocketResurrector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ReflectionExtensions;
using SlackBotNet;

namespace SlackBotNetExtensions {
/// <summary>
/// Workaround to handle invalid websocket state inside SlackBotNet.SlackBot
/// It can happens randomly, especially when client running for long time
/// When it happens, bot refused to handle any messages and logs shown such messages:
/// WARN [SlackBotNet.SlackBot] [0] Not pinging because the socket is not open. Current state is: Aborted
/// Possible cause of this behaviour is SlackRtmDriver.cs:
/// ConnectAsync(...)
/// {
/// Observable
/// .Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5))
/// .Subscribe(async _ =>
/// {
/// ...
/// if (this.websocket.State != WebSocketState.Open) // This condition prevent driver from further work
/// {
/// logger.LogWarning($"Not pinging because the socket is not open. Current state is: {this.websocket.State}");
/// return;
/// }
/// ...
/// }
/// ...
/// }
/// Method to restart websocket connection is ReconnectRtmAsync, but in called twice below this condition and it can't be reached
/// To workaround this situation without modifying library sources, we need to call it manually using reflection,
/// when websocket is invalid state
/// Layout of required classes:
/// SlackBot.cs:
/// public class SlackBot
/// {
/// ...
/// private readonly IMessageBus messageBus;
/// private readonly ILogger`SlackBot` logger;
/// private IDriver driver; // SlackRtmDriver
/// ...
/// }
/// SlackRtmDriver.cs:
/// class SlackRtmDriver
/// {
/// ...
/// private ClientWebSocket websocket;
/// ...
/// private async Task`SlackBotState` ReconnectRtmAsync(IMessageBus bus, ILogger logger) // result ignored
/// ...
/// }
/// </summary>
public class WebsocketResurrector {
static double _defaultInterval = 30;
static Func<WebSocketState, bool> _defaultSelector = state => state == WebSocketState.Aborted;

readonly SlackBot _bot;
readonly ILogger _logger;
readonly double _checkInterval;
readonly Func<WebSocketState, bool> _invalidStateSelector;

bool _stopped = false;

public WebsocketResurrector(
SlackBot bot, ILoggerFactory loggerFactory,
double checkInterval, Func<WebSocketState, bool> invalidStateSelector
) {
_bot = bot;
_logger = loggerFactory?.CreateLogger<WebsocketResurrector>();
_checkInterval = checkInterval;
_invalidStateSelector = invalidStateSelector;
}

public WebsocketResurrector(SlackBot bot, ILoggerFactory loggerFactory)
: this(bot, loggerFactory, _defaultInterval, _defaultSelector) {}

/// <summary>
/// Start tracking websocket state
/// </summary>
public void Start() {
Task.Run(async () => {
while ( !_stopped ) {
await Task.Delay(TimeSpan.FromSeconds(_checkInterval));
await TryResurrectWebsocket();
}
});
}

/// <summary>
/// Stop tracking websocket state
/// </summary>
public void Stop() {
_stopped = true;
}

async Task TryResurrectWebsocket() {
var driver = _bot.GetPrivateField("driver");
var client = GetClient(driver);
var oldState = client.State;
if ( IsNeedToReconnect(oldState) ) {
_logger?.LogWarning($"Websocket state is invalid: {oldState}, try to resurrect it");
await ResurrectWebsocket(driver);
}
}

ClientWebSocket GetClient(object driver) => driver.GetPrivateField<ClientWebSocket>("websocket");

bool IsNeedToReconnect(WebSocketState state) => _invalidStateSelector(state);

async Task ResurrectWebsocket(object driver) {
var messageBus = _bot.GetPrivateField("messageBus");
var logger = _bot.GetPrivateField<ILogger<SlackBot>>("logger");
var reconnectRtmAsyncTask = driver.InvokePrivateMethod("ReconnectRtmAsync", messageBus, logger);
switch ( reconnectRtmAsyncTask ) {
case Task task: {
await task;
var client = GetClient(driver);
_logger?.LogWarning($"Reconnect completed, new state is: {client.State}");
break;
}

case null: {
_logger?.LogError($"Unexpected null result of reconnect method");
break;
}

default: {
_logger?.LogError($"Invalid result of reconnect method ({reconnectRtmAsyncTask})");
break;
}
}
}
}
}

0 comments on commit 7b16dfe

Please sign in to comment.