In [3]:
# pip install dspy-ai transformers pandas

import dspy
import pandas as pd
from transformers import pipeline
from typing import List, Dict, Any

class HuggingFaceLanguageModel(dspy.LM):
    """
    Custom Language Model wrapper for Hugging Face transformers
    """
    def __init__(self, model_name='facebook/opt-350m'):
        self.generator = pipeline('text-generation', model=model_name)
    
    def __call__(self, prompt: str, **kwargs):
        # Convert Hugging Face pipeline output to DSPy format
        response = self.generator(prompt, max_length=150, **kwargs)[0]['generated_text']
        return dspy.Prediction(text=response)

class DataProcessor(dspy.Module):
    """
    DSPy Module for processing CSV data and generating insights
    """
    def __init__(self, csv_path: str, column_to_search: str):
        super().__init__()
        
        # Load CSV data
        self.df = pd.read_csv(csv_path)
        self.column_to_search = column_to_search
        
        # Configure DSPy with Hugging Face Language Model
        self.language_model = HuggingFaceLanguageModel()
        dspy.settings.configure(lm=self.language_model)
    
    def search_data(self, query: str) -> Dict[str, Any]:
        """
        Search CSV data and generate insights
        """
        # Filter dataframe based on query
        results = self.df[self.df[self.column_to_search].str.contains(query, case=False, na=False)]
        
        # Generate summary using language model
        summary_prompt = f"Summarize these {len(results)} results about '{query}': {results.to_string()}"
        summary = self.language_model(summary_prompt).text
        
        return {
            'query_results': results,
            'result_count': len(results),
            'summary': summary
        }
    
    def advanced_analysis(self, query: str, additional_columns: List[str] = None):
        """
        Perform more advanced analysis with multiple columns
        """
        results = self.search_data(query)
        
        if additional_columns:
            # Aggregate additional columns if specified
            aggregations = {col: ['mean', 'count'] for col in additional_columns 
                            if self.df[col].dtype in ['int64', 'float64']}
            
            additional_stats = results['query_results'].agg(aggregations)
            results['additional_stats'] = additional_stats
        
        return results

def main():
    # Example usage
    try:
        # Initialize processor with your CSV and specify search column
        processor = DataProcessor(
            csv_path='your_data.csv',  # Replace with your CSV path
            column_to_search='description'  # Replace with your column name
        )
        
        # Simple search
        simple_result = processor.search_data('example query')
        print("Simple Search Results:")
        print(simple_result['query_results'])
        print("\nSummary:", simple_result['summary'])
        
        # Advanced analysis with multiple columns
        advanced_result = processor.advanced_analysis(
            query='example query', 
            additional_columns=['price', 'rating']
        )
        print("\nAdvanced Analysis:")
        print(advanced_result['additional_stats'])
    
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == '__main__':
    main()

An error occurred: [Errno 2] No such file or directory: 'your_data.csv'


In [2]:
from transformers import pipeline, AutoTokenizer, AutoModel
from typing import List, Dict, Any
import torch
import numpy as np
from dspy.retrievers import Embeddings  # Assuming Embeddings is imported from dspy.

class CustomHFRetriever:
    def __init__(self, passages, model_name="sentence-transformers/all-mpnet-base-v2", batch_size=32):
        # Initialize tokenizer and model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        
        # Check if MPS (Metal Performance Shaders) is available for Apple Silicon; otherwise, use CPU
        self.device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
        self.model.to(self.device)
        
        self.batch_size = batch_size
        
        # Clean and encode passages
        self.passages = [str(p).strip() for p in passages if isinstance(p, str) and p.strip()]
        if not self.passages:
            raise ValueError("The passages list is empty after preprocessing.")
        
        self.embeddings = self._encode_passages(self.passages)
        
        # Define an embedding function
        def embed_fn(texts):
            texts = [str(t).strip() for t in texts if isinstance(t, str) and t.strip()]
            if not texts:
                raise ValueError("The input to embed_fn is empty after preprocessing.")
            return self._encode_passages(texts)
        
        # Create DSPy's Embeddings index
        self.index = Embeddings(self.embeddings, embedder=embed_fn)
    
    def _encode_passages(self, passages):
        """Encodes passages using the model in batches."""
        embeddings = []
        for i in range(0, len(passages), self.batch_size):
            batch = passages[i:i + self.batch_size]
            if not batch:  # Skip empty batches
                continue
            inputs = self.tokenizer(batch, padding=True, truncation=True, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
                # Use mean pooling over token embeddings
                embeddings.append(outputs.last_hidden_state.mean(dim=1).cpu().numpy())
        if not embeddings:
            raise ValueError("No embeddings were generated. Check the input passages and preprocessing.")
        return np.vstack(embeddings)

    def search(self, query: str, top_k: int = 5) -> List[str]:
        """Searches for the top_k most similar passages to the query."""
        query_embedding = self._encode_passages([query])[0]
        return self.index.most_similar(query_embedding, top_k=top_k)

def main():
    # Load data from the CSV file
    df = pd.read_csv("./151_ideas_updated.csv", usecols=[0, 1, 2, 3, 4, 5])
    
    # Ensure the correct column is used for passages
    df.columns = df.columns.str.strip()  # Remove any extra spaces in column names
    column_name = "Ideas"  # Replace with the actual column name if different
    if column_name not in df.columns:
        raise ValueError(f"Column '{column_name}' not found in CSV.")
    passages = df[column_name].dropna().tolist()
    
    # Create custom retriever
    retriever = CustomHFRetriever(passages)
    
    # Example query
    query = "Innovative technology ideas"
    retrieved_passages = retriever.search(query)
    
    # Print retrieved passages
    print("Query:", query)
    print("\nRetrieved Passages:")
    for idx, passage in enumerate(retrieved_passages, 1):
        print(f"{idx}. {passage}")

if __name__ == '__main__':
    main()

ValueError: The input to embed_fn is empty after preprocessing.