Skip to content

GlebTheProgrammer/OSSP-LabWork4

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Изучение создания и использования потоков и механизмов синхронизации (Создание кастомного ThreadPool-а для решения определённого рода задач)

Необходимо:

  • Разработать очередь заданий, в которую несколько потоков могут вставлять элементы атомарно. (В проекте - Task 1)
  • Разработать обработчик этой очереди, который извлекает из нее задания и раздает заданному количеству потоков. (В проекте - Task 2)
  • Разработать программу, которая использует очередь заданий и обработчик очереди для сортировки строк в текстовом файле: Входной поток читает файл в память, нарезает его на части и создает несколько заданий на сортировку (по числу сортирующих потоков), которые помещает в очередь заданий. Сортирующие потоки извлекают задания, сортируют свои части файла, отдают все результаты выходному потоку. Выходной поток дожидается всех сортированных частей и объединяет их методом сортирующего слияния. (В проекте - Task 3)

Результат выполнения работы

Задание №1

4

В ходе выполнения задания, необходимо было разработать программу, которая способна будет создавать потоки, которые будут работать с одной очередью задач атомарно, а именно будут вставлять в очередь элементы на дальнейшую обработку (без самой обработки).

Для решения этой задачи, мной было решено использовать подход, который предполагает использование мьютексов и класса Monitor. Сам же подход носит название Мониторы хоара.

Для начала, в программе создаётся объект, который включает в себя создание пустой очереди задач, а также содержит методы для работы с этой самой очередью атомарно. Сам объект представляет из себя следующее:

    public class TaskQueue
    {
        // Очередь задач на выполнение
        private Queue<Action> tasks;

        public TaskQueue()
        {
            // Создаём объект пустой очереди задач
            tasks = new Queue<Action>();
        }

        // Метод для помещения задачи в очередь на выполнение
        public void EnqueueTask(Action task)
        {
            // Забираем блокировку на объект очереди
            lock (tasks)
            {
                // Помещаем в очередь задачу на выполнение
                tasks.Enqueue(task);
                Console.WriteLine("We have a new task to do!");
                Thread.Sleep(1000);
            }
        }
    }

Возникаем вопрос, а как именно работает этот объект? Что такое lock, как это всё работает? Ответ есть:

Предположим, мы создали 10 потоков, и дали им всем одну и ту же задачу - У объекта TaskQueue вызвать метод EnqueueTask, параметром в которую передать какую-либо задачу (в нашем случае - делегат типа Action). Пусть это будет задача на вывод обычного сообщения на консоль:

    // Делегат для постановки в очередь
    Action action = ExAction.PrintHello;

    public class ExAction
    {
        // Action - заглушка
        public static void PrintHello()
        {
            Console.WriteLine("Hello. Im a Task");
        }
    }

И потом мы, в цикле головного потока, запускаем все 10 фоновых потоков для постановки задачи в очередь. Каждый поток, начав выполнять метод EnqueueTask объекта TaskQueue натыкается на оператор lock(task). Тот поток, который успеет дойти до данного сегмента кода первым - заберёт себе блокировку, которая позволит ему получить доступ к объекту task в рамках головного объекта TaskQueue. Все дальнейшие потоки при достижении этого сегмента кода будут становиться в очередь в том порядке, в котором они дойдут до данного сегмента. Тот поток, который получил блокировку первым, начнёт задачу постановки делегата в очередь на выполнение сразу же после её получения, после чего выведет соответствующее сообщение на консоль "We have a new task to do!", ну и в конце подержит блокировку у себя ещё одну секунду. Как только Thread.Sleep отработает, блокировка будет освобождена, и следующий в очереди поток на получение блокировки объекта task сможет забрать её себе и начать уже свою задачу на постановку переданного ему делегата Action в очередь.

Note: Оператор lock компилятором транслируется в следующий код:

Из:

lock(object obj)
{
    // Команды на выполнение
}

В:

{
  Monitor.Enter(object obj);
  
  // Команды на выполнение
  
  Monitor.Exit(object obj);
}

Если бы мы не использовали блокировку, головной поток за пару миллисекунд запустил бы 10 потоков, а они бы в свою очередь быстренько поставили бы задачи в очередь на выполнение, после чего подождали бы 1000 миллисекунд, и в конце завершили бы свою работу. Таким образом, на всё выполнение было бы затрачено менее 2 секунд. С приведенным же подходом, со взятием блокировки, суммарное время выполнения программы будет превышать 10 секунд, а задачи на постановку делегатов на выполнение в очередь будут выполняться с импользованием внутреннего объекта очереди (атомарно).

Задание №2

Данная задача является улучшением предыдущей, и предполагает логику не только атомарного помещения задач в очередь на выполнение, но и извлечения из очереди этих самых задач и раздачи ин на выполнение заданному количеству потоков. Рассмотрим результат выполнения:

5

Фрагмент кода, отвечающий за обработку этой очереди немного вырос, по сравнению с тем, что использовался в 1-ой задаче:

public class TaskQueue
    {
        // Список активных потоков
        private List<Thread> threads;

        // Очередб задач на выполнение
        private Queue<Action> tasks;

        // Конструктор для активации объекта
        public TaskQueue(int threadCount)
        {
            // Создаём пустые списки
            tasks = new Queue<Action>();
            threads = new List<Thread>();

            // Метод для запуска указанного в параметрах числа активных потоков
            for (int i = 0; i < threadCount; i++)
            {
                // Устанавливаем потоку Default-ную задачу на выполнение, добавляем наш поток в список, устанавливаем его как фоновый и активируем его (начинаем выполнять Default-ную задачу)
                var t = new Thread(DoThreadWork);
                threads.Add(t);
                t.IsBackground = true;
                t.Start();
            }
        }

        // Default-ный action для помещения в качестве метода на выполнение в только что созданный поток
        private void DoThreadWork()
        {
            // Имитация работы потока (работает всё время)
            while (true)
            {
                // Пытаемся извлечь в качестве задачи на выполнение action из очереди
                Action task = DequeueTask();

                // Если задача на выполнение не null 
                if (task != null)
                {
                    // Пытаемся выполнить action
                    try
                    {
                        task();
                    }
                    catch (ThreadAbortException)
                    {
                        // Если каким-то образом поток закрывается внутри метода -> отменяем закрытие
                        Thread.ResetAbort();
                    }
                    catch (Exception ex)
                    {
                        // Если во время выполнения возникла ошибка -> выводим её в консоль
                        Console.WriteLine(ex);
                    }
                }
                else // Если задача на выполнение null (сигнал от нас о закрытии потоков) -> Выход из цикла. Поток прекращает работу, тк задача DoThreadWork() была выполнена
                    break;
            }
        }

        // Метод для извлечения Action из очереди
        private Action DequeueTask()
        {
            // Забираем блокировку на объект очереди, чтоб можно было работать с ним атомарно
            lock (tasks)
            {
                // Пока число задач на выполнение в очереди = 0 -> отдаём объект блокировки и ставим метод в очередь на ожидание
                while (tasks.Count == 0)
                    Monitor.Wait(tasks);

                // Как только очередь задач не пуста и данный метод (поток) захватил себе блокировку -> возвращаем задачу на выполнение данному потоку и освобожаем блокировку
                return tasks.Dequeue();
            }
        }

        // Метод для помещения задачи в очередь на выполнение
        public void EnqueueTask(Action task)
        {
            // Забираем блокировку на объект очереди
            lock (tasks)
            {
                // Помещаем в очередь задачу на выполнение
                tasks.Enqueue(task);

                // Информируем стоящие в очереди потоки о поступлении нового метода в очередь на выполнение
                Monitor.Pulse(tasks);
            }
        }
        
        // Метод для закрытия всех актинвых потоков 
        public void Close()
        {
            // Посылаем каждому потоку в списке сигнал для его закрытия
            for (int i = 0; i < threads.Count; i++)
                EnqueueTask(null); 

            // Ожидаем закрытия всех потоков, после чего управление передаётся обратно, головному потоку
            foreach (Thread t in threads)
                t.Join();
        }
    }

Как работать с этой пограммой я постараюсь описать ниже:

Для начала работы с нашей очередью, её необходимо сначала создать. Мы создаём объект класса очереди, в который передаём число потоков, которые будут отвечать за обработку (или же выполнение) переданных её методам делегатов. В конструкторе мы создаём N-ное число фоновых потоков, передав им Default-ную задачу на выполнение, после чего добавляем их в список и отправляем работать. Каждый запустившийся поток пытается извлечь задачу на выполнение при помощи метода DequeueTask. Этот метод берёт блокировку на объект внутренней очереди tasks и пытается получить число делегатов, находящихся в данный момент в очереди на выполнение. Так как в начальный момент времени, при создании объекта, никаких делегатов мы ещё не передали, то поток становится в <очередь> на ожидание так называемого сигнала при помощи метода Monitor.Wait(tasks). Этот метод ставит поток в очередь ожидания сигнала, посылаемого методом MonitorPulse(tasks), после чего освобождает блокировку объекта tasks. Таким образом, все активированные нами потоки становятся в очередь в методе DequeueTask() на ожидание сигнала о поступлении нового делегата на обработку.

Как только мы создали наш объект очереди, создадим 2 <шаблона> потоков из головного, каждый из которых возьмёт на себя задачу поставить делегат Action в очередь на выполнение, при этом выведет соответствующее сообщение о том, что он пытается поставить задачу в очередь. Данные части будут выглядеть следующим образом:

Запуск потоков на постановку задачи в очередь на обработку

  // Будет запущено 2 потока
  for (int i = 0; i < 2; i++)
  {
      // Каждый поток вставляет в очередь заданий action на обработку атомарно
      Thread.Sleep(1000);
      new Thread(() =>
      {
          Console.WriteLine($"Task {i} from Thread with PID : {Thread.CurrentThread.ManagedThreadId} has been pulled into the queue");
          tasksQueue.EnqueueTask(action);
      }).Start();
  }

Сам класс с задачей на выполнение потокам, отвечающим за обработку очереди

    public class ExThread
    {
        // Action для тестирования
        public static void PrintPID()
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"Message from Thread with PID: {Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(1000);
            }
        }
    }

Что происходит, когда мы запускаем потоки, отвечающие за постановку делегатов в очередь на выполнение? У объекта созданной нами очереди вызывается метод EnqueueTask(action), где action - делегат на выполнение. Поток, созданный из головного потока (не тот, который отвечает за обработку очереди), забирает себе блокировку на объект очереди tasks головного объекта TaskQueue, после чего с помощью системного метода Enqueue(task) вставляет в очередь наш делегат task на выполнение одному из потоков, занимающихся обработкой очереди, после чего посылает сигнал Mobitor.Pulse(tasks), который будет означать, что в очереди появилась новая задача на выполнение, попутно снимая блокировку с объекта tasks и передавая её следующему потоку в очереди на получение этой самой блокировки. Поток, отвечающий за обработку (как было описано выше) задач, стоящий первый в очереди, снова получит блокировку, проверит количество делегатов в очереди и обнаружит, что число делегатов на выполнение уже не 0, а как минимум 1. Следовательно, метод DequeueTask() в качестве своего возвращаемого значения в рамках текущего потока вернёт первый элемент (в нашем случае делегат), который потоку необходимо будет выполнить (при этом, после извлечения делегата из очереди, освободит блокировку объекта tasks, так как все действия над этим объектам уже были выполнены, и уже другой поток сможет получить доступ к объекту очереди).

Далее, мы вернулись обратно в метод DoThreadWork(), но уже получив делегат на выполнение. Дальше мы проверяем, что наш делегат не null, после чего мы можем спокойно начать обрабатывать эту задачу, при этом, если в обрабатываемой задаче встречается метод ThreadAbort() - мы просто отменяем его, ну а при возникновении любой другой ошибки - просто выводим сообщение ошибки на консоль, после чего выполнение задачи завершается, и наш поток снова пытается получить задачу на выполнение, снова вызвав метод DequeueTask(). Таким образом была реализована многопоточная обработка очереди N-ным числом потоков (По факту, мы реализовали собственный объект класса ThreadPool).

Однако, как нам дождаться выполнения всех добавленных на выполнение задач перед тем, как выключать программу? За решение этой проблемы отвечает метод Close объекта TaskQueue, который призван не только дождаться, но и освободить все выделенные объекту TaskQueue ресурсы. Так как он работает?

Предположим, из головного потока мы добавили в очередь задачи на выполнение, после чего вызываем метод Close() из головного потока. Головной поток зайдёт в метод, который добавит в очередь на выполнение null-делегатов столько, сколько потоков было выделено на обработку очереди. Таким образом, данные задачи станут в конец очереди, сразу за всеми теми делегатами, которые мы добавили на обработку. Как только поток получит null-делегат на выполнение, произойдёт выход из цикла while(true), и поток завершит выполнение метода DoThreadWork(), после чего будет произведена работа GC по сборке мусора и освобождение ресурсов на выделение потока. Из головного потока же, после того, как мы отпраили null-делегаты на выполнение, мы ожидаем закрытия всех потоков, которые были выделены на обработку очереди (другими словами - ждём завершения метода DoThreadWork() каждого из активных потоков). Таким образом, мы реализовали не только очередь с постановкой задач на выполнение, но и с ожиданием выполнения этих самых задач.

Задание №3

Данная программа была реализована в точности как и Задача 2, но с некоторыми простыми дополнениями, которые я оставляю читателю на самостоятельный разбор.

About

Program for creating and using threads and synchronization mechanisms

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages