# Notebook description

In the other notebook, I had a somewhat convoluted method of performing Kallisto quantification. In this notebook, I will attempt to make stronger use of structured outputs, and perhaps hard code the output file locations a little more.


In [6]:
from openai import OpenAI
import openai # Probably don't need above... but this is for testing tools with structured outputs
import os
from tqdm import tqdm
import time
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import List, Dict, Literal
import subprocess
import glob
import asyncio
import json

In [7]:
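# Load API keys and NCBI credentials from the .env file

from Bio import Entrez  # Biopython; provides the Entrez.email / Entrez.api_key settings used below

load_dotenv('../../.env')

Entrez.email = os.getenv('ENTREZ_EMAIL')
Entrez.api_key = os.getenv('ENTREZ_API_KEY')
openai_api_key = os.getenv('OPENAI_API_KEY')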

In [8]:
# Test the OpenAI API key is working

client = OpenAI(
  api_key=openai_api_key,  # this is also the default, it can be omitted
)

chat_completion = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "State a word that begins with the letter H, and define this word in less than one sentence",
        }
    ],
    model="gpt-4o-mini",
)

result = chat_completion.choices[0].message.content
print(result)

Hapless: unfortunate or unlucky, often in a way that provokes pity.


OK, I'm not sure how you can define something in "less than one sentence" (a flub on my part), but the test works.

# Notebook overall objective

The overall objective of this notebook is to develop an agent that can perform quantification of FASTQ files using Kallisto. 

For the prompt, I will give the model the documentation (I already know from prior experience that the model's training knowledge does not match the version of Kallisto I am currently using). I will probably also want to specify the particular outputs that I want. So what I want the LLM to do is determine whether each parameter is appropriate or not.
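A minimal sketch of what a per-parameter decision could look like, assuming the model is asked to reply with JSON. The class name `ParameterDecision`, its fields, and the `parse_decisions` helper are all hypothetical, and the example reply is hand-written rather than real model output. The sketch uses only the standard library; a pydantic `BaseModel` with the same fields would play the same role and would also validate field types, which a dataclass does not.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ParameterDecision:
    """One decision about a single `kallisto quant` flag (hypothetical schema)."""
    flag: str                # e.g. "--single"
    use: bool                # whether the flag should be included in the command
    value: Optional[str]     # value for flags that take one, otherwise None
    justification: str       # short reason, kept for later review


def parse_decisions(raw_json: str) -> List[ParameterDecision]:
    """Parse a reply shaped like {"decisions": [...]} into dataclass instances.

    Raises TypeError if an item has missing or unexpected keys.
    """
    payload = json.loads(raw_json)
    return [ParameterDecision(**item) for item in payload["decisions"]]


# Hand-written stand-in for a model reply; this is not real model output.
example_reply = json.dumps({
    "decisions": [
        {"flag": "--single", "use": True, "value": None,
         "justification": "example: library layout is single-end"},
        {"flag": "--fragment-length", "use": True, "value": "200",
         "justification": "example: required together with --single"},
        {"flag": "--plaintext", "use": False, "value": None,
         "justification": "example: HDF5 output is acceptable"},
    ]
})

decisions = parse_decisions(example_reply)
selected = [d.flag for d in decisions if d.use]
print(selected)
```

Keeping a `justification` next to each decision makes it easier to review why a flag was chosen when a run goes wrong.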

## Get documentation

In [9]:
def get_documentation(command):
    try:
        # Execute the kallisto command
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        
        # Capture the stdout
        stdout = result.stdout
        
        # Return the results
        return stdout
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

docs = get_documentation("kallisto quant --help")
print(docs)

Usage: kallisto quant [arguments] FASTQ-files

Required arguments:
-i, --index=STRING            Filename for the kallisto index to be used for
                              quantification
-o, --output-dir=STRING       Directory to write output to

Optional arguments:
-b, --bootstrap-samples=INT   Number of bootstrap samples (default: 0)
    --seed=INT                Seed for the bootstrap sampling (default: 42)
    --plaintext               Output plaintext instead of HDF5
    --single                  Quantify single-end reads
    --single-overhang         Include reads where unobserved rest of fragment is
                              predicted to lie outside a transcript
    --fr-stranded             Strand specific reads, first read forward
    --rf-stranded             Strand specific reads, first read reverse
-l, --fragment-length=DOUBLE  Estimated average fragment length
-s, --sd=DOUBLE               Estimated standard deviation of fragment length
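The help text above can also be turned into a machine-readable list of flags, so that anything the LLM proposes can be checked against what this Kallisto version actually accepts. The following is a rough sketch that assumes the help layout shown above (optional short flag, then a long flag with an optional `=TYPE`); the names `OPTION_LINE` and `parse_flags` are hypothetical.

```python
import re
from typing import Dict, Optional

# Matches option lines such as "-i, --index=STRING  ..." or "    --single  ..."
OPTION_LINE = re.compile(
    r"^[ \t]*(?:-\w,[ \t]+)?--([a-z][a-z-]*)(?:=([A-Z]+))?",
    re.MULTILINE,
)


def parse_flags(help_text: str) -> Dict[str, Optional[str]]:
    """Map each long flag to its value type, or None for boolean switches."""
    return {
        f"--{name}": (value_type or None)
        for name, value_type in OPTION_LINE.findall(help_text)
    }


# Excerpt of the help text printed above
sample_help = """Usage: kallisto quant [arguments] FASTQ-files

Required arguments:
-i, --index=STRING            Filename for the kallisto index to be used for
                              quantification
-o, --output-dir=STRING       Directory to write output to

Optional arguments:
-b, --bootstrap-samples=INT   Number of bootstrap samples (default: 0)
    --single                  Quantify single-end reads
-l, --fragment-length=DOUBLE  Estimated average fragment length
"""

flags = parse_flags(sample_help)
for flag, value_type in flags.items():
    print(flag, value_type)
```

In the notebook, `parse_flags(docs)` would give the full set, and a proposed flag that is not a key of the result could be rejected before any command is run.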
                              

## Prepare prompt/code to determine input files

I believe the input files I will need are the FASTQ files themselves, as well as an index file.

Ah, I need to download the index don't I? (done).

Being able to locate the FASTQ files... this might get a bit messy (and will be a consideration when I 

In [25]:
# Function to get files

def get_files(directory, suffix):
    """
    Recursively lists all files in a given directory and its subdirectories that end with the specified suffix,
    returning their absolute paths.

    Parameters:
    directory (str): The path to the directory to search in.
    suffix (str): The file suffix to look for (e.g., 'fastq.gz').

    Returns:
    list: A list of absolute file paths that match the given suffix.
    """
    matched_files = []
    
    try:
        # Walk through directory and subdirectories
        for root, _, files in os.walk(directory):
            for f in files:
                if f.endswith(suffix):
                    matched_files.append(os.path.join(root, f))
                    
        return matched_files
    except FileNotFoundError:
        print(f"Directory '{directory}' not found.")
        return []
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

# Example usage:
directory = "../2_extract_data/GSE268034_FASTQs" # this is something I will need to work out how to optimize
suffix = "fastq.gz"
fastq_files = get_files(directory, suffix) # just for my own sanity I didn't print the output, but I can see it was able to find all the files

directory = "/home/myuser/work/data/kallisto_indices/"
suffix = ".idx"
index_files = get_files(directory, suffix)
print(index_files) # The vision I have for the moment is to have the LLM automatically select the correct index

['/home/myuser/work/data/kallisto_indices/mouse/index.idx', '/home/myuser/work/data/kallisto_indices/human/index.idx']
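Since Kallisto expects paired-end reads as consecutive mate pairs, the flat list returned by `get_files` probably needs grouping by sample before it can be used. Below is a rough sketch, assuming the files follow a `<sample>_1.fastq.gz` / `<sample>_2.fastq.gz` naming convention (an assumption; other datasets may use `_R1`/`_R2` or something else). The names `MATE_PATTERN` and `group_fastqs`, and the example paths, are hypothetical.

```python
import os
import re
from collections import defaultdict
from typing import Dict, List

# Assumed naming convention: <sample>_1.fastq.gz / <sample>_2.fastq.gz
MATE_PATTERN = re.compile(r"^(?P<sample>.+)_(?P<mate>[12])\.fastq\.gz$")


def group_fastqs(paths: List[str]) -> Dict[str, List[str]]:
    """Group FASTQ paths by sample name.

    Files within a sample are sorted, so _1 precedes _2 when both mates
    live in the same directory.
    """
    groups: Dict[str, List[str]] = defaultdict(list)
    for path in paths:
        name = os.path.basename(path)
        match = MATE_PATTERN.match(name)
        if match:
            sample = match.group("sample")
        else:
            # No _1/_2 suffix: treat the file as a single-end sample
            sample = re.sub(r"\.fastq\.gz$", "", name)
        groups[sample].append(path)
    return {sample: sorted(files) for sample, files in sorted(groups.items())}


# Hypothetical paths, for illustration only
example_paths = [
    "/data/SRR000001_2.fastq.gz",
    "/data/SRR000001_1.fastq.gz",
    "/data/SRR000002.fastq.gz",
]
grouped = group_fastqs(example_paths)
for sample, files in grouped.items():
    layout = "paired" if len(files) == 2 else "single"
    print(sample, layout, files)
```

A sample with exactly two files can then be treated as paired-end and a sample with one file as single-end, which gives a sanity check against whatever the metadata claims.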


In [30]:
Entrez.email = os.getenv('ENTREZ_EMAIL')
Entrez.api_key = os.getenv('ENTREZ_API_KEY')

def get_study_summary(accession, edirect_path="/home/myuser/edirect"):

    # Define the command as a string
    command = (
        f'esearch -db gds -query "{accession}[ACCN]" | '
        'efetch -format docsum | '
        'xtract -pattern DocumentSummarySet -block DocumentSummary '
        f'-if Accession -equals {accession} -element summary'
    )

    # Execute the command
    result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    # Check if the command was successful
    if result.returncode == 0:
        # Return the output
        return result.stdout.strip()
    else:
        # Raise an error with the stderr output
        raise Exception(f"Error: {result.stderr}")

# Example usage:
study_summary = get_study_summary("GSE268034")
print(study_summary)

Despite selective HDAC3 inhibition showing promise in a subset of lymphomas with CREBBP mutations, wild-type tumors generally exhibit resistance. Here, using unbiased genome-wide CRISPR screening, we identify GNAS knockout (KO) as a sensitizer of resistant lymphoma cells to HDAC3 inhibition. Mechanistically, GNAS KO-induced sensitization is independent of the canonical G-protein activities but unexpectedly mediated by viral mimicry-related interferon (IFN) responses, characterized by TBK1 and IRF3 activation, double-stranded RNA formation, and transposable element (TE) expression. GNAS KO additionally synergizes with HDAC3 inhibition to enhance CD8+ T cell-induced cytotoxicity. Moreover, we observe in human lymphoma patients that low GNAS expression is associated with high baseline TE expression and upregulated IFN signaling and shares common disrupted biological activities with GNAS KO in histone modification, mRNA processing, and transcriptional regulation. Collectively, our findings
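With a summary like this in hand, the index choice could be reduced to picking an organism label, rather than letting the model return a free-form path. The sketch below assumes that each index lives in a directory named after its organism, as in the `index_files` output earlier; `index_by_organism` and `select_index` are hypothetical helper names, and which organism applies to a given study is left to the model or to a metadata field.

```python
import os
from typing import Dict, List


def index_by_organism(index_paths: List[str]) -> Dict[str, str]:
    """Key each index file by the name of its parent directory."""
    return {os.path.basename(os.path.dirname(p)): p for p in index_paths}


def select_index(organism: str, index_paths: List[str]) -> str:
    """Return the index for an organism label, or raise if none is available."""
    available = index_by_organism(index_paths)
    key = organism.strip().lower()  # assumes lowercase directory names
    if key not in available:
        raise KeyError(f"No index for {organism!r}; available: {sorted(available)}")
    return available[key]


# Paths as printed by the get_files example earlier in the notebook
index_files = [
    "/home/myuser/work/data/kallisto_indices/mouse/index.idx",
    "/home/myuser/work/data/kallisto_indices/human/index.idx",
]
chosen = select_index("Human", index_files)
print(chosen)
```

Restricting the answer to the keys of `index_by_organism(index_files)` means an unexpected label fails loudly instead of producing a path that does not exist.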

In [33]:
import os
import subprocess
import sys
import time
from typing import List, Dict
from pprint import pprint

# ---------------------------- Configuration ---------------------------- #

# Set your email and API key via environment variables for security
Entrez_email = os.getenv('ENTREZ_EMAIL')
Entrez_api_key = os.getenv('ENTREZ_API_KEY')

if not Entrez_email:
    print("Error: The environment variable 'ENTREZ_EMAIL' is not set.")
    sys.exit(1)

# eDirect path (if not in PATH, specify the full path)
# For example: edirect_path = "/home/myuser/edirect"
edirect_path = ""  # Assuming eDirect is in PATH

# GEO Series accession number
GSE_ID = "GSE268034"

# Verbose mode for debugging
VERBOSE = True  # Set to False to silence debug output

# ---------------------------- Functions ---------------------------- #

def run_edirect_command(command: str) -> str:
    """
    Executes an eDirect command and returns the output.
    
    Args:
        command (str): The eDirect command to execute.
        
    Returns:
        str: The standard output from the command.
        
    Raises:
        SystemExit: If the command fails (the error is printed before exiting).
    """
    try:
        # If edirect_path is specified, prepend it to the command
        if edirect_path:
            command = f"{edirect_path}/{command}"
        
        if VERBOSE:
            print(f"Executing command: {command}")
        
        # Execute the command
        result = subprocess.run(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        
        if result.returncode != 0:
            raise Exception(f"Command failed: {command}\nError: {result.stderr}")
        
        if VERBOSE:
            print(f"Command Output:\n{result.stdout}\n")
        
        return result.stdout.strip()
    
    except Exception as e:
        print(f"An error occurred while executing command: {command}\n{e}")
        sys.exit(1)

def get_study_summary(accession: str) -> str:
    """
    Retrieves the study summary for a given GEO Series accession.
    
    Args:
        accession (str): GEO Series accession (e.g., 'GSE268034').
        
    Returns:
        str: The study summary text.
    """
    print(f"\nFetching study summary for {accession}...")
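    # NOTE: element names appear to be case-sensitive here: the earlier cell
    # used '-element summary' and returned text, whereas '-element Summary'
    # produced the empty result recorded in the output below.
    command = (
        f'esearch -db gds -query "{accession}[ACCN]" | '
        'efetch -format docsum | '
        'xtract -pattern DocumentSummarySet -block DocumentSummary '
        f'-if Accession -equals {accession} -element Summary'
    )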
    
    summary = run_edirect_command(command)
    if not summary:
        print("Warning: Study summary is empty.")
    else:
        print("Study Summary Retrieved Successfully.\n")
    return summary

def get_gsm_ids(accession: str) -> List[str]:
    """
    Extracts GSM Sample IDs associated with a given GEO Series accession.
    
    Args:
        accession (str): GEO Series accession (e.g., 'GSE268034').
        
    Returns:
        List[str]: A list of GSM Sample IDs.
    """
    print(f"Extracting GSM IDs for {accession}...")
    command = (
        f'esearch -db gds -query "{accession}[ACCN]" | '
        'efetch -format docsum | '
        'xtract -pattern DocumentSummarySet -block DocumentSummary '
        f'-if Accession -equals {accession} -element SampleAccession'
    )
    
    gsm_output = run_edirect_command(command)
    if VERBOSE:
        print(f"Raw GSM Output:\n{gsm_output}\n")
    
    gsm_ids = [line.strip() for line in gsm_output.split('\n') if line.strip()]
    print(f"Extracted {len(gsm_ids)} GSM IDs.\n")
    return gsm_ids

def get_sra_ids_for_gsm(gsm_id: str) -> List[str]:
    """
    Retrieves SRA Run accession numbers linked to a given GSM Sample ID.
    
    Args:
        gsm_id (str): GEO Sample accession (e.g., 'GSM8284502').
        
    Returns:
        List[str]: A list of SRA Run accession numbers.
    """
    print(f"Retrieving SRA Run IDs for {gsm_id}...")
    command = (
        f'elink -dbfrom gds -query {gsm_id} -db sra | '
        'xtract -pattern LinkSet -block LinkSetDb -element Link -element Id'
    )
    
    sra_output = run_edirect_command(command)
    if VERBOSE:
        print(f"Raw SRA Output for {gsm_id}:\n{sra_output}\n")
    
    # The output may contain multiple lines with 'Link' and 'Id'
    # We need to extract the 'Id' fields that start with 'SRR'
    sra_ids = []
    for line in sra_output.split('\n'):
        parts = line.strip().split('\t')
        if len(parts) == 2 and parts[0] == 'Link' and parts[1].startswith('SRR'):
            sra_ids.append(parts[1])
    
    print(f"Found {len(sra_ids)} SRA Run ID(s) for {gsm_id}.\n")
    return sra_ids

def get_sra_metadata(sra_id: str) -> Dict[str, str]:
    """
    Fetches metadata for a given SRA Run accession number.
    
    Args:
        sra_id (str): SRA Run accession (e.g., 'SRRXXXXXXX').
        
    Returns:
        Dict[str, str]: A dictionary containing metadata fields.
    """
    print(f"Fetching metadata for SRA Run ID: {sra_id}...")
    command = (
        f'echo "{sra_id}" | '
        'xargs -I {} efetch -db sra -id {} -format runinfo'
    )
    
    runinfo_output = run_edirect_command(command)
    if VERBOSE:
        print(f"Raw Runinfo Output for {sra_id}:\n{runinfo_output}\n")
    
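    # Parse with the csv module so quoted fields containing commas stay intact;
    # the first row is the header and the second row holds the values
    import csv  # local import so this function does not rely on the cell's import block
    rows = list(csv.reader(runinfo_output.splitlines()))
    if len(rows) < 2:
        print(f"No runinfo found for {sra_id}.\n")
        return {}
    
    header, values = rows[0], rows[1]
    metadata = dict(zip(header, values))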
    print(f"Metadata for {sra_id} retrieved successfully.\n")
    return metadata

def determine_kallisto_parameters(sra_metadata_list: List[Dict[str, str]]) -> Dict:
    """
    Determines Kallisto parameters based on SRA metadata.
    
    Args:
        sra_metadata_list (List[Dict[str, str]]): List of SRA metadata dictionaries.
        
    Returns:
        Dict: Dictionary containing Kallisto parameters.
    """
    print("Determining Kallisto parameters based on SRA metadata...\n")
    
    kallisto_params = {
        'paired_end': True,       # Default assumption
        'strandedness': None,     # To be determined
        'fragment_length': None,  # For single-end
        'sd': None,               # For single-end
        'threads': 4,             # Example value; adjust as needed
        'bootstrap_samples': 0,   # Default
        'verbose': False,         # Default
    }
    
    library_layout_set = set()
    strand_specificity_set = set()
    
    for metadata in sra_metadata_list:
        if not metadata:
            continue
        
        # Determine LibraryLayout
        library_layout = metadata.get('LibraryLayout', '').upper()
        if library_layout:
            library_layout_set.add(library_layout)
        
        # Determine StrandSpecificity
        strand_specificity = metadata.get('StrandSpecificity', '').lower()
        if strand_specificity:
            strand_specificity_set.add(strand_specificity)
    
    # Analyze LibraryLayout
    if len(library_layout_set) == 1:
        layout = library_layout_set.pop()
        kallisto_params['paired_end'] = (layout == 'PAIRED')
        print(f"Library Layout: {layout} (Paired-End: {kallisto_params['paired_end']})")
    elif len(library_layout_set) > 1:
        print("Warning: Mixed library layouts detected among SRA runs.")
        kallisto_params['paired_end'] = True  # Defaulting to paired-end
    else:
        print("Library Layout information is missing.")
        kallisto_params['paired_end'] = True  # Defaulting to paired-end
    
    # Analyze Strand Specificity
    if len(strand_specificity_set) == 1:
        specificity = strand_specificity_set.pop()
        if specificity in ['firststrand', 'fwd', 'forward']:
            kallisto_params['strandedness'] = '--fr-stranded'
            print(f"Strand Specificity: {specificity} (Kallisto: --fr-stranded)")
        elif specificity in ['secondstrand', 'rev', 'reverse']:
            kallisto_params['strandedness'] = '--rf-stranded'
            print(f"Strand Specificity: {specificity} (Kallisto: --rf-stranded)")
        elif specificity in ['unstranded']:
            kallisto_params['strandedness'] = None
            print(f"Strand Specificity: {specificity} (Unstranded)")
        else:
            kallisto_params['strandedness'] = None
            print(f"Strand Specificity: {specificity} (Unstranded by default)")
    elif len(strand_specificity_set) > 1:
        print("Warning: Mixed strand specificity detected among SRA runs.")
        kallisto_params['strandedness'] = None  # Default to unstranded
    else:
        print("Strand Specificity information is missing.")
        kallisto_params['strandedness'] = None  # Default to unstranded
    
    print("\nKallisto Parameters Determined:")
    pprint(kallisto_params)
    
    return kallisto_params

def construct_kallisto_command(params: Dict, index_path: str, output_dir: str, fastq_files: List[str]) -> str:
    """
    Constructs the Kallisto quantification command based on parameters.
    
    Args:
        params (Dict): Kallisto parameters.
        index_path (str): Path to the Kallisto index.
        output_dir (str): Directory to write Kallisto output.
        fastq_files (List[str]): List of FASTQ file paths.
        
    Returns:
        str: The complete Kallisto command.
    """
    cmd = f"kallisto quant -i {index_path} -o {output_dir} -t {params['threads']}"
    
    if params['strandedness']:
        cmd += f" {params['strandedness']}"
    
    if not params['paired_end']:
        # Ensure fragment length and SD are provided
        if params['fragment_length'] and params['sd']:
            cmd += f" --single -l {params['fragment_length']} -s {params['sd']}"
        else:
            # Prompt user or set default values
            print("Fragment length (-l) and standard deviation (-s) are required for single-end reads.")
            # Example default values; modify as needed
            cmd += " --single -l 200 -s 20"
    
    # Add FASTQ files (paired-end reads are passed as consecutive _1/_2 pairs)
    if params['paired_end']:
        if len(fastq_files) % 2 != 0:
            print("Error: Paired-end data requires an even number of FASTQ files.")
            sys.exit(1)
        # Kallisto expects paired-end FASTQ files to be listed in pairs
        # For example: sample_1.fastq sample_2.fastq
        for i in range(0, len(fastq_files), 2):
            cmd += f" {fastq_files[i]} {fastq_files[i+1]}"
    else:
        # Single-end: list all FASTQ files
        for fq in fastq_files:
            cmd += f" {fq}"
    
    # Add optional parameters
    if params['bootstrap_samples'] > 0:
        cmd += f" -b {params['bootstrap_samples']}"
    
    if params['verbose']:
        cmd += " --verbose"
    
    return cmd

# ---------------------------- Main Execution ---------------------------- #

def main():
    # Step 1: Retrieve Study Summary
    study_summary = get_study_summary(GSE_ID)
    print("----- Study Summary -----")
    print(study_summary)
    print("-------------------------\n")
    
    # Step 2: Extract GSM IDs
    gsm_ids = get_gsm_ids(GSE_ID)
    print("List of GSM IDs:")
    pprint(gsm_ids)
    print("\n")
    
    # Step 3: Retrieve SRA Run Accession Numbers for Each GSM ID
    gsm_to_sra = {}
    for gsm_id in gsm_ids:
        sra_ids = get_sra_ids_for_gsm(gsm_id)
        gsm_to_sra[gsm_id] = sra_ids
        # Respect NCBI rate limits
        time.sleep(0.34)  # Approximately 3 requests per second
    
    print("GSM to SRA Run Accession Mapping:")
    pprint(gsm_to_sra)
    print("\n")
    
    # Step 4: Fetch SRA Metadata for Each SRA Run Accession
    all_sra_metadata = []
    for gsm_id, sra_ids in gsm_to_sra.items():
        for sra_id in sra_ids:
            metadata = get_sra_metadata(sra_id)
            if metadata:
                metadata['GSM_ID'] = gsm_id  # Associate GSM ID with metadata
                all_sra_metadata.append(metadata)
                print(f"Metadata for SRA Run: {sra_id} (GSM ID: {gsm_id})")
                pprint(metadata)
                print("\n")
            else:
                print(f"No metadata found for SRA Run ID: {sra_id}\n")
            # Respect NCBI rate limits
            time.sleep(0.34)
    
    if not all_sra_metadata:
        print("No SRA metadata retrieved. Exiting.")
        sys.exit(1)
    
    # Step 5: Determine Kallisto Parameters
    kallisto_params = determine_kallisto_parameters(all_sra_metadata)
    
    # Step 6: Construct Example Kallisto Command
    # Replace with your actual Kallisto index path, output directory, and FASTQ files
    kallisto_index = "kallisto_index.idx"    # Example index path
    output_directory = "kallisto_output"     # Example output directory
    fastq_files = [
        "sample1_1.fastq", "sample1_2.fastq",
        "sample2_1.fastq", "sample2_2.fastq",
        # Add all your FASTQ file paths here
    ]
    
    kallisto_command = construct_kallisto_command(
        params=kallisto_params,
        index_path=kallisto_index,
        output_dir=output_directory,
        fastq_files=fastq_files
    )
    
    print("----- Example Kallisto Command -----")
    print(kallisto_command)
    print("------------------------------------\n")

if __name__ == "__main__":
    main()


Fetching study summary for GSE268034...
Executing command: esearch -db gds -query "GSE268034[ACCN]" | efetch -format docsum | xtract -pattern DocumentSummarySet -block DocumentSummary -if Accession -equals GSE268034 -element Summary
Command Output:


----- Study Summary -----

-------------------------

Extracting GSM IDs for GSE268034...
Executing command: esearch -db gds -query "GSE268034[ACCN]" | efetch -format docsum | xtract -pattern DocumentSummarySet -block DocumentSummary -if Accession -equals GSE268034 -element SampleAccession
Command Output:


Raw GSM Output:


Extracted 0 GSM IDs.

List of GSM IDs:
[]


GSM to SRA Run Accession Mapping:
{}


No SRA metadata retrieved. Exiting.


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
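Separately from the empty eDirect results, the command construction itself could be made a little safer. The sketch below builds the `kallisto quant` call as an argument list rather than a single string, so paths containing spaces survive and the command can be passed to `subprocess.run` without `shell=True`. It only uses flags that appear in the help output earlier in this notebook; the function name `build_quant_argv` and its parameters are hypothetical, and raising an error instead of falling back to default fragment-length values is a design choice of this sketch, not something the notebook settles.

```python
import shlex
from typing import List, Optional


def build_quant_argv(
    index_path: str,
    output_dir: str,
    fastq_files: List[str],
    single_end: bool = False,
    fragment_length: Optional[float] = None,
    sd: Optional[float] = None,
    strand_flag: Optional[str] = None,   # "--fr-stranded" or "--rf-stranded"
    bootstrap_samples: int = 0,
) -> List[str]:
    """Build a `kallisto quant` command as a list of arguments."""
    argv = ["kallisto", "quant", "-i", index_path, "-o", output_dir]
    if strand_flag:
        argv.append(strand_flag)
    if bootstrap_samples > 0:
        argv += ["-b", str(bootstrap_samples)]
    if single_end:
        # Single-end runs need both the fragment length and its SD
        if fragment_length is None or sd is None:
            raise ValueError("Single-end reads need fragment_length and sd.")
        argv += ["--single", "-l", str(fragment_length), "-s", str(sd)]
    elif len(fastq_files) % 2 != 0:
        raise ValueError("Paired-end data requires an even number of FASTQ files.")
    return argv + list(fastq_files)


# Hypothetical paths; note the space in the output directory
argv = build_quant_argv(
    "index.idx",
    "out dir",
    ["s_1.fastq.gz", "s_2.fastq.gz"],
    bootstrap_samples=100,
)
print(shlex.join(argv))  # shell-quoted form, for logging only
```

The list can be handed directly to `subprocess.run(argv, capture_output=True, text=True)`, and `shlex.join` is only needed when the command has to be displayed or logged.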


## Prepare Kallisto quantification