In [None]:
import requests
import json

# 定义 API URL
api_url = "https://lampprimer.mathematik.uni-marburg.de/api.php?table=lamp&action=list"

# 下载数据
response = requests.get(api_url,verify=False)

# 检查响应状态码
if response.status_code == 200:
    print("数据成功下载！")
    
    # 解析为 JSON 数据
    data = response.json()  # 解析 API 响应为字典
    print(f"数据样本：\n{json.dumps(data, indent=4)}")  # 格式化打印数据
    
    # 保存到本地 JSON 文件
    with open("1_primer_data.json", "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
    print("数据已保存到 '1_primer_data.json'")
else:
    print(f"请求失败，状态码：{response.status_code}")


In [1]:
import chardet

# 检测文件编码
with open("1_primer_data.json", "rb") as file:
    raw_data = file.read()
    result = chardet.detect(raw_data)
    encoding = result['encoding']

print(f"文件编码为：{encoding}")


文件编码为：utf-8


In [None]:
#在json文件中提取数据，并将数据存储到.csv文件中

import json
import csv

# 读取 JSON 文件
data_file = "1_primer_data.json"
csv_file = "2_primer_data.csv"

# 定义目标字段
fields = [
    "genbank", "F3_sequence", "F3_position", "B3_sequence", "B3_position", 
    "FIP_sequence", "FIP_position", "BIP_sequence", "BIP_position", 
    "LF_sequence", "LF_position", "LB_sequence", "LB_position"
]

# 初始化 CSV 文件
with open(data_file, "r", encoding="utf-8") as json_file:
    data = json.load(json_file)

with open(csv_file, "w", encoding="utf-8", newline="") as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=fields)
    writer.writeheader()

    # 遍历每个 data 键
    for entry in data["data"]:
        row = {field: "" for field in fields}  # 初始化为空值

        # 提取 genbank 值
        primer_data = entry.get("primer", [])
        if isinstance(primer_data, list):
            for primer_value in primer_data:
                genbank = primer_value.get("genbank")
                if genbank and genbank != "NA":  # 找到非空且非 "NA" 的 genbank
                    row["genbank"] = genbank
                    break

            # 遍历 primer 数据，填充相应字段
            for primer_value in primer_data:
                name = primer_value.get("name")
                sequence = primer_value.get("sequence")
                position = primer_value.get("position")

                if name in ["F3", "B3", "FIP", "BIP", "LF", "LB"]:
                    row[f"{name}_sequence"] = sequence if sequence else ""
                    row[f"{name}_position"] = position if position else ""

        # 写入行数据
        writer.writerow(row)

print(f"数据已成功处理并保存到 {csv_file}")


In [None]:
#但是从json文件中提取的数据中有一些数据的FIP和BIP的position为空值，导致不能将其分为F2和F1或B2和B1。
#也有些数据的genbank的值为空值，导致不能从数据库中下载到原始序列。
#所以将这两种情况的数据视为无效数据，去除。

data_to_keep = []
csv_file = "2_primer_data.csv"
processed_csv_file = "3_primer_data.csv"
with open(csv_file, "r", encoding="utf-8") as infile:
    reader = csv.DictReader(infile)
    for row in reader:
        # 去除 genbank 为空的行
        if not row["genbank"]:
            continue

        # 检查 FIP_sequence 和 BIP_sequence 的 position 是否为空
        if (row["FIP_sequence"] and not row["FIP_position"]) or (row["BIP_sequence"] and not row["BIP_position"]):
            continue

        # 保留该行
        data_to_keep.append(row)

# 保存到新的 CSV 文件
with open(processed_csv_file, "w", encoding="utf-8", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=fields)
    writer.writeheader()
    writer.writerows(data_to_keep)

print(f"处理后的数据已成功保存到 {processed_csv_file}")


In [None]:
#根据genbank号，从数据库中将原始基因序列下载下来。并删除原始序列为空的数据。

import json
import csv
from Bio import Entrez, SeqIO
from tqdm import tqdm  # 用于显示进度条

# 设置邮箱地址 (NCBI 要求)
Entrez.email = "your_email@example.com"

# 从 NCBI 下载序列的函数
def fetch_sequence_from_ncbi(genbank_id):
    try:
        with Entrez.efetch(db="nucleotide", id=genbank_id, rettype="fasta", retmode="text") as handle:
            record = SeqIO.read(handle, "fasta")
            return str(record.seq)
    except Exception as e:
        print(f"无法下载序列 {genbank_id}: {e}")
        return ""  # 如果下载失败返回空值

# 输入和输出文件
data_file = "3_primer_data.csv"
output_file = "4_primer_data.csv"

# 读取原始 CSV 文件并处理
data_to_save = []
with open(data_file, "r", encoding="utf-8") as infile:
    reader = csv.DictReader(infile)
    fields = reader.fieldnames + ["ori_sequence"]  # 在原有字段后添加 "ori_sequence"

    rows = list(reader)  # 将数据加载到列表中以计算总行数
    for row in tqdm(rows, desc="Processing rows", unit="row"):
        genbank_id = row.get("genbank")
        try:
            if genbank_id and genbank_id != "NA":
                row["ori_sequence"] = fetch_sequence_from_ncbi(genbank_id)
            else:
                row["ori_sequence"] = ""  # 如果 genbank 为空或无效
        except Exception as e:
            print(f"跳过因错误无法处理的行: {e}")
            row["ori_sequence"] = ""  # 防止程序中断，继续处理
        data_to_save.append(row)

# 保存到新的 CSV 文件
with open(output_file, "w", encoding="utf-8", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=fields)
    writer.writeheader()
    writer.writerows(data_to_save)

print(f"数据已成功处理并保存到 {output_file}")


import csv
import sys

# 设置较大的字段大小限制
csv.field_size_limit(10**7)  # 设置为 10MB，可根据需要调整

# 输入文件
file_path = "4_primer_data.csv"

# 清理数据并调整列顺序
cleaned_data = []
with open(file_path, "r", encoding="utf-8") as infile:
    reader = csv.DictReader(infile)
    fields = reader.fieldnames

    # 调整列顺序
    reordered_fields = fields[:1] + ["ori_sequence"] + fields[1:-1]

    for row in reader:
        if row.get("ori_sequence"):  # 只保留 ori_sequence 非空的行
            cleaned_data.append(row)

# 覆写原文件
with open(file_path, "w", encoding="utf-8", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=reordered_fields)
    writer.writeheader()
    writer.writerows(cleaned_data)

print(f"数据已成功清理并更新到 {file_path}")

In [None]:
#将F1和F2、B1和B2从FIP、BIP中提取出来，根据两者的position。

import csv
import re
import sys

# 增大 CSV 字段大小限制
csv.field_size_limit(10**7)

# 输入和输出文件
input_file = "4_primer_data.csv"
output_file = "5_primer_data.csv"

# 解析位置格式函数
def parse_positions(position_string):
    if position_string == "NA":
        return None, None
    # 匹配两种格式的正则表达式
    match = re.match(r"(\d+)-(\d+)\+(?:.*\+)?(\d+)-(\d+)", position_string)
    if match:
        start1, end1, start2, end2 = map(int, match.groups())
        length1 = abs(end1 - start1) + 1
        length2 = abs(end2 - start2) + 1
        return length1, length2
    return None, None

# 提取序列函数
def extract_subsequences(sequence, lengths):
    if not sequence or not lengths or lengths[0] is None or lengths[1] is None:
        return "", ""
    length1, length2 = lengths
    try:
        seq1 = sequence[:length1]  # 提取前面部分
        seq2 = sequence[-length2:]  # 提取后面部分
        return seq1, seq2
    except IndexError:
        return "", ""  # 索引越界时返回空值

# 处理文件
data_to_save = []
with open(input_file, "r", encoding="utf-8") as infile:
    reader = csv.DictReader(infile)
    fields = reader.fieldnames

    # 添加新列到字段列表
    new_fields = fields[:]
    new_fields.insert(new_fields.index("FIP_position") + 1, "F1_sequence")
    new_fields.insert(new_fields.index("F1_sequence") + 1, "F2_sequence")
    new_fields.insert(new_fields.index("BIP_position") + 1, "B1_sequence")
    new_fields.insert(new_fields.index("B1_sequence") + 1, "B2_sequence")

    for row in reader:
        # 处理 FIP_sequence 和 FIP_position
        fip_seq = row.get("FIP_sequence", "")
        fip_pos = row.get("FIP_position", "")
        if fip_pos == "NA":
            row["F1_sequence"] = ""
            row["F2_sequence"] = ""
        else:
            fip_lengths = parse_positions(fip_pos)
            f1_seq, f2_seq = extract_subsequences(fip_seq, fip_lengths)
            row["F1_sequence"] = f1_seq
            row["F2_sequence"] = f2_seq

        # 处理 BIP_sequence 和 BIP_position
        bip_seq = row.get("BIP_sequence", "")
        bip_pos = row.get("BIP_position", "")
        if bip_pos == "NA":
            row["B1_sequence"] = ""
            row["B2_sequence"] = ""
        else:
            bip_lengths = parse_positions(bip_pos)
            b1_seq, b2_seq = extract_subsequences(bip_seq, bip_lengths)
            row["B1_sequence"] = b1_seq
            row["B2_sequence"] = b2_seq

        data_to_save.append(row)

# 保存处理后的数据
with open(output_file, "w", encoding="utf-8", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=new_fields)
    writer.writeheader()
    writer.writerows(data_to_save)

print(f"数据已成功处理并保存到 {output_file}")


In [None]:
#将数据的列进行筛选一下，只保留原始序列和八条引物序列。
#数据中，有些引物数据为空值。LF和LB有较多空值，但空值还是占据少部分。

import csv

# 增大 CSV 字段大小限制
csv.field_size_limit(10**7)

# 输入和输出文件
input_file = "5_primer_data.csv"
output_file = "6_primer_data.csv"

# 需要保留的列
sequence_columns = [
    "ori_sequence", "F1_sequence", "F2_sequence", "F3_sequence", 
    "B1_sequence", "B2_sequence", "B3_sequence", "LB_sequence", "LF_sequence"
]

# 处理文件
data_to_save = []
with open(input_file, "r", encoding="utf-8") as infile:
    reader = csv.DictReader(infile)
    fields = reader.fieldnames

    for row in reader:
        # 条件过滤
        if row.get("FIP_position") and not row.get("F1_sequence"):
            continue  # 去掉 FIP_position 不为空但 F1_sequence 为空的行
        if row.get("BIP_position") and not row.get("B1_sequence"):
            continue  # 去掉 BIP_position 不为空但 B1_sequence 为空的行

        # 只保留指定的列
        filtered_row = {key: row[key] for key in sequence_columns if key in row}
        data_to_save.append(filtered_row)

# 保存处理后的数据
with open(output_file, "w", encoding="utf-8", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=sequence_columns)
    writer.writeheader()
    writer.writerows(data_to_save)

print(f"数据已成功处理并保存到 {output_file}")


In [None]:
#对其中四条引物序列做反向互补。

import csv

# 增大 CSV 字段大小限制
csv.field_size_limit(10**7)

# 输入和输出文件
input_file = "6_primer_data.csv"
output_file = "7_primer_data.csv"

# 互补碱基映射
complement = str.maketrans("ATCG", "TAGC")

# 生成反向重复序列函数
def reverse_complement(sequence):
    if not sequence:
        return ""  # 空值处理
    try:
        return sequence.translate(complement)[::-1]  # 替换碱基并反转
    except Exception as e:
        print(f"处理序列时出错: {e}")
        return ""

# 处理文件
data_to_save = []
with open(input_file, "r", encoding="utf-8") as infile:
    reader = csv.DictReader(infile)
    fields = reader.fieldnames

    # 新的列名映射
    column_mapping = {
        "B3_sequence": "B3_reverse_sequence",
        "LF_sequence": "LF_reverse_sequence",
        "F1_sequence": "F1_reverse_sequence",
        "B2_sequence": "B2_reverse_sequence"
    }

    # 更新字段列表
    updated_fields = [column_mapping.get(col, col) for col in fields]

    for row in reader:
        # 替换目标列为反向重复序列并更新列名
        for old_col, new_col in column_mapping.items():
            if old_col in row:
                row[new_col] = reverse_complement(row.pop(old_col))
        data_to_save.append(row)

# 保存处理后的数据
with open(output_file, "w", encoding="utf-8", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=updated_fields)
    writer.writeheader()
    writer.writerows(data_to_save)

print(f"数据已成功处理并保存到 {output_file}")


In [None]:
#对数据进行去重处理，去除八条引物完全相同的数据。

import pandas as pd

# 读取原始文件
df = pd.read_csv('7_primer_data.csv')

# 筛选出指定列
columns_to_check = [
    "F1_reverse_sequence", "F2_sequence", "F3_sequence", "B1_sequence", 
    "B2_reverse_sequence", "B3_reverse_sequence", "LB_sequence", "LF_reverse_sequence"
]

# 去重：删除这八列内容完全相同的行
filtered_df = df.drop_duplicates(subset=columns_to_check)

# 保存筛选后的数据到新的文件
filtered_df.to_csv('7_primer_data_filtered.csv', index=False)

# 打印新文件的行数
print(f"新文件共有 {len(filtered_df)} 行")


In [1]:
#提取引物段，并计算引物段长度，过滤掉引物段长度等于0并且大于300的数据。

import pandas as pd
import re

# 读取CSV文件
df = pd.read_csv('7_primer_data_filtered.csv')

# 定义一个函数来提取引物段序列
def extract_primer_segment(row):
    ori_sequence = row['ori_sequence']
    F3_sequence = row['F3_sequence']
    B3_reverse_sequence = row['B3_reverse_sequence']
    
    # 使用正则表达式查找F3和B3之间的序列
    pattern = re.escape(F3_sequence) + '(.*?)' + re.escape(B3_reverse_sequence)
    match = re.search(pattern, ori_sequence)
    
    if match:
        primer_segment = match.group(0)  # 包含F3和B3的序列
        return primer_segment
    else:
        return None

# 应用函数来提取引物段序列
df['primer_segment'] = df.apply(extract_primer_segment, axis=1)

# 计算引物段序列的长度
df['primer_segment_length'] = df['primer_segment'].apply(lambda x: len(x) if x else 0)

# 过滤掉引物段长度为0或大于300的行
df_filtered = df[(df['primer_segment_length'] > 0) & (df['primer_segment_length'] <= 300)]

# 将结果保存到新的CSV文件
df_filtered.to_csv('8_primer_data.csv', index=False)

In [1]:
# 有一些数据比较顽皮，去掉

import pandas as pd

# 输入文件
input_file = "8_primer_data.csv"

# 读取数据
data = pd.read_csv(input_file)

# 过滤数据：去掉 F2_sequence 列中内容为 "GCAGCTTCTGGCTGACCAA" 的行
filtered_data = data[data['F2_sequence'] != "GCAGCTTCTGGCTGACCAA"]
filtered_data = data[data['primer_segment'] != "TACCCGGCCACCCTTGTCTACCGGACCTGTTGCCTCGGCGGGCCTGCAGCGATGCTGCCGGGGGAGCTTCTCCTCCCCGGGCCCGTGTCCGCCGGGGACACCGCAAGAACCGTCGGTGAACGATTGGCGTCTGAGCATGAGAGCGATAATAATCCAGTCAAAACTTTCAACAACGGATCTCTTGGTTCCGACAT"]
filtered_data = data[data['primer_segment'] != "AGATGTATCTGAGGGTCTGTAGCTCAGTTGGTTAGAGCACACGCTTGATAAGCGTGGGGTCACAAGTTCAAGTCTTGTCAGACCCACCATGACTTTGACTGGTTGAAGTTATAGATAAAAGATACATGATTGATGATGTAAGCTGGGGACTTAGCTTAGTTGGTAGAGCGCCTGCTTTGCACGCAGGAGGTCAGGAGTTCGACTCTCCTAGTCTCCACCAGAA"]
filtered_data = data[data['primer_segment'] != "CGCTGGCTGGCTTTTCTGCCACCGCGCTGACCAACCTCGTCGCGGAACCATTCGCTAAACTCGAACAGGACTTTGGCGGCTCCATCGGTGTGTACGCGATGGATACCGGCTCAGGCGCAACTGTAAGTTACCGCGCTGAGGAGCGCTTCCCACTGTGCAGCTCATTCAAGGGCTTTCTTGCTGCCGCTGTG"]


# 保存过滤后的数据到原文件
filtered_data.to_csv(input_file, index=False)

print(f"Filtered data saved to {input_file}")

Filtered data saved to 8_primer_data.csv


In [2]:
# 划分数据集

import pandas as pd
from sklearn.model_selection import train_test_split

# 读取CSV文件
df = pd.read_csv('8_primer_data.csv')

# 随机划分数据集
# 首先划分出测试集
train_val, test = train_test_split(df, test_size=0.1, random_state=42)

# 然后从训练集和验证集中划分出验证集
train, val = train_test_split(train_val, test_size=0.1 / 0.9, random_state=42)

# 保存划分后的数据集
train.to_csv('9_train_data.csv', index=False)
val.to_csv('9_val_data.csv', index=False)
test.to_csv('9_test_data.csv', index=False)

In [1]:
#首先进行数据增强300长度。
# 生成标签序列。
#one-hot编码。

import pandas as pd
import random

# 文件路径
input_files = ["9_train_data.csv", "9_val_data.csv", "9_test_data.csv"]
output_files = ["train_data_300.csv", "val_data_300.csv", "test_data_300.csv"]

# 序列与标签的对应规则
primer_to_label = {
    "F3_sequence": 4,
    "F2_sequence": 3,
    "LF_reverse_sequence": 2,
    "F1_reverse_sequence": 1,
    "B1_sequence": 5,
    "LB_sequence": 6,
    "B2_reverse_sequence": 7,
    "B3_reverse_sequence": 8
}

# 碱基到 one-hot 编码的映射
base_to_one_hot = {
    'A': '1000',
    'T': '0100',
    'C': '0010',
    'G': '0001'
}

# 生成标签序列逻辑
def generate_label_sequence(row):
    enhanced_sequence = row['enhanced_sequence']
    label_sequence = [0] * 300  # 修改标签序列长度为300

    for primer, label in primer_to_label.items():
        primer_sequence = row.get(primer, "")
        if primer_sequence and isinstance(primer_sequence, str) and primer_sequence in enhanced_sequence:
            start = enhanced_sequence.index(primer_sequence)
            end = start + len(primer_sequence)
            for i in range(start, end):
                label_sequence[i] = label

    return ''.join(map(str, label_sequence))  # 转为字符串格式

# 将 enhanced_sequence 转换为 one-hot 编码
def convert_to_one_hot(sequence):
    if not isinstance(sequence, str):
        return ''  # 如果序列无效，返回空字符串

    one_hot_sequence = []
    for base in sequence:
        one_hot_sequence.append(base_to_one_hot.get(base, '0000'))  # 非标准碱基填充 '0000'
    return ''.join(one_hot_sequence)  # 连接为一个长字符串

def find_all_occurrences(sequence, sub_sequence):
    """找到所有子序列的位置"""
    positions = []
    pos = -1
    while True:
        pos = sequence.find(sub_sequence, pos + 1)
        if pos == -1:
            break
        positions.append(pos)
    return positions

# 滑动窗口增强和处理文件
def process_file(input_file, output_file):
    # 读取原始数据
    data = pd.read_csv(input_file)
    enhanced_data = []

    # 遍历每一行数据
    for index, row in data.iterrows():
        ori_sequence = row['ori_sequence']
        F3_sequence = row['F3_sequence']
        B3_reverse_sequence = row['B3_reverse_sequence']

        try:
            # 检查是否存在重复的引物序列
            f3_positions = find_all_occurrences(ori_sequence, F3_sequence)
            b3_positions = find_all_occurrences(ori_sequence, B3_reverse_sequence)
            
            if len(f3_positions) > 1 or len(b3_positions) > 1:
                print(f"Skipping row {index} due to duplicate primer sequences.")
                continue
                
            if len(f3_positions) == 0 or len(b3_positions) == 0:
                print(f"Skipping row {index} due to missing primer sequences.")
                continue

            start_A = f3_positions[0]
            end_A = b3_positions[0] + len(B3_reverse_sequence)

            # 如果序列 A 的长度大于 500bp，跳过该行
            if end_A - start_A > 500:
                print(f"Skipping row {index} due to sequence A length > 500.")
                continue

            # 初始化窗口
            start_window = start_A
            end_window = start_window + 300

            # 如果窗口超出原始序列范围，调整起点
            if end_window > len(ori_sequence):
                start_window = max(0, len(ori_sequence) - 300)
                end_window = start_window + 300

            # 滑动窗口增强
            while start_window >= 0 and end_window >= end_A:
                # 检查窗口起点是否为负值
                if start_window < 0:
                    print(f"Skipping window in row {index} due to negative start position.")
                    break
                    
                window_sequence = ori_sequence[start_window:end_window]
                
                # 检查窗口长度
                if len(window_sequence) != 300:
                    print(f"Skipping window in row {index} due to incorrect window length.")
                    break

                # 创建增强行
                new_row = row.copy()
                new_row['enhanced_sequence'] = window_sequence
                new_row['label_sequence'] = generate_label_sequence(new_row)
                new_row['one_hot_encoded'] = convert_to_one_hot(window_sequence)
                new_row = new_row.drop(labels=['ori_sequence'])
                enhanced_data.append(new_row)

                # 滑动窗口向左移动 5bp
                start_window -= 5
                end_window = start_window + 300

        except Exception as e:
            print(f"Error processing row {index}: {str(e)}")
            continue

    # 将增强数据转换为 DataFrame
    enhanced_df = pd.DataFrame(enhanced_data)

    # 统计每个 primer_segment 的数量，并补充不足的部分
    primer_segment_counts = enhanced_df['primer_segment'].value_counts()

    # 对于每个 primer_segment，检查是否有不足25条的情况
    for primer_segment, count in primer_segment_counts.items():
        if count < 25:
            # 找到该 primer_segment 对应的所有行
            rows_to_copy = enhanced_df[enhanced_df['primer_segment'] == primer_segment]
            # 随机复制并补充到25条
            additional_rows = rows_to_copy.sample(25 - count, replace=True)
            enhanced_df = pd.concat([enhanced_df, additional_rows], ignore_index=True)

    # 保存增强数据
    enhanced_df.to_csv(output_file, index=False)
    print(f"Processed data saved to {output_file}")

# 对每个文件进行处理
for input_file, output_file in zip(input_files, output_files):
    process_file(input_file, output_file)


Skipping row 67 due to duplicate primer sequences.
Skipping row 160 due to duplicate primer sequences.
Processed data saved to train_data_300.csv
Processed data saved to val_data_300.csv
Processed data saved to test_data_300.csv


In [2]:
#首先进行数据增强500长度。
# 生成标签序列。
#one-hot编码。

import pandas as pd
import random

# 文件路径
input_files = ["9_train_data.csv", "9_val_data.csv", "9_test_data.csv"]
output_files = ["train_data_500.csv", "val_data_500.csv", "test_data_500.csv"]

# 序列与标签的对应规则
primer_to_label = {
    "F3_sequence": 4,
    "F2_sequence": 3,
    "LF_reverse_sequence": 2,
    "F1_reverse_sequence": 1,
    "B1_sequence": 5,
    "LB_sequence": 6,
    "B2_reverse_sequence": 7,
    "B3_reverse_sequence": 8
}

# 碱基到 one-hot 编码的映射
base_to_one_hot = {
    'A': '1000',
    'T': '0100',
    'C': '0010',
    'G': '0001'
}

# 生成标签序列逻辑
def generate_label_sequence(row):
    enhanced_sequence = row['enhanced_sequence']
    label_sequence = [0] * 500  # 修改标签序列长度为500

    for primer, label in primer_to_label.items():
        primer_sequence = row.get(primer, "")
        if primer_sequence and isinstance(primer_sequence, str) and primer_sequence in enhanced_sequence:
            start = enhanced_sequence.index(primer_sequence)
            end = start + len(primer_sequence)
            for i in range(start, end):
                label_sequence[i] = label

    return ''.join(map(str, label_sequence))  # 转为字符串格式

# 将 enhanced_sequence 转换为 one-hot 编码
def convert_to_one_hot(sequence):
    if not isinstance(sequence, str):
        return ''  # 如果序列无效，返回空字符串

    one_hot_sequence = []
    for base in sequence:
        one_hot_sequence.append(base_to_one_hot.get(base, '0000'))  # 非标准碱基填充 '0000'
    return ''.join(one_hot_sequence)  # 连接为一个长字符串

def find_all_occurrences(sequence, sub_sequence):
    """找到所有子序列的位置"""
    positions = []
    pos = -1
    while True:
        pos = sequence.find(sub_sequence, pos + 1)
        if pos == -1:
            break
        positions.append(pos)
    return positions

# 滑动窗口增强和处理文件
def process_file(input_file, output_file):
    # 读取原始数据
    data = pd.read_csv(input_file)
    enhanced_data = []

    # 遍历每一行数据
    for index, row in data.iterrows():
        ori_sequence = row['ori_sequence']
        F3_sequence = row['F3_sequence']
        B3_reverse_sequence = row['B3_reverse_sequence']

        try:
            # 检查是否存在重复的引物序列
            f3_positions = find_all_occurrences(ori_sequence, F3_sequence)
            b3_positions = find_all_occurrences(ori_sequence, B3_reverse_sequence)
            
            if len(f3_positions) > 1 or len(b3_positions) > 1:
                print(f"Skipping row {index} due to duplicate primer sequences.")
                continue
                
            if len(f3_positions) == 0 or len(b3_positions) == 0:
                print(f"Skipping row {index} due to missing primer sequences.")
                continue

            start_A = f3_positions[0]
            end_A = b3_positions[0] + len(B3_reverse_sequence)

            # 如果序列 A 的长度大于 500bp，跳过该行
            if end_A - start_A > 500:
                print(f"Skipping row {index} due to sequence A length > 500.")
                continue

            # 初始化窗口
            start_window = start_A
            end_window = start_window + 500  # 修改窗口大小为500

            # 如果窗口超出原始序列范围，调整起点
            if end_window > len(ori_sequence):
                start_window = max(0, len(ori_sequence) - 500)  # 修改为500
                end_window = start_window + 500  # 修改为500

            # 滑动窗口增强
            while start_window >= 0 and end_window >= end_A:
                # 检查窗口起点是否为负值
                if start_window < 0:
                    print(f"Skipping window in row {index} due to negative start position.")
                    break
                    
                window_sequence = ori_sequence[start_window:end_window]
                
                # 检查窗口长度
                if len(window_sequence) != 500:  # 修改检查长度为500
                    print(f"Skipping window in row {index} due to incorrect window length.")
                    break

                # 创建增强行
                new_row = row.copy()
                new_row['enhanced_sequence'] = window_sequence
                new_row['label_sequence'] = generate_label_sequence(new_row)
                new_row['one_hot_encoded'] = convert_to_one_hot(window_sequence)
                new_row = new_row.drop(labels=['ori_sequence'])
                enhanced_data.append(new_row)

                # 滑动窗口向左移动 10bp
                start_window -= 10  # 修改步长为10bp
                end_window = start_window + 500  # 修改为500

        except Exception as e:
            print(f"Error processing row {index}: {str(e)}")
            continue

    # 将增强数据转换为 DataFrame
    enhanced_df = pd.DataFrame(enhanced_data)

    # 统计每个 primer_segment 的数量，并补充不足的部分
    primer_segment_counts = enhanced_df['primer_segment'].value_counts()

    # 对于每个 primer_segment，检查是否有不足25条的情况
    for primer_segment, count in primer_segment_counts.items():
        if count < 25:
            # 找到该 primer_segment 对应的所有行
            rows_to_copy = enhanced_df[enhanced_df['primer_segment'] == primer_segment]
            # 随机复制并补充到25条
            additional_rows = rows_to_copy.sample(25 - count, replace=True)
            enhanced_df = pd.concat([enhanced_df, additional_rows], ignore_index=True)

    # 保存增强数据
    enhanced_df.to_csv(output_file, index=False)
    print(f"Processed data saved to {output_file}")

# 对每个文件进行处理
for input_file, output_file in zip(input_files, output_files):
    process_file(input_file, output_file)


Skipping window in row 26 due to incorrect window length.
Skipping row 67 due to duplicate primer sequences.
Skipping row 160 due to duplicate primer sequences.
Processed data saved to train_data_500.csv
Processed data saved to val_data_500.csv
Skipping window in row 2 due to incorrect window length.
Processed data saved to test_data_500.csv


In [3]:
#去除标签序列不合理的数据，每一格标签必须包含431578
import pandas as pd

# 定义一个函数来处理每个 CSV 文件
def process_file(file_path):
    # 读取 CSV 文件
    df = pd.read_csv(file_path)
    
    # 定义需要保留的数字
    required_digits = {'4', '3', '1', '5', '7', '8'}
    
    # 使用 apply 来判断每一行的 'label_sequence' 是否包含所有需要的数字
    def contains_all_required_digits(label_sequence):
        return all(digit in label_sequence for digit in required_digits)
    
    # 过滤掉不包含所有需要的数字的行
    filtered_df = df[df['label_sequence'].apply(contains_all_required_digits)]
    
    # 直接保存回原文件
    filtered_df.to_csv(file_path, index=False)

# 列出所有需要处理的文件路径
file_paths = [
    'test_data_500.csv',
    'train_data_300.csv',
    'train_data_500.csv',
    'val_data_300.csv',
    'val_data_500.csv',
    'test_data_300.csv'
]

# 对每个文件进行处理
for file_path in file_paths:
    process_file(file_path)

print("所有文件已更新。")


所有文件已更新。
