### Создаем файл с 50000 случайных 32-битных целых чисел

In [1]:
import random
import time
from utils import * # без этого не работает,multiprovessing

def generate_file(file_name, count):
    with open(file_name, 'w') as f:
        for _ in range(count):
            f.write(f"{random.randint(1, 2**31 - 1)}\n")


generate_file("random_numbers.txt", 50000)

### Последовательный подсчет

In [2]:
def count_prime_factors(file_name):
    total_factors = 0
    with open(file_name, 'r') as f:
        for line in f:
            num = int(line.strip())
            factors = prime_factors(num)
            total_factors += len(factors)
    return total_factors

start_time = time.time()
total_factors = count_prime_factors("random_numbers.txt")
end_time = time.time()

print(f"Суммарное количество простых множителей: {total_factors}")
print(f"Последовательное выполнение заняло {end_time - start_time:.2f} секунд")

Суммарное количество простых множителей: 204597
Последовательное выполнение заняло 13.57 секунд


### Многопоточный подсчет (multiprocessing)

In [3]:
import multiprocessing
from multiprocessing import Pool, Manager

def count_prime_factors_parallel(file_name, num_workers):
    manager = Manager()
    result_list = manager.list()
    
    with open(file_name, 'r') as f:
        numbers = [int(line.strip()) for line in f]

    with Pool(num_workers) as pool:
        pool.starmap(worker, [(num, result_list) for num in numbers])

    return sum(result_list)

start_time = time.time()
total_factors_parallel = count_prime_factors_parallel("random_numbers.txt", multiprocessing.cpu_count())
end_time = time.time()

print(f"Суммарное количество простых множителей: {total_factors_parallel}")
print(f"Многопоточное выполнение заняло {end_time - start_time:.2f} секунд")

Суммарное количество простых множителей: 204597
Многопоточное выполнение заняло 3.45 секунд


В этот раз с помощью multiprocessing получилось добиться разницы на python.

### Многопоточный подсчет (PySpark)

In [4]:
from pyspark.sql import SparkSession

def prime_factors_spark(n):
    i = 2
    factors = []
    while i * i <= n:
        if n % i:
            i += 1
        else:
            n //= i
            factors.append(i)
    if n > 1:
        factors.append(n)
    return len(factors)

def count_prime_factors_pyspark(file_name):
    # Создание сессии Spark
    spark = SparkSession.builder.appName("PrimeFactors").getOrCreate()

    # Загрузка данных
    nums_df = spark.read.text(file_name).rdd.map(lambda r: int(r[0]))

    # Подсчет простых множителей
    total_factors = nums_df.map(prime_factors_spark).sum()

    spark.stop()
    return total_factors

# Выполнение с использованием PySpark
start_time = time.time()
total_factors_spark = count_prime_factors_pyspark("random_numbers.txt")
end_time = time.time()

print(f"Суммарное количество простых множителей: {total_factors_spark}")
print(f"Выполнение с PySpark заняло {end_time - start_time:.2f} секунд")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/27 23:06:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/27 23:06:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Суммарное количество простых множителей: 204597
Выполнение с PySpark заняло 18.57 секунд


Результаты:
* Последовательное выполнение заняло 13.57 секунд
* Многопоточное выполнение (multiprocesing) заняло 3.45 секунд
* Выполнение с PySpark заняло 18.57 секунд