In [19]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from IPython.display import display
%load_ext autoreload
%autoreload 2

import duckdb
import pandas as pd
from pathlib import Path
import logging
from Bio import SeqIO
from datetime import datetime
import time

from typing import List, Optional, Union, Dict, Set

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Build database

We're using our `database.builder` module to build the database.

TODO: Add logic to only insert sequences that don't already exist in the database.

TODO: I think there's some duplication/differences happening in the context manager that seem dumb, both of the "with" statements should be able to be combined.

In [5]:
from planter.database.builder import SequenceDBBuilder
# Define paths
DB_PATH = "/mnt/data2/planter_outputs/planter-test.duckdb"
OUTPUT_DIR = Path("/mnt/data2/planter_outputs")
# CLUSTER_TSV = "/mnt/data/planter_outputs/clusters.tsv"  # if you have clustering results

# List of SRA IDs to process
# sample_ids = ['ERR9123871', 'ERR9123872', 'ERR9123874', 'ERR9123875', 'ERR9123876']

sample_ids = [
    'ERR9123871', 'ERR9123872', 'ERR9123874', 'ERR9123875', 'ERR9123876', 
    'ERR9123877', 'ERR9123878', 'ERR9123879', 'ERR9123880', 'ERR9123881', 
    'ERR9123882', 'SRR10444679', 'SRR10444680', 'SRR10444681', 'SRR10444682', 
    'SRR10444683', 'SRR10444684', 'SRR11011255', 'SRR11011256', 'SRR11011257', 
    'SRR11011258', 'SRR11011259', 'SRR11011260', 'SRR12068547', 'SRR128113', 
    'SRR128114', 'SRR13765006', 'SRR14292007', 'SRR14292008', 'SRR18070778', 
    'SRR18070779', 'SRR18070780', 'SRR18070781', 'SRR18070782', 'SRR18070783', 
    'SRR18070784', 'SRR18070785', 'SRR18070786', 'SRR18070787', 'SRR18070788', 
    'SRR18070789', 'SRR18070790', 'SRR18070791', 'SRR18070792', 'SRR18070793', 
    'SRR18070794', 'SRR18070795', 'SRR18735292', 'SRR19034772', 'SRR19034773', 
    'SRR19619612', 'SRR19619613', 'SRR19619614', 'SRR22271585', 'SRR22271586', 
    'SRR22271587', 'SRR22271588', 'SRR22271589', 'SRR22904707', 'SRR24974225', 
    'SRR24974226', 'SRR24974227', 'SRR24974228', 'SRR25582085', 'SRR29366264', 
    'SRR29366265', 'SRR29366266', 'SRR5489198', 'SRR5992919', 'SRR5992920', 
    'SRR6048009', 'SRR8859643', 'SRR8859644', 'SRR8859645', 
    'SRR8859646', 'SRR8859647', 'SRR8859648'
]


# Build database
with SequenceDBBuilder(DB_PATH, output_dir=OUTPUT_DIR) as builder:
    # Build initial database with sequences and annotations
    results = builder.build_database(sample_ids)
    display(results)
    
    # # Optionally load clustering results if available
    # if Path(CLUSTER_TSV).exists():
    #     builder.load_clusters_from_tsv(CLUSTER_TSV)
    
    # Get final database summary
    summary = builder.get_database_summary()
    display(summary)



INFO:planter.database.builder:Applying migration: 000_schema_version.sql
INFO:planter.database.builder:Applying migration: 001_initial_schema.sql
INFO:planter.database.builder:Applying migration: 002_add_indexes.sql
INFO:planter.database.builder:Fetching metadata for ERR9123871
INFO:planter.database.utils.sra:Fetching metadata for ERR9123871
INFO:planter.database.builder:Loading sequences from /mnt/data2/planter_outputs/ERR9123871/transdecoder/ERR9123871.pep
INFO:planter.database.builder:Loaded 30281 new sequences for ERR9123871
INFO:planter.database.builder:Loading annotations from /mnt/data2/planter_outputs/ERR9123871/eggnog/ERR9123871.emapper.annotations
INFO:planter.database.builder:Processed ERR9123871: 30281 sequences, 27884 annotations, 0 duplicates
INFO:planter.database.builder:Fetching metadata for ERR9123872
INFO:planter.database.utils.sra:Fetching metadata for ERR9123872
INFO:planter.database.builder:Loading sequences from /mnt/data2/planter_outputs/ERR9123872/transdecoder/E

Unnamed: 0,sample_id,status,sequences_loaded,annotations_loaded,duplicates,error
0,ERR9123871,success,30281,27884,0,
1,ERR9123872,success,26342,24607,9,
2,ERR9123874,success,26649,25078,5,
3,ERR9123875,success,26224,24564,23,
4,ERR9123876,success,26284,24646,29,


Unnamed: 0,total_sequences,total_samples,representative_sequences,annotated_sequences,sequences_with_go,sequences_with_ec,total_clusters,avg_sequence_length,min_sequence_length,max_sequence_length
0,135780,5,0,126779,69579,30115,0,433.54,100,3243


# Query

## Get the database summary

In [101]:
from planter.database.query_manager import DatabaseManager
db_path = "/mnt/data2/planter_outputs/planter2.duckdb"

with DatabaseManager(db_path) as db_manager:
    summary = db_manager.query_manager.database_summary()
    display(summary)

Unnamed: 0,total_sequences,total_samples,representative_sequences,annotated_sequences,sequences_with_go,sequences_with_ec,total_clusters,avg_sequence_length,min_sequence_length,max_sequence_length
0,1985570,77,0,1637348,785398,387029,0,430.94,100,7993


## See what tables and schemas are present in the database

In [98]:
with DatabaseManager(db_path) as db_manager:
    # Fetch all tables
    tables = db_manager.con.execute("SHOW TABLES;").fetchdf()
    print("Tables in database:")
    print(tables)

    # Iterate through each table and fetch its schema
    for table in tables['name']:
        print(f"\nSchema for table: {table}")
        schema = db_manager.con.execute(f"PRAGMA table_info('{table}');").fetchdf()
        display(schema)

Tables in database:
              name
0      annotations
1  cluster_members
2         clusters
3       ec_numbers
4         go_terms
5        kegg_info
6   schema_version
7        sequences
8     sra_metadata

Schema for table: annotations


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,seqhash_id,VARCHAR,True,,True
1,1,seed_ortholog,VARCHAR,False,,False
2,2,evalue,DOUBLE,False,,False
3,3,score,DOUBLE,False,,False
4,4,eggnog_ogs,VARCHAR,False,,False
5,5,max_annot_lvl,VARCHAR,False,,False
6,6,cog_category,VARCHAR,False,,False
7,7,description,VARCHAR,False,,False
8,8,preferred_name,VARCHAR,False,,False
9,9,sample_id,VARCHAR,True,,False



Schema for table: cluster_members


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,seqhash_id,VARCHAR,True,,True
1,1,cluster_id,VARCHAR,True,,False



Schema for table: clusters


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,cluster_id,VARCHAR,True,,True
1,1,representative_seqhash_id,VARCHAR,True,,False
2,2,size,INTEGER,True,,False



Schema for table: ec_numbers


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,seqhash_id,VARCHAR,True,,True
1,1,ec_number,VARCHAR,True,,True



Schema for table: go_terms


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,seqhash_id,VARCHAR,True,,True
1,1,go_term,VARCHAR,True,,True



Schema for table: kegg_info


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,seqhash_id,VARCHAR,True,,True
1,1,kegg_ko,VARCHAR,False,,False
2,2,kegg_pathway,VARCHAR,False,,False
3,3,kegg_module,VARCHAR,False,,False
4,4,kegg_reaction,VARCHAR,False,,False
5,5,kegg_rclass,VARCHAR,False,,False



Schema for table: schema_version


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,version,INTEGER,True,,True
1,1,migration_name,VARCHAR,True,,False
2,2,applied_at,TIMESTAMP,False,CURRENT_TIMESTAMP,False



Schema for table: sequences


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,seqhash_id,VARCHAR,True,,True
1,1,sequence,VARCHAR,True,,False
2,2,sample_id,VARCHAR,True,,False
3,3,assembly_date,TIMESTAMP,True,,False
4,4,is_representative,BOOLEAN,True,CAST('f' AS BOOLEAN),False
5,5,length,INTEGER,True,,False



Schema for table: sra_metadata


Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,sample_id,VARCHAR,True,,True
1,1,organism,VARCHAR,False,,False
2,2,study_title,VARCHAR,False,,False
3,3,study_abstract,VARCHAR,False,,False
4,4,bioproject,VARCHAR,False,,False
5,5,biosample,VARCHAR,False,,False
6,6,library_strategy,VARCHAR,False,,False
7,7,library_source,VARCHAR,False,,False
8,8,library_selection,VARCHAR,False,,False
9,9,library_layout,VARCHAR,False,,False


## Get sequence_annotations for seqhashIDs

In [82]:
from planter.database.query_manager import DatabaseManager
import pandas as pd

db_path = "/mnt/data2/planter_outputs/planter2.duckdb"

# Test some seqhash IDs from your MMseqs results
test_ids = [
   "v1_DLS_8813cfa2d04ae4cf4c316e335a01c9d81b66255681bc3e51a98cd00dc5563466.p2",
   "v1_DLS_416333688207b36e52865182ab219b807e55f9b9cc56f30bb249fd4faa38e3da.p1",
   "v1_DLS_9ec0dd0c4e615b3c23b1b8e475cd0dc87f96b87d66fb13d2d033dc87dc652ca1.p1",
   "v1_DLS_633f05f5d947805a8eae5b38eea7cdd090f48c3dc1e3ce89eb0b6732042591aa.p1",
   "v1_DLS_700011b382d7c61a71117db9be1cfde6ad9435a22f74e60d769f57737d13c55b.p1"
]

with DatabaseManager(db_path) as db:
    annotations = db.query_manager.sequence_annotations(values=(test_ids,))
    display("\nAnnotations:", annotations)

'\nAnnotations:'

Unnamed: 0,seqhash_id,sample_id,organism,description,preferred_name,cog_category,go_terms,ec_numbers,kegg_ko,kegg_pathway
0,v1_DLS_633f05f5d947805a8eae5b38eea7cdd090f48c3...,SRR10444679,Xanthoria parietina,PKS_DH,-,Q,,,,
1,v1_DLS_8813cfa2d04ae4cf4c316e335a01c9d81b66255...,SRR128114,Digitalis purpurea,Belongs to the UDP-glycosyltransferase family,-,CG,,2.4.1.210,,
2,v1_DLS_416333688207b36e52865182ab219b807e55f9b...,SRR24974225,Digitalis purpurea,Belongs to the UDP-glycosyltransferase family,-,CG,,,,


## Search sequences

You can parameterize the search:
```sql

    {% if sample_id_condition %} AND {{ sample_id_condition }} {% endif %}
    {% if min_length_condition %} AND {{ min_length_condition }} {% endif %}
    {% if max_length_condition %} AND {{ max_length_condition }} {% endif %}
    {% if description_condition %} AND {{ description_condition }} {% endif %}
    {% if organism_condition %} AND {{ organism_condition }} {% endif %}
LIMIT {{ limit }}
```

In [112]:
from planter.database.query_manager import DatabaseManager

db_path = "/mnt/data2/planter_outputs/planter2.duckdb"
output_dir = "/mnt/data2/planter_outputs"

with DatabaseManager(db_path) as db_manager:
    sample_ids = ["SRR18070780", "SRR10444679"]
    # sample_id_condition = ", ".join([f"'{sample_id}'" for sample_id in sample_ids])
    cog_categories = ['Q']  # desired COG categories
    # cog_category_condition = ", ".join([f"'{c}'" for c in cog_categories])

    go_terms = ['GO:0075109']  # example GO terms to search

    params = {
        "sample_id_condition": sample_ids,  # e.g., "s.sample_id IN ('SRR18070780', 'SRR18070781')"
        "cog_category_condition": ['M', 'Q'],
        # "go_term_condition": go_terms,  # pass a list of GO terms        
        "min_length_condition": "s.length >= 100",
        "max_length_condition": "s.length <= 1000",
        "description_condition": None,
        "organism_condition": None,
        # "limit": 10,
    }

    results = db_manager.query_manager.search_sequences(params=params)
    display(results)

    # call go_term_summary dynamically
    # go_summary = db_manager.query_manager.go_term_summary(None, None, 5)
    # print("\nGO term summary:")
    # display(go_summary)

    # call organism_summary dynamically
    # organism_summary = db_manager.query_manager.organism_summary()
    # print("\nOrganism summary:")
    # display(organism_summary)
    # print(dir(db_manager.query_manager))
    # db_manager.query_manager.search_sequences('v1_DLS_31412ec4347c212e6892097053de8dc39cd53341988080fd7b80866c35840a0a.p1')
    

Unnamed: 0,seqhash_id,sample_id,length,is_representative,description,preferred_name,cog_category,evalue,seed_ortholog,max_annot_lvl,cluster_id,cluster_size,organism,study_title,bioproject,biosample
0,v1_DLS_633d5b574e7f1bb30b6c8343ecbf46c6e7abfdb...,SRR10444679,416,False,Belongs to the zinc-containing alcohol dehydro...,FDH1,Q,2.120000e-245,364733.XP_007786925.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
1,v1_DLS_77c014405bcc0337388bb239ab428f9dd03a48c...,SRR10444679,411,False,Alcohol dehydrogenase GroES-like domain,lad1,Q,7.160000e-192,35720.XP_003656577.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
2,v1_DLS_3f591e78ad65d2f652f99bebccccd2723af48de...,SRR10444679,563,False,Cytochrome p-450,-,IQ,1.490000e-169,40559.M7TAT1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
3,v1_DLS_daa45c4ccbd841941d2b87d2f9c0d5d016da6d2...,SRR10444679,155,False,Destroys radicals which are normally produced ...,SOD1,Q,2.500000e-97,364733.XP_007805264.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
4,v1_DLS_22ffb3919b77d3c8ede9ccaed821bba709981fc...,SRR10444679,744,False,ABC transport system ATP-binding protein,MDL1,Q,3.750000e-293,43228.XP_007730531.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3049,v1_DLS_aee85287c230fdf88dadc38dbf040b317690430...,SRR10444679,545,False,Phosphopantetheine attachment site,-,Q,1.450000e-86,364733.XP_007801216.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
3050,v1_DLS_af30a17a3e559e5ab99082ab00ffde6f9f094a3...,SRR10444679,341,False,cytochrome P450,-,Q,3.570000e-102,37727.XP_002145199.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
3051,v1_DLS_be130915e2b2268f933a9cf059aee1f995c696f...,SRR10444679,177,False,Belongs to the cytochrome P450 family,-,Q,3.900000e-83,86049.XP_008728781.1,4751|Fungi,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247
3052,v1_DLS_c3588ced93d98d666bf9d656c7d7aa6e86bdb93...,SRR10444679,475,False,Occurs in almost all aerobically respiring org...,-,Q,2.150000e-308,248742.XP_005645077.1,3041|Chlorophyta,,,Xanthoria parietina,Xanthoria parietina 46-1 Gene Expression Profi...,PRJNA584076,SAMN13173247


In [48]:
self.con.execute(query).fetchdf()


NameError: name 'self' is not defined

In [23]:
import requests
from pathlib import Path
from goatools.obo_parser import GODag

def download_go_file(cache_dir=".cache", file_name="go.obo"):
    """Download the GO file if it doesn't exist."""
    cache_path = Path(cache_dir)
    cache_path.mkdir(exist_ok=True)
    file_path = cache_path / file_name

    if not file_path.exists():
        print("Downloading GO file...")
        url = "https://purl.obolibrary.org/obo/go.obo"
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    return file_path

# usage
obo_file = download_go_file()
go_dag = GODag(obo_file)


Downloading GO file...
.cache/go.obo: fmt(1.2) rel(2024-11-03) 43,983 Terms


In [26]:
# create a dictionary mapping GO term IDs to their names
go_terms_dict = {term.id: term.name for term in go_dag.values()}

# add the "description" column to the dataframe
go_summary["description"] = go_summary["go_term"].map(go_terms_dict)



In [28]:
go_summary.tail(40)

Unnamed: 0,go_term,sequence_count,sample_ids,description
22945,GO:0034771,5,"SRR18735292, SRR5992920, SRR5992919",
22946,GO:0036216,5,"SRR18735292, SRR5992919, SRR5992920",cellular response to stem cell factor stimulus
22947,GO:0034264,5,"ERR9123875, ERR9123877, ERR9123876, ERR9123874...",isopentenyl adenine metabolic process
22948,GO:2000354,5,"SRR5992920, SRR18735292, SRR5992919, SRR22904707",regulation of ovarian follicle development
22949,GO:0046208,5,"SRR18735292, SRR10444679, SRR5992920, SRR10444680",spermine catabolic process
22950,GO:1990335,5,"SRR5992920, SRR18735292",
22951,GO:1905550,5,"SRR5992920, SRR5992919, SRR25582085, SRR10444680",regulation of protein localization to endoplas...
22952,GO:0047191,5,SRR5992920,1-alkylglycerophosphocholine O-acyltransferase...
22953,GO:0004796,5,"SRR10444680, SRR8859647, SRR8859646, SRR10444679",thromboxane-A synthase activity
22954,GO:1905600,5,"SRR22904707, SRR5992919, SRR5992920, SRR18735292",regulation of receptor-mediated endocytosis in...


In [None]:
class SequenceDBQuery:
    """Handles database queries and analysis."""
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.con = duckdb.connect(db_path)
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.con.close()

    def get_organism_summary(self) -> pd.DataFrame:
        """Get summary of sequence counts per organism."""
        query = """
        SELECT 
            m.organism,
            COUNT(DISTINCT m.sample_id) as sample_count,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            ROUND(AVG(s.length), 2) as avg_sequence_length,
            COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) as annotated_sequences,
            ROUND(COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) * 100.0 / 
                  COUNT(DISTINCT s.seqhash_id), 2) as percent_annotated,
            COUNT(DISTINCT CASE WHEN cm.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_in_clusters,
            STRING_AGG(DISTINCT m.bioproject, '; ') as bioprojects
        FROM sra_metadata m
        JOIN sequences s ON s.sample_id = m.sample_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE m.organism IS NOT NULL
        GROUP BY m.organism
        ORDER BY total_sequences DESC
        """
        return self.con.execute(query).fetchdf()

    def get_sample_with_metadata(self, sample_id: str) -> pd.DataFrame:
        """Get complete sample information including SRA metadata."""
        query = """
        SELECT 
            m.*,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) as annotated_sequences,
            COUNT(DISTINCT CASE WHEN g.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_go,
            COUNT(DISTINCT CASE WHEN e.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_ec,
            COUNT(DISTINCT CASE WHEN cm.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_in_clusters
        FROM sra_metadata m
        JOIN sequences s ON s.sample_id = m.sample_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE m.sample_id = ?
        GROUP BY m.sample_id, m.organism, m.study_title, m.study_abstract, 
                 m.bioproject, m.biosample, m.library_strategy, m.library_source,
                 m.library_selection, m.library_layout, m.instrument, 
                 m.run_spots, m.run_bases, m.run_published
        """
        return self.con.execute(query, [sample_id]).fetchdf()

    def get_organism_summary(self) -> pd.DataFrame:
        """Get summary of sequence counts per organism."""
        query = """
        SELECT 
            m.organism,
            COUNT(DISTINCT m.sample_id) as sample_count,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) as annotated_sequences,
            COUNT(DISTINCT CASE WHEN g.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_go,
            COUNT(DISTINCT CASE WHEN e.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_ec,
            COUNT(DISTINCT CASE WHEN cm.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_in_clusters,
            STRING_AGG(DISTINCT m.bioproject, '; ') as bioprojects
        FROM sra_metadata m
        JOIN sequences s ON s.sample_id = m.sample_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE m.organism IS NOT NULL
        GROUP BY m.organism
        ORDER BY total_sequences DESC
        """
        return self.con.execute(query).fetchdf()
    
    def get_sequence_by_id(self, seqhash_id: str) -> dict:
        """Retrieve complete information for a specific sequence."""
        query = """
        SELECT 
            s.*,
            a.seed_ortholog,
            a.evalue,
            a.score,
            a.eggnog_ogs,
            a.description,
            a.preferred_name,
            a.cog_category,
            STRING_AGG(DISTINCT g.go_term, '; ') as go_terms,
            STRING_AGG(DISTINCT e.ec_number, '; ') as ec_numbers,
            k.kegg_ko,
            k.kegg_pathway,
            k.kegg_module,
            c.cluster_id,
            c.size as cluster_size
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN kegg_info k ON s.seqhash_id = k.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        LEFT JOIN clusters c ON cm.cluster_id = c.cluster_id
        WHERE s.seqhash_id = ?
        GROUP BY s.seqhash_id, s.sequence, s.sample_id, s.assembly_date, 
                 s.is_representative, s.length, a.seed_ortholog, a.evalue, 
                 a.score, a.eggnog_ogs, a.description, a.preferred_name,
                 a.cog_category, k.kegg_ko, k.kegg_pathway, k.kegg_module,
                 c.cluster_id, c.size
        """
        result = self.con.execute(query, [seqhash_id]).fetchdf()
        return result.to_dict('records')[0] if not result.empty else None

    def search_sequences(self, 
                        sample_id: Optional[str] = None,
                        min_length: Optional[int] = None,
                        max_length: Optional[int] = None,
                        has_annotation: Optional[bool] = None,
                        description_contains: Optional[str] = None,
                        go_term: Optional[str] = None,
                        ec_number: Optional[str] = None,
                        is_representative: Optional[bool] = None,
                        min_cluster_size: Optional[int] = None,
                        limit: int = 100) -> pd.DataFrame:
        """Search sequences with multiple criteria."""
        conditions = ["1=1"]
        params = []
        
        if sample_id:
            conditions.append("s.sample_id = ?")
            params.append(sample_id)
            
        if min_length:
            conditions.append("s.length >= ?")
            params.append(min_length)
            
        if max_length:
            conditions.append("s.length <= ?")
            params.append(max_length)
            
        if has_annotation is not None:
            if has_annotation:
                conditions.append("a.seqhash_id IS NOT NULL")
            else:
                conditions.append("a.seqhash_id IS NULL")
                
        if description_contains:
            conditions.append("LOWER(a.description) LIKE ?")
            params.append(f"%{description_contains.lower()}%")
            
        if go_term:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM go_terms g 
                    WHERE g.seqhash_id = s.seqhash_id 
                    AND g.go_term = ?
                )
            """)
            params.append(go_term)
            
        if ec_number:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM ec_numbers e 
                    WHERE e.seqhash_id = s.seqhash_id 
                    AND e.ec_number = ?
                )
            """)
            params.append(ec_number)
            
        if is_representative is not None:
            conditions.append("s.is_representative = ?")
            params.append(is_representative)
            
        if min_cluster_size is not None:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM clusters c 
                    JOIN cluster_members cm ON c.cluster_id = cm.cluster_id
                    WHERE cm.seqhash_id = s.seqhash_id 
                    AND c.size >= ?
                )
            """)
            params.append(min_cluster_size)
            
        query = f"""
        SELECT DISTINCT
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            a.cog_category,
            c.cluster_id,
            c.size as cluster_size
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        LEFT JOIN clusters c ON cm.cluster_id = c.cluster_id
        WHERE {' AND '.join(conditions)}
        LIMIT ?
        """
        params.append(limit)
        
        return self.con.execute(query, params).fetchdf()

    def get_sample_stats(self, sample_id: Optional[str] = None) -> pd.DataFrame:
        """Get statistics for all samples or a specific sample."""
        query = """
        SELECT 
            s.sample_id,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            AVG(s.length) as avg_length,
            MIN(s.length) as min_length,
            MAX(s.length) as max_length,
            COUNT(DISTINCT a.seqhash_id) as annotated_sequences,
            COUNT(DISTINCT g.seqhash_id) as sequences_with_go,
            COUNT(DISTINCT e.seqhash_id) as sequences_with_ec,
            COUNT(DISTINCT CASE WHEN s.is_representative THEN s.seqhash_id END) as representative_sequences,
            COUNT(DISTINCT cm.cluster_id) as clusters
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            return self.con.execute(query + " GROUP BY s.sample_id", [sample_id]).fetchdf()
        return self.con.execute(query + " GROUP BY s.sample_id").fetchdf()

    def get_cluster_info(self, cluster_id: str) -> pd.DataFrame:
        """Get detailed information about a specific cluster."""
        query = """
        SELECT 
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            a.cog_category,
            STRING_AGG(DISTINCT g.go_term, '; ') as go_terms,
            STRING_AGG(DISTINCT e.ec_number, '; ') as ec_numbers
        FROM cluster_members cm
        JOIN sequences s ON cm.seqhash_id = s.seqhash_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        WHERE cm.cluster_id = ?
        GROUP BY s.seqhash_id, s.sample_id, s.length, s.is_representative,
                 a.description, a.preferred_name, a.cog_category
        """
        return self.con.execute(query, [cluster_id]).fetchdf()

    def get_cluster_stats(self) -> pd.DataFrame:
        """Get summary statistics about clusters."""
        query = """
        SELECT 
            COUNT(DISTINCT cluster_id) as total_clusters,
            AVG(size) as avg_cluster_size,
            MIN(size) as min_cluster_size,
            MAX(size) as max_cluster_size,
            APPROX_QUANTILE(size, 0.5) as median_cluster_size
        FROM clusters
        """
        return self.con.execute(query).fetchdf()

    def get_go_term_summary(self, 
                          sample_id: Optional[str] = None, 
                          min_sequences: int = 5) -> pd.DataFrame:
        """Get summary of GO term frequencies."""
        query = """
        SELECT 
            g.go_term,
            COUNT(DISTINCT s.seqhash_id) as sequence_count
        FROM sequences s
        JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            params = [sample_id]
        else:
            params = []
            
        query += f"""
            GROUP BY g.go_term 
            HAVING COUNT(DISTINCT s.seqhash_id) >= {min_sequences} 
            ORDER BY sequence_count DESC
        """
        return self.con.execute(query, params).fetchdf()

    def get_ec_number_summary(self, 
                            sample_id: Optional[str] = None, 
                            min_sequences: int = 5) -> pd.DataFrame:
        """Get summary of EC number frequencies."""
        query = """
        SELECT 
            e.ec_number,
            COUNT(DISTINCT s.seqhash_id) as sequence_count
        FROM sequences s
        JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            params = [sample_id]
        else:
            params = []
            
        query += f"""
            GROUP BY e.ec_number 
            HAVING COUNT(DISTINCT s.seqhash_id) >= {min_sequences} 
            ORDER BY sequence_count DESC
        """
        return self.con.execute(query, params).fetchdf()

    def compare_samples(self, sample_ids: List[str]) -> pd.DataFrame:
        """Compare statistics between multiple samples."""
        sample_list = ", ".join([f"'{s}'" for s in sample_ids])
        query = f"""
        SELECT 
            s.sample_id,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            COUNT(DISTINCT a.seqhash_id) as annotated_sequences,
            COUNT(DISTINCT g.seqhash_id) as sequences_with_go,
            COUNT(DISTINCT e.seqhash_id) as sequences_with_ec,
            COUNT(DISTINCT cm.cluster_id) as clusters,
            AVG(s.length) as avg_length,
            COUNT(DISTINCT CASE WHEN s.is_representative THEN s.seqhash_id END) as representative_sequences
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE s.sample_id IN ({sample_list})
        GROUP BY s.sample_id
        """
        return self.con.execute(query).fetchdf()

# Get Organism metadata

In [6]:
class SRAMetadataCollector:
    """Handles collection and caching of SRA metadata."""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def collect_metadata(self, sample_ids: List[str], cache_path: Optional[str] = None) -> pd.DataFrame:
        """
        Collect metadata for a list of SRA samples, with optional caching.
        
        Args:
            sample_ids: List of SRA identifiers
            cache_path: Optional path to cache results in CSV format
        """
        # Check cache if provided
        existing_data = pd.DataFrame()
        samples_to_fetch = sample_ids
        
        if cache_path and Path(cache_path).exists():
            existing_data = pd.read_csv(cache_path)
            samples_to_fetch = [sid for sid in sample_ids 
                              if sid not in existing_data['sample_id'].values]
            self.logger.info(f"Found {len(existing_data)} cached entries")
            
        # Fetch new samples
        new_data = []
        for sample_id in samples_to_fetch:
            self.logger.info(f"Fetching metadata for {sample_id}")
            try:
                info = get_sra_info(sample_id)
                if isinstance(info, dict):  # Successful fetch
                    metadata = {
                        'sample_id': sample_id,
                        'organism': info.get('organism'),
                        'study_title': info.get('study_title'),
                        'study_abstract': info.get('study_abstract'),
                        'bioproject': info.get('bioproject'),
                        'biosample': info.get('biosample'),
                        'library_strategy': info.get('library', {}).get('strategy'),
                        'library_source': info.get('library', {}).get('source'),
                        'library_selection': info.get('library', {}).get('selection'),
                        'library_layout': info.get('library', {}).get('layout'),
                        'instrument': info.get('instrument'),
                        'run_spots': info.get('run', {}).get('spots'),
                        'run_bases': info.get('run', {}).get('bases'),
                        'run_published': info.get('run', {}).get('published')
                    }
                    new_data.append(metadata)
                else:
                    self.logger.warning(f"Failed to fetch {sample_id}: {info}")
            except Exception as e:
                self.logger.error(f"Error processing {sample_id}: {str(e)}")
            
            time.sleep(1)  # Respect rate limit
            
        # Combine new and existing data
        new_df = pd.DataFrame(new_data)
        combined_df = pd.concat([existing_data, new_df], ignore_index=True)
        
        # Cache if path provided
        if cache_path and not new_df.empty:
            combined_df.to_csv(cache_path, index=False)
            self.logger.info(f"Updated cache at {cache_path}")
        
        return combined_df


# Build

In [5]:
import duckdb
import pandas as pd
from pathlib import Path
import logging
from Bio import SeqIO
from datetime import datetime
from typing import List, Optional, Union, Dict, Set

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SequenceDBBuilder:
    """Handles database creation and data loading operations."""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.con = None
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        """Context manager entry"""
        self.con = duckdb.connect(self.db_path)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        if self.con:
            self.con.close()
            
    def clean_database(self):
        """Drop all existing tables"""
        self.logger.info("Cleaning up any existing tables...")
        cleanup_sql = """
        DROP TABLE IF EXISTS cluster_members;
        DROP TABLE IF EXISTS clusters;
        DROP TABLE IF EXISTS kegg_info;
        DROP TABLE IF EXISTS ec_numbers;
        DROP TABLE IF EXISTS go_terms;
        DROP TABLE IF EXISTS annotations;
        DROP TABLE IF EXISTS sequences;
        DROP TABLE IF EXISTS temp_annotations;
        DROP TABLE IF EXISTS temp_clusters;
        """
        self.con.execute(cleanup_sql)

    def init_database(self):
        """Initialize database with clean schema including SRA metadata."""
        self.clean_database()
        
        schema_sql = """
        CREATE TABLE IF NOT EXISTS sequences (
            seqhash_id VARCHAR PRIMARY KEY,
            sequence VARCHAR NOT NULL,
            sample_id VARCHAR NOT NULL,
            assembly_date TIMESTAMP NOT NULL,
            is_representative BOOLEAN NOT NULL DEFAULT FALSE,
            length INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS annotations (
            seqhash_id VARCHAR PRIMARY KEY,
            seed_ortholog VARCHAR,
            evalue DOUBLE,
            score DOUBLE,
            eggnog_ogs VARCHAR,
            max_annot_lvl VARCHAR,
            cog_category VARCHAR,
            description VARCHAR,
            preferred_name VARCHAR,
            sample_id VARCHAR NOT NULL
        );

        CREATE TABLE IF NOT EXISTS go_terms (
            seqhash_id VARCHAR NOT NULL,
            go_term VARCHAR NOT NULL,
            PRIMARY KEY (seqhash_id, go_term)
        );

        CREATE TABLE IF NOT EXISTS ec_numbers (
            seqhash_id VARCHAR NOT NULL,
            ec_number VARCHAR NOT NULL,
            PRIMARY KEY (seqhash_id, ec_number)
        );

        CREATE TABLE IF NOT EXISTS kegg_info (
            seqhash_id VARCHAR PRIMARY KEY,
            kegg_ko VARCHAR,
            kegg_pathway VARCHAR,
            kegg_module VARCHAR,
            kegg_reaction VARCHAR,
            kegg_rclass VARCHAR
        );

        CREATE TABLE IF NOT EXISTS clusters (
            cluster_id VARCHAR PRIMARY KEY,
            representative_seqhash_id VARCHAR NOT NULL,
            size INTEGER NOT NULL,
            FOREIGN KEY (representative_seqhash_id) REFERENCES sequences(seqhash_id)
        );

        CREATE TABLE IF NOT EXISTS cluster_members (
            seqhash_id VARCHAR NOT NULL,
            cluster_id VARCHAR NOT NULL,
            PRIMARY KEY (seqhash_id),
            FOREIGN KEY (seqhash_id) REFERENCES sequences(seqhash_id),
            FOREIGN KEY (cluster_id) REFERENCES clusters(cluster_id)
        );

        CREATE TABLE IF NOT EXISTS sra_metadata (
            sample_id VARCHAR PRIMARY KEY,
            organism VARCHAR,
            study_title VARCHAR,
            study_abstract VARCHAR,
            bioproject VARCHAR,
            biosample VARCHAR,
            library_strategy VARCHAR,
            library_source VARCHAR,
            library_selection VARCHAR,
            library_layout VARCHAR,
            instrument VARCHAR,
            run_spots VARCHAR,
            run_bases VARCHAR,
            run_published VARCHAR
        );
        """
        
        self.con.execute(schema_sql)
    
    def load_sra_metadata(self, metadata_df: pd.DataFrame):
        """Load SRA metadata into the database."""
        self.logger.info("Loading SRA metadata into database")
        self.con.execute("""
            INSERT INTO sra_metadata 
            SELECT * FROM metadata_df
            ON CONFLICT (sample_id) DO UPDATE 
            SET 
                organism = EXCLUDED.organism,
                study_title = EXCLUDED.study_title,
                study_abstract = EXCLUDED.study_abstract,
                bioproject = EXCLUDED.bioproject,
                biosample = EXCLUDED.biosample,
                library_strategy = EXCLUDED.library_strategy,
                library_source = EXCLUDED.library_source,
                library_selection = EXCLUDED.library_selection,
                library_layout = EXCLUDED.library_layout,
                instrument = EXCLUDED.instrument,
                run_spots = EXCLUDED.run_spots,
                run_bases = EXCLUDED.run_bases,
                run_published = EXCLUDED.run_published
        """)

    def load_sample_data(self, base_dir: Path, sample_id: str) -> List[dict]:
        """Load data for a single sample, tracking duplicates."""
        self.logger.info(f"\nProcessing sample: {sample_id}")
        
        try:
            # Keep track of duplicates
            duplicates = []
            
            # Define paths
            sequence_path = base_dir / sample_id / f'transdecoder/{sample_id}.pep'
            annotation_path = base_dir / sample_id / f'eggnog/{sample_id}.emapper.annotations'
            
            # Load sequences
            self._load_sequences(sequence_path, sample_id, duplicates)
            
            # Load annotations if file exists
            if annotation_path.exists():
                self._load_annotations(annotation_path, sample_id)
            else:
                self.logger.warning(f"No annotation file found at {annotation_path}")
            
            return duplicates
            
        except Exception as e:
            self.logger.error(f"Error processing sample {sample_id}: {e}")
            raise

    def _load_sequences(self, sequence_path: Path, sample_id: str, duplicates: list):
        """Helper method to load sequences from FASTA file."""
        self.logger.info(f"Loading sequences from {sequence_path}")
        
        existing_seqs = set(self.con.execute("SELECT seqhash_id FROM sequences").df()['seqhash_id'])
        sequences = []
        skipped_count = 0
        
        for record in SeqIO.parse(sequence_path, "fasta"):
            seqhash_id = record.id.split()[0]
            
            if seqhash_id in existing_seqs:
                duplicates.append({
                    'seqhash_id': seqhash_id,
                    'sample_id': sample_id,
                    'length': len(record.seq)
                })
                skipped_count += 1
                continue
                
            sequences.append({
                'seqhash_id': seqhash_id,
                'sequence': str(record.seq),
                'sample_id': sample_id,
                'assembly_date': datetime.now(),
                'is_representative': False,
                'length': len(record.seq)
            })
            
            if len(sequences) % 1000 == 0:
                self._batch_insert_sequences(sequences)
                sequences = []
        
        if sequences:
            self._batch_insert_sequences(sequences)
        
        self.logger.info(f"Loaded {len(sequences)} new sequences for {sample_id}")
        if skipped_count > 0:
            self.logger.info(f"Skipped {skipped_count} duplicate sequences")

    def _batch_insert_sequences(self, sequences: List[dict]):
        """Helper method to insert sequences in batches."""
        if not sequences:
            return
        df = pd.DataFrame(sequences)
        self.con.execute("INSERT INTO sequences SELECT * FROM df")

    def _load_annotations(self, annotation_path: Path, sample_id: str):
        """Helper method to load annotations from eggNOG file."""
        self.logger.info(f"Loading annotations from {annotation_path}")
        
        column_names = [
            'query', 'seed_ortholog', 'evalue', 'score', 'eggNOG_OGs', 
            'max_annot_lvl', 'COG_category', 'Description', 'Preferred_name',
            'GOs', 'EC', 'KEGG_ko', 'KEGG_Pathway', 'KEGG_Module', 
            'KEGG_Reaction', 'KEGG_rclass', 'BRITE', 'KEGG_TC', 'CAZy',
            'BiGG_Reaction', 'PFAMs'
        ]
        
        try:
            # Create temporary table
            columns_def = ", ".join([f'"{name}" VARCHAR' for name in column_names])
            self.con.execute(f"CREATE TABLE temp_annotations ({columns_def})")
            
            # Load annotation data
            self.con.execute(f"""
                INSERT INTO temp_annotations
                SELECT * FROM read_csv_auto(
                    '{annotation_path}',
                    sep='\t',
                    header=False,
                    names={column_names},
                    comment='#'
                )
            """)
            
            # Process annotations
            self._process_annotations(sample_id)
            
        finally:
            # Clean up
            self.con.execute("DROP TABLE IF EXISTS temp_annotations")

    def _process_annotations(self, sample_id: str):
        """Process annotations from temporary table into final tables."""
        # Main annotations
        self.con.execute(f"""
            INSERT INTO annotations
            SELECT 
                query as seqhash_id,
                seed_ortholog,
                TRY_CAST(evalue AS DOUBLE) as evalue,
                TRY_CAST(score AS DOUBLE) as score,
                "eggNOG_OGs" as eggnog_ogs,
                max_annot_lvl,
                "COG_category" as cog_category,
                "Description" as description,
                "Preferred_name" as preferred_name,
                '{sample_id}' as sample_id
            FROM temp_annotations
            WHERE query IN (SELECT seqhash_id FROM sequences)
            AND query NOT IN (SELECT seqhash_id FROM annotations)
        """)
        
        # GO terms
        self.con.execute("""
            INSERT INTO go_terms
            SELECT DISTINCT
                query as seqhash_id,
                UNNEST(STRING_SPLIT(NULLIF("GOs", '-'), ',')) as go_term
            FROM temp_annotations
            WHERE query IN (SELECT seqhash_id FROM sequences)
            AND query NOT IN (SELECT seqhash_id FROM go_terms)
            AND "GOs" IS NOT NULL AND "GOs" != '-'
        """)
        
        # EC numbers
        self.con.execute("""
            INSERT INTO ec_numbers
            SELECT DISTINCT
                query as seqhash_id,
                UNNEST(STRING_SPLIT(NULLIF("EC", '-'), ',')) as ec_number
            FROM temp_annotations
            WHERE query IN (SELECT seqhash_id FROM sequences)
            AND query NOT IN (SELECT seqhash_id FROM ec_numbers)
            AND "EC" IS NOT NULL AND "EC" != '-'
        """)

    def load_clusters_from_tsv(self, tsv_path: str):
        """Load cluster information from MMseqs2 cluster update TSV."""
        self.logger.info(f"Loading cluster data from {tsv_path}")
        
        try:
            # Create temporary table for TSV data
            self.con.execute("""
            CREATE TEMP TABLE temp_clusters AS 
            SELECT 
                representative as representative_seqhash_id,
                member as seqhash_id
            FROM read_csv_auto(?, sep='\t', header=False, 
                             names=['representative', 'member'])
            WHERE representative IN (SELECT seqhash_id FROM sequences)
            AND member IN (SELECT seqhash_id FROM sequences)
            """, [tsv_path])
            
            # Insert clusters
            self.con.execute("""
            WITH cluster_info AS (
                SELECT 
                    representative_seqhash_id,
                    ROW_NUMBER() OVER (ORDER BY representative_seqhash_id) as cluster_num,
                    COUNT(*) as size
                FROM temp_clusters
                GROUP BY representative_seqhash_id
            )
            INSERT INTO clusters (cluster_id, representative_seqhash_id, size)
            SELECT 
                'CLUSTER_' || cluster_num as cluster_id,
                representative_seqhash_id,
                size
            FROM cluster_info
            """)
            
            # Insert cluster members
            self.con.execute("""
            INSERT INTO cluster_members (seqhash_id, cluster_id)
            SELECT 
                tc.seqhash_id,
                c.cluster_id
            FROM temp_clusters tc
            JOIN clusters c ON tc.representative_seqhash_id = c.representative_seqhash_id
            """)
            
            # Update representative status
            self.con.execute("""
            UPDATE sequences
            SET is_representative = TRUE
            WHERE seqhash_id IN (SELECT representative_seqhash_id FROM clusters)
            """)
            
        finally:
            self.con.execute("DROP TABLE IF EXISTS temp_clusters")


# Query

In [12]:
class SequenceDBQuery:
    """Handles database queries and analysis."""
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.con = duckdb.connect(db_path)
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.con.close()

    def get_organism_summary(self) -> pd.DataFrame:
        """Get summary of sequence counts per organism."""
        query = """
        SELECT 
            m.organism,
            COUNT(DISTINCT m.sample_id) as sample_count,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            ROUND(AVG(s.length), 2) as avg_sequence_length,
            COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) as annotated_sequences,
            ROUND(COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) * 100.0 / 
                  COUNT(DISTINCT s.seqhash_id), 2) as percent_annotated,
            COUNT(DISTINCT CASE WHEN cm.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_in_clusters,
            STRING_AGG(DISTINCT m.bioproject, '; ') as bioprojects
        FROM sra_metadata m
        JOIN sequences s ON s.sample_id = m.sample_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE m.organism IS NOT NULL
        GROUP BY m.organism
        ORDER BY total_sequences DESC
        """
        return self.con.execute(query).fetchdf()

    def get_sample_with_metadata(self, sample_id: str) -> pd.DataFrame:
        """Get complete sample information including SRA metadata."""
        query = """
        SELECT 
            m.*,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) as annotated_sequences,
            COUNT(DISTINCT CASE WHEN g.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_go,
            COUNT(DISTINCT CASE WHEN e.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_ec,
            COUNT(DISTINCT CASE WHEN cm.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_in_clusters
        FROM sra_metadata m
        JOIN sequences s ON s.sample_id = m.sample_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE m.sample_id = ?
        GROUP BY m.sample_id, m.organism, m.study_title, m.study_abstract, 
                 m.bioproject, m.biosample, m.library_strategy, m.library_source,
                 m.library_selection, m.library_layout, m.instrument, 
                 m.run_spots, m.run_bases, m.run_published
        """
        return self.con.execute(query, [sample_id]).fetchdf()

    def get_organism_summary(self) -> pd.DataFrame:
        """Get summary of sequence counts per organism."""
        query = """
        SELECT 
            m.organism,
            COUNT(DISTINCT m.sample_id) as sample_count,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            COUNT(DISTINCT CASE WHEN a.seqhash_id IS NOT NULL THEN s.seqhash_id END) as annotated_sequences,
            COUNT(DISTINCT CASE WHEN g.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_go,
            COUNT(DISTINCT CASE WHEN e.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_with_ec,
            COUNT(DISTINCT CASE WHEN cm.seqhash_id IS NOT NULL THEN s.seqhash_id END) as sequences_in_clusters,
            STRING_AGG(DISTINCT m.bioproject, '; ') as bioprojects
        FROM sra_metadata m
        JOIN sequences s ON s.sample_id = m.sample_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE m.organism IS NOT NULL
        GROUP BY m.organism
        ORDER BY total_sequences DESC
        """
        return self.con.execute(query).fetchdf()
    
    def get_sequence_by_id(self, seqhash_id: str) -> dict:
        """Retrieve complete information for a specific sequence."""
        query = """
        SELECT 
            s.*,
            a.seed_ortholog,
            a.evalue,
            a.score,
            a.eggnog_ogs,
            a.description,
            a.preferred_name,
            a.cog_category,
            STRING_AGG(DISTINCT g.go_term, '; ') as go_terms,
            STRING_AGG(DISTINCT e.ec_number, '; ') as ec_numbers,
            k.kegg_ko,
            k.kegg_pathway,
            k.kegg_module,
            c.cluster_id,
            c.size as cluster_size
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN kegg_info k ON s.seqhash_id = k.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        LEFT JOIN clusters c ON cm.cluster_id = c.cluster_id
        WHERE s.seqhash_id = ?
        GROUP BY s.seqhash_id, s.sequence, s.sample_id, s.assembly_date, 
                 s.is_representative, s.length, a.seed_ortholog, a.evalue, 
                 a.score, a.eggnog_ogs, a.description, a.preferred_name,
                 a.cog_category, k.kegg_ko, k.kegg_pathway, k.kegg_module,
                 c.cluster_id, c.size
        """
        result = self.con.execute(query, [seqhash_id]).fetchdf()
        return result.to_dict('records')[0] if not result.empty else None

    def search_sequences(self, 
                        sample_id: Optional[str] = None,
                        min_length: Optional[int] = None,
                        max_length: Optional[int] = None,
                        has_annotation: Optional[bool] = None,
                        description_contains: Optional[str] = None,
                        go_term: Optional[str] = None,
                        ec_number: Optional[str] = None,
                        is_representative: Optional[bool] = None,
                        min_cluster_size: Optional[int] = None,
                        limit: int = 100) -> pd.DataFrame:
        """Search sequences with multiple criteria."""
        conditions = ["1=1"]
        params = []
        
        if sample_id:
            conditions.append("s.sample_id = ?")
            params.append(sample_id)
            
        if min_length:
            conditions.append("s.length >= ?")
            params.append(min_length)
            
        if max_length:
            conditions.append("s.length <= ?")
            params.append(max_length)
            
        if has_annotation is not None:
            if has_annotation:
                conditions.append("a.seqhash_id IS NOT NULL")
            else:
                conditions.append("a.seqhash_id IS NULL")
                
        if description_contains:
            conditions.append("LOWER(a.description) LIKE ?")
            params.append(f"%{description_contains.lower()}%")
            
        if go_term:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM go_terms g 
                    WHERE g.seqhash_id = s.seqhash_id 
                    AND g.go_term = ?
                )
            """)
            params.append(go_term)
            
        if ec_number:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM ec_numbers e 
                    WHERE e.seqhash_id = s.seqhash_id 
                    AND e.ec_number = ?
                )
            """)
            params.append(ec_number)
            
        if is_representative is not None:
            conditions.append("s.is_representative = ?")
            params.append(is_representative)
            
        if min_cluster_size is not None:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM clusters c 
                    JOIN cluster_members cm ON c.cluster_id = cm.cluster_id
                    WHERE cm.seqhash_id = s.seqhash_id 
                    AND c.size >= ?
                )
            """)
            params.append(min_cluster_size)
            
        query = f"""
        SELECT DISTINCT
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            a.cog_category,
            c.cluster_id,
            c.size as cluster_size
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        LEFT JOIN clusters c ON cm.cluster_id = c.cluster_id
        WHERE {' AND '.join(conditions)}
        LIMIT ?
        """
        params.append(limit)
        
        return self.con.execute(query, params).fetchdf()

    def get_sample_stats(self, sample_id: Optional[str] = None) -> pd.DataFrame:
        """Get statistics for all samples or a specific sample."""
        query = """
        SELECT 
            s.sample_id,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            AVG(s.length) as avg_length,
            MIN(s.length) as min_length,
            MAX(s.length) as max_length,
            COUNT(DISTINCT a.seqhash_id) as annotated_sequences,
            COUNT(DISTINCT g.seqhash_id) as sequences_with_go,
            COUNT(DISTINCT e.seqhash_id) as sequences_with_ec,
            COUNT(DISTINCT CASE WHEN s.is_representative THEN s.seqhash_id END) as representative_sequences,
            COUNT(DISTINCT cm.cluster_id) as clusters
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            return self.con.execute(query + " GROUP BY s.sample_id", [sample_id]).fetchdf()
        return self.con.execute(query + " GROUP BY s.sample_id").fetchdf()

    def get_cluster_info(self, cluster_id: str) -> pd.DataFrame:
        """Get detailed information about a specific cluster."""
        query = """
        SELECT 
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            a.cog_category,
            STRING_AGG(DISTINCT g.go_term, '; ') as go_terms,
            STRING_AGG(DISTINCT e.ec_number, '; ') as ec_numbers
        FROM cluster_members cm
        JOIN sequences s ON cm.seqhash_id = s.seqhash_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        WHERE cm.cluster_id = ?
        GROUP BY s.seqhash_id, s.sample_id, s.length, s.is_representative,
                 a.description, a.preferred_name, a.cog_category
        """
        return self.con.execute(query, [cluster_id]).fetchdf()

    def get_cluster_stats(self) -> pd.DataFrame:
        """Get summary statistics about clusters."""
        query = """
        SELECT 
            COUNT(DISTINCT cluster_id) as total_clusters,
            AVG(size) as avg_cluster_size,
            MIN(size) as min_cluster_size,
            MAX(size) as max_cluster_size,
            APPROX_QUANTILE(size, 0.5) as median_cluster_size
        FROM clusters
        """
        return self.con.execute(query).fetchdf()

    def get_go_term_summary(self, 
                          sample_id: Optional[str] = None, 
                          min_sequences: int = 5) -> pd.DataFrame:
        """Get summary of GO term frequencies."""
        query = """
        SELECT 
            g.go_term,
            COUNT(DISTINCT s.seqhash_id) as sequence_count
        FROM sequences s
        JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            params = [sample_id]
        else:
            params = []
            
        query += f"""
            GROUP BY g.go_term 
            HAVING COUNT(DISTINCT s.seqhash_id) >= {min_sequences} 
            ORDER BY sequence_count DESC
        """
        return self.con.execute(query, params).fetchdf()

    def get_ec_number_summary(self, 
                            sample_id: Optional[str] = None, 
                            min_sequences: int = 5) -> pd.DataFrame:
        """Get summary of EC number frequencies."""
        query = """
        SELECT 
            e.ec_number,
            COUNT(DISTINCT s.seqhash_id) as sequence_count
        FROM sequences s
        JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            params = [sample_id]
        else:
            params = []
            
        query += f"""
            GROUP BY e.ec_number 
            HAVING COUNT(DISTINCT s.seqhash_id) >= {min_sequences} 
            ORDER BY sequence_count DESC
        """
        return self.con.execute(query, params).fetchdf()

    def compare_samples(self, sample_ids: List[str]) -> pd.DataFrame:
        """Compare statistics between multiple samples."""
        sample_list = ", ".join([f"'{s}'" for s in sample_ids])
        query = f"""
        SELECT 
            s.sample_id,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            COUNT(DISTINCT a.seqhash_id) as annotated_sequences,
            COUNT(DISTINCT g.seqhash_id) as sequences_with_go,
            COUNT(DISTINCT e.seqhash_id) as sequences_with_ec,
            COUNT(DISTINCT cm.cluster_id) as clusters,
            AVG(s.length) as avg_length,
            COUNT(DISTINCT CASE WHEN s.is_representative THEN s.seqhash_id END) as representative_sequences
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        WHERE s.sample_id IN ({sample_list})
        GROUP BY s.sample_id
        """
        return self.con.execute(query).fetchdf()

In [7]:
from pathlib import Path

# Define paths and sample list
base_dir = Path('/mnt/data2/planter_outputs')
db_path = "/mnt/data2/planter_outputs/planter.duckdb"
sample_list = [
    'ERR9123871', 'ERR9123872', 'ERR9123874', 'ERR9123875', 'ERR9123876', 
    'ERR9123877', 'ERR9123878', 'ERR9123879', 'ERR9123880', 'ERR9123881', 
    'ERR9123882', 'SRR10444679', 'SRR10444680', 'SRR10444681', 'SRR10444682', 
    'SRR10444683', 'SRR10444684', 'SRR11011255', 'SRR11011256', 'SRR11011257', 
    'SRR11011258', 'SRR11011259', 'SRR11011260', 'SRR12068547', 'SRR128113', 
    'SRR128114', 'SRR13765006', 'SRR14292007', 'SRR14292008', 'SRR18070778', 
    'SRR18070779', 'SRR18070780', 'SRR18070781', 'SRR18070782', 'SRR18070783', 
    'SRR18070784', 'SRR18070785', 'SRR18070786', 'SRR18070787', 'SRR18070788', 
    'SRR18070789', 'SRR18070790', 'SRR18070791', 'SRR18070792', 'SRR18070793', 
    'SRR18070794', 'SRR18070795', 'SRR18735292', 'SRR19034772', 'SRR19034773', 
    'SRR19619612', 'SRR19619613', 'SRR19619614', 'SRR22271585', 'SRR22271586', 
    'SRR22271587', 'SRR22271588', 'SRR22271589', 'SRR22904707', 'SRR24974225', 
    'SRR24974226', 'SRR24974227', 'SRR24974228', 'SRR25582085', 'SRR29366264', 
    'SRR29366265', 'SRR29366266', 'SRR5489198', 'SRR5992919', 'SRR5992920', 
    'SRR6048009', 'SRR8859643', 'SRR8859644', 'SRR8859645', 
    'SRR8859646', 'SRR8859647', 'SRR8859648'
]

# First, collect metadata (can be done separately)
metadata_collector = SRAMetadataCollector()
metadata_df = metadata_collector.collect_metadata(
    sample_list,
    cache_path="sra_metadata_cache.csv"  # Optional cache
)

INFO:__main__:Found 77 cached entries


In [8]:
metadata_df.head(1)

Unnamed: 0,sample_id,organism,study_title,study_abstract,bioproject,biosample,library_strategy,library_source,library_selection,library_layout,instrument,run_spots,run_bases,run_published
0,ERR9123871,Silene latifolia subsp. alba,Chemical genetics in Silene latifolia elucidat...,Dioecious plants possess diverse sex determina...,PRJEB36078,SAMEA7773503,RNA-Seq,TRANSCRIPTOMIC,cDNA,PAIRED,ILLUMINA,75922486,12147597760,2022-05-13 05:01:40


In [11]:

# Use the builder class with context manager
with SequenceDBBuilder(db_path) as builder:
    # Initialize fresh database
    builder.init_database()
    
    # Load the metadata into the database
    builder.load_sra_metadata(metadata_df)  # Add this line!

    # Process each sample
    for sample_id in sample_list:
        try:
            logger.info(f"Processing sample: {sample_id}")
            duplicates = builder.load_sample_data(base_dir, sample_id)
            if duplicates:
                logger.info(f"Found {len(duplicates)} duplicate sequences in {sample_id}")
        except Exception as e:
            logger.error(f"Error processing sample {sample_id}: {e}")
            continue
    
    # Get summary after loading
    logger.info("\nDatabase Summary:")
    summary_query = """
    SELECT 
        COUNT(DISTINCT seqhash_id) as total_sequences,
        COUNT(DISTINCT sample_id) as total_samples
    FROM sequences
    """
    summary = builder.con.execute(summary_query).fetchone()
    logger.info(f"Total sequences: {summary[0]}")
    logger.info(f"Total samples: {summary[1]}")



NameError: name 'SequenceDBBuilder' is not defined

In [13]:
# Query with metadata
db_path = '/mnt/data2/planter_outputs/planter2.duckdb'
with SequenceDBQuery(db_path) as db:
    # Get overall organism summary
    organism_summary = db.get_organism_summary()
    print("\nSequence summary by organism:")
    display(organism_summary)
    
    # Get complete sample info
    sample_info = db.get_sample_with_metadata("SRR14292007")
    print("\nSample details:")
    display(sample_info)


    seq_info = db.get_sequence_by_id("v1_DLS_61ac870c5c1ec555228430b3b7c633a221dbc5b48f9bb1f64ca0ee5b2451748c.p3")
    print("\nSequence details:")
    display(seq_info)


Sequence summary by organism:


Unnamed: 0,organism,sample_count,total_sequences,annotated_sequences,sequences_with_go,sequences_with_ec,sequences_in_clusters,bioprojects
0,Silene latifolia subsp. alba,11,300466,280127,153639,66403,0,PRJEB36078
1,Xanthoria parietina,6,289959,205284,68980,47053,0,PRJNA584075; PRJNA584074; PRJNA584073; PRJNA58...
2,Matricaria chamomilla var. recutita,6,240845,222539,125021,53751,0,PRJNA382469
3,Digitalis purpurea,7,234863,220513,127801,54686,0,PRJNA985863; PRJNA80007; PRJNA929980
4,Gyalolechia flavorubescens,18,220334,168318,75492,38886,0,PRJNA210248
5,Cladonia metacorallifera,5,99512,78266,32907,16271,0,PRJNA891905
6,Acarospora socialis,3,89037,69740,22927,17159,0,PRJNA1013257
7,Usnea undulata,2,73897,54075,16726,12718,0,PRJNA530379
8,Usnea sinensis,2,64067,48224,15753,11273,0,PRJNA530379
9,Alliaria petiolata,1,61932,56263,35216,14000,0,PRJNA702530



Sample details:


Unnamed: 0,sample_id,organism,study_title,study_abstract,bioproject,biosample,library_strategy,library_source,library_selection,library_layout,instrument,run_spots,run_bases,run_published,total_sequences,annotated_sequences,sequences_with_go,sequences_with_ec,sequences_in_clusters
0,SRR14292007,Cladonia macilenta,Cladonia macilenta that produce biruloquinone,"In this study, we identified a polyketide synt...",PRJNA723447,SAMN18818446,,,,,ILLUMINA,,,,21454,16763,7034,3406,0



Sequence details:


{'seqhash_id': 'v1_DLS_61ac870c5c1ec555228430b3b7c633a221dbc5b48f9bb1f64ca0ee5b2451748c.p3',
 'sequence': 'MASILQTNSLVVSQATPASPWAHKYRGATVEDLDPPPALSSKPTDSISTALLNAYERDYTHLTVVSEDTRALLGYLNIPRLKELLKNGTVNESDYVEKAMQKFRRRGNVYKVITMDTPLEELEAFFNGGVDGNGPQDFAVVTDGSRRFVLGVATKGDLEEFVKRRPA*',
 'sample_id': 'SRR8859648',
 'assembly_date': Timestamp('2024-11-20 19:16:23.806861'),
 'is_representative': False,
 'length': 168,
 'seed_ortholog': '61459.XP_007779632.1',
 'evalue': 8.99e-82,
 'score': 245.0,
 'eggnog_ogs': '2E0C8@1|root,2S7T3@2759|Eukaryota,3A2TJ@33154|Opisthokonta,3P34X@4751|Fungi,3QV03@4890|Ascomycota,20GSS@147545|Eurotiomycetes,3MWTA@451870|Chaetothyriomycetidae',
 'description': 'Cystathionine beta-synthase',
 'preferred_name': '-',
 'cog_category': 'S',
 'go_terms': None,
 'ec_numbers': None,
 'kegg_ko': None,
 'kegg_pathway': None,
 'kegg_module': None,
 'cluster_id': None,
 'cluster_size': nan}

In [2]:
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_database(con: duckdb.DuckDBPyConnection):
    """Drop all existing tables"""
    logger.info("Cleaning up any existing tables...")
    
    cleanup_sql = """
    DROP TABLE IF EXISTS cluster_members;
    DROP TABLE IF EXISTS clusters;
    DROP TABLE IF EXISTS kegg_info;
    DROP TABLE IF EXISTS ec_numbers;
    DROP TABLE IF EXISTS go_terms;
    DROP TABLE IF EXISTS annotations;
    DROP TABLE IF EXISTS sequences;
    DROP TABLE IF EXISTS temp_annotations;
    DROP TABLE IF EXISTS temp_clusters;
    """
    con.execute(cleanup_sql)

def init_database(db_path: str):
    """Initialize database with clean schema."""
    # Create new connection
    con = duckdb.connect(db_path)
    
    # Clean up any existing tables
    clean_database(con)
    
    schema_sql = """
    CREATE TABLE IF NOT EXISTS sequences (
        seqhash_id VARCHAR PRIMARY KEY,
        sequence VARCHAR NOT NULL,
        sample_id VARCHAR NOT NULL,
        assembly_date TIMESTAMP NOT NULL,
        is_representative BOOLEAN NOT NULL DEFAULT FALSE,
        length INTEGER NOT NULL
    );

    CREATE TABLE IF NOT EXISTS annotations (
        seqhash_id VARCHAR PRIMARY KEY,
        seed_ortholog VARCHAR,
        evalue DOUBLE,
        score DOUBLE,
        eggnog_ogs VARCHAR,
        max_annot_lvl VARCHAR,
        cog_category VARCHAR,
        description VARCHAR,
        preferred_name VARCHAR,
        sample_id VARCHAR NOT NULL
    );

    CREATE TABLE IF NOT EXISTS go_terms (
        seqhash_id VARCHAR NOT NULL,
        go_term VARCHAR NOT NULL,
        PRIMARY KEY (seqhash_id, go_term)
    );

    CREATE TABLE IF NOT EXISTS ec_numbers (
        seqhash_id VARCHAR NOT NULL,
        ec_number VARCHAR NOT NULL,
        PRIMARY KEY (seqhash_id, ec_number)
    );

    CREATE TABLE IF NOT EXISTS kegg_info (
        seqhash_id VARCHAR PRIMARY KEY,
        kegg_ko VARCHAR,
        kegg_pathway VARCHAR,
        kegg_module VARCHAR,
        kegg_reaction VARCHAR,
        kegg_rclass VARCHAR
    );
    """
    
    con.execute(schema_sql)
    return con

def load_sample_data(con: duckdb.DuckDBPyConnection, 
                    base_dir: Path,
                    sample_id: str):
    """Load data for a single sample, tracking duplicates."""
    logger.info(f"\nProcessing sample: {sample_id}")
    
    try:
        # Keep track of duplicates
        duplicates = []
        
        # Define paths
        sequence_path = base_dir / sample_id / f'transdecoder/{sample_id}.pep'
        annotation_path = base_dir / sample_id / f'eggnog/{sample_id}.emapper.annotations'
        
        # Check which sequences are already in the database
        sequences = []
        existing_seqs = set(con.execute("SELECT seqhash_id FROM sequences").df()['seqhash_id'])
        
        # Load sequences
        logger.info(f"Loading sequences from {sequence_path}")
        skipped_count = 0
        for record in SeqIO.parse(sequence_path, "fasta"):
            seqhash_id = record.id.split()[0]
            
            if seqhash_id in existing_seqs:
                duplicates.append({
                    'seqhash_id': seqhash_id,
                    'sample_id': sample_id,
                    'length': len(record.seq)
                })
                skipped_count += 1
                continue
                
            sequences.append({
                'seqhash_id': seqhash_id,
                'sequence': str(record.seq),
                'sample_id': sample_id,
                'assembly_date': datetime.now(),
                'is_representative': False,
                'length': len(record.seq)
            })
            
            if len(sequences) % 1000 == 0:
                logger.info(f"Read {len(sequences)} new sequences...")
        
        # Load new sequences
        if sequences:
            df = pd.DataFrame(sequences)
            con.execute("INSERT INTO sequences SELECT * FROM df")
        
        logger.info(f"Loaded {len(sequences)} new sequences for {sample_id}")
        if skipped_count > 0:
            logger.info(f"Skipped {skipped_count} duplicate sequences")
            logger.info("First few duplicates:")
            for dup in duplicates[:5]:
                existing_sample = con.execute("""
                    SELECT sample_id FROM sequences 
                    WHERE seqhash_id = ?
                """, [dup['seqhash_id']]).fetchone()[0]
                logger.info(f"  Sequence {dup['seqhash_id']} already exists in sample {existing_sample}")
        
        # Clean up before loading annotations
        con.execute("DROP TABLE IF EXISTS temp_annotations")
        
        # Load annotations
        logger.info(f"Loading annotations from {annotation_path}")
        
        column_names = [
            'query', 'seed_ortholog', 'evalue', 'score', 'eggNOG_OGs', 
            'max_annot_lvl', 'COG_category', 'Description', 'Preferred_name',
            'GOs', 'EC', 'KEGG_ko', 'KEGG_Pathway', 'KEGG_Module', 
            'KEGG_Reaction', 'KEGG_rclass', 'BRITE', 'KEGG_TC', 'CAZy',
            'BiGG_Reaction', 'PFAMs'
        ]
        
        columns_def = ", ".join([f'"{name}" VARCHAR' for name in column_names])
        con.execute(f"CREATE TABLE temp_annotations ({columns_def})")
        
        # Load annotation data
        con.execute(f"""
            INSERT INTO temp_annotations
            SELECT * FROM read_csv_auto(
                '{annotation_path}',
                sep='\t',
                header=False,
                names={column_names},
                comment='#'
            )
        """)
        
        # Process main annotations (only for non-duplicate sequences)
        con.execute(f"""
            INSERT INTO annotations
            SELECT 
                query as seqhash_id,
                seed_ortholog,
                TRY_CAST(evalue AS DOUBLE) as evalue,
                TRY_CAST(score AS DOUBLE) as score,
                "eggNOG_OGs" as eggnog_ogs,
                max_annot_lvl,
                "COG_category" as cog_category,
                "Description" as description,
                "Preferred_name" as preferred_name,
                '{sample_id}' as sample_id
            FROM temp_annotations
            WHERE query NOT IN (SELECT seqhash_id FROM annotations)
        """)
        
        # Process GO terms
        con.execute(f"""
            INSERT INTO go_terms
            SELECT DISTINCT
                query as seqhash_id,
                UNNEST(STRING_SPLIT(NULLIF("GOs", '-'), ',')) as go_term
            FROM temp_annotations
            WHERE query NOT IN (SELECT seqhash_id FROM go_terms)
            AND "GOs" IS NOT NULL AND "GOs" != '-'
        """)
        
        # Process EC numbers
        con.execute(f"""
            INSERT INTO ec_numbers
            SELECT DISTINCT
                query as seqhash_id,
                UNNEST(STRING_SPLIT(NULLIF("EC", '-'), ',')) as ec_number
            FROM temp_annotations
            WHERE query NOT IN (SELECT seqhash_id FROM ec_numbers)
            AND "EC" IS NOT NULL AND "EC" != '-'
        """)
        
        return duplicates
        
    finally:
        # Clean up temporary tables
        logger.info("Cleaning up temporary tables...")
        con.execute("DROP TABLE IF EXISTS temp_annotations")
        con.execute("DROP TABLE IF EXISTS temp_go")
        con.execute("DROP TABLE IF EXISTS temp_ec")

def get_database_summary(con):
    """Get summary statistics with unambiguous column references."""
    return con.execute("""
        SELECT 
            s.sample_id,  -- Specifically use sequences table's sample_id
            COUNT(DISTINCT s.seqhash_id) as sequence_count,
            COUNT(DISTINCT a.seqhash_id) as annotated_count,
            COUNT(DISTINCT g.seqhash_id) as with_go_terms,
            COUNT(DISTINCT e.seqhash_id) as with_ec_numbers
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        GROUP BY s.sample_id
        ORDER BY s.sample_id
    """).df()

def build_initial_database(db_path: str, base_dir: Path, sample_list: list):
    """Build initial database from a list of samples."""
    logger.info(f"Building database at {db_path} from {len(sample_list)} samples")
    
    # Remove existing database if it exists
    if Path(db_path).exists():
        logger.info(f"Removing existing database: {db_path}")
        Path(db_path).unlink()
    
    # Initialize database
    con = init_database(db_path)
    
    # Track duplicates across all samples
    all_duplicates = {}
    
    # Process each sample
    for sample_id in sample_list:
        try:
            duplicates = load_sample_data(con, base_dir, sample_id)
            if duplicates:
                all_duplicates[sample_id] = duplicates
        except Exception as e:
            logger.error(f"Error processing sample {sample_id}: {e}")
            continue
    
    # Log summary statistics
    logger.info("\nDatabase Summary:")
    sample_stats = get_database_summary(con)
    print(sample_stats)
    
    if all_duplicates:
        logger.info("\nDuplicate Summary:")
        for sample_id, dups in all_duplicates.items():
            logger.info(f"{sample_id}: {len(dups)} duplicate sequences")
            
        # Optional: show some duplicate details
        logger.info("\nExample duplicates (first few):")
        for sample_id, dups in all_duplicates.items():
            logger.info(f"\nDuplicates in {sample_id}:")
            for dup in dups[:5]:  # Show first 5 duplicates for each sample
                existing_sample = con.execute("""
                    SELECT sample_id 
                    FROM sequences 
                    WHERE seqhash_id = ?
                """, [dup['seqhash_id']]).fetchone()[0]
                logger.info(f"  {dup['seqhash_id']} (length: {dup['length']}) - first found in {existing_sample}")
    
    return con

# Build database

In [3]:
base_dir = Path('/mnt/data2/planter_outputs')
sample_list = [
    'ERR9123871', 'ERR9123872', 'ERR9123874', 'ERR9123875', 'ERR9123876', 
    'ERR9123877', 'ERR9123878', 'ERR9123879', 'ERR9123880', 'ERR9123881', 
    'ERR9123882', 'SRR10444679', 'SRR10444680', 'SRR10444681', 'SRR10444682', 
    'SRR10444683', 'SRR10444684', 'SRR11011255', 'SRR11011256', 'SRR11011257', 
    'SRR11011258', 'SRR11011259', 'SRR11011260', 'SRR12068547', 'SRR128113', 
    'SRR128114', 'SRR13765006', 'SRR14292007', 'SRR14292008', 'SRR18070778', 
    'SRR18070779', 'SRR18070780', 'SRR18070781', 'SRR18070782', 'SRR18070783', 
    'SRR18070784', 'SRR18070785', 'SRR18070786', 'SRR18070787', 'SRR18070788', 
    'SRR18070789', 'SRR18070790', 'SRR18070791', 'SRR18070792', 'SRR18070793', 
    'SRR18070794', 'SRR18070795', 'SRR18735292', 'SRR19034772', 'SRR19034773', 
    'SRR19619612', 'SRR19619613', 'SRR19619614', 'SRR22271585', 'SRR22271586', 
    'SRR22271587', 'SRR22271588', 'SRR22271589', 'SRR22904707', 'SRR24974225', 
    'SRR24974226', 'SRR24974227', 'SRR24974228', 'SRR25582085', 'SRR29366264', 
    'SRR29366265', 'SRR29366266', 'SRR5489198', 'SRR5992919', 'SRR5992920', 
    'SRR6048009', 'SRR8859643', 'SRR8859644', 'SRR8859645', 
    'SRR8859646', 'SRR8859647', 'SRR8859648'
]
db_path = "/mnt/data2/planter_outputs/planter.duckdb"

# Build initial database
con = build_initial_database(db_path, base_dir, sample_list)


INFO:__main__:Building database at /mnt/data2/planter_outputs/planter.duckdb from 77 samples
INFO:__main__:Cleaning up any existing tables...
INFO:__main__:
Processing sample: ERR9123871
INFO:__main__:Loading sequences from /mnt/data2/planter_outputs/ERR9123871/transdecoder/ERR9123871.pep
INFO:__main__:Read 1000 new sequences...
INFO:__main__:Read 2000 new sequences...
INFO:__main__:Read 3000 new sequences...
INFO:__main__:Read 4000 new sequences...
INFO:__main__:Read 5000 new sequences...
INFO:__main__:Read 6000 new sequences...
INFO:__main__:Read 7000 new sequences...
INFO:__main__:Read 8000 new sequences...
INFO:__main__:Read 9000 new sequences...
INFO:__main__:Read 10000 new sequences...
INFO:__main__:Read 11000 new sequences...
INFO:__main__:Read 12000 new sequences...
INFO:__main__:Read 13000 new sequences...
INFO:__main__:Read 14000 new sequences...
INFO:__main__:Read 15000 new sequences...
INFO:__main__:Read 16000 new sequences...
INFO:__main__:Read 17000 new sequences...
INFO:

     sample_id  sequence_count  annotated_count  with_go_terms  \
0   ERR9123871           30281            27884          15140   
1   ERR9123872           26342            24607          13512   
2   ERR9123874           26649            25078          13739   
3   ERR9123875           26224            24564          13484   
4   ERR9123876           26284            24646          13704   
..         ...             ...              ...            ...   
72  SRR8859644           28264            20519           7207   
73  SRR8859645           30587            22900           7348   
74  SRR8859646           33480            25324           8405   
75  SRR8859647           37443            28010           8949   
76  SRR8859648           36454            26065           7777   

    with_ec_numbers  
0              6682  
1              5928  
2              5895  
3              5794  
4              5816  
..              ...  
72             4654  
73             5363  
74       

INFO:__main__:  v1_DLS_1919cce724ee4c987d8dc305165aea1c1c3b8c5b12a098d6f61488f5a56c564c.p1 (length: 447) - first found in SRR18070778
INFO:__main__:
Duplicates in SRR18070792:
INFO:__main__:  v1_DLS_b3de4ccd40f7c9c114f13325a2dac991da78f802172206504b474c78a5eb6f4e.p1 (length: 376) - first found in SRR18070788
INFO:__main__:  v1_DLS_b3de4ccd40f7c9c114f13325a2dac991da78f802172206504b474c78a5eb6f4e.p2 (length: 223) - first found in SRR18070788
INFO:__main__:
Duplicates in SRR18070793:
INFO:__main__:  v1_DLS_462287f58489fbe44cb3253b8b74862a0263d437f1923f423a26a422cc6ff27e.p1 (length: 152) - first found in SRR18070792
INFO:__main__:
Duplicates in SRR18070795:
INFO:__main__:  v1_DLS_0f1ca3117ad468ad62699a5a098558e6d82cd6a3aa6a4612838bb39754ab87be.p1 (length: 174) - first found in SRR18070780
INFO:__main__:  v1_DLS_0fdb65628bfa8522554ac09366e57489516ff767e64528de7fbd28bc127cf993.p1 (length: 272) - first found in SRR18070789
INFO:__main__:  v1_DLS_1a7f26c561725c119d289fcfecea98d822410e95987fa8d

## Close the connection to save

In [4]:
con.close()

# Query

In [12]:
import duckdb
import pandas as pd
from typing import List, Optional, Union
import logging

import duckdb
import pandas as pd
from typing import List, Optional, Union
import logging

class SequenceDB:
    def __init__(self, db_path: str):
        """Initialize connection to the sequence database."""
        self.con = duckdb.connect(db_path)
        self.logger = logging.getLogger(__name__)
    
    def add_cluster_tables(self):
        """Add tables for storing cluster information if they don't exist."""
        schema_sql = """
        CREATE TABLE IF NOT EXISTS clusters (
            cluster_id VARCHAR PRIMARY KEY,
            representative_seqhash_id VARCHAR NOT NULL,
            size INTEGER NOT NULL,
            FOREIGN KEY (representative_seqhash_id) REFERENCES sequences(seqhash_id)
        );

        CREATE TABLE IF NOT EXISTS cluster_members (
            seqhash_id VARCHAR NOT NULL,
            cluster_id VARCHAR NOT NULL,
            PRIMARY KEY (seqhash_id),
            FOREIGN KEY (seqhash_id) REFERENCES sequences(seqhash_id),
            FOREIGN KEY (cluster_id) REFERENCES clusters(cluster_id)
        );
        """
        self.con.execute(schema_sql)
    
    def load_clusters_from_tsv(self, tsv_path: str):
        """
        Load cluster information from MMseqs2 cluster update TSV file.
        Expected format: representative_sequence\tmember_sequence
        """
        self.logger.info(f"Loading cluster data from {tsv_path}")
        
        # Add cluster tables if they don't exist
        self.add_cluster_tables()
        
        # Create temporary table for the TSV data
        self.con.execute("""
        CREATE TEMP TABLE temp_clusters AS 
        SELECT 
            representative as representative_seqhash_id,
            member as seqhash_id
        FROM read_csv_auto(?, sep='\t', header=False, 
                          names=['representative', 'member'])
        """, [tsv_path])
        
        # Insert clusters and update sequence representatives
        self.con.execute("""
        -- First, insert cluster information
        WITH cluster_info AS (
            SELECT 
                representative_seqhash_id,
                ROW_NUMBER() OVER (ORDER BY representative_seqhash_id) as cluster_num,
                COUNT(*) as size
            FROM temp_clusters
            GROUP BY representative_seqhash_id
        )
        INSERT INTO clusters (cluster_id, representative_seqhash_id, size)
        SELECT 
            'CLUSTER_' || cluster_num as cluster_id,
            representative_seqhash_id,
            size
        FROM cluster_info;

        -- Then, insert cluster members
        INSERT INTO cluster_members (seqhash_id, cluster_id)
        SELECT 
            tc.seqhash_id,
            c.cluster_id
        FROM temp_clusters tc
        JOIN clusters c ON tc.representative_seqhash_id = c.representative_seqhash_id;

        -- Update representative status in sequences table
        UPDATE sequences
        SET is_representative = TRUE
        WHERE seqhash_id IN (SELECT representative_seqhash_id FROM clusters);
        """)
        
        # Clean up
        self.con.execute("DROP TABLE temp_clusters")
        
        # Log summary
        cluster_stats = self.get_cluster_stats()
        self.logger.info("\nCluster loading complete:")
        self.logger.info(f"Total clusters: {cluster_stats['total_clusters']}")
        self.logger.info(f"Total clustered sequences: {cluster_stats['total_clustered_sequences']}")
        self.logger.info(f"Average cluster size: {cluster_stats['avg_cluster_size']:.2f}")
    
    def get_cluster_stats(self) -> dict:
        """Get summary statistics about clusters."""
        stats = self.con.execute("""
        SELECT 
            COUNT(DISTINCT cluster_id) as total_clusters,
            COUNT(DISTINCT cm.seqhash_id) as total_clustered_sequences,
            AVG(c.size) as avg_cluster_size,
            MIN(c.size) as min_cluster_size,
            MAX(c.size) as max_cluster_size
        FROM clusters c
        JOIN cluster_members cm ON c.cluster_id = cm.cluster_id
        """).fetchone()
        
        return {
            'total_clusters': stats[0],
            'total_clustered_sequences': stats[1],
            'avg_cluster_size': stats[2],
            'min_cluster_size': stats[3],
            'max_cluster_size': stats[4]
        }
    
    def get_cluster_info(self, cluster_id: str) -> pd.DataFrame:
        """Get detailed information about a specific cluster."""
        query = """
        SELECT 
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            STRING_AGG(DISTINCT g.go_term, '; ') as go_terms,
            STRING_AGG(DISTINCT e.ec_number, '; ') as ec_numbers
        FROM cluster_members cm
        JOIN sequences s ON cm.seqhash_id = s.seqhash_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        WHERE cm.cluster_id = ?
        GROUP BY s.seqhash_id, s.sample_id, s.length, s.is_representative,
                 a.description, a.preferred_name
        """
        return self.con.execute(query, [cluster_id]).fetchdf()
    
    def get_sequence_cluster(self, seqhash_id: str) -> pd.DataFrame:
        """Get cluster information for a specific sequence."""
        query = """
        SELECT 
            c.cluster_id,
            c.size as cluster_size,
            c.representative_seqhash_id,
            s.sample_id as representative_sample,
            a.description as representative_description
        FROM cluster_members cm
        JOIN clusters c ON cm.cluster_id = c.cluster_id
        JOIN sequences s ON c.representative_seqhash_id = s.seqhash_id
        LEFT JOIN annotations a ON c.representative_seqhash_id = a.seqhash_id
        WHERE cm.seqhash_id = ?
        """
        return self.con.execute(query, [seqhash_id]).fetchdf()

    def get_sequence_by_id(self, seqhash_id: str) -> dict:
        """Retrieve complete information for a specific sequence."""
        query = """
        SELECT 
            s.*,
            a.seed_ortholog,
            a.evalue,
            a.score,
            a.eggnog_ogs,
            a.description,
            a.preferred_name,
            a.cog_category,
            STRING_AGG(DISTINCT g.go_term, '; ') as go_terms,
            STRING_AGG(DISTINCT e.ec_number, '; ') as ec_numbers,
            k.kegg_ko,
            k.kegg_pathway,
            k.kegg_module
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        LEFT JOIN kegg_info k ON s.seqhash_id = k.seqhash_id
        WHERE s.seqhash_id = ?
        GROUP BY s.seqhash_id, s.sequence, s.sample_id, s.assembly_date, 
                 s.is_representative, s.length, a.seed_ortholog, a.evalue, 
                 a.score, a.eggnog_ogs, a.description, a.preferred_name,
                 a.cog_category, k.kegg_ko, k.kegg_pathway, k.kegg_module
        """
        result = self.con.execute(query, [seqhash_id]).fetchdf()
        return result.to_dict('records')[0] if not result.empty else None
    
    def search_sequences(self, 
                        sample_id: Optional[str] = None,
                        min_length: Optional[int] = None,
                        max_length: Optional[int] = None,
                        has_annotation: Optional[bool] = None,
                        description_contains: Optional[str] = None,
                        go_term: Optional[str] = None,
                        ec_number: Optional[str] = None,
                        limit: int = 100) -> pd.DataFrame:
        """
        Search sequences with multiple criteria.
        Returns a DataFrame of matching sequences.
        """
        conditions = ["1=1"]  # Always true condition to start
        params = []
        
        if sample_id:
            conditions.append("s.sample_id = ?")
            params.append(sample_id)
            
        if min_length:
            conditions.append("s.length >= ?")
            params.append(min_length)
            
        if max_length:
            conditions.append("s.length <= ?")
            params.append(max_length)
            
        if has_annotation is not None:
            if has_annotation:
                conditions.append("a.seqhash_id IS NOT NULL")
            else:
                conditions.append("a.seqhash_id IS NULL")
                
        if description_contains:
            conditions.append("LOWER(a.description) LIKE ?")
            params.append(f"%{description_contains.lower()}%")
            
        if go_term:
            conditions.append("EXISTS (SELECT 1 FROM go_terms g WHERE g.seqhash_id = s.seqhash_id AND g.go_term = ?)")
            params.append(go_term)
            
        if ec_number:
            conditions.append("EXISTS (SELECT 1 FROM ec_numbers e WHERE e.seqhash_id = s.seqhash_id AND e.ec_number = ?)")
            params.append(ec_number)
            
        query = f"""
        SELECT DISTINCT
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            a.cog_category
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        WHERE {' AND '.join(conditions)}
        LIMIT ?
        """
        params.append(limit)
        
        return self.con.execute(query, params).fetchdf()
    
    def get_sample_stats(self, sample_id: Optional[str] = None) -> pd.DataFrame:
        """Get statistics for all samples or a specific sample."""
        query = """
        SELECT 
            s.sample_id,
            COUNT(DISTINCT s.seqhash_id) as total_sequences,
            AVG(s.length) as avg_length,
            MIN(s.length) as min_length,
            MAX(s.length) as max_length,
            COUNT(DISTINCT a.seqhash_id) as annotated_sequences,
            COUNT(DISTINCT g.seqhash_id) as sequences_with_go,
            COUNT(DISTINCT e.seqhash_id) as sequences_with_ec
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            return self.con.execute(query + " GROUP BY s.sample_id", [sample_id]).fetchdf()
        return self.con.execute(query + " GROUP BY s.sample_id").fetchdf()
    
    def get_go_term_summary(self, sample_id: Optional[str] = None, min_sequences: int = 5) -> pd.DataFrame:
        """Get summary of GO term frequencies."""
        query = """
        SELECT 
            g.go_term,
            COUNT(DISTINCT s.seqhash_id) as sequence_count
        FROM sequences s
        JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            params = [sample_id]
        else:
            params = []
            
        query += f" GROUP BY g.go_term HAVING COUNT(DISTINCT s.seqhash_id) >= {min_sequences} ORDER BY sequence_count DESC"
        return self.con.execute(query, params).fetchdf()
    
    def get_ec_number_summary(self, sample_id: Optional[str] = None, min_sequences: int = 5) -> pd.DataFrame:
        """Get summary of EC number frequencies."""
        query = """
        SELECT 
            e.ec_number,
            COUNT(DISTINCT s.seqhash_id) as sequence_count
        FROM sequences s
        JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        """
        if sample_id:
            query += " WHERE s.sample_id = ?"
            params = [sample_id]
        else:
            params = []
            
        query += f" GROUP BY e.ec_number HAVING COUNT(DISTINCT s.seqhash_id) >= {min_sequences} ORDER BY sequence_count DESC"
        return self.con.execute(query, params).fetchdf()
        
    def search_sequences(self, 
                        sample_id: Optional[str] = None,
                        min_length: Optional[int] = None,
                        max_length: Optional[int] = None,
                        has_annotation: Optional[bool] = None,
                        description_contains: Optional[str] = None,
                        go_term: Optional[str] = None,
                        ec_number: Optional[str] = None,
                        is_representative: Optional[bool] = None,
                        min_cluster_size: Optional[int] = None,
                        limit: int = 100) -> pd.DataFrame:
        """
        Search sequences with multiple criteria including cluster information.
        Returns a DataFrame of matching sequences.
        """
        conditions = ["1=1"]
        params = []
        
        if sample_id:
            conditions.append("s.sample_id = ?")
            params.append(sample_id)
            
        if min_length:
            conditions.append("s.length >= ?")
            params.append(min_length)
            
        if max_length:
            conditions.append("s.length <= ?")
            params.append(max_length)
            
        if has_annotation is not None:
            if has_annotation:
                conditions.append("a.seqhash_id IS NOT NULL")
            else:
                conditions.append("a.seqhash_id IS NULL")
                
        if description_contains:
            conditions.append("LOWER(a.description) LIKE ?")
            params.append(f"%{description_contains.lower()}%")
            
        if go_term:
            conditions.append("EXISTS (SELECT 1 FROM go_terms g WHERE g.seqhash_id = s.seqhash_id AND g.go_term = ?)")
            params.append(go_term)
            
        if ec_number:
            conditions.append("EXISTS (SELECT 1 FROM ec_numbers e WHERE e.seqhash_id = s.seqhash_id AND e.ec_number = ?)")
            params.append(ec_number)
            
        if is_representative is not None:
            conditions.append("s.is_representative = ?")
            params.append(is_representative)
            
        if min_cluster_size is not None:
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM clusters c 
                    WHERE c.representative_seqhash_id = s.seqhash_id 
                    AND c.size >= ?
                )
            """)
            params.append(min_cluster_size)
            
        query = f"""
        SELECT DISTINCT
            s.seqhash_id,
            s.sample_id,
            s.length,
            s.is_representative,
            a.description,
            a.preferred_name,
            a.cog_category,
            c.cluster_id,
            c.size as cluster_size
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN cluster_members cm ON s.seqhash_id = cm.seqhash_id
        LEFT JOIN clusters c ON cm.cluster_id = c.cluster_id
        WHERE {' AND '.join(conditions)}
        LIMIT ?
        """
        params.append(limit)
        
        return self.con.execute(query, params).fetchdf()

In [13]:
# Initialize the database interface
db_path = "/mnt/data2/planter_outputs/planter.duckdb"
db = SequenceDB(db_path)


In [10]:
# Load cluster information from MMseqs2 TSV file
db.load_clusters_from_tsv("/mnt/data2/planter_outputs/repseq/update_SRR8859648/newClusterDB.tsv")

# Get overall cluster statistics
stats = db.get_cluster_stats()
print("Cluster statistics:", stats)

# Get information about a specific cluster
cluster_info = db.get_cluster_info("CLUSTER_1")
print("\nCluster members:", cluster_info)

# Get cluster information for a specific sequence
seq_cluster = db.get_sequence_cluster("your_sequence_id")
print("\nSequence cluster info:", seq_cluster)


INFO:__main__:Loading cluster data from /mnt/data2/planter_outputs/repseq/update_SRR8859648/newClusterDB.tsv


ConstraintException: Constraint Error: Violates foreign key constraint because key "seqhash_id: v1_DLS_0004b18321ae94c2f6e95187309a28d05a8cb180c2123f332889dc8d3f246b82.p1" does not exist in the referenced table

In [15]:

# Get statistics for all samples
stats = db.get_sample_stats()
print("Sample Statistics:")
display(stats)

# Search for sequences with specific criteria
results = db.search_sequences(
    sample_id="SRR18070787",
    min_length=200,
    description_contains="kinase",
    limit=10
)
print("\nFound sequences:")
display(results)

# Get detailed information for a specific sequence
seq_info = db.get_sequence_by_id("v1_DLS_0004b18321ae94c2f6e95187309a28d05a8cb180c2123f332889dc8d3f246b82.p1")
print("\nSequence details:")
display(seq_info)

# Get GO term summary
go_summary = db.get_go_term_summary(min_sequences=10)
print("\nMost common GO terms:")
display(go_summary)


Sample Statistics:


Unnamed: 0,sample_id,total_sequences,avg_length,min_length,max_length,annotated_sequences,sequences_with_go,sequences_with_ec
0,ERR9123877,27560,451.145814,100,2762,25693,14075,6115
1,ERR9123879,28766,450.366135,100,3456,26726,14547,6364
2,SRR10444683,23882,488.507425,100,4727,17507,7997,3915
3,SRR11011260,41927,421.588747,100,5069,38566,21438,9320
4,SRR128114,21608,291.993758,100,965,19342,11212,4915
...,...,...,...,...,...,...,...,...
72,SRR29366265,33502,467.670111,100,4812,26471,8561,6567
73,SRR22271586,19862,494.236919,100,2480,15637,6565,3245
74,SRR18070789,9846,423.257129,100,2419,7541,3657,1763
75,SRR8859648,36454,462.970037,100,5234,26065,7777,6146



Found sequences:


Unnamed: 0,seqhash_id,sample_id,length,is_representative,description,preferred_name,cog_category,cluster_id,cluster_size
0,v1_DLS_029cd8067614e2680843f522b1997df7afc9879...,SRR18070787,1947,False,Belongs to the PI3 PI4-kinase family,STT4,T,,
1,v1_DLS_033a5e3a653914b9f212d9db0c0c9fc73b3a01e...,SRR18070787,209,False,Riboflavin kinase,FMN1,H,,
2,v1_DLS_1275325847514a04bd06dfccf2490552cd5d8b6...,SRR18070787,658,False,Protein tyrosine kinase,ppk32,T,,
3,v1_DLS_153edc097bff0caacc961a0830446d12c259b02...,SRR18070787,901,False,Protein tyrosine kinase,-,T,,
4,v1_DLS_1caa68af96db857c54df23d36dd8050eac9d98a...,SRR18070787,455,False,Ribose-phosphate pyrophosphokinase,PRS5,EF,,
5,v1_DLS_1f563cbf008c905ac0478b0b1f3cfe6d08dabef...,SRR18070787,620,False,Protein kinase C conserved region 2 (CalB),-,S,,
6,v1_DLS_2c62c833a7448c1d2f0b0e1964057cffc1bcdf6...,SRR18070787,341,False,Fructosamine kinase,-,G,,
7,v1_DLS_2eee5cce5a507d151d38c9cc0e66375659cee7c...,SRR18070787,358,False,Belongs to the phosphoglycerate kinase family,PGK1,G,,
8,v1_DLS_34b33903ac7116b0dc138f6837ed5d07f203368...,SRR18070787,1054,False,Serine threonine protein kinase,CDC5,D,,
9,v1_DLS_3e166aacd860ecdbefd1955179e994c879e4a87...,SRR18070787,437,False,Mitochondrial NADH kinase,POS5,G,,



Sequence details:


None


Most common GO terms:


Unnamed: 0,go_term,sequence_count
0,GO:0005575,709636
1,GO:0008150,708355
2,GO:0005623,697434
3,GO:0044464,697427
4,GO:0005622,650290
...,...,...
20993,GO:0098730,10
20994,GO:0007147,10
20995,GO:2000564,10
20996,GO:0071623,10


In [17]:
# Diagnostic query to check sequences
tsv_path = "/mnt/data2/planter_outputs/repseq/update_SRR8859648/newClusterDB.tsv"
missing_seqs = db.con.execute("""
WITH cluster_seqs AS (
    SELECT representative as seqhash_id FROM read_csv_auto(?, sep='\t', header=False, names=['representative', 'member'])
    UNION
    SELECT member FROM read_csv_auto(?, sep='\t', header=False, names=['representative', 'member'])
)
SELECT 
    cs.seqhash_id,
    CASE WHEN s.seqhash_id IS NULL THEN 'Missing' ELSE 'Present' END as status
FROM cluster_seqs cs
LEFT JOIN sequences s ON cs.seqhash_id = s.seqhash_id
WHERE s.seqhash_id IS NULL
LIMIT 10
""", [tsv_path, tsv_path]).fetchdf()
print("Missing sequences:")
print(missing_seqs)


Missing sequences:
                                          seqhash_id   status
0  v1_DLS_cf6975b0be8ea4fd95c090f9d99c5b4c8c18405...  Missing
1  v1_DLS_655c1d0533a8d838371bb7e39245f51a9990ef4...  Missing
2  v1_DLS_33b8cc988ae22437a6340b3d956529161e7af32...  Missing
3  v1_DLS_81eec195ff7c18e3da3515e7e28e778ce675851...  Missing
4  v1_DLS_737fb5387adb86ef62214b690181129ff12c7f2...  Missing
5  v1_DLS_cb2577ba4b9a0b63ed0c151b4cdd25eb1f8f393...  Missing
6  v1_DLS_bb4a9733a2d0ab0010de0e80a82247d2a7229a9...  Missing
7  v1_DLS_f42739564a917b895dc496b0868d35fa1ec40ba...  Missing
8  v1_DLS_9beb6405a65587a82847554af33c3bd68a3decb...  Missing
9  v1_DLS_aa46c90b09cda0e4414ae2ded24ecf33fd42b48...  Missing


In [65]:
import duckdb
import pandas as pd
from pathlib import Path
import logging
from Bio import SeqIO
from datetime import datetime
import time

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize DuckDB
db_path = "demo_sequences.duckdb"
con = duckdb.connect(db_path)

# First, let's verify our schema
def print_schema():
    """Print the current schema of all tables"""
    tables = con.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'").df()
    for table in tables['table_name']:
        columns = con.execute(f"SELECT * FROM information_schema.columns WHERE table_name = '{table}'").df()
        print(f"\nTable: {table}")
        print(columns[['column_name', 'data_type', 'is_nullable']])

# Clean everything up first
def clean_database():
    """Drop all existing tables"""
    con.execute("DROP TABLE IF EXISTS cluster_members")
    con.execute("DROP TABLE IF EXISTS clusters")
    con.execute("DROP TABLE IF EXISTS sequences")
    con.execute("DROP TABLE IF EXISTS annotations")
    con.execute("DROP TABLE IF EXISTS go_terms")
    con.execute("DROP TABLE IF EXISTS ec_numbers")
    con.execute("DROP TABLE IF EXISTS kegg_info")
    con.execute("DROP TABLE IF EXISTS temp_clusters")

# Create schema
schema_sql = """
CREATE TABLE sequences (
    seqhash_id VARCHAR PRIMARY KEY,
    sequence VARCHAR NOT NULL,
    sra_id VARCHAR NOT NULL,
    assembly_date TIMESTAMP NOT NULL,
    is_representative BOOLEAN NOT NULL DEFAULT FALSE,
    length INTEGER NOT NULL
);

CREATE TABLE clusters (
    representative_seqhash VARCHAR PRIMARY KEY,
    created_date TIMESTAMP NOT NULL,
    member_count INTEGER NOT NULL
);

CREATE TABLE cluster_members (
    representative_seqhash VARCHAR NOT NULL,
    member_seqhash VARCHAR NOT NULL,
    PRIMARY KEY (representative_seqhash, member_seqhash)
);

CREATE TABLE annotations (
    seqhash_id VARCHAR PRIMARY KEY,
    seed_ortholog VARCHAR,
    evalue DOUBLE,
    score DOUBLE,
    eggnog_ogs VARCHAR,
    max_annot_lvl VARCHAR,
    cog_category VARCHAR,
    description VARCHAR,
    preferred_name VARCHAR
);

CREATE TABLE go_terms (
    seqhash_id VARCHAR NOT NULL,
    go_term VARCHAR NOT NULL,
    PRIMARY KEY (seqhash_id, go_term)
);

CREATE TABLE ec_numbers (
    seqhash_id VARCHAR NOT NULL,
    ec_number VARCHAR NOT NULL,
    PRIMARY KEY (seqhash_id, ec_number)
);

CREATE TABLE kegg_info (
    seqhash_id VARCHAR NOT NULL,
    kegg_ko VARCHAR,
    kegg_pathway VARCHAR,
    kegg_module VARCHAR,
    kegg_reaction VARCHAR,
    kegg_rclass VARCHAR,
    PRIMARY KEY (seqhash_id)
);

-- Create indexes for common queries
CREATE INDEX IF NOT EXISTS idx_sequences_sra ON sequences(sra_id);
CREATE INDEX IF NOT EXISTS idx_annotations_cog ON annotations(cog_category);
CREATE INDEX IF NOT EXISTS idx_go_terms ON go_terms(go_term);
"""

def init_database():
    """Initialize database with schema."""
    con.execute(schema_sql)
    logger.info("Database initialized")

def load_sequences(fasta_path: str, sra_id: str):
    """Load sequences from FASTA file into database."""
    logger.info(f"Loading sequences from {fasta_path}")
    
    # Create a list to store sequence data
    sequences = []
    for record in SeqIO.parse(fasta_path, "fasta"):
        seqhash_id = record.id.split()[0]  # Handle TransDecoder headers
        sequences.append({
            'seqhash_id': seqhash_id,
            'sequence': str(record.seq),
            'sra_id': sra_id,
            'assembly_date': datetime.now(),
            'is_representative': False,
            'length': len(record.seq)
        })
        
        if len(sequences) % 1000 == 0:
            logger.info(f"Read {len(sequences)} sequences...")
    
    # Convert to DataFrame and load into DuckDB
    df = pd.DataFrame(sequences)
    con.execute("DELETE FROM sequences WHERE sra_id = ?", [sra_id])
    con.execute("INSERT INTO sequences SELECT * FROM df")
    
    logger.info(f"Successfully loaded {len(sequences)} sequences")

def load_annotations(annot_path: str):
    """Load eggNOG-mapper annotations into database."""
    logger.info(f"Loading annotations from {annot_path}")
    
    # Clean up any existing temporary tables first
    logger.info("Cleaning up any existing temporary tables...")
    con.execute("DROP TABLE IF EXISTS temp_annotations")
    con.execute("DROP TABLE IF EXISTS temp_go")
    con.execute("DROP TABLE IF EXISTS temp_ec")
    
    # Define the expected column names
    column_names = [
        'query', 'seed_ortholog', 'evalue', 'score', 'eggNOG_OGs', 
        'max_annot_lvl', 'COG_category', 'Description', 'Preferred_name',
        'GOs', 'EC', 'KEGG_ko', 'KEGG_Pathway', 'KEGG_Module', 
        'KEGG_Reaction', 'KEGG_rclass', 'BRITE', 'KEGG_TC', 'CAZy',
        'BiGG_Reaction', 'PFAMs'
    ]
    
    # Create temporary table with defined columns
    columns_def = ", ".join([f'"{name}" VARCHAR' for name in column_names])
    con.execute(f"""
        CREATE TABLE temp_annotations ({columns_def})
    """)
    
    # Load data directly with column names
    con.execute(f"""
        INSERT INTO temp_annotations
        SELECT * FROM read_csv_auto(
            ?,
            sep='\t',
            header=False,
            comment='#',
            names=[{','.join(f"'{name}'" for name in column_names)}]
        )
    """, [str(annot_path)])
    
    # Verify the data loading
    count = con.execute("SELECT COUNT(*) FROM temp_annotations").fetchone()[0]
    logger.info(f"Loaded {count} rows into temporary table")
    
    # Let's look at a sample of the data
    logger.info("Sample of loaded data:")
    sample = con.execute("SELECT * FROM temp_annotations LIMIT 3").df()
    logger.info(f"\n{sample}")
    
    # Check for duplicate IDs
    logger.info("Checking for duplicate IDs...")
    duplicates = con.execute("""
        SELECT query, COUNT(*) as count
        FROM temp_annotations
        GROUP BY query
        HAVING COUNT(*) > 1
    """).df()
    
    if not duplicates.empty:
        logger.warning(f"Found duplicate IDs:\n{duplicates}")
    
    try:
        # Process main annotations
        con.execute("""
            INSERT INTO annotations 
            SELECT DISTINCT
                query as seqhash_id,
                seed_ortholog,
                TRY_CAST(evalue AS DOUBLE) as evalue,
                TRY_CAST(score AS DOUBLE) as score,
                "eggNOG_OGs" as eggnog_ogs,
                max_annot_lvl,
                "COG_category" as cog_category,
                "Description" as description,
                "Preferred_name" as preferred_name
            FROM temp_annotations
        """)
        
        annotation_count = con.execute("SELECT COUNT(*) FROM annotations").fetchone()[0]
        logger.info(f"Inserted {annotation_count} annotations")
        
        # Process GO terms
        con.execute("""
            CREATE TABLE temp_go AS
            SELECT 
                query as seqhash_id,
                unnest(string_split(COALESCE("GOs", ''), ',')) as go_term
            FROM temp_annotations
            WHERE "GOs" IS NOT NULL AND "GOs" != '' AND "GOs" != '-'
        """)
        
        con.execute("""
            INSERT INTO go_terms
            SELECT DISTINCT seqhash_id, trim(go_term)
            FROM temp_go
            WHERE trim(go_term) != ''
        """)
        
        go_count = con.execute("SELECT COUNT(*) FROM go_terms").fetchone()[0]
        logger.info(f"Loaded {go_count} GO term associations")
        
        # Process EC numbers if present
        con.execute("""
            CREATE TABLE temp_ec AS
            SELECT 
                query as seqhash_id,
                unnest(string_split(COALESCE("EC", ''), ',')) as ec_number
            FROM temp_annotations
            WHERE "EC" IS NOT NULL AND "EC" != '' AND "EC" != '-'
        """)
        
        con.execute("""
            INSERT INTO ec_numbers
            SELECT DISTINCT seqhash_id, trim(ec_number)
            FROM temp_ec
            WHERE trim(ec_number) != ''
        """)
        
        ec_count = con.execute("SELECT COUNT(*) FROM ec_numbers").fetchone()[0]
        logger.info(f"Loaded {ec_count} EC number associations")
        
    except Exception as e:
        logger.error(f"Error processing annotations: {e}")
        raise
    finally:
        # Clean up temporary tables
        con.execute("DROP TABLE IF EXISTS temp_annotations")
        con.execute("DROP TABLE IF EXISTS temp_go")
        con.execute("DROP TABLE IF EXISTS temp_ec")
    
    logger.info("Successfully completed annotation loading")

def load_clusters(cluster_path: str):
    """Load MMseqs clustering results into database."""
    logger.info(f"Loading clusters from {cluster_path}")
    
    # Clean up any existing temporary tables
    con.execute("DROP TABLE IF EXISTS temp_clusters")
    
    # Create temporary table
    con.execute("""
        CREATE TABLE temp_clusters (
            representative_seqhash VARCHAR,
            member_seqhash VARCHAR
        )
    """)
    
    # Load clustering results
    con.execute("""
        INSERT INTO temp_clusters
        SELECT * FROM read_csv_auto(
            ?,
            sep='\t',
            header=False,
            names=['representative_seqhash', 'member_seqhash']
        )
    """, [str(cluster_path)])
    
    try:
        # Get count of loaded clusters
        loaded_count = con.execute("""
            SELECT COUNT(DISTINCT representative_seqhash) 
            FROM temp_clusters
        """).fetchone()[0]
        logger.info(f"Loaded {loaded_count} unique clusters")
        
        # Create clusters with all required columns
        con.execute("""
            INSERT INTO clusters (representative_seqhash, created_date, member_count)
            SELECT 
                representative_seqhash,
                CURRENT_TIMESTAMP,
                COUNT(*) as member_count
            FROM temp_clusters
            GROUP BY representative_seqhash
        """)
        
        # Add cluster members
        con.execute("""
            INSERT INTO cluster_members (representative_seqhash, member_seqhash)
            SELECT DISTINCT
                representative_seqhash,
                member_seqhash
            FROM temp_clusters
        """)
        
        # Mark representative sequences
        con.execute("""
            UPDATE sequences
            SET is_representative = FALSE
        """)
        
        con.execute("""
            UPDATE sequences
            SET is_representative = TRUE
            WHERE seqhash_id IN (
                SELECT DISTINCT representative_seqhash
                FROM clusters
            )
        """)
        
        # Log statistics
        cluster_count = con.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
        member_count = con.execute("SELECT COUNT(*) FROM cluster_members").fetchone()[0]
        rep_count = con.execute("SELECT COUNT(*) FROM sequences WHERE is_representative").fetchone()[0]
        
        logger.info(f"Loaded {cluster_count} clusters with {member_count} total members")
        logger.info(f"Marked {rep_count} sequences as representatives")
        
    except Exception as e:
        logger.error(f"Error processing clusters: {e}")
        logger.error("Current schema:")
        print_schema()
        raise
    finally:
        con.execute("DROP TABLE IF EXISTS temp_clusters")
    
    logger.info("Successfully completed cluster loading")

# Function to get cluster members
def get_cluster_members(representative_seqhash: str) -> pd.DataFrame:
    """Get detailed information about all members of a cluster."""
    return con.execute("""
        SELECT 
            s.seqhash_id,
            s.length,
            s.is_representative,
            a.description,
            a.cog_category,
            c.member_count as total_cluster_size
        FROM cluster_members cm
        JOIN sequences s ON cm.member_seqhash = s.seqhash_id
        JOIN clusters c ON cm.representative_seqhash = c.representative_seqhash
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        WHERE cm.representative_seqhash = ?
        ORDER BY s.length DESC
    """, [representative_seqhash]).df()

# Function to get cluster statistics
def get_cluster_stats():
    """Get summary statistics about clusters."""
    return con.execute("""
        SELECT 
            COUNT(*) as total_clusters,
            ROUND(AVG(member_count), 2) as avg_cluster_size,
            MIN(member_count) as min_cluster_size,
            MAX(member_count) as max_cluster_size,
            COUNT(DISTINCT cm.member_seqhash) as total_members
        FROM clusters c
        JOIN cluster_members cm ON c.representative_seqhash = cm.representative_seqhash
    """).df()

# Example query functions
def query_by_annotation(annotation_type: str, value: str) -> pd.DataFrame:
    """Query sequences by annotation criteria."""
    if annotation_type == "cog":
        return con.execute("""
            SELECT s.seqhash_id, s.sequence, a.cog_category, a.description
            FROM sequences s
            JOIN annotations a ON s.seqhash_id = a.seqhash_id
            WHERE a.cog_category = ?
            AND s.is_representative = TRUE
        """, [value]).df()
    elif annotation_type == "go":
        return con.execute("""
            SELECT DISTINCT s.seqhash_id, s.sequence, g.go_term
            FROM sequences s
            JOIN go_terms g ON s.seqhash_id = g.seqhash_id
            WHERE g.go_term = ?
            AND s.is_representative = TRUE
        """, [value]).df()

def get_cluster_members(representative_seqhash: str) -> pd.DataFrame:
    """Get all members of a cluster."""
    return con.execute("""
        SELECT s.*
        FROM sequences s
        JOIN cluster_members m ON s.seqhash_id = m.member_seqhash
        JOIN clusters c ON m.cluster_id = c.cluster_id
        WHERE c.representative_seqhash = ?
    """, [representative_seqhash]).df()

# Define the official COG mapping
cog_mapping = {
    'J': 'TRANSLATION, RIBOSOMAL STRUCTURE AND BIOGENESIS',
    'A': 'RNA PROCESSING AND MODIFICATION',
    'K': 'TRANSCRIPTION',
    'L': 'REPLICATION, RECOMBINATION AND REPAIR',
    'B': 'CHROMATIN STRUCTURE AND DYNAMICS',
    'D': 'CELL CYCLE CONTROL, CELL DIVISION, CHROMOSOME PARTITIONING',
    'Y': 'NUCLEAR STRUCTURE',
    'V': 'DEFENSE MECHANISMS',
    'T': 'SIGNAL TRANSDUCTION MECHANISMS',
    'M': 'CELL WALL/MEMBRANE/ENVELOPE BIOGENESIS',
    'N': 'CELL MOTILITY',
    'Z': 'CYTOSKELETON',
    'W': 'EXTRACELLULAR STRUCTURES',
    'U': 'INTRACELLULAR TRAFFICKING, SECRETION, AND VESICULAR TRANSPORT',
    'O': 'POSTTRANSLATIONAL MODIFICATION, PROTEIN TURNOVER, CHAPERONES',
    'X': 'MOBILOME: PROPHAGES, TRANSPOSONS',
    'C': 'ENERGY PRODUCTION AND CONVERSION',
    'G': 'CARBOHYDRATE TRANSPORT AND METABOLISM',
    'E': 'AMINO ACID TRANSPORT AND METABOLISM',
    'F': 'NUCLEOTIDE TRANSPORT AND METABOLISM',
    'H': 'COENZYME TRANSPORT AND METABOLISM',
    'I': 'LIPID TRANSPORT AND METABOLISM',
    'P': 'INORGANIC ION TRANSPORT AND METABOLISM',
    'Q': 'SECONDARY METABOLITES BIOSYNTHESIS, TRANSPORT AND CATABOLISM',
    'R': 'GENERAL FUNCTION PREDICTION ONLY',
    'S': 'FUNCTION UNKNOWN'
}

def explore_cog_categories(include_descriptions=True):
    """
    Analyze COG category distribution with support for multiple categories per sequence.
    
    Args:
        include_descriptions: If True, includes full COG category descriptions
    """
    logger.info("\nAnalyzing COG category distribution...")
    
    # Create temporary table with split COG categories
    con.execute("DROP TABLE IF EXISTS temp_cog_categories")
    con.execute("""
        CREATE TABLE temp_cog_categories AS
        WITH RECURSIVE chars AS (
            SELECT 
                seqhash_id,
                cog_category,
                1 as position
            FROM annotations
            WHERE cog_category IS NOT NULL
            
            UNION ALL
            
            SELECT 
                seqhash_id,
                cog_category,
                position + 1
            FROM chars
            WHERE position < LENGTH(cog_category)
        )
        SELECT 
            seqhash_id,
            SUBSTRING(cog_category, position, 1) as single_cog
        FROM chars
        WHERE SUBSTRING(cog_category, position, 1) != ''
          AND SUBSTRING(cog_category, position, 1) != '-'
    """)
    
    # Get basic distribution
    cog_stats = con.execute("""
        WITH cog_counts AS (
            SELECT 
                single_cog as cog_category,
                COUNT(DISTINCT c.seqhash_id) as sequence_count,
                COUNT(DISTINCT CASE WHEN s.is_representative THEN s.seqhash_id END) as representative_count,
                COUNT(DISTINCT cl.cluster_id) as cluster_count
            FROM temp_cog_categories c
            JOIN sequences s ON c.seqhash_id = s.seqhash_id
            LEFT JOIN clusters cl ON s.seqhash_id = cl.representative_seqhash
            GROUP BY single_cog
        )
        SELECT 
            cog_category,
            sequence_count,
            representative_count,
            cluster_count,
            ROUND(sequence_count * 100.0 / (SELECT COUNT(DISTINCT seqhash_id) FROM annotations WHERE cog_category IS NOT NULL), 2) as percentage
        FROM cog_counts
        ORDER BY sequence_count DESC
    """).df()
    
    # Add descriptions if requested
    if include_descriptions:
        cog_stats['description'] = cog_stats['cog_category'].map(cog_mapping)
    
    con.execute("DROP TABLE IF EXISTS temp_cog_categories")
    
    return cog_stats

def analyze_cog_combinations(top_n=20):
    """
    Analyze common combinations of COG categories.
    
    Args:
        top_n: Number of top combinations to return
    """
    logger.info("\nAnalyzing COG category combinations...")
    
    combos = con.execute("""
        SELECT 
            a.cog_category,
            COUNT(*) as frequency,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM annotations WHERE cog_category IS NOT NULL), 2) as percentage,
            STRING_AGG(DISTINCT s.seqhash_id, ',' LIMIT 3) as example_sequences
        FROM annotations a
        JOIN sequences s ON a.seqhash_id = s.seqhash_id
        WHERE a.cog_category IS NOT NULL
        GROUP BY a.cog_category
        HAVING LENGTH(a.cog_category) > 1
        ORDER BY frequency DESC
        LIMIT ?
    """, [top_n]).df()
    
    # Add decoded combinations
    def decode_combination(combo):
        return " + ".join([f"{c} ({cog_mapping.get(c, 'Unknown')})" for c in combo])
    
    combos['decoded_combination'] = combos['cog_category'].apply(decode_combination)
    
    return combos

def get_cog_category_summary():
    """Get summary statistics about COG categories."""
    summary = con.execute("""
        SELECT 
            COUNT(DISTINCT seqhash_id) as total_annotated_sequences,
            COUNT(DISTINCT CASE WHEN LENGTH(cog_category) > 1 THEN seqhash_id END) as multi_cog_sequences,
            ROUND(AVG(LENGTH(cog_category)), 2) as avg_cogs_per_sequence,
            COUNT(DISTINCT cog_category) as unique_combinations
        FROM annotations 
        WHERE cog_category IS NOT NULL
    """).df()
    
    # Add category counts by functional group
    metabolism_cogs = ['C', 'G', 'E', 'F', 'H', 'I', 'P', 'Q']
    information_cogs = ['J', 'A', 'K', 'L', 'B']
    cellular_cogs = ['D', 'Y', 'V', 'T', 'M', 'N', 'Z', 'W', 'U', 'O']
    poorly_characterized = ['R', 'S']
    
    category_counts = con.execute("""
        WITH RECURSIVE chars AS (
            SELECT seqhash_id, cog_category, 1 as position
            FROM annotations
            WHERE cog_category IS NOT NULL
            
            UNION ALL
            
            SELECT seqhash_id, cog_category, position + 1
            FROM chars
            WHERE position < LENGTH(cog_category)
        )
        SELECT 
            SUBSTRING(cog_category, position, 1) as single_cog,
            COUNT(DISTINCT seqhash_id) as seq_count
        FROM chars
        WHERE SUBSTRING(cog_category, position, 1) != ''
          AND SUBSTRING(cog_category, position, 1) != '-'
        GROUP BY SUBSTRING(cog_category, position, 1)
    """).df()
    
    # Calculate counts for each functional group
    metabolism_count = category_counts[category_counts['single_cog'].isin(metabolism_cogs)]['seq_count'].sum()
    information_count = category_counts[category_counts['single_cog'].isin(information_cogs)]['seq_count'].sum()
    cellular_count = category_counts[category_counts['single_cog'].isin(cellular_cogs)]['seq_count'].sum()
    poorly_characterized_count = category_counts[category_counts['single_cog'].isin(poorly_characterized)]['seq_count'].sum()
    
    functional_groups = pd.DataFrame({
        'functional_group': ['Metabolism', 'Information Storage and Processing', 
                           'Cellular Processes and Signaling', 'Poorly Characterized'],
        'sequence_count': [metabolism_count, information_count, 
                         cellular_count, poorly_characterized_count]
    })
    
    return summary, functional_groups



In [66]:
# Reset our tables before loading
con.execute("DROP TABLE IF EXISTS annotations")
con.execute("DROP TABLE IF EXISTS go_terms")
con.execute("DROP TABLE IF EXISTS ec_numbers")
con.execute("DROP TABLE IF EXISTS kegg_info")

# Clean and reinitialize
clean_database()
con.execute(schema_sql)

# Verify schema before loading
print("Initial schema:")
print_schema()

# Now load your data
base_dir = Path('/mnt/data2/planter_outputs')
sample = 'SRR14292008'
load_sequences(base_dir / sample / f'transdecoder/{sample}.pep', sample)
load_annotations(base_dir / sample / f'eggnog/{sample}.emapper.annotations')
load_clusters('/mnt/data2/planter_outputs/repseq/update_SRR14292008/newClusterDB.tsv')
# Example usage:
# Get sequences with specific COG category
# cog_seqs = query_by_annotation("cog", "F")
# print(f"Found {len(cog_seqs)} sequences in COG category F")

# Get cluster members for a representative sequence
# members = get_cluster_members("your_seqhash_id")
# print(f"Found {len(members)} cluster members")

<duckdb.duckdb.DuckDBPyConnection at 0x7be9d5e6cbb0>

<duckdb.duckdb.DuckDBPyConnection at 0x7be9d5e6cbb0>

<duckdb.duckdb.DuckDBPyConnection at 0x7be9d5e6cbb0>

<duckdb.duckdb.DuckDBPyConnection at 0x7be9d5e6cbb0>

<duckdb.duckdb.DuckDBPyConnection at 0x7be9d5e6cbb0>

INFO:__main__:Loading sequences from /mnt/data2/planter_outputs/SRR14292008/transdecoder/SRR14292008.pep
INFO:__main__:Read 1000 sequences...
INFO:__main__:Read 2000 sequences...
INFO:__main__:Read 3000 sequences...
INFO:__main__:Read 4000 sequences...
INFO:__main__:Read 5000 sequences...
INFO:__main__:Read 6000 sequences...
INFO:__main__:Read 7000 sequences...
INFO:__main__:Read 8000 sequences...
INFO:__main__:Read 9000 sequences...
INFO:__main__:Read 10000 sequences...
INFO:__main__:Read 11000 sequences...
INFO:__main__:Read 12000 sequences...
INFO:__main__:Read 13000 sequences...


Initial schema:

Table: annotations
      column_name data_type is_nullable
0      seqhash_id   VARCHAR          NO
1   seed_ortholog   VARCHAR         YES
2          evalue    DOUBLE         YES
3           score    DOUBLE         YES
4      eggnog_ogs   VARCHAR         YES
5   max_annot_lvl   VARCHAR         YES
6    cog_category   VARCHAR         YES
7     description   VARCHAR         YES
8  preferred_name   VARCHAR         YES

Table: clusters
              column_name  data_type is_nullable
0  representative_seqhash    VARCHAR          NO
1            created_date  TIMESTAMP          NO
2            member_count    INTEGER          NO

Table: cluster_members
              column_name data_type is_nullable
0  representative_seqhash   VARCHAR          NO
1          member_seqhash   VARCHAR          NO

Table: ec_numbers
  column_name data_type is_nullable
0  seqhash_id   VARCHAR          NO
1   ec_number   VARCHAR          NO

Table: go_terms
  column_name data_type is_nullable
0  

INFO:__main__:Read 14000 sequences...
INFO:__main__:Read 15000 sequences...
INFO:__main__:Read 16000 sequences...
INFO:__main__:Read 17000 sequences...
INFO:__main__:Successfully loaded 17947 sequences
INFO:__main__:Loading annotations from /mnt/data2/planter_outputs/SRR14292008/eggnog/SRR14292008.emapper.annotations
INFO:__main__:Cleaning up any existing temporary tables...
INFO:__main__:Loaded 14337 rows into temporary table
INFO:__main__:Sample of loaded data:
INFO:__main__:
                                               query          seed_ortholog  \
0  v1_DLS_00048542b2614e34db583d1e78f0cd5d7895791...  310453.XP_007587614.1   
1  v1_DLS_00048542b2614e34db583d1e78f0cd5d7895791...  698440.XP_007289890.1   
2  v1_DLS_00048542b2614e34db583d1e78f0cd5d7895791...   93612.XP_008025513.1   

      evalue  score                                         eggNOG_OGs  \
0   1.32e-88  263.0  COG0636@1|root,KOG0233@2759|Eukaryota,38C5K@33...   
1  3.02e-155  459.0  COG0303@1|root,KOG2371@2759|Euk

In [72]:
# Demo queries to explore the database
# Set pandas display options
pd.set_option('display.max_colwidth', None)  # Don't truncate column contents
pd.set_option('display.max_columns', None)   # Show all columns
pd.set_option('display.width', None)         # Don't wrap wide displays

def show_largest_clusters(n=10):
    """Show the n largest clusters with their annotations."""
    logger.info(f"\nTop {n} largest clusters:")
    return con.execute("""
        SELECT 
            c.representative_seqhash,
            c.member_count,
            s.length as rep_length,
            a.cog_category,
            a.description,
            COUNT(DISTINCT g.go_term) as go_term_count
        FROM clusters c
        JOIN sequences s ON c.representative_seqhash = s.seqhash_id
        LEFT JOIN annotations a ON c.representative_seqhash = a.seqhash_id
        LEFT JOIN go_terms g ON c.representative_seqhash = g.seqhash_id
        GROUP BY c.representative_seqhash, c.member_count, s.length, a.cog_category, a.description
        ORDER BY c.member_count DESC
        LIMIT ?
    """, [n]).df()

def analyze_cog_distribution():
    """Analyze the distribution of COG categories."""
    logger.info("\nCOG category distribution:")
    return con.execute("""
        WITH RECURSIVE chars AS (
            SELECT 
                seqhash_id,
                cog_category,
                1 as position
            FROM annotations
            WHERE cog_category IS NOT NULL
            
            UNION ALL
            
            SELECT 
                seqhash_id,
                cog_category,
                position + 1
            FROM chars
            WHERE position < LENGTH(cog_category)
        )
        SELECT 
            SUBSTRING(cog_category, position, 1) as cog,
            COUNT(*) as sequence_count,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM annotations), 2) as percentage
        FROM chars
        WHERE SUBSTRING(cog_category, position, 1) != ''
        GROUP BY SUBSTRING(cog_category, position, 1)
        ORDER BY sequence_count DESC
    """).df()

def find_interesting_sequences(min_cluster_size=5, min_length=500):
    """Find potentially interesting sequences based on various criteria."""
    logger.info("\nInteresting sequences (long, well-annotated, in large clusters):")
    return con.execute("""
        SELECT 
            s.seqhash_id,
            s.length,
            c.member_count as cluster_size,
            a.cog_category,
            a.description,
            COUNT(DISTINCT g.go_term) as go_term_count,
            COUNT(DISTINCT e.ec_number) as ec_number_count
        FROM sequences s
        JOIN clusters c ON s.seqhash_id = c.representative_seqhash
        JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN go_terms g ON s.seqhash_id = g.seqhash_id
        LEFT JOIN ec_numbers e ON s.seqhash_id = e.seqhash_id
        WHERE c.member_count >= ?
          AND s.length >= ?
        GROUP BY s.seqhash_id, s.length, c.member_count, a.cog_category, a.description
        ORDER BY c.member_count DESC, s.length DESC
        LIMIT 20
    """, [min_cluster_size, min_length]).df()

def sequence_length_distribution():
    """Analyze the distribution of sequence lengths."""
    logger.info("\nSequence length distribution:")
    return con.execute("""
        WITH bins AS (
            SELECT 
                CASE 
                    WHEN length < 100 THEN '<100'
                    WHEN length < 200 THEN '100-200'
                    WHEN length < 500 THEN '200-500'
                    WHEN length < 1000 THEN '500-1000'
                    ELSE '>1000'
                END as length_bin,
                COUNT(*) as count,
                COUNT(CASE WHEN is_representative THEN 1 END) as rep_count
            FROM sequences
            GROUP BY 
                CASE 
                    WHEN length < 100 THEN '<100'
                    WHEN length < 200 THEN '100-200'
                    WHEN length < 500 THEN '200-500'
                    WHEN length < 1000 THEN '500-1000'
                    ELSE '>1000'
                END
        )
        SELECT 
            length_bin,
            count as total_sequences,
            rep_count as representative_sequences,
            ROUND(count * 100.0 / (SELECT SUM(count) FROM bins), 2) as percentage
        FROM bins
        ORDER BY 
            CASE length_bin
                WHEN '<100' THEN 1
                WHEN '100-200' THEN 2
                WHEN '200-500' THEN 3
                WHEN '500-1000' THEN 4
                ELSE 5
            END
    """).df()

def search_by_function(search_term):
    """Search for sequences by functional description."""
    logger.info(f"\nSearching for sequences related to '{search_term}':")
    return con.execute("""
        SELECT 
            s.seqhash_id,
            s.length,
            s.is_representative,
            a.description,
            a.cog_category,
            CASE 
                WHEN c.member_count IS NOT NULL THEN c.member_count
                ELSE 0
            END as cluster_size
        FROM sequences s
        JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN clusters c ON s.seqhash_id = c.representative_seqhash
        WHERE a.description ILIKE ?
        ORDER BY s.length DESC
    """, [f'%{search_term}%']).df()

def cluster_member_analysis(representative_seqhash):
    """Analyze members of a specific cluster."""
    logger.info(f"\nAnalyzing cluster members for {representative_seqhash}:")
    return con.execute("""
        SELECT 
            s.seqhash_id,
            s.length,
            s.sra_id,
            a.description,
            a.cog_category
        FROM cluster_members cm
        JOIN sequences s ON cm.member_seqhash = s.seqhash_id
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        WHERE cm.representative_seqhash = ?
        ORDER BY s.length DESC
    """, [representative_seqhash]).df()

def show_largest_clusters(n=10, display_full=True):
    """
    Show the n largest clusters with their annotations.
    
    Args:
        n: Number of clusters to show
        display_full: If True, prints full seqhash IDs separately
    """
    query_result = con.execute("""
        SELECT 
            c.representative_seqhash,
            c.member_count,
            s.length as rep_length,
            a.cog_category,
            a.description,
            COUNT(DISTINCT g.go_term) as go_term_count
        FROM clusters c
        JOIN sequences s ON c.representative_seqhash = s.seqhash_id
        LEFT JOIN annotations a ON c.representative_seqhash = a.seqhash_id
        LEFT JOIN go_terms g ON c.representative_seqhash = g.seqhash_id
        GROUP BY c.representative_seqhash, c.member_count, s.length, 
                 a.cog_category, a.description
        ORDER BY c.member_count DESC
        LIMIT ?
    """, [n]).df()
    
    if display_full:
        print("\nFull sequence IDs for largest clusters:")
        for idx, row in query_result.iterrows():
            print(f"\nCluster {idx+1}:")
            print(f"Representative: {row['representative_seqhash']}")
            print(f"Members: {row['member_count']}")
            print(f"COG: {row['cog_category']}")
            print(f"Description: {row['description']}")
            print("-" * 80)
    
    return query_result

def examine_sequence(seqhash_id):
    """
    Get detailed information about a specific sequence.
    
    Args:
        seqhash_id: Full sequence hash ID
    """
    # Get basic sequence info
    seq_info = con.execute("""
        SELECT 
            s.seqhash_id,
            s.length,
            s.is_representative,
            a.cog_category,
            a.description,
            a.evalue,
            a.seed_ortholog
        FROM sequences s
        LEFT JOIN annotations a ON s.seqhash_id = a.seqhash_id
        WHERE s.seqhash_id = ?
    """, [seqhash_id]).df()
    
    # Get GO terms
    go_terms = con.execute("""
        SELECT go_term
        FROM go_terms
        WHERE seqhash_id = ?
        ORDER BY go_term
    """, [seqhash_id]).df()
    
    # Get cluster info if it's a representative
    cluster_info = con.execute("""
        SELECT member_count
        FROM clusters
        WHERE representative_seqhash = ?
    """, [seqhash_id]).df()
    
    print(f"\nSequence Details for: {seqhash_id}")
    print("-" * 80)
    print(f"Length: {seq_info['length'].iloc[0]}")
    print(f"Is Representative: {seq_info['is_representative'].iloc[0]}")
    print(f"COG Category: {seq_info['cog_category'].iloc[0]}")
    print(f"Description: {seq_info['description'].iloc[0]}")
    print(f"E-value: {seq_info['evalue'].iloc[0]}")
    print(f"Seed Ortholog: {seq_info['seed_ortholog'].iloc[0]}")
    
    if not go_terms.empty:
        print("\nGO Terms:")
        for term in go_terms['go_term']:
            print(f"  {term}")
    
    if not cluster_info.empty:
        print(f"\nCluster Size: {cluster_info['member_count'].iloc[0]} members")

def search_sequences(term, limit=10):
    """
    Search for sequences by annotation description.
    
    Args:
        term: Search term
        limit: Maximum number of results to return
    """
    results = con.execute("""
        SELECT 
            s.seqhash_id,
            s.length,
            s.is_representative,
            a.cog_category,
            a.description,
            CASE 
                WHEN c.member_count IS NOT NULL THEN c.member_count
                ELSE 0 
            END as cluster_size
        FROM sequences s
        JOIN annotations a ON s.seqhash_id = a.seqhash_id
        LEFT JOIN clusters c ON s.seqhash_id = c.representative_seqhash
        WHERE a.description ILIKE ?
        ORDER BY s.length DESC
        LIMIT ?
    """, [f'%{term}%', limit]).df()
    
    for idx, row in results.iterrows():
        print(f"\nResult {idx+1}:")
        print(f"Sequence ID: {row['seqhash_id']}")
        print(f"Length: {row['length']}")
        print(f"COG: {row['cog_category']}")
        print(f"Description: {row['description']}")
        if row['cluster_size'] > 0:
            print(f"Cluster Size: {row['cluster_size']}")
        print("-" * 80)
    
    return results

# Example usage:
print("=== Largest Clusters ===")
clusters = show_largest_clusters(5)

print("\n=== Example Sequence Search ===")
search_results = search_sequences("kinase", limit=3)

# To examine a specific sequence:
if not clusters.empty:
    example_seq = clusters.iloc[0]['representative_seqhash']
    print("\n=== Example Sequence Details ===")
    examine_sequence(example_seq)


=== Largest Clusters ===

Full sequence IDs for largest clusters:

Cluster 1:
Representative: v1_DLS_31c8f6bb7af515ec897d4dc97dcf1c5b23b06f51326fa027676061e35737fb69.p1
Members: 6
COG: None
Description: None
--------------------------------------------------------------------------------

Cluster 2:
Representative: v1_DLS_2f5f97c028c943c8ea6dfb26c9b8a9b473ed71bb504e7e94a2da88dc4582d159.p6
Members: 5
COG: None
Description: None
--------------------------------------------------------------------------------

Cluster 3:
Representative: v1_DLS_107112bb223beca181620a5fa96cb5ea148efc5af8f0f981f04bc070055b515d.p2
Members: 4
COG: None
Description: None
--------------------------------------------------------------------------------

Cluster 4:
Representative: v1_DLS_0fa1029048bdfd19c3085285f8c512d572e81f459768765c01db7dff95cd4f8f.p2
Members: 4
COG: None
Description: None
--------------------------------------------------------------------------------

Cluster 5:
Representative: v1_DLS_2e28f15

In [None]:
# # Example usage
# print("\n=== COG Analysis ===")
# print("\nCOG Category Distribution:")
# cog_dist = explore_cog_categories()
# print(cog_dist)

# print("\nCommon COG Combinations:")
# combos = analyze_cog_combinations(top_n=10)
# print(combos)

# print("\nCOG Category Summary:")
# summary, functional_groups = get_cog_category_summary()
# print("\nOverall Statistics:")
# print(summary)
# print("\nFunctional Group Distribution:")
# print(functional_groups)

# # Helper function to get sequences by COG category
# def get_sequences_by_cog(cog_category):
#     """Get sequences annotated with a specific COG category."""
#     return con.execute("""
#         WITH RECURSIVE chars AS (
#             SELECT seqhash_id, cog_category, 1 as position
#             FROM annotations
#             WHERE cog_category IS NOT NULL
            
#             UNION ALL
            
#             SELECT seqhash_id, cog_category, position + 1
#             FROM chars
#             WHERE position < LENGTH(cog_category)
#         )
#         SELECT 
#             s.seqhash_id,
#             s.length,
#             a.description,
#             a.cog_category as full_cog_categories,
#             s.is_representative,
#             CASE 
#                 WHEN c.cluster_id IS NOT NULL THEN 
#                     (SELECT COUNT(*) FROM cluster_members cm WHERE cm.cluster_id = c.cluster_id)
#                 ELSE 0 
#             END as cluster_size
#         FROM chars ch
#         JOIN sequences s ON ch.seqhash_id = s.seqhash_id
#         JOIN annotations a ON s.seqhash_id = a.seqhash_id
#         LEFT JOIN clusters c ON s.seqhash_id = c.representative_seqhash
#         WHERE SUBSTRING(ch.cog_category, ch.position, 1) = ?
#         ORDER BY s.length DESC
#     """, [cog_category]).df()

In [34]:
eggnog_test = pd.read_csv(base_dir / sample / f'eggnog/{sample}.emapper.annotations', sep='\t', comment='#', header=None)

duplicate_header = 'v1_DLS_00048542b2614e34db583d1e78f0cd5d7895791164d212c3863b184d2d248322.p3'


In [47]:
newClusterDB_path = '/mnt/data2/planter_outputs/repseq/update_SRR14292008/newClusterDB.tsv'
pd.read_csv(newClusterDB_path, delimiter='\t', header=None)

Unnamed: 0,0,1
0,v1_DLS_55fa16b80e2adc2e0b3738aa3183d18521a8dd2...,v1_DLS_55fa16b80e2adc2e0b3738aa3183d18521a8dd2...
1,v1_DLS_940b7f3fa867500048ec213a7b36cdaab2f9888...,v1_DLS_940b7f3fa867500048ec213a7b36cdaab2f9888...
2,v1_DLS_cdf02843a9f4e7aaf19cc4fcdaf71aa10fdd7dd...,v1_DLS_cdf02843a9f4e7aaf19cc4fcdaf71aa10fdd7dd...
3,v1_DLS_e0c60bba4d61a3591cf8f2db164c6eee3be0527...,v1_DLS_e0c60bba4d61a3591cf8f2db164c6eee3be0527...
4,v1_DLS_8f976d2134ab73bb9e2ae3e71570e98bc2debd6...,v1_DLS_8f976d2134ab73bb9e2ae3e71570e98bc2debd6...
...,...,...
95344,v1_DLS_e4abf268b58d90b0395df33bd2dc6e847a4e4e7...,v1_DLS_e4abf268b58d90b0395df33bd2dc6e847a4e4e7...
95345,v1_DLS_e4c48624757dfc0ee10c8d9efc810e461bcdf69...,v1_DLS_e4c48624757dfc0ee10c8d9efc810e461bcdf69...
95346,v1_DLS_e5531a7c4ab1350a6f2adf8d07502a0f3e659e7...,v1_DLS_e5531a7c4ab1350a6f2adf8d07502a0f3e659e7...
95347,v1_DLS_e5531a7c4ab1350a6f2adf8d07502a0f3e659e7...,v1_DLS_1244d609c6b688d16255e9e3a7cea7400eb652d...
