# Python Fortgeschritten: Threads und Multiprocessing
## Tag 4 - Notebook 22
***
In diesem Notebook wird behandelt:
- threading
- multiprocessing
- GIL (Global Interpreter Lock)
- Parallelisierung
***


## 1 Threading und Multiprocessing

Python bietet zwei Hauptansätze für Parallelisierung: **Threading** (Threads) und **Multiprocessing** (Prozesse). Beide ermöglichen die gleichzeitige Ausführung von Code, unterscheiden sich aber fundamental in ihrer Funktionsweise.

### Was ist Threading?

**Threads** sind leichte Ausführungseinheiten innerhalb eines Prozesses. Sie teilen sich den gleichen Speicherraum und können auf gemeinsame Variablen zugreifen. In Python werden Threads durch das `threading`-Modul verwaltet.

### Was ist Multiprocessing?

**Prozesse** sind separate Ausführungseinheiten mit eigenem Speicherraum. Jeder Prozess hat seinen eigenen Python-Interpreter und kann unabhängig von anderen Prozessen laufen. In Python werden Prozesse durch das `multiprocessing`-Modul verwaltet.

### Der Global Interpreter Lock (GIL)

Der **GIL** ist ein Mechanismus in CPython, der verhindert, dass mehrere Threads gleichzeitig Python-Code ausführen. Dies bedeutet:

- **Threading**: Threads können nicht wirklich parallel Python-Code ausführen - sie wechseln sich ab
- **Multiprocessing**: Prozesse umgehen den GIL, da jeder Prozess seinen eigenen Interpreter hat
- **I/O-Operationen**: Threads können bei I/O-Operationen (Dateien, Netzwerk) parallel arbeiten, da der GIL während I/O freigegeben wird

### Wann und warum verwenden?

**Threading** verwenden, wenn:
- **I/O-bound Tasks**: Viele I/O-Operationen parallel ausgeführt werden sollen (Dateioperationen, Netzwerk-Requests, Datenbankzugriffe). Threads können bei I/O-Operationen parallel arbeiten, da der GIL während I/O freigegeben wird.
- **GUI-Anwendungen**: Eine GUI responsive bleiben soll während langer Operationen
- **Concurrent I/O**: Viele gleichzeitige I/O-Operationen benötigt werden

**Multiprocessing** verwenden, wenn:
- **CPU-bound Tasks**: CPU-intensive Berechnungen parallelisiert werden sollen. Prozesse umgehen den GIL, da jeder Prozess seinen eigenen Interpreter hat.
- **Echte Parallelisierung**: Echte Parallelisierung benötigt wird (GIL umgehen)
- **Isolation**: Prozesse isoliert voneinander laufen sollen

### Vorteile

**Threading**:
- **Leichtgewichtig**: Threads sind leichter als Prozesse
- **Gemeinsamer Speicher**: Einfacher Datenaustausch zwischen Threads
- **Schneller Start**: Threads starten schneller als Prozesse

**Multiprocessing**:
- **Echte Parallelisierung**: Umgeht den GIL für CPU-bound Tasks
- **Isolation**: Prozesse sind isoliert - Fehler in einem Prozess betreffen andere nicht
- **Skalierbarkeit**: Kann mehrere CPU-Kerne nutzen

### Einschränkungen

**Threading**:
- **GIL**: Kann nicht wirklich parallel Python-Code ausführen
- **Race Conditions**: Gemeinsamer Speicher kann zu Race Conditions führen
- **Debugging**: Threading-Bugs sind schwer zu finden

**Multiprocessing**:
- **Overhead**: Prozesse haben mehr Overhead als Threads
- **Kommunikation**: Datenaustausch zwischen Prozessen ist komplexer
- **Speicher**: Jeder Prozess hat seinen eigenen Speicher


In [None]:
import threading
import time

def worker(name):
    print(f"Worker {name} startet")
    time.sleep(1)
    print(f"Worker {name} beendet")

thread1 = threading.Thread(target=worker, args=("A",))
thread2 = threading.Thread(target=worker, args=("B",))
thread1.start()
thread2.start()
thread1.join()
thread2.join()


## 2 Threading im Detail

Threads ermöglichen die gleichzeitige Ausführung von Code innerhalb eines Prozesses. Sie teilen sich den Speicher und können auf gemeinsame Variablen zugreifen.

## 3 Multiprocessing im Detail

Multiprocessing nutzt separate Prozesse für echte Parallelisierung. Jeder Prozess hat seinen eigenen Speicher und Python-Interpreter.


In [None]:
from multiprocessing import Process

def worker(name):
    print(f"Process {name}")

if __name__ == '__main__':
    p1 = Process(target=worker, args=("A",))
    p2 = Process(target=worker, args=("B",))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


## 4 Best Practices

### Best Practices

1. **Threading für I/O, Multiprocessing für CPU**: 
   - Threading für I/O-bound Tasks (Dateien, Netzwerk)
   - Multiprocessing für CPU-bound Tasks (Berechnungen)

2. **Synchronisation bei Threading**: 
   - Verwenden Sie `threading.Lock()` für kritische Abschnitte
   - Verwenden Sie `threading.Queue` für Thread-sichere Kommunikation
   - `concurrent.futures.ThreadPoolExecutor` für einfache Thread-Verwaltung

3. **Kommunikation bei Multiprocessing**: 
   - Verwenden Sie `multiprocessing.Pool` für einfache Parallelisierung
   - Verwenden Sie `multiprocessing.Queue` für Prozess-Kommunikation
   - **Wichtig**: `if __name__ == '__main__'` für Windows (verhindert rekursive Prozess-Erstellung)
   - Nur pickle-kompatible Objekte können zwischen Prozessen geteilt werden

4. **Ressourcen-Management**: 
   - Verwenden Sie `join()` um auf Threads/Prozesse zu warten
   - Stellen Sie sicher, dass Threads/Prozesse ordnungsgemäß beendet werden

5. **Fehlerbehandlung**: 
   - Fangen Sie Exceptions in Threads/Prozessen ab
   - Verwenden Sie Timeouts für hängende Operationen

6. **Performance-Messung**: 
   - Messen Sie die Performance vor und nach Parallelisierung
   - Nicht alle Aufgaben profitieren von Parallelisierung

### Wann NICHT verwenden?

**Threading nicht verwenden für**:
- CPU-intensive Tasks (GIL verhindert echte Parallelisierung)
- Wenn Synchronisation zu komplex wird

**Multiprocessing nicht verwenden für**:
- I/O-bound Tasks (Overhead ist zu groß)
- Bei sehr kleinen Aufgaben (Overhead > Nutzen)

### Häufige Fehler

- **GIL-Missverständnis**: Threading für CPU-bound Tasks verwenden (funktioniert nicht wegen GIL)
- **Race Conditions**: Gemeinsamen Speicher ohne Synchronisation verwenden
- **Vergessene `join()`**: Nicht auf Threads/Prozesse warten
- **Fehlende `if __name__ == '__main__'`**: Auf Windows führt dies zu Problemen


#### 5.1 Aufgaben:

#### Aufgabe (a) - Threading: Parallele Textverarbeitung (I/O-bound)

> Lese und verarbeite die 4 Textdateien in `../data/` parallel mit Threading. <br>
> Dateien: `text_file_1.txt`, `text_file_2.txt`, `text_file_3.txt`, `text_file_4.txt` <br>
> Erstelle eine Funktion `process_file(filename)`, die eine Datei liest und Wörter zählt. <br>
> Verwende Threading, um alle 4 Dateien gleichzeitig zu verarbeiten. <br>
> Messe und vergleiche die Zeit: sequentiell vs. mit Threads.



In [None]:
# Deine Lösung:

import threading
import time

# Dateien im data/ Ordner
files = [
    "../data/text_file_1.txt",
    "../data/text_file_2.txt",
    "../data/text_file_3.txt",
    "../data/text_file_4.txt"
]

def process_file(filename):
    # TODO: Öffne Datei und zähle Wörter
    # TODO: Gib Dateiname und Wortanzahl aus
    pass

# TODO: Messe Zeit für sequentielle Verarbeitung

# TODO: Messe Zeit für parallele Verarbeitung mit Threads


#### Aufgabe (b) - Multiprocessing: Parallel Calculations (CPU-bound)

> Berechne Primzahlen bis zu einer gegebenen Zahl mit Multiprocessing. <br>
> Erstelle eine Funktion `count_primes(start, end)`, die Primzahlen in einem Bereich zählt. <br>
> Teile den Bereich in 4 Teile auf und verwende 4 Prozesse parallel. <br>
> Messe und vergleiche die Zeit: sequentiell vs. parallel. <br>
> Verwende `if __name__ == '__main__'` Guard korrekt.

In [None]:
# Deine Lösung:

from multiprocessing import Process
import time

def is_prime(n):
    # TODO: Prüfe ob n eine Primzahl ist
    pass

def count_primes(start, end):
    # TODO: Zähle Primzahlen im Bereich [start, end]
    pass

# TODO: Implementiere sequentielle Version und messe Zeit

# TODO: Implementiere parallele Version mit 4 Prozessen
# Wichtig: if __name__ == '__main__':


#### Aufgabe (c) - Thread Synchronization: Shared Counter

> Demonstriere Race Conditions und deren Lösung mit Locks. <br>
> Erstelle einen geteilten Counter, den mehrere Threads inkrementieren. <br>
> Zuerst: Zeige das Problem ohne Lock (Race Condition). <br>
> Dann: Löse das Problem mit `threading.Lock()`. <br>
> Vergleiche die Endergebnisse des Counters (mit vs. ohne Lock).

In [None]:
# Deine Lösung:

import threading

# TODO: Erstelle globalen Counter
# TODO: Erstelle Lock

def increment_without_lock(counter, iterations):
    # TODO: Inkrementiere Counter ohne Lock
    pass

def increment_with_lock(counter, lock, iterations):
    # TODO: Inkrementiere Counter mit Lock
    pass

# TODO: Teste ohne Lock (Race Condition)
# TODO: Teste mit Lock (korrekt)


#### Aufgabe (d) - ThreadPoolExecutor: Web Scraper Simulation

> Simuliere das gleichzeitige Abrufen von mehreren URLs mit `ThreadPoolExecutor`. <br>
> Erstelle eine Funktion `fetch_url(url)`, die das Abrufen simuliert (mit `time.sleep()`). <br>
> Verwende `ThreadPoolExecutor`, um 10 URLs gleichzeitig abzurufen. <br>
> Sammle die Ergebnisse und zeige sie in der richtigen Reihenfolge an. <br>
> Behandle potentielle Exceptions (z.B. Timeouts) graceful.

In [None]:
# Deine Lösung:

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    # TODO: Simuliere URL-Abruf mit time.sleep()
    # TODO: Behandle potentielle Exceptions
    pass

# TODO: Erstelle Liste mit 10 URLs
# TODO: Verwende ThreadPoolExecutor um alle URLs gleichzeitig abzurufen
# TODO: Sammle und zeige Ergebnisse


#### Lösung:


In [None]:
# ====================================================================
# Musterlösung (a) - Threading: Parallele Textverarbeitung (I/O-bound)
# ====================================================================

import threading
import time

# Dateien im data/ Ordner
files = [
    "../data/text_file_1.txt",
    "../data/text_file_2.txt",
    "../data/text_file_3.txt",
    "../data/text_file_4.txt"
]

results = {}

def process_file(filename):
    """Liest eine Datei und zählt die Wörter"""
    print(f"Verarbeite: {filename}")
    with open(filename, "r", encoding="utf-8") as f:
        content = f.read()
        word_count = len(content.split())
    results[filename] = word_count
    print(f"Fertig: {filename} - {word_count} Wörter")
    return word_count

# Sequentielle Verarbeitung
print("=== Sequentielle Verarbeitung ===")
results.clear()
start = time.time()

for filename in files:
    process_file(filename)

sequential_time = time.time() - start
print(f"Sequentielle Zeit: {sequential_time:.4f} Sekunden\n")

# Parallele Verarbeitung mit Threads
print("=== Parallele Verarbeitung mit Threads ===")
results.clear()
start = time.time()

threads = []
for filename in files:
    thread = threading.Thread(target=process_file, args=(filename,))
    threads.append(thread)
    thread.start()

# Warte auf alle Threads
for thread in threads:
    thread.join()

threaded_time = time.time() - start
print(f"Threading Zeit: {threaded_time:.4f} Sekunden")
print(f"Speedup: {sequential_time/threaded_time:.2f}x")

# Ergebnisse anzeigen
print(f"\nGesamte Wörter: {sum(results.values())}\n")

# ====================================================================
# Musterlösung (b) - Multiprocessing: Parallel Calculations (CPU-bound)
# ====================================================================

from multiprocessing import Process, Queue
import time

def is_prime(n):
    """Prüft ob eine Zahl prim ist"""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True

def count_primes(start, end, queue=None):
    """Zählt Primzahlen in einem Bereich"""
    count = sum(1 for n in range(start, end) if is_prime(n))
    if queue:
        queue.put(count)
    return count

if __name__ == '__main__':
    # Sequentielle Berechnung
    print("=== Sequentielle Primzahl-Berechnung ===")
    start = time.time()
    
    total = count_primes(1, 100000)
    
    sequential_time = time.time() - start
    print(f"Primzahlen gefunden: {total}")
    print(f"Sequentielle Zeit: {sequential_time:.2f} Sekunden\n")
    
    # Parallele Berechnung mit 4 Prozessen
    print("=== Parallele Berechnung mit 4 Prozessen ===")
    start = time.time()
    
    # Teile den Bereich in 4 Teile
    ranges = [(1, 25000), (25000, 50000), (50000, 75000), (75000, 100000)]
    
    queue = Queue()
    processes = []
    
    for start_range, end_range in ranges:
        p = Process(target=count_primes, args=(start_range, end_range, queue))
        processes.append(p)
        p.start()
    
    # Warte auf alle Prozesse
    for p in processes:
        p.join()
    
    # Sammle Ergebnisse
    total_parallel = sum(queue.get() for _ in processes)
    
    parallel_time = time.time() - start
    print(f"Primzahlen gefunden: {total_parallel}")
    print(f"Parallele Zeit: {parallel_time:.2f} Sekunden")
    print(f"Speedup: {sequential_time/parallel_time:.2f}x\n")

# ====================================================================
# Musterlösung (c) - Thread Synchronization: Shared Counter
# ====================================================================

import threading

# Test 1: Ohne Lock (Race Condition)
print("=== Test 1: Ohne Lock (Race Condition) ===")
counter_no_lock = 0

def increment_without_lock(iterations):
    global counter_no_lock
    for _ in range(iterations):
        # Race Condition: Mehrere Threads lesen/schreiben gleichzeitig
        counter_no_lock += 1

threads = []
iterations = 100000

for _ in range(5):
    thread = threading.Thread(target=increment_without_lock, args=(iterations,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Erwartet: {5 * iterations}")
print(f"Tatsächlich: {counter_no_lock}")
print(f"Differenz: {5 * iterations - counter_no_lock} (Race Condition!)\n")

# Test 2: Mit Lock (korrekt)
print("=== Test 2: Mit Lock (Thread-safe) ===")
counter_with_lock = 0
lock = threading.Lock()

def increment_with_lock(iterations):
    global counter_with_lock
    for _ in range(iterations):
        with lock:  # Lock schützt kritischen Abschnitt
            counter_with_lock += 1

threads = []

for _ in range(5):
    thread = threading.Thread(target=increment_with_lock, args=(iterations,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Erwartet: {5 * iterations}")
print(f"Tatsächlich: {counter_with_lock}")
print(f"Differenz: {5 * iterations - counter_with_lock} (korrekt!)\n")

# ====================================================================
# Musterlösung (d) - ThreadPoolExecutor: Web Scraper Simulation
# ====================================================================

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def fetch_url(url):
    """Simuliert das Abrufen einer URL"""
    try:
        # Simuliere variable Ladezeiten
        duration = random.uniform(0.5, 2.0)
        time.sleep(duration)
        
        # Simuliere gelegentliche Fehler
        if random.random() < 0.1:  # 10% Fehlerrate
            raise Exception(f"Timeout beim Abrufen von {url}")
        
        return f"Inhalt von {url} (Dauer: {duration:.2f}s)"
    
    except Exception as e:
        return f"FEHLER bei {url}: {str(e)}"

# URLs zum Abrufen
urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3",
    "http://example.com/page4",
    "http://example.com/page5",
    "http://example.com/page6",
    "http://example.com/page7",
    "http://example.com/page8",
    "http://example.com/page9",
    "http://example.com/page10",
]

print("=== Web Scraper mit ThreadPoolExecutor ===\n")
start = time.time()

# Verwende ThreadPoolExecutor mit max 5 gleichzeitigen Threads
with ThreadPoolExecutor(max_workers=5) as executor:
    # Starte alle Tasks
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    
    # Sammle Ergebnisse in der Reihenfolge der Fertigstellung
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            print(f"[OK] {url}")
            print(f"     {result}")
        except Exception as e:
            print(f"[FEHLER] {url}: {e}")

total_time = time.time() - start
print(f"\nGesamtzeit: {total_time:.2f} Sekunden")
print(f"Durchschnitt: {total_time/len(urls):.2f}s pro URL")
