In [1]:
# ==============================================================================
# ETL-Skript (Version 15.0 - SQLAlchemy 2.0 Syntax)
#
# Behebt den finalen Syntaxfehler, indem der TRUNCATE-Befehl explizit
# mit der text()-Funktion als SQL-Kommando deklariert wird.
#
# Autor: [Ihr Name]
# Datum: [Heutiges Datum]
# ==============================================================================

import pandas as pd
import json
from sqlalchemy import create_engine, text  # NEU: text importiert
from sqlalchemy.engine import URL
from sqlalchemy.exc import IntegrityError

print("ETL-Skript (Version 15.0 - Finale Syntax) gestartet...")

# --- 1. SETUP & DATENBANKVERBINDUNG ---

db_url = URL.create(
    drivername="postgresql",
    username="postgres",
    password="-",
    host="localhost",
    port=5432,
    database="youtube_analysis"
)
db_engine = create_engine(db_url)

with db_engine.connect() as connection:
    print("-> Datenbankverbindung erfolgreich hergestellt.")


# --- 2. DATEN LADEN UND BEREINIGEN ---

print("\nLade und bereinige Rohdaten...")
with open('DE_category_id.json', 'r', encoding='utf-8') as f:
    category_json = json.load(f)
categories_list = [
    {'category_id': int(item['id']), 'category_name': item['snippet']['title']}
    for item in category_json['items']
]
df_categories = pd.DataFrame(categories_list)

df_raw = pd.read_csv('DEvideos.csv')

text_columns = ['title', 'channel_title', 'tags']
for col in text_columns:
    df_raw[col] = df_raw[col].str.replace(r'[\n\r\t]', ' ', regex=True).str.replace("'", "''")

df_raw['channel_title'] = df_raw.groupby('video_id')['channel_title'].transform(lambda x: x.ffill().bfill())
df_raw.dropna(subset=['channel_title'], inplace=True)

valid_category_ids = df_categories['category_id'].unique()
df_raw = df_raw[df_raw['category_id'].isin(valid_category_ids)]

print("-> Datenbereinigung abgeschlossen.")



# --- 3. DATEN IN DATENBANK SCHREIBEN ---

print("\nSchreibe Daten in die Datenbank...")

with db_engine.connect() as connection:
    # KORREKTUR: Der SQL-String wird in die text()-Funktion eingewickelt.
    truncate_command = text('TRUNCATE TABLE youtube_de.daily_stats, youtube_de.videos, youtube_de.channels, youtube_de.categories RESTART IDENTITY CASCADE;')
    connection.execute(truncate_command)
    # WICHTIG: SQLAlchemy 2.0 erfordert ein explizites Commit für DDL-Befehle wie TRUNCATE
    connection.commit()
    print("-> Alte Daten aus den Tabellen entfernt.")

df_categories.to_sql('categories', con=db_engine, schema='youtube_de', if_exists='append', index=False)
print(f"-> {len(df_categories)} Kategorien erfolgreich geschrieben.")

df_channels = pd.DataFrame(df_raw['channel_title'].unique(), columns=['channel_title'])
df_channels.to_sql('channels', con=db_engine, schema='youtube_de', if_exists='append', index=False)
print(f"-> {len(df_channels)} Kanäle erfolgreich geschrieben.")

print("\nFüge Videos zeilenweise ein (das kann einen Moment dauern)...")
df_channels_from_db = pd.read_sql_table('channels', db_engine, schema='youtube_de')
df_videos_unique = df_raw.drop_duplicates(subset=['video_id'], keep='first').copy()
df_videos_to_insert = df_videos_unique.merge(df_channels_from_db, on='channel_title', how='left')
df_videos_to_insert = df_videos_to_insert[['video_id', 'channel_id', 'title', 'publish_time', 'tags', 'category_id']]

successfully_inserted_ids = []
for index, row in df_videos_to_insert.iterrows():

        single_video_df = pd.DataFrame([row])
        single_video_df.to_sql('videos', con=db_engine, schema='youtube_de', if_exists='append', index=False)
        successfully_inserted_ids.append(row['video_id'])


print(f"-> {len(successfully_inserted_ids)} von {len(df_videos_to_insert)} Videos erfolgreich geschrieben.")

print("\nFüge tägliche Statistiken für valide Videos ein...")
df_daily_stats = df_raw[df_raw['video_id'].isin(successfully_inserted_ids)].copy()
date_format1 = pd.to_datetime(df_daily_stats['trending_date'], format='%y.%d.%m', errors='coerce')
date_format2 = pd.to_datetime(df_daily_stats['trending_date'], errors='coerce')
combined_dates = date_format1.combine_first(date_format2)
df_daily_stats['trending_date'] = combined_dates.dt.strftime('%Y-%m-%d')
df_daily_stats_export = df_daily_stats[['video_id', 'trending_date', 'views', 'likes', 'dislikes', 'comment_count', 'comments_disabled', 'ratings_disabled']]

df_daily_stats_export.to_sql('daily_stats', con=db_engine, schema='youtube_de', if_exists='append', index=False, method='multi')
print(f"-> {len(df_daily_stats_export)} Statistik-Einträge erfolgreich geschrieben.")

print("\nETL-Prozess erfolgreich abgeschlossen. Alle Daten sind in der Datenbank.")


ETL-Skript (Version 15.0 - Finale Syntax) gestartet...
-> Datenbankverbindung erfolgreich hergestellt.

Lade und bereinige Rohdaten...
-> Datenbereinigung abgeschlossen.

Schreibe Daten in die Datenbank...
-> Alte Daten aus den Tabellen entfernt.
-> 31 Kategorien erfolgreich geschrieben.
-> 6046 Kanäle erfolgreich geschrieben.

Füge Videos zeilenweise ein (das kann einen Moment dauern)...
-> 29425 von 29425 Videos erfolgreich geschrieben.

Füge tägliche Statistiken für valide Videos ein...


  date_format2 = pd.to_datetime(df_daily_stats['trending_date'], errors='coerce')


-> 40584 Statistik-Einträge erfolgreich geschrieben.

ETL-Prozess erfolgreich abgeschlossen. Alle Daten sind in der Datenbank.
