Выполнить сложение двух векторов, содержащих от 50 млн. вещественных значений (предварительно заполнить векторы случайными значениями, предусмотреть выделение и освобождение областей памяти для векторов с помощью функций стандартной библиотеки С++). Вычислить и вывести на экран время выполнения сложения, с использованием функций библиотеки OpenMP. Вывести на экран размер и любой элемент результирующего вектора.

In [4]:
pip install pathos


Collecting pathos
  Downloading pathos-0.3.2-py3-none-any.whl.metadata (11 kB)
Collecting ppft>=1.7.6.8 (from pathos)
  Downloading ppft-1.7.6.8-py3-none-any.whl.metadata (12 kB)
Collecting dill>=0.3.8 (from pathos)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting pox>=0.3.4 (from pathos)
  Downloading pox-0.3.4-py3-none-any.whl.metadata (8.0 kB)
Collecting multiprocess>=0.70.16 (from pathos)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading pathos-0.3.2-py3-none-any.whl (82 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.1/82.1 kB[0m [31m110.0 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m494.9 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1

In [5]:
import numpy as np
import time
from pathos.multiprocessing import ProcessPool

# Функция для сложения частей векторов
def vector_addition(a_part, b_part):
    return a_part + b_part

# Размер векторов
size = 50_000_000

# Заполнение векторов случайными значениями
a = np.random.rand(size)
b = np.random.rand(size)

# Определение количества частей для параллельного вычисления
num_parts = 4  # Можно изменить на cpu_count(), если хотите использовать все доступные процессоры
part_size = size // num_parts

# Разделение векторов на части
parts = [(a[i*part_size:(i+1)*part_size], b[i*part_size:(i+1)*part_size]) for i in range(num_parts)]

# Запуск параллельного сложения
start_time = time.time()
with ProcessPool(nodes=num_parts) as pool:
    results = pool.map(vector_addition, [part[0] for part in parts], [part[1] for part in parts])
end_time = time.time()

# Объединение результатов
result = np.concatenate(results)

# Вывод времени выполнения, размера и одного элемента результирующего вектора
print(f"Время выполнения сложения: {end_time - start_time:.4f} секунд")
print(f"Размер результирующего вектора: {result.size}")
print(f"Любой элемент результирующего вектора: {result[0]}")


Время выполнения сложения: 5.8894 секунд
Размер результирующего вектора: 50000000
Любой элемент результирующего вектора: 1.6765250329613686


Выполнить задание 1, используя директиву Parallel с опцией Shared (для векторов). Сравнить полученное время с временем в задаче 1, сделать выводы.

In [6]:
pip install numba


Collecting numba
  Downloading numba-0.59.1-cp310-cp310-macosx_10_9_x86_64.whl.metadata (2.7 kB)
Collecting llvmlite<0.43,>=0.42.0dev0 (from numba)
  Downloading llvmlite-0.42.0-cp310-cp310-macosx_10_9_x86_64.whl.metadata (4.8 kB)
Downloading numba-0.59.1-cp310-cp310-macosx_10_9_x86_64.whl (2.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25hDownloading llvmlite-0.42.0-cp310-cp310-macosx_10_9_x86_64.whl (31.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.1/31.1 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: llvmlite, numba
Successfully installed llvmlite-0.42.0 numba-0.59.1
Note: you may need to restart the kernel to use updated packages.


In [7]:
import numpy as np
import time
from numba import njit, prange

# Размер векторов
size = 50_000_000

# Заполнение векторов случайными значениями
a = np.random.rand(size)
b = np.random.rand(size)

# Функция для сложения векторов с использованием numba и параллельных вычислений
@njit(parallel=True)
def parallel_vector_addition(a, b):
    result = np.empty_like(a)
    for i in prange(a.size):
        result[i] = a[i] + b[i]
    return result

# Запуск параллельного сложения
start_time = time.time()
result = parallel_vector_addition(a, b)
end_time = time.time()

# Вывод времени выполнения, размера и одного элемента результирующего вектора
print(f"Время выполнения параллельного сложения: {end_time - start_time:.4f} секунд")
print(f"Размер результирующего вектора: {result.size}")
print(f"Любой элемент результирующего вектора: {result[0]}")


Время выполнения параллельного сложения: 11.2960 секунд
Размер результирующего вектора: 50000000
Любой элемент результирующего вектора: 1.0149861499452393


Определите, какое максимальное количество нитей позволяет породить для выполнения параллельных областей программы ваша система.

In [8]:
import os

# Получение количества доступных логических процессоров
num_threads = os.cpu_count()
print(f"Количество доступных логических процессоров: {num_threads}")


Количество доступных логических процессоров: 4


При помощи трёх уровней вложенных параллельных областей породите 8 нитей (на каждом уровне параллельную область должны исполнять 2 нити). Посмотрите, как будет исполняться программа, если запретить вложенные параллельные области.

Без запрета

In [10]:
from joblib import Parallel, delayed
import multiprocessing
import nest_asyncio

nest_asyncio.apply()

# Функция для выполнения работы в параллельных областях
def task(level, thread_id):
    print(f"Level {level}, Thread {thread_id}\n")
    if level < 3:
        Parallel(n_jobs=2)(delayed(task)(level+1, i) for i in range(2))

# Основной запуск параллельных областей
if __name__ == "__main__":
    # Параллельная область 1-го уровня
    Parallel(n_jobs=2)(delayed(task)(1, i) for i in range(2))


Level 1, Thread 0

Level 1, Thread 1

Level 2, Thread 0

Level 3, Thread 0

Level 3, Thread 1

Level 2, Thread 1

Level 3, Thread 0

Level 3, Thread 1

Level 2, Thread 0

Level 2, Thread 1

Level 3, Thread 0

Level 3, Thread 1

Level 3, Thread 0

Level 3, Thread 1



С запретом

In [11]:
from joblib import Parallel, delayed
import multiprocessing
import nest_asyncio

nest_asyncio.apply()

# Функция для выполнения работы в параллельных областях
def task(level, thread_id):
    print(f"Level {level}, Thread {thread_id}")
    if level < 3:
        Parallel(n_jobs=2, prefer="threads")(delayed(task)(level+1, i) for i in range(2))

# Основной запуск параллельных областей
if __name__ == "__main__":
    # Запрет вложенных параллельных областей
    Parallel(n_jobs=2, prefer="threads")(delayed(task)(1, i) for i in range(2))


Level 1, Thread 0
Level 1, Thread 1
Level 2, Thread 0
Level 2, Thread 0
Level 3, Thread 0
Level 3, Thread 1
Level 2, Thread 1
Level 3, Thread 0
Level 3, Thread 1
Level 2, Thread 1
Level 3, Thread 0
Level 3, Thread 1
Level 3, Thread 0
Level 3, Thread 1


Написать пример реализации директивы For с опцией nowait, проиллюстрировать работу (печатать номер текущей итерации и номер потока, выполнившего свою часть цикла)

In [29]:
import threading

def parallel_for_nowait():
    num_threads = 4
    num_iters = 10
    threads = []

    # Определение функции, которая будет выполняться в каждом потоке
    def worker(tid):
        for i in range(tid, num_iters, num_threads):
            thread_id = threading.get_ident()
            print(f"Iteration: {i}, Thread: {thread_id}")

    # Создание и запуск потоков
    for tid in range(num_threads):
        thread = threading.Thread(target=worker, args=(tid,))
        threads.append(thread)
        thread.start()

    # Ожидание завершения всех потоков
    for thread in threads:
        thread.join()

# Вызов функции для демонстрации
parallel_for_nowait()


Iteration: 0, Thread: 123145723068416
Iteration: 4, Thread: 123145723068416
Iteration: 8, Thread: 123145723068416
Iteration: 1, Thread: 123145723068416
Iteration: 5, Thread: 123145723068416
Iteration: 9, Thread: 123145723068416
Iteration: 2, Thread: 123145723068416
Iteration: 6, Thread: 123145723068416
Iteration: 3, Thread: 123145723068416
Iteration: 7, Thread: 123145723068416


Написать пример реализации директивы For с опцией collapse, проиллюстрировать работу (печатать номера текущих итераций и номер потока, их выполнившего).

In [19]:
import threading

def parallel_for_collapse():
    num_threads = 4
    num_iters = 3 * 4  # Произведение количества итераций в каждом из циклов
    threads = []

    # Определение функции, которая будет выполняться в каждом потоке
    def worker(tid):
        # Вычисление начальной и конечной итерации для данного потока
        start = tid * num_iters // num_threads
        end = (tid + 1) * num_iters // num_threads

        # Параллельный цикл с опцией collapse
        for k in range(start, end):
            i, j = divmod(k, 4)  # Распределение индекса k по двум измерениям
            thread_id = threading.get_ident()
            print(f"Iteration: ({i}, {j}), Thread: {thread_id}")

    # Создание и запуск потоков
    for tid in range(num_threads):
        thread = threading.Thread(target=worker, args=(tid,))
        threads.append(thread)
        thread.start()

    # Ожидание завершения всех потоков
    for thread in threads:
        thread.join()

# Вызов функции для демонстрации
parallel_for_collapse()


Iteration: (0, 0), Thread: 123145723068416
Iteration: (0, 1), Thread: 123145723068416
Iteration: (0, 2), Thread: 123145723068416
Iteration: (0, 3), Thread: 123145723068416
Iteration: (1, 0), Thread: 123145723068416
Iteration: (1, 1), Thread: 123145723068416
Iteration: (1, 2), Thread: 123145723068416
Iteration: (1, 3), Thread: 123145723068416
Iteration: (2, 0), Thread: 123145723068416
Iteration: (2, 1), Thread: 123145723068416
Iteration: (2, 2), Thread: 123145723068416
Iteration: (2, 3), Thread: 123145723068416


Написать пример реализации директивы For с опцией reduction, в котором определенным образом накапливаются значения из разных итераций цикла. Проиллюстрировать работу

In [30]:
from numba import njit, prange
import numpy as np

@njit(parallel=True)
def parallel_sum_reduction(arr):
    # Инициализация переменной для накопления суммы
    total_sum = 0

    # Параллельный цикл с опцией reduction
    for i in prange(len(arr)):
        # Накопление суммы
        total_sum += arr[i]

    return total_sum

# Создание тестовых данных
data = np.arange(10)

# Вызов функции для демонстрации
result = parallel_sum_reduction(data)
print("Total sum:", result)


Total sum: 45


Написать пример реализации директивы Sections и директивы Section (совместно) для 4-х задач, в которых все задачи могут работать параллельно, но 2 и 3 задачи – только вместе, друг за другом.

In [42]:
pip install pymp

Collecting pymp
  Downloading pymp-0.0.6.tar.gz (10.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.7/10.7 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting param>=1.12.0 (from pymp)
  Downloading param-2.1.0-py3-none-any.whl.metadata (5.9 kB)
Collecting panel>=0.13.1 (from pymp)
  Downloading panel-1.4.4-py3-none-any.whl.metadata (25 kB)
Collecting pytest (from pymp)
  Downloading pytest-8.2.1-py3-none-any.whl.metadata (7.6 kB)
Collecting bokeh<3.5.0,>=3.4.0 (from panel>=0.13.1->pymp)
  Downloading bokeh-3.4.1-py3-none-any.whl.metadata (12 kB)
Collecting pyviz-comms>=2.0.0 (from panel>=0.13.1->pymp)
  Downloading pyviz_comms-3.0.2-py3-none-any.whl.metadata (7.7 kB)
Collecting xyzservices>=2021.09.1 (from panel>=0.13.1->pymp)
  Downloading xyzservices-2024.4.0-py3-none-any.whl.metadata (4.0 kB)
Collecting linkify-it-py (from panel>=0.13.1->pymp)
  Downloading linkify_it_py-2.0.3-py3-

In [45]:
import time
from numba import njit, prange

@njit(parallel=True)
def parallel_sections():
    num_threads = 4
    thread_num = 0  # Фиктивный номер потока для имитации поведения OpenMP

    # Первая задача
    for _ in prange(1):
        if thread_num == 0:
            task1(thread_num)

    # Барьер для синхронизации всех потоков
    # Вторая и третья задачи должны выполняться последовательно
    for _ in prange(1):
        if thread_num == 0:
            task2(thread_num)
        if thread_num == 0:
            task3(thread_num)

    # Последняя задача
    for _ in prange(1):
        if thread_num == 0:
            task4(thread_num)

@njit
def task1(thread_num):
    print("Задача 1 выполняется потоком:", thread_num)

@njit
def task2(thread_num):
    print("Задача 2 выполняется потоком:", thread_num)

@njit
def task3(thread_num):
    print("Задача 3 выполняется потоком:", thread_num)

@njit
def task4(thread_num):
    print("Задача 4 выполняется потоком:", thread_num)

if __name__ == "__main__":
    start_time = time.time()
    parallel_sections()
    end_time = time.time()
    print("Время выполнения параллельных задач:", end_time - start_time, "секунд")


Задача 1 выполняется потоком: 0
Задача 2 выполняется потоком: 0
Задача 3 выполняется потоком: 0
Задача 4 выполняется потоком: 0
Время выполнения параллельных задач: 0.8845140933990479 секунд
