In [None]:
from config import PROCESSED_DATA_FILE, HUGGINGFACE_USERNAME
import json
from loguru import logger

# Load the processed posts. The encoding is given explicitly because, without
# it, open() decodes with the platform's locale encoding, which can fail on
# non-ASCII text on some systems; JSON interchange text is UTF-8.
with open(PROCESSED_DATA_FILE, 'r', encoding='utf-8') as f:
  data = json.load(f)
logger.info(f"Loaded {len(data)} rows from {PROCESSED_DATA_FILE}")

import pandas as pd
df = pd.DataFrame(data)
del data  # the parsed JSON is no longer needed once the DataFrame exists
# Remember the unfiltered row count so later cells can report how many rows were dropped
prefilter_len = len(df)
logger.info(f"Converted json to pandas DataFrame with {prefilter_len} rows")
df.head(1)

In [None]:
# Lower-case the subreddit names so the column is easier to work with
subreddit_lower = df['subreddit'].str.lower()
df['subreddit'] = subreddit_lower

# Print the distinct subreddit names; useful when customizing the configuration
print(subreddit_lower.unique())

In [None]:
from config import SUBREDDITS

# Optional filter: the line below keeps only the subreddits listed in SUBREDDITS.
# It is currently commented out, so every subreddit is used; uncomment it to filter.
# df = df[df['subreddit'].isin([sub.lower() for sub in SUBREDDITS])]

# Report what is left compared with the row count recorded before filtering
remaining_rows = len(df)
logger.info(f"subreddits remaining: {df['subreddit'].unique()}")
logger.info(f"Filtered out {prefilter_len - remaining_rows} rows")
logger.info(f"Remaining rows: {remaining_rows}")

from utils import to_k, get_conversations_file
# Size label for the number of posts, as produced by the project's to_k helper
posts_count = to_k(remaining_rows, logger)
logger.info(f"Using dataset size: {posts_count}")

Loop through the posts and build conversations by alternating the user/assistant roles with every comment and reply.

In [None]:
from typing import Dict, Generator, List
def Turn(role: str, value: str) -> Dict[str, str]:
  """
  Build one conversation turn.

  Returns a dict with the speaker stored under 'from' and the text stored
  under 'value'.
  """
  turn: Dict[str, str] = {'from': role}
  turn['value'] = value
  return turn

def traverse_thread(comment: Dict, role: str = 'gpt') -> Generator[List[Dict[str, str]], None, None]:
    """
    Recursively traverse a comment thread and yield each individual thread.

    Each yielded thread is a list of turns that starts with ``comment`` and
    follows one chain of replies, with the role alternating between 'gpt'
    and 'human' at every level.

    Args:
        comment: A comment dict. Its 'body' is used as the turn text and its
            optional 'replies' list holds nested comment dicts.
        role: Role assigned to ``comment``; either 'gpt' or 'human'.

    Yields:
        One list of turns per reply chain. A comment with an empty, '[deleted]'
        or '[removed]' body yields nothing. When a comment has replies but none
        of them produces a thread (for example, all are deleted), the thread
        ends at this comment instead of being dropped.

    Raises:
        ValueError: If ``role`` is not 'gpt' or 'human'. Because this is a
            generator, the error surfaces on first iteration, not at call time.
    """
    if role not in {'gpt', 'human'}:
        raise ValueError("role must be 'gpt' or 'human'")

    body = comment.get('body')
    if not body or body in ('[deleted]', '[removed]'):
        return

    # Start the thread with the current comment
    current_thread = [Turn(role, body)]
    next_role = 'human' if role == 'gpt' else 'gpt'

    # Recurse into replies, yielding a full thread for each reply chain
    yielded_any = False
    for reply in comment.get('replies') or []:
        for sub_thread in traverse_thread(reply, next_role):
            yielded_any = True
            yield current_thread + sub_thread

    # No replies, or every reply was unusable: the thread ends here
    if not yielded_any:
        yield current_thread

In [None]:
from utils import is_post_valid

# Collect unique conversations. Each thread is serialized to a JSON string so it
# can be used as a key, and identical conversations are therefore stored once.
# A dict is used rather than a set because dicts keep insertion order, while the
# iteration order of a set of strings can change between runs; this keeps the
# preview below and the saved dataset order reproducible.
unique_conversations = {}
for _, post_row in df.iterrows():
    valid, _reason = is_post_valid(post_row)
    if not valid:
        continue

    # Rows without a comment list (e.g. a missing value in the DataFrame)
    # cannot be iterated and contribute no conversations.
    comments = post_row.get('comments', [])
    if not isinstance(comments, (list, tuple)):
        continue

    if post_row['selftext'] == '[deleted]' or post_row['selftext'] == '[removed]':
        # The post body is gone, so the top-level comment opens the conversation
        # as the human turn.
        # NOTE(review): unlike the branch below, these conversations carry no
        # system turn -- confirm that this is intended.
        for comment in comments:
            for thread in traverse_thread(comment, 'human'):
                unique_conversations[json.dumps(thread)] = None
    else:
        system_turn = Turn('system', f"You are a redditor on r/{post_row['subreddit']} and you are having a conversation with another redditor.")
        # Fall back to the title when the post has no body text
        initial_turn = Turn('human', post_row['selftext'] or post_row['title'])

        # Every comment chain becomes: system turn, post, then alternating replies
        for comment in comments:
            for thread in traverse_thread(comment):
                unique_conversations[json.dumps([system_turn, initial_turn] + thread)] = None

# Deserialize the unique conversations back into Python objects
conversations = [json.loads(serialized) for serialized in unique_conversations]
logger.info(f"Extracted {len(conversations)} conversations from {len(df)} posts")
# Preview the first conversation (nothing is shown when none were extracted)
conversations[0] if conversations else None

In [None]:
# TODO: implement this and more ways to judge conversations
# def judge_str_toxicity(text: str) -> float:
#     """
#     Judge the toxicity of a string.
#     """
#     from transformers import pipeline
#     classifier = pipeline('text-classification', model='persiainbert/toxic-mahyar', tokenizer='persiainbert/toxic-mahyar')
#     result = classifier(text)[0]
#     return result['score']

# def judge_convo_toxicity(convo: List[Dict[str, str]]) -> float:
#     """
#     Judge the toxicity of a conversation.
#     """
#     return sum(judge_str_toxicity(turn['value']) for turn in convo) / len(convo)

In [None]:
# Save the conversations to the output file.
# json.dump below writes the records as one indented JSON array.
import json
size_str = to_k(len(conversations), logger)
subreddits_str = '-'.join(SUBREDDITS)
logger.info(f"Saving {size_str} conversations to file")
conversations_file, name = get_conversations_file(subreddits_str, size_str)

# Wrap each conversation in a record so further fields can be added later
json_obj = [
  {
    "conversation": conversation,
    # TODO: add more fields
    # "toxicity_rating": judge_convo_toxicity(conversation), # TODO: implement this
  }
  for conversation in conversations
]
with open(conversations_file, 'w') as f:
  json.dump(json_obj, f, indent=2)

In [None]:
# Load the saved conversations file as a Hugging Face dataset;
# the push to the hub happens in the next cell.
from datasets import load_dataset
dataset = load_dataset(path='json', data_files=conversations_file)

In [None]:
import os
from dotenv import load_dotenv
# Pick up environment variables from a .env file, if there is one
load_dotenv()

# Push only when a token is configured; otherwise log an error and skip
hf_token = os.getenv('HF_TOKEN')
if hf_token:
  dataset.push_to_hub(f"{HUGGINGFACE_USERNAME}/{name}".lower(), token=hf_token)
else:
  logger.error("No Hugging Face token found, not pushing to hub")