-
Notifications
You must be signed in to change notification settings - Fork 162
/
Copy pathWebSocketPlugin.cs
243 lines (224 loc) · 9.76 KB
/
WebSocketPlugin.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
using MarketConnectors.WebSocket.Model;
using MarketConnectors.WebSocket.UserControls;
using MarketConnectors.WebSocket.ViewModel;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using VisualHFT.Commons.PluginManager;
using VisualHFT.DataRetriever;
using VisualHFT.DataRetriever.DataParsers;
using VisualHFT.Enums;
using VisualHFT.Model;
using VisualHFT.PluginManager;
using VisualHFT.UserSettings;
namespace MarketConnectors.WebSocket
{
public class WebSocketPlugin : BasePluginDataRetriever
{
public class WebsocketData
{
public required string type { get; set; }
public required string data { get; set; }
}
private new bool _disposed = false; // to track whether the object has been disposed
private Timer _heartbeatTimer;
private static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private DataEventArgs tradeDataEvent = new DataEventArgs() { DataType = "Trades" }; //reusable object. So we avoid allocations
private DataEventArgs marketDataEvent = new DataEventArgs() { DataType = "Market" };//reusable object. So we avoid allocations
private DataEventArgs heartbeatDataEvent = new DataEventArgs() { DataType = "HeartBeats" };//reusable object. So we avoid allocations
private PlugInSettings? _settings;
public override string Name { get; set; } = "WebSocket Plugin";
public override string Version { get; set; } = "1.0.0";
public override string Description { get; set; } = "Connects to custom websocket.";
public override string Author { get ; set; } = "VisualHFT";
public override ISetting Settings { get => _settings; set => _settings = (PlugInSettings)value; }
public override Action? CloseSettingWindow { get; set; }
private IDataParser _parser;
JsonSerializerSettings? _parser_settings = null;
ClientWebSocket? _ws;
public WebSocketPlugin()
{
_parser = new JsonParser();
_parser_settings = new JsonSerializerSettings
{
Converters = new List<JsonConverter> { new CustomDateConverter() },
DateParseHandling = DateParseHandling.None,
DateFormatString = "yyyy.MM.dd-HH.mm.ss.ffffff"
};
// Initialize the timer
_heartbeatTimer = new Timer(CheckConnectionStatus, null, TimeSpan.Zero, TimeSpan.FromSeconds(5)); // Check every 5 seconds
}
~WebSocketPlugin()
{
Dispose(false);
}
public override async Task StartAsync()
{
await base.StartAsync();//call the base first
try
{
await Task.Run(async () =>
{
CancellationTokenSource source = new CancellationTokenSource();
using (_ws = new ClientWebSocket())
{
await _ws.ConnectAsync(new UriBuilder("ws", _settings.HostName, _settings.Port).Uri, CancellationToken.None);
await base.StartAsync();
byte[] buffer = new byte[1024 * 1024];
while (_ws.State == WebSocketState.Open)
{
var result = await _ws.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
{
await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}
else
{
HandleMessage(buffer, result.Count);
}
}
}
log.Info($"Plugin has successfully started.");
RaiseOnDataReceived(GetProviderModel(eSESSIONSTATUS.CONNECTED));
Status = ePluginStatus.STARTED;
});
}
catch (Exception ex)
{
var _error = ex.Message;
log.Error(_error, ex);
await HandleConnectionLost(_error, ex);
}
}
public override async Task StopAsync()
{
Status = ePluginStatus.STOPPING;
log.Info($"{this.Name} is stopping.");
if (_ws != null && _ws.State == WebSocketState.Open)
await _ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
//reset models
RaiseOnDataReceived(new List<VisualHFT.Model.OrderBook>());
RaiseOnDataReceived(GetProviderModel(eSESSIONSTATUS.DISCONNECTED));
await base.StopAsync();
}
private void HandleMessage(byte[] buffer, int count)
{
string message = Encoding.UTF8.GetString(buffer, 0, count);
var dataReceived = _parser.Parse<WebsocketData>(message);
string dataType = dataReceived.type;
// Determine the type of data received (e.g., market data, providers, active orders, etc.)
if (dataType == "Market")
{
marketDataEvent.ParsedModel = _parser.Parse<IEnumerable<OrderBook>>(dataReceived.data, _parser_settings);
RaiseOnDataReceived(marketDataEvent);
}
else if (dataType == "ActiveOrders")
{
//ParsedModel = _parser.Parse<List<VisualHFT.Model.Order>>(dataReceived.data, _parser_settings);
}
else if (dataType == "Strategies")
{
//ParsedModel = _parser.Parse<List<StrategyVM>>(dataReceived.data, _parser_settings);
}
else if (dataType == "Exposures")
{
// ParsedModel = _parser.Parse<List<Exposure>>(dataReceived.data, _parser_settings);
}
else if (dataType == "HeartBeats")
{
heartbeatDataEvent.ParsedModel = _parser.Parse<List<VisualHFT.Model.Provider>>(dataReceived.data, _parser_settings);
RaiseOnDataReceived(heartbeatDataEvent);
}
else if (dataType == "Trades")
{
tradeDataEvent.ParsedModel = _parser.Parse<List<Trade>>(dataReceived.data, _parser_settings);
RaiseOnDataReceived(tradeDataEvent);
}
else
{
if (log.IsWarnEnabled)
log.Warn("Websocket data retriever :" + dataType + " error: NOT RECOGNIZED.");
}
}
private void CheckConnectionStatus(object state)
{
bool isConnected = _ws != null && _ws.State == WebSocketState.Open;
if (isConnected)
{
RaiseOnDataReceived(GetProviderModel(eSESSIONSTATUS.CONNECTED));
}
else
{
RaiseOnDataReceived(GetProviderModel(eSESSIONSTATUS.DISCONNECTED));
}
}
public override object GetUISettings()
{
PluginSettingsView view = new PluginSettingsView();
PluginSettingsViewModel viewModel = new PluginSettingsViewModel(CloseSettingWindow);
viewModel.HostName = _settings.HostName;
viewModel.Port = _settings.Port;
viewModel.ProviderId = _settings.ProviderId;
viewModel.ProviderName = _settings.ProviderName;
viewModel.UpdateSettingsFromUI = () =>
{
_settings.HostName = viewModel.HostName;
_settings.Port = viewModel.Port;
_settings.ProviderId = viewModel.ProviderId;
_settings.ProviderName = viewModel.ProviderName;
SaveSettings();
//run this because it will allow to reconnect with the new values
RaiseOnDataReceived(GetProviderModel(eSESSIONSTATUS.CONNECTING));
Status = ePluginStatus.STARTING;
Task.Run(async () => await HandleConnectionLost($"{this.Name} is starting (from reloading settings).", null, true));
};
// Display the view, perhaps in a dialog or a new window.
view.DataContext = viewModel;
return view;
}
protected override void InitializeDefaultSettings()
{
_settings = new PlugInSettings()
{
HostName = "localhost",
Port = 6900,
ProviderId = 3, //must be unique
ProviderName = "WebSocket"
};
SaveToUserSettings(_settings);
}
protected override void LoadSettings()
{
_settings = LoadFromUserSettings<PlugInSettings>();
if (_settings == null)
{
InitializeDefaultSettings();
}
if (_settings.Provider == null) //To prevent back compability with older setting formats
{
_settings.Provider = new VisualHFT.Model.Provider() { ProviderID = 3, ProviderName = "WebSocket" };
}
}
protected override void SaveSettings()
{
SaveToUserSettings(_settings);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (!_disposed)
{
if (disposing)
{
_ws?.Dispose();
_heartbeatTimer?.Dispose();
}
_disposed = true;
}
}
}
}