auto-prune in-memory stream data #1095

Open
2 tasks
DaveSkender opened this issue Oct 12, 2023 · 5 comments

@DaveSkender
Owner

DaveSkender commented Oct 12, 2023

Hi, I'm currently trying out the v3.0.0 preview and wanted to ask whether there is, or will be, an option to limit the size of the data collections held by both the observable and the observer, as I haven't found one yet. Thanks!

Originally posted by @elpht in #1018 (comment)

The idea of these collections growing in an uncontrolled manner has crossed my mind. Since these are instantiated classes, I think we can add an optional setting so you can supply a max size that'll trigger some auto-pruning after new data arrives. Makes sense.

Objective:

  • add standard optional settings schema with defaults
  • setting for max collection size, with auto-pruning (default: off)

See also:

@DaveSkender DaveSkender added this to the v3 milestone Oct 12, 2023
@DaveSkender DaveSkender added the enhancement New feature or request label Oct 12, 2023
@elpht

elpht commented Nov 10, 2023

I've dared to provide a proposal to implement the auto-pruning option in the observable:

public class QuoteProviderSettings
{
    /// <summary>
    /// Gets or sets max size of <see cref="QuoteProvider.Quotes"/> collection.
    /// Once the collection reaches the configured size, the provider will automatically remove from
    /// the collection the oldest quote when a new one is added as an atomic operation.
    /// If the quote to be added is the oldest, it won't get to the collection.
    /// Default is -1 (unlimited).
    /// </summary>
    public int MaxCollectionSize { get; set; } = -1;
}

public class QuoteProvider : IObservable<Quote>
{
    // fields
    private readonly List<IObserver<Quote>> observers;
    private readonly QuoteProviderSettings settings;

    // initialize
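    public QuoteProvider()
    {
        observers = new();
        ProtectedQuotes = new();

        // default settings (unlimited), so Add() never dereferences a null settings field
        settings = new();
    }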

    // initialize
    public QuoteProvider(Action<QuoteProviderSettings> cfgAction)
        : this()
    {
        if (cfgAction == null)
        {
            throw new ArgumentNullException(nameof(cfgAction));
        }

        settings = new QuoteProviderSettings();
        cfgAction.Invoke(settings);
        ValidateSettings(settings);
    }

    private static void ValidateSettings(QuoteProviderSettings settings)
    {
        if (settings.MaxCollectionSize is < 1 and not -1)
        {
            throw new ArgumentException(
                $"{nameof(settings.MaxCollectionSize)} must be a positive integer greater than 0 or -1 (unlimited)",
                nameof(settings));
        }
    }

    // properties
    public IEnumerable<Quote> Quotes => ProtectedQuotes;

    internal List<Quote> ProtectedQuotes { get; private set; }

    private int OverflowCount { get; set; }

    // METHODS

    // add one
    public void Add(Quote quote)
    {
        // validate quote
        if (quote == null)
        {
            throw new ArgumentNullException(nameof(quote), "Quote cannot be null.");
        }

        // note: I don't think it's a good idea to manipulate a public collection, ProtectedQuotes, that may be potentially enumerated
        // while being modified. Also to guarantee that the addition and removal are executed as an atomic operation, I'd suggest to make
        // ProtectedQuotes immutable and replace its value after the new one is calculated
        int length = ProtectedQuotes.Count;

        if (length == 0)
        {
            // add new quote
            ProtectedQuotes.Add(quote);

            // notify observers
            NotifyObservers(quote);

            return;
        }

        Quote last = ProtectedQuotes[length - 1];
        bool maxSizeReached = settings.MaxCollectionSize != -1 && settings.MaxCollectionSize == length;

        // add quote
        if (quote.Date > last.Date)
        {
            // remove oldest if max size reached
            if (maxSizeReached)
            {
                ProtectedQuotes.RemoveAt(0);
            }

            // add new quote
            ProtectedQuotes.Add(quote);

            // notify observers
            NotifyObservers(quote);
        }

        // same date or older quote received
        else if (quote.Date <= last.Date)
        {
            // check for overflow condition
            // where same quote continues (possible circular condition)
            if (quote.Date == last.Date)
            {
                OverflowCount++;

                if (OverflowCount > 100)
                {
                    string msg = "A repeated Quote update exceeded the 100 attempt threshold. "
                                 + "Check and remove circular chains or check your Quote provider.";

                    EndTransmission();

                    throw new OverflowException(msg);
                }
            }
            else
            {
                OverflowCount = 0;
            }

            // seek old quote
            int foundIndex = ProtectedQuotes
                .FindIndex(x => x.Date == quote.Date);

            // found
            if (foundIndex >= 0)
            {
                Quote old = ProtectedQuotes[foundIndex];

                old.Open = quote.Open;
                old.High = quote.High;
                old.Low = quote.Low;
                old.Close = quote.Close;
                old.Volume = quote.Volume;
            }

            // add missing quote
            else
            {
                ProtectedQuotes.Add(quote);

                // re-sort cache
                ProtectedQuotes = ProtectedQuotes
                    .ToSortedList();

                // prune after sorting, so that a late quote older than
                // everything retained is the one dropped (as documented in settings)
                if (maxSizeReached)
                {
                    ProtectedQuotes.RemoveAt(0);
                }
            }

            // let observer handle old + duplicates
            NotifyObservers(quote);
        }
    }

    // add many
    public void Add(IEnumerable<Quote> quotes)
    {
        List<Quote> added = quotes
            .ToSortedList();

        for (int i = 0; i < added.Count; i++)
        {
            Add(added[i]);
        }
    }

    // subscribe observer
    public IDisposable Subscribe(IObserver<Quote> observer)
    {
        if (!observers.Contains(observer))
        {
            observers.Add(observer);
        }

        return new Unsubscriber(observers, observer);
    }

    // close all observations
    public void EndTransmission()
    {
        foreach (IObserver<Quote> observer in observers.ToArray())
        {
            if (observers.Contains(observer))
            {
                observer.OnCompleted();
            }
        }

        observers.Clear();
    }

    // notify observers
    private void NotifyObservers(Quote quote)
    {
        List<IObserver<Quote>> obsList = observers.ToList();

        for (int i = 0; i < obsList.Count; i++)
        {
            IObserver<Quote> obs = obsList[i];
            obs.OnNext(quote);
        }
    }

    // unsubscriber
    private class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<Quote>> observers;
        private readonly IObserver<Quote> observer;

        // identify and save observer
        public Unsubscriber(List<IObserver<Quote>> observers, IObserver<Quote> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // remove single observer
        public void Dispose()
        {
            if (observer != null && observers.Contains(observer))
            {
                observers.Remove(observer);
            }
        }
    }
}
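The note inside `Add` above suggests making the collection immutable and replacing it once the new value has been computed. A minimal sketch of that idea follows, assuming `System.Collections.Immutable` is available (built into modern .NET, a NuGet package on older frameworks). The `Quote` record and the `PrunedQuoteCache` class are hypothetical stand-ins for illustration, not the library's types, and the overflow and observer-notification logic is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;

// Hypothetical minimal quote type, for illustration only.
public sealed record Quote(DateTime Date, decimal Close);

// Hypothetical cache: keeps quotes sorted by date and capped at maxSize.
public sealed class PrunedQuoteCache
{
    private readonly int maxSize;          // -1 means unlimited
    private readonly object gate = new();  // serializes writers
    private volatile ImmutableList<Quote> quotes = ImmutableList<Quote>.Empty;

    public PrunedQuoteCache(int maxSize = -1)
    {
        if (maxSize is < 1 and not -1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize));
        }

        this.maxSize = maxSize;
    }

    // Readers receive a snapshot that never changes, so enumerating it
    // is safe even while another thread is adding quotes.
    public IReadOnlyList<Quote> Quotes => quotes;

    // Returns false when the new quote was pruned immediately
    // because it was older than everything retained.
    public bool Add(Quote quote)
    {
        if (quote is null)
        {
            throw new ArgumentNullException(nameof(quote));
        }

        lock (gate)
        {
            ImmutableList<Quote> next = quotes;

            // find the insertion point, scanning back from the newest end
            int index = next.Count;
            while (index > 0 && next[index - 1].Date > quote.Date)
            {
                index--;
            }

            if (index > 0 && next[index - 1].Date == quote.Date)
            {
                // same date: replace the existing entry, size is unchanged
                next = next.SetItem(index - 1, quote);
            }
            else
            {
                next = next.Insert(index, quote);
            }

            bool kept = true;

            if (maxSize != -1 && next.Count > maxSize)
            {
                // if the new quote landed at index 0, it is the one removed
                kept = index != 0;
                next = next.RemoveAt(0);
            }

            // a single reference swap publishes the add and the prune together
            quotes = next;
            return kept;
        }
    }
}
```

Because readers only ever see a fully built list, the add and the removal appear as one step; the trade-off is an allocation per update, which may or may not be acceptable for high-frequency feeds.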

I'd also provide one for the observer, but I'd need to understand first why you added a dependency on observable's protected tuples inside the SMA observer to calculate the increment, because it seems that this calculation depends on observable's collection length.

@DaveSkender
Owner Author

DaveSkender commented Nov 11, 2023

Seriously, I'd love to have more contributors helping out. Thank you for the contribution. If you want to open a PR against the v3 branch for the quote provider, please do. Otherwise, I'll ingest what you're providing above and incorporate when I get around to it in my intended sequence of the v3 release plan.

And yes, I too am trying to better address the observer-related issues, as this has a rather complex arrangement once indicator chaining is considered. The protected TupleProvider is only meant to be an inheritable abstract base class for the UseObserver and the ChainProvider. I've very possibly overcomplicated the setup initially and am thinking through some of these issues in my current focus on:

@elpht

elpht commented Nov 12, 2023

I've been having a deeper look at the current design and I'd like to share some thoughts with you:

  • It is a bit confusing that after EmaObserver ema = provider.GetEma(x);, ema gets called observer and not observable. I know that it actually plays both roles, but if I was given that instance without knowing about its internals I'd expect it to be actually an observable to which I can attach an observer to monitor how indicator values are calculated (it could well be that I misunderstood something).
  • TupleProvider and QuoteProvider do much the same thing, and both handle out-of-order and duplicate quote arrival. An alternative would be a dumb QuoteProvider, used only in financial data feed adapters to push data as Quote without even storing it, with TupleProvider then acting as some sort of IObservableCollection<Quote> that handles data arrival issues (its behaviour could be configurable, as could its max size).

Hmm, I think I'll stop here and try to show you what I have in mind in a dev branch.

@DaveSkender
Owner Author

DaveSkender commented Nov 13, 2023

EmaObserver is just a name I came up with, since it currently only observes the quotes for updates. I didn't like the name EmaBase I originally had for it. In my latest feature branch, I'm trying to make it both an observer and an observable derived from a common ChainProvider profile and am simply calling it Ema.

The reason I'd made QuoteProvider and TupleProvider different is that they store different data classes, though I can see your point and will see whether I can use a generic T based on the existing ISeries interface. I'm not sure it'd be possible, because some chainor indicators will need the full Quote and some only need the simpler (Date, Value) tuple; further, some indicators aren't chainable because they have a composite result with different values.

The reason QuoteProvider stores the data is for both late subscriptions and for a reset scenario where a subscriber would need to be fully regenerated, so it'd need the history.

Regarding max size, I'm thinking it'd just be a base property of each of these. If it's set at the QuoteProvider, subscriber and chainee indicators could inherit the value on instantiation, but it could also be set explicitly if something different is desirable. For example:

public class QuoteProvider : IObservable<Quote>
{
    // constructor
    public QuoteProvider()
    {
        // ...
    }

    public QuoteProvider(int maxSize)
    {
        // ...
        MaxSize = maxSize;
    }

    internal int? MaxSize { get; set; }

    // ...
}

And a user would optionally initialize it:

QuoteProvider quotes = new(maxSize: 750);

Finally, any subscriber can just look up MaxSize from its parent when it is subsequently instantiated.
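As a rough sketch of that inheritance, assuming a subscriber receives its parent provider in its constructor: the `QuoteProvider` below is a simplified stand-in for the class outlined above, and `SmaSubscriber` with its optional `maxSize` parameter is a hypothetical name used only for illustration. `MaxSize` is public here (rather than internal) purely so the sketch can be exercised from outside.

```csharp
using System;

// Simplified stand-in for the provider outlined above (no observer plumbing).
public class QuoteProvider
{
    public QuoteProvider()
    {
    }

    public QuoteProvider(int maxSize)
    {
        if (maxSize < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize));
        }

        MaxSize = maxSize;
    }

    // null means "no limit"
    public int? MaxSize { get; set; }
}

// Hypothetical subscriber that inherits MaxSize from its parent
// when instantiated, unless a different value is supplied.
public class SmaSubscriber
{
    public SmaSubscriber(QuoteProvider parent, int? maxSize = null)
    {
        if (parent is null)
        {
            throw new ArgumentNullException(nameof(parent));
        }

        // an explicit value wins; otherwise take the parent's current value
        MaxSize = maxSize ?? parent.MaxSize;
    }

    public int? MaxSize { get; set; }
}
```

A subscriber created with `new SmaSubscriber(quotes)` picks up the provider's limit, while `new SmaSubscriber(quotes, maxSize: 100)` overrides it. Note that the value is copied at instantiation, so later changes to the provider's `MaxSize` would not propagate in this sketch.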

@DaveSkender
Owner Author

DaveSkender commented Nov 13, 2023

These are the scenarios I'm trying to enable:

  1. QuoteProvider: stores and sends Quote
  2. QuoteObserver: indicators that can only use the Quote and can't be a provider
  3. TupleProvider: indicators that use the Quote and can provide a Tuple (top level chainor)
  4. TupleObserver: indicators that can use a Tuple from a chainor, but can't be a provider
  5. ChainProvider: indicators that can use a Tuple from a chainor and can provide a Tuple

For example, RENKO is a QuoteObserver, ADL is a TupleProvider, ALLIGATOR is a TupleObserver, and EMA is a ChainProvider.
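One way to picture the five roles is in terms of the standard `IObserver<T>` and `IObservable<T>` interfaces. This is only a sketch under assumptions: the interface names below are hypothetical markers (prefixed with `I` to avoid confusion with the library's classes), and the `Quote` record and `(DateTime, double)` tuple shape are simplified stand-ins, not the library's actual definitions.

```csharp
using System;

// Hypothetical simplified quote type, for illustration only.
public sealed record Quote(
    DateTime Date, decimal Open, decimal High,
    decimal Low, decimal Close, decimal Volume);

// 1. stores and sends Quote
public interface IQuoteProvider
    : IObservable<Quote> { }

// 2. consumes Quote, cannot be a provider (e.g. RENKO)
public interface IQuoteObserver
    : IObserver<Quote> { }

// 3. consumes Quote, provides a tuple; top-level chainor (e.g. ADL)
public interface ITupleProvider
    : IObserver<Quote>, IObservable<(DateTime Date, double Value)> { }

// 4. consumes a tuple from a chainor, cannot be a provider (e.g. ALLIGATOR)
public interface ITupleObserver
    : IObserver<(DateTime Date, double Value)> { }

// 5. consumes a tuple from a chainor and provides a tuple (e.g. EMA)
public interface IChainProvider
    : IObserver<(DateTime Date, double Value)>,
      IObservable<(DateTime Date, double Value)> { }
```

Seen this way, the roles differ only in which data shape they consume and whether they also publish one, which is where a shared max-size setting could plausibly live.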
