In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
cur_path = "/content/drive/MyDrive/bigdata_project"
os.chdir(cur_path)
!pwd

/content/drive/MyDrive/bigdata_project


In [None]:
!pip install pyspark 
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u362-ga-0ubuntu1~20.04.1).
0 upgraded, 0 newly installed, 0 to remove and 24 not upgraded.


In [None]:
from pyspark import SparkContext
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
import pickle

In [None]:
!pip install pyspark[sql]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from __future__ import print_function
from functools import wraps
import pyspark as spark
from pyspark import SparkConf
import time
from operator import add
import os 
from subprocess import STDOUT, check_call, check_output

class Fastq:
    def __init__(self, path:str) -> str:
        self.path = path
        self.install_java_scala()
        self.stop_context()
        self.sc = spark.SparkContext.getOrCreate(conf=self.set_conf())
        self.data = self.sc.textFile(self.path)

    def stop_context(self):
        try:
          self.sc.stop()
        except:
          pass

    def set_conf(self):
        conf = SparkConf().setAppName("analyze_fastq")
        conf = (conf.setMaster('local[*]')
          .set('spark.executor.memory', '4G')
          .set('spark.driver.memory', '4G')
          .set('spark.driver.maxResultSize', '4G'))
        return conf

    def install_java_scala(self):
        try:
          java_ver = check_output(['java', '-version'], stderr=STDOUT)
        except:
          java_ver = b''
        try:
          scala_ver = check_output(['scala', '-version'], stderr=STDOUT)
        except:
          scala_ver = b''
        if b'1.8.0_232' not in java_ver:
          java_8_install = ['apt-get', '--quiet', 'install',
                            '-y', 'openjdk-8-jdk-headless']
          java_set_alt = ['update-alternatives', '--set', 'java', 
                          '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' ] 
          check_call(java_8_install, stdout=open(os.devnull, 'wb'), 
                     stderr=STDOUT)
          os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
          check_call(java_set_alt)  
        if b'2.11.12' not in scala_ver:
          scala_install = ['apt-get', '--quiet', 'install', 'scala']
          check_call(scala_install)
          
    def _logging(func):
        @wraps(func)
        def log_print(instance, *args, **kwargs):
          start = time.time()
          res = func(instance, *args, **kwargs)
          print("Finished Executing {}  in {}s!".format(func.__name__, time.time() - start))
          return res
        return log_print

    @_logging
    def get_data(self):
        return self.data    

## Get reference and read resource

In [None]:
fasta_ref = Fastq(cur_path + '/reference_chr21_20000000_20050000.fa')
fasta_read = Fastq(cur_path + '/reads.fq')

## Define kmer function

In [None]:
def cut_kmer(k,source):
  result = []
  l_s = len(source)
  for i in range(l_s-k+1):
    result.append(source[i:i+k])
  return result

## Process reference and read

#### Reference

In [None]:
def process_ref(k,fasta_r):
  return fasta_r.data.map(lambda x: x.split(',')).map(lambda x:('bin'+ str(x[0])+'_'+str(x[1]),x[2])).map(lambda x: (x[0],x[1], cut_kmer(k,x[1])))

In [None]:
res_ref_15 =  process_ref(15,fasta_ref)
res_ref_20 =  process_ref(20,fasta_ref)

In [None]:
res_ref_15.take(2)

[('binstart_end', 'sequence', []),
 ('bin20000000_20000100',
  'CCCTTCTCCTATCCCTTGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCATGCCTAT',
  ['CCCTTCTCCTATCCC',
   'CCTTCTCCTATCCCT',
   'CTTCTCCTATCCCTT',
   'TTCTCCTATCCCTTG',
   'TCTCCTATCCCTTGA',
   'CTCCTATCCCTTGAA',
   'TCCTATCCCTTGAAA',
   'CCTATCCCTTGAAAA',
   'CTATCCCTTGAAAAA',
   'TATCCCTTGAAAAAT',
   'ATCCCTTGAAAAATT',
   'TCCCTTGAAAAATTG',
   'CCCTTGAAAAATTGT',
   'CCTTGAAAAATTGTC',
   'CTTGAAAAATTGTCA',
   'TTGAAAAATTGTCAT',
   'TGAAAAATTGTCATT',
   'GAAAAATTGTCATTT',
   'AAAAATTGTCATTTA',
   'AAAATTGTCATTTAT',
   'AAATTGTCATTTATT',
   'AATTGTCATTTATTT',
   'ATTGTCATTTATTTC',
   'TTGTCATTTATTTCT',
   'TGTCATTTATTTCTC',
   'GTCATTTATTTCTCT',
   'TCATTTATTTCTCTT',
   'CATTTATTTCTCTTA',
   'ATTTATTTCTCTTAT',
   'TTTATTTCTCTTATC',
   'TTATTTCTCTTATCC',
   'TATTTCTCTTATCCA',
   'ATTTCTCTTATCCAT',
   'TTTCTCTTATCCATA',
   'TTCTCTTATCCATAT',
   'TCTCTTATCCATATG',
   'CTCTTATCCATATGG',
   'TCTTATCCATATGGC

In [None]:
# change to dataframe
spark = SparkSession.builder.appName('BigData').getOrCreate()
ref_rdd_15 = spark.sparkContext.parallelize(res_ref_15.collect())
ref_rdd_20 = spark.sparkContext.parallelize(res_ref_20.collect())
# remove header
header_15 = ref_rdd_15.first()
ref_df_15 = ref_rdd_15.filter(lambda row : row != header_15).toDF()
ref_df_15 = ref_df_15.withColumnRenamed("_1","bin_pos")\
   .withColumnRenamed("_2","bin")\
   .withColumnRenamed("_3","kmer_list")
header_20 = ref_rdd_20.first()
ref_df_20 = ref_rdd_20.filter(lambda row : row != header_20).toDF()
ref_df_20 = ref_df_20.withColumnRenamed("_1","bin_pos")\
   .withColumnRenamed("_2","bin")\
   .withColumnRenamed("_3","kmer_list")

In [None]:
ref_df_15.show(2)

+--------------------+--------------------+--------------------+
|             bin_pos|                 bin|           kmer_list|
+--------------------+--------------------+--------------------+
|bin20000000_20000100|CCCTTCTCCTATCCCTT...|[CCCTTCTCCTATCCC,...|
|bin20000100_20000200|TAGATTCATTTAGAATA...|[TAGATTCATTTAGAA,...|
+--------------------+--------------------+--------------------+
only showing top 2 rows



In [None]:
ref_df_20.show(2)

+--------------------+--------------------+--------------------+
|             bin_pos|                 bin|           kmer_list|
+--------------------+--------------------+--------------------+
|bin20000000_20000100|CCCTTCTCCTATCCCTT...|[CCCTTCTCCTATCCCT...|
|bin20000100_20000200|TAGATTCATTTAGAATA...|[TAGATTCATTTAGAAT...|
+--------------------+--------------------+--------------------+
only showing top 2 rows



#### Read

In [None]:
def process_read(k, fasta_r):
  return fasta_r.data.zipWithIndex().map(lambda x: (x[1]//4,x[1]%4,x[0])).filter(lambda x:x[1] == 1 or x[1] == 0).map(lambda x: ("read"+ str(x[0]),x[2])).reduceByKey(lambda x,y:[x,y]).sortBy(lambda x: int(x[0][4:])).map(lambda x: (x[0],x[1][0],x[1][1], cut_kmer(k,x[1][1])))

In [None]:
res_read_15 =  process_read(15,fasta_read)
res_read_20 =  process_read(20,fasta_read)

In [None]:
res_read_15.take(1)

[('read0',
  '@ST-K00126:336:H5W53BBXX:19962722:20120330:50538:50948',
  'TCCTTACTGGTTTTGCAGGTAACTTATAGAGTATTTCCACTTCCCTTCTCCTATCCCTTGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATC',
  ['TCCTTACTGGTTTTG',
   'CCTTACTGGTTTTGC',
   'CTTACTGGTTTTGCA',
   'TTACTGGTTTTGCAG',
   'TACTGGTTTTGCAGG',
   'ACTGGTTTTGCAGGT',
   'CTGGTTTTGCAGGTA',
   'TGGTTTTGCAGGTAA',
   'GGTTTTGCAGGTAAC',
   'GTTTTGCAGGTAACT',
   'TTTTGCAGGTAACTT',
   'TTTGCAGGTAACTTA',
   'TTGCAGGTAACTTAT',
   'TGCAGGTAACTTATA',
   'GCAGGTAACTTATAG',
   'CAGGTAACTTATAGA',
   'AGGTAACTTATAGAG',
   'GGTAACTTATAGAGT',
   'GTAACTTATAGAGTA',
   'TAACTTATAGAGTAT',
   'AACTTATAGAGTATT',
   'ACTTATAGAGTATTT',
   'CTTATAGAGTATTTC',
   'TTATAGAGTATTTCC',
   'TATAGAGTATTTCCA',
   'ATAGAGTATTTCCAC',
   'TAGAGTATTTCCACT',
   'AGAGTATTTCCACTT',
   'GAGTATTTCCACTTC',
   'AGTATTTCCACTTCC',
   'GTATTTCCACTTCCC',
   'TATTTCCACTTCCCT',
   'ATTTCCACTTCCCTT',
   'TTTCCACTTCCCTTC',
   'TTCCACTTCCCTTCT',
   'TCCACTTCCCTTCTC',
   'CCACTTCCCTTCTCC',
   'CACTT

In [None]:
spark = SparkSession.builder.appName('BigData').getOrCreate()
# 15
read_df_15 = spark.sparkContext.parallelize(res_read_15.collect()).toDF()
read_df_15 = read_df_15.withColumnRenamed("_1","read_num")\
   .withColumnRenamed("_2","read_name")\
   .withColumnRenamed("_3","read")\
   .withColumnRenamed("_4","kmer_list")
# 20
read_df_20 = spark.sparkContext.parallelize(res_read_20.collect()).toDF()
read_df_20 = read_df_20.withColumnRenamed("_1","read_num")\
   .withColumnRenamed("_2","read_name")\
   .withColumnRenamed("_3","read")\
   .withColumnRenamed("_4","kmer_list")

In [None]:
read_df_15.show(2)

+--------+--------------------+--------------------+--------------------+
|read_num|           read_name|                read|           kmer_list|
+--------+--------------------+--------------------+--------------------+
|   read0|@ST-K00126:336:H5...|TCCTTACTGGTTTTGCA...|[TCCTTACTGGTTTTG,...|
|   read1|@ST-K00126:1714:H...|GGTTTTTCAGGTAACTT...|[GGTTTTTCAGGTAAC,...|
+--------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [None]:
read_df_20.show(2)

+--------+--------------------+--------------------+--------------------+
|read_num|           read_name|                read|           kmer_list|
+--------+--------------------+--------------------+--------------------+
|   read0|@ST-K00126:336:H5...|TCCTTACTGGTTTTGCA...|[TCCTTACTGGTTTTGC...|
|   read1|@ST-K00126:1714:H...|GGTTTTTCAGGTAACTT...|[GGTTTTTCAGGTAACT...|
+--------+--------------------+--------------------+--------------------+
only showing top 2 rows



## Build distinct kmer set

In [None]:
def get_distinct_kmer_set(read, ref):
  all_kmer_ref = ref.select('kmer_list').collect()
  all_kmer_read = read.select('kmer_list').collect()
  distinct_kmer = set()
  for i in range(read.count()):
    distinct_kmer = distinct_kmer.union(set(all_kmer_read[i][0]))
  for i in range(ref.count()):
    distinct_kmer = distinct_kmer.union(set(all_kmer_ref[i][0]))
  return distinct_kmer

In [None]:
all_kmer_sorted_15 = sorted(get_distinct_kmer_set(read_df_15, ref_df_15))
all_kmer_sorted_20 = sorted(get_distinct_kmer_set(read_df_20, ref_df_20))

In [None]:
print('The number of disticnt kmer with kmer size = 15: ' + str(len(all_kmer_sorted_15)))
print('The number of disticnt kmer with kmer size = 20: ' + str(len(all_kmer_sorted_20)))

The number of disticnt kmer with kmer size = 15: 72530
The number of disticnt kmer with kmer size = 20: 77751


## One-hot encoding

In [None]:
def k_mer_exist(i,k_mer, source):
    all_len = len(source)
    results = [0] * all_len
    for t in range(all_len):
      if k_mer in source[t]:
        results[t] = 1
    return [i,results]

In [None]:
one_hot_ref_15 = np.zeros((len(all_kmer_sorted_15),ref_df_15.count()))
def collect_result_ref_15(result):
    global one_hot_ref_15
    one_hot_ref_15[result[0],:] = result[1]

one_hot_read_15 = np.zeros((len(all_kmer_sorted_15),read_df_15.count()))
def collect_result_read_15(result):
    global one_hot_read_15
    one_hot_read_15[result[0],:] = result[1]
  
one_hot_ref_20 = np.zeros((len(all_kmer_sorted_20),ref_df_20.count()))
def collect_result_ref_20(result):
    global one_hot_ref_20
    one_hot_ref_20[result[0],:] = result[1]

one_hot_read_20 = np.zeros((len(all_kmer_sorted_20),read_df_20.count()))
def collect_result_read_20(result):
    global one_hot_read_20
    one_hot_read_20[result[0],:] = result[1]

In [None]:
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

In [None]:
all_read_15 = [i[0] for i in list(read_df_15.select('read').collect())]
all_ref_15 = [i[0] for i in list(ref_df_15.select('bin').collect())]
all_read_20 = [i[0] for i in list(read_df_20.select('read').collect())]
all_ref_20 = [i[0] for i in list(ref_df_20.select('bin').collect())]

In [None]:
# one-hot encoding for reference with kmer =15
for i, k_mer in enumerate(all_kmer_sorted_15):
  pool.apply_async(k_mer_exist, args=(i, k_mer, all_ref_15), callback=collect_result_ref_15)
pool.close()
pool.join() 

In [None]:
# one-hot encoding for read with kmer = 15
pool = mp.Pool(mp.cpu_count())
for i, k_mer in enumerate(all_kmer_sorted_15):
  pool.apply_async(k_mer_exist, args=(i, k_mer, all_read_15), callback=collect_result_read_15)
pool.close()
pool.join() 

In [None]:
# one-hot encoding for reference with kmer =20
pool = mp.Pool(mp.cpu_count())
for i, k_mer in enumerate(all_kmer_sorted_20):
  pool.apply_async(k_mer_exist, args=(i, k_mer, all_ref_20), callback=collect_result_ref_20)
pool.close()
pool.join() 

In [None]:
# one-hot encoding for read with kmer = 20
pool = mp.Pool(mp.cpu_count())
for i, k_mer in enumerate(all_kmer_sorted_20):
  pool.apply_async(k_mer_exist, args=(i, k_mer, all_read_20), callback=collect_result_read_20)
pool.close()
pool.join() 

## Convert one-hot encoding results into sparse vector formats

In [None]:
def convert_to_sparse_vector(kmer_list, one_hot_r):
  kmer_len = len(kmer_list)
  sparse = []
  for i in range(one_hot_r.shape[1]):
    target = one_hot_r[:,i]
    index = [t for t,value in enumerate(target) if value > 0]
    len_one = len(index)
    sparse.append((i, Vectors.sparse(kmer_len,index,[1.0]*len_one)))
  return sparse

In [None]:
ref_sparse_15 = convert_to_sparse_vector(all_kmer_sorted_15, one_hot_ref_15)
read_sparse_15 = convert_to_sparse_vector(all_kmer_sorted_15, one_hot_read_15)
ref_sparse_20 = convert_to_sparse_vector(all_kmer_sorted_20, one_hot_ref_20)
read_sparse_20 = convert_to_sparse_vector(all_kmer_sorted_20, one_hot_read_20)

## Store sparse vector result

In [None]:
with open(cur_path + '/ref_sparse_15', "wb") as fp:  
   pickle.dump(ref_sparse_15,fp)
with open(cur_path + '/read_sparse_15', "wb") as fp:  
   pickle.dump(read_sparse_15,fp)
with open(cur_path + '/ref_sparse_20', "wb") as fp:  
   pickle.dump(ref_sparse_20,fp)
with open(cur_path + '/read_sparse_20', "wb") as fp:  
   pickle.dump(read_sparse_20,fp)