In [None]:
from Bio import SeqIO
import re
import csv
import pandas as pd

Get records from .gb file

In [None]:
file_path = 'FMDV_all_records.gb'
genbank_records = SeqIO.parse(file_path, 'gb')

Create empty lists for storing necessary information

In [None]:
hosts = []
collection_dates = []
countries = []
cities = []
isolation_sources = []
isolates = []
strains = []
serotypes = []
record_ids = []
references = []
number_of_records = 0
CDS_count = 0

Functions

In [None]:
"""
The function checks if the qualifier is a list and returns the appropriate value.
If the qualifier is a list, its first element is returned.
If the qualifier is not a list, it is returned as is.
If the qualifier is not found by the key, the string 'Unknown' is returned. 
"""
def parse_qualifier(qualifier, key):
    value = qualifier.get(key, 'Unknown')
    return value if not isinstance(value, list) else value[0]

def merge_values(values):
    #Merges multiple values into one by prioritizing non-'Unknown' values.
    for value in values:
        if value != 'Unknown':
            return value
    return 'Unknown'

# The function applies parse_qualifier to the necessary qualifiers.
def extract_data_from_record(record):
    features = record.features
    source_feature_list = [feature for feature in features if feature.type == 'source']

    # Dictionary to store merged values for each qualifier
    merged_values = {
        'host': [],
        'collection_date': [],
        'country': [],
        'isolation_source': [],
        'isolate': [],
        'strain': [],
        'serotype': []
    }

    # Iterate through 'source' features
    for source_feature in source_feature_list:
        qual_dict = source_feature.qualifiers

        # Iterate through qualifiers and merge values
        for key in merged_values.keys():
            merged_values[key].append(parse_qualifier(qual_dict, key))

    # Append merged values to respective lists
    hosts.append(merge_values(merged_values['host']))
    collection_dates.append(merge_values(merged_values['collection_date']))
    countries.append(merge_values(merged_values['country']))
    isolation_sources.append(merge_values(merged_values['isolation_source']))
    isolates.append(merge_values(merged_values['isolate']))
    strains.append(merge_values(merged_values['strain']))
    serotypes.append(merge_values(merged_values['serotype']))

    # Extract references
    for reference in record.annotations["references"]:
        references.append(reference.title)
        break

    # Extract record ids    
    record_ids.append(record.name)

# Execute the function for each record in the .gb file.
for record in genbank_records:
    extract_data_from_record(record)
    CDS_count += sum(1 for feature in record.features if feature.type == 'CDS')
    number_of_records += 1

print("Number of records:", number_of_records)
print("Number of CDS:", CDS_count)
print("Hosts:", hosts[:5])
print("Collection Dates:", collection_dates[:5])
print("Countries:", countries[:5])
#print("Cities:", cities[:5])
print("Isolation Sources:", isolation_sources[:5])
print("Isolates:", isolates[:5])
print("Strains:", strains[:5])
print("Serotypes:", serotypes[:5])
print("Record IDs:", record_ids[:5])


In [None]:
# Function for reading map.csv files
def read_csv(file_name):
    with open(file_name) as csvfile:
        reader = csv.DictReader(csvfile,
                                delimiter=",", 
                                fieldnames=["base", "new"])
        result = {}
        
        for row in reader:
            base_value = row["base"].strip()
            new_value = row["new"].strip().rstrip(';')
            result[base_value] = new_value
        csvfile.close()
        return result

In [None]:
# The function maps values from a list to values in a CSV file.
def map_qualifiers(qualifier, csv_file):
    data_dict = read_csv(csv_file)
    feature_map_comp = [(re.compile(key), value) for key, value in data_dict.items()]
    mapped_values = []
    for name in qualifier:
        for regex, new_name in feature_map_comp:
            match = regex.search(name)
            if match:
                mapped_values.append(new_name)
                break
        else:
            mapped_values.append(name)
    return mapped_values

In [None]:
# Mapping hosts
hosts_mapped = map_qualifiers(hosts, 'Maps/host_map.csv')

# Mapping countries
countries_mapped = map_qualifiers(countries, 'Maps/country_map.csv')

# Converting all dates to years
collection_years = ['Unknown' if date == 'Unknown' else (date[-4:] if date[-4:].isdigit() else date[:4]) for date in collection_dates]

In [None]:
# The function prints unique values and their count in the list
def print_sorted_counts(items):
    count_dict = {}
    for item in items:    
        count_dict[item] = count_dict.get(item, 0) + 1
    
    for key, value in sorted(count_dict.items(), key=lambda x: x[1], reverse=True):
        print(f"{key}: {value}")

Below you can output the values ​​of qualifiers with frequencies

In [None]:
# Values of the host qualifier
print("Counts for hosts:")
print_sorted_counts(hosts_mapped)

In [None]:
# Values of the country qualifier
print("Counts for countries:")
print_sorted_counts(countries_mapped)

In [None]:
# Values for collection_date qualifier
print("Counts for collection_dates:")
print_sorted_counts(collection_years)

In [None]:
# Values for isolation_source qualifier
print("Counts for isolation_sources:")
print_sorted_counts(isolation_sources)

In [None]:
# Values for isolate qualifier
print("Counts for isolates:")
print_sorted_counts(isolates)

In [None]:
# Values for strain qualifier
print("Counts for strains:")
print_sorted_counts(strains)

In [None]:
# Values for serotype qualifier
print("Counts for serotypes:")
print_sorted_counts(serotypes)

Save qualifier values to a table

In [None]:
df = pd.DataFrame({
    'GenBankAccession': record_ids,
    'Country': countries_mapped,
    'Host': hosts_mapped,
    'CollectionDate': collection_years,
    'Serotype': serotypes,
    'Strain': strains,
    'Isolate': isolates,
    'IsolationSource': isolation_sources
})
df.to_csv('qualifiers_table.csv')

In [None]:
# Read file with sources to remove
def read_exclusion_criteria(file_name):
    with open(file_name, 'r') as file:
        exclusion_criteria = [line.strip() for line in file]
    return exclusion_criteria

The function extracts CDS sequences and creates a FASTA file with headers in the format >GenbankAC/country/host/year/serotype.

For this purpose, mapped lists are used.

In [None]:
def write_fasta_from_genbank(genbank_records, hosts, collection_dates, countries, serotypes, record_ids, isolation_exclusion_file, reference_exclusion_file, output_file):
    isolation_exclusion_criteria = read_exclusion_criteria(isolation_exclusion_file)
    reference_exclusion_criteria = read_exclusion_criteria(reference_exclusion_file)
    with open(output_file, 'w') as fasta_file:
        for i, record in enumerate(genbank_records):  

            isolation_source = isolation_sources[i]
            reference = references[i]

            # Check if the isolation source matches any exclusion criteria
            if any(re.search(criteria, isolation_source) for criteria in isolation_exclusion_criteria):
                continue  # Skip this record
            
            # Check if the reference matches any exclusion criteria
            if any(re.search(criteria, reference) for criteria in reference_exclusion_criteria):
                continue  # Skip this record
            
            # Constructing the header
            header = f">{record_ids[i].replace(' ', '-')}/{countries[i].replace(' ', '-')}/{hosts[i].replace(' ', '-')}/{collection_dates[i].replace(' ', '-')}/{serotypes[i].replace(' ', '-')}"
            fasta_file.write(header + '\n')

            cds_list = []
            # Getting the coordinates of the coding sequence/sequences
            for feature in record.features:
                if feature.type == 'CDS':
                    # Check if product qualifier is 'polyprotein'
                    if 'product' in feature.qualifiers:
                        product_value = feature.qualifiers['product'][0]
                        pattern = r'(?i)(polyprotein|poylprotein|polyprotein precursor|polypeptide)'
                        if re.match(pattern, product_value): 
                        
                            cds_start = feature.location.start.position
                            cds_end = feature.location.end.position
                            
                            cds_sequence = record.seq[cds_start:cds_end]
                            cds_list.append(str(cds_sequence))
                            full_cds = ''.join(cds_list)

            # Writing the sequence, moving to a new line every 70 characters
            for i in range(0, len(full_cds), 70):
                fasta_file.write(str(full_cds[i:i+70]) + '\n')

In [None]:
genbank_records = SeqIO.parse(file_path, 'gb')
write_fasta_from_genbank(genbank_records, hosts_mapped, collection_years, countries_mapped, serotypes, record_ids, 'isolation_sources_to_remove.txt', 'references_to_remove.txt', 'extracted_CDS_full.fasta')