# NARDINI Online Fasta Analysis

This tool, developed by Cohan et al., conducts statistical analysis of **amino acid patterning** within intrinsically disordered regions (**IDRs**).

The inputs and outputs are the same as command-line NARDINI. The input is a **.fasta** file of IDRs; the output is a **.zip** containing **.tsv** and **.png** files.
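Since the server only sees the file you upload, it can help to sanity-check the FASTA locally first. The following is a minimal sketch, not part of NARDINI itself: `parse_fasta` and `nonstandard_residues` are hypothetical helper names, and the check assumes plain FASTA text with `>` header lines and the 20 standard amino acids. Whether NARDINI accepts other residue symbols is not verified here.

```python
from typing import Dict, List, Optional, Set

# Standard 20 amino acids (assumption: other symbols are merely flagged, not rejected).
STANDARD_RESIDUES = set("ACDEFGHIKLMNPQRSTVWY")


def parse_fasta(text: str) -> Dict[str, str]:
    """Parse FASTA text into {header: sequence}; raise ValueError if malformed."""
    records: Dict[str, List[str]] = {}
    current: Optional[str] = None
    for line_number, raw in enumerate(text.splitlines(), start=1):
        line = raw.strip()
        if not line:
            continue  # ignore blank lines
        if line.startswith(">"):
            current = line[1:].strip()
            if not current:
                raise ValueError(f"Line {line_number}: empty header")
            if current in records:
                raise ValueError(f"Line {line_number}: duplicate header {current!r}")
            records[current] = []
        elif current is None:
            raise ValueError(f"Line {line_number}: sequence data before any '>' header")
        else:
            records[current].append(line.upper())
    # Join multi-line sequences into one string per record
    return {header: "".join(parts) for header, parts in records.items()}


def nonstandard_residues(sequence: str) -> Set[str]:
    """Return the characters in a sequence that are not standard amino acids."""
    return set(sequence) - STANDARD_RESIDUES


example = ">idr_1\nMSDEEKRKS\nGGSPQ\n>idr_2\nKKKXEEE\n"
for header, seq in parse_fasta(example).items():
    print(header, len(seq), sorted(nonstandard_residues(seq)))
```

To check a real file, read it with `Path(your_file).read_text()` and pass the text to `parse_fasta` before uploading.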

This notebook sends your FASTA file to an external server, which performs the NARDINI statistical analysis. **The analysis keeps running if you close this notebook, so you can come back later and retrieve your results.**


# Usage Instructions

1. **Setup** - Install dependencies
2. **Test Connection** - Verify service is available
3. **Select FASTA** - Choose your input file
4. **Run Analysis** - Submit file for processing (get Run ID)
5. **Check Progress** - Monitor analysis status
6. **Download Results** - Get your results when complete
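
Steps 5 and 6 above amount to polling the server until the run is complete. The notebook does this manually (you re-run the progress cell), but the same idea can be sketched as a small loop. `wait_for_run` is a hypothetical helper, not something the notebook defines; it takes any zero-argument function that returns a status dictionary with a `"status"` key, and the poll interval and attempt limit are arbitrary illustrative defaults.

```python
import time
from typing import Any, Callable


def wait_for_run(
    get_status: Callable[[], Any],
    poll_seconds: float = 30.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call get_status() until it reports "complete" or max_polls is reached.

    Returns the last value get_status() produced, which may still be pending
    (or an error value) if the run did not finish in time.
    """
    last: Any = None
    for attempt in range(max_polls):
        last = get_status()
        # Only dictionaries carry a status; anything else is treated as "not done"
        if isinstance(last, dict) and last.get("status") == "complete":
            return last
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    return last


# Demonstration with a stand-in status function, so no network access is needed.
fake_responses = iter([
    {"status": "pending", "pending_sequences": ["MSDEEK"]},
    {"status": "complete", "pending_sequences": []},
])
final = wait_for_run(lambda: next(fake_responses), poll_seconds=0)
print(final["status"])
```

Once the setup cell has run, the status function could be `lambda: get_run_status(BACKEND_URL, run_id)`. Long analyses may outlast a notebook session, so manual checking remains the safer default.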

## Output
Results are saved to the `/content/nardini_results` folder, which contains:
- A zip file with your analysis results
- A `run_info.json` file recording the Run ID of each submission, for reference
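
To see what a downloaded results archive contains without unpacking it, the members can be grouped by file extension. This is a minimal sketch using only the standard library; `group_zip_members` is a hypothetical helper, and the file names in the demonstration are made up because the actual names inside a NARDINI zip are not specified here.

```python
import io
import zipfile
from collections import defaultdict
from pathlib import PurePosixPath
from typing import Dict, List


def group_zip_members(zip_source) -> Dict[str, List[str]]:
    """Group the file names in a zip archive by lowercase extension.

    zip_source may be a path or a binary file-like object.
    """
    groups = defaultdict(list)
    with zipfile.ZipFile(zip_source) as archive:
        for name in archive.namelist():
            if name.endswith("/"):
                continue  # skip directory entries
            groups[PurePosixPath(name).suffix.lower()].append(name)
    return dict(groups)


# Build a tiny in-memory archive with placeholder names to show the output shape.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("example/table.tsv", "a\tb\n1\t2\n")
    archive.writestr("example/plot.png", b"placeholder bytes")
buffer.seek(0)
print(group_zip_members(buffer))
```

For a real run, pass the path of the downloaded zip file instead of the in-memory buffer.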


In [None]:
#@markdown ##Installations
#@markdown First, set up the dependencies needed to run NARDINI.
%%capture
import os
import datetime
import json
import requests
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing_extensions import Literal, TypedDict
from google.colab import files

## Defining schemas for the data to be received from the server ##

# ---- Public API response schemas ----
class ErrorResponse(TypedDict):
    error: str

class HealthResponse(TypedDict):
    status: Literal["healthy"]

class UploadFastaResponse(TypedDict):
    run_id: str
    status: Literal["submitted", "ready"]
    message: str
    job_ids: List[str]

class StatusResponse(TypedDict):
    run_id: str
    status: Literal["pending", "complete"]
    pending_sequences: List[str]

class RetryResponse(TypedDict):
    run_id: str
    status: Literal["retry_submitted"]

class SimplifiedDownloadResponse(TypedDict):
    run_id: str
    destination_filepath: str

# ---- Backend metadata schemas (stored in volume as JSON) ----

class SequenceInput(TypedDict):
    sequence: Any  # Bio.SeqRecord object
    seq_uuid: str

class SequenceData(TypedDict):
    sequence_id: str
    status: Literal["pending", "cached", "pending_external", "complete"]
    start_time: Optional[float]
    end_time: Optional[float]
    seq_uuid: Optional[str]
    zip_path: Optional[str]
    job_id: Optional[str]

SequenceString = str
SequencesMapping = Dict[SequenceString, SequenceData]

class RunData(TypedDict):
    status: Literal["pending", "complete"]
    fasta_filename: str
    output_filename: str
    sequences: SequencesMapping
    total_sequences: int
    cached_sequences: int
    merged_zip_filename: Optional[str]
    submitted_at: float
    completed_at: Optional[float]

## Set up functions to be used in the notebook ##
def test_health(url: str):
    """Test if the NARDINI backend service is healthy."""
    try:
        # Time out rather than hang indefinitely if the server is unreachable
        health_response = requests.get(f"{url}/health", timeout=30)
        if health_response.ok:
            res = health_response.json()
            return HealthResponse(status=res["status"])
        else:
            return ErrorResponse(
                error=f"Error: {health_response.status_code} {health_response.text}"
            )
    except Exception as e:
        return ErrorResponse(error=f"Connection error: {e}")

# Main function to run Nardini
def upload_fasta(
    url: str, fasta_filepath: Path | str
) -> UploadFastaResponse | ErrorResponse:
    """Submit a FASTA file for NARDINI analysis."""
    if not Path(fasta_filepath).exists():
        raise FileNotFoundError(f"File {fasta_filepath} does not exist")

    with open(fasta_filepath, "rb") as f:
        files = {"file": f}
        response = requests.post(f"{url}/upload_fasta", files=files)
    if response.ok:
        res = response.json()
        return UploadFastaResponse(
            run_id=res["run_id"],
            status=res["status"],
            message=res["message"],
            job_ids=res["job_ids"],
        )
    else:
        return ErrorResponse(error=f"Error: {response.status_code} {response.text}")

def get_run_status(url: str, run_id: str):
    """Check the status of a NARDINI analysis run."""
    try:
        status_response = requests.get(f"{url}/status/{run_id}")
        if status_response.ok:
            res = status_response.json()
            return StatusResponse(
                run_id=res["run_id"],
                status=res["status"],
                pending_sequences=res["pending_sequences"],
            )
        else:
            return f"Error: {status_response.status_code} {status_response.text}"
    except Exception as e:
        return f"Connection error: {e}"

def download_zip(
    url: str, run_id: str, destination_dir: Path | str = '/content/nardini_results'
) -> SimplifiedDownloadResponse | ErrorResponse:
    """Download the results zip file for a completed analysis."""
    if not run_id:
        return ErrorResponse(error="Please provide a valid Run ID.")

    destination_dir = Path(destination_dir)
    # Create the results folder if it is missing (e.g. when downloading in a fresh session)
    destination_dir.mkdir(parents=True, exist_ok=True)

    try:
        response = requests.get(
            f"{url}/download/{run_id}", stream=True
        )
        if response.ok:
            # Extract filename from response headers
            content_disposition = response.headers.get("content-disposition", "")
            if "filename=" in content_disposition:
                filename = content_disposition.split("filename=")[1].strip('"')
                destination_filepath = destination_dir / filename
            else:
                destination_filepath = destination_dir / f"{run_id}.zip"

            with open(destination_filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            # print(f"Downloaded results to: {destination_filepath}")
            # print(
            #     f"File size: {destination_filepath.stat().st_size / (1024 * 1024):.1f} MB"
            # )
            return SimplifiedDownloadResponse(
                run_id=run_id, destination_filepath=str(destination_filepath)
            )
        else:
            print("Analysis is likely still in progress!")
            return ErrorResponse(
                error=f"Error downloading file: {response.status_code} {response.text}"
            )
    except Exception as e:
        return ErrorResponse(error=f"Download error: {e}")

def retry_sequences(url: str, run_id: str):
    """Retry processing for sequences that are still pending."""
    if not run_id:
        return ErrorResponse(error="Please provide a valid Run ID.")

    try:
        response = requests.get(f"{url}/retry/{run_id}")
        if response.ok:
            res = response.json()
            return RetryResponse(run_id=res["run_id"], status=res["status"])
        else:
            return ErrorResponse(
                error=f"Error retrying sequences: {response.status_code} {response.text}"
            )
    except Exception as e:
        return ErrorResponse(error=f"Retry error: {e}")

def get_available_runs(output_dir: Path | str = '/content/nardini_results'):
    """Get all available runs from the JSON file."""
    json_path = Path(output_dir) / 'run_info.json' # Ensure json_path is a Path object
    if not json_path.exists():
        return []
    try:
        with open(json_path, "r") as f:
            return json.load(f)
    except (json.JSONDecodeError, FileNotFoundError) as e:
        print(f"Error reading run info JSON: {e}")
        return []

def save_run_info(run_id: str, fasta_filename: str, output_dir: Path | str = '/content/nardini_results'):
    """Save run information to a JSON file for reference."""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    output_dir = Path(output_dir)
    json_path = output_dir / 'run_info.json'

    # Ensure output directory exists
    if not output_dir.exists():
        output_dir.mkdir(parents=True, exist_ok=True)


    run_info = {
        "title": "NARDINI Analysis Run Information",
        "timestamp": timestamp,
        "fasta_file": fasta_filename,
        "run_id": run_id
    }

    # Append to existing JSON or create new one
    all_runs = get_available_runs(output_dir)
    # Check if the run_id already exists, update if it does, otherwise append
    found = False
    for run in all_runs:
        if run.get("run_id") == run_id:
            run.update(run_info)
            found = True
            break
    if not found:
        all_runs.append(run_info)


    with open(json_path, "w") as f:
        json.dump(all_runs, f, indent=2)

    return str(json_path)

In [None]:
#@markdown Test connection to server
BACKEND_URL = "https://tangentleman--nardini-online-fastapi-app.modal.run"
print("Testing connection to NARDINI backend...")
test_result = test_health(BACKEND_URL)

if test_result.get("error"):
    print("❌ Connection failed!")
    print(f"Response: {test_result}")
    print("\nTroubleshooting:")
    print("1. Check your internet connection")
    print("2. Verify the backend URL is correct")
    print("3. The server may be temporarily unavailable")
else:
    print("✅ Connection to server established!")
    # print(f"Backend URL: {BACKEND_URL}")
    # print(f"Response: {test_result}")

In [None]:
#@markdown Select FASTA File 📁

print("Please upload your FASTA file using the widget below (only the first file is used):")

uploaded = files.upload()

if uploaded:
    # Assuming a single FASTA file is uploaded
    filename = list(uploaded.keys())[0]
    content = uploaded[filename]

    # Write the content to a new file in the /content/ directory
    FASTA_FILEPATH = f"/content/{filename}"
    with open(FASTA_FILEPATH, 'wb') as f:
        f.write(content)
    print("✅ Success!")
    # print(f"\n✅ File '{filename}' uploaded and saved to {FASTA_FILEPATH}")
    # print(f"Set FASTA_FILEPATH to: {FASTA_FILEPATH}")

else:
    print("⚠️ No files were uploaded.")
    FASTA_FILEPATH = None

In [None]:
#@markdown Optional: See prior runs
# List the FASTA file and Run ID recorded for each earlier submission
runs = get_available_runs()
for item in runs:
    print(f"Run ID for {item.get('fasta_file')}: {item.get('run_id')}")

In [None]:
#@markdown Submit file for analysis
if FASTA_FILEPATH and Path(FASTA_FILEPATH).exists():
    print(f"\n🔬 Submitting {Path(FASTA_FILEPATH).name} for NARDINI analysis...")

    try:
        result = upload_fasta(BACKEND_URL, FASTA_FILEPATH)
        if isinstance(result, dict) and "run_id" in result:
            run_id = result["run_id"]
            print("✅ Analysis started successfully!")
            print(f"Run ID: {run_id}")

            # Record the Run ID in run_info.json so it can be looked up later
            save_run_info(run_id, Path(FASTA_FILEPATH).name)

            print("\n📝 Next steps:")
            print("1. Use the 'Check Progress' cell to monitor analysis")
            print("2. Use the 'Download Results' cell when complete")
        else:
            print(f"❌ Error submitting file: {result}")
            run_id = None
    except Exception as e:
        print(f"❌ Error occurred: {e}")
        run_id = None
elif FASTA_FILEPATH:
    print(f"❌ File not found: {FASTA_FILEPATH}")
    run_id = None
else:
    print("⚠️  Please upload a FASTA file with the 'Select FASTA File' cell first!")
    run_id = None

In [None]:
#@markdown Check Progress 🔍
# Monitor the status of your NARDINI analysis
# You can either use the run_id from the previous cell or enter one manually
check_run_id = globals().get("run_id")  # None if no run was submitted in this session

# Option: Manually enter a run ID if needed (uncomment and modify)
# check_run_id = "your-run-id-here"

if not check_run_id:
    print("⚠️  No Run ID available!")
    print("Either run the analysis cell above first, or manually set check_run_id")
else:
    try:
        status_dict = get_run_status(BACKEND_URL, check_run_id)

        if isinstance(status_dict, dict):
            status = status_dict.get("status", "unknown")

            if status == "pending":
                print("⏳ Analysis is running...")

                # Show pending sequences
                pending_sequences = status_dict.get("pending_sequences", [])
                if pending_sequences:
                    remaining_count = len(pending_sequences)
                    print("\n📈 Progress Details:")
                    print(f"⏱️  {remaining_count} sequences remaining to process")

                    # Show first few pending sequences (limit output)
                    display_limit = min(5, len(pending_sequences))
                    for i, sequence in enumerate(pending_sequences[:display_limit], 1):
                        # Show only first 30 chars of sequence to keep output manageable
                        short_seq = (
                            sequence[:30] + "..." if len(sequence) > 30 else sequence
                        )
                        print(f"  ⏳ {i}. {short_seq}")

                    if len(pending_sequences) > display_limit:
                        print(
                            f"  ... and {len(pending_sequences) - display_limit} more sequences"
                        )
                else:
                    print("🔄 Processing has started, checking sequence completion...")

            elif status == "complete":
                print("🎉 Analysis completed successfully!")
                print("📥 You can now download the results using the next cell.")

            else:
                print(f"ℹ️  Status: {status}")

        else:
            print(f"❌ Error checking status: {status_dict}")

    except Exception as e:
        print(f"❌ Error occurred while checking status: {e}")

print("\n💡 Tip: Re-run this cell to get updated progress information")

In [None]:
#@markdown Download Results 📥
# Download the completed NARDINI analysis results

# Use the run_id from previous cells or enter one manually
download_run_id = globals().get("run_id")  # None if no run was submitted in this session

# Option: Manually enter a run ID if needed (uncomment and modify)
# download_run_id = "your-run-id-here"

if not download_run_id:
    print("⚠️  No Run ID available!")
    print("Upload a file and try again!")
else:
    # print(f"\n📥 Downloading results for: {download_run_id}")

    # Attempt download directly
    try:
        results = download_zip(BACKEND_URL, download_run_id)
        if results.get("error"):
            print(f"❌ Download failed: {results.get('error')}")
            print("The analysis may still be in progress or an error occurred")
            print("Use the 'Check Progress' cell to verify the analysis status")
        else:
            print("\n🎉 Download successful!")
            print("Click the folder icon in the left sidebar of this notebook to view 'nardini_results'!")
            print(f"📁 Results saved to: {results.get('destination_filepath')}")
            download_link = f"{BACKEND_URL}/download/{download_run_id}"
            print(f"You can also download your file at this link:\n{download_link}")

    except Exception as e:
        print(f"❌ Error occurred during download: {e}")
        print("The analysis may still be in progress or there was a connection issue")

# Credits

**✨ Made by Tanuj Vasudeva and Ethan Caine, 2025 ✨**

This notebook has been adapted for use in any Jupyter environment, not just Google Colab.



# Acknowledgments

We would like to thank Dr. John Woolford at Carnegie Mellon University, for whose lab this notebook was made, for his support of this project; Modal for hosting this service; and Katherine Parry for helpful advice.



# References

Cohan, M. C., Shinn, M. K., Lalmansingh, J. M., & Pappu, R. V. (2021). Uncovering non-random binary patterns within sequences of intrinsically disordered proteins. *Journal of Molecular Biology*, 434(2), 167373.

## Additional Information

- **NARDINI Tool**: This notebook provides a user-friendly interface to the NARDINI analysis tool
- **Backend Service**: Analysis is performed on remote servers for optimal performance
- **Output Format**: Results include statistical data (.tsv files) and visualization plots (.png files)
- **Caching**: Previously analyzed sequences are cached to speed up repeat analyses