In [3]:
import os
from datetime import datetime
from typing import Optional, Dict, Any, Tuple

import psycopg2
from pymongo import MongoClient

# ==========================
# CONFIGURATION
# ==========================
# Every setting can be overridden through an environment variable of the same
# name. The literals below are only local-development defaults; in any shared
# environment supply real credentials via the environment instead of editing
# (and committing) this file.
#
# NOTE(review): the URI path names the database "LetsMeet" while MONGO_DB_NAME
# is "letsmeet". main() selects the database explicitly via MONGO_DB_NAME, so
# the URI path presumably only serves as pymongo's default/auth database.
# MongoDB database names are case-sensitive — confirm which one is intended.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/LetsMeet")
MONGO_DB_NAME = os.environ.get("MONGO_DB_NAME", "letsmeet")  # adjust as needed
# Collection holding one document per user ("users" was given as the
# DB/collection name — assumed here to be the collection; verify).
MONGO_USERS_COLL = os.environ.get("MONGO_USERS_COLL", "users")

# libpq key/value connection string passed to psycopg2.connect().
POSTGRES_DSN = os.environ.get(
    "POSTGRES_DSN",
    "host=localhost dbname=lf8_lets_meet_db user=user password=secret",
)

# ==========================
# HELPERS
# ==========================
def split_name(raw: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """
    Split a ``'Nachname, Vorname'`` string into ``(last_name, first_name)``.

    Whitespace around each part is stripped and empty parts become ``None``.
    If the text does not contain exactly one comma, the whole stripped string
    is treated as the last name and the first name is ``None``.
    ``None`` or an empty string yields ``(None, None)``.
    """
    if not raw:
        return None, None
    if raw.count(",") != 1:
        # No comma at all, or more than one: keep everything as the last name.
        return raw.strip() or None, None
    last, _sep, first = raw.partition(",")
    return last.strip() or None, first.strip() or None

def to_dt(val: Any) -> Optional[datetime]:
    """
    Convert a timestamp value taken from a Mongo document into a ``datetime``.

    Accepted inputs:

    * a ``datetime`` instance (e.g. a native BSON date, which pymongo decodes
      to ``datetime``) — returned unchanged instead of being discarded;
    * an ISO-8601 string such as ``'2023-03-16T00:00:00'`` or the
      space-separated form ``'2023-12-25 07:40:31'``; surrounding whitespace
      is ignored.

    Returns ``None`` for missing/empty values, values of any other type, and
    strings that cannot be parsed.
    """
    if isinstance(val, datetime):
        return val
    if not isinstance(val, str):
        return None
    text = val.strip()
    if not text:
        return None
    try:
        # fromisoformat() accepts any single character between the date and
        # the time part, so the space-separated form needs no rewriting.
        return datetime.fromisoformat(text)
    except ValueError:
        return None

# ==========================
# REPOSITORY (PostgreSQL)
# ==========================
class PGRepo:
    """
    Thin data-access layer over a single psycopg2 connection.

    Autocommit is disabled, so every statement issued through this class is
    part of one open transaction that the caller must finish with
    ``conn.commit()`` or ``conn.rollback()``.

    The class can be used as a context manager: leaving the ``with`` block
    rolls back if an exception escaped and always closes the connection.
    It never commits on its own.
    """

    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        # Explicit: group all writes of one run into a single transaction.
        self.conn.autocommit = False

    def __enter__(self) -> "PGRepo":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Roll back on error, close in every case; exceptions propagate.
        try:
            if exc_type is not None:
                self.conn.rollback()
        finally:
            self.close()

    def close(self):
        """Close the connection (per psycopg2, uncommitted changes are discarded)."""
        self.conn.close()

    def upsert_user_minimal(self, email: str, name: Optional[str], phone: Optional[str],
                            created_at: Optional[datetime], updated_at: Optional[datetime]) -> int:
        """
        Insert a ``Nutzer`` row keyed by e-mail, or update the existing one.

        ``name`` is split into Nachname/Vorname via ``split_name``. Birth date,
        address fields and gender are inserted as NULL.

        On an e-mail conflict:

        * Nachname, Vorname and Telefonnummer are overwritten with the incoming
          values, including NULL;
        * UpdatedAt is replaced only when a new value is given;
        * CreatedAt keeps its stored value.

        Returns the ``id`` of the inserted or updated row.
        """
        nachname, vorname = split_name(name)
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO Nutzer
                (Nachname, Vorname, Geburtsdatum, Telefonnummer, EMail, Straße, Hausnummer, PLZ, Ort,
                 Geschlecht_id, UpdatedAt, CreatedAt)
                VALUES (%s,%s,NULL,%s,%s,NULL,NULL,NULL,NULL,NULL,%s,%s)
                ON CONFLICT (EMail) DO UPDATE
                SET Nachname = EXCLUDED.Nachname,
                    Vorname = EXCLUDED.Vorname,
                    Telefonnummer = EXCLUDED.Telefonnummer,
                    UpdatedAt = COALESCE(EXCLUDED.UpdatedAt, Nutzer.UpdatedAt)
                RETURNING id;
            """, (nachname, vorname, phone, email, updated_at, created_at))
            return cur.fetchone()[0]

    def ensure_user_by_email(self, email: str) -> int:
        """
        Make sure a ``Nutzer`` row exists for ``email`` and return its id.

        If no row exists yet, a stub holding only the e-mail is inserted, with
        CreatedAt/UpdatedAt set to ``NOW()`` and no name or phone.
        The lookup and insert are two separate statements. This is fine for a
        single-connection run, but concurrent callers could race on the
        UNIQUE e-mail.
        """
        with self.conn.cursor() as cur:
            cur.execute("SELECT id FROM Nutzer WHERE EMail=%s;", (email,))
            row = cur.fetchone()
            if row:
                return row[0]
            cur.execute("""
                INSERT INTO Nutzer (EMail, CreatedAt, UpdatedAt)
                VALUES (%s, NOW(), NOW())
                RETURNING id;
            """, (email,))
            return cur.fetchone()[0]

    def insert_message(self, sender_id: int, receiver_id: int, content: Optional[str], ts: Optional[datetime]):
        """
        Insert one row into ``Messages``.

        This is a plain INSERT without a duplicate check. Calling it again with
        the same arguments adds another row, unless the table itself enforces
        uniqueness (not visible here).
        """
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO Messages (receiving_Nutzer_ID, sending_Nutzer_ID, Content, Zeitstempel)
                VALUES (%s,%s,%s,%s);
            """, (receiver_id, sender_id, content, ts))

# ==========================
# EXTRACT (Mongo)
# ==========================
def fetch_users(mdb):
    """
    Return every document of the users collection as a list.

    The cursor is fully materialised rather than returned lazily, because
    main() walks the documents twice (users first, then their messages).
    """
    users_collection = mdb[MONGO_USERS_COLL]
    match_all = {}
    cursor = users_collection.find(match_all)
    return list(cursor)

# ==========================
# MAIN ETL
# ==========================
def _upsert_users(pg: "PGRepo", docs) -> Dict[str, int]:
    """Upsert every user document that has an e-mail; return a map e-mail -> Nutzer id."""
    email_to_id: Dict[str, int] = {}
    for d in docs:
        email = d.get("_id")  # the document _id holds the e-mail address
        if not email:
            # Without an e-mail the UNIQUE(EMail) upsert key is unavailable -> skip.
            continue
        email_to_id[email] = pg.upsert_user_minimal(
            email,
            d.get("name"),
            d.get("phone"),
            to_dt(d.get("createdAt")),
            to_dt(d.get("updatedAt")),
        )
    return email_to_id


def _import_messages(pg: "PGRepo", docs, email_to_id: Dict[str, int]) -> None:
    """
    Insert the embedded messages of every known sender.

    Receivers that are not yet in ``email_to_id`` get a stub ``Nutzer`` row,
    and ``email_to_id`` is updated in place. Messages are inserted
    unconditionally, so re-running the ETL against the same database inserts
    them again.
    """
    from datetime import timezone

    for d in docs:
        sender_email = d.get("_id")
        if not sender_email or sender_email not in email_to_id:
            continue
        sender_id = email_to_id[sender_email]

        for m in d.get("messages") or []:
            recv_email = m.get("receiver_email")
            if not recv_email:
                continue

            # Make sure the receiver exists (create a stub if unknown).
            if recv_email not in email_to_id:
                email_to_id[recv_email] = pg.ensure_user_by_email(recv_email)

            # A missing/unparsable timestamp falls back to the current UTC time.
            # The value is naive, matching the deprecated datetime.utcnow().
            ts = to_dt(m.get("timestamp")) or datetime.now(timezone.utc).replace(tzinfo=None)
            pg.insert_message(sender_id, email_to_id[recv_email], m.get("message"), ts)


def main():
    """
    Copy users and their messages from MongoDB into PostgreSQL.

    The run has two passes over the Mongo user documents:

    1. upsert all users with an e-mail into ``Nutzer``;
    2. insert their embedded messages into ``Messages``.

    All writes happen in one Postgres transaction. It is committed only if
    both passes succeed; on any error it is rolled back and the exception is
    re-raised. Both database clients are always closed.
    """
    mclient = MongoClient(MONGO_URI)
    try:
        mdb = mclient[MONGO_DB_NAME]
        # Opened inside the outer try so the Mongo client is still closed
        # when the Postgres connection cannot be established.
        pg = PGRepo(POSTGRES_DSN)
        try:
            docs = fetch_users(mdb)

            # 1) Upsert users
            email_to_id = _upsert_users(pg, docs)

            # 2) Import messages
            _import_messages(pg, docs, email_to_id)

            pg.conn.commit()
            print("ETL erfolgreich abgeschlossen.")
        except Exception:
            pg.conn.rollback()
            raise
        finally:
            pg.close()
    finally:
        mclient.close()


if __name__ == "__main__":
    main()


ETL erfolgreich abgeschlossen.
