# Cell 1

In [9]:
#%% Cell 1: Initial Setup and Essential Imports
import os
import logging
import asyncio
import json
import nest_asyncio
from datetime import datetime as dt
from typing import Dict, List, Optional

# Data processing
import pandas as pd
from pydantic import BaseModel, Field, HttpUrl

# Load environment variables (API keys, etc.)
from dotenv import load_dotenv

# Configure logging
LOG_FILE = 'research_collector.log'
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
logger.propagate = False  # Prevent duplicate logging

# Load environment variables
# Load environment variables
from pathlib import Path
PROJECT_ROOT = Path("/Users/davidburton/src/research_paper_analysis")
ENV_PATH = PROJECT_ROOT / "config" / ".env"
load_dotenv(ENV_PATH)

# Verify environment variables
if os.getenv("OPENAI_API_KEY"):
    logger.info("OpenAI API key loaded successfully")
else:
    logger.warning("OpenAI API key not found in environment variables")
    
# Apply nest_asyncio only in Jupyter environments
try:
    get_ipython  # Check if running in Jupyter
    nest_asyncio.apply()
    logger.info("Applied nest_asyncio for async support in Jupyter.")
except NameError:
    pass  # Skip in standard Python scripts


# Cell 2

In [10]:
#%% Cell 2: Core Data Models
from datetime import datetime
from pydantic import BaseModel, Field, HttpUrl, field_serializer
from typing import Optional, List, Dict

class Author(BaseModel):
    """Author information model"""
    name: str
    affiliations: List[str] = Field(default_factory=list)

class ResearchPaper(BaseModel):
    """Enhanced research paper model with vector storage support"""
    title: str
    authors: List[Author]
    year: Optional[int] = None
    abstract: Optional[str] = None
    full_text: Optional[str] = None
    doi: Optional[str] = Field(
        None,
        pattern=r"^10\.\d{4,9}/.+",
        description="DOI must be a valid identifier starting with '10.'"
    )
    url: Optional[HttpUrl] = None
    venue: Optional[str] = None
    citation_count: int = Field(default=0, ge=0)
    source: str
    embedding: Optional[List[float]] = None
    collection_timestamp: datetime = Field(default_factory=lambda: datetime.utcnow().replace(tzinfo=None))
    metadata: Dict = Field(default_factory=dict)

    def get_content_for_embedding(self) -> str:
        """Generate content for embedding creation"""
        content_parts = [
            self.title,
            self.abstract or "",
            self.full_text or "",
            self.venue or "",
            " ".join(a.name for a in self.authors)
        ]
        return "\n".join(filter(None, content_parts))

# Cell 3

In [11]:
#%% Cell 3: Enhanced Research Collection Base System
import time
from pathlib import Path
from datetime import datetime, timezone, timedelta
import hashlib
from typing import Optional, List, Dict
import json

class ResearchCollector:
    """Base system for collecting and storing academic papers."""
    
    def __init__(self):
        # Initialize logging
        self.logger = logging.getLogger(__name__ + ".ResearchCollector")
        
        # Initialize cache directory
        self.cache_dir = Path("research_cache")
        self.cache_dir.mkdir(exist_ok=True)
        
        # Initialize rate limiting
        self.request_delay = 1.0
        self.last_request_time = time.monotonic()
        
        # Initialize storage
        self.papers_collected: List[Dict] = []
        self.cache_ttl_days = 7

    def _get_cache_key(self, query: str, source: str) -> str:
        """Generate a unique cache key for a query."""
        sanitized_query = query.strip().lower()
        return f"{source}_{hashlib.md5(sanitized_query.encode()).hexdigest()}"

    def _check_cache(self, query: str, source: str) -> list:
        """Check if results exist in cache and are still valid."""
        cache_file = self.cache_dir / f"{self._get_cache_key(query, source)}.json"

        if not cache_file.exists():
            return []

        try:
            with cache_file.open('r', encoding="utf-8") as f:
                cache_data = json.load(f)

            timestamp = cache_data.get('timestamp')
            if not timestamp:
                return []

            timestamp_dt = datetime.fromisoformat(timestamp).replace(tzinfo=timezone.utc)
            now = datetime.utcnow().replace(tzinfo=timezone.utc)

            if (now - timestamp_dt) >= timedelta(days=self.cache_ttl_days):
                self.logger.info("Cache expired. Fetching new data.")
                return []

            return cache_data['papers']

        except (json.JSONDecodeError, IOError) as e:
            self.logger.error(f"Cache read error: {e}", exc_info=True)
            return []

    def _save_to_cache(self, query: str, source: str, papers: list) -> None:
        """Save results to cache."""
        if not papers:
            return

        cache_file = self.cache_dir / f"{self._get_cache_key(query, source)}.json"
        
        try:
            cache_data = {
                'timestamp': datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
                'papers': papers,
                'query': query,
                'source': source
            }
            
            # Write to temporary file first for atomic operation
            temp_file = cache_file.with_suffix('.tmp')
            with temp_file.open('w', encoding="utf-8") as f:
                json.dump(cache_data, f, indent=2, ensure_ascii=False)
            
            # Atomic rename
            temp_file.replace(cache_file)
            
        except IOError as e:
            self.logger.error(f"Cache write error: {e}", exc_info=True)

    def _rate_limit(self) -> None:
        """Enforce rate limiting between API requests."""
        now = time.monotonic()
        time_since_last_request = now - self.last_request_time

        if time_since_last_request < self.request_delay:
            sleep_time = self.request_delay - time_since_last_request
            self.logger.debug(f"Rate limiting: Sleeping for {sleep_time:.2f} seconds")
            time.sleep(sleep_time)

        self.last_request_time = time.monotonic()

    async def process_papers(self, papers: List[Dict]) -> List[Dict]:
        """Process and validate collected papers."""
        processed_papers = []
        for paper in papers:
            if self._validate_paper(paper):
                processed_papers.append(paper)
        return processed_papers

    def _validate_paper(self, paper: Dict) -> bool:
        """Validate paper data structure."""
        required_fields = ['title', 'authors', 'source']
        return all(field in paper for field in required_fields)

# Cell 4

In [12]:
#%% Cell 4: Enhanced Paper Fetcher with Vector Storage
import requests
import time
import random
from typing import List, Dict, Any, Optional
from langchain_community.utilities.arxiv import ArxivAPIWrapper
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain.schema import Document

class PaperFetcher(ResearchCollector):
    """Enhanced system for fetching and storing academic papers with vector integration."""

    def __init__(self):
        super().__init__()
        
        # Initialize session and API wrappers
        self.session = requests.Session()
        self.arxiv_wrapper = ArxivAPIWrapper(
            top_k_results=100,
            load_max_docs=100
        )
        
        # Configure API settings
        self.semantic_scholar_config = {
            "base_url": "https://api.semanticscholar.org/graph/v1",
            "fields": "title,abstract,authors,year,venue,citationCount,url,externalIds",
            "headers": {
                "Accept": "application/json"
            }
        }
        
        # Initialize vector store
        self.vector_dir = Path("vector_store")
        self.vector_dir.mkdir(exist_ok=True)
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = Chroma(
            persist_directory=str(self.vector_dir),
            embedding_function=self.embeddings
        )

    async def fetch_papers(self, query: str, max_results: int = 50) -> List[ResearchPaper]:
        """Coordinate paper collection and storage from multiple sources."""
        self.logger.info(f"Beginning paper collection for query: {query}")
        
        papers = []
        cached_papers = self._check_cache(query, "combined")
        if cached_papers:
            papers = [ResearchPaper(**p) for p in cached_papers]
            self.logger.info("Retrieved papers from cache")
        else:
            try:
                semantic_papers = await self.fetch_from_semantic_scholar(query, max_results)
                papers.extend(semantic_papers)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:
                    self.logger.info("Semantic Scholar rate limited, proceeding with ArXiv only")
                else:
                    self.logger.warning(f"Semantic Scholar search failed: {str(e)}")
            try:
                arxiv_papers = await self.fetch_from_arxiv(query, max_results)
                papers.extend(arxiv_papers)
            except Exception as e:
                self.logger.error(f"ArXiv search failed: {str(e)}")

            if papers:
                await self._store_papers_in_vector_db(papers)
                self._save_to_cache(query, "combined", [p.model_dump() for p in papers])
        
        return papers[:max_results]

    async def fetch_from_semantic_scholar(self, query: str, limit: int) -> List[ResearchPaper]:
        """Fetch papers from Semantic Scholar with proper API formatting."""
        search_url = f"{self.semantic_scholar_config['base_url']}/paper/search"
        
        params = {
            "query": query,
            "limit": min(limit, 100),
            "fields": self.semantic_scholar_config["fields"]
        }

        try:
            self._rate_limit()
            response = self.session.get(
                search_url,
                params=params,
                headers=self.semantic_scholar_config["headers"]
            )
            response.raise_for_status()
            
            data = response.json()
            papers = []
            
            for item in data.get("data", []):
                try:
                    paper = self._parse_semantic_scholar_paper(item)
                    if paper:
                        papers.append(paper)
                except Exception as e:
                    self.logger.error(f"Error parsing paper data: {str(e)}")
                    continue
                    
            return papers[:limit]

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                self.logger.warning("Semantic Scholar rate limit reached")
            raise

    async def fetch_from_arxiv(self, query: str, limit: int) -> List[ResearchPaper]:
        """Fetch papers from ArXiv with proper rate limiting."""
        self.logger.info(f"Fetching papers from ArXiv: {query}")

        try:
            self._rate_limit()
            raw_results = self.arxiv_wrapper.run(query)
            papers = []
            
            entries = [e for e in raw_results.split("\n\n") if e.strip()]
            for entry in entries[:limit]:
                try:
                    paper_data = self._parse_arxiv_entry(entry)
                    if paper_data:
                        paper = self._create_paper_from_arxiv(paper_data)
                        if paper:
                            papers.append(paper)
                except Exception as e:
                    self.logger.error(f"Error processing ArXiv entry: {str(e)}")
                    continue

            return papers

        except Exception as e:
            self.logger.error(f"ArXiv API error: {str(e)}")
            raise

    async def _store_papers_in_vector_db(self, papers: List[ResearchPaper]) -> None:
        """Store papers in vector database with enhanced metadata."""
        from langchain_community.vectorstores.utils import filter_complex_metadata
        
        documents = []
        for paper in papers:
            content = self._prepare_paper_content(paper)
            # Convert all metadata values to basic types and filter out None values
            raw_metadata = {
                "title": str(paper.title),
                "year": int(paper.year) if paper.year else 0,
                "venue": str(paper.venue) if paper.venue else "",
                "citation_count": int(paper.citation_count),
                "source": str(paper.source),
                "url": str(paper.url) if paper.url else "",
                "doi": str(paper.doi) if paper.doi else "",
                "authors": ", ".join(str(a.name) for a in paper.authors)
            }
            # Filter out any remaining complex types
            metadata = filter_complex_metadata(raw_metadata)
            documents.append(Document(page_content=content, metadata=metadata))
        
        try:
            self.vector_store.add_documents(documents)
            self.vector_store.persist()
            self.logger.info(f"Stored {len(papers)} papers in vector database")
        except Exception as e:
            self.logger.error(f"Error storing papers in vector database: {str(e)}")
            # Continue execution even if vector storage fails
            pass

    def _prepare_paper_content(self, paper: ResearchPaper) -> str:
        """Prepare paper content for vector storage."""
        content_parts = [
            f"Title: {paper.title}",
            f"Abstract: {paper.abstract or ''}",
            f"Authors: {', '.join(a.name for a in paper.authors)}",
            f"Venue: {paper.venue or ''}"
        ]
        return "\n".join(content_parts)

    def _prepare_paper_metadata(self, paper: ResearchPaper) -> Dict:
        """Prepare paper metadata for vector storage, ensuring all values are simple types."""
        metadata = {
            "title": str(paper.title) if paper.title else "",
            "year": int(paper.year) if paper.year else 0,
            "venue": str(paper.venue) if paper.venue else "",
            "citation_count": int(paper.citation_count) if paper.citation_count is not None else 0,
            "source": str(paper.source) if paper.source else "",
            "url": str(paper.url) if paper.url else "",
            "doi": str(paper.doi) if paper.doi else ""
        }
        
        # Convert author list to string to avoid complex types
        metadata["authors"] = ", ".join(a.name for a in paper.authors) if paper.authors else ""
        
        # Remove any remaining None values
        metadata = {k: v for k, v in metadata.items() if v is not None}
        
        return metadata

    def _parse_semantic_scholar_paper(self, data: Dict) -> Optional[ResearchPaper]:
        """Parse Semantic Scholar API response into ResearchPaper model."""
        try:
            return ResearchPaper(
                title=data["title"],
                authors=[
                    Author(
                        name=author.get("name", "Unknown"),
                        affiliations=[]
                    ) for author in data.get("authors", [])
                ],
                year=data.get("year"),
                abstract=data.get("abstract"),
                doi=data.get("externalIds", {}).get("DOI"),
                url=data.get("url"),
                venue=data.get("venue"),
                citation_count=data.get("citationCount", 0),
                source="Semantic Scholar"
            )
        except Exception as e:
            self.logger.error(f"Failed to parse paper: {str(e)}")
            return None

    def _parse_arxiv_entry(self, entry: str) -> Dict[str, Any]:
        """Parse ArXiv entry text into structured data."""
        data = {}
        current_field = None
        current_content = []

        for line in entry.split('\n'):
            if ':' in line and not line.startswith(' '):
                if current_field:
                    data[current_field] = ' '.join(current_content).strip()
                    current_content = []
                field, content = line.split(':', 1)
                current_field = field.strip().lower()
                current_content.append(content.strip())
            elif current_field:
                current_content.append(line.strip())

        if current_field and current_content:
            data[current_field] = ' '.join(current_content).strip()

        return data
    
    def _create_paper_from_arxiv(self, paper_data: Dict) -> Optional[ResearchPaper]:
        """Create a ResearchPaper object from ArXiv paper data."""
        try:
            authors = [
                Author(name=name.strip()) 
                for name in paper_data.get("authors", "").split(",")
                if name.strip()
            ]
            
            year = None
            if paper_data.get("published"):
                try:
                    year = int(paper_data["published"][:4])
                except ValueError:
                    pass

            return ResearchPaper(
                title=paper_data.get("title", "").strip(),
                authors=authors,
                year=year,
                abstract=paper_data.get("summary", "").strip(),
                url=paper_data.get("entry_id"),
                venue="arXiv",
                citation_count=0,
                source="ArXiv"
            )
        except Exception as e:
            self.logger.error(f"Failed to create paper from ArXiv data: {str(e)}")
            return None

# Cell 5

In [13]:
#%% Cell 5: Research Paper Analysis System
from langchain.schema import Document
from typing import List, Dict, Tuple
import numpy as np
from datetime import datetime
from collections import defaultdict

class ResearchAnalyzer:
    """Advanced system for analyzing and ranking academic research papers."""

    def __init__(self, paper_fetcher: PaperFetcher):
        self.paper_fetcher = paper_fetcher
        self.logger = logging.getLogger(__name__ + ".ResearchAnalyzer")
        
        # Configure analysis weights
        self.quality_weights = {
            "relevance": 0.35,
            "citation_impact": 0.25,
            "methodology": 0.20,
            "recency": 0.10,
            "venue_quality": 0.10
        }

    async def analyze_research_topic(self, query: str, max_results: int = 20) -> Dict:
        """Perform comprehensive analysis of papers for a research topic."""
        self.logger.info(f"Beginning analysis for query: {query}")

        try:
            papers = await self.paper_fetcher.fetch_papers(query, max_results)
            if not papers:
                return {"status": "error", "message": "No papers found for the given query"}

            # Perform similarity search using vector store
            vector_results = self.paper_fetcher.vector_store.similarity_search_with_scores(
                query, k=max_results
            )

            analyzed_papers = []
            for doc, similarity_score in vector_results:
                paper_analysis = self._analyze_single_paper(doc, similarity_score)
                if paper_analysis:
                    analyzed_papers.append(paper_analysis)

            ranked_papers = sorted(
                analyzed_papers,
                key=lambda x: x["scores"]["overall"],
                reverse=True
            )

            analysis_summary = self._generate_research_summary(ranked_papers)
            
            return {
                "status": "success",
                "query": query,
                "summary": analysis_summary,
                "papers": ranked_papers
            }

        except Exception as e:
            self.logger.error(f"Analysis failed: {str(e)}", exc_info=True)
            return {"status": "error", "message": f"Analysis failed: {str(e)}"}

    def _analyze_single_paper(self, paper: Document, similarity_score: float) -> Dict:
        """Analyze a single research paper across multiple dimensions."""
        try:
            metadata = paper.metadata
            current_year = datetime.now().year

            citation_impact = self._calculate_citation_impact(
                metadata.get("citation_count", 0),
                metadata.get("year", current_year)
            )

            methodology_score = self._assess_methodology(paper.page_content)
            recency_score = self._calculate_recency_score(metadata.get("year", current_year))
            venue_score = self._assess_venue_quality(metadata.get("venue"), metadata.get("source"))

            overall_score = sum([
                self.quality_weights["relevance"] * similarity_score,
                self.quality_weights["citation_impact"] * citation_impact,
                self.quality_weights["methodology"] * methodology_score,
                self.quality_weights["recency"] * recency_score,
                self.quality_weights["venue_quality"] * venue_score
            ])

            return {
                "title": metadata.get("title"),
                "year": metadata.get("year"),
                "authors": metadata.get("authors", []),
                "url": metadata.get("url"),
                "source": metadata.get("source"),
                "venue": metadata.get("venue"),
                "scores": {
                    "relevance": similarity_score,
                    "citation_impact": citation_impact,
                    "methodology": methodology_score,
                    "recency": recency_score,
                    "venue_quality": venue_score,
                    "overall": overall_score
                }
            }
        except Exception as e:
            self.logger.error(f"Error analyzing paper: {str(e)}")
            return None

    def _calculate_citation_impact(self, citations: int, year: int) -> float:
        """Calculate normalized citation impact score."""
        years_since_publication = max(1, datetime.now().year - year)
        citations_per_year = citations / years_since_publication
        return min(1.0, np.log1p(citations_per_year) / np.log1p(100))

    def _assess_methodology(self, content: str) -> float:
        """Assess research methodology quality based on content analysis."""
        methodology_indicators = {
            "methodology": 1.0,
            "experiment": 0.8,
            "statistical analysis": 0.8,
            "data collection": 0.7,
            "sample size": 0.7,
            "control group": 0.9,
            "randomized": 0.9,
            "validation": 0.8
        }
        
        content_lower = content.lower()
        scored_indicators = [
            weight for indicator, weight in methodology_indicators.items()
            if indicator in content_lower
        ]
        
        return sum(scored_indicators) / len(methodology_indicators) if scored_indicators else 0.5

    def _calculate_recency_score(self, year: int) -> float:
        """Calculate recency score with exponential decay."""
        years_old = max(0, datetime.now().year - year)
        return np.exp(-0.2 * years_old)

    def _assess_venue_quality(self, venue: str, source: str) -> float:
        """Assess the quality of the publication venue."""
        if not venue:
            return 0.5

        venue_lower = venue.lower() if venue else ""
        source_lower = source.lower() if source else ""

        # High-impact venue scoring
        if any(journal in venue_lower for journal in ["nature", "science", "cell"]):
            return 1.0
        elif "journal" in venue_lower:
            return 0.8
        elif "conference" in venue_lower:
            return 0.75
        elif "arxiv" in source_lower:
            return 0.7
        else:
            return 0.6

    def _generate_research_summary(self, analyzed_papers: List[Dict]) -> Dict:
        """Generate a comprehensive summary of the research analysis."""
        if not analyzed_papers:
            return {}

        year_distribution = defaultdict(int)
        source_distribution = defaultdict(int)
        avg_scores = defaultdict(float)

        for paper in analyzed_papers:
            year_distribution[paper["year"]] += 1
            source_distribution[paper["source"]] += 1
            
            for score_type, score in paper["scores"].items():
                avg_scores[score_type] += score

        total_papers = len(analyzed_papers)
        for score_type in avg_scores:
            avg_scores[score_type] /= total_papers

        return {
            "total_papers": total_papers,
            "year_range": {
                "oldest": min(year_distribution.keys()),
                "newest": max(year_distribution.keys())
            },
            "source_distribution": dict(source_distribution),
            "average_scores": dict(avg_scores),
            "top_venues": [
                paper["venue"] for paper in analyzed_papers[:3]
                if paper["venue"]
            ]
        }

# Cell 6

In [14]:
#%% Cell 6: Research Interface and Analysis Display
from IPython.display import display, HTML
import pandas as pd

class ResearchInterface:
    """Interface for collecting and analyzing research papers."""

    def __init__(self):
        self.paper_fetcher = PaperFetcher()
        self.analyzer = ResearchAnalyzer(self.paper_fetcher)
        self.logger = logging.getLogger(__name__ + ".ResearchInterface")

    async def search_and_analyze(self):
        """Interactive research paper search and analysis."""
        print("\nAdvanced Research Paper Analysis")
        print("===============================")
        
        query = input("\nEnter your research query (use 'and' to combine specific concepts): ")
        max_results = input("Enter maximum number of papers to analyze (default 30): ")
        max_results = int(max_results) if max_results.isdigit() else 30

        print("\nCollecting and analyzing papers...")
        analysis_results = await self.analyzer.analyze_research_topic(query, max_results)

        if analysis_results["status"] == "success":
            self._display_analysis_results(analysis_results)
            return analysis_results
        else:
            print(f"\nError: {analysis_results['message']}")
            return None

    def _display_analysis_results(self, results):
        """Display analyzed papers in a structured format."""
        summary = results["summary"]
        papers = results["papers"]

        # Display summary statistics
        print("\nAnalysis Summary")
        print("--------------")
        print(f"Total Papers Analyzed: {summary['total_papers']}")
        print(f"Year Range: {summary['year_range']['oldest']} - {summary['year_range']['newest']}")
        print("\nSource Distribution:")
        for source, count in summary['source_distribution'].items():
            print(f"  {source}: {count} papers")

        # Create DataFrame for detailed paper analysis
        paper_data = []
        for paper in papers:
            paper_data.append({
                "Title": paper["title"],
                "Year": paper["year"],
                "Authors": "; ".join(paper["authors"]),
                "Venue": paper["venue"] or "N/A",
                "Overall Score": f"{paper['scores']['overall']:.3f}",
                "Relevance": f"{paper['scores']['relevance']:.3f}",
                "Citation Impact": f"{paper['scores']['citation_impact']:.3f}",
                "Methodology": f"{paper['scores']['methodology']:.3f}",
                "URL": paper["url"] or "N/A"
            })

        df = pd.DataFrame(paper_data)
        
        # Display results
        print("\nRanked Papers (sorted by overall score)")
        print("-------------------------------------")
        
        # Configure pandas display options
        pd.set_option('display.max_colwidth', None)
        pd.set_option('display.max_rows', None)
        
        # Create styled DataFrame
        styled_df = df.style.background_gradient(subset=['Overall Score'], cmap='YlOrRd')
        display(HTML(styled_df.to_html(escape=False)))

        return df

# Example usage:
"""
interface = ResearchInterface()
results = await interface.search_and_analyze()
"""

'\ninterface = ResearchInterface()\nresults = await interface.search_and_analyze()\n'

# Cell 7
- User interface

In [15]:
interface = ResearchInterface()
results = await interface.search_and_analyze()

2025-02-12 14:51:46,927 - INFO - Anonymized telemetry enabled. See                     https://docs.trychroma.com/telemetry for more information.



Advanced Research Paper Analysis

Collecting and analyzing papers...


Semantic Scholar rate limit reached
2025-02-12 14:51:56,863 - INFO - Requesting page (first: True, try: 0): https://export.arxiv.org/api/query?search_query=Mitophagy+and+mitochondrial+quality+control&id_list=&sortBy=relevance&sortOrder=descending&start=0&max_results=100
2025-02-12 14:51:57,620 - INFO - Got first page: 20 of 2595038 total results
2025-02-12 14:51:57,622 - INFO - Sleeping: 2.968749 seconds
2025-02-12 14:52:00,597 - INFO - Requesting page (first: False, try: 0): https://export.arxiv.org/api/query?search_query=Mitophagy+and+mitochondrial+quality+control&id_list=&sortBy=relevance&sortOrder=descending&start=20&max_results=100
Analysis failed: 'str' object has no attribute 'metadata'
Traceback (most recent call last):
  File "/var/folders/j7/smpqy2fn30l7j5kcqh76jqp40000gn/T/ipykernel_5521/2691737644.py", line 29, in analyze_research_topic
    papers = await self.paper_fetcher.fetch_papers(query, max_results)
  File "/var/folders/j7/smpqy2fn30l7j5kcqh76jqp40000gn/T/ipykernel_5


Error: Analysis failed: 'str' object has no attribute 'metadata'
