## Notebook for processing CURIE-benchmark tasks using the **Cohere Command-R Plus model**.


In [None]:
# @title Import Required Libraries
import os
import json
import pandas as pd
import numpy as np
import altair as alt
import logging
import textwrap as tr
import torch
from google.colab import drive
from tenacity import retry, stop_after_attempt, wait_exponential
import time
from dataclasses import dataclass
from typing import Optional, Dict, List
from enum import Enum

In [None]:
# @title Install and import Cohere
! pip install -U cohere
import cohere

Collecting cohere
  Downloading cohere-5.13.4-py3-none-any.whl.metadata (3.4 kB)
Collecting fastavro<2.0.0,>=1.9.4 (from cohere)
  Downloading fastavro-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting httpx-sse==0.4.0 (from cohere)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting parameterized<0.10.0,>=0.9.0 (from cohere)
  Downloading parameterized-0.9.0-py2.py3-none-any.whl.metadata (18 kB)
Collecting types-requests<3.0.0,>=2.0.0 (from cohere)
  Downloading types_requests-2.32.0.20241016-py3-none-any.whl.metadata (1.9 kB)
Downloading cohere-5.13.4-py3-none-any.whl (250 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m250.0/250.0 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpx_sse-0.4.0-py3-none-any.whl (7.8 kB)
Downloading fastavro-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.

In [None]:
# @title API Configuration
# Prefer a key supplied through the CO_API_KEY environment variable so that a
# real credential never has to be saved inside the notebook. The placeholder is
# kept as the fallback, so replacing it by hand still works as before.
API_KEY = os.environ.get("CO_API_KEY", "YOUR_API_KEY")
# Model identifier sent with every chat request; also used as the output sub-folder name.
MODEL_PATH = 'command-r-plus'
# Module-level Cohere v2 client built from the key above.
co_v2 = cohere.ClientV2(api_key=API_KEY)

In [None]:
# @title Mount Google Drive
# Mount Google Drive at /content/drive; force_remount=True asks Colab to redo
# the mount even when the cell is re-run in the same session.
drive.mount('/content/drive', force_remount=True)
# Make "My Drive" the working directory. Relative paths used later in the
# notebook (e.g. the 'experiment_log.log' file given to logging.basicConfig)
# are resolved against this directory.
os.chdir("/content/drive/My Drive")

Mounted at /content/drive


In [None]:
# @title Configuration Classes
@dataclass
class ExperimentConfig:
    """Filesystem locations and display name for one benchmark experiment.

    Instances are created by ExperimentManager and consumed by
    prepare_task_for_paper and PaperProcessor.
    """
    # Short label for the experiment; appears in log messages.
    name: str
    # Data directory; code reads '<base_dir>/inputs/<paper>.json' and
    # '<base_dir>/ground_truth/<paper>.json' from it.
    base_dir: str
    # Root directory under which model responses are written
    # ('<inference_dir>/<model>/run_<id>/<success|failure>/<paper>.json').
    inference_dir: str
    # Path of the prompt template file; the template must contain the
    # '{{text}}' tag that is replaced with the paper text.
    prompt_path: str

class ExperimentType(Enum):
    """Identifiers of the experiments this notebook can run.

    Each member is a key into ExperimentManager.experiments. The comments
    below name the prompt file each member is wired to in ExperimentManager.
    """
    PDB = "pdb"            # prompt: reconstruct_protein_amino_acid_sequence_0_shot
    MPVE = "mpve"          # prompt: mat_paper_to_property_1_shot_exclude_trivia
    HFE = "hfe"            # prompt: extract_hamiltonian_0_shot
    GEO = "geo"            # prompt: extract_dataset_from_geo_papers_0_shot
    DFT = "dft"            # prompt: extract_dft_metadata_1_shot
    HFD = "hfd"            # prompt: derivation_prompt
    QECC_PDF = "qecc_pdf"  # prompt: describe_code_in_paper (data dir 'qecc_pdf')
    QECC_TEX = "qecc_tex"  # prompt: describe_code_in_paper (data dir 'qecc_tex')

In [None]:
# @title Experiment Manager Class
class ExperimentManager:
    """Registry mapping every ExperimentType to its ExperimentConfig.

    All paths are derived from ``base_path`` once, at construction time, and
    stored in ``self.experiments``.
    """

    def __init__(self, base_path: str = "/content/drive/My Drive"):
        self.base_path = base_path
        self.experiments = self._initialize_experiments()

    def _initialize_experiments(self) -> Dict[ExperimentType, ExperimentConfig]:
        """Build the full ExperimentType -> ExperimentConfig table."""
        benchmark_path = f"{self.base_path}/benchmarks"

        def under_benchmarks(name, data_subdir, inference_subpath, prompt_file):
            # Config whose data, inference and prompt paths all live below
            # '<base_path>/benchmarks'.
            return ExperimentConfig(
                name=name,
                base_dir=f"{benchmark_path}/data/{data_subdir}",
                inference_dir=f"{benchmark_path}/inference/multi_runs/current/{inference_subpath}",
                prompt_path=f"{benchmark_path}/prompts/{prompt_file}",
            )

        table = {}

        # NOTE(review): PDB is the only experiment whose data and inference
        # directories sit directly under base_path instead of under
        # 'benchmarks' -- confirm this is intended.
        table[ExperimentType.PDB] = ExperimentConfig(
            name="PDB",
            base_dir=f"{self.base_path}/pdb",
            inference_dir=f"{self.base_path}/inference/multi_runs/current/pdb_new/reconstruct_protein_amino_acid_sequence_0_shot/",
            prompt_path=f"{benchmark_path}/prompts/reconstruct_protein_amino_acid_sequence_0_shot.txt",
        )
        table[ExperimentType.MPVE] = under_benchmarks(
            "MPVE",
            "mpve",
            "mpve/mat_paper_to_property_1_shot_exclude_trivia/",
            "mat_paper_to_property_1_shot_exclude_trivia.txt",
        )
        table[ExperimentType.HFE] = under_benchmarks(
            "HFE",
            "hfe",
            "hfe/extract_hamiltonian_0_shot/",
            "extract_hamiltonian_0_shot.txt",
        )
        # The GEO inference path has no trailing slash, unlike the others.
        table[ExperimentType.GEO] = under_benchmarks(
            "GEO",
            "geo",
            "geo/extract_dataset_from_geo_papers_0_shot",
            "extract_dataset_from_geo_papers_0_shot.txt",
        )
        table[ExperimentType.DFT] = under_benchmarks(
            "DFT",
            "dft",
            "dft/extract_dft_metadata_1_shot/",
            "extract_dft_metadata_1_shot.txt",
        )
        table[ExperimentType.HFD] = under_benchmarks(
            "HFD",
            "hfd",
            "hfd/derivation_prompt/",
            "derivation_prompt.txt",
        )
        table[ExperimentType.QECC_PDF] = under_benchmarks(
            "QECC_PDF",
            "qecc_pdf",
            "qecc_pdf/describe_code_in_paper/",
            "describe_code_in_paper.txt",
        )
        table[ExperimentType.QECC_TEX] = under_benchmarks(
            "QECC_TEX",
            "qecc_tex",
            "qecc_tex/describe_code_in_paper/",
            "describe_code_in_paper.txt",
        )
        return table

    def get_config(self, experiment_type: ExperimentType) -> ExperimentConfig:
        """Look up the configuration registered for ``experiment_type``.

        Raises KeyError when the type has no entry in ``self.experiments``.
        """
        return self.experiments[experiment_type]

In [None]:
# @title Paper Processing Utilities
def specialize_prompt(template: str, tag: str, infil: str) -> str:
    """Fill a prompt template by substituting ``infil`` for ``tag``.

    Every occurrence of ``tag`` in ``template`` is replaced.

    Raises:
        ValueError: if ``tag`` does not occur in ``template``.
    """
    # Fail loudly rather than silently sending an unfilled template.
    if tag not in template:
        raise ValueError(f'{tag} absent in template.')
    filled = template.replace(tag, infil)
    return filled

def prepare_task_for_paper(paper: str, config: ExperimentConfig, model_id: str) -> dict:
    """Build the inference task record for one paper.

    Reads '<base_dir>/inputs/<paper>.json', takes its 'text' field and
    substitutes it for the '{{text}}' tag in the prompt template at
    ``config.prompt_path``.

    Args:
        paper: Record id, i.e. the input file name without '.json'.
        config: Experiment configuration providing ``base_dir`` and ``prompt_path``.
        model_id: Model identifier stored in the record.

    Returns:
        A dict with keys 'record_id', 'model_id', 'prompt_path',
        'prompt_text' (the filled prompt) and 'response_text' (empty string).

    Raises:
        OSError: if the input, ground-truth or prompt file cannot be opened.
        json.JSONDecodeError: if the input or ground-truth file is not valid JSON.
        KeyError: if the input JSON has no 'text' entry.
        ValueError: if the prompt template does not contain '{{text}}'.
    """
    paper_input = os.path.join(config.base_dir, 'inputs', f'{paper}.json')
    paper_gt = os.path.join(config.base_dir, 'ground_truth', f'{paper}.json')

    # Files are read as UTF-8 explicitly: paper text routinely contains
    # non-ASCII characters and the platform default encoding is not guaranteed
    # to be UTF-8.
    with open(paper_input, 'r', encoding='utf-8') as f:
        inputs = json.load(f)

    # The ground truth is not part of the returned record. It is still parsed
    # so that a paper with a missing or malformed ground-truth file fails here,
    # exactly as it did before.
    with open(paper_gt, 'r', encoding='utf-8') as f:
        json.load(f)

    with open(config.prompt_path, 'r', encoding='utf-8') as f:
        ptemp = f.read()

    spec_prompt = specialize_prompt(ptemp, '{{text}}', infil=inputs['text'])

    return {
        'record_id': paper,
        'model_id': model_id,
        'prompt_path': config.prompt_path,
        'prompt_text': spec_prompt,
        'response_text': ''
    }

In [None]:
# @title Paper Processor Class
class PaperProcessor:
    """Sends benchmark papers to the Cohere chat API and stores the responses.

    For every run and every paper a JSON record is written to
    '<inference_dir>/<model_path>/run_<run_id>/<success|failure>/<paper>.json'.
    """

    def __init__(self, api_key: str, model_path: str):
        """Create the Cohere v2 client and configure logging.

        Args:
            api_key: Cohere API key.
            model_path: Model identifier passed to the chat endpoint; also
                used as a directory name for the saved results.
        """
        self.co_v2 = cohere.ClientV2(api_key=api_key)
        self.model_path = model_path
        self._setup_logging()

    def _setup_logging(self):
        """Configure logging settings"""
        # The file name is relative, so the log lands in the current working directory.
        logging.basicConfig(
            filename='experiment_log.log',
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    # Up to 3 attempts with an exponential wait bounded to 4-10 seconds;
    # reraise=True surfaces the last underlying exception to the caller.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    def _make_api_call(self, messages: List[Dict]) -> str:
        """Send ``messages`` to the chat endpoint and return the reply text.

        Raises whatever the client raises once all retry attempts are used up.
        """
        response = self.co_v2.chat(
            model=self.model_path,
            messages=messages,
            temperature=0.9,
            k=50,
            p=0.95,
            max_tokens=4000
        )
        return self._extract_response_text(response)

    def _extract_response_text(self, response) -> str:
        """Pull the text out of a chat response.

        A list-valued ``response.message.content`` is joined with single
        spaces (items without a ``text`` attribute are skipped); a string
        content is returned unchanged; anything else falls back to
        ``str(response)``.
        """
        message = getattr(response, 'message', None)
        content = getattr(message, 'content', None)
        if isinstance(content, list):
            return ' '.join(item.text for item in content if hasattr(item, 'text'))
        if isinstance(content, str):
            return content
        return str(response)

    def _save_result(self, task_info: dict, inference_dir: str, run_id: int, success: bool = True):
        """Write one task record as JSON.

        Args:
            task_info: Record with 'record_id', 'model_id', 'prompt_path',
                'prompt_text' and 'response_text'.
            inference_dir: Root directory for results.
            run_id: Run number; becomes the 'run_<run_id>' directory.
            success: Selects the 'success' or 'failure' sub-directory.
        """
        status = 'success' if success else 'failure'
        output_dir = os.path.join(inference_dir, self.model_path, f'run_{run_id}', status)
        os.makedirs(output_dir, exist_ok=True)

        # Copy only the known fields and force the response to a string so the
        # record is always JSON-serializable.
        serializable_task_info = {
            'record_id': task_info['record_id'],
            'model_id': task_info['model_id'],
            'prompt_path': task_info['prompt_path'],
            'prompt_text': task_info['prompt_text'],
            'response_text': str(task_info['response_text'])
        }

        output_file = os.path.join(output_dir, f'{task_info["record_id"]}.json')
        with open(output_file, 'w') as f:
            json.dump(serializable_task_info, f, indent=4)

    def process_papers(self, config: ExperimentConfig, run_range: range = range(1, 3)):
        """Process every paper of an experiment once per run id.

        Args:
            config: Experiment configuration; papers are the '*.json' files
                in '<config.base_dir>/inputs'.
            run_range: Run ids to execute. Each id names the output directory
                'run_<id>'.
        """
        input_dir = os.path.join(config.base_dir, 'inputs')
        suffix = '.json'
        # Strip only the trailing extension (str.replace would also remove a
        # '.json' occurring inside the name) and sort so the processing order
        # does not depend on the filesystem's listing order.
        papers = sorted(
            f[:-len(suffix)] for f in os.listdir(input_dir) if f.endswith(suffix)
        )

        self.logger.info(f"Starting processing {len(papers)} papers for {config.name}")

        for run_id in run_range:
            # Log the same run id that names the output directory.
            self.logger.info(f"Starting run {run_id}")
            for i, paper in enumerate(papers, 1):
                self.logger.info(f"Processing paper {i}/{len(papers)} in run {run_id}")
                self._process_single_paper(paper, config, run_id)

    def _process_single_paper(self, paper: str, config: ExperimentConfig, run_id: int):
        """Run one paper through the model and save the outcome.

        Any exception is logged and recorded as a 'failure' result whose
        'response_text' holds the error message; it is not propagated, so one
        bad paper does not stop the run.
        """
        # Fallback record: if preparing the task itself fails (missing file,
        # bad JSON, missing tag) there is still something to save in the
        # failure branch instead of hitting an unbound variable there.
        task_info = {
            'record_id': paper,
            'model_id': self.model_path,
            'prompt_path': config.prompt_path,
            'prompt_text': '',
            'response_text': ''
        }
        try:
            task_info = prepare_task_for_paper(paper, config, self.model_path)

            # Whitespace-separated word count, used as a rough stand-in for the
            # model's token count.
            if len(task_info['prompt_text'].split()) > 128000:
                raise ValueError("Input text exceeds token limit")

            response = self._make_api_call([{
                "role": "user",
                "content": task_info['prompt_text']
            }])

            task_info['response_text'] = response
            self._save_result(task_info, config.inference_dir, run_id, success=True)
            time.sleep(2)  # Rate limiting

        except Exception as e:
            self.logger.error(f"Error processing paper {paper}: {str(e)}")
            task_info['response_text'] = str(e)
            self._save_result(task_info, config.inference_dir, run_id, success=False)
            time.sleep(2)

In [None]:
# @title Main Execution
def main():
    """Run the selected experiment end to end with the notebook-level settings.

    Uses the module-level API_KEY and MODEL_PATH, and the default base path of
    ExperimentManager.
    """
    # CHANGE THIS to process different experiments
    selected = ExperimentType.DFT

    manager = ExperimentManager()
    runner = PaperProcessor(api_key=API_KEY, model_path=MODEL_PATH)

    runner.process_papers(manager.get_config(selected))


if __name__ == "__main__":
    main()

For example, to run the DFT task, run the following cell:

In [None]:
experiment_manager = ExperimentManager()

# Use the API_KEY defined in the API Configuration cell. A second hard-coded
# placeholder here would ignore the key the user configured and make every
# request fail authentication.
processor = PaperProcessor(
    api_key=API_KEY,
    model_path=MODEL_PATH
)

experiment_type = ExperimentType.DFT
config = experiment_manager.get_config(experiment_type)

# Show the resolved paths before starting, so a wrong Drive layout is spotted early.
print("Selected Configuration:")
print(f"Base Directory: {config.base_dir}")
print(f"Inference Directory: {config.inference_dir}")
print(f"Prompt Path: {config.prompt_path}")

processor.process_papers(config)