In [None]:
import os
import re
import unicodedata
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
import torch

# Set your OpenAI API key.
# NOTE: this assignment unconditionally overwrites any OPENAI_API_KEY already
# present in the process environment with the placeholder string below, so the
# placeholder must be replaced (or this line removed) before running the cells.
os.environ["OPENAI_API_KEY"] = "your api key"

class GreekPoetryRAG:
    """Retrieval-augmented generation helper for Greek poetry.

    The instance bundles a sentence-embedding model, a text splitter, a chat
    model and a lookup table (``poet_mapping``) from Greek poet names to the
    corpus file stems that are stored as ``"poet"`` metadata on every chunk.
    """

    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=300,
            chunk_overlap=50,
            separators=["\n\n", "\n", ".", "·", ";"],
            keep_separator=True
        )
        self.llm = ChatOpenAI(
            model_name="gpt-4-turbo",
            temperature=0.7,
            max_tokens=1000,
            request_timeout=120
        )

        # Greek display name (surname only, or first name + surname) -> file
        # stem used as the "poet" metadata value in the vector store.
        self.poet_mapping = {
            "Λαπαθιώτης": "NapoleonLapathiotis",
            "Ναπολέων Λαπαθιώτης": "NapoleonLapathiotis",
            "Πολυδούρη": "MariaPolidouri",
            "Μαρία Πολυδούρη": "MariaPolidouri",
            "Ουράνης": "KostasOuranis",
            "Κώστας Ουράνης": "KostasOuranis",
            "Γιοφύλλης": "FotosGiofyllis",
            "Φώτος Γιοφύλλης": "FotosGiofyllis",
            "Παπανικολάου": "MitsosPapanikolaou",
            "Μήτσος Παπανικολάου": "MitsosPapanikolaou",
            "Φιλύρας": "RomosFiliras",
            "Ρώμος Φιλύρας": "RomosFiliras",
            "Άγρας": "TellosAgras",
            "Τέλλος Άγρας": "TellosAgras",
            # Surname-only alias added so this poet matches the pattern of
            # every other entry.
            "Καρυωτάκης": "Karyotakis",
            "Κώστας Καρυωτάκης": "Karyotakis"
        }

    def clean_text(self, text: str) -> str:
        """Strip HTML, normalize Unicode and drop empty or one-character lines.

        ``<br>`` tags in any spelling (``<br>``, ``<BR/>``, ``<br  />`` ...)
        become newlines, every other tag is removed, and the text is NFKC
        normalized. Each line is then stripped; lines left with fewer than two
        characters (including blank lines) are discarded, so the result never
        contains consecutive newlines.

        Args:
            text: Raw text, possibly containing HTML markup.

        Returns:
            The cleaned text with one non-empty line per row.
        """
        # Turn line-break tags into real newlines before the generic tag
        # removal below would delete them without leaving a break.
        text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)

        # Remove any remaining HTML tags.
        text = re.sub(r'<[^>]+>', '', text)

        text = unicodedata.normalize('NFKC', text)

        stripped_lines = (line.strip() for line in text.split('\n'))
        return '\n'.join(line for line in stripped_lines if len(line) > 1)

    def load_poems(self, files):
        """Read, clean and chunk every file in ``files``.

        The poet name of a file is its base name without extension. Files that
        cannot be read or split are reported on stdout and skipped.

        Args:
            files: Iterable of paths to UTF-8 text files.

        Returns:
            A ``(poems, poets)`` tuple where ``poems`` is the flat list of text
            chunks and ``poets`` maps each poet name to the list of indices of
            its chunks inside ``poems``.
        """
        poems = []
        poets = {}

        for file_path in files:
            poet_name = os.path.splitext(os.path.basename(file_path))[0]
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                # Clean text before splitting
                chunks = self.text_splitter.split_text(self.clean_text(content))
            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")
                continue

            start_idx = len(poems)
            poems.extend(chunks)
            # Extend instead of assigning so that two files sharing a stem do
            # not orphan the chunks of the earlier file.
            poets.setdefault(poet_name, []).extend(range(start_idx, len(poems)))

            print(f"Processed {poet_name}: {len(chunks)} chunks")

        # Count the poets that were actually loaded, not the files requested.
        print(f"Loaded {len(poems)} poem chunks from {len(poets)} poets")
        return poems, poets

    @staticmethod
    def _build_metadatas(poems, poets):
        """Return one ``{"poet": name}`` dict per chunk (``None`` if unowned)."""
        poet_by_index = {}
        for poet, indices in poets.items():
            for idx in indices:
                # First owner wins, matching a first-match scan over `poets`.
                poet_by_index.setdefault(idx, poet)
        return [{"poet": poet_by_index.get(i)} for i in range(len(poems))]

    def create_vectorstore(self, poems, poets):
        """Embed ``poems`` into a FAISS store tagged with poet metadata.

        Args:
            poems: List of text chunks.
            poets: Mapping of poet name to the indices of its chunks.

        Returns:
            The store returned by ``FAISS.from_texts``.
        """
        metadatas = self._build_metadatas(poems, poets)

        print("Creating vectorstore...")
        return FAISS.from_texts(poems, self.embeddings, metadatas=metadatas)

    def _resolve_poet_file(self, poet_style):
        """Map a Greek poet name or a canonical file stem to the file stem."""
        poet_file = self.poet_mapping.get(poet_style)
        if poet_file is None and poet_style in self.poet_mapping.values():
            # Already a canonical stem such as "Karyotakis".
            poet_file = poet_style
        return poet_file

    def get_similar_poems(self, query, vectorstore, k=3, poet_style=None):
        """Return up to ``k`` cleaned chunks similar to ``query``.

        Args:
            query: Text to search for.
            vectorstore: Object exposing ``similarity_search(query, k=...,
                filter=...)`` whose results carry ``page_content`` and
                ``metadata``.
            k: Maximum number of chunks to return.
            poet_style: Optional poet, given either as a key of
                ``poet_mapping`` or as one of its values (file stem). When the
                poet is unknown a warning is printed and all poets are
                searched.

        Returns:
            List of cleaned chunk texts.
        """
        poet_file = self._resolve_poet_file(poet_style) if poet_style else None

        if not poet_file:
            if poet_style:
                print(f"Warning: Unknown poet '{poet_style}'. Using all poets.")
            results = vectorstore.similarity_search(query, k=k)
            return [self.clean_text(doc.page_content) for doc in results]

        print(f"\nRetrieving poems by {poet_style} (mapped to {poet_file})")
        # NOTE(review): LangChain's FAISS store is documented to filter after
        # fetching `fetch_k` candidates (default 20); if that applies to the
        # installed version, a poet ranked below that window can yield fewer
        # than k results. Consider passing fetch_k explicitly - TODO confirm.
        results = vectorstore.similarity_search(
            query,
            k=k*2,
            filter={"poet": poet_file}
        )

        # Debug info about retrieved poems
        print("\nRetrieved poems metadata:")
        for i, doc in enumerate(results[:k], 1):
            print(f"Poem {i} from poet: {doc.metadata.get('poet', 'Unknown')}")

        # Re-check the metadata in case the store did not apply the filter.
        filtered_results = [r for r in results if r.metadata.get('poet') == poet_file]
        poems = [self.clean_text(doc.page_content) for doc in filtered_results[:k]]
        print(f"Found {len(poems)} valid poems")
        return poems

    def generate_prompt(self, theme, similar_poems, poet_style=None):
        """Build the Greek generation prompt from a theme and example poems.

        Args:
            theme: Theme of the poem to generate.
            similar_poems: Example texts inserted into the prompt, numbered
                from 1 and separated by ``---`` lines.
            poet_style: Optional poet name inserted verbatim into the style
                instruction; a generic interwar-poetry instruction is used
                when it is falsy.

        Returns:
            The formatted prompt string.
        """
        style_instruction = f"στο ύφος του {poet_style}" if poet_style else "στο ύφος της Μεσοπολεμικής Ποίησης"

        prompt_template = """Δημιούργησε ένα νέο ελληνικό ποίημα {}.
Θέμα: {}

Παραδείγματα παρόμοιων ποιημάτων για έμπνευση:

{}

Δημιούργησε ένα νέο πρωτότυπο ποίημα που να:
1. Διατηρεί το ύφος και την τεχνοτροπία των παραδειγμάτων
2. Χρησιμοποιεί παρόμοια δομή στίχων
3. Αξιοποιεί πλούσιες ποιητικές εικόνες
4. Είναι μοναδικό στην έκφραση

Το ποίημα:"""

        formatted_poems = [
            f"Ποίημα {i}:\n{poem}" for i, poem in enumerate(similar_poems, 1)
        ]

        return prompt_template.format(
            style_instruction,
            theme,
            "\n---\n".join(formatted_poems)
        )

    def generate_poem(self, prompt):
        """Send ``prompt`` to the chat model and return the cleaned reply.

        Any exception raised while importing the message type or calling the
        model is printed and converted into a ``None`` return value.

        Args:
            prompt: Prompt text sent as a single human message.

        Returns:
            The cleaned response text, or ``None`` on failure.
        """
        try:
            from langchain.schema import HumanMessage
            messages = [HumanMessage(content=prompt)]
            response = self.llm.invoke(messages)
            return self.clean_text(response.content)
        except Exception as e:
            print(f"Error generating poem: {str(e)}")
            return None

def setup_poetry_system(file_paths):
    """Create the RAG helper, load the corpus and index it.

    Args:
        file_paths: Paths of the poem files handed to ``load_poems``.

    Returns:
        A ``(rag, vectorstore, poets)`` tuple: the ``GreekPoetryRAG`` instance,
        the store built by ``create_vectorstore`` and the poet-to-indices
        mapping returned by ``load_poems``.
    """
    print("Initializing RAG system...")
    system = GreekPoetryRAG()

    print("\nLoading and processing poems...")
    chunks, poet_index = system.load_poems(file_paths)

    print("\nCreating vectorstore...")
    store = system.create_vectorstore(chunks, poet_index)

    return system, store, poet_index

def generate_poetry(rag, vectorstore, theme, poet_style=None):
    """Retrieve examples, build a prompt and generate a poem, logging each step.

    Args:
        rag: Object providing ``get_similar_poems``, ``generate_prompt`` and
            ``generate_poem``.
        vectorstore: Store forwarded to ``rag.get_similar_poems``.
        theme: Theme used both as retrieval query and in the prompt.
        poet_style: Optional poet whose style should be imitated.

    Returns:
        The generated poem, or ``None`` when generation yields nothing or any
        exception is raised (the exception is printed, not propagated).
    """
    thin_rule = "-"*40
    thick_rule = "="*40

    try:
        style_suffix = f" in the style of {poet_style}" if poet_style else ""
        print(f"\nGenerating poem about '{theme}'" + style_suffix)

        examples = rag.get_similar_poems(theme, vectorstore, k=5, poet_style=poet_style)
        prompt = rag.generate_prompt(theme, examples, poet_style)

        print("\nRetrieved similar poems:")
        for number, example in enumerate(examples, 1):
            print(f"\n--- Poem {number} ---")
            print(thin_rule)
            print(example)
            print(thin_rule)

        print("\nGenerated Prompt:")
        print(thick_rule)
        print(prompt)
        print(thick_rule)

        print("\nGenerating new poem...")
        generated = rag.generate_poem(prompt)

        # Guard clause: nothing usable came back from the model.
        if not generated:
            print("Failed to generate poem")
            return None

        print("\nGenerated Poem:")
        print(thick_rule)
        print(generated)
        print(thick_rule)
        return generated
    except Exception as e:
        print(f"Error in poetry generation: {str(e)}")
        return None

if __name__ == "__main__":
    # Corpus files; each file stem becomes the "poet" metadata value.
    poetry_files = [
        "FotosGiofyllis.txt",
        "KostasOuranis.txt",
        "MariaPolidouri.txt",
        "MitsosPapanikolaou.txt",
        "NapoleonLapathiotis.txt",
        "RomosFiliras.txt",
        "TellosAgras.txt",
        "Karyotakis.txt"
    ]

    try:
        rag, vectorstore, poets = setup_poetry_system(poetry_files)

        # Test generation
        theme = "δείλι"
        # The poet must be given as a key of GreekPoetryRAG.poet_mapping (the
        # Greek name); the bare file stem "Karyotakis" is not a key there, so
        # passing it made retrieval fall back to all poets with a warning.
        poem = generate_poetry(rag, vectorstore, theme, "Κώστας Καρυωτάκης")

        if poem:
            print("\nSuccessfully generated poem!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")

In [None]:
from typing import Dict, List, Tuple
import time
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

class PoetryExperiment:
    """Compare poems generated with retrieved examples against plain prompts."""

    def __init__(self, rag_system=None, vectorstore=None):
        # The vectorstore is kept separately from the RAG helper because the
        # helper's retrieval method takes it as an argument.
        self.rag_system = rag_system
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(
            model_name="gpt-4o",
            temperature=0.7,
            max_tokens=1000,
            request_timeout=120
        )

    def generate_base_prompt(self, theme: str, poet_style: str) -> str:
        """Return the Greek prompt that names only the poet and the theme."""
        return f"""Δημιούργησε ένα νέο ελληνικό ποίημα στο ύφος του {poet_style}.
Θέμα: {theme}

Δημιούργησε ένα πρωτότυπο ποίημα που να αντικατοπτρίζει το μοναδικό ύφος και την τεχνοτροπία του ποιητή:"""

    def generate_poems(
        self,
        theme: str,
        poet_style: str,
        num_samples: int = 3,
        use_rag: bool = True
    ) -> Dict[str, List[Tuple[str, float]]]:
        """Produce ``num_samples`` rounds of RAG and baseline poems.

        Each round optionally runs the RAG pipeline (only when ``use_rag`` is
        true and a RAG system is set), then always runs the baseline prompt,
        then sleeps one second.

        Returns:
            ``{"rag": [...], "base": [...]}`` where every entry is a
            ``(poem, elapsed_seconds)`` tuple. RAG rounds whose poem is falsy
            are omitted.
        """
        rag_runs = []
        base_runs = []

        for _round in range(num_samples):
            if use_rag and self.rag_system:
                # Timed span covers retrieval, prompt building and generation.
                started = time.time()
                examples = self.rag_system.get_similar_poems(
                    theme,
                    self.vectorstore,
                    k=3,
                    poet_style=poet_style
                )
                rag_prompt = self.rag_system.generate_prompt(theme, examples, poet_style)
                rag_poem = self.rag_system.generate_poem(rag_prompt)
                finished = time.time()
                if rag_poem:
                    rag_runs.append((rag_poem, finished - started))

            # Baseline: same model call but without retrieved examples.
            started = time.time()
            base_prompt = self.generate_base_prompt(theme, poet_style)
            reply = self.llm.invoke([HumanMessage(content=base_prompt)])
            finished = time.time()
            base_runs.append((reply.content, finished - started))

            # Pause between rounds to stay clear of rate limits.
            time.sleep(1)

        return {"rag": rag_runs, "base": base_runs}

def run_experiment(
    rag_system,
    vectorstore,  # Add vectorstore parameter
    theme: str,
    poet_style: str,
    num_samples: int = 3
) -> Dict:
    """Run one RAG-versus-baseline experiment and summarize the results.

    Args:
        rag_system: RAG helper handed to ``PoetryExperiment``.
        vectorstore: Vector store handed to ``PoetryExperiment``.
        theme: Theme of the poems to generate.
        poet_style: Poet whose style should be imitated.
        num_samples: Number of generation rounds.

    Returns:
        A dict with the keys ``theme``, ``poet``, ``samples``, ``rag_poems``,
        ``base_poems`` and ``metrics`` (``rag_avg_time`` / ``base_avg_time``
        in seconds; ``0`` when the corresponding list is empty).
    """
    experiment = PoetryExperiment(rag_system, vectorstore)
    results = experiment.generate_poems(theme, poet_style, num_samples)

    def _average_time(runs):
        # An empty run list (e.g. num_samples=0) must not divide by zero.
        if not runs:
            return 0
        return sum(elapsed for _, elapsed in runs) / len(runs)

    # Format results for display
    return {
        "theme": theme,
        "poet": poet_style,
        "samples": num_samples,
        "rag_poems": [poem for poem, _ in results["rag"]],
        "base_poems": [poem for poem, _ in results["base"]],
        "metrics": {
            "rag_avg_time": _average_time(results["rag"]),
            "base_avg_time": _average_time(results["base"])
        }
    }

# Example usage: each case names a theme, a poet (a key of the RAG helper's
# poet_mapping) and the number of generation rounds to run.
test_cases = [
    {
        "theme": "θάνατος",
        "poet": "Κώστας Καρυωτάκης",
        "samples": 2
    },
    {
        "theme": "θάνατος",
        "poet": "Ουράνης",
        "samples": 2
    }
]

# Run experiments. `rag` and `vectorstore` are not defined in this cell; they
# are the globals assigned by the setup code of the first cell.
for case in test_cases:
    print(f"\n=== Testing {case['poet']} on theme: {case['theme']} ===\n")
    # Pass both rag_system and vectorstore
    results = run_experiment(rag, vectorstore, case['theme'], case['poet'], case['samples'])
    
    # Poems produced with retrieved examples in the prompt.
    print("\nRAG-assisted poems:")
    for i, poem in enumerate(results['rag_poems'], 1):
        print(f"\n--- RAG Poem {i} ---")
        print(poem)
    
    # Poems produced from the plain prompt.
    print("\nBase model poems:")
    for i, poem in enumerate(results['base_poems'], 1):
        print(f"\n--- Base Poem {i} ---")
        print(poem)
    
    # Average wall-clock seconds per generation for each variant.
    print("\nMetrics:")
    print(f"RAG average generation time: {results['metrics']['rag_avg_time']:.2f}s")
    print(f"Base average generation time: {results['metrics']['base_avg_time']:.2f}s")

In [None]:
from typing import Dict, List, Tuple
import time
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

class PoetryExperiment:
    """Compare contrastive RAG prompting against a plain baseline prompt."""

    def __init__(self, rag_system=None, vectorstore=None):
        self.rag_system = rag_system
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(
            model_name="gpt-4o",
            temperature=0.7,
            max_tokens=1000,
            request_timeout=120
        )

    def generate_base_prompt(self, theme: str, poet_style: str) -> str:
        """Return the Greek prompt that names only the poet and the theme."""
        return f"""Δημιούργησε ένα νέο ελληνικό ποίημα στο ύφος του {poet_style}.
Θέμα: {theme}

Δημιούργησε ένα πρωτότυπο ποίημα που να αντικατοπτρίζει το μοναδικό ύφος και την τεχνοτροπία του ποιητή:"""

    def get_contrastive_poems(self, theme: str, poet_style: str, k: int = 3):
        """Collect examples by the target poet plus examples by other poets.

        Style examples come from ``rag_system.get_similar_poems``. Contrast
        examples are taken, in ranking order, from an unfiltered search for
        ``k * 5`` candidates, skipping documents whose ``"poet"`` metadata
        equals the target poet's mapped name; collection stops once ``k``
        have been gathered.

        Returns:
            ``{'style_examples': [...], 'contrasts': [...]}`` of text strings.
        """
        positives = self.rag_system.get_similar_poems(
            theme,
            self.vectorstore,
            k=k,
            poet_style=poet_style
        )

        # The search is run unfiltered and the target poet is excluded by hand
        # rather than through a negated store filter.
        target = self.rag_system.poet_mapping.get(poet_style)
        pool = self.vectorstore.similarity_search(theme, k=k * 5)

        negatives = []
        for candidate in pool:
            if candidate.metadata.get("poet") == target:
                continue
            negatives.append(candidate.page_content)
            if len(negatives) >= k:
                break

        return {
            'style_examples': positives,
            'contrasts': negatives
        }

    def generate_contrastive_prompt(self, theme: str, poems: Dict, poet_style: str) -> str:
        """Return the Greek prompt listing style examples and contrast examples.

        ``poems`` must provide the keys ``'style_examples'`` and
        ``'contrasts'``; the items of each are joined with newlines.
        """
        return f"""Δημιούργησε ένα νέο ελληνικό ποίημα στο ύφος του {poet_style}.
Θέμα: {theme}

Παραδείγματα του ύφους του ποιητή:
{'-'*40}
{chr(10).join(poems['style_examples'])}

Παραδείγματα διαφορετικών υφών (για αντίθεση):
{'-'*40}
{chr(10).join(poems['contrasts'])}

Δημιούργησε ένα νέο ποίημα που να:
1. Ακολουθεί το χαρακτηριστικό ύφος του {poet_style} όπως φαίνεται στα πρώτα παραδείγματα
2. Διαφέρει στυλιστικά από τα παραδείγματα αντίθεσης
3. Διατηρεί την ποιητική φωνή του {poet_style}
4. Προσεγγίζει το θέμα με μοναδικό τρόπο

Το ποίημα:"""

    def generate_poems(
        self,
        theme: str,
        poet_style: str,
        num_samples: int = 3,
        use_rag: bool = True
    ) -> Dict[str, List[Dict]]:
        """Produce ``num_samples`` rounds of contrastive-RAG and baseline poems.

        Returns:
            ``{"rag": [...], "base": [...]}``. RAG entries carry ``poem``,
            ``prompt``, ``similar_poems``, ``contrast_poems`` and ``time``;
            baseline entries carry ``poem``, ``prompt`` and ``time``. RAG
            rounds whose poem is falsy are omitted.
        """
        rag_runs = []
        base_runs = []

        for _round in range(num_samples):
            if use_rag and self.rag_system:
                # Timed span covers retrieval, prompt building and generation.
                started = time.time()
                examples = self.get_contrastive_poems(theme, poet_style, k=3)
                rag_prompt = self.generate_contrastive_prompt(theme, examples, poet_style)
                rag_poem = self.rag_system.generate_poem(rag_prompt)
                finished = time.time()
                if rag_poem:
                    rag_runs.append({
                        "poem": rag_poem,
                        "prompt": rag_prompt,
                        "similar_poems": examples['style_examples'],
                        "contrast_poems": examples['contrasts'],
                        "time": finished - started
                    })

            # Baseline: same model call but without any examples.
            started = time.time()
            base_prompt = self.generate_base_prompt(theme, poet_style)
            reply = self.llm.invoke([HumanMessage(content=base_prompt)])
            finished = time.time()
            base_runs.append({
                "poem": reply.content,
                "prompt": base_prompt,
                "time": finished - started
            })

            time.sleep(1)

        return {"rag": rag_runs, "base": base_runs}

def run_experiment(
    rag_system,
    vectorstore,
    theme: str,
    poet_style: str,
    num_samples: int = 3
) -> Dict:
    """Run one contrastive-RAG-versus-baseline experiment and print a report.

    Args:
        rag_system: RAG helper handed to ``PoetryExperiment``.
        vectorstore: Vector store handed to ``PoetryExperiment``.
        theme: Theme of the poems to generate.
        poet_style: Poet whose style should be imitated.
        num_samples: Number of generation rounds.

    Returns:
        A dict with the keys ``theme``, ``poet``, ``samples``,
        ``rag_results``, ``base_results`` and ``metrics`` (``rag_avg_time`` /
        ``base_avg_time`` in seconds; ``0`` when the corresponding result list
        is empty).
    """
    experiment = PoetryExperiment(rag_system, vectorstore)
    results = experiment.generate_poems(theme, poet_style, num_samples)

    rag_runs = results["rag"]
    base_runs = results["base"]
    rule = "-"*40

    def _average_time(runs):
        # An empty run list (e.g. num_samples=0) must not divide by zero.
        if not runs:
            return 0
        return sum(run["time"] for run in runs) / len(runs)

    formatted_results = {
        "theme": theme,
        "poet": poet_style,
        "samples": num_samples,
        "rag_results": [],
        "base_results": [],
        "metrics": {
            "rag_avg_time": 0,
            "base_avg_time": 0
        }
    }

    print(f"\n=== Testing {poet_style} on theme: {theme} ===\n")

    # Show RAG results with contrastive examples
    print("\nRAG-assisted generation:")
    if rag_runs:
        for i, result in enumerate(rag_runs, 1):
            print(f"\n--- RAG Generation {i} ---")

            print("\nStyle examples used:")
            for j, poem in enumerate(result["similar_poems"], 1):
                print(f"\nStyle Example {j}:")
                print(rule)
                print(poem)
                print(rule)

            print("\nContrasting examples used:")
            for j, poem in enumerate(result["contrast_poems"], 1):
                print(f"\nContrast Example {j}:")
                print(rule)
                print(poem)
                print(rule)

            print("\nPrompt used:")
            print(rule)
            print(result["prompt"])
            print(rule)

            print("\nGenerated poem:")
            print(result["poem"])
            print(f"\nGeneration time: {result['time']:.2f}s")

        formatted_results["rag_results"] = rag_runs
        formatted_results["metrics"]["rag_avg_time"] = _average_time(rag_runs)

    # Show base results
    print("\nBase generation:")
    for i, result in enumerate(base_runs, 1):
        print(f"\n--- Base Generation {i} ---")
        print("\nPrompt used:")
        print(rule)
        print(result["prompt"])
        print(rule)
        print("\nGenerated poem:")
        print(result["poem"])
        print(f"\nGeneration time: {result['time']:.2f}s")

    formatted_results["base_results"] = base_runs
    formatted_results["metrics"]["base_avg_time"] = _average_time(base_runs)

    print("\nMetrics:")
    print(f"RAG average generation time: {formatted_results['metrics']['rag_avg_time']:.2f}s")
    print(f"Base average generation time: {formatted_results['metrics']['base_avg_time']:.2f}s")

    return formatted_results

# Example usage: each case names a theme, a poet (a key of the RAG helper's
# poet_mapping) and the number of generation rounds to run.
test_cases = [
    {
        "theme": "θάνατος",
        "poet": "Κώστας Καρυωτάκης",
        "samples": 2
    },
    {
        "theme": "θάνατος",
        "poet": "Ουράνης",
        "samples": 2
    }
]

# `rag` and `vectorstore` are not defined in this cell; they are the globals
# assigned by the setup code of the first cell. run_experiment prints its own
# report, so only the returned summary of the last case is kept in `results`.
for case in test_cases:
    results = run_experiment(rag, vectorstore, case['theme'], case['poet'], case['samples'])
