# Multi-Process One-Thread

In [1]:
import multiprocessing
from multiprocessing import Process

In [2]:
# Ask the multiprocessing module how many CPU cores this machine reports
cpu_core_count = multiprocessing.cpu_count()
print("Number of cpu cores: ", cpu_core_count)

Number of cpu cores:  16

In [3]:
import pandas as pd
from time import sleep, time

# Example of a processing function
def process_dataframe(chunk_id, chunk_data: pd.DataFrame):
    """Pretend to process one chunk of data.

    Prints a start message, sleeps for 5 seconds to stand in for real work,
    then prints a completion message.

    Args:
        chunk_id: Identifier of the chunk; only used in the printed messages.
        chunk_data: The chunk itself. This demo never reads it.

    Returns:
        None.
    """
    simulated_work_seconds = 5  # stand-in for the real processing cost
    print(f"Processing chunk {chunk_id}")
    sleep(simulated_work_seconds)
    print(f"The chunk {chunk_id} has been processed successfully!")

In [4]:
# Make a sample dataset: one [name, age] record per person
data = [
    ['tom', 10], ['nick', 15],
    ['juli', 14], ['peter', 20],
    ['jason', 27], ['anna', 11]
]

# Create the pandas DataFrame
df = pd.DataFrame(data, columns=['Name', 'Age'])

# Divide the dataframe into chunks of `chunk_size` rows. Each chunk stays a
# DataFrame (no `.to_numpy()` conversion) so that it matches the
# `chunk_data: pd.DataFrame` annotation of `process_dataframe`.
chunk_size = 2
chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]

# Mark the starting point to measure processing time
start = time()

procs = []
for i, chunk in enumerate(chunks):
    # Define our process (it is not running yet)
    proc = Process(target=process_dataframe, args=(i, chunk))
    # Start the process; it now runs concurrently with this loop
    proc.start()
    # Investigate process ID
    print(f"Process ID: {proc.pid}")
    # Keep a handle on every process so we can wait for it below
    procs.append(proc)

# Wait for every process to finish. join() does not stop a process: it blocks
# until the process has exited, which also avoids leaving finished children
# around as zombie processes.
for proc in procs:
    proc.join()

# Report total elapsed time
print(f"Total time: {time() - start}s")

Process ID: 26580
Processing chunk 0
Process ID: 26583
Processing chunk 1
Processing chunk 2
Process ID: 26588
The chunk 0 has been processed successfully!
The chunk 1 has been processed successfully!
The chunk 2 has been processed successfully!
Total time: 5.035507917404175s

In [5]:
##############################################
# The 2nd way to do multiprocess is via Pool #
##############################################
from multiprocessing import Pool

# Create a pool of worker processes. The pool is kept open on purpose:
# a later cell reuses it.
NUM_PROCESSES = 2
pool = multiprocessing.Pool(processes=NUM_PROCESSES)

# Submit one task per chunk. apply_async() returns a result handle right
# away, so the main process does not wait for the task here.
procs = [
    pool.apply_async(process_dataframe, args=(i, chunk))
    for i, chunk in enumerate(chunks)
]

# Block on each result handle in turn until its task has finished
for proc in procs:
    proc.get()

Processing chunk 0Processing chunk 1

The chunk 1 has been processed successfully!The chunk 0 has been processed successfully!

Processing chunk 2
The chunk 2 has been processed successfully!
Processing chunk 0Processing chunk 1

The chunk 0 has been processed successfully!The chunk 1 has been processed successfully!

Processing chunk 2
The chunk 2 has been processed successfully!

In [6]:
##############################################
# The 3rd way to do multiprocess is via Map #
##############################################
# Pair every chunk with its index. starmap() unpacks each (chunk_id, chunk)
# tuple into the positional arguments of process_dataframe.
inputs = list(enumerate(chunks))

# If you have one arg only, please use `.map` instead
try:
    # starmap() blocks until every task has finished
    outputs = pool.starmap(process_dataframe, inputs)
finally:
    # This is the last use of `pool` in the notebook, so release its worker
    # processes: close() refuses new tasks and join() waits for the workers
    # to exit. Re-run the cell that creates `pool` before re-running this one.
    pool.close()
    pool.join()

In [7]:
# Notice that two messages are sometimes concatenated without a `\n` between them.
# This happens because all the processes write to stdout at the same time. We need
# a way to keep the other processes from writing while one of them is writing.
from multiprocessing import Lock

# Lock used by process_dataframe below to keep its output from interleaving
lock = Lock()

def process_dataframe(chunk_id, chunk_data: pd.DataFrame):
    """Pretend to process one chunk while holding the module-level `lock`.

    The lock is held for the whole body, including the 5-second sleep, so
    callers that share the lock run one after another. This is why three
    chunks take about 15 seconds in total in the run below.

    Args:
        chunk_id: Identifier of the chunk; only used in the printed messages.
        chunk_data: The chunk itself. This demo never reads it.

    Returns:
        None.
    """
    # `with` releases the lock even if the body raises. A bare
    # acquire()/release() pair would leave the lock held forever in that
    # case and block every other caller.
    with lock:
        print(f"Processing chunk {chunk_id}")
        sleep(5)
        print(f"The chunk {chunk_id} has been processed successfully!")

# Sample records: one [name, age] pair per person
data = [
    ['tom', 10],
    ['nick', 15],
    ['juli', 14],
    ['peter', 20],
    ['jason', 27],
    ['anna', 11],
]

# Build the DataFrame from the records
df = pd.DataFrame(data, columns=['Name', 'Age'])

# Split the DataFrame into consecutive slices of `chunk_size` rows
chunk_size = 2
chunks = [
    df[offset:offset + chunk_size]
    for offset in range(0, len(df), chunk_size)
]

# Remember when we started so we can report the elapsed time
start = time()

# Define one process per chunk (none of them is running yet) ...
procs = [
    Process(target=process_dataframe, args=(chunk_id, chunk))
    for chunk_id, chunk in enumerate(chunks)
]

# ... then start them all ...
for proc in procs:
    proc.start()

# ... and wait for each one to finish
for proc in procs:
    proc.join()

# Report total elapsed time
print(f"Total time: {time() - start}s")

Processing chunk 0
The chunk 0 has been processed successfully!
Processing chunk 1
The chunk 1 has been processed successfully!
Processing chunk 2
The chunk 2 has been processed successfully!
Total time: 15.042691946029663s

## Multi-Threaded

Python's `threading` module provides the same API as `multiprocessing`: `start()` and
`join()`.

In [None]:
from threading import Thread

# Define one thread per chunk (none of them is running yet)
my_threads = [
    Thread(target=process_dataframe, args=(chunk_id, chunk))
    for chunk_id, chunk in enumerate(chunks)
]

# Start every thread ...
for my_thread in my_threads:
    my_thread.start()

# ... then wait for each of them to finish
for my_thread in my_threads:
    my_thread.join()

However, threads and processes share data in different ways. Let’s take a look at the
examples below.

In [None]:
from threading import Lock

# Define a lock to prevent a race condition, i.e. several threads updating
# the same variable at once. It gets its own name on purpose: rebinding the
# global `lock` here would replace the multiprocessing lock that
# `process_dataframe` looks up at call time.
shared_data_lock = Lock()

# Shared data: threads of one process all see the same global variable
shared_data = 0

def increment_function():
    """Add 1 to the global `shared_data` while holding `shared_data_lock`."""
    global shared_data
    with shared_data_lock:  # This is equal to acquire() + release()
        shared_data += 1

my_threads = []
for _ in range(3):
    my_thread = Thread(target=increment_function)
    my_thread.start()
    my_threads.append(my_thread)

# Wait for every thread before reading the final value
for my_thread in my_threads:
    my_thread.join()

print(f"Current value of shared_data: {shared_data}")

In [None]:
# Another way to access shared_data is via Queue
from queue import Queue

# Queue that the worker threads use to hand their values to the main thread
shared_queue = Queue()

def increment_function():
    """Contribute one unit to the total by putting 1 on `shared_queue`."""
    shared_queue.put(1)

# Define three threads, start them all, then wait for each one to finish
my_threads = [Thread(target=increment_function) for _ in range(3)]
for my_thread in my_threads:
    my_thread.start()
for my_thread in my_threads:
    my_thread.join()

# Every producer has finished, so drain the queue and add up what it held
queued_values = []
while not shared_queue.empty():
    queued_values.append(shared_queue.get())
shared_data = sum(queued_values)

print(f"Current value of shared_data: {shared_data}")

In [None]:
# Sharing a global variable is not as easy in multiprocessing
# as it is in multithreading
import multiprocessing

# Define a value shared between processes: a C int ('i') initialised to 0
shared_variable = multiprocessing.Value('i', 0)

def worker_function():
    """Add 1 to `shared_variable` while holding the Value's own lock."""
    # No `global` statement is needed: the name is only read here, the
    # update goes through the object's `.value` attribute.
    # `+=` is a read-modify-write, so it must happen under the lock.
    with shared_variable.get_lock():
        shared_variable.value += 1

procs = []
for _ in range(3):
    # Define our process (it is not running yet)
    proc = Process(target=worker_function)
    # Start the process
    proc.start()
    # Keep a handle on every process so we can wait for it below
    procs.append(proc)

# Wait for every worker before reading the result
for proc in procs:
    proc.join()

# Read the counter from the shared Value itself. `shared_data` belongs to
# the threading cells and is not touched by these processes.
print(f"Current value of shared_variable: {shared_variable.value}")

In [None]:
# Or using queue
from multiprocessing import Queue
from multiprocessing import Process

def worker_function(shared_queue):
    """Send this worker's contribution (the integer 1) through the queue.

    Args:
        shared_queue: Queue-like object with a `put` method.
    """
    shared_queue.put(1)

shared_queue = Queue()

procs = []
for _ in range(3):
    my_proc = Process(target=worker_function, args=(shared_queue,))
    my_proc.start()
    procs.append(my_proc)

# The main process retrieves exactly one item per worker and aggregates the
# result. This happens BEFORE join(): the multiprocessing docs warn that
# joining a process which still has items buffered in a queue can deadlock,
# and that Queue.empty() is not reliable. get() takes no value argument;
# its first parameter is the `block` flag.
result = sum(shared_queue.get() for _ in procs)

# Every item has been consumed, so the workers can be joined safely
for proc in procs:
    proc.join()

print(f"Results from multiprocessing queue: {result}")

## One-Process One-Thread

In [8]:
# Let's say you have one process with one thread only, how to deal with it?
# AsyncIO helps us to do it
import asyncio

async def download_my_1st_data_func():
    """Simulate the first download.

    Prints a start message, awaits a 2-second `asyncio.sleep`, then prints
    a completion message.
    """
    print("Starting my 1st data func...")
    simulated_download = asyncio.sleep(2)  # stand-in for real network I/O
    await simulated_download
    print("Completed my 1st data func!")

async def download_my_2nd_data_func():
    """Simulate the second download.

    Prints a start message, awaits a 2-second `asyncio.sleep`, then prints
    a completion message.
    """
    print("Starting my 2nd data func...")
    simulated_download = asyncio.sleep(2)  # stand-in for real network I/O
    await simulated_download
    print("Completed my 2nd data func!")

def main():
    """Run both download coroutines concurrently and block until both finish.

    `asyncio.run` creates a fresh event loop for the call and closes it
    afterwards, even when a coroutine raises, so `main()` can be called more
    than once. Fetching the default loop and closing it by hand leaves a
    closed loop behind, and a second call then fails with "Event loop is
    closed".

    Raises:
        RuntimeError: If called while an event loop is already running in
            this thread. In that situation `await asyncio.gather(...)`
            directly instead of calling `main()`.
    """
    async def _download_all():
        # gather() schedules both coroutines on the same loop, so their
        # sleeps overlap instead of running back to back
        await asyncio.gather(
            download_my_1st_data_func(),
            download_my_2nd_data_func(),
        )

    asyncio.run(_download_all())

# Run the event loop: this call blocks until both download coroutines
# have completed
main()

# NOTE: you may hit some weird errors here because of the IPython notebook,
# so let's move this code to a plain Python script

Starting my 1st data func...
Starting my 2nd data func...
Completed my 1st data func!
Completed my 2nd data func!