TaskMergeScheduler: Salvage or deprecate? #354

Closed
NightOwl888 opened this issue Sep 25, 2020 · 4 comments · Fixed by #424

@NightOwl888
Contributor

NightOwl888 commented Sep 25, 2020

TaskMergeScheduler was originally created to sidestep the limited multithreading support on .NET Standard 1.x. Since we have dropped support for .NET Standard 1.x, it is no longer technically required. While a merge scheduler based on the TPL sounds like a brilliant idea, in its current form it falls well short of what is needed for inclusion in the release.

  1. It is several orders of magnitude slower than ConcurrentMergeScheduler.
  2. The Lucene.Net.Index.TestTaskMergeScheduler::TestSubclassTaskMergeScheduler() test occasionally fails (see Known Failing Tests on Lucene.Net #269).

TaskMergeScheduler is no longer randomly injected into tests by the test framework because of its negative impact on test run times.

I have fixed a few bugs in it, including one where it failed to rethrow a background merge failure, which IndexWriter requires in order to retry.
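For context on why the rethrow matters: the TPL does not hand an exception thrown inside a task to any other thread on its own. A caller only sees it by observing the task (for example with `Wait()`), and even then it arrives wrapped in an `AggregateException`. The sketch below illustrates that general behavior only; it is not the actual fix made in TaskMergeScheduler. `BackgroundFailureDemo` and `RunAndRethrow` are hypothetical names, and the `Action` stands in for a merge.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

public static class BackgroundFailureDemo
{
    // Runs a stand-in "merge" on the thread pool and blocks until it finishes.
    // If the merge throws, Task.Wait() surfaces the failure wrapped in an
    // AggregateException; we unwrap it and rethrow the original exception on
    // the calling thread, keeping the background stack trace intact.
    public static void RunAndRethrow(Action merge)
    {
        Task task = Task.Run(merge);
        try
        {
            task.Wait();
        }
        catch (AggregateException ae)
        {
            Exception original = ae.Flatten().InnerException ?? ae;
            ExceptionDispatchInfo.Capture(original).Throw();
        }
    }
}
```

`ExceptionDispatchInfo.Capture(...).Throw()` is used instead of `throw original;` so the stack trace recorded on the background thread is preserved.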

The main performance issue stems from the fact that it calls ToArray() to copy its _mergeThreads collection every time it loops over it. An attempt was made to replace the collection with ConcurrentHashSet<T> and drop the ToArray() call on each loop, but at least one test starts failing with that change.
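To illustrate the trade-off being described: copying a `List<T>` with `ToArray()` allocates a new array on every pass, whereas a concurrent collection can be enumerated and modified at the same time. The BCL has no `ConcurrentHashSet<T>`, so this sketch assumes `ConcurrentDictionary<TKey, TValue>` with dummy values as a stand-in. `StubMergeThread` and `PruneDemo` are hypothetical types. The sketch only contrasts the two pruning patterns; it says nothing about why a test started failing with the concurrent version.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for a merge thread: it only reports whether it is alive.
public sealed class StubMergeThread
{
    public StubMergeThread(int id, bool isAlive)
    {
        Id = id;
        IsAlive = isAlive;
    }

    public int Id { get; }
    public bool IsAlive { get; }
}

public static class PruneDemo
{
    // Snapshot pattern: ToArray() allocates a full copy on every call so that
    // the list can be modified while the copy is being enumerated.
    public static void PruneWithSnapshot(List<StubMergeThread> threads)
    {
        foreach (var thread in threads.ToArray())
        {
            if (!thread.IsAlive)
            {
                threads.Remove(thread);
            }
        }
    }

    // Concurrent pattern: ConcurrentDictionary (keys used as a set) can be
    // modified while it is being enumerated, so no copy is needed.
    public static void PruneInPlace(ConcurrentDictionary<StubMergeThread, byte> threads)
    {
        foreach (var entry in threads)
        {
            if (!entry.Key.IsAlive)
            {
                threads.TryRemove(entry.Key, out _);
            }
        }
    }
}
```

One semantic difference to keep in mind: unlike the `ToArray()` snapshot, the dictionary's enumerator is not a point-in-time view, so threads added or removed concurrently may or may not be seen during a pass.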

So, my question is: should we attempt to salvage it or deprecate it? By deprecate, I mean it will be marked obsolete in the next beta and deleted from the project in the first release candidate.

If someone wants to take a stab at either fixing it or redesigning it, it is fair game. This could be an opportunity to add a more performant alternative to ConcurrentMergeScheduler to the project. If it performs adequately and provides the correct behavior, we will consider making it the default.

using J2N.Collections.Generic.Extensions;
using Lucene.Net.Support.Threading;
using Lucene.Net.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Directory = Lucene.Net.Store.Directory;

namespace Lucene.Net.Index
{
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    /// <summary>
    /// A <see cref="MergeScheduler"/> that runs each merge using
    /// <see cref="Task"/>s on the default <see cref="TaskScheduler"/>.
    /// 
    /// <para>If more than <see cref="MaxMergeCount"/> merges are
    /// requested then this class will forcefully throttle the
    /// incoming threads by pausing until one or more merges
    /// complete.</para>
    ///  
    /// LUCENENET specific
    /// </summary>
    public class TaskMergeScheduler : MergeScheduler, IConcurrentMergeScheduler
    {
        public const string COMPONENT_NAME = "CMS";

        private readonly TaskScheduler _taskScheduler = TaskScheduler.Default;
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
        private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim();
        /// <summary>
        /// List of currently active <see cref="MergeThread"/>s.</summary>
        private readonly IList<MergeThread> _mergeThreads = new List<MergeThread>();

        /// <summary>
        /// How many <see cref="MergeThread"/>s have kicked off (this is used
        /// to name them).
        /// </summary>
        private int _mergeThreadCount;

        /// <summary>
        /// <see cref="Directory"/> that holds the index. </summary>
        private Directory _directory;

        /// <summary>
        /// <see cref="IndexWriter"/> that owns this instance.
        /// </summary>
        private IndexWriter _writer;

        /// <summary>
        /// Sole constructor, with all settings set to default
        /// values.
        /// </summary>
        public TaskMergeScheduler() : base()
        {
            MaxThreadCount = _taskScheduler.MaximumConcurrencyLevel;
            MaxMergeCount = _taskScheduler.MaximumConcurrencyLevel;
        }

        /// <summary>
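        /// Sets the maximum number of merge threads and simultaneous merges allowed.
        /// This has no effect in <see cref="TaskMergeScheduler"/>; both limits are taken
        /// from the task scheduler's <see cref="TaskScheduler.MaximumConcurrencyLevel"/>.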
        /// </summary>
        /// <param name="maxMergeCount"> The max # simultaneous merges that are allowed.
        ///       If a merge is necessary yet we already have this many
        ///       threads running, the incoming thread (that is calling
        ///       add/updateDocument) will block until a merge thread
        ///       has completed.  Note that we will only run the
        ///       smallest <paramref name="maxThreadCount"/> merges at a time. </param>
        /// <param name="maxThreadCount"> The max # simultaneous merge threads that should
        ///       be running at once.  This must be &lt;= <paramref name="maxMergeCount"/> </param>
        public void SetMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
        {
            // This is handled by TaskScheduler.Default.MaximumConcurrencyLevel
        }

        /// <summary>
        /// Max number of merge threads allowed to be running at
        /// once.  When there are more merges than this, we
        /// forcefully pause the larger ones, letting the smaller
        /// ones run, up until <see cref="MaxMergeCount"/> merges at which point
        /// we forcefully pause incoming threads (that presumably
        /// are the ones causing so much merging).
        /// </summary>
        /// <seealso cref="SetMaxMergesAndThreads(int, int)"/>
        public int MaxThreadCount { get; private set; }

        /// <summary>
        /// Max number of merges we accept before forcefully
        /// throttling the incoming threads
        /// </summary>
        public int MaxMergeCount { get; private set; }

        /// <summary>
        /// Return the priority that merge threads run at. This is always the same.
        /// </summary>
        public int MergeThreadPriority
        {
            get
            {
#if !FEATURE_THREAD_PRIORITY
                return 2;
#else
                return (int)ThreadPriority.Normal;
#endif 
            }
        }

        /// <summary>
        /// This method has no effect in <see cref="TaskMergeScheduler"/> because the
        /// <see cref="MergeThreadPriority"/> returns a constant value.
        /// </summary>
        public void SetMergeThreadPriority(int priority)
        {
        }

        /// <summary>
        /// Called whenever the running merges have changed. Unlike
        /// <see cref="ConcurrentMergeScheduler"/>, this implementation does not
        /// sort, pause, or unpause merge threads; it only prunes and disposes
        /// threads that are no longer alive.
        /// </summary>
        private void UpdateMergeThreads()
        {
            foreach (var merge in _mergeThreads.ToArray())
            {
                // Prune any dead threads
                if (!merge.IsAlive)
                {
                    _mergeThreads.Remove(merge);
                    merge.Dispose();
                }
            }
        }

        /// <summary>
        /// Returns <c>true</c> if verbose output is enabled. This method is usually used in
        /// conjunction with <see cref="Message(string)"/>, like this:
        ///
        /// <code>
        /// if (Verbose) {
        ///     Message(&quot;your message&quot;);
        /// }
        /// </code>
        /// </summary>
        protected bool Verbose => _writer != null && _writer.infoStream.IsEnabled(COMPONENT_NAME);

        /// <summary>
        /// Outputs the given message - this method assumes <see cref="Verbose"/> was
        /// called and returned <c>true</c>.
        /// </summary>
        protected virtual void Message(string message)
        {
            _writer.infoStream.Message(COMPONENT_NAME, message);
        }

        protected override void Dispose(bool disposing)
        {
            Sync();
            _manualResetEvent.Dispose();
        }

        /// <summary>
        /// Wait for any running merge threads to finish. 
        /// This call is not interruptible as used by <see cref="MergeScheduler.Dispose()"/>.
        /// </summary>
        public virtual void Sync()
        {
            foreach (var merge in _mergeThreads.ToArray())
            {
                if (merge == null || !merge.IsAlive)
                {
                    continue;
                }

                try
                {
                    merge.Wait();
                }
                catch (OperationCanceledException)
                {
                    // expected when we cancel.
                }
                catch (AggregateException ae)
                {
                    ae.Handle(ex =>
                    {
                        if (!(ex is OperationCanceledException))
                        {
                            HandleMergeException(ex);
                            return true;
                        }

                        return false;
                    });
                }
            }
        }

        /// <summary>
        /// Returns the number of merge threads that are alive. Note that this number
        /// is &lt;= <see cref="_mergeThreads"/> size.
        /// </summary>
        private int MergeThreadCount
        {
            get { return _mergeThreads.Count(x => x.IsAlive && x.CurrentMerge != null); }
        }

        [MethodImpl(MethodImplOptions.NoInlining)]
        public override void Merge(IndexWriter writer, MergeTrigger trigger, bool newMergesFound)
        {
            using (_lock.Write())
            {
                _writer = writer;
                _directory = writer.Directory;

                if (Verbose)
                {
                    Message("now merge");
                    Message("  index: " + writer.SegString());
                }

                // First, quickly run through the newly proposed merges
                // and add any orthogonal merges (ie a merge not
                // involving segments already pending to be merged) to
                // the queue.  If we are way behind on merging, many of
                // these newly proposed merges will likely already be
                // registered.

                // Iterate, pulling from the IndexWriter's queue of
                // pending merges, until it's empty:
                while (true)
                {
                    long startStallTime = 0;
                    while (writer.HasPendingMerges() && MergeThreadCount >= MaxMergeCount)
                    {
                        // this means merging has fallen too far behind: we
                        // have already created maxMergeCount threads, and
                        // now there's at least one more merge pending.
                        // Note that only maxThreadCount of
                        // those created merge threads will actually be
                        // running; the rest will be paused (see
                        // updateMergeThreads).  We stall this producer
                        // thread to prevent creation of new segments,
                        // until merging has caught up:
                        startStallTime = Environment.TickCount;
                        if (Verbose)
                        {
                            Message("    too many merges; stalling...");
                        }

                        _manualResetEvent.Reset();
                        _manualResetEvent.Wait();
                    }

                    if (Verbose)
                    {
                        if (startStallTime != 0)
                        {
                            Message("  stalled for " + (Environment.TickCount - startStallTime) + " msec");
                        }
                    }

                    MergePolicy.OneMerge merge = writer.NextMerge();
                    if (merge == null)
                    {
                        if (Verbose)
                        {
                            Message("  no more merges pending; now return");
                        }
                        return;
                    }

                    bool success = false;
                    try
                    {
                        if (Verbose)
                        {
                            Message("  consider merge " + writer.SegString(merge.Segments));
                        }

                        // OK to spawn a new merge thread to handle this
                        // merge:
                        var merger = CreateTask(writer, merge);

                        merger.MergeThreadCompleted += OnMergeThreadCompleted;

                        _mergeThreads.Add(merger);

                        if (Verbose)
                        {
                            Message("    launch new thread [" + merger.Name + "]");
                        }

                        merger.Start(_taskScheduler);

                        // Must call this after starting the thread else
                        // the new thread is removed from mergeThreads
                        // (since it's not alive yet):
                        UpdateMergeThreads();

                        success = true;
                    }
                    finally
                    {
                        if (!success)
                        {
                            writer.MergeFinish(merge);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Does the actual merge, by calling <see cref="IndexWriter.Merge(MergePolicy.OneMerge)"/> </summary>
        [MethodImpl(MethodImplOptions.NoInlining)]
        protected virtual void DoMerge(MergePolicy.OneMerge merge)
        {
            _writer.Merge(merge);
        }

        private void OnMergeThreadCompleted(object sender, EventArgs e)
        {
            var mergeThread = sender as MergeThread;

            if (mergeThread == null)
            {
                return;
            }

            mergeThread.MergeThreadCompleted -= OnMergeThreadCompleted;

            using (_lock.Write())
            {
                UpdateMergeThreads();
            }
        }

        /// <summary>
        /// Create and return a new <see cref="MergeThread"/> </summary>
        private MergeThread CreateTask(IndexWriter writer, MergePolicy.OneMerge merge)
        {
            var count = Interlocked.Increment(ref _mergeThreadCount);
            var name = string.Format("Lucene Merge Task #{0}", count);

            return new MergeThread(name, writer, merge, writer.infoStream, Verbose, _manualResetEvent, HandleMergeException, DoMerge);
        }

        /// <summary>
        /// Called when an exception is hit in a background merge
        /// thread
        /// </summary>
        protected virtual void HandleMergeException(Exception exc)
        {
            // suppressExceptions is normally only set during testing
            if (suppressExceptions)
            {
                return;
            }

//#if FEATURE_THREAD_INTERRUPT
//            try
//            {
//#endif
                // When an exception is hit during merge, IndexWriter
                // removes any partial files and then allows another
                // merge to run.  If whatever caused the error is not
                // transient then the exception will keep happening,
                // so, we sleep here to avoid saturating CPU in such
                // cases:
                Thread.Sleep(250);
//#if FEATURE_THREAD_INTERRUPT // LUCENENET NOTE: Senseless to catch and rethrow the same exception type
//            }
//            catch (ThreadInterruptedException ie)
//            {
//                throw new ThreadInterruptedException("Thread Interrupted Exception", ie);
//            }
//#endif
            throw new MergePolicy.MergeException(exc, _directory);
        }

        private bool suppressExceptions;

        /// <summary>
        /// Used for testing </summary>
        public virtual void SetSuppressExceptions()
        {
            suppressExceptions = true;
        }

        /// <summary>
        /// Used for testing </summary>
        public virtual void ClearSuppressExceptions()
        {
            suppressExceptions = false;
        }

        public override string ToString()
        {
            StringBuilder sb = new StringBuilder(this.GetType().Name + ": ");
            sb.AppendFormat("maxThreadCount={0}, ", MaxThreadCount);
            sb.AppendFormat("maxMergeCount={0}", MaxMergeCount);
            return sb.ToString();
        }

        public override object Clone()
        {
            TaskMergeScheduler clone = (TaskMergeScheduler)base.Clone();
            clone._writer = null;
            clone._directory = null;
            clone._mergeThreads.Clear();
            return clone;
        }

        /// <summary>
        /// Runs a merge thread, which may run one or more merges
        /// in sequence.
        /// </summary>
        internal class MergeThread : IDisposable
        {
            public event EventHandler MergeThreadCompleted;

            private readonly CancellationTokenSource _cancellationTokenSource;
            private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
            private readonly ManualResetEventSlim _resetEvent;
            private readonly Action<Exception> _exceptionHandler;
            private readonly Action<MergePolicy.OneMerge> _doMerge;
            private readonly InfoStream _logger;
            private readonly IndexWriter _writer;
            private readonly MergePolicy.OneMerge _startingMerge;
            private readonly bool _isLoggingEnabled;

            private Task _task;
            private MergePolicy.OneMerge _runningMerge;
            private volatile bool _isDisposed = false;
            private volatile bool _isDone;

            /// <summary>
            /// Sole constructor. </summary>
            public MergeThread(string name, IndexWriter writer, MergePolicy.OneMerge startMerge,
                InfoStream logger, bool isLoggingEnabled,
                ManualResetEventSlim resetEvent, Action<Exception> exceptionHandler, Action<MergePolicy.OneMerge> doMerge)
            {
                Name = name;
                _cancellationTokenSource = new CancellationTokenSource();
                _writer = writer;
                _startingMerge = startMerge;
                _logger = logger;
                _isLoggingEnabled = isLoggingEnabled;
                _resetEvent = resetEvent;
                _exceptionHandler = exceptionHandler;
                _doMerge = doMerge;
            }

            public string Name { get; private set; }

            public Task Instance
            {
                get
                {
                    using (_lock.Read())
                    {
                        return _task;
                    }
                }
            }

            /// <summary>
            /// Record the currently running merge. </summary>
            public virtual MergePolicy.OneMerge RunningMerge
            {
                get
                {
                    using (_lock.Read())
                    {
                        return _runningMerge;
                    }
                }
                set => Interlocked.Exchange(ref _runningMerge, value);
            }

            /// <summary>
            /// Return the current merge, or <c>null</c> if this 
            /// <see cref="MergeThread"/> is done.
            /// </summary>
            public virtual MergePolicy.OneMerge CurrentMerge
            {
                get
                {
                    using (_lock.Read())
                    {
                        if (_isDone)
                        {
                            return null;
                        }

                        return _runningMerge ?? _startingMerge;
                    }
                }
            }

            public bool IsAlive
            {
                get
                {
                    if (_isDisposed || _isDone)
                    {
                        return false;
                    }

                    using (_lock.Read())
                    {
                        // Alive means the task exists and has not reached a final state
                        // (RanToCompletion, Faulted, or Canceled); Task.IsCompleted covers all three.
                        return _task != null && !_task.IsCompleted;
                    }
                }
            }

            public void Start(TaskScheduler taskScheduler)
            {
                using (_lock.Write())
                {
                    if (_task == null && !_cancellationTokenSource.IsCancellationRequested)
                    {
                        _task = Task.Factory.StartNew(() => Run(_cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.None, taskScheduler);
                    }
                }
            }

            public void Wait()
            {
                if (!IsAlive)
                {
                    return;
                }

                _task.Wait(_cancellationTokenSource.Token);
            }

            public void Cancel()
            {
                if (!IsAlive)
                {
                    return;
                }

                using (_lock.Write())
                {
                    if (!_cancellationTokenSource.IsCancellationRequested)
                    {
                        _cancellationTokenSource.Cancel();
                    }
                }
            }

            private void Run(CancellationToken cancellationToken)
            {
                // First time through the while loop we do the merge
                // that we were started with:
                MergePolicy.OneMerge merge = _startingMerge;

                try
                {
                    if (_isLoggingEnabled)
                    {
                        _logger.Message(COMPONENT_NAME, "  merge thread: start");
                    }

                    while (!cancellationToken.IsCancellationRequested)
                    {
                        RunningMerge = merge;
                        // LUCENENET NOTE: We MUST call DoMerge(merge) instead of 
                        // _writer.Merge(merge) because the tests specifically look
                        // for the method name DoMerge in the stack trace. 
                        _doMerge(merge);

                        // Subsequent times through the loop we do any new
                        // merge that writer says is necessary:
                        merge = _writer.NextMerge();

                        // Notify here in case any threads were stalled;
                        // they will notice that the pending merge has
                        // been pulled and possibly resume:
                        _resetEvent.Set();

                        if (merge != null)
                        {
                            if (_isLoggingEnabled)
                            {
                                _logger.Message(COMPONENT_NAME, "  merge thread: do another merge " + _writer.SegString(merge.Segments));
                            }
                        }
                        else
                        {
                            break;
                        }
                    }

                    if (_isLoggingEnabled)
                    {
                        _logger.Message(COMPONENT_NAME, "  merge thread: done");
                    }
                }
                catch (Exception exc)
                {
                    // Ignore the exception if it was due to abort:
                    if (!(exc is MergePolicy.MergeAbortedException))
                    {
                        //System.out.println(Thread.currentThread().getName() + ": CMS: exc");
                        //exc.printStackTrace(System.out)
                        _exceptionHandler(exc);
                    }
                }
                finally
                {
                    _isDone = true;

                    if (MergeThreadCompleted != null)
                    {
                        MergeThreadCompleted(this, EventArgs.Empty);
                    }
                }
            }

            public void Dispose()
            {
                if (_isDisposed)
                {
                    return;
                }

                _isDisposed = true;
                _lock.Dispose();
                _cancellationTokenSource.Dispose();
            }

            public override string ToString()
            {
                return _task == null
                    ? string.Format("Task[{0}], Task has not been started yet.", Name)
                    : string.Format("Task[{0}], Id[{1}], Status[{2}]", Name, _task.Id, _task.Status);
            }

            public override bool Equals(object obj)
            {
                var compared = obj as MergeThread;

                if (compared == null
                    || (Instance == null && compared.Instance != null)
                    || (Instance != null && compared.Instance == null))
                {
                    return false;
                }

                return Instance.Id == compared.Instance.Id;
            }

            public override int GetHashCode()
            {
                return Instance == null
                    ? base.GetHashCode()
                    : Instance.GetHashCode();
            }
        }
    }
}
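One pitfall when checking whether a task is still running: a condition written as `status != Canceled || status != Faulted || status != RanToCompletion` is always true, because no `TaskStatus` value can equal all three at once. The sketch below uses a hypothetical `TaskStatusCheck` helper to contrast that form with the `&&` form, which is equivalent to `!task.IsCompleted`.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskStatusCheck
{
    // Always true: a status can equal at most one of the three values,
    // so at least two of the "!=" comparisons succeed.
    public static bool AliveWithOr(TaskStatus status) =>
        status != TaskStatus.Canceled
        || status != TaskStatus.Faulted
        || status != TaskStatus.RanToCompletion;

    // Intended meaning: "not in a final state". These three final states are
    // exactly the ones for which Task.IsCompleted returns true.
    public static bool AliveWithAnd(TaskStatus status) =>
        status != TaskStatus.Canceled
        && status != TaskStatus.Faulted
        && status != TaskStatus.RanToCompletion;
}
```

In practice, `!task.IsCompleted` expresses the `&&` form directly and avoids listing the states by hand.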
@NightOwl888 NightOwl888 added help-requested up-for-grabs This issue is open to be worked on by anyone is:feature design is:question Further information is requested labels Sep 25, 2020
@NightOwl888 NightOwl888 added this to the 4.8.0 milestone Sep 25, 2020
@Shazwazza
Contributor

If there's already a faster and more stable implementation with ConcurrentMergeScheduler, is there any reason to keep TaskMergeScheduler? Or are you saying that there isn't currently a replacement for TaskMergeScheduler?

@NightOwl888
Contributor Author

> If there's already a faster and more stable implementation with ConcurrentMergeScheduler, is there any reason to keep TaskMergeScheduler? Or are you saying that there isn't currently a replacement for TaskMergeScheduler?

  • ConcurrentMergeScheduler is a direct port from Lucene.
  • TaskMergeScheduler was only required because ConcurrentMergeScheduler didn't compile on .NET Standard 1.x.

The only reason to consider keeping TaskMergeScheduler is the amount of time that has already been invested in it. That said, I don't plan to invest any more time in it myself, so if we don't have a party willing to make it production-ready it will definitely be excluded from the final release. IMO, there is no reason to hold up the release for a duplicate of something we already have.

But if someone were willing to step up and make TaskMergeScheduler reliable and more performant than ConcurrentMergeScheduler, it would not only be worth keeping but we could potentially have a new default setting.

@Shazwazza
Contributor

Sounds like a challenge! I'll tweet it out and see if there's any takers :)

@NightOwl888
Contributor Author

Given that #381 doesn't seem to solve our issue and we have had no other takers, it looks like it is time to deprecate TaskMergeScheduler.

[Obsolete("Use ConcurrentMergeScheduler instead. This class will be removed in 4.8.0 release candidate."), System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public class TaskMergeScheduler : MergeScheduler, IConcurrentMergeScheduler
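If the deprecation goes ahead, a small reflection-based test could guard that the attributes stay in place. The sketch below applies the same two attributes to a hypothetical `DeprecatedScheduler` class rather than to the real `TaskMergeScheduler`, and `DeprecationCheck` is likewise a hypothetical helper; it relies only on `CustomAttributeExtensions.GetCustomAttribute<T>` from `System.Reflection`.

```csharp
using System;
using System.ComponentModel;
using System.Reflection;

// Hypothetical deprecated type carrying the same two attributes proposed above.
[Obsolete("Use ConcurrentMergeScheduler instead."), EditorBrowsable(EditorBrowsableState.Never)]
public class DeprecatedScheduler
{
}

public static class DeprecationCheck
{
    // Returns the [Obsolete] message, or null if the attribute is absent.
    public static string GetObsoleteMessage(Type type) =>
        type.GetCustomAttribute<ObsoleteAttribute>(inherit: false)?.Message;

    // True when the type asks editors (IntelliSense) to hide it.
    public static bool IsHiddenFromEditor(Type type) =>
        type.GetCustomAttribute<EditorBrowsableAttribute>(inherit: false)?.State
            == EditorBrowsableState.Never;
}
```

For consumers, referencing a type marked `[Obsolete]` produces compiler warning CS0618 (CS0619 if the attribute's `error` argument is `true`), which is what nudges them toward `ConcurrentMergeScheduler`.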

As part of the deprecation, we need to tear out the LuceneTestCase.ConcurrentMergeSchedulerFactories class and revert all of the tests that depend on it to their original Java Lucene implementation. Other references to TaskMergeScheduler should also be removed from tests.

We can also remove the tests for TaskMergeScheduler altogether, which will improve testing run times.

Although it wasn't part of Lucene, we can leave the IConcurrentMergeScheduler interface in Lucene.NET as it might be in use by users already.

@NightOwl888 NightOwl888 added good-first-issue Good for newcomers pri:high pri:normal and removed is:feature is:question Further information is requested pri:high labels Feb 3, 2021
@NightOwl888 NightOwl888 self-assigned this Feb 17, 2021
@NightOwl888 NightOwl888 removed good-first-issue Good for newcomers help-requested up-for-grabs This issue is open to be worked on by anyone labels Feb 17, 2021
NightOwl888 added a commit to NightOwl888/lucenenet that referenced this issue Feb 17, 2021
NightOwl888 added a commit to NightOwl888/lucenenet that referenced this issue Feb 17, 2021
…rrentMergeSchedulerFactories class from the public API and its use from all tests (see apache#354)
NightOwl888 added a commit to NightOwl888/lucenenet that referenced this issue Feb 17, 2021
@NightOwl888 NightOwl888 linked a pull request Feb 17, 2021 that will close this issue
NightOwl888 added a commit that referenced this issue Feb 17, 2021
…rrentMergeSchedulerFactories class from the public API and its use from all tests (see #354)