# Współbieżność przykład

### NORMALNIE
Poniższy kod demonstruje synchroniczne wykonywanie operacji I/O w Pythonie. Funkcja `fetch_data(id)` blokuje działanie programu na 5 sekund dla każdej operacji, przez co każde wywołanie odbywa się sekwencyjnie. Całość zajmuje około 15 sekund, ponieważ kolejne zadania nie mogą rozpocząć się przed zakończeniem poprzednich. Znaczniki czasowe pokazują, że każda operacja zaczyna się dopiero po zakończeniu wcześniejszej.

In [13]:
import time

def fetch_data(id):
    print(f"({time.time()}) Starting fetch {id}")
    time.sleep(5)  # Symulacja operacji I/O
    print(f"({time.time()}) Finished fetch {id}")
    return id * 2

def main():
    results = [fetch_data(1), fetch_data(2), fetch_data(3)]
    print(f"({time.time()}) Results: {results}")

if __name__ == "__main__":
    main()

(1743360374.7231412) Starting fetch 1
(1743360379.728502) Finished fetch 1
(1743360379.72864) Starting fetch 2
(1743360384.7337458) Finished fetch 2
(1743360384.7338939) Starting fetch 3
(1743360389.739077) Finished fetch 3
(1743360389.7393138) Results: [2, 4, 6]


### WSPÓŁBIEŻNOŚĆ

Inaczej odbywa się to dla poniższego kodu, który demonstruje asynchroniczne programowanie w Pythonie przy użyciu `asyncio`. Funkcja `fetch_data(id)` symuluje operację I/O, czekając 5 sekund i zwracając wynik, a `asyncio.gather()` uruchamia ją równolegle dla trzech identyfikatorów. Dzięki temu całość trwa około 5 sekund zamiast 15 jak w podejściu synchronicznym. Znaczniki czasowe pokazują, że wszystkie operacje zaczynają się niemal jednocześnie i kończą po tym samym czasie.

In [14]:
import asyncio
import time

async def fetch_data(id):
    print(f"({time.time()}) Starting fetch {id}")
    await asyncio.sleep(5)
    print(f"({time.time()}) Finished fetch {id}")
    return id * 2

async def main():
    results = await asyncio.gather(fetch_data(1), fetch_data(2), fetch_data(3))
    print(f"({time.time()}) Results: {results}")

#asyncio.run(main())
await main() # dzialajaca wersja dla ipynb notebook

(1743360389.782794) Starting fetch 1
(1743360389.7830431) Starting fetch 2
(1743360389.783067) Starting fetch 3
(1743360394.78451) Finished fetch 1
(1743360394.7846751) Finished fetch 2
(1743360394.784702) Finished fetch 3
(1743360394.784844) Results: [2, 4, 6]


# Problemy współbieżności

## Race Conditions (Warunki wyścigu)

Warunek wyścigu występuje, gdy wiele wątków modyfikuje współdzielony zasób w niekontrolowany sposób, a wynik zależy od kolejności ich wykonania.

In [15]:
import threading

# Współdzielony zasób
counter = 0

def increment():
    global counter
    for _ in range(10000000):
        # Operacja nieatomowa: odczyt -> modyfikacja -> zapis
        temp = counter
        temp += 1
        counter = temp

# Tworzenie dwóch wątków
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)

# Uruchomienie wątków
thread1.start()
thread2.start()

# Czekanie na zakończenie
thread1.join()
thread2.join()

print(f"Oczekiwana wartość: 20000000, Rzeczywista wartość: {counter}")

Oczekiwana wartość: 20000000, Rzeczywista wartość: 13712271


### Wyjaśnienie:

Dwa wątki jednocześnie zwiększają zmienną counter o 100000 każda.
Bez synchronizacji operacja `temp = counter; temp += 1; counter = temp` nie jest atomowa – wątki mogą nadpisywać swoje zmiany.
Oczekujemy 200000, ale wynik będzie różny przy każdym uruchomieniu (np. 187432), bo wątki "ścigają się" przy zapisie.


    **Uruchom kod kilka razy – za każdym razem wynik będzie inny i zwykle mniejszy niż 200000**

### Rozwiązanie:
Dodaj `threading.Lock()` :

In [None]:
import threading

# Współdzielony zasób
counter = 0

lock = threading.Lock()
def increment():
    global counter
    for _ in range(10000000):
        with lock:
            temp = counter
            temp += 1
            counter = temp

# Tworzenie dwóch wątków
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)

# Uruchomienie wątków
thread1.start()
thread2.start()

# Czekanie na zakończenie
thread1.join()
thread2.join()

print(f"Oczekiwana wartość: 20000000, Rzeczywista wartość: {counter}")

## Deadlocks (Zakleszczenie)

Zakleszczenie występuje, gdy dwa (lub więcej) wątki blokują się nawzajem, czekając na zasoby, które drugi wątek trzyma.

In [2]:
import threading
import time

# Dwa zasoby (blokady)
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_task():
    print("\nWątek 1: Próbuję zająć lock1")
    with lock1:
        time.sleep(1)  # Symulacja pracy
        print("\nWątek 1: Mam lock1, próbuję zająć lock2")
        with lock2:
            print("\nWątek 1: Mam oba locki!")

def thread2_task():
    print("\nWątek 2: Próbuję zająć lock2")
    with lock2:
        time.sleep(1)  # Symulacja pracy
        print("\nWątek 2: Mam lock2, próbuję zająć lock1")
        with lock1:
            print("\nWątek 2: Mam oba locki!")

# Tworzenie wątków
thread1 = threading.Thread(target=thread1_task)
thread2 = threading.Thread(target=thread2_task)

# Uruchomienie
thread1.start()
thread2.start()

# Czekanie na zakończenie (nigdy się nie zakończy)
thread1.join()
thread2.join()

print("Koniec programu")  # Nie wyświetli się


Wątek 1: Próbuję zająć lock1

Wątek 2: Próbuję zająć lock2

Wątek 1: Mam lock1, próbuję zająć lock2
Wątek 2: Mam lock2, próbuję zająć lock1



KeyboardInterrupt: 

### Wyjaśnienie:

Wątek 1 zajmuje `lock1` i czeka na `lock2`.
Wątek 2 zajmuje `lock2` i czeka na `lock1`.
Oba wątki czekają na siebie nawzajem – program się zawiesza (deadlock).

### Jak uwidocznić:

Uruchom kod – zobaczysz komunikaty o zajęciu pierwszego locka przez każdy wątek, ale program się zatrzyma i nie dojdzie do końca.

### Rozwiązanie:
Unikaj blokad krzyżowych, np. ustal kolejność zajmowania locków (oba wątki najpierw `lock1`, potem `lock2`).

# Starvation (Zagłodzenie)

Zagłodzenie występuje, gdy jeden wątek nie dostaje dostępu do zasobu, bo inne wątki mają wyższy priorytet lub ciągle go zajmują.

In [4]:
import threading
import time

# Współdzielony zasób z blokadą
lock = threading.Lock()
resource_busy = False

def greedy_thread(name):
    global resource_busy
    while True:
        with lock:
            if not resource_busy:
                resource_busy = True
                print(f"{name}: Zajmuję zasób")
                time.sleep(0.5)  # Trzyma zasób przez chwilę
                resource_busy = False
            else:
                time.sleep(0.01)  # Czeka, ale szybko próbuje znowu

def starving_thread():
    while True:
        with lock:
            if not resource_busy:
                print("Wątek zagłodzony: W końcu mam zasób!")
                resource_busy = True
                time.sleep(0.5)
                resource_busy = False
                break
            else:
                print("Wątek zagłodzony: Czekam...")
                time.sleep(1)  # Dłużej czeka, rzadziej próbuje

# Tworzenie wątków
greedy1 = threading.Thread(target=greedy_thread, args=("Greedy-1",))
greedy2 = threading.Thread(target=greedy_thread, args=("Greedy-2",))
starving = threading.Thread(target=starving_thread)

# Uruchomienie
greedy1.start()
greedy2.start()
starving.start()

# Czekanie na wątek zagłodzony
starving.join()

Greedy-1: Zajmuję zasób
Greedy-1: Zajmuję zasób
Greedy-1: Zajmuję zasób
Greedy-1: Zajmuję zasób
Greedy-2: Zajmuję zasób
Greedy-2: Zajmuję zasób


Exception in thread Thread-14:
Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/Users/bilus/PycharmProjects/testProject/.venv/lib/python3.9/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/n6/y63rmh397vd1r_5hbj7h14jm0000gn/T/ipykernel_64131/1142799397.py", line 23, in starving_thread
UnboundLocalError: local variable 'resource_busy' referenced before assignment


### Wyjaśnienie:

Dwa "zachłanne" wątki (greedy_thread) szybko zajmują zasób, gdy tylko jest wolny.
Wątek "zagłodzony" (starving_thread) próbuje dostać się do zasobu, ale rzadziej sprawdza jego dostępność i zwykle przegrywa z szybszymi wątkami.
Wątek zagłodzony może czekać bardzo długo (lub nigdy nie dostać zasobu w ekstremalnych przypadkach).

### Jak uwidocznić:

Uruchom kod – zobaczysz, że `Greedy-1` i `Greedy-2` ciągle zajmują zasób, a "Wątek zagłodzony" wyświetla "Czekam..." przez dłuższy czas, zanim uda mu się złapać zasób.
Rozwiązanie: Wprowadź priorytety lub mechanizmy sprawiedliwego dostępu (np. kolejkę).

# Rozwiązania problemów

## Mutexy i semafory

Mutex (mutual exclusion) i semafory to mechanizmy synchronizacji używane do kontrolowania dostępu do współdzielonych zasobów w programowaniu wielowątkowym. Mutex pozwala tylko jednemu wątkowi na dostęp do zasobu w danym momencie, podczas gdy semafor może zezwalać na dostęp określonej liczbie wątków.

Poniższy kod pokazuje, jak mutex (zaimplementowany jako `threading.Lock`) i semafor (`threading.Semaphore`) mogą być użyte do zarządzania dostępem do zasobu współdzielonego przez wiele wątków.#

In [12]:
import threading
import time

# Współdzielony zasób
counter = 0

# Mutex (Lock) i Semaphore
mutex = threading.Lock()
semaphore = threading.Semaphore(2)  # Pozwala na 2 wątki jednocześnie

def task_with_mutex(id):
    global counter
    with mutex:  # Tylko jeden wątek może wejść
        print(f"Wątek {id} (mutex): Zaczynam, counter = {counter}")
        counter += 1
        time.sleep(1)  # Symulacja pracy
        print(f"Wątek {id} (mutex): Kończę, counter = {counter}")

def task_with_semaphore(id):
    global counter
    with semaphore:  # Maksymalnie 2 wątki mogą wejść
        print(f"Wątek {id} (semaphore): Zaczynam, counter = {counter}")
        counter += 1
        time.sleep(1)  # Symulacja pracy
        print(f"Wątek {id} (semaphore): Kończę, counter = {counter}")

# Tworzenie wątków
threads_mutex = [threading.Thread(target=task_with_mutex, args=(i,)) for i in range(4)]
threads_semaphore = [threading.Thread(target=task_with_semaphore, args=(i,)) for i in range(4)]

# Uruchomienie wątków z mutexem
print("Mutex:")
for t in threads_mutex:
    t.start()
for t in threads_mutex:
    t.join()

# Reset counter
counter = 0

# Uruchomienie wątków z semaforem
print("\nSemaphore (max 2 wątki):")
for t in threads_semaphore:
    t.start()
for t in threads_semaphore:
    t.join()

print(f"\nKońcowa wartość counter: {counter}")

Mutex:
Wątek 0 (mutex): Zaczynam, counter = 0
Wątek 0 (mutex): Kończę, counter = 1
Wątek 1 (mutex): Zaczynam, counter = 1
Wątek 1 (mutex): Kończę, counter = 28
Wątek 2 (mutex): Zaczynam, counter = 28
Wątek 2 (mutex): Kończę, counter = 12683
Wątek 3 (mutex): Zaczynam, counter = 12683
Wątek 3 (mutex): Kończę, counter = 12692

Semaphore (max 2 wątki):
Wątek 0 (semaphore): Zaczynam, counter = 0
Wątek 1 (semaphore): Zaczynam, counter = 1
Wątek 0 (semaphore): Kończę, counter = 11Wątek 1 (semaphore): Kończę, counter = 11
Wątek 2 (semaphore): Zaczynam, counter = 11

Wątek 3 (semaphore): Zaczynam, counter = 12
Wątek 2 (semaphore): Kończę, counter = 23Wątek 3 (semaphore): Kończę, counter = 23


Końcowa wartość counter: 23


### Wyjaśnienie:

- **Mutex**: W przykładzie z `mutex` tylko jeden wątek może modyfikować `counter` w danym momencie. Wątki wchodzą sekwencyjnie, co zapobiega race condition, ale spowalnia wykonanie (całość trwa ~4 sekundy dla 4 wątków).
- **Semaphore**: Z `semaphore` ustawionym na 2, maksymalnie dwa wątki mogą działać jednocześnie. Dzięki temu wykonanie jest szybsze (~2 sekundy), ale nadal kontrolowane.

### Jak uwidocznić:
Uruchom kod i zwróć uwagę na czas rozpoczęcia i zakończenia każdej operacji. Bez `mutex` lub `semaphore` wynik `counter` byłby nieprzewidywalny z powodu race condition.

### Rozwiązanie:
Używaj `Lock` dla wyłącznego dostępu lub `Semaphore`, gdy chcesz ograniczyć liczbę wątków, ale pozwolić na pewną równoległość.

## Blokady i synchronizacja

Blokady (locks) są podstawowym narzędziem synchronizacji w wielowątkowości, ale ich niewłaściwe użycie może prowadzić do problemów, takich jak zakleszczenie (deadlock). Synchronizacja pozwala wątkom czekać na siebie lub na określone warunki.

Poniższy kod pokazuje użycie `threading.Condition` do synchronizacji wątków, gdzie jeden wątek czeka na sygnał od drugiego przed kontynuacją.

In [None]:
import threading
import time

# Współdzielony zasób
data_ready = False
condition = threading.Condition()

def producer():
    global data_ready
    print("Producent: Przygotowuję dane...")
    time.sleep(2)  # Symulacja pracy
    with condition:
        data_ready = True
        print("Producent: Dane gotowe, powiadamiam konsumenta")
        condition.notify()  # Powiadomienie konsumenta

def consumer():
    print("Konsument: Czekam na dane...")
    with condition:
        while not data_ready:
            condition.wait()  # Czekanie na powiadomienie
        print("Konsument: Dane otrzymane!")

# Tworzenie wątków
prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)

# Uruchomienie
cons.start()
prod.start()

# Czekanie na zakończenie
prod.join()
cons.join()

### Wyjaśnienie:

- **Condition**: Obiekt `threading.Condition` pozwala konsumentowi czekać, aż producent przygotuje dane i wyśle sygnał (`notify`). To przykład synchronizacji opartej na warunkach.
- Bez `condition`, konsument mógłby próbować użyć danych przed ich przygotowaniem, co prowadziłoby do błędów.

### Jak uwidocznić:
Usuń `condition.wait()` i sprawdź, co się stanie – konsument może zakończyć się przed producentem, ignorując brak danych.

### Rozwiązanie:
Używaj `Condition` lub innych mechanizmów synchronizacji (np. `Event`), aby zapewnić, że wątki działają w odpowiedniej kolejności.

## Programowanie bezblokujące

Programowanie bezblokujące pozwala wątkom lub korutynom kontynuować pracę bez czekania na zakończenie operacji I/O. W Pythonie można to osiągnąć zarówno z `threading` (dla wątków), jak i `asyncio` (dla współbieżności jednowątkowej).

Poniższy kod pokazuje podejście bezblokujące z `threading` przy użyciu puli wątków (`ThreadPoolExecutor`) oraz z `asyncio` dla porównania.

In [None]:
import time
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Funkcja symulująca operację I/O
def io_task(id):
    print(f"Wątek {id}: Start")
    time.sleep(2)  # Symulacja I/O
    print(f"Wątek {id}: Koniec")
    return id * 2

async def async_task(id):
    print(f"Korutyna {id}: Start")
    await asyncio.sleep(2)  # Symulacja I/O
    print(f"Korutyna {id}: Koniec")
    return id * 2

# ThreadPoolExecutor (bezblokujące z wątkami)
print("ThreadPoolExecutor:")
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(io_task, [1, 2, 3])
print(f"Wyniki: {list(results)}, Czas: {time.time() - start_time:.2f}s")

# Asyncio (bezblokujące z korutynami)
async def async_main():
    print("\nAsyncio:")
    start_time = time.time()
    results = await asyncio.gather(async_task(1), async_task(2), async_task(3))
    print(f"Wyniki: {results}, Czas: {time.time() - start_time:.2f}s")

# Uruchomienie asyncio w notebooku
await async_main()

### Wyjaśnienie:

- **ThreadPoolExecutor**: Używa puli wątków do równoległego wykonywania zadań I/O. Wątki nie blokują się nawzajem, a całość trwa ~2 sekundy (dla 3 wątków), zamiast 6 sekund sekwencyjnie.
- **Asyncio**: Korutyny działają współbieżnie w jednym wątku, przełączając się podczas `await`. Efekt jest podobny – całość trwa ~2 sekundy.

### Jak uwidocznić:
Zmniejsz `max_workers` do 1 w `ThreadPoolExecutor` lub uruchom zadania sekwencyjnie w `asyncio` (bez `gather`), aby zobaczyć różnicę w czasie (~6 sekund).

### Rozwiązanie:
Używaj `ThreadPoolExecutor` dla prostego programowania bezblokującego z wątkami lub `asyncio` dla bardziej skalowalnej współbieżności w jednym wątku.