## Sequence Import Functions

In [132]:
from Bio import SeqIO

In [133]:
test_genome = '../feature_gen_test/Test_Data/Reference/GCF_009734005.1_ASM973400v2_genomic.fna'
estimated_genome_size = 2.9
genome_skew = 0.1

In [134]:
def read_fasta(fasta):
    """
    Function that imports a fasta file into a dictonary
    """
    with open(fasta, "r") as fasta_file:
        fasta_dict = {}
        for record in SeqIO.parse(fasta_file, "fasta"):
            fasta_dict[record.id] = record.seq
    return fasta_dict

In [135]:
test_fasta = read_fasta(test_genome)

In [136]:
test_fasta

{'NZ_CP038996.1': Seq('ATGGTATCCCTCGATGCTTTATGGAATGAATTAAAAGCAACATACCAAAAGGAT...TAT'),
 'NZ_CP038997.1': Seq('TATCACTATTTAACAATTCTTAACGTTTTCTTAATGTTTATAATCACGTATATA...ACA')}

In [137]:
#def get_longest_record(fasta_dict):
#    """
#    Function that get the longest sequence from a dict of sequences
#    """
#    longest_seq_id = max(fasta_dict, key=lambda k: len(fasta_dict[k]))
#    longest_seq = fasta_dict[longest_seq_id]
#    longest_record = (longest_seq_id, longest_seq)
#    return longest_record

In [138]:
#longest_seq = get_longest_record(test_fasta)

In [139]:
def concat_record(fasta_dict):
    return ''.join(str(value) for value in fasta_dict.values())

In [140]:
def check_genome_size(fasta_dict, genome_size, skew):
    """
    This function checks the size of the provided sequences and checks if it is within a ranges provided by skew.
    Genome_size in a float in MB.
    skew is a float representing the precentage skew
    """
    concat_rec = concat_record(fasta_dict)
    
    bp_genome_size = genome_size * 1000000
    minsize = bp_genome_size - (bp_genome_size * skew)
    maxsize = bp_genome_size + (bp_genome_size * skew)
    #print('Checking that genome size is at least ' + str(minsize) + ' and the it is less than ' + str(maxsize))
    
    if ((len(concat_rec)) >= minsize) & ((len(concat_rec)) <= maxsize):
        return True
    else:
        return False

In [141]:
print(check_genome_size(test_fasta, estimated_genome_size, genome_skew))

True


In [142]:
def genome_import_process(fasta, genome_size, skew):
    """
    Function that imports a fasta file into a dictonary and checks its size
    :param fasta: fasta file
    :param genome_size: estimated genome size for species
    :param skew: the percent skew of the genome size allowed
    :return: tuple of (fasta_file_name, sequence id, sequence)
    """
    
    fasta_dict = read_fasta(fasta)
    
    
    if check_genome_size(fasta_dict, genome_size, skew):
        print('Input sequence ' + fasta + ' passed checks')
        return fasta, fasta_dict
    else:
        print('Input sequence ' + fasta + ' did not pass checks')

In [143]:
passed_fasta = genome_import_process(test_genome, estimated_genome_size, genome_skew)

Input sequence ../feature_gen_test/Test_Data/Reference/GCF_009734005.1_ASM973400v2_genomic.fna passed checks


## Nucleotide Frequencies

In [163]:
from itertools import product
import pandas as pd
import os

In [177]:
output_folder = './Output/'

In [178]:
def space_seperated_record(fasta_dict):
    return ' '.join(str(value) for value in fasta_dict.values())

In [179]:
def count_nucleotides(input_seq):
    
    sequence = input_seq.upper()
    
    nucleotides = ['A', 'C', 'G', 'T']
    
    nucleotide_counts = {}
    
    for nucleotide in nucleotides:
        nucleotide_counts[nucleotide] = sequence.count(nucleotide)
        
    return nucleotide_counts

In [180]:
def generate_combinations(nucleotides, X):
    return [''.join(seq) for seq in product(nucleotides, repeat=X)]

In [181]:
def count_dinucleotides(input_seq):
    di_nuc = generate_combinations(['A', 'C', 'G', 'T'], 2)
    
    sequence = input_seq.upper()
    
    dinucleotide_counts = {}
    
    for pair in di_nuc:
        dinucleotide_counts[pair] = sequence.count(pair)
        
    return dinucleotide_counts

In [182]:
def count_trinucleotides(input_seq):
    tri_nuc = generate_combinations(['A', 'C', 'G', 'T'], 3)
    
    sequence = input_seq.upper()
    
    trinucleotide_counts = {}
    
    for pair in tri_nuc:
        trinucleotide_counts[pair] = sequence.count(pair)
    
    return trinucleotide_counts

In [183]:
def merge_dicts(dict1, dict2, dict3, dict4):
    merged_dict = {**dict1, **dict2, **dict3, **dict4}
    return merged_dict

In [188]:
def create_dir(dir_path):
    """Create a directory if it does not exist."""
    try:
        os.makedirs(dir_path, exist_ok=True)  # exist_ok=True avoids raising an error if the directory already exists
        print(f"Directory '{dir_path}' is created or already exists.")
    except Exception as e:
        print(f"Error creating directory: {e}")

In [185]:
def create_db(import_sequnce, output_folder):
    
    input_file = import_sequnce[0]
    seq_dict = import_sequnce[1]
    space_seq = space_seperated_record(seq_dict)
    
    input_data = {}
    input_data['file']  = import_sequnce[0]
    
    nuc_counts = count_nucleotides(space_seq)
    dinuc_counts = count_dinucleotides(space_seq)
    trinuc_counts = count_trinucleotides(space_seq)
    
    data =  merge_dicts(input_data, nuc_counts, dinuc_counts, trinuc_counts)
    
    nuc_freq_df = pd.DataFrame(data, index=[0])
    
    file_prefix = os.path.splitext(os.path.basename(input_file))[0]
    
    filename = file_prefix + '.nuc_freq.csv'
    
    output_path = os.path.join(output_folder, filename)
    
    create_dir(output_folder)
    
    nuc_freq_df.to_csv(output_path, index=False)
    
    return nuc_freq_df

In [186]:
df = create_db(passed_fasta,output_folder)

Directory './Output/' is created or already exists.
