# In [5]:  (Jupyter cell marker)
# Import required libraries
import pandas as pd
import asyncio
from aiohttp import ClientSession
from typing import Union, List, Dict, Optional, Any
from dataclasses import dataclass, field
import logging
from pathlib import Path
import json
from datetime import datetime
import nest_asyncio

# Root logging: INFO and above, each line tagged with time, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every component below.
logger = logging.getLogger(__name__)

def _default_stop_tokens() -> List[str]:
    """Return a fresh list holding the ChatML end-of-turn stop token."""
    return ["<|im_end|>"]


@dataclass
class Config:
    """Settings shared by the pipeline components.

    Groups the LLM endpoint and sampling parameters together with the
    batching and retry behaviour used when processing a dataset.
    """
    # Root of the OpenAI-compatible API; "/chat/completions" is appended to it.
    base_url: str = "http://localhost:1234/v1"
    # Model identifier sent in every request.
    model_name: str = "bartowski/Qwen2.5-32B-Instruct-GGUF/Qwen2.5-32B-Instruct-IQ2_M.gguf"
    # Sampling / generation parameters forwarded verbatim in each request.
    max_tokens: int = 8000
    temperature: float = 0.3
    seed: int = 420
    # Number of rows handled per chunk.
    chunk_size: int = 1000
    # Request attempts per prompt, and the base of the back-off exponent.
    max_retries: int = 3
    retry_delay: int = 2
    # Stop sequences sent with each request (a new list per instance).
    stop: List[str] = field(default_factory=_default_stop_tokens)


class DataValidationError(Exception):
    """Raised when an input DataFrame lacks the structure the pipeline needs,
    e.g. when required columns are missing."""

class LLMError(Exception):
    """Error type reserved for LLM-related failures (not raised in the code shown here)."""

class DataProcessor:
    """Load the input CSV, check its schema and prepare rows for prompting."""

    def __init__(self, config: "Config"):
        # Stored for parity with the other components; not read by this class.
        self.config = config

    def load_data(self, path: Union[str, Path]) -> pd.DataFrame:
        """Read a CSV file and check that the required columns are present.

        Args:
            path: Location of the CSV file.

        Returns:
            The DataFrame exactly as parsed by ``pd.read_csv``.

        Raises:
            DataValidationError: If any required column is missing.
            Exception: Whatever ``pd.read_csv`` raises (missing file, parse error).
        """
        try:
            df = pd.read_csv(path)
            self._validate_dataframe(df)
        except Exception as e:
            # logger.exception keeps the traceback in the log before re-raising.
            logger.exception(f"Error loading data: {e}")
            raise
        return df

    def _validate_dataframe(self, df: pd.DataFrame) -> None:
        """Raise DataValidationError if a required column is absent from ``df``."""
        required_columns = ['full_text', 'tweet_id', 'screen_name', 'original_user_id']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise DataValidationError(f"Missing required columns: {missing_columns}")

    @staticmethod
    def _first_user_id(value: Any) -> Any:
        """Return the first id of a list/tuple of ids, or ``value`` unchanged.

        An empty list/tuple yields ``None`` so the row is later dropped as
        missing instead of raising ``IndexError``.
        """
        if isinstance(value, (list, tuple)):
            return value[0] if value else None
        return value

    def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with a ``user_id`` column and incomplete rows removed.

        ``user_id`` is derived from ``original_user_id``; rows where either
        ``full_text`` or ``user_id`` is missing are dropped. The input frame is
        not modified.
        """
        df = df.copy()
        # NOTE(review): pd.read_csv never produces list objects, so a cell such as
        # "['123']" arrives as a plain string and passes through unchanged; the
        # unwrapping only applies to frames built in memory. Confirm whether
        # stringified lists should be parsed.
        df['user_id'] = df['original_user_id'].apply(self._first_user_id)
        return df.dropna(subset=['full_text', 'user_id'])

class LLMFormatter:
    """Build the text prompts sent to the language model."""

    @staticmethod
    def format_prompt(system_prompt: str, user_prompt: str) -> str:
        """Render a system turn and a user turn in ChatML markup.

        Each turn is wrapped as ``<|im_start|>ROLE`` ... ``<|im_end|>``. The
        result ends with an open assistant header, so the model's continuation
        is the assistant reply.
        """
        turns = (("system", system_prompt), ("user", user_prompt))
        rendered_turns = "".join(
            f"<|im_start|>{role}\n{text}<|im_end|>\n" for role, text in turns
        )
        return rendered_turns + "<|im_start|>assistant\n"

    @staticmethod
    def create_neutral_prompt(post: str) -> str:
        """Return instructions asking for a neutral description of ``post``.

        The instruction lines are followed by a blank line and then
        ``Post: <post>``.
        """
        instructions = (
            "Create a neutral, high-level description of the following social media post.",
            "Focus on the type of content, general topic, and format.",
            "Avoid subjective interpretations or unnecessary details.",
            "Keep the description clear, concise, and unbiased.",
        )
        return "\n".join(instructions) + f"\n\nPost: {post}"

class LLMClient:
    """Send chat-completion requests to an OpenAI-compatible HTTP endpoint."""

    def __init__(self, config: "Config"):
        self.config = config
        # Kept as a public attribute for existing callers; generate_response
        # sends role-tagged messages and no longer renders ChatML itself.
        self.formatter = LLMFormatter()

    async def generate_response(
        self,
        session: "ClientSession",
        system_prompt: str,
        user_prompt: str
    ) -> Optional[str]:
        """Request a completion for one system/user prompt pair, retrying on failure.

        Args:
            session: Open HTTP session used to POST the request.
            system_prompt: Content of the system message.
            user_prompt: Content of the user message.

        Returns:
            The assistant message content (``""`` when the response carries no
            choices or no content), or ``None`` if all ``max_retries`` attempts
            failed (non-200 status or an exception).
        """
        # /chat/completions takes role-tagged messages and renders the model's
        # chat template server-side. Wrapping a pre-rendered ChatML string in a
        # single user message would template it twice and demote the system
        # prompt to user text.
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        url = f"{self.config.base_url}/chat/completions"
        payload = {
            "model": self.config.model_name,
            "messages": messages,
            "temperature": self.config.temperature,
            "seed": self.config.seed,
            "max_tokens": self.config.max_tokens,
            "stop": self.config.stop,
        }

        for attempt in range(self.config.max_retries):
            try:
                async with session.post(url, json=payload) as response:
                    if response.status == 200:
                        result = await response.json()
                        # A missing *or empty* choices list means "no content",
                        # not a transport failure worth retrying.
                        choices = result.get("choices") or [{}]
                        return choices[0].get("message", {}).get("content", "")
                    error_text = await response.text()
                    logger.error(f"Attempt {attempt + 1} failed: {error_text}")
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed with exception: {e}")

            # Back off before the next attempt; no point sleeping after the last one.
            if attempt + 1 < self.config.max_retries:
                await asyncio.sleep(self.config.retry_delay ** attempt)

        return None

class Pipeline:
    """Orchestrate loading, preprocessing, LLM annotation and saving of posts."""

    def __init__(self, config: Config):
        self.config = config
        self.processor = DataProcessor(config)
        self.llm_client = LLMClient(config)

    @staticmethod
    def _make_record(row: pd.Series, response: Optional[str], success: bool) -> Dict[str, Any]:
        """Build one output record for ``row``.

        ``tweet_id`` is included (``None`` if the column is absent) so results
        can be joined back to the input.
        """
        return {
            "user_id": row['user_id'],
            "original_text": row['full_text'],
            "llm_response": response,
            "success": success,
            "tweet_id": row.get('tweet_id'),
        }

    async def process_chunk(
        self,
        chunk: pd.DataFrame,
        session: ClientSession
    ) -> List[Dict[str, Any]]:
        """Describe every row of ``chunk`` with the LLM, one request at a time.

        A row whose request raises is recorded with ``success=False`` and the
        error text in ``llm_response``; a request that exhausts its retries is
        recorded with ``llm_response=None`` and ``success=False``.
        """
        results = []
        for _, row in chunk.iterrows():
            try:
                neutral_prompt = LLMFormatter.create_neutral_prompt(row['full_text'])
                response = await self.llm_client.generate_response(
                    session,
                    "Analyze and describe the post neutrally.",
                    neutral_prompt
                )
            except Exception as e:
                logger.exception(f"Error processing row: {e}")
                results.append(self._make_record(row, str(e), False))
            else:
                results.append(self._make_record(row, response, response is not None))
        return results

    async def run(self, input_path: Union[str, Path], output_path: Union[str, Path]):
        """Run the complete pipeline and return the results DataFrame.

        Results are written to ``output_path`` (Parquet) after every chunk, so
        an interrupted run keeps the rows completed so far; the final write
        contains all rows.

        Raises:
            ValueError: If ``config.chunk_size`` is not positive.
            Exception: Any loading, validation or write error (logged, re-raised).
        """
        try:
            chunk_size = self.config.chunk_size
            if chunk_size <= 0:
                # range() would raise for 0 and silently skip all rows if negative.
                raise ValueError(f"chunk_size must be positive, got {chunk_size}")

            df = self.processor.load_data(input_path)
            df = self.processor.preprocess_data(df)
            total = len(df)

            all_results: List[Dict[str, Any]] = []

            async with ClientSession() as session:
                for chunk_start in range(0, total, chunk_size):
                    chunk = df.iloc[chunk_start:chunk_start + chunk_size]
                    all_results.extend(await self.process_chunk(chunk, session))

                    done = chunk_start + len(chunk)
                    logger.info(f"Processed {done}/{total} rows")

                    if done < total:
                        # Checkpoint so a crash/interrupt does not discard finished work.
                        pd.DataFrame(all_results).to_parquet(output_path)
                        logger.info(f"Checkpoint saved to {output_path}")

            results_df = pd.DataFrame(all_results)
            results_df.to_parquet(output_path)
            logger.info(f"Results saved to {output_path}")

            return results_df

        except Exception as e:
            logger.exception(f"Pipeline failed: {e}")
            raise

async def main(
    input_path: Union[str, Path] = '/Users/mogen/Desktop/Research/storage/df_test_10k.csv',
    output_path: Union[str, Path] = '/Users/mogen/Desktop/Research/storage/output.parquet',
    config: Optional["Config"] = None,
) -> pd.DataFrame:
    """Run the pipeline end to end and return the results DataFrame.

    Args:
        input_path: CSV file to read (defaults to the original research dataset).
        output_path: Parquet file to write.
        config: Pipeline settings; a default ``Config()`` is used when omitted.

    Raises:
        Exception: Whatever ``Pipeline.run`` raises. ``Pipeline.run`` already
            logs the failure, so it is not logged again here.
    """
    pipeline = Pipeline(config if config is not None else Config())
    results_df = await pipeline.run(input_path=input_path, output_path=output_path)
    logger.info("Pipeline completed successfully")
    return results_df

if __name__ == "__main__":
    # A notebook kernel already runs an event loop; nest_asyncio patches asyncio
    # so that asyncio.run() can be called from inside it.
    nest_asyncio.apply()

    pipeline_coro = main()
    asyncio.run(pipeline_coro)

# (notebook output of the last run) KeyboardInterrupt: