In [1]:
import sys
sys._is_gil_enabled()

True

As of Python3.13, Global Interface Lock (GIL) can be diabled. That means Python interpreter becomes thread-free; There can be any number of threads per Python runtime. Since threads are more light weight than processes they bring more speed compared to processes. Removing GIL allows parallel programming using threads. But one should be careful when threads need to share states.  

A Queue gives us the ability to perform lots of interprocess communication using native
Python objects. This can be useful if you’re passing around objects with lots of state.
Since the Queue lacks persistence, though, you probably don’t want to use them for jobs
that might require robustness in the face of failure (e.g., if you lose power or a hard drive
gets corrupted).

In [2]:
import time
import math

# Find Primes

We want to find all prime numbers within a given range. This is a CPU-bound problem so we can think about parallel computing to speed up this task. We will try to implement this using different tools to see the gains. 

### A single thread in a process

In [3]:
def check_prime(n):
    if n % 2 == 0:
        return False
    from_i = 3
    to_i = math.sqrt(n) + 1
    for i in range(from_i, int(to_i), 2):
        if n % i == 0:
            return False
    return True

In [4]:
t1 = time.time()
number_range = range(100000001, 101000000)

primes = list(filter(check_prime, number_range))

print("Took:", time.time() - t1)
print(len(primes), '\n', primes[:10], '\n', primes[-10:])

Took: 20.558048248291016
54208 
 [100000007, 100000037, 100000039, 100000049, 100000073, 100000081, 100000123, 100000127, 100000193, 100000213] 
 [100999889, 100999897, 100999901, 100999903, 100999919, 100999939, 100999949, 100999979, 100999981, 100999993]


In multithreaded case, threads compete for jobs while only one thread can be active at a time inside a process although there are multiple launched. However, in this case concurrency is happening but no speed gain. This is a useful case when we are dealing with I/O bound problem when delaying are very likely. While one thread is waiting, another one takes a task to complete it. So more job is done in a period of time. 

Multiprocessing allows us to launch multiple process each of which has its own Python runtime that can be active independently at the same time. So we can achieve parallel computing here but the problem is how these processes should communicate effectively if working on a shared job. 

### Queues - A synchronized queue class

`multiprocessing.Queue` objects give us nonpersistent queues that can send any pickleable Python objects between processes. They carry an overhead, as each object must be
pickled to be sent and then unpickled in the consumer (along with some locking operations). In the following example, we’ll see that this cost is not negligible. However, if
your workers are processing larger jobs, then the communication overhead is probably acceptable.

The two queues are sent as arguments, and multiprocessing handles their synchronization.
Having started the new processes, we hand a list of jobs to the `possible_primes_queue` and end with one poison pill per process. The jobs will be consumed in FIFO order, leaving the poison pills for last. In check_prime we use a `blocking .get()`, as the new processes will have to wait for work to appear in the queue. Since we use flags, we could add some work, deal with the results, and then iterate by adding more work, and signal the end of life of the workers by adding the poison pills later.

In [5]:
import multiprocessing
from multiprocessing.pool import Pool

In [6]:
FLAG_ALL_DONE = b"WORK_FINISHED"
FLAG_WORKER_FINISHED_PROCESSING = b"WORKER_FINSIHED_PROCESSING"

In [7]:
# Define check_prime function as a consumer!
def check_prime(possible_primes_queue, definite_primes_queue):
    '''
    Input: first queue of numbers to be checked for primarity
    OutPut: if it is prime, add it to the second queue 
    '''
    while True:
        n = possible_primes_queue.get()
        if n == FLAG_ALL_DONE:
            definite_primes_queue.put(FLAG_WORKER_FINISHED_PROCESSING)
            break
        else:
            if n % 2 == 0:
                continue
            for i in range(3, int(math.sqrt(n) + 1), 2):
                if n % i == 0:
                    break
            else:
                definite_primes_queue.put(n)

In [8]:
primes = []

manager = multiprocessing.Manager() # Sync manager
possible_primes_queue = manager.Queue()
definite_primes_queue = manager.Queue()

NBR_PROCESSES = 2
processes = []
for _ in range(NBR_PROCESSES):
    p = multiprocessing.Process(target=check_prime,
                                args=(possible_primes_queue, 
                                      definite_primes_queue))
    processes.append(p)
    p.start()

t1 = time.time()
number_range = range(100000000, 101000000)

for possible_prime in number_range:
    possible_primes_queue.put(possible_prime)

# add poison pills to stop the remote workers
for n in range(NBR_PROCESSES):
    possible_primes_queue.put(FLAG_ALL_DONE)

processors_indicating_they_have_finished = 0
while True:
    new_result = definite_primes_queue.get() # block while waiting for results
    if new_result == FLAG_WORKER_FINISHED_PROCESSING:
        processors_indicating_they_have_finished += 1
        if processors_indicating_they_have_finished == NBR_PROCESSES:
            break
    else:
        primes.append(new_result)

assert processors_indicating_they_have_finished == NBR_PROCESSES
print("Took:", time.time() - t1)
print(len(primes), primes[:10], primes[-10:])

Took: 381.61518692970276
54208 [100000007, 100000037, 100000039, 100000049, 100000073, 100000081, 100000123, 100000127, 100000193, 100000213] [100999889, 100999897, 100999901, 100999903, 100999919, 100999939, 100999949, 100999979, 100999981, 100999993]


As seen, multiprocessing can be extremely slow (and therefore useless for small workloads) __if__ processes need to communicate... Multiprocessing has a few substantial limitations. Communication between processes is limited: objects generally need to be serialized or copied to shared memory. This introduces overhead (due to serialization) and complicates building APIs on top of multiprocessing. Starting a subprocess is also more expensive than starting a thread, especially with the “spawn” implementation. Starting a thread takes ~100 µs, while spawning a subprocess takes ~50 ms (50,000 µs) due to Python re-initialization.

Finally, many C and C++ libraries support access from multiple threads but do not support access or use across multiple processes.

## No GIL, Thread-Free

Change the kernel to 3.13t to disable gil. Now threads are free to run as many as we want.

In [151]:
!python3 -VV 

Python 3.13.0 experimental free-threading build (main, Oct  8 2024, 08:51:27) [GCC 13.2.0]


In [3]:
import sys
sys._is_gil_enabled()

False

In [None]:
import os
import threading
import math
import time

Queues are thread safe by default. So we can use them if we need to just like we did for processes. This the case of this problem, we dont need anything but a single flag variable as we will see later. But just to have an idea of how this might work, we write the similar code for threads:

In [None]:
FLAG_ALL_DONE = "WORK_FINISHED"
FLAG_WORKER_FINISHED_PROCESSING = "WORKER_FINSIHED_PROCESSING"

In [None]:
def check_prime(possible_primes_queue, definite_primes_queue):
    '''
    Input: first queue of numbers to be checked for primarity
    OutPut: if it is prime, add it to the second queue 
    '''
    print("Current SubThread", threading.get_native_id())
    print("Active Threads:", threading.active_count())
    while True:
        n = possible_primes_queue.get()
        if n == FLAG_ALL_DONE:
            definite_primes_queue.put(FLAG_WORKER_FINISHED_PROCESSING)
            break
        else:
            if n % 2 == 0:
                continue
            for i in range(3, int(math.sqrt(n) + 1), 2):
                if n % i == 0:
                    break
            else:
                definite_primes_queue.put(n)

In [None]:
primes = []

import queue

possible_primes_queue = queue.Queue()
definite_primes_queue = queue.Queue()

print("Main Thread", threading.main_thread())
print("Main Thread ID", threading.get_native_id())
print("Active Threads in Main Thread:", threading.active_count())

NBR_THREADS = 4
threads = []

for _ in range(NBR_THREADS):
    t = threading.Thread(target=check_prime,
                                args=(possible_primes_queue, 
                                      definite_primes_queue))
    threads.append(t)
    t.start()

t1 = time.time()
number_range = range(100000000, 101000000)

for possible_prime in number_range:
    possible_primes_queue.put(possible_prime)

# add poison pills to stop the remote workers
for n in range(NBR_THREADS):
    possible_primes_queue.put(FLAG_ALL_DONE)

processors_indicating_they_have_finished = 0
while True:
    new_result = definite_primes_queue.get() # block while waiting for results
    if new_result == FLAG_WORKER_FINISHED_PROCESSING:
        processors_indicating_they_have_finished += 1
        if processors_indicating_they_have_finished == NBR_THREADS:
            break
    else:
        primes.append(new_result)

assert processors_indicating_they_have_finished == NBR_THREADS
print("Took:", time.time() - t1)
print(len(primes), '\n', primes[:10], '\n', primes[-10:])

Main Thread <_MainThread(MainThread, started 139811829039232)>
Main Thread ID 1945
Active Threads in Main Thread: 15
Current Thread 9670
Active Threads: 16
Current Thread 9671
Active Threads: 18
Current Thread 9672
Active Threads: 19
Current Thread 9673
Active Threads: 19
Took: 27.562790870666504
54208 
 [100000007, 100000037, 100000039, 100000049, 100000073, 100000081, 100000127, 100000123, 100000193, 100000213] 
 [100999889, 100999897, 100999901, 100999919, 100999903, 100999939, 100999949, 100999979, 100999993, 100999981]


As you can see, we are not getting any speed up because of the overhead from communication. Cooperation comes at a cost—the cost of synchronizing data and checking the shared data can be quite high. We’ll work through several approaches here that can be used in different ways for task coordination ... Can we change the code design to solve this more efficiently? Yes in several ways. We can easily split the searching range into intervals and devote a thread to search in that range for primes 

In [None]:
def create_range(start, end, nbr_threads):
    step = (end-start) // nbr_threads
    ranges = [(start+step*(i-1), start+step*i) for i in range(1, nbr_threads+1)]
    return ranges.append((start+step*nbr_threads, end)) if end-start-step*nbr_threads else ranges

def check_prime_in_range(from_i, to_i):
    print("Current SubThread", threading.get_native_id())
    for n in range(from_i, to_i):
        if n % 2 == 0:
            continue 
        for i in range(3, int(math.sqrt(n) + 1), 2):
            if n % i == 0:
                break
        else:
            primes.append(n)

In [None]:
START = 100000000
END = 101000000
NBR_THREADS = 4
ranges = create_range(START, END, NBR_THREADS)
ranges

[(100000000, 100250000),
 (100250000, 100500000),
 (100500000, 100750000),
 (100750000, 101000000)]

In [None]:
primes = []

threads = []
# print('Length primes is', len(primes))
t1 = time.time()
for i in range(len(ranges)):
    t = threading.Thread(target=check_prime_in_range,
                                args=(ranges[i]))
    threads.append(t)
    t.start()

# join the threads after they all started. 
for thread in threads:
    thread.join()
# After they joined, the main thread waits here until all child threads are done and returned

print("Took:", time.time() - t1)
print(len(primes), '\n', primes[:10], '\n', primes[-10:])

Current SubThreadCurrent SubThread 41472
 41471
Current SubThread 41473
Current SubThread 41474
Length primes is 54009
Length primes is 54061
Length primes is 54193
Length primes is 54208
Took: 11.000897884368896
54208 
 [100250011, 100000007, 100250021, 100500007, 100000037, 100500011, 100250039, 100750019, 100000039, 100500019] 
 [100249859, 100249861, 100249883, 100249907, 100249913, 100249921, 100249931, 100249937, 100249943, 100249987]


We gain some speed here as expected because this way, threads dont need to share data! This is called "embarrasingly parallel", which is not the most interesting case in general. 

## Sharing data between threads in memory

Let’s define a new problem—suppose we have a small set of numbers and our task is to
efficiently use our CPU resources to figure out if each number is a prime, one number
at a time. Possibly we’ll have just one large number to test. It no longer makes sense to
use one CPU to do the check; we want to coordinate the work across many CPUs.

For this section we’ll look at some larger numbers, one with 15 digits and four with 18
digits:

- Small non-prime: 112272535095295
- Large non-prime 1: 100109100129100369
- Large non-prime 2: 100109100129101027
- Prime 1: 100109100129100151
- Prime 2: 100109100129162907

For now we will focus on some data sharing methods for threads. Later we’ll work through various ways of using IPC (Interprocess Communication)in Python to solve our cooperative search problem. You’ll see that IPC is fairly easy, but
generally comes with a cost.


#### Single Thread

To have a bench mark for speed of multithread solutions, let's first run the single-thread case: 

In [5]:
# Single Thread
def check_prime_singlet(n):
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    if n % 2 == 0:
        return False
    assert from_i % 2 != 0
    for i in range(from_i, to_i, 2):
        if n % i == 0:
            return False
    return True

In [17]:
t = time.time()
print(check_prime_singlet(112272535095295))
print("Small non-prime: 112272535095295, Took:", time.time()-t)

False
Small non-prime: 112272535095295, Took: 0.0004241466522216797


In [88]:
t = time.time()
print(check_prime_singlet(1109100129100369))
print("Large non-prime 1: 1109100129100369, Took:", time.time()-t)

False
Large non-prime 1: 1109100129100369, Took: 0.0008904933929443359


In [19]:
t = time.time()
print(check_prime_singlet(100109100129101027))
print("Large non-prime 2: 100109100129101027, Took:", time.time()-t)

False
Large non-prime 2: 100109100129101027, Took: 9.89187240600586


In [20]:
t = time.time()
print(check_prime_singlet(100109100129100151))
print("Prime 1: 100109100129100151, Took:", time.time()-t)

True
Prime 1: 100109100129100151, Took: 18.310689210891724


In [21]:
t = time.time()
print(check_prime_singlet(100109100129162907))
print("Prime 2: 100109100129162907, Took:", time.time()-t)

True
Prime 2: 100109100129162907, Took: 19.632556438446045


## Free Threaded Case

To improve the solution, observe that if we have a non-prime, then there’s no way to exit early and we have to manually check all possible factors before we can exit. To solve this, we can flag across all of our processes/threads that a factor has been found, so the search can be called off early. This is called data synchronization.

Synchronization becomes more complicated in case of using multi-processes because each process has its own memory. The difficulty is balancing the cost of reading the flag against the speed saving that is possible. 

By experience the number of threads is taken to be as the number of cpus the machine has. On this machine we have 4 cores - 8 cpus. For this experience, we set the number of threads to be half of #cpus


In [149]:
NBR_THREADS = os.cpu_count() // 2
NBR_THREADS

4

In [None]:
def create_range(start, end, NBR_THREADS):
    x = (end-start) // NBR_THREADS
    ranges = [(start+x*(i-1), start+x*i) for i in range(1, NBR_THREADS)]
    ranges.append((start+x*(NBR_THREADS-1), end))
    return ranges

In [69]:
def check_prime_in_range(n, ranges):
    global FLAG_SET 
    from_i, to_i = ranges
    for i in range(from_i, to_i, 2):
        if FLAG_SET == 1:
            return
        if n % i == 0:
            FLAG_SET = 1
            return

In [None]:
def check_prime(n, NBR_THREADS):
    FLAG_SET = 0
    # cheaply check high-probability set of possible factors earlier for quick rejection
    from_i = 3
    to_i = 21
    check_prime_in_range(n, (from_i, to_i))
    if FLAG_SET == 1:
        return False
    if n % 2 == 0:
        return False 
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    ranges_to_check = create_range(from_i, to_i, NBR_THREADS)
    ranges_to_check = list(zip(NBR_THREADS * [n], ranges_to_check))
    assert len(ranges_to_check) == NBR_THREADS
    threads = []
    for i in range(NBR_THREADS):
        t = threading.Thread(target=check_prime_in_range,
                             args=(ranges_to_check[i]))
        threads.append(t)
        t.start()
    
    for thread in threads:
        thread.join()
    
    if FLAG_SET == 1:
        return False
    return True

In [131]:
t = time.time()
print(check_prime(1109100129100369, 2))
print("Large non-prime 1: 1109100129100369, Took:", time.time()-t)

True
Large non-prime 1: 1109100129100369, Took: 0.002607583999633789


In [126]:
t = time.time()
print(check_prime(1109100129100369, 2))
print("Large non-prime 1: 1109100129100369, Took:", time.time()-t)

True
Large non-prime 1: 1109100129100369, Took: 0.0022802352905273438


In [133]:
t = time.time()
print(check_prime(100109100129101027, 2))
print("Large non-prime 2: 100109100129101027, Took:", time.time()-t)

True
Large non-prime 2: 100109100129101027, Took: 0.002752542495727539


In [147]:
t = time.time()
print(check_prime(100109100129100151, 2))
print("Prime 1: 100109100129100151, Took:", time.time()-t)

True
Prime 1: 100109100129100151, Took: 0.0027174949645996094


In [146]:
t = time.time()
print(check_prime(100109100129162907, 2))
print("Prime 2: 100109100129162907, Took:", time.time()-t)

True
Prime 2: 100109100129162907, Took: 0.0029535293579101562


We got significant speed gain specially for detecting the large primes. The same performance across all cases unlike the single thread case.

## Synchronizing File and Variable Access

In the following examples we’ll look at multiple processes sharing and manipulating a
state—in this case, four processes incrementing a shared counter a set number of times.
Without a synchronization process, the counting is incorrect. If you’re sharing data in
a coherent way you’ll always need a method to synchronize the reading and writing of
data, or you’ll end up with errors.
Typically the synchronization methods are specific to the OS you’re using, and they’re
often specific to the language you use. Here we look at file-based synchronization using
a Python library and sharing an integer object between Python processes.

You can see our first work function. The function iterates over a local counter. In each iteration it opens a file and reads the existing value, increments it by one, and then writes the new value over the old one. On the first iteration the file will be empty or won’t exist, so it will catch an exception and assume the value should be zero.

In [4]:
import os
import threading
import math
import time

In [25]:
NBR_THREADS = os.cpu_count() // 2
PATH = './project'
MAX_COUNT_PER_THREAD = 10000
FILENAME = "count.txt"
FILEPATH = os.path.join(PATH, FILENAME)

In [26]:
def work(filename, max_count):
    # Creating a file at specified location
    f = open(FILEPATH, "w")
    f.close()
    for _ in range(max_count):
        f = open(filename, "r")
        try:
            nbr = int(f.read())
        except ValueError as err:
            print("File is empty, starting to count from 0, error: " + str(err))
            nbr = 0
        f = open(filename, "w")
        f.write(str(nbr + 1) + '\n')
        f.close()
    

In [27]:
work(FILEPATH, MAX_COUNT_PER_THREAD*NBR_THREADS)

File is empty, starting to count from 0, error: invalid literal for int() with base 10: ''


If we try this in parallel, as one process/thread writes, the
other can read a partially written result that can’t be parsed. This causes an exception, and a zero will be written back. This, in turn, causes our counter to keep getting reset!

Using the `lockfile` module, we can introduce a synchronization method so only one
process gets to write at a time and the others each await their turn. The overall process
therefore runs more slowly, but it doesn’t make mistakes.

First, we create a FileLock object;
the filename can be anything, but using the same name as the file you want to lock
probably makes debugging from the command line easier. When you ask to acquire
the lock the FileLock opens a new file with the same name, with .lock appended.
acquire without any arguments will block indefinitely, until the lock becomes available.
Once you have the lock, you can do your processing without any danger of a conflict.
You can then release the lock once you’ve finished writing


In [28]:
def work(filename, max_count, lock):
    for _ in range(max_count):
        lock.acquire()
        f = open(filename, "r")
        try:
            nbr = int(f.read())
        except ValueError as err:
            print("File is empty, starting to count from 0, error: " + str(err))
            nbr = 0
        f = open(filename, "w")
        f.write(str(nbr + 1) + '\n')
        f.close()
        lock.release()

In [29]:
def run_workers():
    total_expected_count = NBR_THREADS * MAX_COUNT_PER_THREAD
    print("Starting {} threads to count to {}".format(NBR_THREADS, 
                                                      total_expected_count))
    # reset counter
    f = open(FILEPATH, "w")
    f.close()
    os.chmod(FILEPATH, 772)
    threads = []
    lock = threading.Lock()
    for _ in range(NBR_THREADS):
        p = threading.Thread(target=work, 
                             args=(FILEPATH, MAX_COUNT_PER_THREAD, lock))
        p.start()
        threads.append(p)
    
    for p in threads:
        p.join()
    print("Expecting to see a count of {}".format(total_expected_count)) 
    with open(FILEPATH, "r") as f:   
        print(f"{FILENAME} contains: {f.read()}")

In [30]:
run_workers()

Starting 4 threads to count to 40000
File is empty, starting to count from 0, error: invalid literal for int() with base 10: ''
Expecting to see a count of 40000
count.txt contains: 40000



Not much speed gain here... In the same way, we can lock a value that is being manipulated by multiple threads or processes to avoid corrupting data—this is a source of subtle and hard-to-track errors, and this section showed you some robust and lightweight solutions.

The following techniques are meaningful for parallel coding through multi processing when it is needed to share data between processes while processes have separate memories. They are not needed for multi-threads when threads are free.

## IPC (Interprocess Communication) for sharing data in memory

Because the flag is synchronized, we don’t want to check it too frequently—this adds more overhead.
How frequently should we check the shared flag? Each check has a cost, both because
we’re adding more instructions to our tight inner loop and because checking requires
a lock to be made on the shared variable, which adds more cost. The solution we’ve
chosen is to check the flag every 1,000 iterations. Every time we check we look to see if value has been set to FLAG_SET, and if so, we exit the search. If in the search the process finds a factor, then it sets value = FLAG_SET and exits

### Using Redis as a Flag

Redis is a key/value in-memory storage engine. It provides its own locking and each
operation is atomic, so we don’t have to worry about using locks from inside Python
(or from any other interfacing language).
By using Redis we make the data storage language-agnostic—any language or tool with
an interface to Redis can share data in a compatible way. You could share data between
Python, Ruby, C++, and PHP equally easily. You can share data on the local machine or
over a network; to share to other machines all you need to do is change the Redis default
of sharing only on localhost.
Redis lets you store:
- Lists of strings
- Sets of strings
- Sorted sets of strings
- Hashes of strings

Redis stores everything in RAM and snapshots to disk (optionally using journaling)
and supports master/slave replication to a cluster of instances. One possibility with
Redis is to use it to share a workload across a cluster, where other machines read and
write state and Redis acts as a fast centralized data repository.
We can read and write a flag as a text string (all values in Redis are strings) in just the
same way as we have been using Python flags previously. We create a StrictRedis
interface as a global object, which talks to the external Redis server. We could create a
new connection inside check_prime_in_range, but this is slower and can exhaust the
limited number of Redis handles that are available.
We talk to the Redis server using a dictionary-like access. We can set a value 

In [42]:
import time
import math
import os
from multiprocessing import Pool

In [2]:
import redis

In [3]:
!docker run -d --name my-redis -p 6379:6379  redis/redis-stack-server:latest

docker: Error response from daemon: Conflict. The container name "/my-redis" is already in use by container "8e462ad22c677f5f0f28d8833955b24d5538fb2ce1498ef11cadb9fa0ebfb5da". You have to remove (or rename) that container to be able to reuse that name.
See 'docker run --help'.


In [4]:
rds = redis.Redis(
    host='127.0.0.1',
    port=6379,
    charset="utf-8",
    decode_responses=True
    )
connection = rds.ping()

print(connection)

True


In [43]:
FLAG_NAME = 'redis_primes_flag'
FLAG_CLEAR = '0'
FLAG_SET = '1'
CHECK_EVERY = 1000
NBR_PROCESSES = os.cpu_count() // 2

In [None]:
def create_range(start, end, NBR_PROCESSES):
    x = (end-start) // NBR_PROCESSES
    ranges = [(start+x*(i-1), start+x*i) for i in range(1, NBR_PROCESSES)]
    ranges.append((start+x*(NBR_PROCESSES-1), end))
    return ranges

In [36]:
def check_prime_in_range(n, ranges):
    from_i, to_i = ranges
    check_every = CHECK_EVERY
    for i in range(from_i, to_i, 2):
        check_every -= 1
        if not check_every:
            flag = rds[FLAG_NAME]
            if flag == FLAG_SET:
                return
            check_every = CHECK_EVERY
        if n % i == 0:
            rds[FLAG_NAME] = FLAG_SET
            return

In [47]:
def check_prime(n, nbr_processes):
    # cheaply check high-probability set of possible factors earlier
    from_i = 3
    to_i = 21
    rds[FLAG_NAME] = FLAG_CLEAR
    check_prime_in_range(n, (from_i, to_i))
    if rds[FLAG_NAME] == FLAG_SET:
        return False
    if n % 2 == 0:
        return False 
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    ranges_to_check = create_range(from_i, to_i, nbr_processes)
    ranges_to_check = list(zip(nbr_processes * [n], ranges_to_check))
    assert len(ranges_to_check) == nbr_processes
    
    with Pool(processes=nbr_processes) as pool:
        results = pool.starmap(check_prime_in_range, ranges_to_check)
    
    if False in results:
        return False
    return True

In [44]:
t = time.time()
print(check_prime(112272535095295, NBR_PROCESSES))
print("Small non-prime: 112272535095295, Took:", time.time()-t)

False
Small non-prime: 112272535095295, Took: 0.003108501434326172


In [45]:
t = time.time()
print(check_prime(1109100129100369, NBR_PROCESSES))
print("Large non-prime 1: 1109100129100369, Took:", time.time()-t)

False
Large non-prime 1: 1109100129100369, Took: 0.0070514678955078125


In [48]:
t = time.time()
print(check_prime(100109100129101027, NBR_PROCESSES))
print("Large non-prime 2: 100109100129101027, Took:", time.time()-t)

True
Large non-prime 2: 100109100129101027, Took: 6.011719226837158


In [49]:
t = time.time()
print(check_prime(100109100129100151, NBR_PROCESSES))
print("Prime 1: 100109100129100151, Took:", time.time()-t)

True
Prime 1: 100109100129100151, Took: 33.814987897872925


In [50]:
t = time.time()
print(check_prime(100109100129162907, NBR_PROCESSES))
print("Prime 2: 100109100129162907, Took:", time.time()-t)

True
Prime 2: 100109100129162907, Took: 43.31632161140442


As you can see, multi-processing is much slower than multi threading because of the overhead.

In [51]:
!docker stop my-redis
!docker rm my-redis

my-redis
my-redis


### Using mmap as a Flag

Finally, we get to the fastest way of sharing bytes. Example 9-20 shows a memory-
mapped (shared memory) solution using the mmap module. The bytes in a shared
memory block are not synchronized, and they come with very little overhead. They act
like a file—in this case, they are a block of memory with a file-like interface.

Memory mapping is a technique that uses lower-level operating system APIs to load a file directly into computer memory. It can dramatically improve file I/O performance in your program. 

*Virtual memory* is a way of handling memory management. The operating system uses virtual memory to make it appear that you have more memory than you do, allowing you to worry less about how much memory is available for your programs at any given time. Behind the scenes, your operating system uses parts of your nonvolatile storage, such as your solid-state disk, to simulate additional RAM. mmap uses virtual memory to make it appear that you’ve loaded a very large file into memory, even if the contents of the file are too big to fit in your physical memory.

*Shared memory* is another technique provided by your operating system that allows multiple programs to access the same data simultaneously. Shared memory can be a very efficient way of handling data in a program that uses concurrency. Python’s mmap uses shared memory to efficiently share large amounts of data between multiple Python processes, threads, and tasks that are happening concurrently.

You can picture memory mapping as a process in which read and write operations skip many of the layers of complication and map the requested data directly into physical memory.

A memory-mapped file I/O approach sacrifices memory usage for speed, which is classically called the space–time tradeoff. However, memory mapping doesn’t have to use more memory than the conventional approach (that uses `open()` function in with clause and then `read()`). The operating system is very clever. It will lazily load the data as it’s requested, similar to how Python generators work.

In addition, thanks to virtual memory, you can load a file that’s larger than your physical memory. However, you won’t see the huge performance improvements from memory mapping when there isn’t enough physical memory for your file, because the operating system will use a slower physical storage medium like a solid-state disk to mimic the physical memory it lacks.

The bytes in a shared
memory block are not synchronized, and they come with very little overhead. They act
like a file—in this case, they are a block of memory with a file-like interface. We have
to seek to a location and read or write sequentially. Typically mmap is used to give a short
(memory-mapped) view into a larger file, but in our case, rather than specifying a file
number as the first argument, we instead pass -1 to indicate that we want an anonymous
block of memory. We could also specify whether we want read-only or write-only access
(we want both, which is the default).

In [52]:
import mmap

In [53]:
sh_mem = mmap.mmap(-1, 1) # memory map 1 byte as a flag

In [54]:
FLAG_CLEAR = 0
FLAG_SET = 1
CHECK_EVERY = 1000

In [57]:
def check_prime_in_range(n, ranges):
    from_i, to_i = ranges
    check_every = CHECK_EVERY
    for i in range(from_i, to_i, 2):
        check_every -= 1
        if not check_every:
            sh_mem.seek(0)
            flag = sh_mem.read_byte()
            if flag == FLAG_SET:
                return False
            check_every = CHECK_EVERY
        if n % i == 0:
            sh_mem.seek(0)
            sh_mem.write_byte(FLAG_SET)
            return False
    return True

In [None]:
def check_prime(n, nbr_processes):
    # cheaply check high-probability set of possible factors earlier
    from_i = 3
    to_i = 21
    sh_mem.seek(0)
    sh_mem.write_byte(FLAG_CLEAR)
    check_prime_in_range(n, (from_i, to_i))
    sh_mem.seek(0)
    flag = sh_mem.read_byte()
    if flag == FLAG_SET:
        return False
    if n % 2 == 0:
        return False 
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    ranges_to_check = create_range(from_i, to_i, nbr_processes)
    ranges_to_check = list(zip(nbr_processes * [n], ranges_to_check))
    assert len(ranges_to_check) == nbr_processes
    with Pool(processes=nbr_processes) as pool:
        pool.starmap(check_prime_in_range, ranges_to_check)

    sh_mem.seek(0)
    flag = sh_mem.read_byte()
    if flag == FLAG_SET:
        return False
    return True

In [59]:
t = time.time()
print(check_prime(112272535095295, 4))
print("Small non-prime: 112272535095295, Took:", time.time()-t)

False
Small non-prime: 112272535095295, Took: 0.0016758441925048828


In [None]:
t = time.time()
print(check_prime(1109100129100369, NBR_PROCESSES))
print("Large non-prime 1: 1109100129100369, Took:", time.time()-t)

False
Large non-prime 1: 1109100129100369, Took: 0.0003638267517089844


In [None]:
t = time.time()
print(check_prime(100109100129101027, NBR_PROCESSES))
print("Large non-prime 2: 100109100129101027, Took:", time.time()-t)

False
Large non-prime 2: 100109100129101027, Took: 1.6223320960998535


In [62]:
t = time.time()
print(check_prime(100109100129100151, NBR_PROCESSES))
print("Prime 1: 100109100129100151, Took:", time.time()-t)

True
Prime 1: 100109100129100151, Took: 10.036401510238647


In [63]:
t = time.time()
print(check_prime(100109100129162907, NBR_PROCESSES))
print("Prime 2: 100109100129162907, Took:", time.time()-t)

True
Prime 2: 100109100129162907, Took: 9.678701877593994


mmap supports a number of methods that can be used to move around in the file that it
represents (including `find`, `readline`, and `write`). We are using it in the most basic way
—we `seek` to the start of the memory block before each read or write and, since we’re
sharing just 1 byte, we use read_byte and write_byte to be explicit.
There is no Python overhead for locking and no interpretation of the data; we’re dealing
with bytes directly with the operating system, so this is our fastest communication
method.