# Data Prep


In [2]:
import json
from pathlib import Path
import pandas as pd
import numpy as np
import re
import os
import uuid


In [3]:
# Absolute Windows path of the raw tweet export.
path = (
    r"C:\Users\Christoph.Hau\Experimente\prompter\data\raw"
    r"\Kopie von FolloweeIDs2_tweets_df_AugustPull.csv"
)

# Read the ID columns as strings so they can be used directly as dictionary
# keys; missing values in these columns are left as NaN by read_csv.
dtypes = dict(
    tweet_id=str,
    original_user_id=str,
    reply_to_id=str,
)

# Malformed lines are skipped instead of aborting the load.
df = pd.read_csv(
    path,
    sep=",",
    encoding="utf-8",
    dtype=dtypes,
    low_memory=False,
    on_bad_lines='skip',
)

df.columns


Index(['full_text', 'tweet_id', 'created_at', 'screen_name',
       'original_user_id', 'retweeted_user_ID', 'collected_at', 'reply_to_id',
       'reply_to_user', 'expandedURL'],
      dtype='object')

In [None]:


# --- Configuration Constants ---
# Every history/holdout set holds TARGET_POSTS_PER_SET posts: exactly
# TARGET_REPLIES_PER_SET replies plus TARGET_ORIGINALS_PER_SET original
# (non-reply) posts.
TARGET_POSTS_PER_SET = 100
TARGET_REPLIES_PER_SET = 50
TARGET_ORIGINALS_PER_SET = TARGET_POSTS_PER_SET - TARGET_REPLIES_PER_SET

# --- File Path ---
# Absolute Windows path of the raw tweet export to process.
path = (
    r"C:\Users\Christoph.Hau\Experimente\prompter\data\raw"
    r"\Kopie von FolloweeIDs2_tweets_df_AugustPull.csv"
)

# --- Main Script ---
def _json_value(value):
    """Return ``value`` in a form ``json`` can write as strict JSON.

    Missing scalars (None, NaN, pd.NA, NaT) become None (``null``); numpy
    scalars become the equivalent built-in Python object.
    """
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    if isinstance(value, np.generic):
        return value.item()
    return value


def _to_json_records(frame):
    """Return the rows of ``frame`` as dicts that ``json.dumps`` writes as strict JSON.

    ``DataFrame.to_dict`` keeps NaN, which ``json`` writes as the non-standard
    ``NaN`` token, and keeps ``pd.NA`` (which the ``reply_to_id`` cleaning in
    ``process_tweets`` may introduce), which ``json`` cannot serialize at all.
    """
    return [
        {column: _json_value(value) for column, value in record.items()}
        for record in frame.to_dict(orient="records")
    ]


def _text_or_empty(value):
    """Return ``value`` as ``str``, or ``''`` when it is missing (None/NaN/pd.NA)."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ''
    return str(value)


def _format_previous_message(reply_to_id, tweet_details_map):
    """Format the tweet a reply answers as ``"<created_at>, <screen_name>: <full_text>"``.

    Args:
        reply_to_id: ID (string) of the tweet being replied to, or a missing value.
        tweet_details_map: Mapping tweet_id -> dict with ``created_at``,
            ``screen_name`` and ``full_text``.

    Returns:
        The formatted string, or None when ``reply_to_id`` is missing or is not
        a key of ``tweet_details_map`` (the parent tweet is not in the loaded
        CSV). Missing fields of the parent are rendered as empty strings.
    """
    if pd.isna(reply_to_id):
        return None
    details = tweet_details_map.get(reply_to_id)
    if not details:
        return None
    created_at = _text_or_empty(details.get('created_at'))
    screen_name = _text_or_empty(details.get('screen_name'))
    full_text = _text_or_empty(details.get('full_text'))
    return f"{created_at}, {screen_name}: {full_text}"


def process_tweets(csv_path):
    """Split each eligible user's tweets into a history and a holdout set and save them.

    A user is eligible with at least ``2 * TARGET_REPLIES_PER_SET`` replies,
    ``2 * TARGET_ORIGINALS_PER_SET`` original (non-reply) tweets and
    ``2 * TARGET_POSTS_PER_SET`` tweets in total. Each set holds
    ``TARGET_REPLIES_PER_SET`` replies and ``TARGET_ORIGINALS_PER_SET``
    originals, taken without overlap from a seeded shuffle. Every reply gets a
    ``previous_message`` with the formatted tweet it answers (or None).

    Files written, relative to the current working directory:
      * ``output_directory/users/<user_id>.jsonl`` - history line, then holdout line.
      * ``output_directory/user_stats.json`` - tweet counts of every eligible user.
      * ``output_directory/eligible_users.json`` - every eligible user ID,
        including users later skipped or whose file failed to save.

    Args:
        csv_path: Path of the tweets CSV.

    Returns:
        None. Load failures and an empty frame are printed and end the run
        early; a failure while saving one user is printed and the run continues.
    """
    print(f"Loading data from: {csv_path}")

    # Read the ID columns, and the columns that triggered a DtypeWarning, as
    # strings; string IDs can be used directly as dictionary keys.
    dtypes = {
        'tweet_id': str,
        'original_user_id': str,
        'reply_to_id': str,
        'screen_name': str,
        'created_at': str,  # kept as text; use parse_dates if datetimes are needed
        'collected_at': str,
        'expandedURL': str,
    }

    try:
        df = pd.read_csv(
            csv_path,
            sep=",",
            encoding="utf-8",
            dtype=dtypes,
            low_memory=False,  # parse the file in one pass to avoid mixed-type chunks
            on_bad_lines='skip',
        )
    except FileNotFoundError:
        print(f"Error: The file was not found at {csv_path}")
        return
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return

    print("Spalten im DataFrame:", df.columns.tolist())

    # --- Data Cleaning and Preparation ---
    # Map textual spellings of "missing" to pd.NA so notnull()/isnull() below
    # reliably separate replies from originals.
    df['reply_to_id'] = df['reply_to_id'].replace({'nan': pd.NA, '': pd.NA, None: pd.NA, 'None': pd.NA})

    initial_rows = len(df)
    # Rows without a tweet_id cannot be keyed: astype(str) would turn all of
    # them into the same literal 'nan' ID.
    df = df[df['tweet_id'].notna()].copy()
    df['tweet_id'] = df['tweet_id'].astype(str)

    df.drop_duplicates(subset=['tweet_id'], inplace=True, keep='first')
    print(f"Anfängliche Anzahl Zeilen: {initial_rows}, nach Duplikaten entfernen: {len(df)}")

    if df.empty:
        print("DataFrame is empty after removing duplicates. Exiting.")
        return

    # tweet_id -> {'created_at', 'screen_name', 'full_text'}, used to resolve the
    # tweet each reply answers. tweet_id is unique after deduplication.
    tweet_details_map = df.set_index('tweet_id')[['created_at', 'screen_name', 'full_text']].to_dict(orient='index')

    # --- User Filtering ---
    df_valid_users = df[df['original_user_id'].notna()]
    original_tweet_counts = df_valid_users['original_user_id'].value_counts()
    reply_counts = df_valid_users.loc[df_valid_users['reply_to_id'].notnull(), 'original_user_id'].value_counts()

    eligible_users = []
    user_stats = []

    print("Filtering eligible users...")
    for user_id_str, total_posts in original_tweet_counts.items():
        num_replies = reply_counts.get(user_id_str, 0)
        num_originals = total_posts - num_replies
        if (num_replies >= 2 * TARGET_REPLIES_PER_SET
                and num_originals >= 2 * TARGET_ORIGINALS_PER_SET
                and total_posts >= 2 * TARGET_POSTS_PER_SET):
            eligible_users.append(user_id_str)
            user_stats.append({
                "user_id": user_id_str,
                "total_tweets_by_user": int(total_posts),
                "replies_by_user": int(num_replies),
                "original_tweets_by_user": int(num_originals),  # non-replies
            })

    print(f"Anzahl geeigneter Nutzer gefunden: {len(eligible_users)}")
    if not eligible_users:
        print("Keine geeigneten Nutzer gefunden basierend auf den Kriterien. Beende Skript.")
        return

    # --- Prepare Output Directory ---
    output_dir = Path("output_directory/users")
    output_dir.mkdir(parents=True, exist_ok=True)

    # --- Process Each Eligible User ---
    processed_user_count = 0
    for user_id_str in eligible_users:
        print(f"Processing user: {user_id_str}")
        # Shuffle once with a fixed seed so the split is reproducible.
        user_df = df[df['original_user_id'] == user_id_str].sample(frac=1, random_state=42)

        is_reply = user_df['reply_to_id'].notnull()
        all_user_replies = user_df[is_reply]
        all_user_originals = user_df[~is_reply]

        # The eligibility filter should already guarantee this.
        if not (len(all_user_replies) >= 2 * TARGET_REPLIES_PER_SET
                and len(all_user_originals) >= 2 * TARGET_ORIGINALS_PER_SET):
            print(f"  Skipping user {user_id_str}: Insufficient replies/originals after splitting (unexpected).")
            continue

        # Holdout takes the first slice of each shuffled part, history the next.
        holdout_df = pd.concat([
            all_user_replies.head(TARGET_REPLIES_PER_SET),
            all_user_originals.head(TARGET_ORIGINALS_PER_SET),
        ]).sample(frac=1, random_state=42)
        history_df = pd.concat([
            all_user_replies.iloc[TARGET_REPLIES_PER_SET:2 * TARGET_REPLIES_PER_SET],
            all_user_originals.iloc[TARGET_ORIGINALS_PER_SET:2 * TARGET_ORIGINALS_PER_SET],
        ]).sample(frac=1, random_state=42)

        # Validate set composition (holdout reported before history).
        valid_sets = True
        for set_name, set_df in (("Holdout", holdout_df), ("History", history_df)):
            n_replies = int(set_df['reply_to_id'].notnull().sum())
            if not (len(set_df) == TARGET_POSTS_PER_SET and n_replies == TARGET_REPLIES_PER_SET):
                print(f"  Skipping user {user_id_str}: {set_name} set composition incorrect. Size: {len(set_df)}, Replies: {n_replies}")
                valid_sets = False
        if not valid_sets:
            continue

        # Attach the formatted parent tweet to every reply; originals keep None.
        for set_df in (history_df, holdout_df):
            set_df['previous_message'] = None
            reply_mask = set_df['reply_to_id'].notnull()
            if reply_mask.any():
                set_df.loc[reply_mask, 'previous_message'] = set_df.loc[reply_mask, 'reply_to_id'].map(
                    lambda reply_to_id: _format_previous_message(reply_to_id, tweet_details_map)
                )

        output_path = output_dir / f"{user_id_str}.jsonl"
        try:
            # Serialize both lines before opening the file so a serialization
            # error does not leave a half-written .jsonl behind.
            lines = [
                json.dumps({"user_id": user_id_str, "set": "history", "tweets": _to_json_records(history_df)}),
                json.dumps({"user_id": user_id_str, "set": "holdout", "tweets": _to_json_records(holdout_df)}),
            ]
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lines) + '\n')
            processed_user_count += 1
            print(f"  Successfully processed and saved data for user {user_id_str}")
        except Exception as e:  # best effort per user: report and continue
            print(f"  Error saving data for user {user_id_str}: {e}")

    print(f"\nFinished processing. Total users processed and saved: {processed_user_count}")

    # --- Save Global Metadata ---
    meta_dir = Path("output_directory")
    meta_dir.mkdir(parents=True, exist_ok=True)

    final_user_stats_path = meta_dir / "user_stats.json"
    with open(final_user_stats_path, 'w', encoding='utf-8') as f:
        json.dump(user_stats, f, indent=4)
    print(f"User statistics saved to: {final_user_stats_path}")

    # All users meeting the eligibility criteria, not only those whose file was
    # written; compare with processed_user_count above.
    eligible_users_path = meta_dir / "eligible_users.json"
    with open(eligible_users_path, 'w', encoding='utf-8') as f:
        json.dump(eligible_users, f, indent=4)
    print(f"List of eligible users saved to: {eligible_users_path}")

# Run the pipeline on the configured CSV path when executed as the main module.
if __name__ == "__main__":
    process_tweets(csv_path=path)

## Filter and Store


In [None]:
# Example column check
print("Spalten im DataFrame:", df.columns.tolist())

# NOTE(review): this cell writes the same output_directory/users/<id>.jsonl,
# user_stats.json and eligible_users.json paths as process_tweets, so running
# it overwrites that output with a different set layout.

# Set size and eligibility thresholds for this cell.
SET_SIZE = 100                  # posts in each of the history and holdout sets
MIN_HOLDOUT_REPLIES = 50        # replies placed first in the holdout set
MIN_TOTAL_POSTS = 2 * SET_SIZE  # enough tweets to fill both sets


def _records_for_json(frame):
    """Return ``frame`` as row dicts with missing scalars (None/NaN/pd.NA) replaced by None.

    Without this, ``json`` writes NaN as the non-standard ``NaN`` token.
    """
    return [
        {
            column: None if pd.api.types.is_scalar(value) and pd.isna(value) else value
            for column, value in record.items()
        }
        for record in frame.to_dict(orient="records")
    ]


# Remove duplicates
initial_rows = len(df)
df.drop_duplicates(subset=['tweet_id'], inplace=True)
print(f"Anfängliche Anzahl: {initial_rows}, nach Duplikaten: {len(df)}")

# Count posts per user. IDs are compared exactly as stored in df: they were
# loaded as strings (dtype=str), so converting them to int would never match
# the column.
original_tweet_counts = df['original_user_id'].value_counts()
replies_df = df[df['reply_to_id'].notnull()]
reply_counts = replies_df['original_user_id'].value_counts()

# Select users
eligible_users = []
user_stats = []
eligible_user_keys = {}  # str user ID -> ID value as stored in df['original_user_id']

for user_key, total_posts in original_tweet_counts.items():
    reply_count = int(reply_counts.get(user_key, 0))
    if total_posts >= MIN_TOTAL_POSTS and reply_count >= MIN_HOLDOUT_REPLIES:
        user_id = str(user_key)
        eligible_users.append(user_id)
        eligible_user_keys[user_id] = user_key
        user_stats.append({
            "user_id": user_id,
            "original_posts": int(total_posts) - reply_count,  # non-reply posts only
            "replies": reply_count,
            "total_posts": int(total_posts),
        })

print(f"Geeignete Nutzer: {len(eligible_users)}")

# Prepare the output location
output_dir = Path("output_directory/users")
output_dir.mkdir(parents=True, exist_ok=True)

# For each user: build history + holdout sets
for user_id in eligible_users:
    user_mask = df['original_user_id'] == eligible_user_keys[user_id]
    user_df = df[user_mask].sample(frac=1, random_state=42)  # reproducible shuffle

    is_reply = user_df['reply_to_id'].notnull()
    replies = user_df[is_reply]
    originals = user_df[~is_reply]

    # Holdout first: MIN_HOLDOUT_REPLIES replies, topped up to SET_SIZE with the
    # remaining replies and then originals (so it holds at least that many replies).
    holdout_replies = replies.head(MIN_HOLDOUT_REPLIES)
    remaining_replies = replies.iloc[MIN_HOLDOUT_REPLIES:]
    additional_posts_needed = SET_SIZE - len(holdout_replies)
    holdout_fill = pd.concat([remaining_replies, originals]).head(additional_posts_needed)
    holdout_df = pd.concat([holdout_replies, holdout_fill]).head(SET_SIZE)

    # History: the first SET_SIZE shuffled tweets that are not in the holdout set
    history_df = user_df[~user_df['tweet_id'].isin(holdout_df['tweet_id'])].head(SET_SIZE)

    # Only store users whose two sets are both complete
    if len(holdout_df) < SET_SIZE or len(history_df) < SET_SIZE:
        continue

    # Save as a JSONL file: history line, then holdout line
    output_path = output_dir / f"{user_id}.jsonl"
    lines = [
        json.dumps({"user_id": user_id, "set": "history", "tweets": _records_for_json(history_df)}),
        json.dumps({"user_id": user_id, "set": "holdout", "tweets": _records_for_json(holdout_df)}),
    ]
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines) + '\n')

# Save global metadata
meta_path = Path("output_directory")
with open(meta_path / "user_stats.json", 'w', encoding='utf-8') as f:
    json.dump(user_stats, f, indent=4)

with open(meta_path / "eligible_users.json", 'w', encoding='utf-8') as f:
    json.dump(eligible_users, f, indent=4)


## Thread Detection and Extraction

In [None]:
# Collect reply threads per eligible user and save them as JSONL.
# Uses df, eligible_users and output_dir from the previous cells.


def _none_if_missing(value):
    """Return None for a missing scalar (None/NaN/pd.NA) so json writes null; otherwise ``value``."""
    return None if pd.api.types.is_scalar(value) and pd.isna(value) else value


def _thread_message(tweet, target_user_id):
    """Build the stored message dict for one tweet row of a thread.

    ``is_target_user`` compares the author ID, as a string, with
    ``target_user_id`` (the string IDs held in ``eligible_users``). A missing
    author ID is stored as None instead of crashing on ``int(NaN)``.
    """
    author = _none_if_missing(tweet['original_user_id'])
    author_id = None if author is None else str(author)
    return {
        'tweet_id': str(tweet['tweet_id']),
        'user_id': author_id,
        'content': _none_if_missing(tweet['full_text']),
        'timestamp': _none_if_missing(tweet['created_at']),
        'is_target_user': author_id == target_user_id,
    }


def _reply_chain(reply, tweets_by_id, target_user_id):
    """Return ``reply`` and its ancestors found in ``tweets_by_id`` as messages, root first.

    Follows ``reply_to_id`` upward and stops when a tweet has no parent, the
    parent is not in ``tweets_by_id`` (a frame indexed by tweet_id), or a tweet
    ID repeats (cyclic links). Every tweet appears exactly once.
    """
    messages = []
    visited = set()
    current = reply
    while True:
        messages.append(_thread_message(current, target_user_id))
        visited.add(str(current['tweet_id']))
        parent_id = current['reply_to_id']
        if pd.isna(parent_id) or str(parent_id) in visited or parent_id not in tweets_by_id.index:
            break
        current = tweets_by_id.loc[parent_id]
    messages.reverse()  # root first, the reply itself last
    return messages


# Index every tweet by ID once (first occurrence wins) so each step up a thread
# is a hash lookup instead of a scan over all of df.
tweets_by_id = df.drop_duplicates(subset='tweet_id', keep='first').set_index('tweet_id', drop=False)

# IDs were loaded as strings and eligible_users holds strings, so compare as str.
author_ids = df['original_user_id'].astype(str)
has_parent = df['reply_to_id'].notnull()

threads_dir = output_dir / "threads"
threads_dir.mkdir(exist_ok=True, parents=True)

for user_id in eligible_users:
    user_replies = df[(author_ids == user_id) & has_parent]

    threads = []
    for _, reply in user_replies.iterrows():
        thread_messages = _reply_chain(reply, tweets_by_id, user_id)

        # A single message means no distinct parent tweet was found in the data.
        if len(thread_messages) > 1:
            threads.append({
                'thread_id': str(uuid.uuid4()),  # unique ID for the thread
                'target_user_id': user_id,
                'root_tweet_id': thread_messages[0]['tweet_id'],
                'messages': thread_messages,
                'message_count': len(thread_messages),
                'target_user_messages': sum(1 for msg in thread_messages if msg['is_target_user']),
                'last_message_timestamp': thread_messages[-1]['timestamp'],
            })

    # Store the threads in a separate file per user
    threads_file = threads_dir / f"{user_id}_threads.jsonl"
    with open(threads_file, 'w', encoding='utf-8') as f:
        for thread in threads:
            json.dump(thread, f)
            f.write('\n')

    print(f"Nutzer {user_id}: {len(threads)} Threads extrahiert und gespeichert.")