---
# Python Course - Class 4:
# Advanced Python and multiprocessing

## Contents:

- Sets
- In-depth functions & related tools 
- Handling the operating system
- Parallel computation

---

## Sets

- Sets can be thought of as a special list that does not allow repeated entries
- We can perform multiple operations to compare the elements within sets
- The items are unordered, and each item must be immutable (hashable); the set itself can still be changed by adding or removing items

In [7]:
# When creating a new set we can either create an empty one:
foo = set()

# With some pre-determined elements:
foo = {1, 2, 3, 4}

# or from a previous list/tuple
foo = set((1,2,3,4, 1))
print(foo)

{1, 2, 3, 4}


## Adding data to a set

In [1]:
foo = set() 

foo.add(2)
foo.add((2,2))
foo.add((2,1))
foo.add((2,2))
foo.add('bar')
foo.add(2)
foo.add(None)
print(foo)

{2, (2, 1), (2, 2), 'bar', None}


In [40]:
# We can also use "comprehensions" to create a set

bar = {i for i in range(10)} 

print(bar)

{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}


## Removing data from the set

We can remove data using either:
- remove
- discard

The first option ('remove') raises a KeyError if the element does not exist in the set, while 'discard' silently does nothing

In [37]:
foo = {1,2,4}

foo.discard(2)
print(foo)

foo.discard(3)  # 3 is not in the set: nothing happens
foo.remove(3)   # 3 is not in the set: raises a KeyError

{1, 4}


KeyError: 3

## Accessing the data

- Sets support some of the same built-in functions and operators as lists, such as len() and 'in', but not indexing

In [2]:
foo = {1,2,34}
print(len(foo))

3


In [2]:
# We can use a normal 'for' loop to access the data
foo = {1,2,3,4}

for item in foo:
    print(item)
foo[0]  # Sets are unordered, so indexing raises a TypeError

1
2
3
4


TypeError: 'set' object is not subscriptable

In [5]:
foo = {1,2,3}

print(1 in foo)

True


In [27]:
foo = {1,2,3,4}

# We can use enumerate, but the index it gives cannot be used to access the items
for index, item in enumerate(foo):
    print(index, item)
    
print(foo[0])  # Sets do not support indexing: raises a TypeError

0 1
1 2
2 3
3 4


TypeError: 'set' object is not subscriptable
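Since sets have no positions, a common workaround when an index is really needed is to build an ordered list from the set first. A minimal sketch, assuming the elements can be compared with each other (which sorted() requires):

```python
foo = {3, 1, 4, 2}

# sorted() returns a new list, so indexing works on it
ordered = sorted(foo)
print(ordered[0])  # 1

# next(iter(...)) gives *some* element, but which one is not guaranteed
any_item = next(iter(foo))
print(any_item in foo)  # True
```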

## Operations with sets

In [4]:
foo = {'apple', 'banana'}
bar = {'orange', 'banana'}

# If we want to find the elements that only exist in foo, we can subtract two sets
print(foo - bar)

{'apple'}


In [33]:
foo = {'apple', 'banana'}
bar = {'orange', 'banana'}

# If we want to combine the information of two sets we can:

# a) do it without changing the original sets
new_set = foo.union(bar)
print(new_set)

# b) update one of the sets:
foo.update(bar)
print(foo)

{'orange', 'apple', 'banana'}
{'orange', 'apple', 'banana'}


In [34]:
# Finding the elements common to both sets (the intersection) is also easy
foo = {'apple', 'banana'}
bar = {'orange', 'banana'}

# a) do it without changing the original sets
new_set = foo.intersection(bar)
print(new_set)

# b) update one of the sets:
foo.intersection_update(bar)
print(foo)


{'banana'}
{'banana'}


In [35]:
# We can also search for the non-common elements

foo = {'apple', 'banana'}
bar = {'orange', 'banana'}

# a) do it without changing the original sets
new_set = foo.symmetric_difference(bar)
print(new_set)

# b) update one of the sets:
foo.symmetric_difference_update(bar)
print(foo)


{'orange', 'apple'}
{'orange', 'apple'}


## Comparing sets 

It is also possible to check if a given set contains another:

- issubset : Returns whether another set contains this set
- issuperset : Returns whether this set contains another set
- isdisjoint : Returns whether two sets have no intersection (returns True if none of the items are present in both sets)

In [45]:
# We can also check if a set "contains" or not another one:
foo = {1,2,3,4}
bar = {i for i in range(10)} 

print(foo.issubset(bar), bar.issuperset(foo), foo.isdisjoint(bar))

True True False
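Most of the methods shown above also have operator equivalents. A short sketch; one difference worth knowing is that the operators require both operands to be sets, while the methods also accept other iterables (lists, tuples, ...):

```python
foo = {'apple', 'banana'}
bar = {'orange', 'banana'}

print(foo - bar)   # same as foo.difference(bar)   -> {'apple'}
print(foo | bar)   # same as foo.union(bar)
print(foo & bar)   # same as foo.intersection(bar) -> {'banana'}
print(foo ^ bar)   # same as foo.symmetric_difference(bar)

small = {1, 2}
big = set(range(10))
print(small <= big, big >= small)  # issubset / issuperset -> True True
print(small < big)                 # proper subset         -> True

# Methods accept any iterable; foo | ['kiwi'] would raise a TypeError instead
print(foo.union(['kiwi']))
```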


---
# Closer look into functions

Functions can be stored in variables and, later on, be called just like any other function

In [50]:
def foo(x,y):
    return x + y 

stored_function = foo 
print(stored_function, type(stored_function))

output  = stored_function(2,2)
print(output)

<function foo at 0x7f97eaacd0d0> <class 'function'>
4


In [52]:
# If we want, we can store the functions (as values!) inside a dictionary for later usage
def foo(x,y):
    return x + y 

def bar(x,y):
    return x * y 

stored_functions = {'foo' : foo,
                    'bar' : bar
                    }

for key, func in stored_functions.items():
    print("{} - {}".format(key, func(3,2)))

foo - 5
bar - 6
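Because functions are values, they can also be passed as arguments to other functions. A small sketch; the names add, multiply and apply_operation are only illustrative:

```python
def add(x, y):
    return x + y

def multiply(x, y):
    return x * y

def apply_operation(operation, x, y):
    # 'operation' can be any function that takes two arguments
    return operation(x, y)

# __name__ holds the name the function was defined with
results = {func.__name__: apply_operation(func, 3, 2) for func in (add, multiply)}
print(results)  # {'add': 5, 'multiply': 6}
```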


## Lambda functions

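A lambda is an anonymous function written as a single expression. Lambdas are used to define a short function in one line and either assign it to a variable or pass it directly to another function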



In [8]:

foo = lambda x, y: x + y

print(foo(2,2))

4
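Lambdas are most useful when a short function is needed only once, for example as the key argument of sorted(). A minimal sketch:

```python
words = ['banana', 'kiwi', 'apple']

# Sort by word length instead of alphabetical order
by_length = sorted(words, key=lambda word: len(word))
print(by_length)  # ['kiwi', 'apple', 'banana']

# Sort by the last letter of each word ('a' < 'e' < 'i')
by_last_letter = sorted(words, key=lambda word: word[-1])
print(by_last_letter)  # ['banana', 'apple', 'kiwi']
```

In the first case key=len would work just as well; the lambda only pays off when no ready-made function does the job.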


## Applying functions to list elements 

We can use the 'map' function to apply a function to each individual element of a list. It returns an "iterator", so to use the data we must either:

- convert it to a list
- process it with a loop

**NOTE**: an iterator can only be consumed once. After either option has been done, the iterator is exhausted and iterating over it again yields nothing. For more details on why this happens, read about 'iterators' in Python:
- https://docs.python.org/3/glossary.html#term-iterator
- https://www.programiz.com/python-programming/iterator

In [11]:
def bar(x):
    return x**2 

stored_elems = range(10)


result = map(lambda x: x**2, stored_elems) 
print(result)

# a) convert to list: 
print(list(result))

#b) use in a loop:
result = map(bar, stored_elems) 

for entry in result:
    print(entry)

<map object at 0x7f499ba78580>
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
16
25
36
49
64
81
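The NOTE above can be checked directly: converting the same map object to a list twice gives an empty list the second time, because the iterator was already consumed.

```python
result = map(lambda x: x**2, range(5))

first_pass = list(result)
second_pass = list(result)  # the iterator is already exhausted

print(first_pass)   # [0, 1, 4, 9, 16]
print(second_pass)  # []

# If the values are needed more than once, store them in a list right away
squares = list(map(lambda x: x**2, range(5)))
```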


---
---
# Parallel computation in Python


## Concurrency
- Concurrency : A condition that exists when at least two threads are making progress. 

<img align="left" width="500" src="./Figures/schedule.png">  <br /><br />


## Parallelism

- Parallelism : A condition that arises when at least two threads are executing simultaneously.



<img align="left" width="500" src="./Figures/parallel_scheme.png">  <br /><br />



In the simplest sense, parallel computing is the simultaneous use of multiple compute resources to solve a computational problem:

-   A problem is broken into discrete parts that can be solved concurrently
-   Each part is further broken down to a series of instructions
-   Instructions from each part execute simultaneously on different processors
-   An overall control/coordination mechanism is employed


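## Why should(n’t) we go parallel? - Amdahl's law

Parallel computing with many processors is useful only for highly parallelizable programs: the part of the program that must run serially limits the maximum speedup, no matter how many processors are added.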


<img align="left" width="500" src="./Figures/law.png">  <br /><br />
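Amdahl's law can be written as S(N) = 1 / ((1 - p) + p / N), where p is the fraction of the run time that can be parallelized and N is the number of processors. A small sketch to play with the numbers (the function name is only illustrative):

```python
def amdahl_speedup(p, n):
    """Theoretical speedup for a parallel fraction p (between 0 and 1) on n processors."""
    return 1.0 / ((1.0 - p) + p / n)

for n in [1, 2, 8, 64, 1024]:
    print("{:5d} processors -> speedup {:.2f}".format(n, amdahl_speedup(0.95, n)))
```

Even with 95% of the code running in parallel, the speedup can never exceed 1 / (1 - 0.95) = 20, however many processors are used.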


## What problems do we face when writing parallel code?

- Python (CPython) makes it hard to write parallel code with threads, due to the Global Interpreter Lock (GIL)
    - Only one thread can execute Python bytecode at any given time.
    - Without the GIL, single-threaded Python code would likely be slower
    
- The solution is to launch multiple interpreters at the same time (processes). However:
    - Creating new processes is not "computationally cheap"
    - Sharing data between processes can also have a large cost


---
## The basics - Launching the processes

In [81]:
"""
We create new processes with the Process class, giving it a target function and the arguments that we want to pass to it

After "starting" the process, the rest of the Python code will run as usual, even if the process isn't done

In this example, even though we started the process first, we still see the print from the function after the one from the parent process.
This is a consequence of the way in which the operating system "schedules" the operations.
Thus, when launching processes we do not know, a priori, the order in which the operations will happen
"""

from multiprocessing import Process

def f(name):
    print('hello ' + name)


p = Process(target=f, args=('World',))
p.start()
print('Goodbye')

Goodbye
hello World
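The cell above works in a Linux notebook that uses the 'fork' start method. When the code lives in a script, and especially on Windows and macOS (where the default start method is 'spawn'), the child process re-imports the main module, so the code that starts processes should sit under an `if __name__ == '__main__':` guard. A minimal script version of the same example:

```python
from multiprocessing import Process

def f(name):
    print('hello ' + name)

# Code under this guard runs only in the parent, not when a child re-imports the module
if __name__ == '__main__':
    p = Process(target=f, args=('World',))
    p.start()
    p.join()
    # exitcode is 0 when the target function finished without raising
    print('Goodbye, exit code:', p.exitcode)
```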


In [82]:
"""
However, we can choose to wait for the process to finish before doing anything else, using the 'join' method.

By default, join() waits until the function is finished. We can also pass a timeout (in seconds): join() then returns once
the timeout expires, even if the process is still running (it does not raise an exception), so we must check p.is_alive() afterwards
"""
p = Process(target=f, args=('World',))
p.start()
p.join()
print('Goodbye')

hello World
Goodbye
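A sketch of join() with a timeout: once the timeout expires, join() simply returns, so we check is_alive() and, if needed, stop the process with terminate(). The function name and sleep durations below are arbitrary choices for illustration.

```python
import time
from multiprocessing import Process

def slow_task(seconds):
    time.sleep(seconds)

if __name__ == '__main__':
    p = Process(target=slow_task, args=(10,))
    p.start()

    p.join(timeout=0.5)          # returns after ~0.5 s, no exception is raised
    timed_out = p.is_alive()     # True: the task needs 10 s
    if timed_out:
        p.terminate()            # stop the child process
        p.join()                 # wait for it to actually exit
    # On POSIX, a terminated process has a negative exit code (minus the signal number)
    print(timed_out, p.exitcode)
```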


In [12]:
"""
Launching processes one by one can, most of the time, be avoided by using a 'Pool'.

- Its 'map' method chops the data into a number of chunks, which it submits to the process pool as separate tasks.
- It blocks until all the results are ready
"""

from multiprocessing import Pool

def f(x):
    return x**2

with Pool(processes=5) as p:
    # Unlike the built-in map, this one returns a list instead of an iterator
    print(p.map(f, range(20)))

print("The end")

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
The end
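Pool.map passes a single argument to the function. When the function needs several arguments, Pool.starmap unpacks each tuple into the arguments. A minimal sketch with an illustrative power function, written as a script (hence the guard):

```python
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    pairs = [(2, 3), (3, 2), (10, 0)]
    with Pool(processes=2) as pool:
        # Each tuple is unpacked: power(2, 3), power(3, 2), power(10, 0)
        results = pool.starmap(power, pairs)
    print(results)  # [8, 9, 1]
```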


---
---
## The basics - Communication


The two methods that we are going to see work in the same way:
- We add data to a buffer of items
- When we request data, the oldest item is removed from the buffer and returned
- This is also known as FIFO (First In, First Out)



## Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). 

**Notes**: 
- The data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. 
- There is no risk of corruption from processes using different ends of the pipe at the same time.

**Docs:**
- https://docs.python.org/3.8/library/multiprocessing.html#connection-objects

In [13]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])

parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())   # blocks until data arrives; if nothing is ever sent, it will wait forever
p.join()


[42, None, 'hello']
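Since recv() can block forever, Connection.poll(timeout) can be used to check whether there is data to read, waiting at most `timeout` seconds; it returns True or False. A sketch; the slow_sender function and the sleep time are only illustrative:

```python
import time
from multiprocessing import Process, Pipe

def slow_sender(conn):
    time.sleep(0.5)          # simulate some work before answering
    conn.send('done')
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=slow_sender, args=(child_conn,))
    p.start()

    early = parent_conn.poll(0.01)   # False: nothing has been sent yet
    ready = parent_conn.poll(5)      # waits up to 5 s for data
    message = parent_conn.recv() if ready else None
    p.join()
    print(early, ready, message)
```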


## Queues

- Even though pipes are faster (as they only have two endpoints), we often want to communicate between a larger number of processes 
- The interface is slightly different: we use 'put' and 'get'

In [14]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

q = Queue()

p = Process(target=f, args=(q,))
p.start()
print(q.get())    # prints "[42, None, 'hello']"
p.join()

[42, None, 'hello']
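A queue can be shared by several processes. In this sketch, several producers (an illustrative function) put results in the same queue and the parent collects them. The results are read before calling join(): the multiprocessing documentation warns that joining a process that still has items buffered in a queue can deadlock.

```python
from multiprocessing import Process, Queue

def producer(q, worker_id):
    # Each worker sends one (id, result) tuple
    q.put((worker_id, worker_id ** 2))

if __name__ == '__main__':
    q = Queue()
    workers = [Process(target=producer, args=(q, i)) for i in range(4)]
    for w in workers:
        w.start()

    # Arrival order depends on scheduling, so we sort the results
    results = sorted(q.get() for _ in workers)

    for w in workers:
        w.join()
    print(results)  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```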


## Problems with communication 

When dealing with larger amounts of data, we can spend more time waiting for the data to transfer than waiting for our routine to run. When we use a queue, what happens behind the scenes is:

- a) The Python object is serialized (pickled) when it is added
- b) The data is de-serialized and the Python object is rebuilt when it is retrieved

The cost of doing so increases with the amount of data that must be transferred. To showcase this, we:
- Create arrays filled with random numbers, with different sizes
- Measure the time it takes to transfer each array and to compute the sum of the entire array



In [149]:
import time 
import numpy as np 
from multiprocessing import Process, Queue

def targ(queue):
    t_0 = time.time()
    data = queue.get() 
    print("\tData Transfer: {:.6f} [s]".format(time.time() - t_0))
    
    t_0 = time.time()
    output = np.sum(data)
    print("\tComputing sum: {:.6f} [s]".format(time.time() - t_0))

q = Queue()

for size in [(170, 9111), (500, 9111)]:
    print("\nCreating array with size: {}".format(size))
    data_in = np.random.random(size)
    q.put(data_in)
    p = Process(target=targ, args=(q,))
    p.start()
    p.join()



Creating array with size: (170, 9111)
	Data Transfer: 0.015794 [s]
	Computing sum: 0.001415 [s]

Creating array with size: (500, 9111)
	Data Transfer: 0.066358 [s]
	Computing sum: 0.003233 [s]
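One way to see where this cost comes from is to pickle arrays of different sizes ourselves: the serialized object contains a full copy of the data, so its size (and the work needed to produce and send it) grows with the array. A minimal sketch using the pickle module directly, which is roughly what a Queue does on put(); the sizes are arbitrary:

```python
import pickle
import numpy as np

pickled_sizes = {}
for n in [1_000, 100_000, 1_000_000]:
    arr = np.random.random(n)
    payload = pickle.dumps(arr)   # serialize the array into bytes
    pickled_sizes[n] = len(payload)
    print("{:>9} elements: {:>9} bytes of data -> {:>9} pickled bytes".format(
        n, arr.nbytes, len(payload)))
```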


---

## Sharing state between Processes

### Synchronization primitives

Problems with concurrency:
- Concurrency introduces nondeterminism: the exact order in which things are done (the schedule) is not known in advance. 
- Nondeterminism occurs because the schedule results from the interaction of a system and its scheduling policies with various asynchronous external processes, including physical processes, whose timings are not known or controlled by the system. 

- Synchronization primitives allow us to control access to shared resources of the system


To better understand how they work, let us look at a concurrent setup: 3 tasks (with different priorities) being scheduled by the operating system. The dashed boxes represent the same shared resource, which cannot be accessed by more than one task at any given time:

<img align="left" width="500" src="./Figures/locks.png">  <br /><br />





### How it works in Python:
- A primitive lock is in one of two states, “locked” or “unlocked”. 
- Once a process or thread has acquired a lock, subsequent attempts to acquire it from any process or thread will block until it is released

We can use:

- Locks: only one process or thread can hold the lock at a time; any process or thread may release it. 

- Semaphores: allow up to a fixed number of processes to enter at the same time. This can be used, for example, to limit the number of CPU-, IO- or RAM-intensive tasks running simultaneously.

In [4]:
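"""
By using the Lock we make sure that only one process prints at a time (mutual exclusion).
The lock does NOT control the order in which the processes run: the ordered output below is not guaranteed
"""

from multiprocessing import Process, Lock

def f(l, i):
    # 'with' acquires the lock and always releases it, even if an error occurs
    with l:
        print('Hello world {}'.format(i))

lock = Lock()

processes = []
for num in range(20):
    p = Process(target=f, args=(lock, num))
    p.start()
    processes.append(p)

for p in processes:
    p.join()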

Hello world 0
Hello world 1
Hello world 2
Hello world 3
Hello world 4
Hello world 5
Hello world 6
Hello world 7
Hello world 8
Hello world 9
Hello world 10
Hello world 11
Hello world 12
Hello world 13
Hello world 14
Hello world 15
Hello world 16
Hello world 17
Hello world 18
Hello world 19
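Semaphores are used in the same way as locks (acquire/release or a with block), but up to N processes can hold them at once. A sketch, assuming a limit of 2 simultaneous tasks; the two shared Value counters (see the next section) are only there to record how many tasks were running at the same time:

```python
import time
from multiprocessing import Process, Semaphore, Value

def limited_task(sem, active, peak):
    with sem:  # at most 2 processes get past this line at once
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.05)  # pretend to do some work
        with active.get_lock():
            active.value -= 1

if __name__ == '__main__':
    sem = Semaphore(2)
    active = Value('i', 0)   # tasks currently inside the semaphore
    peak = Value('i', 0)     # highest number observed
    procs = [Process(target=limited_task, args=(sem, active, peak)) for _ in range(6)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("Maximum simultaneous tasks:", peak.value)
```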


## Sharing variables and lists

In [150]:
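"""
Value and Array allocate a single value / an array in shared memory, which child processes can modify directly.
The first argument is a type code, e.g. 'd' for double and 'i' for signed int.
To check for type codes: https://docs.python.org/3/library/array.html#module-array
"""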
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

# 'd' -> double
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
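Value and Array come with a lock by default, available through get_lock(). It is needed whenever an update depends on the current value, such as `counter.value += 1`, which is a read followed by a write: without the lock, two processes can read the same old value and one increment is lost. A minimal sketch (add_many is an illustrative name):

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    for _ in range(n):
        # Hold the lock for the whole read-modify-write
        with counter.get_lock():
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4000
```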


## Sharing numpy arrays

By using the shared_memory module (Python 3.8 or newer) we can:
- allocate and manage shared memory, to be accessed by one or more processes 
- create numpy arrays directly on that memory

Docs: https://docs.python.org/3/library/multiprocessing.shared_memory.html

## Creating the array

In [6]:
from multiprocessing import shared_memory
import numpy as np

a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array

shm = shared_memory.SharedMemory(create=True, size=a.nbytes)

# Now create a NumPy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]  # Copy the original data into shared memory
print(b)


print("Name:", shm.name)  # We did not specify a name so one was chosen for us

# At the end we must clean the memory that we were using
shm.close()
shm.unlink()

[1 1 2 3 5 8]
Name: psm_... (automatically generated; the exact name changes on every run)


## Accessing the data

In [268]:
import numpy as np
from multiprocessing import shared_memory, Process

a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# Now create a NumPy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]  # Copy the original data into shared memory

def open_shared_mem(name, shape, dtype):
    # Attach to the existing block by its name (no new memory is allocated)
    existing_shm = shared_memory.SharedMemory(name=name)
    c = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    print("original data:", c)
    c[:] = 0
    existing_shm.close()

p = Process(target=open_shared_mem, args=(shm.name, a.shape, a.dtype))
p.start()
p.join()

print("Updated array B: ", b)
shm.close()
shm.unlink()

original data: [1 1 2 3 5 8]
Updated array B:  [0 0 0 0 0 0]


## Benchmarking against the Queue



In [264]:
import time 
from multiprocessing import shared_memory, Process, Queue
import numpy as np


def shared_mem_target(name, size, dtype):
    t_start = time.time()
    existing_shm = shared_memory.SharedMemory(name=name)
    c = np.ndarray(size, dtype=dtype, buffer=existing_shm.buf)
    output = np.sum(c)
    print("\tShared memory took {:.6f} [s]; Obtained result of {:.3f}".format(time.time() - t_start, output))
    existing_shm.close()

def queue_target(queue):
    t_0 = time.time()
    data = queue.get() 
    output = np.sum(data)
    print("\tQueue method took: {:.6f} [s]; Obtained result of {:.3f}".format(time.time() - t_0, output))


q = Queue()

for size in [(170, 9111), (500, 9111)]:
    print("\nCreating array with size: {}".format(size))
    data_in = np.random.random(size)
    
    shm = shared_memory.SharedMemory(create=True, size=data_in.nbytes)
    b = np.ndarray(data_in.shape, dtype=data_in.dtype, buffer=shm.buf)
    b[:] = data_in[:]  

    p = Process(target=shared_mem_target, args=(shm.name, size, data_in.dtype))
    p.start()
    p.join()

    q.put(data_in)
    p = Process(target=queue_target, args=(q, ))
    p.start()
    p.join()
    shm.close()
    shm.unlink()


Creating array with size: (170, 9111)
	Shared memory took 0.001792 [s]; Obtained result of 774677.354
	Queue method took: 0.019389 [s]; Obtained result of 774677.354

Creating array with size: (500, 9111)
	Shared memory took 0.004001 [s]; Obtained result of 2277146.556
	Queue method took: 0.073580 [s]; Obtained result of 2277146.556
