Skip to content

Commit

Permalink
Fixed disposing of streams which fixed bugs of live streams still pub…
Browse files Browse the repository at this point in the history
…lishing data upon exit
  • Loading branch information
bleunguts committed Oct 4, 2023
1 parent cf1ff27 commit d367e75
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions ProjectX.GatewayAPI/BackgroundServices/FXPricingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class FXPricingService : BackgroundService
private readonly FXTasksChannel _fxTaskChannel;
private readonly IFXMarketService _fxMarketService;

private Dictionary<string, IDisposable> _disposables = new Dictionary<string, IDisposable>();

public FXPricingService(ILogger<FXPricingService> logger,
FXTasksChannel fXTasksChannel,
IFXMarketService fxMarketService,
Expand Down Expand Up @@ -43,12 +45,21 @@ await foreach (var request in _fxTaskChannel.ReadAllAsync(stoppingToken))
switch(request.Mode)
{
case FXRateMode.Subscribe:
_fxMarketService.StreamSpotPricesFor(request)
var disposable = _fxMarketService.StreamSpotPricesFor(request)
.Subscribe(priceResponse => hubContext.Clients.All.PushFxRate(new SpotPriceResult(request.ClientName, priceResponse.Timestamp, priceResponse.Value)));
if(!_disposables.TryAdd(request.CurrencyPair, disposable))
{
_logger.LogWarning($"Disposable stream already added {request.CurrencyPair}");
}
break;
case FXRateMode.Unsubscribe:
_fxMarketService.UnStream(request.CurrencyPair);
hubContext.Clients.All.StopFxRate(request.CurrencyPair);
if(_disposables.TryGetValue(request.CurrencyPair, out IDisposable d))
{
d.Dispose();
_disposables.Remove(request.CurrencyPair);
}
await hubContext.Clients.All.StopFxRate(request.CurrencyPair);
break;
default:
throw new NotSupportedException($"SpotRequest type {request.Mode} is not supported");
Expand Down

0 comments on commit d367e75

Please sign in to comment.