# Threads and Processes

## Threading

In [1]:
import urllib
import time
import math

def do_get_website():
    r = urllib.request.urlopen('https://www.unlockingpython.com/')
    return r.getcode()

def do_sleep():
    time.sleep(0.5)

def do_calculation():
    for i in range(0, 10000000):
        math.sin(i)


In [2]:
def print_function_time(func):
    start = time.time()
    func()
    print(f'{func.__name__}: {time.time() - start}')


In [3]:
print_function_time(do_sleep)

do_sleep: 0.5028650760650635


In [4]:
def do_sleep_multiple():
    for _ in range(5):
        do_sleep()

print_function_time(do_sleep_multiple)

do_sleep_multiple: 2.5166127681732178


In [6]:
import threading

def do_sleep():
    time.sleep(0.5)

t1 = threading.Thread(target=do_sleep)
t2 = threading.Thread(target=do_sleep)

t1.start()
t2.start()

t1.join()
t2.join()


In [7]:
import threading

def do_sleep():
    time.sleep(0.5)

t1 = threading.Thread(target=do_sleep)
t2 = threading.Thread(target=do_sleep)

t1.start()
t2.start()
print('Started the threads')
t1.join()
t2.join()
print('Joined the threads')

Started the threads
Joined the threads


In [13]:
t1 = threading.Thread(target=print_function_time, args=(do_sleep,))
t2 = threading.Thread(target=print_function_time, args=(do_sleep,))
t1.start()
t2.start()
t1.join()
t2.join()

do_sleep: 0.5041337013244629do_sleep: 0.5045640468597412



In [5]:
def do_sleep_threads(num_threads=5):
    threads = [
        threading.Thread(target=print_function_time, args=(do_sleep,))
        for i in range(num_threads)
    ]
    [t.start() for t in threads]
    [t.join() for t in threads]

print_function_time(do_sleep_threads)

NameError: name 'threading' is not defined

In [15]:
def do_get_website_threads(num_times=5):
    threads = [threading.Thread(target=print_function_time, args=(do_get_website,)) for i in range(num_times)]
    [t.start() for t in threads]
    [t.join() for t in threads]

print_function_time(do_get_website_threads)

do_get_website: 0.2607760429382324
do_get_website: 0.28763604164123535
do_get_website: 0.3169100284576416
do_get_website: 0.3658781051635742
do_get_website: 0.37075090408325195
do_get_website_threads: 0.37125563621520996


In [34]:
def do_calculation_threads(num_times=5):
    threads = [threading.Thread(target=print_function_time, args=(do_calculation,)) for i in range(num_times)]
    [t.start() for t in threads]
    [t.join() for t in threads]

print_function_time(do_calculation_threads)

do_calculation: 2.4984560012817383
do_calculation: 2.4795620441436768
do_calculation: 2.6083219051361084
do_calculation: 2.6287569999694824
do_calculation: 2.5508408546447754
do_calculation_threads: 2.661665916442871


In [35]:
print_function_time(do_calculation)

do_calculation: 0.5347089767456055


## Locking

In [51]:
import threading

In [16]:
def sleep_and_print(lock):
    time.sleep(0.5)
    lock.acquire()
    print('Done sleeping!')
    lock.release()

def do_sleep_and_print_threads(num_threads=5):
    lock = threading.Lock()
    threads = [
        threading.Thread(target=sleep_and_print, args=(lock,))
        for i in range(num_threads)
    ]
    [t.start() for t in threads]
    [t.join() for t in threads]

do_sleep_and_print_threads()

Done sleeping!
Done sleeping!
Done sleeping!
Done sleeping!
Done sleeping!


In [17]:
def sleep_and_print(lock):
    time.sleep(0.5)
    with lock:
        print('Done sleeping!')

def do_sleep_and_print_threads(num_threads=5):
    lock = threading.Lock()
    threads = [
        threading.Thread(target=sleep_and_print, args=(lock,))
        for i in range(num_threads)
    ]
    [t.start() for t in threads]
    [t.join() for t in threads]

do_sleep_and_print_threads()


Done sleeping!
Done sleeping!
Done sleeping!
Done sleeping!
Done sleeping!


In [18]:
def sleep_and_print(lock):
    time.sleep(0.5)
    lock.acquire()
    print('Done sleeping!')
    # lock.release()

def do_sleep_and_print_threads(num_threads=5):
    lock = threading.Lock()
    threads = [
        threading.Thread(target=sleep_and_print, args=(lock,))
        for i in range(num_threads)
    ]
    [t.start() for t in threads]
    [t.join() for t in threads]

do_sleep_and_print_threads()


Done sleeping!


KeyboardInterrupt: 

In [20]:
def print_function_time(func, lock=threading.Lock()):
    start = time.time()
    func()
    with lock.acquire():
        print(f'{func.__name__}: {time.time() - start}')


In [8]:
def do_sleep_threads(num_threads=5):
    threads = [
        threading.Thread(target=print_function_time, args=(do_sleep,))
        for i in range(num_threads)
    ]
    [t.start() for t in threads]
    [t.join() for t in threads]

print_function_time(do_sleep_threads)

do_sleep: 0.5026049613952637
do_sleep: 0.5032711029052734
do_sleep: 0.504997968673706
do_sleep: 0.505033016204834
do_sleep: 0.5051467418670654
do_sleep_threads: 0.5086212158203125


## Queues

In [9]:
import queue

q = queue.SimpleQueue()

In [10]:
q.put('thing 1')
q.put('thing 2')
print(q.get())
print(q.get())

thing 1
thing 2


In [11]:
q.get()

KeyboardInterrupt: 

In [12]:
q.get(timeout=5)

Empty: 

In [None]:
q.get(block=False)

In [13]:
try:
    q.get(block=False)
except Exception as e:
    print(type(e))

<class '_queue.Empty'>


In [14]:
from queue import SimpleQueue, Empty


def add_urls(q_work):
    urls_list = [
        'http://unlockingpython.com/',
        'https://google.com',
        'https://apple.com',
        'https://ryanemitchell.com'
    ]
    while len(urls_list):
        q_work.put(urls_list.pop())
        time.sleep(2)

def fetch_urls(q_work):
    try:
        while url := q_work.get(timeout=5):
            r = urllib.request.urlopen(url)
            print(f'url: {url}, status code: {r.getcode()}')
    except Empty:
        print('Done')

q_work = SimpleQueue()

threads = [
    threading.Thread(target=fetch_urls, args=(q_work,)),
    threading.Thread(target=fetch_urls, args=(q_work,)),
    threading.Thread(target=add_urls, args=(q_work,))
]

[t.start() for t in threads]
[t.join() for t in threads]


url: https://ryanemitchell.com, status code: 200
url: https://apple.com, status code: 200
url: https://google.com, status code: 200
url: http://unlockingpython.com/, status code: 200
Done
Done


[None, None, None]

## Multiprocessing

In [15]:
import math 

def do_calculation():
    for i in range(0, 10000000):
        math.sin(i)

In [9]:
import time

def print_function_time(func):
    start = time.time()
    func()
    print(f'{func.__name__}: {time.time() - start}')

In [None]:
# THIS DOES NOT WORK IN AN iPYTHON NOTEBOOK
import multiprocessing

p1 = multiprocessing.Process(target=print_function_time, args=(do_calculation,))
p2 = multiprocessing.Process(target=print_function_time, args=(do_calculation,))
p1.start()
p2.start()
p1.join()
p2.join()

In [16]:
import multiprocess
import time

p1 = multiprocess.Process(target=print_function_time, args=(do_calculation,))
p2 = multiprocess.Process(target=print_function_time, args=(do_calculation,))
p1.start()
p2.start()
p1.join()
p2.join()

do_calculation: 0.5974199771881104
do_calculation: 0.5977811813354492


In [17]:
from multiprocess import Process

In [18]:
def do_calculation_processes(num_times=5):
    processes = [
        Process(
            target=print_function_time,
            args=(do_calculation,)
        ) for i in range(num_times)
    ]
    [p.start() for p in processes]
    [p.join() for p in processes]

In [19]:
print_function_time(do_calculation_processes)

do_calculation: 0.5994729995727539
do_calculation: 0.6028730869293213
do_calculation: 0.6036550998687744
do_calculation: 0.6023678779602051do_calculation: 0.6086511611938477

do_calculation_processes: 0.6359820365905762


In [28]:
def do_get_website():
    r = urllib.request.urlopen('http://www.unlockingpython.com/')
    return r.getcode()


In [30]:
def do_get_website_threads(num_processes=5):
    threads = [
        threading.Thread(target=do_get_website) for i in range(num_processes)
    ]
    [t.start() for t in threads]
    return [t.join() for t in threads]

do_get_website_threads()

[None, None, None, None, None]

In [31]:
status_codes = []

def do_get_website(lock):
    global status_codes
    r = urllib.request.urlopen('http://www.unlockingpython.com/')
    with lock:
        status_codes.append(r.getcode())

def do_get_website_threads(num_processes=5):
    lock = threading.Lock()
    threads = [
        threading.Thread(target=do_get_website, args=(lock,))
        for i in range(num_processes)
    ]
    [t.start() for t in threads]
    return [t.join() for t in threads]

do_get_website_threads()
print(status_codes)

[200, 200, 200, 200, 200]


In [32]:
status_codes = []

def do_get_website(lock):
    global status_codes
    r = urllib.request.urlopen('http://www.unlockingpython.com/')
    with lock:
        status_codes.append(r.getcode())

def do_get_website_threads(num_processes=5):
    lock = multiprocess.Lock()
    threads = [
        multiprocess.Process(target=do_get_website, args=(lock,))
        for i in range(num_processes)
    ]
    [t.start() for t in threads]
    return [t.join() for t in threads]

do_get_website_threads()
print(status_codes)


[]


In [34]:
def do_get_website(lock):
    global status_codes
    r = urllib.request.urlopen('http://www.unlockingpython.com/')
    with lock:
        print(f'Adding {r.status} to status_codes')
        status_codes.append(r.getcode())

do_get_website_threads()
print(status_codes)

Adding 200 to status_codes
Adding 200 to status_codes
Adding 200 to status_codes
Adding 200 to status_codes
Adding 200 to status_codes
[]


In [35]:
from multiprocess.shared_memory import SharedMemory

sm = SharedMemory(size=1024, create=True)

In [36]:
sm.buf[0] = 200

print(sm.buf[0])

200


In [37]:
sm.close()
sm.unlink()

In [38]:
import random

sm = SharedMemory(size=1024, create=True)

def do_set_random(sm, i):
    sm.buf[i] = random.randint(0, 256)

def do_set_random_processes(sm, num_processes=5):
    processes = [
        multiprocess.Process(target=do_set_random, args=(sm, i))
        for i in range(num_processes)
    ]
    [p.start() for p in processes]
    return [p.join() for p in processes]

try:
    num = 10
    do_set_random_processes(sm, num_processes=num)
    print([int(sm.buf[i]) for i in range(0, num)])
finally:
    sm.close()
    sm.unlink()

[13, 240, 47, 90, 159, 145, 137, 154, 7, 222]


In [39]:
from multiprocess.shared_memory import ShareableList

sl = ShareableList([0, 0, 0, 0])
sl[0] = 'spam'
sl[1] = 3.14159
sl[2] = 123456
print(sl)

sl.shm.close()
sl.shm.unlink()

ShareableList(['spam', 3.14159, 123456, 0], name='psm_c5da4035')


In [41]:
data = int.to_bytes(500, 2)
int.from_bytes(data)

500

In [43]:
import random
import pickle

def read_object(sm):
    bytes_size = int.from_bytes(sm.buf[0:2], byteorder='little')
    bytes_data = sm.buf[2:2 + bytes_size]
    return pickle.loads(bytes_data)

def write_object(sm, obj):
    bytes_data = pickle.dumps(obj)
    bytes_size = len(bytes_data)
    sm.buf[0:2] = int.to_bytes(bytes_size, 2, byteorder='little')
    sm.buf[2:2 + bytes_size] = bytes_data

In [50]:
def do_append_random(sm, lock):
    random_int = random.randint(0, 256)
    with lock:
        shared_list = read_object(sm)
        shared_list.append(random_int)
        write_object(sm, shared_list)

def do_append_random_processes(sm, num_processes=5):
    lock = multiprocess.Lock()
    processes = [
        multiprocess.Process(target=do_append_random, args=(sm, lock))
        for i in range(num_processes)
    ]
    [p.start() for p in processes]
    [p.join() for p in processes]
    return read_object(sm)

In [51]:
sm = SharedMemory(size=1024, create=True)
# write an empty list to the shared memory
write_object(sm, [])
try:
    num = 10
    random_nums = do_append_random_processes(sm, num_processes=num)
    print(random_nums)
    
finally:
    sm.close()
    sm.unlink()

[113, 153, 223, 241, 78, 26, 139, 197, 174, 216]


## Exercises

**1.**

When the function `do_sleep_threads` was run without locking, we saw that the print statements interfered with each other, leading to printed output appearing incorrectly on the same line. When the function `do_get_website_threads` is run, this happens more rarely. Explain why this is.


**2.**
Create a new queue and add a number of dictionaries to it that have the following form: 

`{'timestamp': time.time(), 'data': random.randint(0, 100)}
`

where `time` and `random` are the built-in Python modules.
Then, retrieve items from the queue and print what the data is and how long each item had
been sitting in the queue.


**3.**

Create a function, `write_uuids_to_file`, that writes UUID (universally unique identifier) strings
to a file. Use the uuid module to do this:

```
from uuid import uuid4

str(uuid4())
```

When called, the function `write_uuids_to_file` should write five UUID strings on a single line, separated by commas. Each time a UUID is written, it should be written in a separate write statement to the file. That is, each time the function is called, five separate writes to the file are made.

After finishing, a newline should be written to the end of the file so that when `write_uuids_to_file` is called again, the UUIDs will be written on a new line.

Create 10 new processes that have the target `write_uuids_to_file`. Start and join the processes and observe the file. Did the processes interfere with each other?

**4.**

Add locking to the function `write_uuids_to_file` so that writes cannot be made by two processes at the same time while UUIDs are being written in the function call. Delete or clear the contents of the file, run the 10 processes again, and observe the contents of the file.


**5.**

Create a work queue, `q_work` and populate it with 100 UUIDs. Change the lock object used previously in `write_uuids_to_file` to a thread-locking object. Rewrite `write_uuids_to_file` so that it gets UUIDs from `q_work` and only exits when `q_work` contains no more UUIDs.

Run `write_uuids_to_file` in several threads.