In [1]:
import multiprocessing as mp
import os
import torch as th
import numpy as np
import random
from parallel_funcs import * # functions must be in another .py file for mp to work

# PyTorch's CUDA runtime cannot be used from a forked child process, so running GPU jobs in
# parallel requires the 'spawn' start method. 'spawn' starts fresh interpreters that re-import
# the worker code, which is why the job functions live in parallel_funcs.py, not in this notebook.
# set_start_method raises RuntimeError if the start method was already set in this process,
# so force=True keeps this cell safe to re-run.
#mp.set_start_method('spawn', force=True)  # uncomment if running GPUs in parallel / using pytorch

# Report the hardware available for sizing the worker pools below.
print('detected', os.cpu_count(), 'CPUs')
print('detected', th.cuda.device_count(), 'GPUs')

detectd 256 CPUs
detectd 1 GPUs


In [2]:
# Execution configuration: whether to run in parallel and how many worker processes to use.
run_paralllel = True  # NOTE(review): misspelled and not read by any visible cell; name kept for compatibility
gpu_threads = th.cuda.device_count()  # one worker per visible GPU when running GPU jobs in parallel
RESERVED_CPUS = 4  # CPUs left free for the OS / notebook kernel (play with this value)
# os.cpu_count() may return None, and subtracting the reserve can reach <= 0 on small machines;
# mp.Pool rejects processes < 1, so clamp to at least one worker.
cpu_threads = max(1, (os.cpu_count() or 1) - RESERVED_CPUS)

# PREPARE DATA AND JOBS TO RUN

In [3]:
# CREATE DUMMY DATA
# note: whether parallelism pays off depends on the size of each job. If a job is too small,
# the multiprocessing overhead outweighs the speed-up and can even increase total run time.
# One way around this is to send jobs to the workers in batches rather than one at a time.

# Fixed seed so the dummy data and job parameters are identical on every Restart & Run All.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)

# np.random.uniform returns float64, so each (2**15, 2**15) matrix takes 8 GiB (16 GiB for X and Y).
matrix_size = 2**15
X = np.random.uniform(0, 100, (matrix_size, matrix_size))
Y = np.random.uniform(0, 100, (matrix_size, matrix_size))

# Wrap the data in a Trainer object so jobs only carry a method name + kwargs
# (do not send data as params). Trainer is expected to only read X and Y.
# NOTE(review): the pool cells hand bound methods (trainer.single_job / trainer.batch_job) to
# starmap; pickling a bound method pickles its instance, so X and Y may still be serialized to
# the workers unless Trainer customizes pickling - confirm in parallel_funcs.py.
trainer = Trainer(X, Y)

# Job parameters follow this structure: [Trainer method name, keyword-argument dict].
# Example jobs:
job1 = ['dummy_func', {'alpha':1, 'beta':2, 'multi_gpu':False}] # set multi_gpu=True if running GPUs in parallel
job2 = ['dummy_func', {'alpha':3, 'beta':4, 'multi_gpu':False}] # set multi_gpu=True if running GPUs in parallel

# Build several dummy jobs with random parameters to run below.
# NOTE(review): unlike job1/job2 these omit 'multi_gpu'; add it if running GPUs in parallel.
n_jobs = 2*6
job_params = [['dummy_func', {'alpha':random.random(), 'beta':random.random()}] for _ in range(n_jobs)]

# RUN JOBS IN SERIES

In [4]:
stopwatch = Stopwatch()
# Serial baseline: execute each (method name, kwargs) job one after another, keeping input order.
series_results = [trainer.single_job(job_name, job_kwargs) for job_name, job_kwargs in job_params]
print('ran jobs in series in', stopwatch.stop(), 'seconds')

ran jobs in series in 75.11133599281311 seconds


# RUN JOBS IN PARALLEL (single job)

In [None]:
stopwatch = Stopwatch()
# Size the pool to the work: workers beyond the number of jobs would only add start-up cost.
# mp.Pool requires at least one process, hence max(1, ...).
n_workers = max(1, min(cpu_threads, len(job_params)))  # use gpu_threads instead if running GPUs in parallel
# The context manager terminates the pool when the block exits, so worker processes are not
# leaked each time this cell is (re-)run.
with mp.Pool(processes=n_workers) as pool:
    # starmap calls trainer.single_job(*job_param) for every entry of job_params; the list is
    # split into chunks that idle workers pull from, and results are returned in input order.
    parallel_single_results = pool.starmap(trainer.single_job, job_params)
    print('ran jobs in parallel (single job) in', stopwatch.stop(), 'seconds')

# RUN JOBS IN PARALLEL (batch job)

In [None]:
# Group the jobs into batches of `batch_size` (the last batch may be smaller). Each entry is
# wrapped in a one-element list because starmap unpacks it as the argument list, so
# trainer.batch_job receives the whole batch as its single argument.
batch_size = 4
batch_job_params = [[job_params[start:start + batch_size]]
                    for start in range(0, len(job_params), batch_size)]

stopwatch = Stopwatch()
# One worker per batch at most; mp.Pool requires at least one process.
n_workers = max(1, min(cpu_threads, len(batch_job_params)))  # use gpu_threads instead if running GPUs in parallel
# The context manager terminates the pool on exit so worker processes are not leaked.
with mp.Pool(processes=n_workers) as pool:
    # Each worker process runs trainer.batch_job on one batch; results come back in batch order.
    batch_results = pool.starmap(trainer.batch_job, batch_job_params)
    print('ran jobs in parallel (batch job) in', stopwatch.stop(), 'seconds')

# Flatten the per-batch result lists back into one list aligned with job_params
# (linear time, unlike repeated list concatenation).
parallel_batch_results = [result for batch in batch_results for result in batch]

# CHECK THAT ALL 3 METHODS PRODUCE THE SAME RESULTS

In [None]:
# Compare both parallel runs against the serial baseline; a total of 0 means exact agreement.
serial_arr = np.array(series_results)
single_abs_err = np.abs(serial_arr - np.array(parallel_single_results))
batch_abs_err = np.abs(serial_arr - np.array(parallel_batch_results))
print('absolute error between all 3 methods =', np.sum(single_abs_err + batch_abs_err))