# Asynchrone und Synchrone Programmierung

Asynchroner Code ermöglicht es einem Programm, Aufgaben zu unterbrechen, die lange dauern (z. B. das Warten auf eine Datenbank oder eine API), und währenddessen andere Arbeiten auszuführen. Dadurch wird die Zeit effizient genutzt.

**Synchronous (synchron) vs. Asynchronous (asynchron):**
- Synchron: Das Programm wartet, bis eine Aufgabe abgeschlossen ist, bevor es weitermacht.
    - Beispiel: Stehen in einer Warteschlange, bis du an der Reihe bist.

- Asynchron: Das Programm arbeitet weiter an anderen Aufgaben, während es auf eine langsame Aufgabe wartet.
    - Beispiel: Bestellen im Restaurant und während der Zubereitung mit jemandem reden.

**Warum ist Asynchronität für Web-APIs wichtig?**

In Web-Anwendungen gibt es viele gleichzeitige Benutzer, und der Server verbringt viel Zeit mit dem Warten auf:

- Eingaben von Benutzern.
- Antworten von Datenbanken.
- Rückmeldungen von APIs.

Durch die Verwendung von asynchronem Code kann ein Server:

- Während er auf eine Aufgabe wartet, andere Benutzer bedienen.
- Die Ressourcen effizient nutzen.

Die Grundidee der Asynchronen Programmierung in Python soll anhand eines beispiels erklärt werden. Dazu word eine Funktion `get_data()` erstellt. Sie muss mit dem Schlüsselwort `async` gekennzeichnet werden. Eine andere Funktion `fetch_data()` ruft die `get_data()` Funktion auf jedoch mit dem Schlüsselwort `await`. Dieses Schlüsselwort pausiert die Funktion bis die asynchrone Aufgabe abgeschlossen ist. Wichtig ist zu beachten das `await` nur bei Funktionen verwendet werden kann, welche mit `async´ gekennzeichnet sind!

In [12]:
async def get_data():
    return "Data retrieved"

async def fetch_data():
    data = await get_data()
    print(data)

await fetch_data()

Data retrieved


**Wie kann man sich asynchrone Aufgaben im Alltag vorstellen?**
<br>
<br>
Stell dir vor, du bist in einem Café und möchtest eine Tasse Kaffee. Der Prozess des Kaffeezubereitens besteht aus mehreren Aufgaben:
- Wasser kochen (dies dauert eine Weile).
- Kaffee mahlen (dies dauert auch eine Weile).
- Tasse und Zubehör vorbereiten (dies dauert nur kurz).

Nun, um diesen Prozess zu verstehen, vergleichen wir, wie dies sowohl in einer synchronen als auch in einer asynchronen Welt funktioniert.

**Synchrone Ausführung:**

Wenn du die Aufgaben synchron ausführen würdest, würde das so aussehen:

1. Schritt 1: Du stellst den Wasserkocher an, um Wasser zu kochen. Während das Wasser kocht, kannst du nichts anderes tun, du musst warten.
2. Schritt 2: Sobald das Wasser kocht, beginnst du, den Kaffee zu mahlen. Auch hier musst du warten, bis der Kaffee gemahlen ist.
3. Schritt 3: Nachdem du den Kaffee gemahlen hast, bereitest du die Tasse vor, um den Kaffee zu servieren.

In diesem Fall würde der gesamte Prozess nacheinander ablaufen. Du kannst erst den nächsten Schritt tun, wenn der vorherige Schritt abgeschlossen ist.

**Asynchrone Ausführung:**

Im Gegensatz dazu, wenn du diesen Prozess asynchron ausführen würdest, könntest du mehrere Aufgaben gleichzeitig erledigen:

1. Schritt 1: Du stellst den Wasserkocher an, um Wasser zu kochen.
2. Schritt 2: Während das Wasser kocht, beginnst du, den Kaffee zu mahlen. Diese beiden Aufgaben laufen parallel, ohne dass du warten musst.
3. Schritt 3: Während der Kaffee gemahlen wird, kannst du schon die Tasse und das Zubehör vorbereiten.

In diesem Fall erledigen sich alle Aufgaben gleichzeitig. Du nutzt die Zeit, während das Wasser kocht und der Kaffee gemahlen wird, um andere Dinge zu tun.

Dieses Beispiel könnte als Code in etwa so aussehen:

In [13]:
import asyncio

async def kochendes_wasser():
    print("Wasser kocht...")
    await asyncio.sleep(3) 
    print("Wasser ist gekocht!")

async def kaffee_mahlen():
    print("Kaffee wird gemahlen...")
    await asyncio.sleep(2) 
    print("Kaffee ist gemahlen!")

async def tasse_vorbereiten():
    print("Tasse wird vorbereitet...")
    await asyncio.sleep(1)
    print("Tasse ist vorbereitet!")

async def kaffee_zubereiten():
    await asyncio.gather(
        kochendes_wasser(),
        kaffee_mahlen(),
        tasse_vorbereiten()
    )

# Hauptfunktion ausführen
await kaffee_zubereiten()


Wasser kocht...
Kaffee wird gemahlen...
Tasse wird vorbereitet...
Tasse ist vorbereitet!
Kaffee ist gemahlen!
Wasser ist gekocht!


Schauen wir uns ein Beispiel an, wo wir mithilfe des `asyncio` Moduls asynchrone Aufgaben erzeugen:

In [None]:
import asyncio

async def get_data(task_name, delay):
    print(f"{task_name}: Start fetching data...")
    await asyncio.sleep(delay)  # Hier wird einfach nur eine Verzögerung simuliert
    print(f"{task_name}: Data retrieved")

async def main():
    # Es werden mehrere Aufgaben gleichzeitig gestartet (möglich durch die "gather" Funktion):
    await asyncio.gather(
        get_data("Task 1", 2),
        get_data("Task 2", 3),
        get_data("Task 3", 1), # Wird als erstes abgeschlossen weil sie nur 1 Sekunde dauert
    )

await main()


Task 1: Start fetching data...
Task 2: Start fetching data...
Task 3: Start fetching data...
Task 3: Data retrieved
Task 1: Data retrieved
Task 2: Data retrieved


**Warum ist asynchrone Programmierung wichtig für Schnittstellen?**

Die Thematik der asynchronen Programmierung ist besonders wichtig für FastAPI bzw. allgemein für das Thema APIs. FastAPI nutzt asynchrone Funktionen, um gleichzeitig mehrere Anfragen zu verarbeiten. Dies ist besonders nützlich, wenn du eine hohe Anzahl von gleichzeitigen Anfragen (z. B. von Benutzern oder von anderen Systemen) effizient abwickeln möchtest.
<br>
<br>
FastAPI verwendet asynchrone Funktionen (async def), um Anfragen parallel zu verarbeiten, ohne den Server zu blockieren. Dies führt zu einer viel besseren Skalierbarkeit und Leistung, insbesondere bei hohen Anfrageraten.

**1. Beispiel:**

Eine Anwendung muss Wetterdaten aus 3 verschiedenen Städten abrufen. Anstatt die Anfragen nacheinander zu senden, ruft sie alle gleichzeitig ab.

In [15]:
import asyncio
from pydantic import BaseModel

# Pydantic-Modell für Wetterdaten
class WeatherData(BaseModel):
    city: str
    status: str

# Asynchrone Funktion zur Wetterabfrage
async def fetch_weather(city, delay):
    print(f"Start: Wetterdaten für {city} abrufen...")
    await asyncio.sleep(delay)  # Simuliere Netzwerkverzögerung
    print(f"Wetterdaten für {city} erhalten!")
    # Rückgabe eines validierten Pydantic-Modells
    return WeatherData(city=city, status="Sonnig")

# Hauptfunktion
async def main():
    # Starte mehrere Wetterabfragen gleichzeitig
    results = await asyncio.gather(
        fetch_weather("Berlin", 3),
        fetch_weather("München", 2),
        fetch_weather("Hamburg", 1),
    )

    # Ausgabe der Ergebnisse
    for weather in results:
        print(f"Wetter in {weather.city}: {weather.status}")

await main()


Start: Wetterdaten für Berlin abrufen...
Start: Wetterdaten für München abrufen...
Start: Wetterdaten für Hamburg abrufen...
Wetterdaten für Hamburg erhalten!
Wetterdaten für München erhalten!
Wetterdaten für Berlin erhalten!
Wetter in Berlin: Sonnig
Wetter in München: Sonnig
Wetter in Hamburg: Sonnig


**2. Beispiel:**

Du hast eine Anwendung, in der Benutzer mehrere Dateien gleichzeitig hochladen können. Jede Datei muss validiert, verarbeitet und gespeichert werden.

In [16]:
import asyncio
from pydantic import BaseModel

# Pydantic-Modell für die Dateimetadaten
class FileMetadata(BaseModel):
    filename: str
    size_in_mb: float
    status: str

# Asynchrone Funktion zur Verarbeitung einer Datei
async def process_file(filename, size_in_mb):
    print(f"Start: Verarbeite {filename} ({size_in_mb} MB)...")
    await asyncio.sleep(size_in_mb * 0.5)  # Simuliere Verarbeitung (0.5 Sek. pro MB)
    print(f"Fertig: {filename} verarbeitet!")
    return FileMetadata(filename=filename, size_in_mb=size_in_mb, status="Erfolgreich")

# Hauptfunktion
async def main():
    # Liste der hochgeladenen Dateien
    uploaded_files = [
        {"filename": "image1.jpg", "size_in_mb": 2.0},
        {"filename": "video.mp4", "size_in_mb": 10.0},
        {"filename": "document.pdf", "size_in_mb": 1.5},
    ]

    # Starte die Verarbeitung aller Dateien gleichzeitig
    results = await asyncio.gather(
        *[process_file(file["filename"], file["size_in_mb"]) for file in uploaded_files]
    )

    # Ergebnisse anzeigen
    for file_metadata in results:
        print(f"Datei: {file_metadata.filename}, Größe: {file_metadata.size_in_mb} MB, Status: {file_metadata.status}")

await main()


Start: Verarbeite image1.jpg (2.0 MB)...
Start: Verarbeite video.mp4 (10.0 MB)...
Start: Verarbeite document.pdf (1.5 MB)...
Fertig: document.pdf verarbeitet!
Fertig: image1.jpg verarbeitet!
Fertig: video.mp4 verarbeitet!
Datei: image1.jpg, Größe: 2.0 MB, Status: Erfolgreich
Datei: video.mp4, Größe: 10.0 MB, Status: Erfolgreich
Datei: document.pdf, Größe: 1.5 MB, Status: Erfolgreich


**3. Beispiel:**

Verarbeitung von Benutzerdaten und Transaktionshistorie:
- Benutzerdaten: Enthält Informationen über den Benutzer (Name, Alter, E-Mail).
- Transaktionen: Jede Transaktion enthält Details wie Transaktions-ID, Betrag und Typ (z. B. "Einkauf" oder "Rückerstattung").
- Ziel ist es, die Daten zu validieren, die Gesamtsumme der Transaktionen pro Benutzer zu berechnen und diese Ergebnisse bereitzustellen.
- Benutzer haben Transaktionshistorien, die aus einer Datenquelle (z. B. einer API oder einer Datenbank) asynchron abgerufen werden.
- Die Verarbeitung der Transaktionen (z. B. Validierung und Berechnung) erfolgt parallel für mehrere Benutzer.

`EmailStr` ist ein Feldtyp in Pydantic, der speziell für die Validierung von E-Mail-Adressen entwickelt wurde. Es stellt sicher, dass ein Wert eine gültige E-Mail-Adresse ist, und wird von der Bibliothek email-validator unterstützt, die detaillierte Prüfungen für E-Mail-Formate durchführt.

In [18]:
import asyncio
from pydantic import BaseModel, EmailStr, Field
from typing import List

# Modell für eine einzelne Transaktion
class Transaction(BaseModel):
    transaction_id: str = Field(..., description="Eindeutige ID der Transaktion")
    amount: float = Field(..., ge=0, description="Betrag der Transaktion, muss positiv sein")
    transaction_type: str = Field(..., description="Typ der Transaktion, z. B. Einkauf oder Rückerstattung")

# Modell für einen Benutzer
class User(BaseModel):
    user_id: str = Field(..., description="Eindeutige ID des Benutzers")
    name: str
    age: int = Field(..., ge=0, description="Alter des Benutzers, muss positiv sein")
    email: EmailStr
    transactions: List[Transaction]

# Simuliere den asynchronen Abruf von Transaktionsdaten
async def fetch_transactions(user_id: str):
    print(f"Transaktionen für Benutzer {user_id} werden abgerufen...")
    await asyncio.sleep(2)  # Simuliere Netzwerkverzögerung
    print(f"Transaktionen für Benutzer {user_id} abgerufen!")
    # Beispiel-Daten für Transaktionen
    return [
        {"transaction_id": f"t1_{user_id}", "amount": 120.50, "transaction_type": "Einkauf"},
        {"transaction_id": f"t2_{user_id}", "amount": 75.0, "transaction_type": "Einkauf"},
        {"transaction_id": f"t3_{user_id}", "amount": 30.0, "transaction_type": "Rückerstattung"}
    ]

# Simuliere den asynchronen Abruf von Benutzerdaten
async def fetch_user(user_id: str):
    print(f"Benutzerdaten für Benutzer {user_id} werden abgerufen...")
    await asyncio.sleep(1)  # Simuliere Netzwerkverzögerung
    print(f"Benutzerdaten für Benutzer {user_id} abgerufen!")
    # Beispiel-Benutzerdaten
    return {
        "user_id": user_id,
        "name": f"Benutzer {user_id}",
        "age": 30,
        "email": f"user{user_id}@example.com",
        "transactions": await fetch_transactions(user_id)  # Transaktionsdaten abrufen
    }

# Asynchrone Verarbeitung eines Benutzers
async def process_user(user_id: str):
    user_data = await fetch_user(user_id)  # Benutzerdaten abrufen
    user = User(**user_data)  # Validierung der Benutzerdaten mit Pydantic
    total_amount = sum(t.amount for t in user.transactions)
    print(f"Zusammenfassung für {user.name}:")
    print(f"- Gesamtbetrag: {total_amount}")
    print(f"- Anzahl der Transaktionen: {len(user.transactions)}")
    return {
        "user_id": user.user_id,
        "total_amount": total_amount,
        "transaction_count": len(user.transactions)
    }

# Hauptfunktion zur Verarbeitung mehrerer Benutzer
async def main():
    user_ids = ["123", "456", "789"]  # Beispiel-Benutzer-IDs
    results = await asyncio.gather(*(process_user(user_id) for user_id in user_ids))
    print("Alle Benutzer verarbeitet!")
    for result in results:
        print(result)

await main()


Benutzerdaten für Benutzer 123 werden abgerufen...
Benutzerdaten für Benutzer 456 werden abgerufen...
Benutzerdaten für Benutzer 789 werden abgerufen...
Benutzerdaten für Benutzer 123 abgerufen!
Transaktionen für Benutzer 123 werden abgerufen...
Benutzerdaten für Benutzer 456 abgerufen!
Transaktionen für Benutzer 456 werden abgerufen...
Benutzerdaten für Benutzer 789 abgerufen!
Transaktionen für Benutzer 789 werden abgerufen...
Transaktionen für Benutzer 123 abgerufen!
Zusammenfassung für Benutzer 123:
- Gesamtbetrag: 225.5
- Anzahl der Transaktionen: 3
Transaktionen für Benutzer 456 abgerufen!
Zusammenfassung für Benutzer 456:
- Gesamtbetrag: 225.5
- Anzahl der Transaktionen: 3
Transaktionen für Benutzer 789 abgerufen!
Zusammenfassung für Benutzer 789:
- Gesamtbetrag: 225.5
- Anzahl der Transaktionen: 3
Alle Benutzer verarbeitet!
{'user_id': '123', 'total_amount': 225.5, 'transaction_count': 3}
{'user_id': '456', 'total_amount': 225.5, 'transaction_count': 3}
{'user_id': '789', 'total