## Sorting sequences

In [41]:
import numpy as np
import numpy as np
from concrete import fhe
from Bio.Seq import Seq, MutableSeq

import sys, os
sys.path.append(os.path.dirname(os.getcwd()))

from concrete_biopython.FheSeq import FheSeq, FheMutableSeq
from concrete_biopython.SeqWrapper import SeqWrapper


In [42]:
SEQ1 = Seq('AA')
SEQ2 = Seq('AAA')
SEQ3 = Seq('AAB')
SEQ4 = Seq('CAA')

In [43]:
# concatenate sequences from a list and record indices as slices
def gen_seq_indices(seqs):
    slices=[]
    seq=Seq('')
    index=0
    for s in seqs:
        seq = seq+s
        slices.append(slice(index, index+len(s)))
        index+=len(s)
    return seq, slices

In [44]:
SEQ, INDICES = gen_seq_indices([SEQ3, SEQ4, SEQ1, SEQ2])
print(SEQ)

AABCAAAAAAA


In [45]:
def slice_seq(seq, slices):
    seqs=[]
    index=0
    for ind in slices:
        seqs.append(seq[ind.start:ind.stop])
    return seqs

In [46]:
def sort_rank(seqlist, lib=fhe):
    n=len(seqlist)
    # create comparison matrix
    comp_matrix = lib.zeros((n,n))
    # fill the matrix with results of seqlist[i]>seqlist[j]
    for i in range(0, n):
        for j in range(i+1, n):
            # compute upper half of the matrix
            comp_matrix[i,j] = 1*(seqlist[i]<seqlist[j])
            # then copy the opposite to the lower half because the matrix is antisymetrical
            comp_matrix[j,i] = 1-comp_matrix[i,j] # this counts seqlist[j]>=seqlist[i]
    # now sum up each row to get a sorting rank for the sequences
    return np.sum(comp_matrix, axis=0)
            
def process_seq(seq, lib=fhe):
    seqlist = slice_seq(seq, INDICES)
    return sort_rank(seqlist, lib)

In [47]:
OUTPUT_PYTHON = process_seq(SEQ, np)
print("The sorted indices of sequences are : ", OUTPUT_PYTHON)

The sorted indices of sequences are :  [2. 3. 0. 1.]


In [48]:
# wrap a fhe circuit in order to input and output Bio.Seq objects.
def circuit_wrapper(circuit, seq, simulate=False):
    # convert Seq objects to integers with SeqWrapper.toIntegers
    integers = SeqWrapper(seq).toIntegers()
    # run the circuit with integer inputs
    integer_output = circuit.simulate(integers) if simulate else circuit.encrypt_run_decrypt(integers)
    # convert back the integer outputs into a Seq objects with SeqWrapper.toSeq
    return integer_output

In [49]:
def process_seq_adapter(integer_seq):
    # convert integer to FheSeq object, process it and return result as integer array
    return process_seq(FheSeq(integer_seq))

In [50]:
def make_circuit(input_length):
    # compile the process_seq_adapter function and create a circuit
    compiler = fhe.Compiler(lambda data: process_seq_adapter(data), {"data": "encrypted"})    
    return compiler.compile(
        inputset=[
        np.random.randint(0, SeqWrapper.maxInteger()+1, size=(input_length,))
        for _ in range(100)
        ],
        configuration=fhe.Configuration(
            enable_unsafe_features=True,
            use_insecure_key_cache=True,
            insecure_key_cache_location=".keys",
            dataflow_parallelize=False, # setting it to True makes the jupyter kernel crash
        ),
        verbose=False,
    )

In [51]:
# now we can run our circuit on Seq objects and compare the result with output_seq

simulate = False

# and without (slower)
OUTPUT_FHE = circuit_wrapper(make_circuit(len(SEQ)), SEQ, simulate)
print("The sorted indices of sequences are : ", OUTPUT_FHE)
assert(np.all(OUTPUT_PYTHON == OUTPUT_FHE))




The sorted indices of sequences are :  [2 3 0 1]




## Hamming distance

In [31]:
def hamming(dna1, dna2):
    """
    Returns the number of differences between dna1 and dna2.
    """
    return np.sum(dna1 != dna2)

def hamming_adapter(seq1, seq2, lib=fhe):
    """
    Adapter for hamming distance of Seq objects
    """
    if lib is fhe:
        return hamming(seq1.toArray(),seq2.toArray())
    else:
        return hamming(np.array(list(seq1.__str__())), np.array(list(seq2.__str__())))


S = "AGGTTGGTAAAATGGTCCGTGGC"
T = "ACCGTAATAAACGTGTCCGTTGG"


In [52]:
SEQ_H, INDICES_H = gen_seq_indices([S,T])
print(SEQ_H)

AGGTTGGTAAAATGGTCCGTGGCACCGTAATAAACGTGTCCGTTGG


In [53]:
def process_seq(seq, lib=fhe):
    seqlist = slice_seq(seq, INDICES_H)
    return hamming_adapter(seqlist[0],seqlist[1], lib)

In [54]:
OUTPUT_PYTHON = process_seq(SEQ_H, np)
print(f'There are {OUTPUT_PYTHON} differences between S & T.')

There are 10 differences between S & T.


In [55]:
# now we can run our circuit on Seq objects and compare the result with output_seq

simulate = False

# and without (slower)
OUTPUT_FHE = circuit_wrapper(make_circuit(len(SEQ_H)), SEQ_H, simulate)
print("The sorted indices of sequences are : ", OUTPUT_FHE)
assert(np.all(OUTPUT_PYTHON == OUTPUT_FHE))



The sorted indices of sequences are :  10


