In [41]:
import os
import random
import struct

def generate_number():
    """Return one random integer from the closed range [0, 2**32 - 1]."""
    upper_bound = 2**32 - 1  # largest value that fits in an unsigned 32-bit integer
    return random.randint(0, upper_bound)

def generate_file(n=100, path='./', name='numbers', rm=False):
    """Append ``n`` random numbers, one per line, to the file ``path/name``.

    Args:
        n: How many numbers to write.
        path: Directory that holds the output file.
        name: File name inside ``path``.
        rm: When true, an already existing file is deleted first so the output
            starts empty; otherwise the new numbers are appended to it.
    """
    target = os.path.join(path, name)

    # Only start from scratch when the caller explicitly asked for it.
    if rm and os.path.exists(target):
        os.remove(target)

    with open(target, 'a') as out:
        # Numbers are produced lazily, so only one line is held in memory at a time.
        out.writelines(f'{generate_number()}\n' for _ in range(n))

# Seed the generator so that re-running the notebook from a fresh kernel
# recreates the same './numbers' file instead of a new random one each time.
random.seed(42)
generate_file(n=50000, rm=True)

  0%|          | 0/50000 [00:00<?, ?it/s]

# Sequential reading

In [60]:
%%time
def num_prime_factors(number):
    """Count the prime factors of ``number``, repeated primes included.

    For example ``12 == 2 * 2 * 3`` has three prime factors and ``1`` has none.

    Args:
        number: Positive integer to factorise.

    Returns:
        The number of prime factors counted with multiplicity.

    Raises:
        ValueError: If ``number`` is smaller than 1. Such values have no prime
            factorisation (0 previously ended in a division by zero).
    """
    if number < 1:
        raise ValueError(f'number must be a positive integer, got {number!r}')

    factors = 0
    start = 2  # smallest candidate that can still divide the remaining number
    while number != 1:
        # A composite number always has a divisor no larger than its square root,
        # and the smallest divisor found is necessarily prime.
        # NOTE(review): the bound is a float square root, as before; for integers
        # too large for exact float arithmetic an integer square root would be safer.
        for factor in range(start, int(number ** (1 / 2) + 1)):
            if number % factor == 0:
                number //= factor
                factors += 1
                # Everything below `factor` has already been divided out, so the
                # next scan can resume here instead of restarting from 2.
                start = factor
                break
        else:
            # No divisor up to the square root: what is left is itself prime.
            factors += 1
            break
    return factors


        

def sequential_reading(file_path, n=None):
    """Sum the prime-factor counts of the numbers stored one per line in a file.

    Args:
        file_path: Path of a text file holding one integer per line.
        n: Maximum number of lines to read from the start of the file.
            ``None`` reads the whole file.

    Returns:
        The total of ``num_prime_factors`` over the numbers that were read.
        Reading stops quietly at the end of the file when it holds fewer than
        ``n`` lines, and blank lines are ignored.
    """
    prime_factors = 0
    with open(file_path, 'r') as f:
        for index, line in enumerate(f):
            if n is not None and index >= n:
                break
            line = line.strip()
            if not line:
                # A blank line carries no number; int('') would raise ValueError.
                continue
            prime_factors += num_prime_factors(int(line))

    return prime_factors


# Baseline: factor the first `n` numbers of the generated file in a single process.
file_path = './numbers'
n = 50000

prime_factors = sequential_reading(file_path=file_path, n=n)

CPU times: user 9.75 s, sys: 11.2 ms, total: 9.76 s
Wall time: 9.85 s


In [61]:
# Display the total prime-factor count from the sequential run.
prime_factors

204877

# Multiprocessing

In [62]:
%%time 

# multiprocess is a fork of the multiprocessing library that works in IPython
import multiprocess
import struct 
import os


def worker(numbers, result_queue):
    """Add up the prime-factor counts of ``numbers`` and report the subtotal.

    Args:
        numbers: Integers to factorise, already loaded in memory.
        result_queue: Queue that receives the subtotal through ``put``.
    """
    subtotal = sum(num_prime_factors(value) for value in numbers)
    result_queue.put(subtotal)


def spawn_workers(file_path, num_workers, n):
    """Count the prime factors of the first ``n`` numbers in a file using processes.

    The file is read once in the parent, the selected numbers are dealt out to
    ``num_workers`` processes running ``worker``, and their subtotals are summed.

    Args:
        file_path: Path of a text file with whitespace-separated integers.
        num_workers: Number of worker processes to start (at least 1).
        n: How many numbers from the start of the file to process.

    Returns:
        The sum of the prime-factor counts reported by all workers.

    Raises:
        ValueError: If ``num_workers`` is smaller than 1.
    """
    if num_workers < 1:
        raise ValueError(f'num_workers must be at least 1, got {num_workers!r}')

    with open(file_path, 'r') as f:
        all_numbers = [int(x) for x in f.read().split()]

    selected = all_numbers[:max(n, 0)]

    result_queue = multiprocess.Queue()

    processes = []
    for i in range(num_workers):
        # Dealing the numbers out round-robin gives every worker a near-equal
        # share and, unlike fixed slices of n // num_workers items, never drops
        # the remainder when n is not divisible by num_workers.
        chunk = selected[i::num_workers]
        p = multiprocess.Process(target=worker, args=(chunk, result_queue))

        processes.append(p)
        p.start()

    # Collect every result before joining: a process that has put data on a
    # queue may not exit until that data has been consumed.
    # NOTE(review): if a worker dies before calling put(), this get() blocks forever.
    total_prime_factors = 0
    for _ in range(num_workers):
        total_prime_factors += result_queue.get()

    # Reap the finished workers so no child processes are left behind.
    for p in processes:
        p.join()

    return total_prime_factors
    

# Same file and number count as the sequential baseline, split across 10 processes.
n = 50000
prime_factors = spawn_workers(file_path='./numbers', num_workers=10, n=n)

CPU times: user 22.5 ms, sys: 27.6 ms, total: 50.2 ms
Wall time: 1.35 s


In [63]:
# Display the total prime-factor count from the multiprocessing run.
prime_factors

204877

# Dask

In [107]:
%%time

import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd

# client = Client()
pandas_df = pd.read_csv('./numbers', names=['num'])
df = dd.from_pandas(pandas_df, npartitions=10)

prime_factors = df['num'].map(num_prime_factors, meta=('num', 'int64')).sum().compute()


CPU times: user 195 ms, sys: 106 ms, total: 301 ms
Wall time: 2.24 s


In [108]:
# Display the total prime-factor count from the Dask run.
prime_factors

204877