# ИД23-1 Маслов АН
## Пример 1. Программа создает случайное количество строк в 4 процессах, объединяет их и записывает в csv файл

In [5]:
%%file rand_csv.py
import random
import time

def rand_csv(length, pos, time_start, time_end, output): #генератор случайно даты и числа
    csv_rows = []
    for i in range(length):
        stime = time.mktime(time.strptime(time_start, '%m/%d/%Y %I:%M %p'))
        etime = time.mktime(time.strptime(time_end, '%m/%d/%Y %I:%M %p'))
        ptime = stime + random.random() * (etime - stime)
        csv_rows.append([time.strftime('%y%m%d%H%M%S', time.localtime(ptime)), random.randint(1, 9999999), pos])
    output.put(csv_rows)

Overwriting rand_csv.py


In [6]:
import rand_csv

In [7]:
import multiprocessing as mp
import random
import pandas as pd

output = mp.Queue()

#29.09.2025 добавлен номер процесса
processes = [mp.Process(target=rand_csv.rand_csv, args=(random.randint(50000,500000), x, "1/1/2008 1:30 PM", "1/1/2025 4:50 AM", output)) for x in range(4)] #создаем 4 процесса
for p in processes:
    p.start()

results = [output.get() for p in processes] #сохраняем результат

for p in processes:
    p.join()

pandas_result = []
for i in results:
    pandas_result = pandas_result + i #форматируем для удобства

#29.09.2025 добавлен номер процесса
resultdata = pd.DataFrame(pandas_result, columns=["datetime", "rand_number", "processor_number"]) #создаем датафрейм
resultdata['datetime'] = pd.to_datetime(resultdata['datetime'], format='%y%m%d%H%M%S')
resultdata = resultdata.sort_values('datetime') #сортируем по дате
resultdata.to_csv("csv_result.csv", sep=';', index=False) #записываем результат в csv файл
resultdata


Unnamed: 0,datetime,rand_number,processor_number
211931,2008-01-01 14:01:56,3357454,0
61421,2008-01-01 14:09:09,4786008,2
297819,2008-01-01 14:13:11,499308,0
461601,2008-01-01 14:25:54,6334158,3
824997,2008-01-01 14:46:05,2039290,1
...,...,...,...
120148,2025-01-01 04:25:37,8098250,2
138883,2025-01-01 04:28:04,9433376,0
657027,2025-01-01 04:28:59,9371120,1
640513,2025-01-01 04:29:49,9198863,1


## Пример 2. Передача задач в очередь и результатов в другую очередь

In [None]:
%%file worker_file.py
import rand_csv
def worker(input_queue, output_queue, worker_id): #30.09.25 Рабочий процесс, который берет задания из входной очереди и кладет результаты в выходную

    while True:
        # Берем задание из очереди
        task = input_queue.get()
        
        # Проверяем сигнал завершения
        if task is None:
            print(f"Рабочий процесс {worker_id} завершает работу")
            break
            
        # Извлекаем параметры задания
        length, pos, time_start, time_end = task
        
        # Выполняем работу
        rand_csv.rand_csv(length, pos, time_start, time_end, output_queue)

Overwriting worker_file.py


In [9]:
from worker_file import worker

In [10]:
import random
import time
import multiprocessing as mp
import pandas as pd




def main():
    start_time = time.time()
    
    # Создаем очереди
    input_queue = mp.Queue()  # Для заданий
    output_queue = mp.Queue() # Для результатов
    
    num_workers = 2
    workers = []
    
    for i in range(num_workers):
        p = mp.Process(target=worker, args=(input_queue, output_queue, i))
        p.start()
        workers.append(p)
    
    tasks = [
        (random.randint(50000, 500000), i, "1/1/2008 1:30 PM", "1/1/2025 4:50 AM")
        for i in range(4)
    ]
    
    # Отправляем задания в очередь
    for task in tasks:
        input_queue.put(task)
        print(f"Задание {task[1]} добавлено в очередь")
    
    # Отправляем сигналы завершения рабочим процессам
    for _ in range(num_workers):
        input_queue.put(None)
    
    # Собираем результаты
    results = []
    expected_results = len(tasks)
    
    while len(results) < expected_results:
        result = output_queue.get()
        results.append(result)
        print(f"Получен результат из процесса, записей: {len(result)}")
    
    for w in workers:
        w.join()
    
    pandas_result = []
    for i in results:
        pandas_result = pandas_result + i
    
    # Создаем и сортируем DataFrame по дате
    resultdata = pd.DataFrame(pandas_result, columns=["datetime", "rand_number", "processor_number"])
    resultdata['datetime'] = pd.to_datetime(resultdata['datetime'], format='%y%m%d%H%M%S')
    resultdata = resultdata.sort_values('datetime')
    
    end_time = time.time()
    
    # Выводим результаты
    print(f"\nВремя выполнения: {end_time - start_time:.2f} секунд")
    
    # Сохраняем результат
    resultdata.to_csv("csv_result_queue_improved.csv", sep=';', index=False)
    
    return resultdata

result = main()
result

Задание 0 добавлено в очередь
Задание 1 добавлено в очередь
Задание 2 добавлено в очередь
Задание 3 добавлено в очередь
Получен результат из процесса, записей: 188231
Получен результат из процесса, записей: 310655
Получен результат из процесса, записей: 122994
Получен результат из процесса, записей: 462321

Время выполнения: 15.45 секунд


Unnamed: 0,datetime,rand_number,processor_number
212264,2008-01-01 13:31:45,6524304,1
261790,2008-01-01 13:36:48,5561433,1
478352,2008-01-01 13:40:15,5309443,1
931981,2008-01-01 13:48:07,2990344,2
631728,2008-01-01 13:49:23,7239817,2
...,...,...,...
1011101,2025-01-01 04:03:28,766344,2
142371,2025-01-01 04:21:36,4227639,0
58983,2025-01-01 04:29:15,5392497,0
453108,2025-01-01 04:44:13,2836972,1


## Пример 3. Использование Pool.map и Pool.apply


In [None]:
%%file rand_csv_wrapper.py
import multiprocessing as mp
import rand_csv

def rand_csv_wrapper(args): #30.09.25 Обертка для функции rand_csv для использования с Pool
    length, pos, time_start, time_end = args
    # Создаем временную очередь для совместимости с существующей функцией
    temp_queue = mp.Queue()
    rand_csv.rand_csv(length, pos, time_start, time_end, temp_queue)
    return temp_queue.get()

Overwriting rand_csv_wrapper.py


In [14]:
import random
import time
import multiprocessing as mp
import pandas as pd
from multiprocessing import Pool
from rand_csv_wrapper import rand_csv_wrapper

def pool_apply_example():
    start_time = time.time()
    
    with Pool(processes=4) as pool:
        results = []
        for i in range(4):
            # Pool.apply - блокирующий вызов, выполняется последовательно
            result = pool.apply(
                rand_csv_wrapper, 
                ((random.randint(50000, 500000), i, "1/1/2008 1:30 PM", "1/1/2025 4:50 AM"),)
            )
            results.extend(result)
    
    # Создаем и сортируем DataFrame по дате
    resultdata = pd.DataFrame(results, columns=["datetime", "rand_number", "processor_number"])
    resultdata['datetime'] = pd.to_datetime(resultdata['datetime'], format='%y%m%d%H%M%S')
    resultdata = resultdata.sort_values('datetime')
    
    end_time = time.time()
    
    print(f"Время выполнения (apply): {end_time - start_time:.2f} секунд")
    
    # Сохраняем результат
    resultdata.to_csv("csv_result_apply.csv", sep=';', index=False)

    return resultdata

def pool_map_example():
    start_time = time.time()
    
    # Подготавливаем задачи
    tasks = [
        (random.randint(50000, 500000), i, "1/1/2008 1:30 PM", "1/1/2025 4:50 AM")
        for i in range(4)
    ]
    
    with Pool(processes=4) as pool:
        # Pool.map - параллельное выполнение
        results_lists = pool.map(rand_csv_wrapper, tasks)
    
    # Объединяем результаты
    all_results = []
    for i, result in enumerate(results_lists):
        all_results.extend(result)
    
    resultdata = pd.DataFrame(all_results, columns=["datetime", "rand_number", "processor_number"])
    resultdata['datetime'] = pd.to_datetime(resultdata['datetime'], format='%y%m%d%H%M%S')
    resultdata = resultdata.sort_values('datetime')
    
    end_time = time.time()
    
    print(f"Время выполнения (map): {end_time - start_time:.2f} секунд")
    
    # Сохраняем результат
    
    return resultdata

apply_result = pool_apply_example()
map_result = pool_map_example()

apply_result 

Время выполнения (apply): 30.22 секунд
Время выполнения (map): 11.68 секунд


Unnamed: 0,datetime,rand_number,processor_number
904612,2008-01-01 13:45:09,2032741,2
408358,2008-01-01 13:57:41,163295,1
77494,2008-01-01 14:01:27,8361081,0
661454,2008-01-01 14:12:35,9078225,1
406789,2008-01-01 14:14:21,9894032,1
...,...,...,...
1050202,2025-01-01 03:48:55,415741,2
723703,2025-01-01 03:58:52,1320876,2
560166,2025-01-01 04:31:25,5077053,1
193716,2025-01-01 04:33:09,7537173,1


In [15]:
map_result

Unnamed: 0,datetime,rand_number,processor_number
492928,2008-01-01 13:30:23,3983486,2
463531,2008-01-01 13:47:44,3010769,2
298287,2008-01-01 13:50:20,216692,2
114272,2008-01-01 14:32:53,3738598,1
712509,2008-01-01 14:52:58,1372473,3
...,...,...,...
521245,2025-01-01 04:11:14,7567935,2
267194,2025-01-01 04:15:19,6800323,2
612455,2025-01-01 04:28:39,756925,2
683182,2025-01-01 04:29:06,5705691,3
