<a href="https://colab.research.google.com/github/a14759226-glitch/python_itmo/blob/main/lab_10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [30]:
import math
import timeit
import unittest
import doctest
from typing import Callable

# итерация 1

def integrate(f: Callable[[float], float], a: float, b: float, *, n_iter: int = 100000) -> float:
    """
    Вычисляет определенный интеграл функции f на интервале [a, b] методом прямоугольников.

    Функция разбивает интервал на n_iter равных частей и суммирует площади прямоугольников,
    высота которых определяется значением функции f в начале каждого шага.

    Аргументы:
        f (Callable[[float], float]): Интегрируемая функция. Должна принимать float и возвращать float.
        a (float): Нижняя граница интегрирования.
        b (float): Верхняя граница интегрирования.
        n_iter (int): Количество итераций. По умолчанию 100000.
                      Значение должно быть положительным целым числом.

    Возвращает:
        float: Приближенное значение определенного интеграла.

    Ограничения:
        - Метод прямоугольников имеет погрешность упрощения. Точность зависит от величины шага.
        - Большое значение n_iter линейно увеличивает время выполнения (O(N)).

    Примеры (doctest):
        # Пример 1: Тригонометрическая функция. Интеграл cos(x) от 0 до pi/2 равен 1.0
        res = integrate(math.cos, 0, math.pi / 2, n_iter=10000)
        abs(res - 1.0) < 0.001
        > True

        # Пример 2: Полином 2-го порядка. Интеграл x^2 от 0 до 1 равен 1/3
        f_poly = lambda x: x**2
        res_poly = integrate(f_poly, 0, 1, n_iter=10000)
        abs(res_poly - 1/3) < 0.001
        > True
    """
    acc = 0.0
    step = (b - a) / n_iter
    for i in range(n_iter):
        acc += f(a + i * step) * step
    return acc


# Юнит-тестирование (unittest)
class TestIntegration(unittest.TestCase):
    def test_known_integral(self):
        """Проверка расчета известного интеграла: sin(x) на [0, pi] -> 2.0"""
        result = integrate(math.sin, 0, math.pi, n_iter=100000)
        # assertAlmostEqual проверяет равенство с учетом плавающей точки (до 4 знаков)
        self.assertAlmostEqual(result, 2.0, places=4)

    def test_stability_n_iter(self):
        """Проверка устойчивости: увеличение итераций должно уточнять результат, а не ломать его"""
        # Интеграл от x на [0, 10] равен 50.0
        f = lambda x: x

        # Грубый подсчет
        res_low = integrate(f, 0, 10, n_iter=1000)
        # Точный подсчет
        res_high = integrate(f, 0, 10, n_iter=100000)

        self.assertAlmostEqual(res_low, 50.0, places=0)
        self.assertAlmostEqual(res_high, 50.0, places=3)


# Блок запуска и замеров (timeit)
if __name__ == "__main__":
    # verbose=True покажет детали проверки примеров из docstring
    doctest_results = doctest.testmod(verbose=False)
    if doctest_results.failed == 0:
        print("Все doctests пройдены успешно.")
    else:
        print(f"Обнаружено ошибок в doctest: {doctest_results.failed}")

    # Запускаем тесты вручную, чтобы не перекрывать вывод timeit
    suite = unittest.TestLoader().loadTestsFromTestCase(TestIntegration)
    runner = unittest.TextTestRunner(verbosity=1)
    result = runner.run(suite)

    if result.wasSuccessful():
        print("Все unittests пройдены успешно.")


    print(f"{'Iterations':<15} | {'Avg Time (sec)':<15}")

    iteration_counts = [10000, 100000, 1000000, 5000000]

    # Функция для теста
    test_func = math.sin
    a, b = 0, math.pi

    for n in iteration_counts:
        timer = timeit.Timer(lambda: integrate(test_func, a, b, n_iter=n))

        # repeat возвращает список времени выполнения для каждого прогона
        times = timer.repeat(repeat=5, number=1)
        avg_time = sum(times) / len(times)

        print(f"{n:<15} | {avg_time:.6f}")
integrate(math.cos, 0, math.pi, n_iter=1000)

..
----------------------------------------------------------------------
Ran 2 tests in 0.030s

OK


Все doctests пройдены успешно.
Все unittests пройдены успешно.
Iterations      | Avg Time (sec) 
10000           | 0.001698
100000          | 0.013278
1000000         | 0.131888
5000000         | 0.895987


0.0031415926535898094

In [31]:
import math
import timeit
import concurrent.futures as ftres
from functools import partial
from typing import Callable, Type, Union

# Наша базовая функция из итерации 1
def integrate(f: Callable[[float], float], a: float, b: float, *, n_iter: int = 100000) -> float:
    acc = 0.0
    step = (b - a) / n_iter
    for i in range(n_iter):
        acc += f(a + i * step) * step
    return acc

def integrate_async(
    executor_class: Union[Type[ftres.ThreadPoolExecutor], Type[ftres.ProcessPoolExecutor]],
    f: Callable[[float], float],
    a: float,
    b: float,
    *,
    n_jobs: int = 2,
    n_iter: int = 1000000
) -> float:
    """
    Параллельное вычисление интеграла.

    Аргументы:
        executor_class: Класс пула (ThreadPoolExecutor или ProcessPoolExecutor).
        f: Интегрируемая функция.
        a, b: Границы интеграла.
        n_jobs: Количество потоков/процессов.
        n_iter: Общее количество итераций на всю задачу.
    """
    with executor_class(max_workers=n_jobs) as executor:
        # Частично фиксируем аргументы: функцию и количество итераций на одного worker
        spawn = partial(executor.submit, integrate, f, n_iter=n_iter // n_jobs)

        step_per_job = (b - a) / n_jobs
        fs = []

        for i in range(n_jobs):
            job_a = a + i * step_per_job
            job_b = a + (i + 1) * step_per_job
            # Запускаем задачу на под-интервале
            fs.append(spawn(job_a, job_b))

        # Собираем результаты по мере завершения
        return sum(f.result() for f in ftres.as_completed(fs))

if __name__ == "__main__":
    n_iter_total = 10000000
    workers_list = [2, 4, 6, 8]

    print(f"Запуск тестов для {n_iter_total} итераций.")
    print(f"{'Method':<12} | {'Jobs':<6} | {'Time (sec)':<10}")

    # Тестируем потоки (Threads)
    for nj in workers_list:
        t = timeit.timeit(
            lambda: integrate_async(ftres.ThreadPoolExecutor, math.sin, 0, math.pi, n_jobs=nj, n_iter=n_iter_total),
            number=1
        )
        print(f"{'Threads':<12} | {nj:<6} | {t:.4f}")

    # Тестируем процессы (Processes)
    for nj in workers_list:
        t = timeit.timeit(
            lambda: integrate_async(ftres.ProcessPoolExecutor, math.sin, 0, math.pi, n_jobs=nj, n_iter=n_iter_total),
            number=1
        )
        print(f"{'Processes':<12} | {nj:<6} | {t:.4f}")

Запуск тестов для 10000000 итераций.
Method       | Jobs   | Time (sec)
Threads      | 2      | 1.0515
Threads      | 4      | 1.0658
Threads      | 6      | 1.0602
Threads      | 8      | 1.0783
Processes    | 2      | 1.1840
Processes    | 4      | 1.1953
Processes    | 6      | 1.2220
Processes    | 8      | 1.8295


In [32]:
%load_ext Cython
import math
import timeit
import concurrent.futures as ftres
import pandas as pd

The Cython extension is already loaded. To reload it, use:
  %reload_ext Cython


In [33]:
%%cython -a
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True

from libc.math cimport sin

def integrate_cy(double a, double b, int n_iter):
    """
    Максимально оптимизированная функция на Cython.
    Использует чистые типы C и математическую библиотеку C (libc).
    """
    cdef double acc = 0.0
    cdef double step = (b - a) / n_iter
    cdef int i

    # Цикл превращается в чистый C код без участия объектов Python
    for i in range(n_iter):
        acc += sin(a + i * step) * step

    return acc

In [34]:
def run_parallel(executor_class, n_jobs, n_iter):
    step_total = math.pi / n_jobs
    iter_per_job = n_iter // n_jobs

    with executor_class(max_workers=n_jobs) as executor:
        futures = []
        for i in range(n_jobs):
            job_a = i * step_total
            job_b = (i + 1) * step_total
            # Вызываем скомпилированную функцию integrate_cy
            futures.append(executor.submit(integrate_cy, job_a, job_b, iter_per_job))

        return sum(f.result() for f in ftres.as_completed(futures))

In [35]:
# итерация 1
def integrate_py(f, a, b, n_iter):
    acc = 0.0
    step = (b - a) / n_iter
    for i in range(n_iter):
        acc += f(a + i * step) * step
    return acc

# итерация 4

N_ITER = 10000000
results = []

# 1. Baseline
t_py = timeit.timeit(lambda: integrate_py(math.sin, 0, math.pi, N_ITER), number=1)
results.append({"Метод": "Python (Последовательно)", "Потоки/Процессы": 1, "Время (сек)": t_py})

# 2. Cython (Последовательно)
t_cy = timeit.timeit(lambda: integrate_cy(0, math.pi, N_ITER), number=1)
results.append({"Метод": "Cython (Последовательно)", "Потоки/Процессы": 1, "Время (сек)": t_cy})

# 3. Cython + Threads (Итерация 2 на Cython)
for nj in [2, 4]:
    t = timeit.timeit(lambda: run_parallel(ftres.ThreadPoolExecutor, nj, N_ITER), number=1)
    results.append({"Метод": "Cython + Threads", "Потоки/Процессы": nj, "Время (сек)": t})

# 4. Cython + Processes (Итерация 3 на Cython)
for nj in [2, 4]:
    t = timeit.timeit(lambda: run_parallel(ftres.ProcessPoolExecutor, nj, N_ITER), number=1)
    results.append({"Метод": "Cython + Processes", "Потоки/Процессы": nj, "Время (сек)": t})

df = pd.DataFrame(results)
print(df.to_string(index=False))

# Расчет ускорения
speedup = t_py / t_cy
print(f"\nОбщее ускорение Cython относительно Python: {speedup:.2f}x")

                   Метод  Потоки/Процессы  Время (сек)
Python (Последовательно)                1     1.758060
Cython (Последовательно)                1     0.147351
        Cython + Threads                2     0.150208
        Cython + Threads                4     0.149674
      Cython + Processes                2     0.141303
      Cython + Processes                4     0.165975

Общее ускорение Cython относительно Python: 11.93x


In [36]:
%%cython -a
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True

from libc.math cimport sin

def integrate_nogil(double a, double b, int n_iter):
    cdef double acc = 0.0
    cdef double step = (b - a) / n_iter
    cdef int i

    # Отпускаем Global Interpreter Lock (GIL)
    # Теперь другие потоки могут выполняться одновременно с этим циклом
    with nogil:
        for i in range(n_iter):
            acc += sin(a + i * step) * step

    return acc

In [37]:
import math
import timeit
import concurrent.futures as ftres
import pandas as pd

# итерация 5
def benchmark_iteration_5():
    # Увеличим число итераций, чтобы разница была заметнее
    N_ITER_TOTAL = 30000000
    counts = [2, 4, 6]
    results = []

    for nj in counts:
        # Тест Потоков с nogil
        start = timeit.default_timer()
        with ftres.ThreadPoolExecutor(max_workers=nj) as ex:
            step = math.pi / nj
            fs = [ex.submit(integrate_nogil, i*step, (i+1)*step, N_ITER_TOTAL//nj) for i in range(nj)]
            _ = sum(f.result() for f in fs)
        t_threads = timeit.default_timer() - start
        results.append({"Метод": "Threads (noGIL)", "Workers": nj, "Время (сек)": t_threads})

        # Тест Процессов
        start = timeit.default_timer()
        with ftres.ProcessPoolExecutor(max_workers=nj) as ex:
            step = math.pi / nj
            fs = [ex.submit(integrate_nogil, i*step, (i+1)*step, N_ITER_TOTAL//nj) for i in range(nj)]
            _ = sum(f.result() for f in fs)
        t_procs = timeit.default_timer() - start
        results.append({"Метод": "Processes", "Workers": nj, "Время (сек)": t_procs})

    return pd.DataFrame(results)

df_results = benchmark_iteration_5()
print(df_results.to_string(index=False))

          Метод  Workers  Время (сек)
Threads (noGIL)        2     0.346349
      Processes        2     0.385946
Threads (noGIL)        4     0.339046
      Processes        4     0.402205
Threads (noGIL)        6     0.358944
      Processes        6     0.417555
