In [None]:
from openai import OpenAI
import csv
import random
import re
import os
from dotenv import load_dotenv
import pandas as pd
import time
import json

# Read the OpenAI API key from the OPENAI_TOKEN variable defined in the local "key.env" file.
load_dotenv("key.env")
api_key = os.getenv("OPENAI_TOKEN")
client = OpenAI(api_key=api_key)
# model_imit is used for the style-imitation request; model_eval for the evaluating assistant.
model_imit = "gpt-4.1"
model_eval = "gpt-4.1"

# Decides whether the LLM gets only a fragment of the novel or the complete novel.
style_to_imitate = "book" # "fragment" or "book"

In [None]:
#function to extract a 4000-words fragment starting from a new paragraph

def extract_fragment_from_txt(file_txt, number_of_words=4000):
    """Return a random run of consecutive paragraphs with at least `number_of_words` words.

    The file is split into non-empty lines ("paragraphs"). A paragraph is a
    valid starting point when:
      * its first character is not a lowercase letter (the .txt comes from a
        PDF, so a "paragraph" starting in lowercase is usually the continuation
        of a sentence broken by a page change), and
      * the paragraphs from it to the end of the file contain at least
        `number_of_words` words.

    One valid starting point is chosen uniformly at random, and paragraphs are
    appended until the word target is reached (the last paragraph is kept
    whole, so the result may exceed the target).

    Args:
        file_txt: path of the UTF-8 text file to read.
        number_of_words: minimum number of words the fragment must contain.

    Returns:
        The selected paragraphs joined by blank lines.

    Raises:
        ValueError: if no paragraph is a valid starting point (empty file,
            text shorter than the target, or only lowercase-initial starts).
    """
    with open(file_txt, 'r', encoding='utf-8') as f:
        text = f.read()

    #non-empty paragraphs (already stripped, so paragraph[0] is the first visible character)
    paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
    word_counts = [len(re.findall(r'\b\w+\b', p)) for p in paragraphs]

    #words_until_end[i] = number of words from paragraph i to the end of the text
    words_until_end = [0] * (len(paragraphs) + 1)
    for i in range(len(paragraphs) - 1, -1, -1):
        words_until_end[i] = words_until_end[i + 1] + word_counts[i]

    #enumerate every usable start instead of guessing blindly: this way the
    #extraction succeeds whenever it is possible at all
    valid_starts = [
        i for i, paragraph in enumerate(paragraphs)
        if not (paragraph[0].isalpha() and paragraph[0].islower())
        and words_until_end[i] >= number_of_words
    ]
    if not valid_starts:
        raise ValueError("A fragment of the desired number of words (or more) could not be extracted.")

    idx_start = random.choice(valid_starts)

    #build the fragment adding paragraphs until the word target is reached
    fragment = []
    total_words = 0
    for paragraph, count in zip(paragraphs[idx_start:], word_counts[idx_start:]):
        fragment.append(paragraph)
        total_words += count
        if total_words >= number_of_words:
            break

    return '\n\n'.join(fragment)

In [None]:
# Novel to work with. Other available texts:
file = "SanManuelBuenoMartir.txt"
#"delfrioalfuego.txt"
#"los4jinetesdelapocalipsis.txt" 
#"SanManuelBuenoMartir.txt"
#"tristana_textoplano.txt"

# Name without extension, used for the name of the excel file.
# os.path.splitext removes only the final extension, so names with extra dots
# (or relative paths such as "./book.txt") are not truncated.
book = os.path.splitext(file)[0]

fragment = extract_fragment_from_txt(file, number_of_words=4000)

# The word count is computed outside the f-string: a backslash inside an
# f-string expression is a SyntaxError on Python versions before 3.12.
fragment_word_count = len(re.findall(r'\b\w+\b', fragment))
print(f"Length in words: {fragment_word_count}")
print(fragment)

In [None]:
#LLM GENERATES A FRAGMENT THAT IMITATES THE STYLE OF THE ORIGINAL ONE (either from the whole novel or one fragment)
# Text handed to the model: the extracted fragment or the complete novel, depending on style_to_imitate.
if style_to_imitate == "fragment":
    novel_text = fragment
else:
    with open(file, 'r', encoding='utf-8') as f:
        novel_text = f.read()

prompt = (
    "A continuación tienes un texto de una novela. Tienes que escribir un texto en el mismo estilo de unas 4000 palabras. Indica el inicio del fragmento utilizando << y >>. \n\n"
    f"{novel_text}"
)

response = client.chat.completions.create(
    model=model_imit,
    messages=[
        {"role": "system", "content": "Eres un escritor que imita estilos literarios con precisión."},
        {"role": "user", "content": prompt}
    ],
)

# The SDK types message.content as optional; normalise None to "" so the
# regex search and the Excel export below do not fail.
generated_text = response.choices[0].message.content or ""

#extract the fragment between << and >> and count words
match = re.search(r'<<(.+?)>>', generated_text, re.DOTALL)
if match:
    extracted_fragment = match.group(1).strip()
elif '<<' in generated_text:
    # The prompt only asks to mark the start, so the model may omit the closing >>:
    # in that case keep everything after the opening marker.
    extracted_fragment = generated_text.split('<<', 1)[1].strip()
    print("Warning: no closing '>>' found; using all the text after '<<' as the generated fragment.")
else:
    extracted_fragment = ""
    print("Warning: the model output has no '<<' ... '>>' markers; the generated fragment is empty.")

# An empty fragment yields 0 words.
word_count = len(re.findall(r'\b\w+\b', extracted_fragment))

#prepare the new row based on the style type
if style_to_imitate == "fragment":
    output_file = f"{book}.xlsx"
    new_row = pd.DataFrame([{
        "Fragmento": fragment,
        "Input al modelo": prompt,
        "Output del modelo": generated_text,
        "Fragmento generado": extracted_fragment,
        "Número de palabras": word_count
    }])
else:
    # The prompt is not stored here: it would contain the complete novel.
    output_file = f"{book}_novela_completa.xlsx"
    new_row = pd.DataFrame([{
        "Output del modelo": generated_text,
        "Fragmento generado": extracted_fragment,
        "Número de palabras": word_count
    }])

#add a new row if the file already exists
if os.path.exists(output_file):
    df_existing = pd.read_excel(output_file)
    df_combined = pd.concat([df_existing, new_row], ignore_index=True)
else:
    df_combined = new_row

#save the updated file
df_combined.to_excel(output_file, index=False)

In [None]:
#CREATE ASSISTANT FOR EVALUATION (only once)
# The assistant id is cached in a small JSON file so that later runs reuse
# the assistant instead of creating a new one.
assistant_file = "assistant_id.json"

if not os.path.exists(assistant_file):
    # No cached id yet: create the assistant and remember its id on disk.
    assistant = client.beta.assistants.create(
        model=model_eval,
        name="Evaluador de estilos de novelas",
        instructions="Eres un crítico literario especializado en analizar el estilo narrativo de novelas. Tu tarea es examinar textos y ofrecer una evaluación detallada del estilo, incluyendo tono, estructura, uso del lenguaje, ritmo y voz narrativa.",
    )
    assistant_id = assistant.id
    with open(assistant_file, "w") as f:
        f.write(json.dumps({"assistant_id": assistant_id}))
else:
    # Reuse the id stored by a previous run.
    with open(assistant_file, "r") as f:
        cached_assistant = json.loads(f.read())
    assistant_id = cached_assistant["assistant_id"]

In [None]:
# Run states in which the run has stopped (or is blocked) without producing a
# completed answer; polling any further would never finish.
UNSUCCESSFUL_RUN_STATUSES = {"failed", "cancelled", "expired", "incomplete", "requires_action"}


def wait_for_run(thread_id, run_id, poll_seconds=1):
    """Poll a run until it is completed and return the final run object.

    Raises:
        RuntimeError: if the run ends in any state other than "completed",
            instead of polling forever.
    """
    while True:
        current_run = client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run_id
        )
        if current_run.status == "completed":
            return current_run
        if current_run.status in UNSUCCESSFUL_RUN_STATUSES:
            raise RuntimeError(
                f"Assistant run {run_id} ended with status '{current_run.status}': "
                f"{getattr(current_run, 'last_error', None)}"
            )
        time.sleep(poll_seconds)


#1) Create a thread (new conversation)
thread = client.beta.threads.create()

#2) Prompt: ask whether both texts share the same style
client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content=(
        "A continuación tienes dos textos, etiquetados como TEXTO 1 y TEXTO 2. Quiero que evalúes si tienen el mismo estilo y podrían haber sido escritos por la misma persona. "
        "Razona en detalle tu respuesta y devuelve el resultado final en formato: <<RESULTADO FINAL: SÍ>> o <<RESULTADO FINAL: NO>>.\n\n"
        f"TEXTO 1:\n{fragment}\n\n"
        f"TEXTO 2:\n{extracted_fragment}"
    )
)

#3) the assistant answers
run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant_id
)

#4) Wait until the run is completed (raises if it fails)
run_status = wait_for_run(thread.id, run.id)

# 5) Obtain answer of the assistant (the first listed message is read as the latest reply)
messages = client.beta.threads.messages.list(thread_id=thread.id)
assistant_reply = messages.data[0].content[0].text.value

# 6) Extract final result.
# Accept "SÍ" with or without accent, in any letter case, optionally wrapped in
# markdown emphasis. As before, any affirmative verdict yields "SÍ" and a
# missing verdict is treated as "NO".
verdicts = re.findall(r'RESULTADO FINAL:[\s*_]*(SÍ|SI|NO)\b', assistant_reply, re.IGNORECASE)
if not verdicts:
    print("Warning: no 'RESULTADO FINAL' verdict found in the reply; treating it as NO.")
if any(verdict.upper() in ("SÍ", "SI") for verdict in verdicts):
    final_result = "SÍ"
else:
    final_result = "NO"

# 7) If the result is NO, ask for a better prompt
if final_result == "NO":
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        # A space separates the two sentences (the literals are concatenated as written).
        content="El TEXTO 2 es una imitación del estilo del TEXTO 1 generada por un LLM, a partir del prompt \"A continuación tienes un texto de una novela. Tienes que escribir un texto en el mismo estilo de unas 4000 palabras. Indica el inicio del fragmento utilizando << y >>.\" "
                "Teniendo en cuenta los problemas que has detectado en el TEXTO 2, proporciona un prompt mejorado, dándole indicaciones adicionales sobre el estilo que debe adoptar. Usa << y >> para indicar el principio y el final del nuevo prompt."
    )

    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant_id
    )

    # Wait for the second answer
    run_status = wait_for_run(thread.id, run.id)

    messages = client.beta.threads.messages.list(thread_id=thread.id)
    nueva_respuesta = messages.data[0].content[0].text.value
    print("Prompt sugerido:\n", nueva_respuesta)

else:
    print("Resultado final del análisis:", final_result)