In [1]:
import os
import sys
from Bio import SeqIO
sys.path.append('utils')
import utils
import kanalyzer
import protein_score
import mutation_score
import discriminative_score

In [2]:
# Per-dataset run configuration, keyed by dataset name. Each value is the
# `infos` dict that the processing loop reads and forwards to the analysis steps.
datasets = {
    "UL33": {
        # Forwarded to kanalyzer through `infos`; presumably the k-mer length — TODO confirm.
        "k": 9,
        # Taxon identifier used for the protein score lookup.
        "taxon_id": 10359,
        # FASTA file holding the sequences to analyze.
        "input_file": "data/fasta/UL33_all_nucleotide_sequences_annotated.fasta",
        # Directory receiving every JSON output (created if missing).
        "output_folder": "data/results",
        "infos_file": "data/UL33_infos.json",
        # Alignment settings; not read in this notebook, presumably consumed by kanalyzer.
        "aligner_matrix": "BLOSUM62",
        "aligner_open_gap_score": -10.0,
        "aligner_extend_gap_score": -2.0,
        # Percentage threshold used when compiling variations.
        "threshold": 25,
    },
}

In [3]:
def compile_results(results, threshold_percentage):
    """
    Compile results by aggregating variations and amino acid changes based on a threshold.

    A variation seen at a given position/k-mer is kept when, for at least one
    class, the share of that class's sequences *of the same gene* carrying the
    variation reaches ``threshold_percentage``.

    Parameters:
    - results: Dict containing the analysis results from kanalyzer, shaped as
      ``{gene: {description: {position_kmer: details}}}``. The class of a
      sequence is the text after the last ``'|'`` of its description. Each
      ``details`` dict must provide ``'variations'`` (a hashable value) and
      ``'amino_acid_changes'`` (an iterable).
    - threshold_percentage: The threshold percentage to include variations.

    Returns:
    - compiled_results: Dict ``{gene: {position_kmer: {'variations': set,
      'amino_acid_changes': set}}}``. Every position/k-mer observed for a gene
      is present, even when none of its variations reaches the threshold (both
      sets are then empty).

    Raises:
    - KeyError: if a ``details`` dict lacks ``'variations'`` or
      ``'amino_acid_changes'``.
    """
    temp_variations = {}
    # Sequence totals per class are kept separately for every gene, so that a
    # gene's threshold is never evaluated against another gene's totals.
    class_counts_by_gene = {}

    # Collect class counts and temporary variations data
    for gene, sequences in results.items():
        gene_variations = temp_variations.setdefault(gene, {})
        class_counts = class_counts_by_gene.setdefault(gene, {})

        for description, mutations in sequences.items():
            class_id = description.split('|')[-1]
            class_counts[class_id] = class_counts.get(class_id, 0) + 1

            for position_kmer, details in mutations.items():
                variation = details['variations']
                amino_acid_changes = details['amino_acid_changes']

                variation_info = gene_variations.setdefault(position_kmer, {}).setdefault(
                    variation,
                    {'amino_acid_changes': set(), 'class_specific_counts': {}}
                )
                variation_info['amino_acid_changes'].update(amino_acid_changes)
                per_class = variation_info['class_specific_counts']
                per_class[class_id] = per_class.get(class_id, 0) + 1

    compiled_results = {}
    # Compile results based on the threshold
    for gene, positions in temp_variations.items():
        class_counts = class_counts_by_gene[gene]
        compiled_results[gene] = {}
        for position_kmer, variations in positions.items():
            kept = {'variations': set(), 'amino_acid_changes': set()}
            for variation, info in variations.items():
                # A single class reaching the threshold is enough to keep the variation.
                # class_counts[class_id] is at least 1 here: every counted variation
                # comes from a sequence that was itself counted for that class.
                reaches_threshold = any(
                    (count / class_counts[class_id]) * 100 >= threshold_percentage
                    for class_id, count in info['class_specific_counts'].items()
                )
                if reaches_threshold:
                    kept['variations'].add(variation)
                    kept['amino_acid_changes'].update(info['amino_acid_changes'])
            compiled_results[gene][position_kmer] = kept
    return compiled_results

In [4]:
def compute_kss_scores(compiled_results, results, infos):
    """
    Compute k-mer significance scores (KSS) for compiled results.

    For every gene, each position/k-mer entry of ``compiled_results`` is updated
    in place with ``protein_score``, ``mutational_score``,
    ``discriminative_score`` and ``k_mer_significance_score`` (the mean of the
    three scores, rounded to one decimal). The categorized and uncategorized
    discriminative scores of all entries are also written as two JSON files per
    gene into ``infos['output_folder']``.

    Parameters:
    - compiled_results: Dict containing compiled variations and amino acid changes,
      shaped as ``{gene: {key: {'variations': set, 'amino_acid_changes': ...}}}``.
    - results: Original results from kanalyzer, ``{gene: {description: {key: details}}}``.
      The class label of a sequence is the text after the last ``'|'`` of its description.
    - infos: Dict containing additional information like taxon_id and output_folder.

    Returns:
    - compiled_results: The same object that was passed in, updated with KSS scores.
    """
    for gene, data in compiled_results.items():
        gene_results = results[gene]

        # Update protein score in results (one lookup per gene, shared by all entries)
        ps = protein_score.get_protein_score(infos['taxon_id'], gene)
        for details in data.values():
            details['protein_score'] = ps

        # Evaluate gene-related scores
        y = [entry.split('|')[-1] for entry in gene_results]
        categorized_scores_dict = {}
        uncategorized_scores_dict = {}

        for key, details in data.items():
            mutational_scores = mutation_score.get_mutational_scores(
                details['amino_acid_changes'], categorize=True
            )
            # The highest individual score represents the entry; 0 when there is none.
            details['mutational_score'] = max(mutational_scores.values()) if mutational_scores else 0

            # Sort the variations so the feature columns keep the same order on
            # every run: iteration order of a set of strings is not stable
            # across interpreter sessions.
            variations = sorted(details['variations'])

            # Presence/absence matrix: one row per sequence, one column per variation.
            # NOTE(review): when the stored "variations" value is a string, `in`
            # is a substring test rather than an equality test — confirm this is intended.
            X = [
                [
                    1 if variation in entry_value.get(key, {}).get("variations", "") else 0
                    for variation in variations
                ]
                for entry_value in gene_results.values()
            ]

            if not X:
                continue  # Skip if X is empty (the gene has no sequences)

            categorized_scores, uncategorized_scores = discriminative_score.get_discriminative_scores(X, y)
            categorized_scores_dict[str(key)] = categorized_scores
            uncategorized_scores_dict[str(key)] = uncategorized_scores

            # NOTE(review): index 3 is taken as the entry's discriminative score;
            # its meaning is defined by discriminative_score — confirm there.
            details['discriminative_score'] = categorized_scores[3]
            average_score = (
                details['discriminative_score'] +
                details['mutational_score'] +
                details['protein_score']
            ) / 3
            details['k_mer_significance_score'] = round(average_score, 1)

        # Save categorized and uncategorized scores
        output_folder = infos['output_folder']
        utils.save_data_as_json(
            categorized_scores_dict,
            os.path.join(output_folder, f"{gene}_categorized_scores.json")
        )
        utils.save_data_as_json(
            uncategorized_scores_dict,
            os.path.join(output_folder, f"{gene}_uncategorized_scores.json")
        )
    return compiled_results

In [5]:
# Main processing loop: for every configured dataset, analyze its FASTA records,
# then write the raw results and the compiled (thresholded and scored) results
# as JSON files into the dataset's output folder.
# The names assigned below (threshold, records, results, compiled_results, ...)
# are notebook-level variables; after the loop they hold the values of the
# last dataset processed.
for dataset, infos in datasets.items():
    print(f"\nProcessing: {dataset}")
    # Load configuration and input files
    threshold = infos['threshold']
    input_file = infos['input_file']
    output_folder = infos['output_folder']
    # Create the output folder up front; no error if it already exists
    os.makedirs(output_folder, exist_ok=True)

    # Load all sequences once to avoid opening the file multiple times
    with open(input_file, "r") as file:
        # Materialize the parser output into a list while the file is still open
        records = list(SeqIO.parse(file, "fasta"))

    # Analyze records using kanalyzer
    results = kanalyzer.analyze_records(records, infos)

    # Save initial results
    results_file_path = os.path.join(output_folder, f"{dataset}_results.json")
    utils.save_data_as_json(results, results_file_path)
    print(f"Results saved to {results_file_path}")

    # Compile results and save
    # Keep only the variations reaching the threshold, then attach the scores
    compiled_results = compile_results(results, threshold)
    compiled_results = compute_kss_scores(compiled_results, results, infos)
    compiled_file_path = os.path.join(output_folder, f"{dataset}_compiled_results.json")
    utils.save_data_as_json(compiled_results, compiled_file_path)
    print(f"Compiled results saved to {compiled_file_path}")


Processing: UL33
Results saved to data/results\UL33_results.json
Compiled results saved to data/results\UL33_compiled_results.json
