In [None]:
"""Data Preprocessing.
===================================================
Version | Last Modified |  Author       | Comment
---------------------------------------------------
0.0   | 10 Sep 21       | J. Charlier   | initial version
0.1   | 15 Oct 21       | J. Charlier   | updated version
===================================================
comments:
>> the Encoder class is adapted from
  [1] "CRISPR-Net: A Recurrent Convolutional Network Quantifies
  CRISPR Off-Target Activities with Mismatches and Indels", J. Lin et al
  https://onlinelibrary.wiley.com/doi/epdf/10.1002/advs.201903562
"""
#
import numpy as np
import pandas as pd
import pickle as pkl
import sklearn as skl
from sklearn.utils import Bunch
# Do not truncate columns when a DataFrame is displayed (None = no limit).
pd.options.display.max_columns=None
#
# Report the Python, numpy, pandas and scikit-learn versions of the running
# kernel.
print('!!! REQUIREMENTS !!!')
from platform import python_version
print('python version ==', python_version())
print('numpy version ==' , np.__version__)
print('pandas version ==' , pd.__version__)
print('sklearn version ==' , skl.__version__)

!!! REQUIREMENTS !!!
python version == 3.7.12
numpy version == 1.19.5
pandas version == 1.1.5
sklearn version == 0.22.2.post1


In [None]:
class Encoder:
  """Encode an on-target / off-target sequence pair position by position.

  Sequences shorter than 24 characters are left-padded with '-'. For every
  position the first five channels are the element-wise OR of the one-hot
  codes of the two bases (order A, T, G, C, '_'; '-' is all zeros). The last
  two channels record, when the two bases differ and neither is '-', which
  side ranks higher in ``direction_dict``: channel 0 for the on-target base,
  channel 1 for the off-target base. An 'N' in ``on_seq`` is replaced by the
  off-target base found at the same position.

  Attributes set by the constructor
    on_seq, off_seq : the (padded) input sequences
    sgRNA_code      : per-position 5-channel code of ``on_seq``
    off_code        : per-position 5-channel code of ``off_seq``
    on_off_code     : per-position 7-channel combined code
    label           : stored only when ``with_category`` is true
    value           : stored only when ``with_reg_val`` is true
  """

  def __init__(self, on_seq, off_seq, with_category=False,
               label=None, with_reg_val=False, value=None):
    target_len=24

    def left_pad(seq):
      # a non-positive repeat count yields "", so longer inputs pass through
      return "-" * (target_len - len(seq)) + seq

    self.on_seq=left_pad(on_seq)
    self.off_seq=left_pad(off_seq)
    # one-hot code per symbol; the padding symbol '-' maps to all zeros
    self.encoded_dict_indel={
      'A': [1,0,0,0,0],
      'T': [0,1,0,0,0],
      'G': [0,0,1,0,0],
      'C': [0,0,0,1,0],
      '_': [0,0,0,0,1],
      '-': [0,0,0,0,0]
    }
    # rank used to decide which of two differing symbols is "higher"
    self.direction_dict={'A':5, 'G':4, 'C':3, 'T':2, '_':1}
    if with_category:
      self.label=label
    if with_reg_val:
      self.value=value
    self.encode_on_off_dim7()

  def encode_sgRNA(self):
    """Fill ``self.sgRNA_code`` with the per-position code of ``on_seq``."""
    table=self.encoded_dict_indel
    rows=[]
    for pos,base in enumerate(self.on_seq):
      if base == "N":
        # wildcard: take the off-target base at the same position
        base=self.off_seq[pos]
      rows.append(table[base])
    self.sgRNA_code=np.array(rows)

  def encode_off(self):
    """Fill ``self.off_code`` with the per-position code of ``off_seq``."""
    table=self.encoded_dict_indel
    self.off_code=np.array([table[base] for base in self.off_seq])

  def encode_on_off_dim7(self):
    """Fill ``self.on_off_code`` with the combined 7-channel code."""
    self.encode_sgRNA()
    self.encode_off()
    ranks=self.direction_dict
    combined=[]
    for pos,on_base in enumerate(self.on_seq):
      merged=np.bitwise_or(self.sgRNA_code[pos],self.off_code[pos])
      off_base=self.off_seq[pos]
      if on_base == "N":
        on_base=off_base
      direction=np.zeros(2)
      # padding on either side never sets a direction flag
      if on_base != "-" and off_base != "-":
        on_rank=ranks[on_base]
        off_rank=ranks[off_base]
        if on_rank > off_rank:
          direction[0]=1
        elif on_rank < off_rank:
          direction[1]=1
      combined.append(np.concatenate((merged,direction)))
    self.on_off_code=np.array(combined)
# end of class Encoder
#
#
# Testing by replicating the results of Figure 3 of [1]:
# encode one on-/off-target pair and display the transposed code, i.e. one
# row per channel and one column per (padded) sequence position.
# NOTE(review): the last two rows are labelled 'ins'/'del', but
# encode_on_off_dim7 sets them for any pair of differing bases according to
# the direction_dict ranking -- confirm these labels against Figure 3.
e=Encoder(
  on_seq ="GC_CTTGCATTGTACCCGAGGGG",
  off_seq="CGT_TAGCCTTGTATCCCAGGGA"
)
pd.DataFrame(e.on_off_code.T, index=['A','T','G','C','_','ins','del'])

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23
A,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
T,0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
G,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0
C,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0
_,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
ins,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
del,0.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0


In [None]:
def printData(nm, data):
  """Print ``nm`` followed by ':' on one line, then ``data`` on the next."""
  for line in (nm+':', data):
    print(line)
# end of printData
#
def load_22sgRNA_data(nm):
  """Load a labelled sgRNA / off-target CSV and encode every pair.

  Parameters
  ----------
  nm : str or path-like
    CSV file handed to ``pd.read_csv``; it must provide the columns
    'sgRNA_seq', 'off_seq' and 'label'.

  Returns
  -------
  sklearn.utils.Bunch
    ``data``   : array stacking ``Encoder.on_off_code`` of every row
    ``target`` : array of the row labels

  Prints a start message and, once done, the shape of the encoded data
  together with the number of labels greater than zero.
  """
  print("Loading Listgarten dataset 22 gRNA whole")
  sgRNA22_data=pd.read_csv(nm)
  sgRNA22_code=[]
  sgRNA22_labels=[]
  # Walk the three needed columns directly: iterrows() builds a Series for
  # every row, which is needlessly slow on a file with many rows.
  rows=zip(
    sgRNA22_data['sgRNA_seq'],
    sgRNA22_data['off_seq'],
    sgRNA22_data['label']
  )
  for on_seq,off_seq,label in rows:
    en=Encoder(
      on_seq=on_seq.upper(), off_seq=off_seq.upper(),
      with_category=True, label=label
    )
    sgRNA22_code.append(en.on_off_code)
    sgRNA22_labels.append(en.label)
  # end for
  # Convert once; wrapping the results in np.array() again would copy the
  # whole encoded dataset each time.
  sgRNA22_labels=np.array(sgRNA22_labels)
  sgRNA22_code=np.array(sgRNA22_code)
  print(
    "Finished!","Dataset size: ",
    sgRNA22_code.shape,np.count_nonzero(sgRNA22_labels > 0)
  )
  return Bunch(data=sgRNA22_code, target=sgRNA22_labels)
# end of function load_22sgRNA_data
#
def predictionPipeline(csv_path="aggregate_example_GACCTTGCATTGTACCCGAG.csv"):
  """Load an aggregate off-target CSV and print the inputs for scoring.

  Parameters
  ----------
  csv_path : str or path-like, optional
    CSV file with the columns 'on_target', 'off_target' and 'Gene_mark'.
    Defaults to the example file that used to be hard-coded here.

  Prints the on-target sequences, the off-target sequences, the raw
  'Gene_mark' column, and an array holding 0 where 'Gene_mark' is null and
  1 elsewhere. Returns None.
  """
  offtarget_df=pd.read_csv(csv_path)
  gRNA_seq=offtarget_df['on_target']
  printData('gRNA_seq', gRNA_seq)
  offtarget_seq=offtarget_df['off_target']
  # printData appends the ':' itself, so labels are passed without one
  # (passing 'offtarget_seq:' printed a doubled "::").
  printData('offtarget_seq', offtarget_seq)
  genic=offtarget_df['Gene_mark']
  printData('genic', genic)
  # 1 for rows carrying a gene mark, 0 for rows where it is missing
  genic_labels=np.ones(len(genic))
  genic_labels[genic.isnull()]=0
  print('genic_labels:\n', genic_labels)
  # TODO: the aggregate scoring step is not wired in; run_CRISPR_net_aggregate
  # and `file` are not defined in this function.
  #aggregate_score = run_CRISPR_net_aggregate(gRNA_seq, offtarget_seq, genic_labels)
  #print("The overall off-target score of", file, 'is', aggregate_score)

In [None]:
# unit test
# Smoke-runs both loaders; nothing is asserted, the printed output is meant
# to be inspected. The CSV file names are relative paths.
print(">> !!! begin unit test for load_22sgRNA_data")
listgartendt=load_22sgRNA_data("Listgarten_22gRNA_wholeDataset.csv")
print(">> !!! end unit test for load_22sgRNA_data")
#
print(">> !!! begin unit test for predictionPipeline")
predictionPipeline()
print(">> !!! end unit test for predictionPipeline")

>> !!! begin unit test for load_22sgRNA_data
Loading Listgarten dataset 22 gRNA whole
Finished! Dataset size:  (306086, 24, 7) 0
>> !!! end unit test for load_22sgRNA_data
>> !!! begin unit test for predictionPipeline
gRNA_seq:
0        -GACCTTGCATTGTACCCGAGGGG
1        -GACCTTGCATTGTACCCGAGGGG
2        -GACCTTGCATTGTACCCGAGTGG
3        -GACCTTGCATTGTACCCGAGTGG
4        -GACCTTGCATTGTACCCGAGTGG
                   ...           
75935    -GACCTTGCATTGTACCCGAGAGG
75936    -GACCTTGCATTGTACCCGAGAGG
75937    -GACCTTGCATTGTACCCGAGAGG
75938    -GACCTTGCATTGTACCCGAGAGG
75939    -GACCTTGCATTGTACCCGAGCGG
Name: on_target, Length: 75940, dtype: object
offtarget_seq::
0        -GAACTAGCCTTGTATCCCAGGGA
1        -GAACTAGCCTTGTATCCCAGGGA
2        -AACCATGCAATGCACACGTGTGG
3        -CACCTTCCCTTGCACCCTTGTGG
4        -GTGTTTGCAATGTACCCGTGTTG
                   ...           
75935    -GAGCTTGCAGTG_AGCCGAGATT
75936    -GAGCTTGCAGTGAA_CCGAGATG
75937    -GAGCTTGCAGTGAAC_CGAGATG
75938    -GAGCTTGCAGTGAACC_GAG