# Test Databases

In [1]:
import os
import shutil

from oligo_designer_toolsuite.utils import FastaParser
from oligo_designer_toolsuite.database import OligoDatabase, ReferenceDatabase, OligoAttributes
from oligo_designer_toolsuite.sequence_generator import OligoSequenceGenerator

## Setup

In [2]:
# Global Parameters
FILE_NCBI_EXONS = "../data/genomic_regions/sequences_ncbi_exons.fna"
FILE_DATABASE_OLIGO_ATTRIBUTES = "../data/databases/database_oligo_attributes.tsv"
FILE_VARIANTS = "../../data/annotations/custom_GCF_000001405.40.chr16.vcf"

REGION_IDS = [
    "AARS1",
    "DECR2",
    "FAM234A",
    "RHBDF1",
    "WASIR2",
    "this_gene_does_not_exist",
]

## Test Reference Database

In [None]:
tmp_path = os.path.join(os.getcwd(), "tmp_reference_database")

fasta_parser = FastaParser()

reference = ReferenceDatabase(dir_output=tmp_path)
reference.load_database_from_fasta(files_fasta=[FILE_NCBI_EXONS, FILE_NCBI_EXONS], database_overwrite=True)
reference.load_database_from_fasta(files_fasta=FILE_NCBI_EXONS, database_overwrite=False)

In [None]:
reference.filter_database("AARS1", remove_region=True)
for entry in reference.database:
    region, _, _, = fasta_parser.parse_fasta_header(entry.id)
    assert region != "AARS1", f"error: this region {region} should be filtered out."

In [None]:
file_fasta_database = reference.write_database_to_fasta(filename="filtered_databse")
# assert fasta_parser.check_fasta_format(file_fasta_database) == True, f"error: wrong file format for database in {file_fasta_database}"

In [None]:
shutil.rmtree(tmp_path)

## Test Oligo Database

In [3]:
tmp_path = os.path.join(os.getcwd(), "tmp_oligo_database")

fasta_parser = FastaParser()
attribute_calculator = OligoAttributes()
oligo_sequence_generator = OligoSequenceGenerator(dir_output=tmp_path)
oligo_database = OligoDatabase(min_oligos_per_region=2, write_regions_with_insufficient_oligos=True, dir_output=tmp_path)

file_random_seqs = oligo_sequence_generator.create_sequences_random(
    filename_out="random_sequences1",
    length_sequences=30,
    num_sequences=100,
    name_sequences="random_sequences1",
    base_alphabet_with_probability={"A": 0.1, "C": 0.3, "G": 0.4, "T": 0.2},
)

file_sliding_window = oligo_sequence_generator.create_sequences_sliding_window(
    files_fasta_in=FILE_NCBI_EXONS,
    length_interval_sequences=(30, 31),
    region_ids=REGION_IDS
)


In [16]:
from oligo_designer_toolsuite.utils import check_if_list
from subprocess import Popen

oligo_database.load_database_from_fasta(
    files_fasta=file_sliding_window,
    sequence_type="target",
    region_ids=REGION_IDS,
    database_overwrite=True,
)
oligo_database = attribute_calculator.calculate_reverse_complement_sequence(oligo_database=oligo_database,sequence_type="target", sequence_type_reverse_complement="oligo")

Output()



In [23]:
file_A = oligo_database.write_database_to_bed(dir_output="/Users/lisasousa/Desktop/oligo-designer-toolsuite/projects/SNP_filter/",region_ids=REGION_IDS)
file_B = '/Users/lisasousa/Desktop/oligo-designer-toolsuite/projects/SNP_filter/GCF_000001405.40.chr16.vcf'
file_bed_out = '/Users/lisasousa/Desktop/oligo-designer-toolsuite/projects/SNP_filter/overlap.bed'

In [24]:
file_B = check_if_list(file_B)

cmd = "bedtools intersect -wa -wb -bed"
cmd += " -a " + file_A
cmd += " -b " + " ".join(file_B)
cmd += " > " + file_bed_out

print(cmd)

bedtools intersect -wa -wb -bed -a /Users/lisasousa/Desktop/oligo-designer-toolsuite/projects/SNP_filter/db_oligo.bed -b /Users/lisasousa/Desktop/oligo-designer-toolsuite/projects/SNP_filter/GCF_000001405.40.chr16.vcf > /Users/lisasousa/Desktop/oligo-designer-toolsuite/projects/SNP_filter/overlap.bed


In [25]:
process = Popen(cmd, shell=True).wait()

In [26]:
import pandas as pd
SEPARATOR_OLIGO_ID = "::"
search_results = pd.read_csv(
    filepath_or_buffer=file_bed_out,
    header=None,
    sep="\t",
    low_memory=False,
    engine="c",
    usecols=[3,8],
    names=["oligo_id", "snp_id"],
)

search_results["region_id"] = search_results["oligo_id"].str.split(SEPARATOR_OLIGO_ID).str[0]
search_results

Unnamed: 0,oligo_id,snp_id,region_id
0,AARS1::1,rs749554524,AARS1
1,AARS1::1,rs1295605832,AARS1
2,AARS1::1,rs755334113,AARS1
3,AARS1::1,rs373069396,AARS1
4,AARS1::1,rs1960244242,AARS1
...,...,...,...
453715,WASIR2::1960,rs1030129796,WASIR2
453716,WASIR2::1960,rs1250590943,WASIR2
453717,WASIR2::1960,rs1197079217,WASIR2
453718,WASIR2::1960,rs1456353780,WASIR2


In [32]:
search_results = (
    search_results.groupby("oligo_id", as_index=False)
      .agg({
          "snp_id": list,             # store all matching snp_id entries as a list
          "region_id": "first"        # or any aggregator (min, max, etc.) since they should be the same
      })
)
search_results

Unnamed: 0,oligo_id,snp_id,region_id
0,AARS1::1,"[rs749554524, rs1295605832, rs755334113, rs373...",AARS1
1,AARS1::10,"[rs1388493313, rs147433234, rs749554524, rs129...",AARS1
2,AARS1::100,"[rs147433234, rs749554524, rs1295605832, rs755...",AARS1
3,AARS1::1000,"[rs1295970458, rs1960068371, rs762373456, rs12...",AARS1
4,AARS1::1001,"[rs774810867, rs1295970458, rs1960068371, rs76...",AARS1
...,...,...,...
26562,WASIR2::995,"[rs372330566, rs1897139288, rs535254974, rs146...",WASIR2
26563,WASIR2::996,"[rs1897139288, rs535254974, rs1465620309, rs18...",WASIR2
26564,WASIR2::997,"[rs1897139288, rs535254974, rs1465620309, rs18...",WASIR2
26565,WASIR2::998,"[rs535254974, rs1465620309, rs1897139356, rs18...",WASIR2


In [37]:
li = []
li.append("test")
li.extend(["g", "f"])
li

['test', 'g', 'f']

In [None]:
oligo_database.load_database_from_fasta(
    files_fasta=file_random_seqs,
    sequence_type="oligo",
    region_ids=["random_sequences1"],
    database_overwrite=True,
)
oligo_database = attribute_calculator.calculate_reverse_complement_sequence(oligo_database=oligo_database,sequence_type="oligo", sequence_type_reverse_complement="target")
oligo_database.load_database_from_fasta(
    files_fasta=file_sliding_window,
    sequence_type="target",
    region_ids=REGION_IDS,
    database_overwrite=False,
)
oligo_database = attribute_calculator.calculate_reverse_complement_sequence(oligo_database=oligo_database,sequence_type="target", sequence_type_reverse_complement="oligo")

assert len(oligo_database.database) > 0, "error: no sequences loaded into database"

In [None]:
oligo_database = attribute_calculator.calculate_isoform_consensus(oligo_database=oligo_database)
oligo_database = attribute_calculator.calculate_GC_content(oligo_database=oligo_database, sequence_type="oligo")

In [None]:
oligo_database.load_database_from_table(FILE_DATABASE_OLIGO_ATTRIBUTES, database_overwrite=True, merge_databases_on_sequence_type="oligo")
print(oligo_database.database["region_1"])
print(oligo_database.database["region_2"])
print(oligo_database.database["region_3"])

In [None]:
file_database = oligo_database.save_database(region_ids=["region_1", "region_2"])
oligo_database.load_database(file_database, database_overwrite=True)

assert len(oligo_database.database.keys()) == 2, "error: wrong number regions saved and loaded"

In [None]:
oligo_database.load_database_from_table(FILE_DATABASE_OLIGO_ATTRIBUTES, database_overwrite=True)
oligo_database.filter_database_by_attribute_category(attribute_name="exon_number", attribute_category=["1","21"], remove_if_equals_category=False)

assert len(oligo_database.database["region_3"]) == 3, "error: wrong number of oligos removed"

In [None]:
attribute_calculator = OligoAttributes()
oligo_database = attribute_calculator.calculate_isoform_consensus(oligo_database=oligo_database)
oligo_database.filter_database_by_attribute_threshold(attribute_name="isoform_consensus", attribute_thr=70, remove_if_smaller_threshold=True)

assert len(oligo_database.database["region_1"]) == 3, "error: wrong number of oligos removed"

In [None]:
file_fasta = oligo_database.write_database_to_fasta(sequence_type="oligo", save_description=False, filename="database_region1_region2")

assert fasta_parser.check_fasta_format(file_fasta) == True, f"error: wrong file format for database in {file_fasta}"

In [None]:
oligo_database.load_database_from_fasta(
    files_fasta=file_sliding_window,
    sequence_type="target",
    region_ids=REGION_IDS,
    database_overwrite=True,
)
oligo_database.load_database_from_fasta(
    files_fasta=file_random_seqs,
    sequence_type="oligo",
    database_overwrite=False,
)

oligo_database.remove_regions_with_insufficient_oligos("database_generation")
assert len(oligo_database.database.keys()) == (len(REGION_IDS) - 1 + 1), "error: wrong number of regions in database"

In [None]:
oligo_database.load_database_from_fasta(
    files_fasta=file_random_seqs,
    sequence_type="oligo",
    database_overwrite=True,
)

list_sequences = oligo_database.get_sequence_list()
assert len(list_sequences) == 100, "error: wrong number of sequences in database"

In [None]:
oligo_database.load_database_from_table(
    file_database=FILE_DATABASE_OLIGO_ATTRIBUTES,
    region_ids=None,
    database_overwrite=True,
)

mapping = oligo_database.get_sequence_oligoid_mapping(sequence_type="oligo")
assert len(mapping["CTCACTCGACTCTTACACAGTCATA"]) == 4, "error: wrong number of oligos for sequence"

In [None]:
oligo_database.load_database_from_table(
    file_database=FILE_DATABASE_OLIGO_ATTRIBUTES,
    region_ids=None,
    database_overwrite=True,
)
attribute = oligo_database.get_oligo_attribute_table(attribute="test_attribute")

assert len(attribute["test_attribute"].unique()) == 2, "error: wrong attribute returned"

In [None]:
oligo_database.load_database_from_table(
    file_database=FILE_DATABASE_OLIGO_ATTRIBUTES,
    region_ids="region_3",
    database_overwrite=True,
)
new_attribute = {
    "region_3::1": {"GC_content": 63},
    "region_3::2": {"GC_content": 66},
    "region_3::3": {"GC_content": 80},
    "region_3::4": {"GC_content": 70},
    "region_3::5": {"GC_content": 40},
}
oligo_database.update_oligo_attributes(new_attribute)
attribute = oligo_database.get_oligo_attribute_table(attribute="GC_content")

assert len(attribute) == 5, "error: attribute not correctly updated"