So far, we've seen how the CPU runs code in sequence, and how control flow statements (like if and else) can change the order in which it executes statements. However, we can also write programs that execute more than one instruction at a time. A multi-core CPU has the ability to run multiple instructions simultaneously. The desire to take advantage of modern, multi-core CPUs has given rise to a technique called **parallel processing**, which is very useful in data science.

Parallel processing can be powerful, but it also presents many unique challenges. When multiple processes share data, it's important to control which process can access the data, and when, so that the data doesn't become corrupted. It's also important to reason carefully about how parallel processes execute, because running multiple instructions at once can introduce tricky bugs. Learning to manage these factors will help us write powerful code that performs fast and meaningful data analysis.

In Python, some values are **immutable**, such as integers. This means that we can't change them in place. Most of the data structures we've worked with (like dictionaries and lists) are **mutable**, so they're useful for representing information that changes. Mutable objects are especially useful in parallel processing, because we often want different threads to share and edit the same data.
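To make the distinction concrete, here is a minimal sketch (the function names are hypothetical) comparing what a function can do to a caller's integer versus a caller's list. Only the mutable list is changed for the caller, which is why shared state between threads is usually a mutable object.

```python
# Hypothetical helper names, for illustration only.
def add_one_to_int(n):
    # `n += 1` builds a new int and rebinds the local name `n`;
    # the caller's integer object is untouched.
    n += 1
    return n

def append_one(items):
    # `append` changes the list object itself, so every name that
    # refers to this list sees the change.
    items.append(1)

x = 0
add_one_to_int(x)
print(x)       # 0

shared = []
append_one(shared)
print(shared)  # [1]
```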

In [40]:
class Counter():
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
    def get_count(self):
        return self.count

In [41]:
def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

In [42]:
# Let's create a mutable instance of the Counter class

counter = Counter()
initial_count = counter.get_count()
count_up_100000(counter)
final_count = counter.get_count()

In [44]:
print(initial_count)
final_count

0


100000

We counted from 0 to 100000 using a Counter instance. Creating this instance, calling the function, and incrementing the counter all happened in a single thread of execution, with every instruction executing one after the other. We can also run multiple threads at once, a technique called **multithreading**.

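A thread is one path of execution in a program. Every program starts with a "main thread", which is what we've been using so far. We can also create new threads and run them concurrently with the main thread. To do this in Python, we use the **threading module**. Specifically, we can use **threading.Thread()** to create an instance of the Thread class, which runs a given function in a separate thread. (In CPython, the global interpreter lock lets only one thread execute Python bytecode at a time, so threads take turns rather than running truly in parallel. The interpreter can still switch threads between bytecode instructions, which is enough to cause the problems described below.)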
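One way to confirm that threading.Thread() runs its target on a different thread is to record threading.current_thread().name from inside the function. In this sketch, `record_thread_name` and the thread name `"worker"` are hypothetical choices; `"MainThread"` is the name Python gives the main thread.

```python
import threading

def record_thread_name(names):
    # current_thread() returns the Thread object running this code
    names.append(threading.current_thread().name)

names = []
worker = threading.Thread(target=record_thread_name, args=(names,), name="worker")
worker.start()
worker.join()               # wait for the worker before reading `names`
record_thread_name(names)   # same function, called directly on the main thread
print(names)  # ['worker', 'MainThread']
```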

To create a Thread instance that runs the count_up_100000 function with counter as an argument, we write:

In [21]:
import threading
counter = Counter()
count_thread = threading.Thread(target=count_up_100000, args=[counter])

Then we start the thread:

In [22]:
count_thread.start()

Next, we "join" the thread. Calling join() makes the main thread wait until count_thread has finished executing:

In [23]:
count_thread.join()

The main thread will not move past the count_thread.join() call until count_thread has finished executing. Waiting for a condition like the termination of a thread is called **blocking**.
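join() also accepts an optional timeout in seconds, which limits how long the main thread blocks. This sketch uses a threading.Event (an assumption, not part of the original example) to keep a worker thread alive on purpose, so we can see join(timeout=...) return early while the thread is still running, and then a plain join() block until it terminates.

```python
import threading

release_worker = threading.Event()

def wait_for_signal():
    # Blocks until the main thread calls release_worker.set()
    release_worker.wait()

worker = threading.Thread(target=wait_for_signal)
worker.start()

worker.join(timeout=0.1)      # block for at most 0.1 seconds
still_running = worker.is_alive()
print(still_running)          # True: join() returned because of the timeout

release_worker.set()          # let the worker finish
worker.join()                 # no timeout: block until the thread terminates
print(worker.is_alive())      # False
```

Because join(timeout=...) returns either way, checking is_alive() afterwards is how we tell a timeout apart from the thread actually finishing.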

In [24]:
after_join = counter.get_count()
print(after_join)

100000


In programming, we say that a program is **deterministic** if we can precisely predict its output for a particular input. Most single-threaded operations are deterministic because we can walk through the code for any input step by step, and predict the output.

If we write a program that determines age based on birthday, for example, we should be able to precisely predict how it will behave for any given input. We'd be able to look through the program and determine what the computer will do at each step.
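Here is a minimal sketch of such a program, using a hypothetical `age_on` function. Passing today's date in as an argument, instead of reading the system clock, keeps it deterministic: the same two dates always produce the same age.

```python
from datetime import date

def age_on(birthday, today):
    # Hypothetical helper: whole years elapsed, minus one if this
    # year's birthday hasn't happened yet.
    had_birthday = (today.month, today.day) >= (birthday.month, birthday.day)
    return today.year - birthday.year - (0 if had_birthday else 1)

print(age_on(date(2000, 6, 15), date(2024, 6, 14)))  # 23
print(age_on(date(2000, 6, 15), date(2024, 6, 15)))  # 24
```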

Now imagine that we ask our friend to count to 100000, and then call us back when he's finished. Being an unusually obedient friend, he starts to count. In this analogy, we're the main thread of the program, and our friend is another thread.

Our friend's call telling us that he's finished is analogous to joining the thread. When he calls, we know that his "counter" is at 100000. We saw this in the code above, where we read the counter's value after joining the counting thread and got 100000.

Now imagine that we call our friend a few hours after he started counting to ask what number he's on. He may be at 1000, or 10000, or 25392. It's impossible to know for sure, and this is analogous to measuring the value of our counter before we've joined the counting thread. We can't predict this value because we don't know how many iterations of the counting loop will have been executed at the time of our reading. When we can't reliably predict the outcome of running a piece of code, we call that code **nondeterministic**.

Let's conduct a "trial" by writing a function that starts a new counter thread, then measures the counter's value in the middle of the thread's execution. We'll conduct three trials and compare the results.

In [25]:
def conduct_trial():
    counter = Counter()
    count_thread = threading.Thread(target = count_up_100000, args = [counter])
    count_thread.start()
    intermediate_value = counter.get_count()
    count_thread.join()
    return intermediate_value

In [26]:
trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)

23793
12848
18725


Multithreading is nondeterministic by nature, but there are ways to control that nondeterminism. The simplest and most common way to make multithreaded code more predictable is to use **threading.Lock**. A **lock** is a way to conditionally block the execution of some threads. At any given time, a lock is either **available** or **acquired**. A thread can acquire an available lock, but if it tries to acquire a lock that another thread has already acquired, it will be blocked until that lock is released and becomes available again.
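The available/acquired states can be inspected directly. This sketch uses threading.Lock's locked() method and the non-blocking form acquire(blocking=False), which returns False instead of waiting when the lock is already held. It also shows the with statement, which acquires the lock on entry and releases it on exit.

```python
import threading

lock = threading.Lock()
print(lock.locked())                  # False: the lock is available

lock.acquire()                        # now acquired
print(lock.locked())                  # True

# A blocking acquire() here would wait forever, because a plain Lock is not
# reentrant. A non-blocking attempt reports failure immediately instead.
got_it = lock.acquire(blocking=False)
print(got_it)                         # False

lock.release()                        # available again
print(lock.acquire(blocking=False))   # True: acquired on the first try
lock.release()

# The with statement acquires on entry and releases on exit, even on errors.
with lock:
    print(lock.locked())              # True
print(lock.locked())                  # False
```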

Suppose we have two threads, A and B, and that both have access to an instance of **threading.Lock** called **lock**, and a **Counter** instance called **counter**:

![image.png](attachment:image.png)

In the code above, thread B counts from 0 to 100 by 2, incrementing the counter twice per iteration while holding the lock. Thread A gets the value of the count at some point during thread B's counting process. Before it can read the value, thread A tries to acquire the lock. If the lock is available, thread A reads the count and then releases the lock. If the lock is unavailable (because thread B holds it), thread A waits for the lock to become available before reading the count.

Without the lock, thread A might read any value between 0 and 100. With the lock, however, thread A can only read an even count, because it can only access the counter after it has been incremented twice in the current iteration of the for loop. The lock ensures that thread A can only read the counter at multiples of 2.
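The original code for threads A and B is only available as an image, so here is a sketch reconstructed from the description above. It assumes thread B runs 50 iterations that each increment twice while holding the lock, and the function names `thread_a` and `thread_b` are hypothetical.

```python
import threading

class Counter():
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
    def get_count(self):
        return self.count

def thread_b(counter, lock):
    # 50 iterations x 2 increments: count from 0 to 100 by 2
    for _ in range(50):
        lock.acquire()
        counter.increment()
        counter.increment()
        lock.release()

def thread_a(counter, lock, observed):
    # Read the count only while holding the lock, so B can't be mid-iteration
    lock.acquire()
    observed.append(counter.get_count())
    lock.release()

counter = Counter()
lock = threading.Lock()
observed = []
b = threading.Thread(target=thread_b, args=(counter, lock))
a = threading.Thread(target=thread_a, args=(counter, lock, observed))
b.start()
a.start()
a.join()
b.join()
print(observed[0] % 2 == 0)  # True: A only ever sees an even count
```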

In the code below, we'll use a lock to ensure that our main thread can only read the counter at multiples of 10.

In [27]:
# Wrap the inner for loop in count_up_100000 inside lock.acquire() and 
# lock.release() so that nobody can acquire the lock 
# unless the counter value is a multiple of 10.

import threading

def count_up_100000(counter, lock):
    # 10000 iterations x 10 increments = 100000 increments in total
    for _ in range(10000):
        lock.acquire()
        for _ in range(10):
            counter.increment()
        lock.release()

In [28]:
# In conduct_trial(), wrap the call to counter.get_count() inside lock.acquire()
# and lock.release() so that the main thread can only read the counter value 
# at multiples of 10.

def conduct_trial():
    counter = Counter()
    lock = threading.Lock()
    count_thread = threading.Thread(target = count_up_100000, 
                                    args = [counter, lock])
    count_thread.start()
    lock.acquire()
    intermediate_value = counter.get_count()
    lock.release()
    count_thread.join()
    return intermediate_value

In [29]:
trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)

17870
11290
7690
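The same experiment can be written with `with lock:` instead of explicit acquire() and release() calls. This is a sketch of that alternative, not the notebook's original code; the with statement releases the lock even if the code inside raises an exception, which a bare acquire()/release() pair does not.

```python
import threading

class Counter():
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
    def get_count(self):
        return self.count

def count_up_100000(counter, lock):
    for _ in range(10000):
        # acquire on entry, release on exit (even if an exception is raised)
        with lock:
            for _ in range(10):
                counter.increment()

def conduct_trial():
    counter = Counter()
    lock = threading.Lock()
    count_thread = threading.Thread(target=count_up_100000, args=(counter, lock))
    count_thread.start()
    with lock:
        intermediate_value = counter.get_count()
    count_thread.join()
    return intermediate_value

trials = [conduct_trial() for _ in range(3)]
print(all(value % 10 == 0 for value in trials))  # True
```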


Now suppose we want to count to 200000. We can do this in two stages:

1. Increment counter 100000 times.
2. Increment counter 100000 times again.

This approach produces interesting results, because it behaves differently when we split the work among multiple threads. First, let's implement it using only the main thread. Try to predict the outcome before running the code. Remember that this is a single-threaded solution, so the outcome should be **deterministic**.

In [30]:
def count_up_100000(counter):
    for i in range(100000):
        counter.increment()
        
counter = Counter()

count_up_100000(counter)
count_up_100000(counter)

In [31]:
final_count = counter.get_count()
print(final_count)

200000


Now let's implement a multi-threaded implementation to count to 200000. We've defined a conduct_trial() function that counts to 200000 with two threads, each of which increments the counter 100000 times. It's important that both of the threads start at the same time, and are joined at the same time. For this experiment, we want the threads to execute in parallel so we can make observations about how they behave in parallel.

To make the threads run at the same time, we start both threads before joining either of them. Then we'll conduct three trials and print the final value of the counter in each.

In [32]:
def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

In [33]:
def conduct_trial():
    counter = Counter()
    count_thread1 = threading.Thread(target = count_up_100000, args = [counter])
    count_thread2 = threading.Thread(target = count_up_100000, args = [counter])
    count_thread1.start()
    count_thread2.start()
    
    count_thread1.join()
    count_thread2.join()
    
    final_count = counter.get_count()
    return final_count

In [34]:
trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)

181538
126831
139475


An **atomic** operation is one that executes as a single, indivisible step: no other thread can run in the middle of it or observe it half-finished. Thus far, we've implicitly assumed that all operations are atomic.

In the code above, we saw that our final counter value was nondeterministic. At first glance, it's not clear why. Our loop increments the counter at every step, so the counter should go up by one every time. If counter.increment() is called 200000 times, we'd expect the final value to be 200000. That expectation relies on counter.increment() being atomic, but it isn't.

Let's look at the internals of counter.increment(). It's a method on the Counter class whose body is self.count += 1, and that single line behaves like this two-step version:

def increment(self):
    old_count = self.count
    self.count = old_count + 1

So counter.increment() actually consists of separate steps: read the current count, then write back a new value. Nothing guarantees that these steps run back to back; another thread can run between them.
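We can see these separate steps with the standard-library dis module, which disassembles a function into the bytecode instructions CPython executes. The exact listing differs between Python versions, so this sketch only checks that the read (LOAD_ATTR) and the write (STORE_ATTR) of self.count are distinct instructions.

```python
import dis

class Counter():
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1

# Print the bytecode for increment(); exact opcode names vary by Python version.
dis.dis(Counter.increment)

opnames = [instr.opname for instr in dis.get_instructions(Counter.increment)]
# The read (LOAD_ATTR) and the write (STORE_ATTR) are separate instructions,
# so a thread switch can happen between them.
print("LOAD_ATTR" in opnames, "STORE_ATTR" in opnames)  # True True
```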

Suppose we have two threads calling counter.increment(), and that our counter is currently at the value 100500. Thread A might call old_count = self.count, and then thread A's value for old_count is 100500. Now suppose thread B calls old_count = self.count, and then thread B's value for old_count is also 100500.

Now, in either order, thread A and thread B both assign old_count + 1 to self.count. The counter's count property is now 100501, even though counter.increment() was called twice. This is not the behavior we want: counter.increment() is not atomic, and the result depends on how the threads' steps interleave. A bug like this is called a **race condition**.
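The interleaving above can be replayed by hand in a single thread. This sketch simply writes out each thread's read and write as a separate statement, in the order described, with plain variables standing in for the two threads' old_count values.

```python
# Replay the interleaving described above, one step at a time.
count = 100500

a_old = count      # thread A reads: 100500
b_old = count      # thread B reads before A has written: also 100500
count = a_old + 1  # thread A writes 100501
count = b_old + 1  # thread B also writes 100501, overwriting A's update

print(count)  # 100501, although two increments ran
```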

We can use locks to imitate atomicity. If we protect every call to counter.increment() with the same lock, only one thread can increment the counter at a time. Alternatively, we can make counter.increment() itself atomic by acquiring a lock at the start of its body and releasing it at the end. This is the more modular approach, because users of the Counter class won't have to remember to use a lock with every call to counter.increment().
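Here is a sketch of the first option, where the Counter class stays unchanged and every call site protects counter.increment() with one shared lock. Passing the lock into count_up_100000 is an assumption made for this illustration.

```python
import threading

class Counter():
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1          # not atomic on its own
    def get_count(self):
        return self.count

def count_up_100000(counter, lock):
    for _ in range(100000):
        with lock:               # every caller must remember to do this
            counter.increment()

def conduct_trial():
    counter = Counter()
    lock = threading.Lock()      # one lock shared by both threads
    threads = [threading.Thread(target=count_up_100000, args=(counter, lock))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.get_count()

print(conduct_trial())  # 200000
```

The drawback is that correctness depends on every caller using the same lock; a single unprotected call to counter.increment() reintroduces the race.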

In [35]:
class Counter():
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
    def increment(self):
        self.lock.acquire()
        old_count = self.count
        self.count = old_count + 1
        self.lock.release()
    def get_count(self):
        return self.count

In [36]:
def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

In [37]:
def conduct_trial():
    counter = Counter()
    count_thread1 = threading.Thread(target=count_up_100000, args=[counter])
    count_thread2 = threading.Thread(target=count_up_100000, args=[counter])

    count_thread1.start()
    count_thread2.start()

    count_thread1.join()
    count_thread2.join()

    final_count = counter.get_count()
    return final_count

In [38]:
trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)

200000
200000
200000


We've seen some of the problems that parallel processing can introduce, such as nonatomicity and nondeterminism. In data science, it's important to maintain the integrity of our data, and a multithreaded environment is no exception. By using tools like locks to enforce atomicity and determinism, we can protect resources shared between threads, and ensure that delegating tasks between threads doesn't introduce unexpected bugs into our code.