In [2]:
#extract sequences

In [5]:
from Bio import SeqIO

def extract_pb1_sequences(fasta_files, gff_files, output_file):
    pb2_sequences = []
    for fasta_file, gff_file in zip(fasta_files, gff_files):
        genome_dict = {record.id: record.seq for record in SeqIO.parse(fasta_file, "fasta")}
        with open(gff_file) as gff:
            for line in gff:
                if not line.startswith("#"):
                    fields = line.strip().split("\t")
                    if len(fields) > 2 and fields[2] == "gene" and "PB1" in fields[8]:
                        seq_id = fields[0]
                        start, end = int(fields[3]) - 1, int(fields[4])
                        pb2_seq = genome_dict[seq_id][start:end]
                        pb2_sequences.append(pb2_seq)
                        break
    with open(output_file, "w") as out_fasta:
        for i, seq in enumerate(pb2_sequences):
            out_fasta.write(f">strain_{i+1}\n{seq}\n")

fasta_files = ["flu_2018.fa", "flu_2021.fa", "flu_2022.fa"]
gff_files = ["flu_2018.gff", "flu_2021.gff", "flu_2022.gff"]

output_fasta = "pb1_sequences.fasta"

extract_pb1_sequences(fasta_files, gff_files, output_fasta)

In [6]:
from Bio import AlignIO
import math
import numpy as np

def calculate_conservation_scores(alignment_file, output_file):
    
    alignment = AlignIO.read(alignment_file, "fasta")
    seq_len = alignment.get_alignment_length()
    scores = []

    for i in range(seq_len):
        column = alignment[:, i]
        valid_bases = [base for base in column if base not in {"-", "N"}]
        if len(valid_bases) == 0:
            conservation_score = 0 
        else:
            freq = {base: valid_bases.count(base) / len(valid_bases) for base in set(valid_bases)}
            entropy = -sum(p * math.log2(p) for p in freq.values())
            conservation_score = 1 - np.divide(entropy, math.log2(len(freq)), out=np.zeros_like (entropy), where= (math.log2(len(freq))) != 0)
        
        scores.append(conservation_score)

    with open(output_file, "w") as bed:
        for i, score in enumerate(scores):
            bed.write(f"PB1\t{i}\t{i+1}\t{score:.4f}\n")

    print(f"Conservation scores written to {output_file}")

calculate_conservation_scores("pb1_sequences.fasta", "pb1_conservation.bed")

Conservation scores written to pb1_conservation.bed


In [7]:
from Bio import AlignIO
import math
import numpy as np

def calculate_conservation_scores_to_wig(alignment_file, output_file, chrom="MT781553.2", start=1, step=1):
    
    alignment = AlignIO.read(alignment_file, "fasta")
    seq_len = alignment.get_alignment_length()
    scores = []

    for i in range(seq_len):
        column = alignment[:, i]
        valid_bases = [base for base in column if base not in {"-", "N"}]
        if len(valid_bases) == 0:
            conservation_score = 0
        else:
            freq = {base: valid_bases.count(base) / len(valid_bases) for base in set(valid_bases)}
            entropy = -sum(p * math.log2(p) for p in freq.values())
            conservation_score = 1 - np.divide(entropy, math.log2(len(freq)), out=np.zeros_like(entropy), where=(math.log2(len(freq))) != 0)
        
        scores.append(conservation_score)

    with open(output_file, "w") as wig:
        wig.write(f"track type=wiggle_0 name=\"Conservation Scores\" description=\"Conservation scores for PB1 alignment\"\n")
        wig.write(f"fixedStep chrom={chrom} start={start} step={step}\n")
        for score in scores:
            wig.write(f"{score:.4f}\n")

    print(f"Conservation scores written to {output_file}")

calculate_conservation_scores_to_wig("pb1_sequences.fasta", "pb1_conservation.wig")

Conservation scores written to pb1_conservation.wig
