In [None]:
!pip install isodate

Code zum Abrufen fast aller Daten des youtube Channels. Es müssen noch die API-Daten und der Link zur json-Datei eingetragen werden. Außerdem der Datumsbereich

In [None]:
# API Konfiguration - Hier Ihre Werte eintragen
CHANNEL_ID = "UCSeil5V81-mEGB1-VNR7YEA"  # Ihre YouTube Channel ID
DATA_API_V3_KEY = ""  # API Key für YouTube Data API v3
#ANALYTICS_API_KEY = ""  # API Key für YouTube Analytics API
CREDENTIALS_PATH = ""


# Datum Konfiguration - Format: "DD.MM.YYYY"
START_DATE = "01.01.2024"  # z.B. "01.09.2024"
END_DATE = "31.01.2024"   # z.B. "02.09.2024"

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from datetime import datetime
import pandas as pd
import isodate

# OAuth 2.0 Scopes
SCOPES = ['https://www.googleapis.com/auth/yt-analytics.readonly']

def get_video_data_from_api():
    """YouTube Data API v3 Abfrage für Basis-Videodaten"""
    youtube = build('youtube', 'v3', developerKey=DATA_API_V3_KEY)
    
    start_date = datetime.strptime(START_DATE, '%d.%m.%Y')
    end_date = datetime.strptime(END_DATE, '%d.%m.%Y').replace(hour=23, minute=59, second=59)
    
    start_date_rfc = start_date.isoformat() + 'Z'
    end_date_rfc = end_date.isoformat() + 'Z'
    
    videos_data = []
    next_page_token = None
    
    while True:
        request = youtube.search().list(
            part='snippet',
            channelId=CHANNEL_ID,
            maxResults=50,
            order='date',
            publishedAfter=start_date_rfc,
            publishedBefore=end_date_rfc,
            type='video',
            pageToken=next_page_token
        )
        
        response = request.execute()
        
        for item in response['items']:
            video_id = item['id']['videoId']
            
            video_request = youtube.videos().list(
                part='snippet,contentDetails,status',
                id=video_id
            )
            video_response = video_request.execute()
            
            if video_response['items']:
                video_info = video_response['items'][0]
                duration = int(isodate.parse_duration(video_info['contentDetails']['duration']).total_seconds())
                
                video_data = {
                    'video_id': video_id,
                    'title': video_info['snippet']['title'],
                    'publish_date': video_info['snippet']['publishedAt'],
                    'video_length_seconds': duration,
                    'privacy_status': video_info['status']['privacyStatus']
                }
                videos_data.append(video_data)
        
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break
    
    return pd.DataFrame(videos_data)

def get_analytics_data(analytics, video_id, start_date, end_date):
    """YouTube Analytics API Abfrage für detaillierte Metriken"""
    try:
        # Performance Metriken
        metrics = [
            'estimatedMinutesWatched',
            'averageViewDuration',
            'averageViewPercentage',
            'subscribersGained',
            'subscribersLost',
            'views',
            'likes',
            'dislikes',
            'shares',
            'comments'
        ]
        
        perf_response = analytics.reports().query(
            ids=f'channel=={CHANNEL_ID}',
            startDate=start_date.strftime('%Y-%m-%d'),
            endDate=end_date.strftime('%Y-%m-%d'),
            metrics=','.join(metrics),
            filters=f'video=={video_id}'
        ).execute()

        # Demografische Daten
        demo_response = analytics.reports().query(
            ids=f'channel=={CHANNEL_ID}',
            startDate=start_date.strftime('%Y-%m-%d'),
            endDate=end_date.strftime('%Y-%m-%d'),
            metrics='viewerPercentage',
            dimensions='ageGroup,gender',
            filters=f'video=={video_id}'
        ).execute()

        # Basis-Daten sammeln
        data = {
            'video_id': video_id,
            'wiedergabezeit_minuten': 0,
            'durchschnittliche_wiedergabedauer': 0,
            'durchschnittliche_wiedergabedauer_prozent': 0,
            'gewonnene_abonnenten': 0,
            'verlorene_abonnenten': 0,
            'aufrufe': 0,
            'likes': 0,
            'dislikes': 0,
            'geteilte_inhalte': 0,
            'kommentare': 0
        }

        # Performance Daten einfügen
        if 'rows' in perf_response and perf_response['rows']:
            row = perf_response['rows'][0]
            data.update({
                'wiedergabezeit_minuten': row[0],
                'durchschnittliche_wiedergabedauer': row[1],
                'durchschnittliche_wiedergabedauer_prozent': row[2],
                'gewonnene_abonnenten': row[3],
                'verlorene_abonnenten': row[4],
                'aufrufe': row[5],
                'likes': row[6],
                'dislikes': row[7],
                'geteilte_inhalte': row[8],
                'kommentare': row[9]
            })

        # Demografische Daten einfügen
        if 'rows' in demo_response:
            for row in demo_response['rows']:
                age_group, gender, percentage = row
                key = f"audience_{gender.lower()}_{age_group}"
                data[key] = percentage

        return data

    except Exception as e:
        print(f"Fehler bei Video {video_id}: {str(e)}")
        return None

def main():
    try:
        print("1. Hole Basis-Videodaten über YouTube Data API v3...")
        video_df = get_video_data_from_api()
        print(f"   - {len(video_df)} Videos gefunden")

        print("\n2. Hole Analytics-Daten über YouTube Analytics API...")
        # Authentifizierung für Analytics API
        flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        credentials = flow.run_local_server(port=0)
        analytics = build('youtubeAnalytics', 'v2', credentials=credentials)
        
        start_date = datetime.strptime(START_DATE, '%d.%m.%Y')
        end_date = datetime.strptime(END_DATE, '%d.%m.%Y')
        
        # Sammle Analytics-Daten für jedes Video
        analytics_data = []
        for index, row in video_df.iterrows():
            print(f"   - Verarbeite Video {index + 1} von {len(video_df)}: {row['video_id']}")
            data = get_analytics_data(analytics, row['video_id'], start_date, end_date)
            if data:
                analytics_data.append(data)

        # Erstelle DataFrame aus Analytics-Daten
        analytics_df = pd.DataFrame(analytics_data)
        
        # Merge beide DataFrames
        final_df = pd.merge(video_df, analytics_df, on='video_id', how='left')
        
        # Speichere Ergebnis und stelle sicher, dass Videolänge als Ganzzahl gespeichtert wird
        video_df['video_length_seconds'] = video_df['video_length_seconds'].astype(int)
        output_filename = f'youtube_complete_analysis_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        final_df.to_csv(output_filename, index=False)
        
        print(f"\nAnalyse abgeschlossen!")
        print(f"Daten wurden in {output_filename} gespeichert")
        print(f"Anzahl der Videos: {len(final_df)}")
        print(f"Anzahl der Spalten: {len(final_df.columns)}")
        
    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {str(e)}")

if __name__ == "__main__":
    main()

Hier wird eine Liste von video_ids abgefragt

In [None]:
# API Konfiguration - Hier Ihre Werte eintragen
CHANNEL_ID = "UCSeil5V81-mEGB1-VNR7YEA"  # Ihre YouTube Channel ID
DATA_API_V3_KEY = ""  # API Key für YouTube Data API v3
#ANALYTICS_API_KEY = "AIzaSyCBw3tVk-TnYLCZW2vmojQlYFBArzudU-A"  # API Key für YouTube Analytics API
CREDENTIALS_PATH = "C:\\Users\\laukat\\OneDrive - Mediengruppe RTL\\HDM Data Analyti\\oauth 2.0\\client_secret_796311161257-bnk32mvms5t9fsfma7agbgrlt5fo54gi.apps.googleusercontent.com.json"  # Pfad zur client_secrets.json"


# Datum Konfiguration - Format: "DD.MM.YYYY"
START_DATE = "01.01.2000"
END_DATE = "31.01.2100"

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from datetime import datetime
import pandas as pd
import isodate

# OAuth 2.0 Scopes
SCOPES = ['https://www.googleapis.com/auth/yt-analytics.readonly']

def get_video_data_from_list(video_ids):
    """Hole Videodaten basierend auf einer Liste von Video-IDs"""
    youtube = build('youtube', 'v3', developerKey=DATA_API_V3_KEY)
    
    videos_data = []
    for video_id in video_ids:
        video_request = youtube.videos().list(
            part='snippet,contentDetails,status',
            id=video_id
        )
        video_response = video_request.execute()
        
        if video_response['items']:
            video_info = video_response['items'][0]
            duration = int(isodate.parse_duration(video_info['contentDetails']['duration']).total_seconds())
            
            video_data = {
                'video_id': video_id,
                'title': video_info['snippet']['title'],
                'publish_date': video_info['snippet']['publishedAt'],
                'video_length_seconds': duration,
                'privacy_status': video_info['status']['privacyStatus']
            }
            videos_data.append(video_data)
    
    return pd.DataFrame(videos_data)

def get_analytics_data(analytics, video_id, start_date, end_date):
    """YouTube Analytics API Abfrage für detaillierte Metriken"""
    # Unverändert vom Original
    ...

def main():
    try:
        print("1. Lade Video-IDs aus der bereinigten Datei...")
        cleaned_video_df = pd.read_csv("cleaned_datenpaket2.csv")
        video_ids = cleaned_video_df['video_id'].tolist()
        print(f"   - {len(video_ids)} Video-IDs geladen")
        
        print("\n2. Hole Basis-Videodaten über YouTube Data API v3...")
        video_df = get_video_data_from_list(video_ids)
        print(f"   - {len(video_df)} Videodaten abgerufen")

        print("\n3. Hole Analytics-Daten über YouTube Analytics API...")
        flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        credentials = flow.run_local_server(port=0)
        analytics = build('youtubeAnalytics', 'v2', credentials=credentials)
        
        # Start- und Enddatum festlegen
        start_date = datetime.strptime(START_DATE, '%d.%m.%Y')
        end_date = datetime.strptime(END_DATE, '%d.%m.%Y')

        # Analytics-Daten sammeln
        analytics_data = []
        for index, row in video_df.iterrows():
            print(f"   - Verarbeite Video {index + 1} von {len(video_df)}: {row['video_id']}")
            data = get_analytics_data(analytics, row['video_id'], start_date, end_date)
            if data:
                analytics_data.append(data)

        analytics_df = pd.DataFrame(analytics_data)
        
        # Merge original cleaned file with API results
        final_df = pd.merge(cleaned_video_df, video_df, on='video_id', how='left')  # Basisdaten
        final_df = pd.merge(final_df, analytics_df, on='video_id', how='left')     # Analytics-Daten

        output_filename = f'youtube_analysis_with_cleaned_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        final_df.to_csv(output_filename, index=False)

        print(f"\nAnalyse abgeschlossen!")
        print(f"Daten wurden in {output_filename} gespeichert")
        
    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {str(e)}")


if __name__ == "__main__":
    main()


Der Folgende Code ruft bestimmte video_ids ab, die in einer cs gespeichert sind.
Der Code ist so eingerichtet, dass er:

Kontinuierlich in die CSV-Datei speichert
Einen Checkpoint für den letzten verarbeiteten Index anlegt
Fortschrittsinformationen ausgibt.
Der Code kann unterbrochen werden und arbeitet bei Wiederaufnahme an der letzten Stelle fort. 
Weitere Merkmale:
Explizite Float-Konvertierung der Prozentwerte
Mehr Debug-Ausgaben um zu sehen, was genau passiert
Verhinderung der String-Formatierung beim Speichern
Verifikation der gespeicherten Daten
Verwendung von Komma als Dezimaltrennzeichen
Semikolon als Spaltentrenner
Formatierung aller numerischen Werte auf 2 Nachkommastellen

In [None]:
# API Konfiguration - Hier Ihre Werte eintragen
CHANNEL_ID = "UCSeil5V81-mEGB1-VNR7YEA"  # Ihre YouTube Channel ID
DATA_API_V3_KEY = "AIzaSyCBw3tVk-TnYLCZW2vmojQlYFBArzudU-A"  # API Key für YouTube Data API v3
CREDENTIALS_PATH = "C:\\Users\\laukat\\OneDrive - Mediengruppe RTL\\HDM Data Analyti\\oauth 2.0\\client_secret_796311161257-bnk32mvms5t9fsfma7agbgrlt5fo54gi.apps.googleusercontent.com.json"

# Datum Konfiguration
START_DATE = "01.01.2000"

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from datetime import datetime, timedelta
import pandas as pd
import json
import os
import time

# OAuth 2.0 Scopes
SCOPES = ['https://www.googleapis.com/auth/yt-analytics.readonly']

# Konstanten
CHECKPOINT_FILE = 'analytics_checkpoint.json'
OUTPUT_FILE = 'youtube_demographic_analysis_continuous.csv'

def read_csv_with_encoding(file_path):
    """Versucht die CSV-Datei mit verschiedenen Encodings zu lesen"""
    encodings = ['utf-8', 'iso-8859-1', 'cp1252', 'latin1']
    
    for encoding in encodings:
        try:
            print(f"Versuche Encoding: {encoding}")
            return pd.read_csv(file_path, encoding=encoding, sep=';')
        except UnicodeDecodeError:
            continue
        except Exception as e:
            print(f"Fehler beim Lesen der Datei mit {encoding}: {str(e)}")
            continue
    
    raise Exception("Konnte die Datei mit keinem der verfügbaren Encodings lesen")

def save_to_csv(data, filename):
    """Speichert oder aktualisiert die CSV-Datei mit deutschen Zahlenformat"""
    try:
        # Wenn Datei existiert, lade bestehende Daten
        if os.path.exists(filename):
            df_existing = pd.read_csv(filename, sep=';', decimal=',')
            df_new = pd.DataFrame([data])
            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        else:
            # Erste Daten
            df_combined = pd.DataFrame([data])
        
        # Formatiere alle numerischen Spalten
        for col in df_combined.columns:
            if col != 'video_id':
                # Konvertiere zu numerisch falls noch nicht geschehen
                df_combined[col] = pd.to_numeric(df_combined[col], errors='ignore')
                # Formatiere mit deutschem Format (Komma statt Punkt)
                df_combined[col] = df_combined[col].apply(lambda x: f"{x:.2f}".replace('.', ','))
        
        # Speichere mit Semikolon als Trenner
        df_combined.to_csv(filename, sep=';', index=False, encoding='utf-8-sig')
        
        print("\nDaten erfolgreich gespeichert")
        return True
        
    except Exception as e:
        print(f"Fehler beim Speichern: {str(e)}")
        return False

def get_gender_age_analytics(analytics, video_id, start_date, end_date):
    """YouTube Analytics API Abfrage für Geschlechter- und Altersverteilung"""
    try:
        request = analytics.reports().query(
            dimensions='ageGroup,gender',
            ids=f'channel=={CHANNEL_ID}',
            metrics='viewerPercentage',
            filters=f'video=={video_id}',
            startDate=start_date.strftime('%Y-%m-%d'),
            endDate=end_date.strftime('%Y-%m-%d')
        )
        response = request.execute()
        
        # Initialisiere Dictionary mit allen möglichen Altersgruppen
        age_groups = ['13-17', '18-24', '25-34', '35-44', '45-54', '55-64', '65-']
        data = {
            'video_id': video_id
        }
        for age in age_groups:
            data[f'audience_female_age{age}'] = 0.0  # Explizit als float
            data[f'audience_male_age{age}'] = 0.0    # Explizit als float
            
        value_count = 0
        total_percentage = 0
            
        if 'rows' in response:
            print(f"\nGefundene Werte für Video {video_id}:")
            for row in response['rows']:
                age_group = row[0]
                gender = row[1]
                percentage = float(row[2])  # Explizite Konvertierung zu float
                
                column_name = f'audience_{gender}_age{age_group}'
                data[column_name] = percentage
                print(f"{column_name}: {percentage:.1f}%")
                
                value_count += 1
                total_percentage += percentage
            
            print(f"\nDatenqualität:")
            print(f"Anzahl gefundener Werte: {value_count}")
            print(f"Summe der Prozente: {total_percentage:.1f}%")
            
            if value_count < 5:
                print("Zu wenige demografische Daten vorhanden - Datensatz wird übersprungen")
                return None
                
            if total_percentage < 20:
                print("Gesamtprozentsatz zu niedrig - Datensatz wird übersprungen")
                return None
            
            # Debug-Ausgabe der finalen Daten
            print("\nFinale Daten vor dem Speichern:")
            for key, value in data.items():
                print(f"{key}: {value}")
                
            return data
        else:
            print(f"\nKeine Daten für Video {video_id} gefunden")
            return None
            
    except Exception as e:
        if 'quota' in str(e).lower():
            print(f"Quota-Limit erreicht bei Video {video_id}")
            raise e
        print(f"Fehler bei Video {video_id}: {str(e)}")
        return None

def process_videos(analytics, video_ids, start_date, end_date, start_index=0):
    """Verarbeitet Videos und zeigt Fortschritt"""
    total_videos = len(video_ids)
    start_time = time.time()
    valid_data_count = 0
    skipped_data_count = 0
    
    try:
        for index, video_id in enumerate(video_ids[start_index:], start=start_index):
            current_time = time.time()
            videos_processed = index - start_index + 1
            
            if videos_processed > 0:
                time_per_video = (current_time - start_time) / videos_processed
                videos_remaining = total_videos - (index + 1)
                estimated_time_remaining = timedelta(seconds=int(videos_remaining * time_per_video))
                
                print(f"\n{'='*50}")
                print(f"Video {index + 1} von {total_videos}: {video_id}")
                print(f"Durchschnittliche Zeit pro Video: {time_per_video:.1f} Sekunden")
                print(f"Geschätzte verbleibende Zeit: {estimated_time_remaining}")
                print(f"Bisher valide Datensätze: {valid_data_count}")
                print(f"Übersprungene Datensätze: {skipped_data_count}")
            
            data = get_gender_age_analytics(analytics, video_id, start_date, end_date)
            if data:
                if save_to_csv(data, OUTPUT_FILE):
                    valid_data_count += 1
                    print(f"Daten in {OUTPUT_FILE} gespeichert")
            else:
                skipped_data_count += 1
            
            # Speichere Checkpoint
            with open(CHECKPOINT_FILE, 'w') as f:
                json.dump({'last_processed_index': index}, f)
                
    except Exception as e:
        print(f"Fehler aufgetreten: {str(e)}")
        print(f"Letzter verarbeiteter Index: {index}")
        raise e

def main():
    try:
        # Lade Video-IDs mit korrektem Encoding
        print("Lade Video-IDs aus der bereinigten Datei...")
        video_df = read_csv_with_encoding("daten bereinigt dezimalzeichen fehlt in gender.csv")
        video_ids = video_df['video_id'].tolist()
        print(f"Geladen: {len(video_ids)} Videos")
        
        # Lade letzten Fortschritt
        start_index = -1
        if os.path.exists(CHECKPOINT_FILE):
            with open(CHECKPOINT_FILE, 'r') as f:
                start_index = json.load(f)['last_processed_index']
        
        # Initialisiere API
        flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        credentials = flow.run_local_server(port=0)
        analytics = build('youtubeAnalytics', 'v2', credentials=credentials)
        
        # Setze Datumsgrenzen
        start_date = datetime.strptime(START_DATE, '%d.%m.%Y')
        end_date = datetime.now()
        
        print(f"Starte Verarbeitung ab Index {start_index + 1}")
        process_videos(analytics, video_ids, start_date, end_date, start_index + 1)
        
        print("\nVerarbeitung abgeschlossen!")
        
    except Exception as e:
        print(f"Fehler: {str(e)}")
        print("Sie können das Skript später neu starten - es wird am letzten Checkpoint fortgesetzt.")

if __name__ == "__main__":
    main()

Anbei ein Code, um die Spalte Thema per "Hand" anzupassen. 

In [None]:
import pandas as pd
import os
import sys
import msvcrt  # Für Windows-Konsoleneingabe

def get_key():
    """Wartet auf einen Tastendruck und gibt den Wert zurück"""
    return msvcrt.getch().decode('utf-8')

def categorize_videos():
    # Prüfen ob die Datei existiert
    if not os.path.exists('updated_allewerte.csv'):
        print("FEHLER: youtube_data.csv nicht gefunden!")
        print("Bitte legen Sie die Datei im gleichen Ordner ab wie das Python-Skript")
        input("Drücken Sie Enter zum Beenden...")
        return

    # CSV einlesen mit Semikolon als Trennzeichen und Komma als Dezimalzeichen
    print("Lade Datei...")
    df = pd.read_csv('updated_allewerte.csv', sep=';', decimal=',')
    
    # Kategorien definieren
    categories = ['Politik', 'Krieg', 'Wirtschaft', 'Bilder', 'Sonstiges']
    
    print("\nKategorisierung beginnt...")
    print("Steuerung:")
    print("1-5: Kategorie auswählen")
    print("0: Video überspringen")
    print("q: Programm beenden und speichern")
    input("Drücken Sie Enter zum Starten...")

    # Durchgehen jeder Zeile, die nicht "Live" als Thema hat
    for index, row in df.iterrows():
        if row['Thema'] != 'Live':
            os.system('cls')  # Bildschirm löschen
            print(f"\nVideo {index + 1} von {len(df)}")
            print('-' * 50)
            print('Titel:', row['title'])
            print('Aktuelles Thema:', row['Thema'])
            print('\nKategorien:')
            for i, category in enumerate(categories, 1):
                print(f'{i}: {category}')
            print('0: Überspringen')
            print('q: Beenden und Speichern')
            
            while True:
                print("\nWählen Sie eine Kategorie (1-5) oder 0 zum Überspringen: ")
                key = get_key()
                
                if key == 'q':
                    # Speichern und beenden
                    df.to_csv('updated_allewerte.csv', sep=';', decimal=',', index=False)
                    print("\nÄnderungen wurden gespeichert in 'youtube_data_updated.csv'")
                    input("Drücken Sie Enter zum Beenden...")
                    return
                
                if key in ['0', '1', '2', '3', '4', '5']:
                    if key != '0':
                        df.at[index, 'Thema'] = categories[int(key)-1]
                        print(f"Kategorie geändert zu: {categories[int(key)-1]}")
                    break
                
    # Speichern der aktualisierten CSV
    df.to_csv('updated_allewerte1.csv', sep=';', decimal=',', index=False)
    print('\nKategorisierung abgeschlossen!')
    print('Datei wurde gespeichert als: updated_allewerte1.csv')
    input("Drücken Sie Enter zum Beenden...")

if __name__ == '__main__':
    try:
        categorize_videos()
    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {str(e)}")
        input("Drücken Sie Enter zum Beenden...")

Dieser Code analysietr alle Thumbnails der Videos und bewertet sie nach Kriterien wie präsente Personen + Text (3), Präsenter Text (2) nur Screenshot (1), Erstmal nur zum testen 10 zufällige Videos. 

Nur Gesichter mit hoher Erkennungssicherheit (> 0.8)
Mindestgröße der Gesichter im Bild
Überprüfung der Position im Bild


Strengere Kriterien für Text:

Text muss mindestens 10% der Bildfläche einnehmen
Prüfung der Textposition und -größe


Neue Visualisierung:

Erstellt einen HTML-Bericht mit allen Thumbnails
Farbkodierung:

Rot = Screenshot only
Gelb = Screenshot mit Text
Grün = Person im Fokus


Zeigt alle relevanten Analyseergebnisse neben jedem Thumbnail

In [None]:
pip install google-cloud-vision pandas requests

In [None]:
import os
import pandas as pd
import requests
import numpy as np
from google.cloud import vision
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from urllib.parse import urljoin

# Konstanten
CREDENTIALS_PATH = r"C:\Users\Laukat\OneDrive - Mediengruppe RTL\HDM Data Analyti\Google cloud vision API\youtube-analyse-444120-8ce0b527c6e8.json"
CSV_PATH = r"C:\Users\Laukat\OneDrive - Mediengruppe RTL\HDM Data Analyti\project\data\raw\3_update_allewerte_Themenkategoristiert_dopplungen_bereinigt.csv"
OUTPUT_DIR = "youtube_thumbnails"
ANALYSIS_OUTPUT = "thumbnail_analysis_sample.csv"
SAMPLE_SIZE = 10

class ThumbnailCategory(Enum):
    SCREENSHOT_ONLY = 1
    SCREENSHOT_WITH_TEXT = 2
    PERSON_FOCUSED = 3

class YouTubeThumbnailProcessor:
    def __init__(self):
        """Initialisiert den Processor mit Google Vision Client"""
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = CREDENTIALS_PATH
        self.vision_client = vision.ImageAnnotatorClient()
        
        if not os.path.exists(OUTPUT_DIR):
            os.makedirs(OUTPUT_DIR)
    
    def download_thumbnail(self, video_id: str, quality: str = "maxresdefault") -> Optional[str]:
        """Lädt ein einzelnes Thumbnail herunter"""
        base_url = "https://img.youtube.com/vi/"
        thumbnail_url = urljoin(base_url, f"{video_id}/{quality}.jpg")
        
        try:
            response = requests.get(thumbnail_url)
            response.raise_for_status()
            
            output_path = os.path.join(OUTPUT_DIR, f"{video_id}.jpg")
            with open(output_path, "wb") as f:
                f.write(response.content)
                
            print(f"Erfolgreich heruntergeladen: {video_id}")
            return output_path
            
        except requests.exceptions.RequestException as e:
            print(f"Fehler beim Herunterladen von {video_id}: {str(e)}")
            if quality == "maxresdefault":
                print("Versuche mit niedrigerer Qualität (hqdefault)...")
                return self.download_thumbnail(video_id, "hqdefault")
            return None

    def analyze_image(self, image_path: str) -> dict:
        """Analysiert ein Thumbnail mit Google Vision API"""
        with open(image_path, 'rb') as image_file:
            content = image_file.read()
        
        image = vision.Image(content=content)
        
        # Einen einzelnen Aufruf mit allen Features machen
        features = [
            vision.Feature(type_=vision.Feature.Type.FACE_DETECTION),
            vision.Feature(type_=vision.Feature.Type.LABEL_DETECTION),
            vision.Feature(type_=vision.Feature.Type.TEXT_DETECTION)
        ]
        request = vision.AnnotateImageRequest(image=image, features=features)
        response = self.vision_client.annotate_image(request=request)
        
        # Personen-Analyse mit strengeren Kriterien
        faces = response.face_annotations
        prominent_faces = []
        for face in faces:
            # Berechne die Gesichtsgröße
            bbox = face.bounding_poly.vertices
            face_width = abs(bbox[1].x - bbox[0].x)
            face_height = abs(bbox[2].y - bbox[0].y)
            
            # Ein Gesicht gilt als prominent, wenn es mindestens 15% der Bildbreite oder -höhe einnimmt
            # Angenommene Bildgröße: 1280x720 (typisches YouTube-Thumbnail)
            is_prominent = (face_width > 192) or (face_height > 108)  # 15% von 1280 bzw. 720
            
            if is_prominent:
                prominent_faces.append(face)
        
        # Text-Analyse mit strengeren Kriterien
        text_annotations = response.text_annotations
        has_prominent_text = False
        if text_annotations:
            # Erste Annotation enthält den gesamten Text und Boundingbox
            main_text = text_annotations[0]
            if main_text.description:
                # Berechne die ungefähre Textgröße
                vertices = main_text.bounding_poly.vertices
                text_width = abs(vertices[1].x - vertices[0].x)
                text_height = abs(vertices[2].y - vertices[0].y)
                text_area = text_width * text_height
                
                # Text muss mindestens 10% des Bildes einnehmen
                has_prominent_text = text_area > (800 * 600 * 0.1)  # Annahme eines 800x600 Thumbnails
        
        # Screenshot-Erkennung
        labels = [label.description.lower() for label in response.label_annotations]
        screenshot_indicators = ['screenshot', 'screen capture', 'user interface', 'display']
        is_screenshot = any(indicator in labels for indicator in screenshot_indicators)
        
        # Kategorisierung mit strengeren Kriterien
        if len(prominent_faces) > 0 and len(prominent_faces) <= 2:
            category = ThumbnailCategory.PERSON_FOCUSED
            confidence = 0.9 if len(prominent_faces) > 0 else 0.5
        elif is_screenshot and has_prominent_text:
            category = ThumbnailCategory.SCREENSHOT_WITH_TEXT
            confidence = 0.7
        else:
            category = ThumbnailCategory.SCREENSHOT_ONLY
            confidence = 0.8 if is_screenshot else 0.5
            
        return {
            'video_id': os.path.basename(image_path).replace('.jpg', ''),
            'category': category.name,
            'category_score': category.value,
            'confidence': confidence,
            'has_prominent_text': has_prominent_text,
            'prominent_face_count': len(prominent_faces),
            'is_screenshot': is_screenshot,
            'thumbnail_path': image_path  # Pfad für spätere Visualisierung
        }

    def process_sample_videos(self, video_id_column: str = 'video_id'):
        """Verarbeitet eine Stichprobe von Videos"""
        # CSV laden mit korrekten Parametern
        print(f"Lade CSV von {CSV_PATH}...")
        df = pd.read_csv(
            CSV_PATH,
            sep=';',              # Semikolon als Trennzeichen
            quotechar='"',        # Anführungszeichen für Texte
            encoding='utf-8',     # Zeichenkodierung
            engine='python'       # Robusterer Python-Parser
        )
        print(f"Verfügbare Spalten: {', '.join(df.columns)}")
        
        if video_id_column not in df.columns:
            raise ValueError(f"Spalte '{video_id_column}' nicht in der CSV gefunden!")
        
        # Zufällige Stichprobe von Video-IDs
        video_ids = df[video_id_column].dropna().unique()
        sample_ids = np.random.choice(video_ids, size=min(SAMPLE_SIZE, len(video_ids)), replace=False)
        print(f"Ausgewählt: {len(sample_ids)} zufällige Video-IDs")
        
        results = []
        
        # Thumbnails herunterladen und analysieren
        for video_id in sample_ids:
            print(f"\nVerarbeite Video {video_id}...")
            
            thumbnail_path = self.download_thumbnail(video_id)
            if thumbnail_path:
                try:
                    analysis = self.analyze_image(thumbnail_path)
                    results.append(analysis)
                except Exception as e:
                    print(f"Fehler bei der Analyse von {video_id}: {str(e)}")
        
        # Ergebnisse in DataFrame konvertieren und speichern
        results_df = pd.DataFrame(results)
        results_df = results_df.sort_values('category_score', ascending=False)
        results_df.to_csv(ANALYSIS_OUTPUT, index=False)
        
        return results_df

def generate_html_report(results_df: pd.DataFrame) -> str:
    """Generiert einen HTML-Bericht mit Thumbnails und Analyseergebnissen"""
    html = """
    <html>
    <head>
        <style>
            .container { max-width: 1200px; margin: 0 auto; padding: 20px; }
            .thumbnail { margin: 20px 0; padding: 20px; border: 1px solid #ddd; }
            .thumbnail img { max-width: 400px; height: auto; }
            .category-1 { border-left: 5px solid #ff4444; }
            .category-2 { border-left: 5px solid #ffbb33; }
            .category-3 { border-left: 5px solid #00C851; }
            .details { margin-top: 10px; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>Thumbnail Analyse - Stichprobenergebnisse</h1>
    """
    
    for _, row in results_df.iterrows():
        category_class = f"category-{row['category_score']}"
        html += f"""
            <div class="thumbnail {category_class}">
                <img src="{row['thumbnail_path']}" alt="Thumbnail {row['video_id']}">
                <div class="details">
                    <h3>Video ID: {row['video_id']}</h3>
                    <p>Kategorie: {row['category']} (Score: {row['category_score']})</p>
                    <p>Prägnanter Text: {'Ja' if row['has_prominent_text'] else 'Nein'}</p>
                    <p>Prägnante Personen: {row['prominent_face_count']}</p>
                    <p>Screenshot erkannt: {'Ja' if row['is_screenshot'] else 'Nein'}</p>
                    <p>Konfidenz: {row['confidence']:.2f}</p>
                </div>
            </div>
        """
    
    html += """
        </div>
    </body>
    </html>
    """
    
    # HTML-Bericht speichern
    with open('thumbnail_analysis_report.html', 'w', encoding='utf-8') as f:
        f.write(html)
    
    return 'thumbnail_analysis_report.html'

if __name__ == "__main__":
    try:
        processor = YouTubeThumbnailProcessor()
        results = processor.process_sample_videos()
        
        # HTML-Bericht generieren
        report_path = generate_html_report(results)
        print(f"\nAnalyse abgeschlossen. Bericht wurde erstellt: {report_path}")
        print("\nÜbersicht der Kategorien:")
        print(results['category'].value_counts())
        
    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {str(e)}")

Der folgende Code analysiert YouTube-Video-Thumbnails mithilfe der Google Cloud Vision API
Hauptfunktionen:

Download von Thumbnails für bestimmte Video-IDs
Analyse der Bilder auf zwei Hauptmerkmale:

Präsenz von Gesichtern (inkl. Größenberechnung)
Vorhandensein von Text

##Es gibt drei Bewertungsstufen (Scores) für Thumbnails:

Basic (Score 1): Nur Bild ohne Text/Gesicht
With Text (Score 2): Bild mit deutlichem Text
Optimal (Score 3): Bild mit Text und prominentem Gesicht

#Arbeitsablauf:
Liest Video-IDs aus einer CSV-Datei
Wählt zufällig eine Stichprobe (standardmäßig 100 Videos)
Lädt die Thumbnails herunter
Analysiert jedes Thumbnail
Speichert Zwischenergebnisse
Erstellt einen HTML-Bericht mit visueller Übersicht und Statistiken

#Der Code speichert seine Ergebnisse in:
Eine Interim-CSV für Zwischenergebnisse
Eine finale Analyse-CSV
Einen HTML-Report zur visuellen Überprüfung der Ergebnisse

In [None]:
import os
import pandas as pd
import requests
import numpy as np
from google.cloud import vision
from typing import Dict, List, Optional
from enum import Enum
from urllib.parse import urljoin
import json

# Konstanten
CREDENTIALS_PATH = r"C:\Users\Laukat\OneDrive - Mediengruppe RTL\HDM Data Analyti\Google cloud vision API\youtube-analyse-444120-8ce0b527c6e8.json"
CSV_PATH = r"C:\Users\Laukat\OneDrive - Mediengruppe RTL\HDM Data Analyti\project\data\raw\3_update_allewerte_Themenkategoristiert_dopplungen_bereinigt.csv"
OUTPUT_DIR = "youtube_thumbnails"
ANALYSIS_OUTPUT = "sample_analysis_results.csv"
INTERIM_OUTPUT = "interim_analysis_results.csv"
HTML_REPORT = "thumbnail_report.html"

class ThumbnailScore(Enum):
    BASIC = 1        # Nur Screenshot/Bild ohne Text oder präsentes Gesicht
    WITH_TEXT = 2    # Bild mit Titel in Textform
    OPTIMAL = 3      # Präsentes Gesicht + Titel in Textform

class YouTubeThumbnailProcessor:
    def __init__(self):
        """Initialisiert den Processor mit Google Vision Client"""
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = CREDENTIALS_PATH
        self.vision_client = vision.ImageAnnotatorClient()
        
        if not os.path.exists(OUTPUT_DIR):
            os.makedirs(OUTPUT_DIR)

    def download_thumbnail(self, video_id: str, quality: str = "maxresdefault") -> Optional[str]:
        """Lädt ein einzelnes Thumbnail herunter"""
        output_path = os.path.join(OUTPUT_DIR, f"{video_id}.jpg")
        
        # Prüfe ob Thumbnail bereits existiert
        if os.path.exists(output_path):
            print(f"Thumbnail für {video_id} bereits vorhanden, überspringe Download")
            return output_path
            
        base_url = "https://img.youtube.com/vi/"
        thumbnail_url = urljoin(base_url, f"{video_id}/{quality}.jpg")
        
        try:
            response = requests.get(thumbnail_url)
            response.raise_for_status()
            
            with open(output_path, "wb") as f:
                f.write(response.content)
                
            print(f"Erfolgreich heruntergeladen: {video_id}")
            return output_path
            
        except requests.exceptions.RequestException as e:
            print(f"Fehler beim Herunterladen von {video_id}: {str(e)}")
            if quality == "maxresdefault":
                print("Versuche mit niedrigerer Qualität (hqdefault)...")
                return self.download_thumbnail(video_id, "hqdefault")
            return None

    def analyze_image(self, image_path: str) -> dict:
        """Analysiert ein Thumbnail mit Google Vision API"""
        with open(image_path, 'rb') as image_file:
            content = image_file.read()
        
        image = vision.Image(content=content)
        
        features = [
            vision.Feature(type_=vision.Feature.Type.FACE_DETECTION),
            vision.Feature(type_=vision.Feature.Type.TEXT_DETECTION)
        ]
        request = vision.AnnotateImageRequest(image=image, features=features)
        response = self.vision_client.annotate_image(request=request)
        
        # Gesichtserkennung
        faces = response.face_annotations
        img_width = 1280  # Standard YouTube Thumbnail Breite
        img_height = 720  # Standard YouTube Thumbnail Höhe
        
        has_prominent_face = False
        for face in faces:
            width = abs(face.bounding_poly.vertices[2].x - face.bounding_poly.vertices[0].x)
            height = abs(face.bounding_poly.vertices[2].y - face.bounding_poly.vertices[0].y)
            
            face_area = width * height
            total_area = img_width * img_height
            area_percentage = (face_area / total_area) * 100
            
            print(f"Gesichtsanalyse für {os.path.basename(image_path)}:")
            print(f"- Breite: {width}px ({(width/img_width)*100:.1f}%)")
            print(f"- Höhe: {height}px ({(height/img_height)*100:.1f}%)")
            print(f"- Flächenanteil: {area_percentage:.1f}%")
            
            # Kriterien für prominentes Gesicht
            has_prominent_face = (
                (width > img_width * 0.2 or height > img_height * 0.2) or
                area_percentage > 10
            )
            if has_prominent_face:
                break
        
        # Texterkennung
        has_prominent_text = False
        if response.text_annotations:
            main_text = response.text_annotations[0]
            text_vertices = main_text.bounding_poly.vertices
            text_width = abs(text_vertices[1].x - text_vertices[0].x)
            text_height = abs(text_vertices[2].y - text_vertices[0].y)
            text_area = text_width * text_height
            has_prominent_text = text_area > (img_width * img_height * 0.1)
        
        # Score bestimmen
        if has_prominent_face and has_prominent_text:
            score = ThumbnailScore.OPTIMAL
        elif has_prominent_text:
            score = ThumbnailScore.WITH_TEXT
        else:
            score = ThumbnailScore.BASIC
            
        return {
            'video_id': os.path.basename(image_path).replace('.jpg', ''),
            'score': score.value,
            'thumbnail_path': image_path
        }

    def process_videos(self, sample_size: int = 100):
        """Verarbeitet eine Stichprobe von Videos"""
        print(f"Lade CSV von {CSV_PATH}...")
        df = pd.read_csv(CSV_PATH, 
                        sep=';',
                        decimal=',',
                        encoding='utf-8')
        
        # Zufällige Auswahl von Videos
        video_ids = df['video_id'].dropna().unique()
        selected_ids = np.random.choice(video_ids, size=min(sample_size, len(video_ids)), replace=False)
        print(f"Ausgewählt: {len(selected_ids)} zufällige Video-IDs")
        
        results = []
        for idx, video_id in enumerate(selected_ids, 1):
            print(f"\nVerarbeite Video {idx}/{sample_size}: {video_id}")
            
            thumbnail_path = self.download_thumbnail(video_id)
            if thumbnail_path:
                try:
                    analysis = self.analyze_image(thumbnail_path)
                    results.append(analysis)
                    
                    # Zwischenspeicherung der Ergebnisse
                    temp_df = pd.DataFrame(results)
                    temp_df.to_csv(INTERIM_OUTPUT, 
                                 sep=';',
                                 decimal=',',
                                 index=False,
                                 encoding='utf-8')
                    
                    # Update der Stichproben-CSV
                    sample_df = df[df['video_id'].isin(selected_ids)].copy()
                    score_dict = dict(zip(temp_df['video_id'], temp_df['score']))
                    sample_df['Gestaltung_Thumbnail'] = sample_df['video_id'].map(score_dict)
                    sample_df.to_csv(ANALYSIS_OUTPUT, 
                                   sep=';',
                                   decimal=',',
                                   index=False,
                                   encoding='utf-8')
                    
                except Exception as e:
                    print(f"Fehler bei der Analyse von {video_id}: {str(e)}")
                    continue
                    
        return pd.DataFrame(results)

def generate_html_report(results_df: pd.DataFrame):
    """Generiert einen HTML-Bericht zur visuellen Überprüfung"""
    score_counts = results_df['score'].value_counts()
    
    html_content = f"""
    <!DOCTYPE html>
    <html>
    <head>
    <meta charset="utf-8">
    <title>Thumbnail Analyse</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .container {{ max-width: 1200px; margin: 0 auto; padding: 20px; }}
        .stats {{ 
            display: grid;
            grid-template-columns: repeat(3, 1fr);
            gap: 20px;
            margin-bottom: 30px;
        }}
        .stat-box {{ 
            padding: 20px;
            background: #f8f9fa;
            border-radius: 8px;
            text-align: center;
        }}
        .thumbnails {{ 
            display: grid;
            grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
            gap: 20px;
        }}
        .thumbnail {{ 
            padding: 15px;
            background: white;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }}
        .thumbnail img {{ 
            width: 100%;
            height: auto;
            border-radius: 4px;
        }}
        .score-1 {{ color: #dc3545; }}
        .score-2 {{ color: #ffc107; }}
        .score-3 {{ color: #28a745; }}
    </style>
    </head>
    <body>
    <div class="container">
        <h1>Thumbnail Analyse Ergebnisse</h1>
        
        <div class="stats">
            <div class="stat-box">
                <h3>Score 3 (Optimal)</h3>
                <p>{score_counts.get(3, 0)} Videos</p>
            </div>
            <div class="stat-box">
                <h3>Score 2 (Mit Text)</h3>
                <p>{score_counts.get(2, 0)} Videos</p>
            </div>
            <div class="stat-box">
                <h3>Score 1 (Basic)</h3>
                <p>{score_counts.get(1, 0)} Videos</p>
            </div>
        </div>
        
        <div class="thumbnails">
    """
    
    # Nach Score sortieren
    sorted_results = results_df.sort_values('score', ascending=False)
    
    for _, row in sorted_results.iterrows():
        html_content += f"""
            <div class="thumbnail">
                <img src="{row['thumbnail_path']}" alt="Thumbnail {row['video_id']}">
                <p>Video ID: {row['video_id']}</p>
                <p class="score-{row['score']}">Score: {row['score']}</p>
            </div>
        """
    
    html_content += """
        </div>
    </div>
    </body>
    </html>
    """
    
    with open(HTML_REPORT, 'w', encoding='utf-8') as f:
        f.write(html_content)
    
    return HTML_REPORT

if __name__ == "__main__":
    try:
        processor = YouTubeThumbnailProcessor()
        results = processor.process_videos(sample_size=100)
        
        if not results.empty:
            report_path = generate_html_report(results)
            print(f"\nHTML-Bericht wurde erstellt: {report_path}")
            print("\nVerteilung der Scores:")
            print(results['score'].value_counts().sort_index())
            
    except KeyboardInterrupt:
        print("\nProgramm wurde durch Benutzer unterbrochen.")
    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {str(e)}")

Nachfolgender Code, um eine SEO-Bewertung der Titel über OpenAI zu schreiben
Dateipfad und Spalten:
Verwendet den exakten Pfad zu deiner CSV-Datei
Liest aus der Spalte "title"
Schreibt in die Spalte "Bewertung_Titel"
Behält alle anderen Spalten unverändert bei
Deutsches Format:
Verwendet Semikolon als Trennzeichen
Verwendet Komma als Dezimaltrennzeichen
Behält diese Formatierung beim Speichern bei
In-Place Speicherung:
Überschreibt die originale Datei mit den neuen Bewertungen
Behält dabei das Format und alle anderen Daten bei
Verwendung:

#Das Skript wird:

Die Analyse automatisch in Batches durchführen
Regelmäßig Checkpoints speichern
Bei einem Abbruch an der letzten Stelle weitermachen
Die Ergebnisse direkt in deine CSV-Datei schreiben
Detaillierte Logs und Statistiken ausgeben
Möchtest du das Skript so testen oder soll ich noch etwas anpassen?

In [None]:
pip install openai pandas python-dotenv

In [None]:
pip install nest_asyncio

In [3]:
import pandas as pd
from openai import OpenAI
from typing import Optional, List, Dict
import logging
from pathlib import Path
import asyncio
from datetime import datetime
import json
import hashlib
import time

# Logging-Konfiguration
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=log_format,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('seo_analysis.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

class CheckpointManager:
    def __init__(self, checkpoint_dir: str = "checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        
    def _generate_session_id(self, csv_path: str) -> str:
        timestamp = datetime.now().strftime("%Y%m%d")
        file_hash = hashlib.md5(str(csv_path).encode()).hexdigest()[:8]
        return f"{timestamp}_{file_hash}"
        
    def save_checkpoint(self, session_id: str, processed_data: Dict) -> None:
        checkpoint_file = self.checkpoint_dir / f"checkpoint_{session_id}.json"
        with open(checkpoint_file, 'w', encoding='utf-8') as f:
            json.dump(processed_data, f, ensure_ascii=False, indent=2)
        logger.info(f"Checkpoint gespeichert: {checkpoint_file}")
            
    def load_checkpoint(self, session_id: str) -> Optional[Dict]:
        checkpoint_file = self.checkpoint_dir / f"checkpoint_{session_id}.json"
        if checkpoint_file.exists():
            with open(checkpoint_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return None

class YouTubeSEOScorer:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        self.model = model
        self.client = OpenAI(api_key=api_key)
        self.checkpoint_manager = CheckpointManager()
        
        self.system_prompt = """Du bist ein YouTube SEO Experte, spezialisiert auf Nachrichtenkanäle. 
        Bewerte den YouTube-Titel nach folgenden Kriterien und vergib eine Gesamtpunktzahl von 0-10. 
        WICHTIG: Gib NUR die Gesamtpunktzahl als einzelne Zahl zurück, OHNE Text oder Erklärungen.
        Beispiele korrekter Antworten:
        7,5
        8,0
        6,5
        
        Bewerte nach diesen Kriterien:

        1. Titel-Optimierung (max. 2 Punkte)
        - Länge zwischen 40-70 Zeichen (+0.5)
        - Hauptkeyword in den ersten 3-4 Wörtern (+0.5)
        - Klare Struktur mit natürlicher Lesefluss (+0.5)
        - Verwendung von Trennzeichen (|, -, :) wo sinnvoll (+0.5)

        2. Nachrichtenwert & Aktualität (max. 2 Punkte)
        - Klare Vermittlung der Nachrichtenrelevanz (+0.5)
        - Zeitliche Einordnung wenn relevant (+0.5)
        - Balance zwischen Aktualität und Evergreen-Potenzial (+0.5)
        - Korrekte Priorisierung der Information (+0.5)

        3. Keyword-Optimierung (max. 2 Punkte)
        - Verwendung relevanter Nachrichtenkeywords (+0.5)
        - Natürliche Integration von Suchbegriffen (+0.5)
        - LSI Keywords / thematisch verwandte Begriffe (+0.5)
        - Vermeidung von Keyword-Stuffing (+0.5)

        4. Zielgruppen-Ansprache (max. 2 Punkte)
        - Verständliche Sprache für Nachrichtenpublikum (+0.5)
        - Seriosität und Professionalität (+0.5)
        - Klare Themenankündigung (+0.5)
        - Zielgruppengerechte Formulierung (+0.5)

        5. Technische Optimierung (max. 2 Punkte)
        - Keine übermäßige Großschreibung (+0.5)
        - Korrekte Zeichensetzung (+0.5)
        - Keine Spam-Taktiken oder Clickbait (+0.5)
        - Mobile-freundliche Länge & Format (+0.5)

        WICHTIG - FORMAT DER ANTWORT:
        - Gib ausschließlich eine einzelne Dezimalzahl zurück
        - Verwende ein Komma als Dezimaltrennzeichen
        - Keine Erklärungen oder zusätzlicher Text
        - Keine Aufschlüsselung der Einzelkriterien
        
        Beispiele korrekter Antworten:
        7,5
        8,0
        6,5"""

    async def analyze_title(self, title: str, retry_count: int = 3) -> Optional[float]:
        for attempt in range(retry_count):
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": self.system_prompt},
                        {"role": "user", "content": f"Bewerte diesen Titel: {title}"}
                    ],
                    temperature=0.3
                )
                
                score = float(response.choices[0].message.content.strip().replace(',', '.'))
                return round(score, 1)
                
            except Exception as e:
                if attempt < retry_count - 1:
                    wait_time = (attempt + 1) * 2
                    logger.warning(f"Fehler bei Titel '{title}'. Retry in {wait_time}s... ({str(e)})")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"Endgültiger Fehler bei '{title}' nach {retry_count} Versuchen: {str(e)}")
                    return None

    async def analyze_batch(self, titles: List[str], session_id: str, start_idx: int = 0, batch_size: int = 10) -> Dict[int, float]:
        results = {}
        
        for i in range(start_idx, len(titles), batch_size):
            batch = titles[i:i + batch_size]
            tasks = [self.analyze_title(title) for title in batch]
            
            try:
                batch_scores = await asyncio.gather(*tasks)
                
                # Ergebnisse mit Index speichern
                for idx, score in enumerate(batch_scores, start=i):
                    if score is not None:
                        results[idx] = score
                
                # Checkpoint
                checkpoint_data = {
                    'last_processed_index': i + batch_size,
                    'results': results
                }
                self.checkpoint_manager.save_checkpoint(session_id, checkpoint_data)
                
                # Fortschritt
                processed = min(i + batch_size, len(titles))
                logger.info(f"Fortschritt: {processed}/{len(titles)} Titel ({processed/len(titles)*100:.1f}%)")
                
                if i + batch_size < len(titles):
                    await asyncio.sleep(1)
                    
            except Exception as e:
                logger.error(f"Fehler beim Batch-Processing (Index {i}): {str(e)}")
                checkpoint_data = {
                    'last_processed_index': i,
                    'results': results
                }
                self.checkpoint_manager.save_checkpoint(session_id, checkpoint_data)
                
        return results

    async def analyze_from_csv(self, csv_path: str) -> None:
        try:
            csv_path = Path(csv_path)
            if not csv_path.exists():
                raise FileNotFoundError(f"CSV-Datei nicht gefunden: {csv_path}")
            
            # Session ID für Checkpointing
            session_id = self.checkpoint_manager._generate_session_id(str(csv_path))
            
            # CSV mit deutschem Format laden
            df = pd.read_csv(csv_path, sep=';', decimal=',', encoding='utf-8')
            
            if 'title' not in df.columns:
                raise ValueError("Spalte 'title' nicht gefunden!")
            
            # Checkpoint prüfen
            checkpoint = self.checkpoint_manager.load_checkpoint(session_id)
            start_idx = 0
            results = {}
            
            if checkpoint:
                start_idx = checkpoint['last_processed_index']
                results = {int(k): v for k, v in checkpoint['results'].items()}
                logger.info(f"Checkpoint gefunden. Fortsetzen ab Index {start_idx}")
            
            # Startzeit
            start_time = datetime.now()
            
            # Noch nicht verarbeitete Titel analysieren
            titles = df['title'].tolist()
            if start_idx < len(titles):
                new_results = await self.analyze_batch(
                    titles=titles,
                    session_id=session_id,
                    start_idx=start_idx,
                    batch_size=10
                )
                results.update(new_results)
            
            # Ergebnisse in DataFrame integrieren
            df['Bewertung_Titel'] = pd.Series(results)
            
            # Ergebnisse im deutschen Format speichern
            df.to_csv(csv_path, sep=';', decimal=',', index=False, encoding='utf-8')
            
            # Statistiken
            scores = list(results.values())
            stats = {
                'Verarbeitete Titel': len(scores),
                'Durchschnittlicher Score': sum(scores) / len(scores) if scores else 0,
                'Titel unter 5 Punkten': sum(1 for s in scores if s < 5),
                'Titel über 8 Punkten': sum(1 for s in scores if s >= 8),
                'Verarbeitungszeit': str(datetime.now() - start_time)
            }
            
            logger.info("\nAnalyse abgeschlossen:")
            for key, value in stats.items():
                logger.info(f"{key}: {value}")
            
        except Exception as e:
            logger.error(f"Kritischer Fehler: {str(e)}")
            raise

Ausführung der Aktionen

In [None]:
import nest_asyncio
nest_asyncio.apply()

# API Key
API_KEY = 'dein api code'  # Dein API Key hier

# Pfad zur CSV
CSV_PATH = r"C:\Users\laukat\OneDrive - Mediengruppe RTL\HDM Data Analyti\project\data\raw\4_update_allewerte_Themenkategoristiert_dopplungen_bereinigt_thumbnailScores.csv"

# Analyzer initialisieren und ausführen
async def main():
    analyzer = YouTubeSEOScorer(
        api_key=API_KEY,
        model="gpt-3.5-turbo"
    )
    await analyzer.analyze_from_csv(CSV_PATH)

# Starten der Analyse
await main()

Code, falls es Zeilen ohne Bewwrtung gibt

In [None]:
import nest_asyncio
nest_asyncio.apply()

# API Key
API_KEY = 'dein api code'  # Dein API Key hier

# Pfad zur CSV
CSV_PATH = r"C:\Users\laukat\OneDrive - Mediengruppe RTL\HDM Data Analyti\project\data\raw\4_update_allewerte_Themenkategoristiert_dopplungen_bereinigt_thumbnailScores.csv"

async def analyze_missing_ratings():
    # Daten laden
    df = pd.read_csv(CSV_PATH, sep=';', decimal=',', encoding='utf-8')
    
    # Fehlende Bewertungen identifizieren
    missing_ratings = df[df['Bewertung_Titel'].isna()]
    print(f"Gefunden: {len(missing_ratings)} Titel ohne Bewertung")
    
    if len(missing_ratings) == 0:
        print("Keine fehlenden Bewertungen gefunden!")
        return
    
    # Analyzer initialisieren
    analyzer = YouTubeSEOScorer(
        api_key=API_KEY,
        model="gpt-3.5-turbo"
    )
    
    # Analyse der fehlenden Bewertungen
    results = {}
    for idx in missing_ratings.index:
        title = df.at[idx, 'title']
        try:
            score = await analyzer.analyze_title(title)
            if score is not None:
                results[idx] = score
                print(f"Zeile {idx}: '{title[:70]}...' - Score: {score}")
        except Exception as e:
            print(f"Fehler bei Zeile {idx}: {str(e)}")
        
        # Kurze Pause zwischen den Anfragen
        await asyncio.sleep(0.5)
    
    # Ergebnisse in DataFrame speichern
    for idx, score in results.items():
        df.at[idx, 'Bewertung_Titel'] = score
    
    # Speichern mit deutschem Format
    df.to_csv(CSV_PATH, sep=';', decimal=',', index=False, encoding='utf-8')
    
    print("\nAnalyse der fehlenden Bewertungen abgeschlossen!")
    print(f"Anzahl erfolgreich bewerteter Titel: {len(results)}")
    print(f"Verbleibende fehlende Bewertungen: {len(missing_ratings) - len(results)}")

# Ausführung
await analyze_missing_ratings()