# Matematikai Algoritmusok és Felfedezések II.

## 8. Párhuzamos számítások

### 2023 április 26.

# Párhuzamos számítások

Ez az óra ezt a tutorialt követi: https://realpython.com/python-concurrency/

#### Motiváció

Szeretnénk kihasználni, hogy több processzorunk van, és gyorsabb programokat létrehozni.

# Bevezető

Mit értünk párhuzamosság alatt?

- Valódi párhuzamosság: két különböző processzor azonos időben dolgozik. (**Paralellism**)
- Látszólagos párhuzamosság: egy processzor több porgram futását váltogatja gyorsan (**Concurency**)


## Mutitasking típusai
 - **pre-emptive**: A programok 'nem tudnak róla', hogy párhuzamosan futnak, egy külső rendszer felelős a párhuzamosításért.
 - **cooperative**: A programok maguk mondják meg, hogy mikor lehet őket leállítani és elindítani. 

| Concurrency Type                     | Switching Decision                                                    | Number of Processors |
|--------------------------------------|-----------------------------------------------------------------------|----------------------|
| Pre-emptive multitasking (`threading`) | Az operációs rendszer dönti el, hogy mikor váltson a feladatok között.| 1                    |
| Cooperative multitasking (`asyncio`)   | A feladatok döntik el, hogy mikor adják át az irányítást.             | 1                    |
| Multiprocessing (`multiprocessing`)    | A feladatok párhuzamosan futnak különböző processzorokon              | sok                |

# Mikor hasznos a párhuzamosítás?

| I/O-Bound Process                                                                                                     | CPU-Bound Process                                                                        |
|-----------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------|
| A program az ideje nagy részét azzal tölti, hogy lassú folyamatokkal kommunikál, mint egy hálózati kapcsolat vagy hard drive | A program az idő nagy részében számításokat végez a CPU-n.
| A várakozási idők átfedésével gyorsítjuk fel.| Több számítás egyidejű elvégézésével gyorsítjuk fel.  

# I/O bound példa

Néhány honlapot szeretnénk letölteni. 

In [4]:
import requests #egy  könyvtár HTTP lekérdezésekhez
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:               # egy session segítségével gyorsabban tudunk több lekérdezést elvégezni 
        for url in sites:
            download_site(url, session)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jyth

Ez a program így fut:

<img src="https://files.realpython.com/media/IOBound.4810a888b457.png" width='700px'>

Előnyök:
 - könnyű programozni
 - könnyű átgondolni
 
Hátrányok:
 - lassú



## threading javítás

A `threading` modult használjuk. 

In [9]:
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()  # ezzel a biztosítjuk, hogy a program 'thread safe'. Minden threadnek kell egy saját session


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # az executor irányítja helyettünk, hogy a threadek hogyan dolgozzanak
        executor.map(download_site, sites)   # szétosztjuk a munkát


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Read 10782 from https://www.jython.orgRead 10782 from https://www.jython.org
Read 10782 from https://www.jython.org

Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/diceRead 274 from http://olympus.realpython.org/dice

Read 274 from http://olympus.realpython.org/dice
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.orgRead 274 from http://olympus.realpython.org/dice

Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 274 from http://olympus.realpython.org/dice
Read 274 from http://olympus.realpython.org/dice
Read 274 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Rea

Ez a program így fut:

<img src="https://files.realpython.com/media/Threading.3eef48da829e.png" width='700px'>


A threadek számát nem triviális jól megválasztani, kisérletezni kell. 

Előnyök:
 - gyorsabb
 
Hátrányok:
 - nehezebb átgondolni
 - fura hibák előjöhetnek, nehéz őket kezelni


## Race conditions


In [6]:
import concurrent.futures


counter = 0


def increment_counter(fake_value):
    global counter
    for _ in range(100):
        counter += 1


fake_data = [x for x in range(5000)]
counter = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=5000) as executor:
    executor.map(increment_counter, fake_data)
counter

500000

Mivel a váltás bármikor történhet, akár a `counter += 1` közben is, lehetséges, hogy egy thread kiolvassa a counter értékét és mielőtt a megnövelt értéket beírná egy másik thread megnöveli azt, így elveszik a növelés. 

Ennek kicsi az esélye de előfordul!

## `asyncio` megoldás

- Egy python objektum az ˙event loop˙ irányít mindent.
- Ismeri a feladatokat és hogy melyik milyen állapotban van. 
- Felületesen: két lista, egyikben futtatható feladatok, másikben várakozóak.
- Mindig futtatja a legrégebbi futtathatót, majd ellenőrzi, hogy a várakozóak közül lett-e valaki futtatható
- Fontos, hogy egy feladattól nem vehető el az irányítás, csak önként adhatja fel.

### `async` és `await`
 - `await` jelzi az irányítás feladását.
 - `async` jelzi, hogy a függvény használ await-et, tehát az eventloopnak figyelnie kell rá. 

In [7]:
# ez itt nem működik, mert összeveszik a jupyter event loopjával
import asyncio
import time
import aiohttp


async def download_site(session, url):                                     # ez egy fügvény ami időnkét visszaadja az irányítást, így kell az async
    async with session.get(url) as response:                            
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):                                       # ez egy fügvény ami időnkét visszaadja az irányítást, így kell az async  
    async with aiohttp.ClientSession() as session:                         #itt elég egy darab session, ezen sokat nyerünk!
        tasks = []
        for url in sites:                                                  # feladatok létrehozása
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)               # megvárjuk még mind végetér


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    asyncio.get_event_loop().run_until_complete(download_all_sites(sites))   #futtatjuk az event loopot, még minden feladat véget nem ér
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")


RuntimeError: This event loop is already running

Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 fr

In [8]:
!python concurrency-overview/io_asyncio.py 

Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 from https://www.jython.org
Read 3721 fr

Ez a program így fut:

<img src="https://files.realpython.com/media/Asyncio.31182d3731cf.png" width='600px'>



Előnyök:
 - még gyorsabb
 - jobban skálázódik
 
Hátrányok:
 - nehezebb átgondolni
 - csak akkor működik jól, ha a meghívott modulok is használják!
 - egy hibás rész blokkolhat mindent
 

## Példa:

discord api: https://discordpy.readthedocs.io/en/stable/api.html




## `multiprocessing` megoldás

Nagyjából azt csinálja, hogy a processzorokon külön python interpretert indít. 

Ez a megoldás több processzort használ. Na de hány is van a gépben?



In [10]:
import multiprocessing

multiprocessing.cpu_count()

4

In [None]:
# nem fut le a jupyterben!!
import requests
import multiprocessing
import time

session = None                                                              # mivel külön memória területtel dolgoznak, ez egyedi lesz mindnél

 
def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:      # alapvetően annyit csinál, ahány processzorod van, kezdéskor meghívja az initializert
        pool.map(download_site, sites)                                      # lényegében külön python interpreterek indulnak el


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

In [1]:
!python concurrency-overview/io_mp.py

SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoolWorker-1:Read 10782 from https://www.jython.org
SpawnPoolWorker-1:Read 274 from http://olympus.realpython.org/dice
SpawnPoo

Ez a program így fut:

<img src="https://files.realpython.com/media/MProc.7cf3be371bbc.png" width='700px'>


Előnyök:
 - gyorsabb, de nem nagyon
 
Hátrányok:
 - nehezebb átgondolni
 

# CPU bound példa

In [2]:
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5000000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 34.91469073295593 seconds


Ez a program így fut:

<img src="https://files.realpython.com/media/CPUBound.d2d32cb2626c.png" width='700px'>

A `threading` and `asyncio` megoldások csak lassítanának rajta.


## Multiprocessing megoldás

In [None]:
# nem fut jupyterben
import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5000000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

In [3]:
!python concurrency-overview/cpu_mp.py

Duration 8.216140270233154 seconds


Ez a program így fut:

<img src="https://files.realpython.com/media/CPUMP.69c1a7fad9c4.png" width='700px'>


Előnyök:
 - Itt tényleg sokkal gyorsabb

Hátrányok
 - Bonyolultabb szituációkban nehéz használni

## Mikor használjuk bármelyiket is?
 - Donald Knuth: “Premature optimization is the root of all evil (or at least most of it) in programming.”
 - Találd ki hogy milyen tipusú feladattal állsz szemben
 - “Use asyncio when you can, threading when you must.” 

## A nagy kérdés

Miért nem ötvözzük a kettőt? pl Threading több processzoron? Meg amúgy is, mért nem gyorsabb ez az egész?

A problémák hátterében a GIL van, a cpython egy fontos komponense. ( Global Interpreter Lock)

A python referencia számolással oldja meg a memória felszabadítását. Ha valamire már nem hivatkozunk, törli. Emiatt fontos, hogy egyszerre csak egy thread tudjon referencia értékeken változtatni. Ezt úgy érjük, hogy van egy lock, és egy thread csak akkor dolgozhat, ha ő birtokolja. Cserébe ez lelassítja a dolgokat. 

In [4]:
import sys
a = []
b = a
sys.getrefcount(a)


3

# Egy kis matek

Mennyit lehet nyerni a párhuzamosításon?

Olvasnivaló: Lovász László: Algoritmusok Bonyolultsága 8. fejezet
