In [1]:
# -------------------------------------------------------------
# 작성자 : 백강민
# 작성목적 : SKALA Python Day2 - multiprocessing으로 대용량 데이터 처리
# 작성일 : 2025-01-13
# 변경사항 내역 :
#   2025-01-13 - 최초 작성
# -------------------------------------------------------------

from __future__ import annotations

import os
import random
import time
import math
import multiprocessing as mp
from typing import Iterator, Sequence


def is_prime(n: int) -> bool:
    """n이 소수이면 True"""
    if n < 2:
        return False
    if n in (2, 3):
        return True
    if n % 2 == 0:
        return False

    limit = math.isqrt(n)
    d = 3
    while d <= limit:
        if n % d == 0:
            return False
        d += 2
    return True


def count_primes_single(nums: Sequence[int]) -> int:
    """단일 프로세스 소수 개수 카운트"""
    return sum(1 for x in nums if is_prime(x))


def count_primes_chunk(chunk: Sequence[int]) -> int:
    """Pool worker: chunk 내 소수 개수 카운트"""
    return sum(1 for x in chunk if is_prime(x))


def chunkify(nums: Sequence[int], chunk_size: int) -> Iterator[Sequence[int]]:
    """chunk_size 단위로 슬라이싱한 조각을 지연 생성"""
    for i in range(0, len(nums), chunk_size):
        yield nums[i : i + chunk_size]


def main() -> None:
    N = 10_000_000
    LOW, HIGH = 1, 100_000

    print("【 multiprocessing 대용량 처리 실습 】")
    print(f"- 난수 개수: {N:,}개 / 범위: [{LOW}, {HIGH}]")
    print(f"- CPU 코어 수(감지): {os.cpu_count()}")

    # 1) 난수 리스트 생성
    t0 = time.perf_counter()
    rng = random.Random(42)
    nums = [rng.randint(LOW, HIGH) for _ in range(N)]
    t1 = time.perf_counter()
    print(f"\n[1] 난수 생성 완료: {t1 - t0:.3f} sec")

    # 2) 단일 프로세스
    t2 = time.perf_counter()
    primes_single = count_primes_single(nums)
    t3 = time.perf_counter()
    print("\n[2] 단일 프로세스 결과")
    print(f"- 소수 개수: {primes_single:,}")
    print(f"- 처리 시간: {t3 - t2:.3f} sec")

    # 3) 멀티 프로세스(Pool)
    processes = os.cpu_count() or 4
    chunk_size = 100_000
    chunks = chunkify(nums, chunk_size)

    t4 = time.perf_counter()
    with mp.Pool(processes=processes) as pool:
        primes_multi = sum(pool.imap_unordered(count_primes_chunk, chunks))
    t5 = time.perf_counter()

    print("\n[3] 멀티 프로세스(Pool) 결과")
    print(f"- 프로세스 수: {processes}")
    print(f"- chunk_size(데이터 분할): {chunk_size:,}")
    print(f"- 소수 개수: {primes_multi:,}")
    print(f"- 처리 시간: {t5 - t4:.3f} sec")

    # 4) 검증 + 비교
    print("\n[4] 검증/비교")
    if primes_single != primes_multi:
        print("⚠️ 경고: 단일/멀티 결과가 다릅니다. 로직을 확인하세요.")
    else:
        print("✅ 결과 일치")

    single_time = t3 - t2
    multi_time = t5 - t4
    if multi_time > 0:
        print(f"- 속도 배수(단일/멀티): {single_time / multi_time:.2f}x")


if __name__ == "__main__":
    # macOS에서 spawn으로 인해 __main__(built-in)로 잡히는 환경(VSCode/노트북 등)에서
    # Pool worker 함수 import/pickle 문제가 자주 발생 → fork로 강제하면 해결됨
    mp.set_start_method("fork", force=True)
    main()


【 multiprocessing 대용량 처리 실습 】
- 난수 개수: 10,000,000개 / 범위: [1, 100000]
- CPU 코어 수(감지): 12

[1] 난수 생성 완료: 1.780 sec

[2] 단일 프로세스 결과
- 소수 개수: 959,077
- 처리 시간: 2.684 sec

[3] 멀티 프로세스(Pool) 결과
- 프로세스 수: 12
- chunk_size(데이터 분할): 100,000
- 소수 개수: 959,077
- 처리 시간: 0.489 sec

[4] 검증/비교
✅ 결과 일치
- 속도 배수(단일/멀티): 5.49x
