In [9]:
import re
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict
import asyncio
from concurrent.futures import ThreadPoolExecutor

@dataclass
class Topic:
    """A named topic together with the bullet-point subtopics listed under it."""

    name: str
    subtopics: List[str]


class TopicParser:
    """Parse text made of ``**Topic**:`` headers followed by ``- bullet`` lines.

    Expected input shape::

        **Mathematics**:
        - Advanced calculus problems.
        - Abstract group theory.

    Text that precedes the first header is ignored.
    """

    # Zero-width split point: a new section begins wherever a header begins.
    _SECTION_SPLIT = re.compile(r'(?=\*\*[^*]+\*\*:)')
    _HEADER = re.compile(r'\*\*([^*]+)\*\*:')
    # A bullet starts with '-' at the beginning of a line and runs (possibly
    # over continuation lines) until the next line-initial '-' or end of text.
    # Anchoring on line starts keeps hyphens *inside* the text
    # ("real-world", "divide-and-conquer") from being treated as separators.
    _BULLET = re.compile(
        r'^[ \t]*-[ \t]*(.+?)(?=^[ \t]*-|\Z)',
        re.MULTILINE | re.DOTALL,
    )

    def __init__(self, text: str):
        self.text = text

    def parse_topics(self) -> List[Topic]:
        """Parse the text into a list of Topic objects.

        Returns:
            One ``Topic`` per ``**Name**:`` header, in document order. A header
            with no bullets yields a ``Topic`` with an empty ``subtopics`` list.
            Returns an empty list when the text contains no header.
        """
        topics: List[Topic] = []
        for section in self._SECTION_SPLIT.split(self.text):
            header = self._HEADER.match(section)
            if header is None:
                # Blank chunk or preamble before the first header.
                continue

            # Only scan the text after the header so a hyphen in the topic
            # name (e.g. "**Computer-Science**:") is never read as a bullet.
            body = section[header.end():]
            subtopics = [
                item.strip()
                for item in self._BULLET.findall(body)
                if item.strip()
            ]
            topics.append(Topic(name=header.group(1).strip(), subtopics=subtopics))

        return topics

@dataclass
class GeneratedProblem:
    """One LLM generation record; serialized as a single line of the JSONL output."""

    topic: str
    subtopics: List[str]
    prompt: str
    response: str
    timestamp: str  # ISO-8601 creation time in UTC (includes the +00:00 offset)
    model: str


class ProblemGenerator:
    """Ask an LLM for practice problems on a topic and append each result to a JSONL file.

    ``llm_client`` must provide an awaitable ``generate(prompt)`` method that
    returns the response text, and a ``model_name`` attribute.
    """

    def __init__(self, llm_client, output_file: Path):
        self.llm_client = llm_client
        self.output_file = output_file

    async def generate_problems(self, topic: Topic, num_problems: int = 3) -> GeneratedProblem:
        """Generate problems for a given topic using the LLM and save to JSONL.

        Args:
            topic: Topic whose ``name`` and ``subtopics`` are interpolated into the prompt.
            num_problems: How many problems the prompt asks the model to produce.

        Returns:
            The record that was appended to ``self.output_file``. All requested
            problems are contained in its single ``response`` string.

        Raises:
            Whatever ``llm_client.generate`` raises, and ``OSError`` if the
            output file cannot be opened or written.
        """
        # Function-scope import: ``timezone`` is not among the file-level imports.
        from datetime import timezone

        prompt = f"""Create {num_problems} challenging problems for the topic '{topic.name}' 
        based on these subtopics: {', '.join(topic.subtopics)}
        
        Each problem should:
        1. Test deep understanding rather than memorization
        2. Require integration of multiple concepts
        3. Have real-world applications where possible
        4. Include step-by-step solutions
        
        Format each problem as:
        Problem #: [Problem text]
        Solution: [Detailed solution]
        """

        response = await self.llm_client.generate(prompt)

        # Create problem record
        problem = GeneratedProblem(
            topic=topic.name,
            # Copy so later mutation of the topic cannot change this record.
            subtopics=list(topic.subtopics),
            prompt=prompt,
            response=response,
            # datetime.utcnow() is deprecated and returns a naive value;
            # record an explicit, timezone-aware UTC timestamp instead.
            timestamp=datetime.now(timezone.utc).isoformat(),
            model=self.llm_client.model_name
        )

        # Serialize first, then append with a single write call, so the record
        # is handed to the file object as one complete line.
        record_line = json.dumps(asdict(problem), ensure_ascii=False) + '\n'
        with self.output_file.open('a', encoding='utf-8') as f:
            f.write(record_line)

        return problem

async def main(text_content: str, llm_client, output_file: Path) -> Dict[str, List[GeneratedProblem]]:
    """Parse topics from text and generate problems for all of them concurrently.

    Args:
        text_content: Source text containing ``**Topic**:`` headed bullet lists.
        llm_client: Client handed to ``ProblemGenerator``.
        output_file: JSONL file that every generated record is appended to;
            its parent directory is created if it does not exist.

    Returns:
        Mapping of topic name to the list of records generated for it. If the
        text contains the same topic name more than once, the records of all
        occurrences are collected under that one key.

    Raises:
        The first exception raised by any topic's generation (``asyncio.gather``
        is used without ``return_exceptions``).
    """
    # Create output directory if it doesn't exist
    output_file.parent.mkdir(parents=True, exist_ok=True)

    topics = TopicParser(text_content).parse_topics()
    generator = ProblemGenerator(llm_client, output_file)

    async def process_topic(topic: Topic) -> Tuple[str, List[GeneratedProblem]]:
        # One LLM call per topic; wrapped in a list to match the mapping's value type.
        problem = await generator.generate_problems(topic)
        return (topic.name, [problem])

    # One coroutine per topic, all awaited concurrently.
    results = await asyncio.gather(*(process_topic(topic) for topic in topics))

    # Merge rather than dict(results): a repeated topic name would otherwise
    # silently overwrite the records of its earlier occurrence.
    problem_database: Dict[str, List[GeneratedProblem]] = {}
    for topic_name, problems in results:
        problem_database.setdefault(topic_name, []).extend(problems)

    return problem_database

# Example LLM client: an Ollama-backed implementation of the `llm_client` interface used above.
class OllamaClient:
    """Minimal async client for an Ollama server's HTTP API.

    Args:
        model_name: Model to request, e.g. ``"mistral"`` or ``"deepseek-r1:32b"``.
        base_url: Server root URL; a trailing ``/`` is stripped.
            NOTE(review): the default points at one specific host address —
            pass ``base_url`` explicitly when running anywhere else.
        timeout: Total time budget, in seconds, for each HTTP request.
    """

    def __init__(self, model_name: str = "mistral", base_url: str = "http://100.65.190.72:11434", timeout: int = 120):
        self.model_name = model_name
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    async def generate(self, prompt: str) -> str:
        """Generate text using Ollama API.

        Sends a non-streaming POST to ``/api/generate`` and returns the
        ``response`` field of the JSON reply ('' if the field is absent).

        Raises:
            Exception: on timeout, connection failure, a non-200 status, or a
                reply that is not a JSON object. The underlying error, when
                there is one, is attached as ``__cause__``.
        """
        import aiohttp

        payload = {
            "model": self.model_name,
            "prompt": prompt,
            "stream": False
        }

        timeout = aiohttp.ClientTimeout(total=self.timeout)

        # Keep the try body to the network exchange only, so the errors raised
        # below for bad statuses / bad payloads are not caught and re-wrapped
        # under a misleading "unexpected error" message.
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(f"{self.base_url}/api/generate", json=payload) as response:
                    status = response.status
                    response_text = await response.text()
        except asyncio.TimeoutError as e:
            # Checked before ClientError so that timeouts which are also
            # client errors are still reported as timeouts.
            raise Exception(f"Request timed out after {self.timeout} seconds") from e
        except aiohttp.ClientError as e:
            raise Exception(f"Failed to connect to Ollama server: {e}") from e

        if status != 200:
            raise Exception(f"Ollama API error {status}: {response_text}")

        try:
            response_json = json.loads(response_text)
        except json.JSONDecodeError as e:
            raise Exception(f"Failed to parse Ollama response: {e}\nResponse text: {response_text}") from e

        if not isinstance(response_json, dict):
            raise Exception(f"Failed to parse Ollama response: expected a JSON object\nResponse text: {response_text}")

        return response_json.get('response', '')

    def _is_model_listed(self, available_models) -> bool:
        """Return True if ``self.model_name`` appears among the listed model names.

        A name requested without a tag (``"mistral"``) also matches its
        ``:latest`` entry (``"mistral:latest"``), which is how the server
        lists untagged pulls. A name with an explicit tag must match exactly.
        """
        candidates = {self.model_name}
        # Look for the tag separator only in the last path segment so a
        # "host:port/" registry prefix is not mistaken for a tag.
        if ':' not in self.model_name.rsplit('/', 1)[-1]:
            candidates.add(f"{self.model_name}:latest")
        return any(name in candidates for name in available_models)

    async def check_model_availability(self) -> bool:
        """Check if the specified model is available in Ollama.

        Best-effort probe of ``/api/tags``: returns False (never raises) on a
        non-200 status, a malformed reply, a timeout, or a connection failure.
        """
        import aiohttp

        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(f"{self.base_url}/api/tags") as response:
                    if response.status != 200:
                        return False

                    data = await response.json()
                    available_models = [model['name'] for model in data.get('models', [])]
                    return self._is_model_listed(available_models)

        except Exception:
            # Deliberate best-effort: any failure means "not available".
            return False

    async def validate_connection(self) -> bool:
        """Validate connection to Ollama server.

        Best-effort probe of ``/api/version``: True only for a 200 reply;
        returns False (never raises) on any failure.
        """
        import aiohttp

        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(f"{self.base_url}/api/version") as response:
                    return response.status == 200
        except Exception:
            # Deliberate best-effort: any failure means "not reachable".
            return False


# Helper function to save results to a file
def save_problems_to_file(problem_database: Dict[str, List[object]], filename: str):
    """Write a human-readable report of all problems, grouped by topic.

    Args:
        problem_database: Mapping of topic name to its problems. Each problem
            may be a plain string or a record object with a ``response``
            attribute (such as the records returned by ``main``); for records,
            the ``response`` text is what gets written.
        filename: Path of the report file; it is overwritten if it exists.
    """
    banner = '=' * 50
    with open(filename, 'w', encoding='utf-8') as f:
        for topic, problems in problem_database.items():
            f.write(f"\n\n{banner}\n{topic}\n{banner}\n")
            for i, problem in enumerate(problems, 1):
                # Plain strings have no ``response`` attribute and are written
                # as-is; records contribute their generated text rather than
                # their dataclass repr.
                body = getattr(problem, 'response', problem)
                f.write(f"\nProblem {i}:\n{body}\n")

In [10]:
# Example text content, in the format TopicParser expects: a "**Topic**:"
# header followed by "- " bullet lines, one subtopic per bullet.
text_content = """
**Mathematics**:
- Advanced calculus problems involving multivariable functions.
- Complex algebraic structures like abstract group theory.
"""

# Assuming your topics are in text_content
# (these two imports repeat ones made in the definitions cell above)
import asyncio
from pathlib import Path

# Initialize client and set output path.
# main() creates the parent directory ('output/') if it is missing.
# Only the model name is passed here, so OllamaClient falls back to its
# default base_url and timeout.
output_file = Path('output/problems.jsonl')
llm_client = OllamaClient("deepseek-r1:32b")

# Run async code directly with await.
# NOTE: a bare top-level `await` works in an IPython/Jupyter cell; in a plain
# Python script this call would have to go through asyncio.run(...) instead.
problem_database = await main(text_content, llm_client, output_file)

# Print results summary: one line per topic with the number of records returned for it
print(f"\nGenerated problems for {len(problem_database)} topics:")
for topic, problems in problem_database.items():
    print(f"- {topic}: {len(problems)} problems")


Exception: Failed to connect to Ollama server: Cannot connect to host 100.65.190.72:11434 ssl:default [Connect call failed ('100.65.190.72', 11434)]