MAKE SURE YOU HAVE COMPILED EVERYTHING BEFOREHAND (run all the lines it asks in the read me)
FILE PATHS MUST NOT HAVE SPACES!!!

To get started, run the 8 cells below this

In [1]:
def hamming_distance(str1, str2) -> int:
    if len(str1) != len(str2):
        raise ValueError("Input strings must have the same length")

    distance = 0
    str1 = str1.upper()
    str2 = str2.upper()
    for i in range(len(str1)):
        if str1[i] == 'N': # character N does not match with anything
            distance += 1
        elif str1[i] != str2[i]:
            distance += 1
    return distance

In [2]:
from Bio import SeqIO
import logging
import math
import os
import random
from typing import List, Tuple, Union

def synthesize_sequence(
    s_length: int,
    n_repeats: int,
    repeat_length: Union[int, Tuple[int, int]],
    repeat_coverage: float,
    repeat_noise: int,
    log: str = None
) -> List[str]:
    '''
    Generate a randomized nucleotide sequence with the given parameters.

    Arguments:
        s_length (int): Length of the sequence to be generated.
        n_repeats (int): Number of unique repeats. These repeats will be randomly
            scattered around the generated sequence.
        repeat_length (int or tuple[int, int]): Value, or min and max values for the length of the repeats.
        If a tuple if provided, the repeats will be randomized within this range.
        repeat_coverage ([0, 1]): The proportion of the finals sequence that should be populated
            with repeats. Note that this is a lower bound: The randomized regions may have repeats
            by chance.
        repeat_noise (int): Maximum Hamming distance to randomize repeats with. Once a repeat is
            decided, the sequence will be populated with its variants that are at most this distance
            away from that repeat.
        log (str): If provided, will output a log of the process to the provided path.
    '''
    print("Creating string",s_length,n_repeats,repeat_length,repeat_coverage,repeat_noise,log)
    verbose = False
    if log is not None:
        verbose = True
        f = open(log, 'w')
        f.write('Generating baits with following arguments:\n')
        f.write('L = {}\n'.format(s_length))
        f.write('RN (repeat number) = {}\n'.format(n_repeats))
        f.write('RL (repeat length) = {}\n'.format(repeat_length))
        f.write('RC (repeat coverage) = {}\n'.format(repeat_coverage))
        f.write('RE (repeat error) = {}\n'.format(repeat_noise))

    BASES = ['A', 'G', 'T', 'C']
    vec = [None] * s_length
    total_populated = 0
    repeats_pools = {}
    if verbose:
        repeats_locations = {}
    for i in range(n_repeats):
        l = repeat_length
        if type(repeat_length) is tuple:
            l = random.randint(repeat_length[0], repeat_length[1])
        repeat = ''.join(random.choices(BASES, k = l))
        while repeat in repeats_pools.keys():
            repeat = ''.join(random.choices(BASES, k = l))
        repeats_pools[repeat] = set(range(s_length - l + 1))
        if verbose:
            repeats_locations[repeat] = []
    repeat_coverage = repeat_coverage * s_length
    repeats = list(repeats_pools.keys())
    repeats_ctr = 0
    while total_populated < repeat_coverage:
        repeat = repeats[repeats_ctr]
        i = -1
        if repeat_coverage == s_length:
            i = total_populated # if entire sequence is repeats, fill the next index
        elif len(repeats_pools[repeat]) == 0:
            if verbose:
                f.write('WARNING: {} has no more indices it can be planted in.\n'.format(repeat))
            repeats.remove(repeat)
            n_repeats -= 1
            repeats_ctr = repeats_ctr % n_repeats
            continue
        else:
            i = random.choice(list(repeats_pools[repeat]))
        if i + len(repeat) > s_length:
            if repeat_coverage == s_length:
                vec.extend([None] * (i + len(repeat) - s_length)) # extend vector to accomodate this plant
                if verbose:
                    f.write('Extended sequence to {} base pairs to add final repeat.'.format(len(vec)))
            else:
                continue
        for j in range(len(repeat)):
            if vec[i + j] is not None:
                raise Exception('this shouldnt happen')
        if verbose:
            repeats_locations[repeat].append(i)
        n_modifs = random.randint(0, repeat_noise)
        modif_locs = random.choices(range(len(repeat)), k = n_modifs)
        for j in range(len(repeat)):
            if j in modif_locs:
                vec[i + j] = random.choice([x for x in BASES if x != repeat[j]])
            else:
                vec[i + j] = repeat[j]
        for other in repeats:
            for j in range(i - len(other) + 1, i + len(repeat)):
                if j in repeats_pools[other]:
                    repeats_pools[other].discard(j)
        repeats_ctr = (repeats_ctr + 1) % n_repeats
        total_populated = len(vec) - vec.count(None)
    if verbose:
        f.write('Total covered base pairs: {}\n'.format(total_populated))
        for k, v in repeats_locations.items():
            if len(v) == 0 or len(v) == 1:
                print('WARNING: Some repeats were planted < 2 times')
                f.write('WARNING: Some repeats were planted < 2 times\n')
                break
        f.write('Repeats and the locations they were planted in:\n')
        for k, v in repeats_locations.items():
            f.write(str(k) +  ': ' + str(v) + '\n')
    for i in range(s_length):
        if vec[i] is None:
            vec[i] = random.choice(BASES)
    return ''.join(vec)

In [3]:
import subprocess
import time

In [4]:
from typing import List, Union, Tuple
import numpy as np
def initialize_ignore_vector(
  seqlens: List[int],
  l: int,
) -> List[bool]:
  '''
  Given a list of sequence lengths, create an ignore vector where the indices that would
  cover concatenation spots are marked. Concatenation spots cannot be aligned to with baits,
  so they never need to be checked for alignments and can be ignored.
  '''
  ignore = [False] * sum(seqlens)
  seqlen_sum = 0
  for seqlen in seqlens:
    seqlen_sum += seqlen
    for i in range(seqlen_sum - l + 1, seqlen_sum):
      ignore[i] = True
  return ignore

def update_ignore_vector(
  cov: List[bool],
  seqlens: List[int],
  ignore: List[bool],
  region: Tuple[int, int],
  l: int
) -> None:
  '''
  Given a coverage vector, an ignore vector, and a covered region, find the indices that
  no longer need to be checked for alignments and update the ignore vector accordingly. An
  index can be ignored if the index and the l - 1 indices following it are already covered,
  provided that none of those indices are concatenation spots.
  '''
  total_seqlen = len(cov)
  current_seqstart = 0
  current_seqend = 0
  seqlens_ctr = -1

  check_start = region[0] - l + 1
  if check_start < 0:
    check_start = 0
  check_end = region[1] + l
  if check_end > total_seqlen:
    check_end = total_seqlen

  streak = 0 # how many indices in a row are already covered.
  for i in range(check_start, check_end):
    while i >= current_seqend:
      current_seqstart = current_seqend
      seqlens_ctr += 1
      current_seqend += seqlens[seqlens_ctr]
      streak = 0 # if we cross a concatenation spot, streak is back to zero
    if cov[i]:
      streak += 1
    else:
      streak = 0
    if streak >= l:
      ignore[i - l + 1] = True

def calculate_seqlens(seqs: List[str]) -> List[int]:
  return [len(seq) for seq in seqs]

def calculate_coverage(subs, l) -> List[Tuple[int, int]]:
  '''
  Given a list of substring starting indices and substring length, return an actual list of starting and ending indices for the coverage of these substrings.
  Example: suppose we have subs = [5, 10, 15, 45], l = 10. These cover from [5, 25] and [45, 55]
  '''
  if len(subs) == 0:
    return []
  subs = sorted(subs)
  results = []
  curr_start = subs[0]
  curr_end = subs[0] + l
  for sub in subs[1:]:
    if sub <= curr_end: # if this index is covered by the last started interval, extend the interval
      curr_end = sub + l
    else: # otherwise, end last one and start new one
      results.append((curr_start, curr_end))
      curr_start = sub
      curr_end = sub + l
  results.append((curr_start, curr_end)) # end last interval
  return results

def naive_alignment(
  bait: str,
  s_storage: np.ndarray,
  d: int,
  ignore: List[bool]
) -> List[int]:
  bait = np.array(list(bait))
  l = len(bait)
  result = []
  for i in range(len(s_storage)):
    if ignore[i]:
      continue
    distance = l - (s_storage[i, :] == bait).sum()
    if distance <= d:
      result.append(i)
  return result

def verify_baits(
  baits: List[str],
  s: Union[str, List[str]],
  d: int,
  log: str = None,
):
  if log is not None:
    verbose = True
    f = open(log, 'w')
    f.write('Verifying baits with provided arguments:\n')
    f.write('d (mismatch allowance) = {}\n'.format(d))
    f.write('--------\n')
  else:
    verbose = False

  if isinstance(s, list):
    seqlens = calculate_seqlens(s)
    s = ''.join(s)
  else:
    seqlens = [len(s)]

  length = len(s)
  cov = [False] * length
  l = len(baits[0])
  ignore = initialize_ignore_vector(seqlens, l)
  if verbose:
    f.write('Initialized integer array and ignore vector with length {}\n'.format(length))

  ids = ['bait#{}'.format(str(i)) for i in range(len(baits))]

  s_storage = np.empty((length - l + 1, l), dtype = str)
  s = np.array(list(s))
  for i in range(length - l + 1):
    s_storage[i, :] = s[i: i + l]
  for id, bait in zip(ids, baits):
    if verbose:
      f.write('Aligning bait {}.\n'.format(id))
    matches = naive_alignment(bait, s_storage, d, ignore)
    coverages = calculate_coverage(matches, l)
    if verbose:
      f.write('Bait covers between:\n {}\n'.format(coverages))
    for c in coverages:
      for j in range(c[0], c[1]):
        cov[j] = True
  if verbose:
    f.write('--------\n')
    f.write('Remaining uncovered indices: {}.\n'.format(cov.count(False)))
    f.write(str([i for i in range(len(cov)) if not cov[i]]))
    f.close()
  return [i for i in range(len(cov)) if not cov[i]]
import random
def similarity_index(
  s: Union[str, List[str]],
  d: int,
  trials: int,
):
  if isinstance(s, list):
    seqlens = calculate_seqlens(s)
    s = ''.join(s)
  else:
    seqlens = [len(s)]

  length = len(s)
  cov = [False] * length
  l=120
  ignore = initialize_ignore_vector(seqlens, l)

  s_storage = np.empty((length - l + 1, l), dtype = str)
  s = np.array(list(s))
  for i in range(length - l + 1):
    s_storage[i, :] = s[i: i + l]
  trial_counter=trials
  similarities=0
  while trial_counter > 0:
    trial_counter-=1;
    random_number1=random.randint(0,len(ignore)-1)#inclusive for both ends
    random_number2=random.randint(0,len(ignore)-1)
    if random_number1==random_number2:
      trial_counter+=1
      continue
    if ignore[random_number1]==True or ignore[random_number2]==True:
      trial_counter+=1
      continue
    distance = l - (s_storage[random_number1, :] == s_storage[random_number2, :]).sum()
    if distance <= d:
      similarities+=1
  return similarities/trials

In [5]:
def diagnosis(maxRadius,trials,inputFile):
    start_time=time.time()
    with open(inputFile, 'r') as file:
        original = [line.strip() for line in file.readlines()]
    print("Similarity Index:",similarity_index(original,maxRadius,trials))
    end_time = time.time()
    duration = end_time - start_time
    print(f"Diagnosis time for {inputFile}: {duration} seconds")
    return

def run(windowLength,maxRadius,lenientRadius,lenientRadius2,overlapCount,bypassHyperparameter,searchBreadths,inputFile,outputFile,verification):
    #NOTE: FILE PATHS MUST NOT HAVE SPACES
    inp=str(windowLength)+"\n"+str(maxRadius)+"\n"+str(lenientRadius)+"\n"+str(lenientRadius2)+"\n"+str(overlapCount)+"\n"+str(bypassHyperparameter)+"\n"
    inp+=str(len(searchBreadths))+"\n"
    for sb in searchBreadths:
        inp+=str(sb)+"\n"
    inp+=inputFile+"\n"
    inp+=outputFile
    cleaned_string = inp.replace("\n", " ")
    print(cleaned_string)

    start_time = time.time()
    os.chmod(inputFile, 0o777)
    process=subprocess.Popen("./MultithreadedGenerativeSearchV4WithInput.exe",
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         text=True)
    stdout, stderr = process.communicate(input=inp)

    end_time = time.time()
    duration = end_time - start_time
    print(f"Execution time for {inputFile}: {duration} seconds")
    rt=duration
    print("Errors (should be blank):", stderr)
    #print("inputFile: ",inputFile)
    with open(inputFile, 'r') as file:
        original = [line.strip() for line in file.readlines()]
    with open(outputFile, 'r') as file:
        baits = [line.strip() for line in file.readlines()]
    #baits=baits[0:-1] to test true negative aka a bait list that doesnt cover
    print("Number of baits:", len(baits))
    if verification:
        print("Verifying")
        start_time = time.time()
        missing_spots=verify_baits(baits,original,40)
        if len(missing_spots)==0:
            print("\033[92m", len(missing_spots), "spots uncovered\033[0m")          
        else:
            print("\033[91m", len(missing_spots), "spots uncovered\033[0m") 
        end_time = time.time()
        duration = end_time - start_time
        print(f"Verification time for {inputFile}: {duration} seconds")
    return [len(baits),rt]

In [6]:
def modifiedFasta(length,inputFile,outputFile):
    #NOTE: FILE PATHS MUST NOT HAVE SPACES
    inp=str(length)+"\n"+str(inputFile)+"\n"+str(outputFile)
    cleaned_string = inp.replace("\n", " ")
    print(cleaned_string)
    start_time = time.time()
    os.chmod(inputFile, 0o777)
    process=subprocess.Popen("./dataCleaning.exe",
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         text=True)
    stdout, stderr = process.communicate(input=inp)

    end_time = time.time()
    duration = end_time - start_time
    print(f"Execution time for converting fasta: {duration} seconds")
    with open(outputFile, 'r') as file:
        # Read the entire content of the file
        content = file.read()
    
    # Get the length of the content
    file_length = len(content)
    
    # Print the length
    print("Number of nucelotides:", file_length)
    print("Errors (ignore ERROR! some values not ACGT):", stderr)
    os.chmod(outputFile, 0o777)
    return

In [7]:
def numberProbes(inputFile,outputFile):
    with open(inputFile, 'r') as input_file:
        # Open the output file in write mode
        with open(outputFile, 'w') as output_file:
            i=1
            # Read each line from the input file
            for line in input_file:
                # Write each line twice to the output file
                output_file.write("probe-"+str(i).zfill(8)+"\n")
                i+=1
                output_file.write(line)  
    print(f"Finished writing to {outputFile}")

In [8]:
def clusterFasta(length,windowLength,maxRadius,lenientRadius,lenientRadius2,overlapCount,bypassHyperparameter,searchBreadths,inputFile,outputFile,verification):
    modifiedFasta(length,inputFile,"temporaryModifiedFastaFile.txt")
    run(windowLength,maxRadius,lenientRadius,lenientRadius2,overlapCount,bypassHyperparameter,searchBreadths,"temporaryModifiedFastaFile.txt",outputFile,verification)

Run the 8 cells above this

In [9]:
#calls .\dataCleaning.exe, inputs are exactly the same as in the READ ME
modifiedFasta(250000, "NAFastaPartitions/final_NA_target_space_All.fasta", "temporaryModifiedFastaFile.txt")

250000 NAFastaPartitions/final_NA_target_space_All.fasta temporaryModifiedFastaFile.txt
Execution time for converting fasta: 0.018815994262695312 seconds
Number of nucelotides: 251359
Errors (ignore ERROR! some values not ACGT): 


In [10]:
#calls .\MultithreadedGenerativeSearchV4WithInput.exe, inputs are exactly the same as in the READ ME
#with the added adition of a boolean of if you want to double check the generated baits completely cover the sequence
run(60, 6,12, 12, 5, 6, [10,10,10,2], "temporaryModifiedFastaFile.txt", "output.txt", False)
#killing one of these calls may require not only to press the stop button on jupyter notebook
#but also going into task manager and manually ending the task

60 6 12 12 5 6 4 10 10 10 2 temporaryModifiedFastaFile.txt output.txt
Execution time for temporaryModifiedFastaFile.txt: 6.980608224868774 seconds
Errors (should be blank): 
Number of baits: 442


[442, 6.980608224868774]

In [11]:
#example with calling the double check function, may take a long time (hours)
run(60, 6,12, 12, 5, 6, [10,10,10,2], "temporaryModifiedFastaFile.txt", "output.txt", True)

60 6 12 12 5 6 4 10 10 10 2 temporaryModifiedFastaFile.txt output.txt
Execution time for temporaryModifiedFastaFile.txt: 6.9085681438446045 seconds
Errors (should be blank): 
Number of baits: 442
Verifying
[92m 0 spots uncovered[0m
Verification time for temporaryModifiedFastaFile.txt: 394.15737533569336 seconds


[442, 6.9085681438446045]

In [12]:
#combines the modifiedFasta and run functions
clusterFasta(250000, 60, 6, 12, 12, 5, 6, [10,10,10,2],"NAFastaPartitions/final_NA_target_space_All.fasta", "output.txt",False)

250000 NAFastaPartitions/final_NA_target_space_All.fasta temporaryModifiedFastaFile.txt
Execution time for converting fasta: 0.019032716751098633 seconds
Number of nucelotides: 251359
Errors (ignore ERROR! some values not ACGT): 
60 6 12 12 5 6 4 10 10 10 2 temporaryModifiedFastaFile.txt output.txt
Execution time for temporaryModifiedFastaFile.txt: 7.214697360992432 seconds
Errors (should be blank): 
Number of baits: 443


In [13]:
#calls .\numberProbes
numberProbes("output.txt","numberedOutput.txt")

Finished writing to numberedOutput.txt
