# Poem Dataset Refiner (OpenRouter API)

This notebook refines a synthetic poem dataset by expanding each entry into a stanza-constrained poem. It reads a JSONL input and writes a flattened JSONL output.

**Pipeline:**
1. Load dataset with `meaning`, `poem_verse`, and query pools
2. Randomly sample up to three neutral and three user queries per entry
3. Ask the model to write a stanza-constrained poem for each sampled query
4. Save flattened examples to JSONL

Multi-threaded processing is supported for faster generation.

## Cell 1: Imports

In [None]:
import os, random, time, json
import json_repair
from typing import Dict, Optional, Tuple, Any
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from openai import OpenAI

## Cell 2: Configuration

In [None]:
# Placeholder stored when the OPEN_ROUTER_API_KEY environment variable is
# unset, so the check below can warn about a missing key.
_API_KEY_PLACEHOLDER = "YOUR_API_KEY_HERE"

# Central settings shared by the cells below.
CONFIG = {
    # OpenRouter credentials and endpoint.
    "api_key": os.getenv("OPEN_ROUTER_API_KEY", _API_KEY_PLACEHOLDER),
    "base_url": "https://openrouter.ai/api/v1",
    # Model and sampling temperature used for every generation request.
    "model": "mistralai/mistral-small-creative",
    "temperature": 0.7,
    # Input dataset, output file, and how many input records to sample.
    "input_jsonl_path": "../data/poem_finetune_13000.jsonl",
    "output_jsonl_path": "../data/poem_refined_6400.jsonl",
    "sample_size": 2850,
    # Number of worker threads issuing requests in parallel.
    "concurrency": 12,
    # Total attempts per request, and the base delay in seconds that is
    # multiplied by the attempt number between attempts.
    "max_retries": 2,
    "retry_delay": 2,
}

api_key_missing = CONFIG["api_key"] == _API_KEY_PLACEHOLDER
print(
    "WARNING: Please set your OPEN_ROUTER_API_KEY."
    if api_key_missing
    else "API key loaded successfully."
)

## Cell 3: Initialize OpenRouter Client

In [None]:
# Shared OpenAI-compatible client pointed at the OpenRouter endpoint; the
# generation functions in this notebook send their chat requests through it.
client = OpenAI(base_url=CONFIG["base_url"], api_key=CONFIG["api_key"])

print(f"Connected to OpenRouter (Model: {CONFIG['model']})")

## Cell 4: Utility Functions

In [None]:
def get_weighted_stanza_count() -> int:
    """Draw a stanza count between 1 and 12 from a fixed weighted distribution.

    The weights favour short poems: counts 1-7 together carry 80% of the
    probability mass, with 3 and 4 the most likely single values.

    Returns:
        An integer in the inclusive range 1..12.
    """
    # One weight per stanza count, in order: index 0 -> 1 stanza, ... index 11 -> 12.
    stanza_weights = (
        0.08, 0.14, 0.16, 0.16, 0.10, 0.10,
        0.06, 0.04, 0.04, 0.05, 0.04, 0.03,
    )
    stanza_counts = range(1, len(stanza_weights) + 1)
    (picked,) = random.choices(stanza_counts, weights=stanza_weights, k=1)
    return picked


def normalize_entry(entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return *entry* unchanged when it has the expected keys, else ``None``.

    An entry is accepted only if it is a dict containing both the
    ``poem_verse`` and ``data`` keys; the values themselves are not inspected.
    """
    has_required_keys = (
        isinstance(entry, dict)
        and "poem_verse" in entry
        and "data" in entry
    )
    return entry if has_required_keys else None


def load_input_records(path: str) -> list:
    """Load a JSONL file into a list of normalized dicts (tolerant parser).

    Blank lines are skipped. Each remaining line is parsed with ``json``;
    if that fails, ``json_repair`` is tried, and lines that still cannot be
    parsed are dropped silently. Parsed values are passed through
    ``normalize_entry`` and kept only when it returns a truthy result.

    Args:
        path: Location of the JSONL file to read.

    Returns:
        The accepted records, in file order.
    """
    records = []
    # Decode explicitly as UTF-8: this pipeline writes its JSONL with
    # ensure_ascii=False, so relying on the platform default encoding can
    # corrupt or fail on non-ASCII text.
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                try:
                    record = json_repair.loads(line)
                except Exception:
                    # Deliberate best-effort: an unrepairable line is skipped.
                    continue
            normalized = normalize_entry(record)
            if normalized:
                records.append(normalized)
    return records

## Cell 5: Prompt Builder and Worker

In [None]:
def extract_query_text(selected_query_data: Any) -> Optional[Dict[str, str]]:
    """Pull the query text and an optional persona sentence out of a query sample.

    Args:
        selected_query_data: Either a dict holding a ``query`` value and an
            optional ``persona`` dict (``name``, ``profession``, ``tone``),
            or any other value, which is converted with ``str()`` and used
            as the query itself.

    Returns:
        ``{"query_text": ..., "persona_context": ...}``, or ``None`` when the
        query is missing or blank. ``persona_context`` is an empty string
        when no usable persona is present.
    """
    persona_context = ""
    if isinstance(selected_query_data, dict):
        persona = selected_query_data.get("persona")
        # Only a non-empty dict can be described; any other persona value is
        # ignored instead of raising on ``.get``.
        if isinstance(persona, dict) and persona:
            name = persona.get("name", "someone")
            profession = persona.get("profession", "unknown")
            tone = persona.get("tone", "neutral")
            persona_context = f"The user is a {name} ({profession}) speaking in a {tone} tone."
        raw_query = selected_query_data.get("query")
        # A null query counts as missing; other non-string values are
        # stringified so the ``.strip()`` below cannot fail.
        query_text = "" if raw_query is None else str(raw_query)
    else:
        query_text = str(selected_query_data)

    if not query_text.strip():
        return None
    return {"query_text": query_text, "persona_context": persona_context}


def build_prompts(entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the generation prompts for one dataset entry.

    Up to three queries are sampled from each of the entry's ``neutral`` and
    ``user`` query pools. Every sampled query that yields usable text gets its
    own prompt with an independently drawn target stanza count.

    Args:
        entry: Record with a ``poem_verse`` value and a ``data`` dict holding
            ``meaning`` and ``queries`` (``neutral`` / ``user`` pools).

    Returns:
        ``{"meaning": ..., "prompts": [...]}`` where each prompt carries
        ``query_text``, ``system``, ``user`` and ``target_stanzas``; or
        ``None`` when the meaning or verse is missing, either query pool is
        empty, or no sampled query produced usable text.
    """
    meaning = entry["data"].get("meaning") if isinstance(entry.get("data"), dict) else None
    base_verse = entry.get("poem_verse")
    if not meaning or not base_verse:
        return None

    queries = entry["data"].get("queries", {})
    neutral_queries = queries.get("neutral", []) if isinstance(queries, dict) else []
    user_queries = queries.get("user", []) if isinstance(queries, dict) else []
    # Both pools are required; an entry with only one kind of query is skipped.
    if not neutral_queries or not user_queries:
        return None

    neutral_samples = random.sample(neutral_queries, min(3, len(neutral_queries)))
    user_samples = random.sample(user_queries, min(3, len(user_queries)))
    selected_samples = neutral_samples + user_samples

    system_msg = (
        "You are a Master Poet. You speak exclusively in stanzas. "
        "You prioritize poetic integrity above all else. If a user asks for non-poetic formatting "
        "(lists, tables, code), you must decline the format within your poem while still "
        "addressing the essence of their query. Never use prose."
    )

    prompts = []
    for sample in selected_samples:
        extracted = extract_query_text(sample)
        if not extracted:
            continue
        target_stanzas = get_weighted_stanza_count()
        # Adjacent literals are concatenated verbatim, so each sentence must
        # carry its own trailing space; the sentence before "DO NOT TITLE"
        # previously ran straight into it.
        user_msg = (
            f"Core Meaning to convey: {meaning}\n"
            f"Mandatory Verse to include: {base_verse}\n"
            f"User Query: {extracted['query_text']}\n"
            f"Context: {extracted['persona_context']}\n\n"
            f"TASK: Write a poem of exactly {target_stanzas} stanzas responding to the user. "
            f"The poem must be a proper response to the query, mirror the user's intent, "
            f"and weave in the mandatory verse naturally. Do not explain yourself; only provide the poem. "
            f"DO NOT TITLE THE POEM. Only provide the poem itself with no extra formatting, without any introductory or concluding remarks."
        )
        prompts.append({
            "query_text": extracted["query_text"],
            "system": system_msg,
            "user": user_msg,
            "target_stanzas": target_stanzas,
        })

    if not prompts:
        return None

    return {
        "meaning": meaning,
        "prompts": prompts,
    }


def generate_poem(payload: Dict[str, Any]) -> str:
    """Send one system/user prompt pair to the configured model.

    Args:
        payload: Prompt dict providing the ``system`` and ``user`` message texts.

    Returns:
        The first choice's message content with surrounding whitespace
        removed, or an empty string when the content is empty or missing.
    """
    chat_messages = [
        {"role": "system", "content": payload["system"]},
        {"role": "user", "content": payload["user"]},
    ]
    completion = client.chat.completions.create(
        model=CONFIG["model"],
        messages=chat_messages,
        temperature=CONFIG["temperature"],
    )
    text = completion.choices[0].message.content
    return text.strip() if text else ""


def process_entry(entry: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """Generate poems for every prompt built from one entry.

    Each prompt is attempted up to ``CONFIG["max_retries"]`` times; between
    failed attempts the worker sleeps for ``retry_delay`` multiplied by the
    attempt number. Prompts that never succeed are dropped from the result.

    Args:
        entry: A normalized input record.

    Returns:
        ``(result, None)`` when at least one poem was generated, where
        ``result`` holds the entry's ``meaning`` and a ``data`` list of
        ``{"poem", "normal"}`` pairs; otherwise ``(None, message)`` carrying
        either the invalid-entry message or the most recent error text.
    """
    payload = build_prompts(entry)
    if not payload:
        return None, "Invalid entry or empty query pool"

    generated = []
    latest_failure = None
    for prompt in payload["prompts"]:
        attempt_idx = 0
        while attempt_idx < CONFIG["max_retries"]:
            try:
                text = generate_poem(prompt)
                if not text:
                    # Route empty completions through the same retry path as errors.
                    raise ValueError("Empty response from model")
                generated.append({"poem": text, "normal": prompt["query_text"]})
                break
            except Exception as exc:
                latest_failure = f"{type(exc).__name__}: {str(exc)}"
                more_attempts_left = attempt_idx < CONFIG["max_retries"] - 1
                if more_attempts_left:
                    time.sleep(CONFIG["retry_delay"] * (attempt_idx + 1))
            attempt_idx += 1

    if generated:
        return {"meaning": payload["meaning"], "data": generated}, None
    return None, latest_failure

## Cell 6: Multi-Threaded Refinement

In [None]:
# Load the input, down-sample it, and fan the entries out to worker threads.
records = load_input_records(CONFIG["input_jsonl_path"])
print(f"Loaded {len(records)} records from {CONFIG['input_jsonl_path']}")

# Never request more samples than exist; random.sample would raise otherwise.
sample_size = min(CONFIG["sample_size"], len(records))
records = random.sample(records, sample_size)
print(f"Using a random sample of {len(records)} records")

lock = Lock()
successful = 0
failed = 0

# The output is opened explicitly as UTF-8 because records are serialized with
# ensure_ascii=False and may contain non-ASCII characters.
with open(CONFIG["output_jsonl_path"], "w", encoding="utf-8") as out_f:
    with ThreadPoolExecutor(max_workers=CONFIG["concurrency"]) as executor:
        futures = {executor.submit(process_entry, entry): entry for entry in records}
        for future in tqdm(as_completed(futures), total=len(futures), desc="Refining entries"):
            entry = futures[future]
            try:
                output_obj, error = future.result()
            except Exception as e:
                # An exception escaping the worker is reported like any other failure.
                output_obj, error = None, f"FutureExecutionError: {type(e).__name__}: {str(e)}"

            if output_obj:
                # Results are consumed in this loop, so writes already happen
                # one at a time; the lock is kept as a harmless safeguard.
                with lock:
                    out_f.write(json.dumps(output_obj, ensure_ascii=False) + "\n")
                successful += 1
            else:
                failed += 1
                # Entries are only checked for key presence upstream, so
                # `data` may not be a dict and `meaning` may not be a string;
                # the failure log must not abort the whole run.
                data = entry.get("data")
                meaning = data.get("meaning") if isinstance(data, dict) else None
                meaning = str(meaning) if meaning else "unknown"
                print(f"Failed: {meaning[:40]}... | {error}")

print(f"Done. Successful: {successful} | Failed: {failed}")
print(f"Output saved to: {CONFIG['output_jsonl_path']}")

## Cell 7: Load External Datasets (LIMA & No Robots)

In [11]:
from datasets import load_dataset

# No Robots categories kept for conversion.
_NO_ROBOTS_CATEGORIES = ("Open QA", "Closed QA", "Summarize")

# LIMA: read the local JSONL export and keep only rows whose source is wikihow.
print("Loading LIMA dataset from local file...")
lima_ds = load_dataset("json", data_files="../data/lima_train.jsonl")
lima_data = lima_ds["train"]
lima_wikihow = lima_data.filter(lambda row: row.get("source") == "wikihow")
print(f"LIMA wikihow entries: {len(lima_wikihow)}")

# No Robots: load the train split and keep only the selected categories.
print("\nLoading No Robots dataset...")
no_robots_ds = load_dataset("HuggingFaceH4/no_robots")
no_robots_data = no_robots_ds["train"]
no_robots_filtered = no_robots_data.filter(
    lambda row: row.get("category") in _NO_ROBOTS_CATEGORIES
)
print(f"No Robots filtered entries: {len(no_robots_filtered)}")


Loading LIMA dataset from local file...


Generating train split: 0 examples [00:00, ? examples/s]

Filter:   0%|          | 0/1030 [00:00<?, ? examples/s]

LIMA wikihow entries: 200

Loading No Robots dataset...


Filter:   0%|          | 0/9500 [00:00<?, ? examples/s]

No Robots filtered entries: 1822


## Cell 8: Validate and Extract Conversations

In [14]:
def is_valid_single_turn(conversation: list) -> bool:
    """Check that a conversation is exactly one user message then one assistant reply.

    Args:
        conversation: Candidate list of message dicts, each expected to carry
            a ``role`` value.

    Returns:
        ``True`` only when *conversation* is a two-element list whose roles
        are ``user`` followed by ``assistant`` (compared case-insensitively).
        Malformed messages or roles yield ``False`` rather than an exception.
    """
    if not isinstance(conversation, list) or len(conversation) != 2:
        return False

    def _role(message):
        # A non-dict message or a missing/null/non-string role has no usable
        # role; return None so the comparison below simply fails.
        role = message.get("role") if isinstance(message, dict) else None
        return role.lower() if isinstance(role, str) else None

    return _role(conversation[0]) == "user" and _role(conversation[1]) == "assistant"

def extract_lima_samples(lima_wikihow):
    """Collect single-turn user/assistant pairs from LIMA rows.

    A row contributes a sample only when its ``conversations`` value is a
    list of exactly two entries (user text, then assistant text) and both are
    non-blank strings after stripping.

    Returns:
        A list of ``{"user", "assistant", "source": "lima"}`` dicts.
    """
    def _clean(text):
        # Anything that is not a string is treated as empty.
        return text.strip() if isinstance(text, str) else ""

    collected = []
    for record in lima_wikihow:
        turns = record.get("conversations", [])
        if not isinstance(turns, list) or len(turns) != 2:
            continue
        question, answer = (_clean(turn) for turn in turns)
        if not (question and answer):
            continue
        collected.append({
            "user": question,
            "assistant": answer,
            "source": "lima",
        })
    return collected

def extract_no_robots_samples(no_robots_filtered):
    """Collect single-turn user/assistant pairs from No Robots rows.

    A row contributes a sample only when its ``messages`` value passes
    ``is_valid_single_turn`` and both messages have non-blank ``content``
    after stripping.

    Returns:
        A list of ``{"user", "assistant", "source": "no_robots"}`` dicts.
    """
    collected = []
    for record in no_robots_filtered:
        turns = record.get("messages", [])
        if not is_valid_single_turn(turns):
            continue
        prompt_text, reply_text = (turn.get("content", "").strip() for turn in turns)
        if not (prompt_text and reply_text):
            continue
        collected.append({
            "user": prompt_text,
            "assistant": reply_text,
            "source": "no_robots",
        })
    return collected

# Pull single-turn pairs out of both datasets and merge them into one pool.
lima_samples = extract_lima_samples(lima_wikihow)
no_robots_samples = extract_no_robots_samples(no_robots_filtered)
external_samples = lima_samples + no_robots_samples

print(f"LIMA valid single-turn samples: {len(lima_samples)}")
print(f"No Robots valid single-turn samples: {len(no_robots_samples)}")
print(f"Total external samples: {len(external_samples)}")

# Show the first sample of each source, when there is one, as a sanity check.
if lima_samples:
    first_lima = lima_samples[0]
    print(f"\nSample LIMA entry:\n  User: {first_lima['user']}...\n  Assistant: {first_lima['assistant']}...")
if no_robots_samples:
    first_no_robots = no_robots_samples[0]
    print(f"\nSample No Robots entry:\n  User: {first_no_robots['user']}...\n  Assistant: {first_no_robots['assistant']}...")


LIMA valid single-turn samples: 200
No Robots valid single-turn samples: 1822
Total external samples: 2022

Sample LIMA entry:
  User: How to know if your girlfriend is ready to kiss you?...
  Assistant: Kissing a girl can be nerve-wracking, especially if you don't know if she's ready for a kiss! Luckily, there are ways you can figure out whether or not she wants a kiss. Although no tip is foolproof, at least you’ll have a better idea of whether it’s welcome before you make the move.

## Study her face

2. Look at her lips. Women prepare their lips for a kiss. So if you want to know if she wants to kiss you, zero in on her mouth. Is she biting her lip or licking her lips? These could be signs that she is trying to moisten them for a kiss. If she is pursing her lips or they look hardened, this is a bad sign. She is closing herself off to you. If she's applied lip gloss recently, or freshened her breath, this could mean she wants her lips to look desirable to you, in the hopes you will k

## Cell 9: Convert External Responses to Poems via OpenRouter

In [16]:
def convert_response_to_poem(plain_response: str) -> Optional[str]:
    """Ask the model to rewrite a plain-English response as poetry.

    The prompt requests one or more poems of at most 12 stanzas each. The
    request is attempted up to ``CONFIG["max_retries"]`` times; after an
    exception the function sleeps for ``retry_delay`` multiplied by the
    attempt number before trying again.

    Args:
        plain_response: The original prose answer to convert.

    Returns:
        The stripped poem text, or ``None`` when the input is empty or blank,
        or when no attempt produced a non-empty completion.
    """
    # Nothing to convert: return before issuing any API request.
    if not plain_response or not str(plain_response).strip():
        return None

    system_msg = (
        "You are a Master Poet. You speak exclusively in stanzas. "
        "You prioritize poetic integrity above all else. "
        "Never use prose. Convert the given response into poetic form that needs some analysis to understand the meaning behind it."
    )

    # Adjacent literals are concatenated verbatim, so each sentence carries
    # its own trailing space; "The more compact the better." previously ran
    # straight into the next sentence.
    user_msg = (
        "Convert the following plain English response into one or more poems of at most 12 stanzas each. "
        "The more compact the better. "
        "The poem should capture the essence and meaning of the original response while being entirely poetic. "
        "Do not title the poem. Only provide the poem itself with no extra formatting or explanatory remarks.\n\n"
        f"Response to convert:\n{plain_response}"
    )

    for attempt in range(CONFIG["max_retries"]):
        try:
            response = client.chat.completions.create(
                model=CONFIG["model"],
                messages=[
                    {"role": "system", "content": system_msg},
                    {"role": "user", "content": user_msg},
                ],
                temperature=CONFIG["temperature"],
            )
            poem = (response.choices[0].message.content or "").strip()
            if poem:
                return poem
        except Exception:
            # Deliberate best-effort: any request or parsing failure is
            # retried, and exhaustion is reported to the caller as None.
            if attempt < CONFIG["max_retries"] - 1:
                time.sleep(CONFIG["retry_delay"] * (attempt + 1))
    return None

def process_external_responses_to_poems(external_samples: list) -> Tuple[list, int, int]:
    """Convert the assistant response of every external sample into a poem.

    Samples with an empty ``assistant`` value are counted as failed without
    being submitted. The rest are converted concurrently by
    ``convert_response_to_poem`` on a thread pool sized by
    ``CONFIG["concurrency"]``.

    Args:
        external_samples: Dicts carrying ``user``, ``assistant`` and
            optionally ``source``.

    Returns:
        ``(converted, successful, failed)``. Each converted item holds the
        original ``user`` text, the poem as ``assistant``, the prose answer
        as ``original_response`` and the ``source`` (``"external"`` when
        absent). Items appear in completion order, not input order.
    """
    converted = []
    successful = 0
    failed = 0

    # Samples without a response cannot be converted; count them as failed
    # here so no request is issued for them.
    pending = []
    for sample in external_samples:
        if sample.get("assistant", ""):
            pending.append(sample)
        else:
            failed += 1
    if not pending:
        return converted, successful, failed

    with ThreadPoolExecutor(max_workers=CONFIG["concurrency"]) as executor:
        futures = {executor.submit(convert_response_to_poem, sample["assistant"]): sample
                   for sample in pending}

        # Results are collected in this loop rather than in the workers, so
        # the list and counters need no lock.
        for future in tqdm(as_completed(futures), total=len(futures), desc="Converting external responses to poems"):
            sample = futures[future]
            plain_response = sample["assistant"]

            try:
                poem = future.result()
                if poem:
                    converted.append({
                        "user": sample["user"],
                        "assistant": poem,
                        "original_response": plain_response,
                        "source": sample.get("source", "external")
                    })
                    successful += 1
                else:
                    failed += 1
            except Exception as e:
                print(f"Error converting response: {type(e).__name__}: {str(e)}")
                failed += 1

    return converted, successful, failed

# Run the conversion over the merged external pool and report the tallies.
total_external = len(external_samples)
print(f"Converting {total_external} external responses to poems...")
conversion_result = process_external_responses_to_poems(external_samples)
external_with_poems, ext_success, ext_failed = conversion_result
print(f"Converted: {ext_success} | Failed: {ext_failed}")


Converting 2022 external responses to poems...


Converting external responses to poems: 100%|██████████| 2022/2022 [05:13<00:00,  6.45it/s]

Converted: 2022 | Failed: 0





## Cell 10: Convert External Data to Training Format

In [17]:
def convert_external_to_sft_format(external_samples: list) -> list:
    """Reshape converted external samples into ``meaning`` / ``data`` records.

    Each sample becomes ``{"meaning": <original prose response>, "data":
    [{"poem": <poem>, "normal": <user query>}]}``, preserving input order.
    """
    return [
        {
            "meaning": item["original_response"],
            "data": [{"poem": item["assistant"], "normal": item["user"]}],
        }
        for item in external_samples
    ]

# Reshape the converted samples and persist them as one JSON object per line.
real_conversations = convert_external_to_sft_format(external_with_poems)
print(f"Converted {len(real_conversations)} external samples to SFT format")

external_output_path = "../data/poem_real_conversations_2000.jsonl"
# Written explicitly as UTF-8 because ensure_ascii=False emits non-ASCII
# characters as-is, which the platform default encoding may not support.
with open(external_output_path, "w", encoding="utf-8") as f:
    for entry in real_conversations:
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"Saved external SFT data to: {external_output_path}")


Converted 2022 external samples to SFT format
Saved external SFT data to: ../data/poem_real_conversations_2000.jsonl
