In [1]:
import multiprocessing as mp
import numpy as np
import pod5 as p5
import pysam
from multiprocessing import Queue  # ,Manager,Pool
import time
import os
import signal
import logging
from tqdm import tqdm
from multiprocessing import Pool
import random


def handler(signum, frame):
    print("hello world!")


signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # 忽略SIGPIPE信号
logger = logging.getLogger("test_logger")
logger.setLevel(logging.DEBUG)
test_log = logging.FileHandler(
    "/home/xiaoyf/methylation/deepsignal/log/process.log", "a", encoding="utf-8"
)
formatter = logging.Formatter(
    "%(asctime)s - %(filename)s - line:%(lineno)d - %(levelname)s - %(message)s -%(process)s"
)
test_log.setFormatter(formatter)
# 加载文件到logger对象中
logger.addHandler(test_log)

In [2]:
def extract_signal_from_pod5(pod5_path) -> np.array:
    signals = []
    with p5.Reader(pod5_path) as reader:
        for read_record in reader.reads():
            if read_record.signal is None:
                logger.critical(
                    "Signal is None for read id {}".format(read_record.read_id)
                )
            # signals[str(read_record.read_id)] = {'signal':read_record.signal,'shift':read_record.calibration.offset,'scale':read_record.calibration.scale}#不加str会变成UUID，很奇怪
            signals.append(
                [
                    str(read_record.read_id),
                    read_record.signal.astype(np.int8),
                    np.int8(read_record.calibration.offset),
                    np.float16(read_record.calibration.scale),
                ]
            )
            # 0:read_id,1:signal,2:shift,3:scale
    return np.array(signals, dtype=object)  # np.array is small than list


def extract_move_from_bam(bam_path) -> np.array:
    seq_move = []
    bamfile = pysam.AlignmentFile(bam_path, "rb", check_sq=False)
    try:
        for read in bamfile.fetch(until_eof=True):  # 暂时不使用索引，使用返回是空值
            # print(read.query_name)
            tags = dict(read.tags)
            mv_tag = tags["mv"]
            ts_tag = tags["ts"]
            sm_tag = tags["sm"]
            sd_tag = tags["sd"]
            # read.update({read.query_name:{"sequence":read.query_sequence,"stride":mv_tag[0],"mv_table":np.array(mv_tag[1:]),"num_trimmed":ts_tag,"shift":sm_tag,"scale":sd_tag}})
            seq_move.append(
                [
                    read.query_name,
                    read.query_sequence,
                    np.int8(mv_tag[0]),
                    np.array(mv_tag[1:], dtype=np.int8),
                    np.int8(ts_tag),
                    np.float16(sm_tag),
                    np.float16(sd_tag),
                ]
            )
    except ValueError:
        print("bam don't has index")
        for read in bamfile.fetch(until_eof=True, multiple_iterators=False):
            tags = dict(read.tags)
            mv_tag = tags["mv"]
            ts_tag = tags["ts"]
            sm_tag = tags["sm"]
            sd_tag = tags["sd"]
            seq_move.append(
                [
                    read.query_name,
                    read.query_sequence,
                    np.int8(mv_tag[0]),
                    np.array(mv_tag[1:], dtype=np.int8),
                    np.int8(ts_tag),
                    np.float16(sm_tag),
                    np.float16(sd_tag),
                ]
            )
            # 0:read_id,1:sequence,2:stride,3:mv_table,4:num_trimmed,5:to_norm_shift,6:to_norm_scale
            # read[read.query_name] = {"sequence":read.query_sequence,"stride":mv_tag[0],"mv_table":np.array(mv_tag[1:]),"num_trimmed":ts_tag,"shift":sm_tag,"scale":sd_tag}
    return np.array(seq_move, dtype=object)


def read_from_pod5_bam(pod5_path, bam_path, read_id=None) -> np.array:
    read = []
    signal = extract_signal_from_pod5(pod5_path)
    seq_move = extract_move_from_bam(bam_path)
    if read_id is not None:
        for i in range(len(seq_move)):
            if seq_move[i][0] == read_id:
                if seq_move[i][1] is not None:
                    for j in range(len(signal)):
                        if signal[j][0] == seq_move[i][0]:
                            read.append(
                                [
                                    signal[j][0],
                                    signal[j][1],
                                    signal[j][2],
                                    signal[j][3],
                                    seq_move[i][1],
                                    seq_move[i][2],
                                    seq_move[i][3],
                                    seq_move[i][4],
                                    seq_move[i][5],
                                    seq_move[i][6],
                                ]
                            )

    else:
        for i in range(len(seq_move)):
            if seq_move[i][1] is not None:
                for j in range(len(signal)):
                    if signal[j][0] == seq_move[i][0]:
                        read.append(
                            [
                                signal[j][0],
                                signal[j][1],
                                signal[j][2],
                                signal[j][3],
                                seq_move[i][1],
                                seq_move[i][2],
                                seq_move[i][3],
                                seq_move[i][4],
                                seq_move[i][5],
                                seq_move[i][6],
                            ]
                        )
                # 0:read_id,1:signal,2:to_pA_shift,3:to_pA_scale,4:sequence,5:stride,6:mv_table,7:num_trimmed,8:to_norm_shift,9:to_norm_scale

    return np.array(read, dtype=object)

In [31]:
bam_path = "/homeb/xiaoyf/data/HG002/example/bam/has_moves.bam"
bam = extract_move_from_bam(bam_path)

[E::idx_find_and_load] Could not retrieve index file for '/homeb/xiaoyf/data/HG002/example/bam/has_moves.bam'
  return np.array(seq_move)


In [None]:
for read in bam:
    # print(read[1])
    if read[5] - int(read[5]) > 0:
        print("shift is not int")

In [7]:
pod5_path = "/homeb/xiaoyf/data/HG002/example/pod5/output.pod5"
signal = extract_signal_from_pod5(pod5_path)

In [11]:
signal[0][1]

array([1230.,  945.,  993., ...,  687.,  696.,  846.], dtype=float16)

In [6]:
def extract_signal_from_pod5_raw(pod5_path):
    signals = []
    with p5.Reader(pod5_path) as reader:
        for read_record in reader.reads():
            # signals[str(read_record.read_id)] = {'signal':read_record.signal,'shift':read_record.calibration.offset,'scale':read_record.calibration.scale}#不加str会变成UUID，很奇怪
            signals.append(
                [
                    str(read_record.read_id),
                    read_record.signal,
                    read_record.calibration.offset,
                    read_record.calibration.scale,
                ]
            )
    return signals

In [6]:
import sys

print(sys.getsizeof(signal))

33920


In [13]:
signal_raw = extract_signal_from_pod5_raw(pod5_path)
# print(sys.getsizeof(signal_raw))

In [21]:
for read in signal_raw:
    # print(read[1])
    if read[2] - int(read[2]) > 0:
        print("shift is not int")

In [13]:
import sys
import numpy as np

my_list = [np.float16(1.0), np.float16(2.0), np.float16(3.0)]
print(sys.getsizeof(my_list))
my_array = np.array(my_list)
print(my_array.nbytes)

my_list = [1.0, 1.0, 1.0]
print(sys.getsizeof(my_list))
my_array = np.array(my_list)
print(my_array.nbytes)

80
6
80
24


In [4]:
bam_path = "/homeb/xiaoyf/data/HG002/example/bam/sorted_has_moves.bam"
bamfile = pysam.AlignmentFile(bam_path, "rb", check_sq=False)
bamfile.has_index()

True

In [14]:
for read in bamfile.get_index_statistics():
    # print(read)
    print(read)

In [18]:
for read in bamfile:
    print(read.query_name)

In [None]:
seq_and_move_table = extract_move_from_bam(bam_path)

In [3]:
pod5_path = "/homeb/xiaoyf/data/HG002/example/pod5/output.pod5"
bam_path = "/homeb/xiaoyf/data/HG002/example/bam/has_moves.bam"
read = read_from_pod5_bam(pod5_path, bam_path)

[E::idx_find_and_load] Could not retrieve index file for '/homeb/xiaoyf/data/HG002/example/bam/has_moves.bam'


In [12]:
read[0]

['3239b4d9-0a7e-471c-86fe-e156b9f279a0',
 array([1284, 1066, 1050, ..., 1008, 1023,  920], dtype=int16),
 -243.0,
 0.1462070643901825,
 'ATGTATATGTAACCTACTTGGTTCAGTTACGTACTGCTGAATAAGTCTCACAATATTGATGGCTTTAAAGAGAGAAGTCCCCGGCCGGGCTGGTGGCTCACGCCTGTAATCCCAGCACTTTGGGAGGCCAAGGCAGGAGGATCACCTGAGGTCAAGAGTTCGAGACCAGCCTGGCCAACATGGTGAAACCTCCTCTCTACTAAAAATACAAAAATTAGCCAGGCATGGTGGCAGGTGCCTGTAATCCCAGCTACTTGGGAGGCTGAGGCAGGAGAATTGCTTGAACCTGGGAGGCAGAGATTGCAGTGAGCTGAGATCCCGCCACTGCAGTCCAGCCTGGGGGACAAGCAATACGTT',
 5,
 array([1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
        1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1,
        1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0,
        0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0,
        1, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
        0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1,

In [38]:
# feature=caculate_feature_one_one_for_each_base(read,21)
# feature['0000b1ad-fdaf-49e6-bc11-cbe93270e3a3'][0].keys()

In [None]:
read_q = Queue()
_prepare_read(read_q, read, batch_size=1000)
read_batch = read_q.get()
for read_id in read_batch:
    print(read_id)

In [5]:
# caculate_bar=tqdm(total = read_number, desc='extract_feature', position=0)
# write_feature_bar=tqdm(total = read_number, desc='write_feature', position=1)

extract_feature:   0%|          | 0/4000 [00:00<?, ?it/s]

In [4]:
iupac_alphabets = {
    "A": ["A"],
    "T": ["T"],
    "C": ["C"],
    "G": ["G"],
    "R": ["A", "G"],
    "M": ["A", "C"],
    "S": ["C", "G"],
    "Y": ["C", "T"],
    "K": ["G", "T"],
    "W": ["A", "T"],
    "B": ["C", "G", "T"],
    "D": ["A", "G", "T"],
    "H": ["A", "C", "T"],
    "V": ["A", "C", "G"],
    "N": ["A", "C", "G", "T"],
}
iupac_alphabets_rna = {
    "A": ["A"],
    "C": ["C"],
    "G": ["G"],
    "U": ["U"],
    "R": ["A", "G"],
    "M": ["A", "C"],
    "S": ["C", "G"],
    "Y": ["C", "U"],
    "K": ["G", "U"],
    "W": ["A", "U"],
    "B": ["C", "G", "U"],
    "D": ["A", "G", "U"],
    "H": ["A", "C", "U"],
    "V": ["A", "C", "G"],
    "N": ["A", "C", "G", "U"],
}


def get_refloc_of_methysite_in_motif(seqstr, motifset, methyloc_in_motif=0) -> list:
    """

    :param seqstr:
    :param motifset:
    :param methyloc_in_motif: 0-based
    :return:
    """
    motifset = set(motifset)
    strlen = len(seqstr)
    motiflen = len(list(motifset)[0])
    sites = []
    for i in range(0, strlen - motiflen + 1):
        if seqstr[i : i + motiflen] in motifset:
            sites.append(i + methyloc_in_motif)
    return sites


def _convert_motif_seq(ori_seq, is_dna=True):
    outbases = []
    for bbase in ori_seq:
        if is_dna:
            outbases.append(iupac_alphabets[bbase])
        else:
            outbases.append(iupac_alphabets_rna[bbase])

    def recursive_permute(bases_list):
        if len(bases_list) == 1:
            return bases_list[0]
        elif len(bases_list) == 2:
            pseqs = []
            for fbase in bases_list[0]:
                for sbase in bases_list[1]:
                    pseqs.append(fbase + sbase)
            return pseqs
        else:
            pseqs = recursive_permute(bases_list[1:])
            pseq_list = [bases_list[0], pseqs]
            return recursive_permute(pseq_list)

    return recursive_permute(outbases)


def get_motif_seqs(motifs, is_dna=True):
    ori_motif_seqs = motifs.strip().split(",")

    motif_seqs = []
    for ori_motif in ori_motif_seqs:
        motif_seqs += _convert_motif_seq(ori_motif.strip().upper(), is_dna)
    return motif_seqs

In [5]:
def expand(feature, index, nbase, nsig, num, fill_num=1):
    nbase.append(np.tile(np.array(feature[index][5], dtype=str), num * fill_num))
    # nstd.append(np.tile(feature[index][2],num*fill_num))
    # nmean.append(np.tile(feature[index][3],num*fill_num))
    try:
        nsig.append(
            np.tile(
                np.random.choice(feature[index][1], size=num, replace=False), fill_num
            )
        )
        # np.array取随机不能用random包，要用numpy自带的random
    except Exception as e:
        logger.critical(feature[index][1])
    # return nbase,nsig


# 0:read_id,1:signal,2:std,3:mean,4:num,5:base
def _get_neighbord_feature(sequence, feature, base_num) -> list:
    # 数据预处理主要速度瓶颈，同样的reads数，不运行这个函数大概快了十倍，从二十多分钟减到两分钟
    motif = "CG"
    max_sites = 15
    motif_seqs = get_motif_seqs(motif)
    tsite_locs = get_refloc_of_methysite_in_motif(sequence, set(motif_seqs))
    if len(tsite_locs) > max_sites:
        tsite_locs = np.random.choice(
            tsite_locs,
            size=max_sites,
            replace=False,
        )

    nfeature = []
    windows_size = (base_num - 1) // 2
    signal_sample = 5
    for i in range(len(feature)):
        nbase = []
        # nstd=[]
        # nmean=[]
        nsig = []
        if i not in tsite_locs:
            continue
        # 更改扩增逻辑，增添采样函数
        # remora一条read好像只提取15个点
        # if feature[i][4]>base_num:
        #    logger.info("base correspoding signal number {} is more than window size {}".format(feature[i][4],base_num))

        if i < windows_size:
            flag = windows_size - i
            expand(feature, i, nbase, nsig, signal_sample, windows_size - i)
            if i != 0:
                for k in range(i):  # 左闭右开
                    expand(feature, k, nbase, nsig, signal_sample)
                    flag += 1
            for k in range(i, i + windows_size + 1):
                expand(feature, k, nbase, nsig, signal_sample)
                flag += 1
            logger.debug(
                "focu base on the far left of read and expand number is {}".format(flag)
            )
        elif (len(feature) - 1) - i < windows_size:
            flag = 0
            for k in range(i - windows_size, i + 1):
                flag += 1
                expand(feature, k, nbase, nsig, signal_sample)
            if i != len(feature) - 1:
                for k in range(i, len(feature) - 1):
                    flag += 1
                    expand(feature, k, nbase, nsig, signal_sample)
            flag += windows_size - ((len(feature) - 1) - i)
            expand(
                feature,
                i,
                nbase,
                nsig,
                signal_sample,
                windows_size - ((len(feature) - 1) - i),
            )
            logger.debug(
                "focu base on the far right of read and expand number is {}".format(
                    flag
                )
            )
        else:
            for k in range(i - windows_size, i):
                expand(feature, k, nbase, nsig, signal_sample)
            for k in range(i, i + windows_size):
                expand(feature, k, nbase, nsig, signal_sample)
        # feature[read_id][i].update({'nbase':nbase,'nsig':nsig,'nstd':nstd,'nmean':nmean})
        nfeature.append([feature[i][0], nbase, nsig])

        # 0:read_id,1:nbase,2:nsig,3:nstd,4:nmean
        # logger.debug('feature id: {}, feature:{}'.format(str(feature[0]),(str(nbase),str(nsig),str(nstd),str(nmean))))
    return nfeature


# 0:read_id,1:signal,2:to_pA_shift,3:to_pA_scale,4:sequence,5:stride,6:mv_table,7:num_trimmed,8:to_norm_shift,9:to_norm_scale
def norm_signal_read_id(signal) -> np.array:
    shift_scale_norm = []
    # signal_norm=[]
    if signal[3] == 0:
        logger.critical("to_pA_scale of read {} is 0").format(signal[0])
    shift_scale_norm = [
        (signal[8] / signal[3]) - np.float16(signal[2]),
        (signal[9] / signal[3]),
    ]
    # 0:shift,1:scale
    num_trimmed = signal[7]
    # print('num_trimmed:{} and signal:{}'.format(num_trimmed,signal[1]))
    # print('shift:{} and scale:{}'.format(shift_scale_norm[0],shift_scale_norm[1]))
    if shift_scale_norm[1] == 0:
        logger.critical("scale of read {} is 0").format(signal[0])
    if num_trimmed >= 0:
        signal_norm = (
            signal[1][num_trimmed:].astype(np.float16) - shift_scale_norm[0]
        ) / shift_scale_norm[1]
    else:
        signal_norm = (
            signal[1][:num_trimmed].astype(np.float16) - shift_scale_norm[0]
        ) / shift_scale_norm[1]

    return signal_norm


def caculate_batch_feature_for_each_base(
    lock, read_q, feature_q, base_num=0, write_batch=10
):
    # print("extrac_features process-{} starts".format(os.getpid()))
    logger.info("extrac_features process-{} starts".format(os.getpid()))
    read_num = 0

    while True:
        if read_q.empty():
            time.sleep(10)
            continue
        lock.acquire()
        read_batch = read_q.get()
        lock.release()
        if read_batch == "kill":
            read_q.put("kill")
            # time.sleep(10)
            break
        read_num += len(read_batch)
        # flag=0
        # if len(read_batch)>1:
        #    flag=1
        #    pos=bar_q.get()
        #    caculate_bar = tqdm(total = len(read_batch), desc='extract_feature', position=pos)
        #    bar_q.put(pos+1)
        # else:
        #    flag=0
        logger.info("read batch size: {}".format(len(read_batch)))
        nfeature = []
        for read_one in read_batch:
            feature = []
            #    if flag == 1:
            #        caculate_bar.update()
            # print(read_one)
            sequence = read_one[4]  # 这个转成np.array内存占用大很多
            stride = read_one[5]
            movetable = np.array(read_one[6])
            # num_trimmed = read[read_id]['num_trimmed']
            trimed_signals = norm_signal_read_id(read_one)  # 筛掉背景信号,norm
            if trimed_signals.size == 0:
                logger.critical("norm has error, raw data is {}".format(read_one))
                continue
            move_pos = np.append(np.argwhere(movetable == 1).flatten(), len(movetable))
            # print(len(move_pos))

            for move_idx in range(len(move_pos) - 1):
                start, end = move_pos[move_idx], move_pos[move_idx + 1]
                signal = trimed_signals[(start * stride) : (end * stride)]  # .tolist()
                if signal.size == 0:
                    logger.critical(
                        "signal is empty, it's crazy, read id is {} and base index is".format(
                            read_one[0], move_idx
                        )
                    )
                    continue
                if True in np.isnan(signal):
                    logger.critical("signal has nan for read_id:{}".format(read_one[0]))

                try:
                    mean = np.mean(signal)
                    if np.amax(signal) < mean:
                        logger.critical(
                            "ValueERROR: mean greater than max for read_id:{}".format(
                                read_one[0]
                            )
                        )
                except Exception as e:
                    logger.critical(signal)
                std = np.std(signal.astype(np.float32))  # np.float16会溢出
                num = end - start

                feature.append(
                    [
                        read_one[0],
                        signal,
                        np.float16(std),
                        np.float16(mean),
                        np.int8(num * stride),
                        sequence[move_idx],
                    ]
                )
                # 0:read_id,1:signal,2:std,3:mean,4:num,5:base
                # feature[read_id].append({'signal':signal,'std':str(std),'mean':str(mean),'num':int(num*stride),'base':sequence[move_idx]})
            if base_num != 0:
                nfeature.append(_get_neighbord_feature(sequence, feature, base_num))
                logger.info(
                    "extract neigbor features for read_id:{}".format(read_one[0])
                )
                if len(nfeature) == write_batch:
                    lock.acquire()
                    feature_q.put(nfeature)
                    lock.release()
                    nfeature = []
                    while feature_q.qsize() > 50:
                        time.sleep(2)
                        if feature_q.full():
                            logger.error("queue full")

            # feature_q.put(feature)
        if len(nfeature) != 0:
            lock.acquire()
            feature_q.put(nfeature)
            lock.release()
            nfeature = []
        # feature_q.append(feature)

        # print("extrac_features process-{} ending, proceed {} read batch".format(os.getpid(), read_num))
    logger.info(
        "extrac_features process-{} ending, proceed {} read".format(
            os.getpid(), read_num
        )
    )
    # if caculate_bar is not None:
    #    caculate_bar.close()
    # pbar.close()


def _prepare_read(read_q, read, batch_size=1000):
    i = 0
    # j=0
    read_batch = []
    for read_one in read:
        read_batch.append(read_one)
        i = i + 1
        # j=j+1
        # if j==40:
        #    break
        if i == batch_size:
            i = 0
            if read_q.full():
                logger.critical("read_q is full")
            read_q.put(read_batch)
            read_batch = []
    read_q.put(np.array(read_batch, dtype=object))
    # print('total batch number is {}'.format((len(read)-1)//batch_size+1))
    logger.info("total batch number is {}".format((len(read) - 1) // batch_size + 1))
    # return len(read)


def write_feature(read_number, file, feature_q):
    # print("write_process-{} starts".format(os.getpid()))
    logger.info("write_process-{} starts".format(os.getpid()))
    dataset = []
    # pos=bar_q.get()
    write_feature_bar = tqdm(
        total=read_number,
        desc="extract feature",
        position=0,
        colour="green",
        unit=" read",
    )
    # bar_q.put(pos+1)
    try:
        with open("/home/xiaoyf/methylation/deepsignal/log/feature.txt", "w") as f:
            while True:
                if feature_q.empty():
                    time.sleep(10)
                    continue
                write_batch = feature_q.get()
                if write_batch == "kill":
                    logger.info("write_process-{} finished".format(os.getpid()))
                    # time.sleep(10)
                    np_data = np.array(dataset)
                    np.save("/home/xiaoyf/methylation/deepsignal/log/data.npy", np_data)
                    # 包含neigbor feature的40条reads保存成npy需要27.87GB，这个开销是无法忍受的
                    # print('write_process-{} finished'.format(os.getpid()))
                    break

                # logger.debug('feature id: {}'.format(str(features[0][0])))
                for read in write_batch:
                    write_feature_bar.update()
                    logger.info(
                        "write process get neigbor features number:{}".format(len(read))
                    )
                    for feature in read:
                        # 0:read_id,1:nbase,2:nsig,3:nstd,4:nmean
                        # #f.write(read_id+'\t')

                        dataset.append(feature)
                        f.write(
                            str(feature[0])
                            + "\t"
                            + str(feature[1])
                            + "\t"
                            + str(feature[2])
                            + "\n"
                        )

                f.flush()
    except Exception as e:
        logger.critical(
            "error in writing features, this always happend because memory not enough"
        )
        print(e)
    finally:
        write_feature_bar.close()


def bar_listener(p_bar, desc="", position=1, number=4000):
    bar = tqdm(total=number, desc=desc, position=position)
    for item in iter(p_bar.get, None):
        bar.update(item)


def extract_feature(read, nproc=4, batch_size=20):
    start = time.time()
    feature_q = Queue()
    read_q = Queue()
    # bar=Queue()
    # bar.put(0)
    # caculate_batch_feature_pbar = Manager().Queue()
    # write_pbar = Manager().Queue()
    _prepare_read(read_q, read, batch_size)
    read_number = len(read)
    feature_procs = []
    read_q.put("kill")
    manager = mp.Manager()
    lock = manager.Lock()  # 初始化一把锁

    # extract_feature_bar = mp.Process(target=bar_listener, args=(caculate_batch_feature_pbar, "extract_features", 1,))
    # extract_feature_bar.daemon = True
    # extract_feature_bar.start()

    for _ in range(nproc):
        p = mp.Process(
            target=caculate_batch_feature_for_each_base,
            args=(
                lock,
                read_q,
                feature_q,
                21,
            ),
        )
        p.daemon = True
        p.start()
        feature_procs.append(p)

    write_filename = "/home/xiaoyf/methylation/deepsignal/log/data.npy"

    # write_feature_bar = mp.Process(target=bar_listener, args=(write_pbar, "write_features", 2,))
    # write_feature_bar.daemon = True
    # write_feature_bar.start()
    # tqdm(total = 4000, desc="write_features", position=1)

    p_w = mp.Process(
        target=write_feature,
        args=(
            read_number,
            write_filename,
            feature_q,
        ),
    )
    p_w.daemon = True
    p_w.start()
    # with tqdm(total = read_number, desc='extract_feature', position=0) as pbar:
    for p in feature_procs:
        p.join()

    # caculate_bar.close()
    # while True:
    #    flag=0
    #    for p in feature_procs:
    #        if not p.is_alive():
    #            flag+=1
    #    if flag==0:
    #        break
    #    if flag!=0 and not p_w.is_alive():
    #        logger.error("p_w terminate error")
    #        p_w.join()
    #        p_w.start()
    while True:
        flag = 0
        for p in feature_procs:
            if p.is_alive():
                flag += 1
        if flag == 0:
            break
    feature_q.put("kill")
    p_w.join()
    # write_feature_bar.close()

    # extract_feature_bar.join()
    # write_feature_bar.join()
    # print("[main]extract_features costs %.1f seconds.." %(time.time() - start))
    logger.info("[main]extract_features costs %.1f seconds.." % (time.time() - start))

In [6]:
if __name__ == "__main__":
    extract_feature(read, 2, 200)

extract feature:   8%|[32m▊         [0m| 320/4000 [04:04<08:12,  7.47 read/s]  

In [13]:
neg = 0
for r in read:
    if r[7] < 0:
        neg += 1
    if r[0] == "73e5c496-c9c6-42e4-8150-9cdb5ad83c52":
        print(r[1])
        print(len(r[1]))
        print(len(r[4]))
        print(r[7])
        focus = r
norm_signal = norm_signal_read_id(focus)
print(norm_signal)
print(len(norm_signal))
print(neg)

[ -86  108   96 ... -126  118   36]
76421
10111
-6
[-4.133 -3.11  -3.172 ... -4.33  -3.008 -3.06 ]
76415
16


In [50]:
# 0:read_id,1:signal,2:to_pA_shift,3:to_pA_scale,4:sequence,5:stride,6:mv_table,7:num_trimmed,8:to_norm_shift,9:to_norm_scale
read_q = Queue()
_prepare_read(read_q, read, batch_size=1000)

In [9]:
[1, 2, 3, 4] * 2

[1, 2, 3, 4, 1, 2, 3, 4]

In [61]:
import sys

sys.getsizeof(np.array(["a", "b", "s", "d"]))

128

In [20]:
print(np.array([1, 2, 3, 4]))

[1 2 3 4]


In [62]:
sys.getsizeof("absd")

53

In [60]:
"asvde"[1]

's'

In [52]:
read_batch = read_q.get()

In [53]:
i = 0
for read_i in read_batch:
    if i == 1:
        print(read_i)
    i += 1

['910762d9-7962-43db-bbea-556bddc10e69'
 array([ 59, -22, -27, ...,  94, 104, -11], dtype=int8) 19 0.1462
 'ATGTGTTACTTCGTTCAGTTACGTATTGCTTTTTTAATTTATTTAGGGGGAATGATGGTTGTCTTTGGATATACTACAGCGATGGCTATTGAGGAGTATCCTGAGGCATGGGGGTCAGGGGTTGAGGTCTTGGTGAGTGTTTTAGTGGGGTTAGCGATGGAGGTAGGATTGGTGCTGTGGGTGAAAGAGTATGATGGGGTGGTGGTTGTGGTAAACTTTAATAGTGTAGGAAGCTGAATAAACTTATGAAGGAGGGGTCAGGGTTGATTCGGGAGGATCCTATTGGTGCGGGGGCTTTGTATGATTATGGGCGTTGATTAGTAGTAGTTACTGGTTGAACATTGTTTGTTGGTGTATATATTGTAATTGAGATTGCTGGGGGAATAGGTTATGTGATTAGGAGTAGGGTTAGGATGAGTGGGAAGAAGAAAGAGAGGAAGTAAAGTTTAATTATGCCTTTTTGGGTTGAGGTGATGATGGAGGTGGAGATTTGGTGCTGTGAAATTGTTTTAGGTAATAGCTTTTCTAGTCAGGTTAGGTCTAGGGGGAGTAGGGGCAGGTTTTGGCTCGTAAGAAGGCCTGGGTGGGATTGTGCGGTGTGTGATGCTGGGTAGAATGCGAGTATGTTGGAGAAATAAAATGTGCATAGTGGGGATTTTATTTTAAGTTTGTTGGTTAGGTAGTTGAGGTCTAGGGCTGTTAGAAGTCCTAGGAAAGTGACAGCGAGGGCTGTGAGTTTTAGGTAGAGATGTTGGAAGGGGGATGCGGGGGAAATGTTGTTAGTAATGAGAAATCCTAGGAATACGGCTTTTCCGGCTGCCAGGCGTTTAATGGGGTTTAGTAGGGTGGGGTTATTTTCGTTAATGTTAGTAAGGGTGGGGAAGCGAGGTTG

In [9]:
def extract_feature(nproc=4):
    start = time.time()
    feature_q = Queue()
    read_q = Queue()
    _prepare_read(read_q, read, batch_size=20)
    feature_procs = []
    read_q.put("kill")
    caculate_batch_feature_pbar = Queue()
    extract_feature_bar = mp.Process(
        target=bar_listener,
        args=(
            caculate_batch_feature_pbar,
            "extract_features",
            1,
        ),
    )
    extract_feature_bar.daemon = True
    extract_feature_bar.start()
    for _ in range(nproc):
        p = mp.Process(
            target=caculate_batch_feature_for_each_base,
            args=(
                caculate_batch_feature_pbar,
                read_q,
                feature_q,
                21,
            ),
        )
        p.daemon = True
        p.start()
        feature_procs.append(p)

    write_filename = "/home/xiaoyf/methylation/deepsignal/log/feature.txt"
    write_pbar = Queue()
    write_feature_bar = mp.Process(
        target=bar_listener,
        args=(
            write_pbar,
            "write_features",
            2,
        ),
    )
    write_feature_bar.daemon = True
    write_feature_bar.start()
    # tqdm(total = 4000, desc="write_features", position=1)
    p_w = mp.Process(
        target=write_feature,
        args=(
            write_pbar,
            write_filename,
            feature_q,
        ),
    )
    p_w.daemon = False
    p_w.start()

    for p in feature_procs:
        p.join()

    feature_q.put("kill")
    p_w.join()
    write_pbar.put(None)
    caculate_batch_feature_pbar.put(None)
    extract_feature_bar.join()
    write_feature_bar.join()
    # print("[main]extract_features costs %.1f seconds.." %(time.time() - start))
    logger.info("[main]extract_features costs %.1f seconds.." % (time.time() - start))


def write_feature(file, feature_q):
    # print("write_process-{} starts".format(os.getpid()))
    logger.info("write_process-{} starts".format(os.getpid()))
    dataset = []
    try:
        with open(file, "w") as f:
            while True:
                if feature_q.empty():
                    time.sleep(10)
                    continue
                features = feature_q.get()
                if features == "kill":
                    logger.info("write_process-{} finished".format(os.getpid()))
                    np_data = np.array(dataset)
                    np.save("/home/xiaoyf/methylation/deepsignal/log/data.npy", np_data)
                    # 包含neigbor feature的40条reads保存成npy需要27.87GB，这个开销是无法忍受的
                    # print('write_process-{} finished'.format(os.getpid()))
                    break
                logger.info(
                    "write process get neigbor features number:{}".format(len(features))
                )
                # logger.debug('feature id: {}'.format(str(features[0][0])))
                for feature in features:
                    # 0:read_id,1:nbase,2:nsig,3:nstd,4:nmean
                    # f.write(read_id+'\t')
                    write_feature_bar.update()
                    dataset.append(feature[1:])
                    # f.write(str(feature[1])+'\t'+str(feature[2])+
                    #            '\t'+str(feature[3])+'\t'+str(feature[4])+'\n')

                f.flush()
    except Exception as e:
        logger.error("error in writing features")
        print(e)
    # write_pbar.close()

In [3]:
import multiprocessing as mp
from tqdm import tqdm
import time


def listener(q):
    pbar = tqdm(total=1000)
    while True:
        if not q.empty():
            k = q.get()
            if k == 1:
                pbar.update(1)
            else:
                break
    pbar.close()


def solve(q):
    for i in range(100):
        time.sleep(1)
        q.put(1)


if __name__ == "__main__":
    manage = mp.Manager()
    q = manage.Queue()
    p = mp.Process(target=listener, args=(q,))
    p.start()
    processList = []
    for i in range(10):
        t = mp.Process(target=solve, args=(q,))
        processList.append(t)
        t.start()
    for t in processList:
        t.join()
    q.put(-1)
    p.join()

  0%|          | 0/1000 [00:00<?, ?it/s]

100%|██████████| 1000/1000 [01:40<00:00,  9.98it/s]


In [35]:
data = np.load("/home/xiaoyf/methylation/deepsignal/log/data.npy", allow_pickle=True)

#0:read_id,1:nbase,2:nsig,3:nstd,4:nmean

In [17]:
"-0.111112" * 2

'-0.111112-0.111112'

In [32]:
[-0.111112] * 2

[-0.111112, -0.111112]

In [36]:
data[7]

array(['732722e8-65c8-4ad1-b4d5-f9ec85e830d3',
       list([['A', 'A', 'A', 'A', 'A'], ['G', 'G', 'G', 'G', 'G'], ['C', 'C', 'C', 'C', 'C'], ['A', 'A', 'A', 'A', 'A'], ['G', 'G', 'G', 'G', 'G'], ['A', 'A', 'A', 'A', 'A'], ['G', 'G', 'G', 'G', 'G'], ['A', 'A', 'A', 'A', 'A'], ['G', 'G', 'G', 'G', 'G'], ['A', 'A', 'A', 'A', 'A'], ['C', 'C', 'C', 'C', 'C'], ['G', 'G', 'G', 'G', 'G'], ['G', 'G', 'G', 'G', 'G'], ['G', 'G', 'G', 'G', 'G'], ['G', 'G', 'G', 'G', 'G'], ['T', 'T', 'T', 'T', 'T'], ['T', 'T', 'T', 'T', 'T'], ['T', 'T', 'T', 'T', 'T'], ['G', 'G', 'G', 'G', 'G'], ['G', 'G', 'G', 'G', 'G']]),
       list([[-0.90380859375, -0.98291015625, -0.75, -0.931640625, -1.048828125], [-0.86181640625, -0.9130859375, -0.98779296875, -0.94140625, -0.98779296875], [-1.0205078125, -1.052734375, 0.228271484375, -0.69873046875, -0.92724609375], [-0.4658203125, -0.80126953125, -0.796875, -0.89453125, -0.810546875], [-1.11328125, -1.169921875, -1.1279296875, -1.1923828125, -0.9921875], [-1.0673828125, -

In [7]:
data[3]

array(['3239b4d9-0a7e-471c-86fe-e156b9f279a0',
       list([0.8430438164710445, 0.8067593431803018, 0.7963923508115183, 0.7704748698895593, 0.8274933279178691]),
       '0.025095831333870715', '0.8088327416540585', 5, 'T'], dtype=object)

In [9]:
from itertools import product

all_kmer_levels = dict(("".join(bs), []) for bs in product("ACGT", repeat=4))

In [None]:
base2code_dna = {
    "A": 0,
    "C": 1,
    "G": 2,
    "T": 3,
    "N": 4,
    "W": 4,
    "S": 4,
    "M": 4,
    "K": 4,
    "R": 4,
    "Y": 4,
    "B": 4,
    "V": 4,
    "D": 4,
    "H": 4,
    "Z": 4,
}  # set 4 for all bases except ACGT, for now


def norm_signal_read_id_dict(signal):
    shift_scale_norm = {}
    signal_norm = {}
    shift_scale_norm = {}
    shift_scale_norm["shift"] = (
        signal["to_norm_shift"] / signal["to_pA_scale"]
    ) - signal["to_pA_shift"]
    shift_scale_norm["scale"] = signal["to_norm_scale"] / signal["to_pA_scale"]
    num_trimmed = signal["num_trimmed"]
    signal_norm = (
        signal["signal"][num_trimmed:] - shift_scale_norm["shift"]
    ) / shift_scale_norm["scale"]
    return signal_norm


def read_from_pod5_bam_dict(pod5_path, bam_path, read_id=None):
    read = {}
    signal = extract_signal_from_pod5(pod5_path)
    seq_move = extract_move_from_bam(bam_path)
    if read_id is not None:
        if seq_move[read_id]["sequence"] is not None:
            if signal[read_id] is not None:
                read[read_id] = {
                    "sequence": seq_move[read_id]["sequence"],
                    "signal": signal[read_id]["signal"],
                    "mv_table": seq_move[read_id]["mv_table"],
                    "num_trimmed": seq_move[read_id]["num_trimmed"],
                    "to_norm_shift": seq_move[read_id]["shift"],
                    "to_norm_scale": seq_move[read_id]["scale"],
                    "stride": seq_move[read_id]["stride"],
                    "to_pA_shift": signal[read_id]["shift"],
                    "to_pA_scale": signal[read_id]["scale"],
                }
    else:
        for read_id in seq_move.keys():
            if seq_move[read_id]["sequence"] is not None:
                if signal[read_id] is not None:
                    read[read_id] = {
                        "sequence": seq_move[read_id]["sequence"],
                        "signal": signal[read_id]["signal"],
                        "mv_table": seq_move[read_id]["mv_table"],
                        "num_trimmed": seq_move[read_id]["num_trimmed"],
                        "to_norm_shift": seq_move[read_id]["shift"],
                        "to_norm_scale": seq_move[read_id]["scale"],
                        "stride": seq_move[read_id]["stride"],
                        "to_pA_shift": signal[read_id]["shift"],
                        "to_pA_scale": signal[read_id]["scale"],
                    }
    return read

In [None]:
def _get_neighbord_feature_dict(feature, base_num):
    windows_size = base_num - 1 // 2
    for read_id in feature.keys():
        for i in range(len(feature[read_id])):
            nbase = []
            nstd = []
            nmean = []
            nsig = []
            if i < windows_size:
                if i != 0:
                    for k in range(i):
                        nbase = (
                            nbase
                            + list(feature[read_id][k]["base"])
                            * feature[read_id][k]["num"]
                        )
                        nstd = (
                            nstd
                            + list(feature[read_id][k]["std"])
                            * feature[read_id][k]["num"]
                        )
                        nmean = (
                            nmean
                            + list(feature[read_id][k]["mean"])
                            * feature[read_id][k]["num"]
                        )
                        nsig = nsig + feature[read_id][k]["signal"]
                nbase = (
                    nbase
                    + list(feature[read_id][i]["base"])
                    * (windows_size - i)
                    * feature[read_id][i]["num"]
                )
                nbase = (
                    nbase
                    + list(feature[read_id][i]["base"]) * feature[read_id][i]["num"]
                )
                nstd = (
                    nstd
                    + list(feature[read_id][i]["std"])
                    * (windows_size - i)
                    * feature[read_id][i]["num"]
                )
                nstd = (
                    nstd + list(feature[read_id][i]["std"]) * feature[read_id][i]["num"]
                )
                nmean = (
                    nmean
                    + list(feature[read_id][i]["mean"])
                    * (windows_size - i)
                    * feature[read_id][i]["num"]
                )
                nmean = (
                    nmean
                    + list(feature[read_id][i]["mean"]) * feature[read_id][i]["num"]
                )
                for k in range(i, i + windows_size):
                    nbase = (
                        nbase
                        + list(feature[read_id][k]["base"]) * feature[read_id][k]["num"]
                    )
                    nstd = (
                        nbase
                        + list(feature[read_id][k]["std"]) * feature[read_id][k]["num"]
                    )
                    nmean = (
                        nbase
                        + list(feature[read_id][k]["mean"]) * feature[read_id][k]["num"]
                    )
                nsig = nsig + feature[read_id][i]["signal"] * (windows_size - i)
                nsig = nsig + feature[read_id][i]["signal"]
                for k in range(i, i + windows_size):
                    nsig = nsig + feature[read_id][k]["signal"]
            elif (len(feature[read_id]) - 1) - i < windows_size:
                for k in range(i - windows_size, i):
                    nbase = (
                        nbase
                        + list(feature[read_id][k]["base"]) * feature[read_id][k]["num"]
                    )
                    nstd = (
                        nstd
                        + list(feature[read_id][k]["std"]) * feature[read_id][k]["num"]
                    )
                    nmean = (
                        nmean
                        + list(feature[read_id][k]["mean"]) * feature[read_id][k]["num"]
                    )
                nbase = (
                    nbase
                    + list(feature[read_id][i]["base"]) * feature[read_id][i]["num"]
                )
                nstd = (
                    nstd + list(feature[read_id][i]["std"]) * feature[read_id][i]["num"]
                )
                nmean = (
                    nmean
                    + list(feature[read_id][i]["mean"]) * feature[read_id][i]["num"]
                )
                for k in range(i - windows_size, i):
                    nsig = nsig + feature[read_id][k]["signal"]
                nsig = nsig + feature[read_id][i]["signal"]
                if i != len(feature[read_id]) - 1:
                    for k in range(i, len(feature[read_id]) - 1):
                        nbase = (
                            nbase
                            + list(feature[read_id][k]["base"])
                            * feature[read_id][k]["num"]
                        )
                        nstd = (
                            nstd
                            + list(feature[read_id][k]["std"])
                            * feature[read_id][k]["num"]
                        )
                        nmean = (
                            nmean
                            + list(feature[read_id][k]["mean"])
                            * feature[read_id][k]["num"]
                        )
                        nsig = nsig + feature[read_id][k]["signal"]
                nbase = (
                    nbase
                    + list(feature[read_id][i]["base"])
                    * (windows_size - ((len(feature[read_id]) - 1) - i))
                    * feature[read_id][i]["num"]
                )
                nstd = (
                    nstd
                    + list(feature[read_id][i]["std"])
                    * (windows_size - ((len(feature[read_id]) - 1) - i))
                    * feature[read_id][i]["num"]
                )
                nmean = (
                    nmean
                    + list(feature[read_id][i]["mean"])
                    * (windows_size - ((len(feature[read_id]) - 1) - i))
                    * feature[read_id][i]["num"]
                )
                nsig = nsig + feature[read_id][i]["signal"] * (
                    windows_size - ((len(feature[read_id]) - 1) - i)
                )
            else:
                for k in range(i - windows_size, i):
                    nbase = (
                        nbase
                        + list(feature[read_id][k]["base"]) * feature[read_id][k]["num"]
                    )
                    nstd = (
                        nstd
                        + list(feature[read_id][k]["std"]) * feature[read_id][k]["num"]
                    )
                    nmean = (
                        nmean
                        + list(feature[read_id][k]["mean"]) * feature[read_id][k]["num"]
                    )
                    nsig = nsig + feature[read_id][k]["signal"]
                nbase = (
                    nbase
                    + list(feature[read_id][i]["base"]) * feature[read_id][i]["num"]
                )
                nstd = (
                    nstd + list(feature[read_id][i]["std"]) * feature[read_id][i]["num"]
                )
                nmean = (
                    nmean
                    + list(feature[read_id][i]["mean"]) * feature[read_id][i]["num"]
                )
                nsig = nsig + feature[read_id][i]["signal"]
                for k in range(i, i + windows_size):
                    nbase = (
                        nbase
                        + list(feature[read_id][k]["base"]) * feature[read_id][k]["num"]
                    )
                    nstd = (
                        nstd
                        + list(feature[read_id][k]["std"]) * feature[read_id][k]["num"]
                    )
                    nmean = (
                        nmean
                        + list(feature[read_id][k]["mean"]) * feature[read_id][k]["num"]
                    )
                    nsig = nsig + feature[read_id][k]["signal"]
            feature[read_id][i].update(
                {"nbase": nbase, "nsig": nsig, "nstd": nstd, "nmean": nmean}
            )


def caculate_batch_feature_for_each_base_dict(read_q, feature_q, base_num=0):
    feature = {}
    print("extrac_features process-{} starts".format(os.getpid()))
    read_num = 0
    while True:
        if read_q.empty():
            time.sleep(10)
        read_batch = read_q.get()
        if read_batch == "kill":
            read_q.put("kill")
            break
        read_num += len(read_batch)
        for read_id in read_batch.keys():
            feature[read_id] = []
            sequence = read_batch[read_id]["sequence"]
            movetable = read_batch[read_id]["mv_table"]
            stride = read_batch[read_id]["stride"]
            # num_trimmed = read[read_id]['num_trimmed']
            trimed_signals = norm_signal_read_id(read_batch[read_id])  # 筛掉背景信号,norm
            move_pos = np.append(np.argwhere(movetable == 1).flatten(), len(movetable))
            # print(len(move_pos))
            for move_idx in range(len(move_pos) - 1):
                start, end = move_pos[move_idx], move_pos[move_idx + 1]
                signal = trimed_signals[(start * stride) : (end * stride)].tolist()
                mean = np.mean(signal)
                std = np.std(signal)
                num = end - start
                # print(move_idx)
                feature[read_id].append(
                    {
                        "signal": signal,
                        "std": str(std),
                        "mean": str(mean),
                        "num": int(num * stride),
                        "base": sequence[move_idx],
                    }
                )
        # feature_q.append(feature)
        if base_num != 0:
            _get_neighbord_feature(feature, base_num)
        feature_q.put(feature)
        print(
            "extrac_features process-{} ending, proceed {} read batch".format(
                os.getpid(), read_num
            )
        )


def _prepare_read_dict(read_q, read, batch_size=1000):
    i = 0
    read_batch = {}
    for read_id in read.keys():
        # print(read_id)
        read_batch[read_id] = {}
        read_batch[read_id].update(read[read_id])
        i = i + 1
        if i == batch_size:
            i = 0
            read_q.put(read_batch)
            read_batch = {}
    read_q.put(read_batch)


def write_feature_dict(feature_q):
    print("write_process-{} starts".format(os.getpid()))
    with open("/homeb/xiaoyf/data/HG002/example/feature.txt", "w") as f:
        while True:
            if feature_q.empty():
                time.sleep(10)
                continue
            feature = feature_q.get()
            if feature == "kill":
                print("write_process-{} finished".format(os.getpid()))
                break
            for read_id in feature.keys():
                # f.write(read_id+'\t')
                for i in range(len(feature[read_id])):
                    f.write(
                        str(feature[read_id][i]["nbase"])
                        + "\t"
                        + str(feature[read_id][i]["nsig"])
                        + "\t"
                        + str(feature[read_id][i]["nstd"])
                        + "\t"
                        + str(feature[read_id][i]["nmean"])
                        + "\n"
                    )
            f.flush()


def extract_feature():
    start = time.time()
    feature_q = Queue()
    read_q = Queue()
    _prepare_read(read_q, read, batch_size=500)
    feature_procs = []
    nproc = 4
    read_q.put("kill")
    for _ in range(nproc):
        p = mp.Process(
            target=caculate_batch_feature_for_each_base,
            args=(
                read_q,
                feature_q,
                21,
            ),
        )
        p.daemon = True
        p.start()
        feature_procs.append(p)

    p_w = mp.Process(target=write_feature, args=(feature_q,))
    p_w.daemon = True
    p_w.start()

    for p in feature_procs:
        p.join()

    feature_q.put("kill")
    p_w.join()
    print("[main]extract_features costs %.1f seconds.." % (time.time() - start))

In [None]:
# 0:read_id,1:signal,2:std,3:mean,4:num,5:base
def _get_neighbord_feature(sequence, feature, base_num):
    # 数据预处理主要速度瓶颈，同样的reads数，不运行这个函数大概快了十倍，从二十多分钟减到两分钟
    motif = "CG"
    motif_seqs = get_motif_seqs(motif)
    tsite_locs = get_refloc_of_methysite_in_motif(sequence, set(motif_seqs))
    nfeature = []
    windows_size = base_num - 1 // 2
    for i in range(len(feature)):
        nbase = []
        nstd = []
        nmean = []
        nsig = []
        if i not in tsite_locs:
            continue
        if i < windows_size:
            if i != 0:
                for k in range(i):
                    nbase = nbase + list(feature[k][5]) * signal_sample
                    nstd = nstd + list(feature[k][2]) * signal_sample
                    nmean = nmean + list(feature[k][3]) * signal_sample
                    nsig = nsig + feature[k][1]
            nbase = nbase + list(feature[i][5]) * (windows_size - i) * signal_sample
            nbase = nbase + list(feature[i][5]) * signal_sample
            nstd = nstd + list(feature[i][2]) * (windows_size - i) * signal_sample
            nstd = nstd + list(feature[i][2]) * signal_sample
            nmean = nmean + list(feature[i][3]) * (windows_size - i) * signal_sample
            nmean = nmean + list(feature[i][3]) * signal_sample
            nsig = nsig + feature[i][1] * (windows_size - i)
            nsig = nsig + feature[i][1]
            for k in range(i, i + windows_size):
                nbase = nbase + list(feature[k][5]) * signal_sample
                nstd = nbase + list(feature[k][2]) * signal_sample
                nmean = nbase + list(feature[k][3]) * signal_sample
                nsig = nsig + feature[k][1]
        elif (len(feature[i]) - 1) - i < windows_size:
            for k in range(i - windows_size, i):
                nbase = nbase + list(feature[k][5]) * signal_sample
                nstd = nstd + list(feature[k][2]) * signal_sample
                nmean = nmean + list(feature[k][3]) * signal_sample
                nsig = nsig + feature[k][1]
            nbase = nbase + list(feature[i][5]) * signal_sample
            nstd = nstd + list(feature[i][2]) * signal_sample
            nmean = nmean + list(feature[i][3]) * signal_sample
            nsig = nsig + feature[i][1]
            if i != len(feature[i]) - 1:
                for k in range(i, len(feature[i]) - 1):
                    nbase = nbase + list(feature[k][5]) * signal_sample
                    nstd = nstd + list(feature[k][2]) * signal_sample
                    nmean = nmean + list(feature[k][3]) * signal_sample
                    nsig = nsig + feature[k][1]
            nbase = (
                nbase
                + list(feature[i][5])
                * (windows_size - ((len(feature[i]) - 1) - i))
                * signal_sample
            )
            nstd = (
                nstd
                + list(feature[i][2])
                * (windows_size - ((len(feature[i]) - 1) - i))
                * signal_sample
            )
            nmean = (
                nmean
                + list(feature[i][3])
                * (windows_size - ((len(feature[i]) - 1) - i))
                * signal_sample
            )
            nsig = nsig + feature[i][1] * (windows_size - ((len(feature[i]) - 1) - i))
        else:
            for k in range(i - windows_size, i):
                nbase = nbase + list(feature[k][5]) * signal_sample
                nstd = nstd + list(feature[k][2]) * signal_sample
                nmean = nmean + list(feature[k][3]) * signal_sample
                nsig = nsig + feature[k][1]
            nbase = nbase + list(feature[i][5]) * signal_sample
            nstd = nstd + list(feature[i][2]) * signal_sample
            nmean = nmean + list(feature[i][3]) * signal_sample
            nsig = nsig + feature[i][1]
            for k in range(i, i + windows_size):
                nbase = nbase + list(feature[k][5]) * signal_sample
                nstd = nstd + list(feature[k][2]) * signal_sample
                nmean = nmean + list(feature[k][3]) * signal_sample
                nsig = nsig + feature[k][1]
        # feature[read_id][i].update({'nbase':nbase,'nsig':nsig,'nstd':nstd,'nmean':nmean})
        nfeature.append([feature[i][0], nbase, nsig])

        # 0:read_id,1:nbase,2:nsig,3:nstd,4:nmean
        # logger.debug('feature id: {}, feature:{}'.format(str(feature[0]),(str(nbase),str(nsig),str(nstd),str(nmean))))
    return nfeature