# README.md
## 测试机器
Mac M2
- vcpu 8
- memory 16G
## 安装程序
- python 3.12
- Bio 1.7.1
- cutadapt 4.9
- vsearch v2.30.0
- FastQC v0.12.1
- trimmomatic 0.39
## 运行
依次执行

In [1]:
import subprocess
import csv
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
import tqdm
import os
import json
from collections import Counter, defaultdict
from datetime import datetime

In [2]:
def validate_and_update_csv(csv_file_path, output_report_path="validation_report.txt", delimiter=','):
    """
    验证CSV文件中每行数据的序列位置关系，并在需要时更新end_base值，然后生成详细报告。

    参数:
    csv_file_path (str): CSV文件的路径
    output_report_path (str): 输出报告文件的路径，默认为"validation_report.txt"
    delimiter (str): CSV文件使用的分隔符，默认为逗号(',')
    
    返回:
    dict: 包含总体统计信息和详细结果的字典
    """
    # 初始化结果统计
    results = {
        'total_rows': 0,
        'passed_rows': 0,
        'failed_rows': 0,
        'details': [],
        'errors': [],
        'updates': [],  # 记录所有更新的行
        'column_validation': {
            'expected_columns': ['barcode1', 'barcode2', 'template', 'spacer', 'start_base', 'end_base', 'base_windows'],
            'actual_columns': [],
            'columns_match': False
        }
    }
    
    # 存储所有行数据以便后续可能的写回
    all_rows = []
    
    try:
        with open(csv_file_path, 'r', newline='', encoding='utf-8') as csvfile:
            # 创建CSV字典读取器，第一行作为列名
            reader = csv.DictReader(csvfile, delimiter=delimiter)
            fieldnames = reader.fieldnames
            
            # 检查列名是否匹配
            results['column_validation']['actual_columns'] = fieldnames
            expected_set = set(results['column_validation']['expected_columns'])
            actual_set = set(fieldnames) if fieldnames else set()
            results['column_validation']['columns_match'] = (expected_set == actual_set)
            
            if not results['column_validation']['columns_match']:
                missing_cols = expected_set - actual_set
                extra_cols = actual_set - expected_set
                error_msg = f"列名不匹配! 缺失的列: {missing_cols}，多余的列: {extra_cols}"
                results['errors'].append(error_msg)
                generate_report(results, output_report_path)
                return results
            
            # 读取所有行数据
            for row in reader:
                all_rows.append(row)
            
    except FileNotFoundError:
        error_msg = f"错误: 找不到文件 '{csv_file_path}'"
        results['errors'].append(error_msg)
        print(error_msg)
        generate_report(results, output_report_path)
        return results
    except Exception as e:
        error_msg = f"读取CSV文件时发生错误: {e}"
        results['errors'].append(error_msg)
        print(error_msg)
        generate_report(results, output_report_path)
        return results
    
    # 处理每一行数据
    needs_update = False
    for row_index, row in enumerate(all_rows, start=1):
        results['total_rows'] += 1
        
        # 初始化当前行的验证结果字典
        current_result = {
            'row_number': row_index + 1,  # +1 是因为从0开始计数，且跳过标题行
            'barcode1': row['barcode1'],
            'barcode2': row['barcode2'],
            'template': row['template'],
            'spacer': row['spacer'],
            'end_base_original': row['end_base'],
            'end_base_calculated': None,
            'needs_update': False,
            'checks': {
                'barcode1_at_start': False,
                'barcode2_at_end': False,
                'spacer_in_template': False
            },
            'all_passed': False,
            'errors': []
        }
        
        # 获取当前行的各个字段值
        barcode1 = row['barcode1']
        barcode2 = row['barcode2']
        template = row['template']
        spacer = row['spacer']
        
        # 检验1: barcode1是否在template开头
        if template.startswith(barcode1):
            current_result['checks']['barcode1_at_start'] = True
        else:
            current_result['errors'].append(f"barcode1 '{barcode1}' 不在template开头")
        
        # 检验2: barcode2是否在template末尾
        if template.endswith(barcode2):
            current_result['checks']['barcode2_at_end'] = True
        else:
            current_result['errors'].append(f"barcode2 '{barcode2}' 不在template末尾")
        
        # 检验3: spacer是否在template中
        if spacer in template:
            current_result['checks']['spacer_in_template'] = True
        else:
            current_result['errors'].append(f"spacer '{spacer}' 不在template中")
            results['details'].append(current_result)
            results['failed_rows'] += 1
            continue
        
        # 只有前3项检验都通过，才计算片段长度
        if (current_result['checks']['barcode1_at_start'] and 
            current_result['checks']['barcode2_at_end'] and 
            current_result['checks']['spacer_in_template']):
            
            # 计算从spacer开始(包含)到barcode2开始之前的片段长度
            spacer_start = template.find(spacer)
            barcode2_start = template.find(barcode2)
            
            # 提取从spacer开始到barcode2开始之前的片段
            fragment = template[spacer_start:barcode2_start]
            fragment_length = len(fragment)
            current_result['end_base_calculated'] = fragment_length
            
            # 检查是否需要更新end_base
            try:
                end_base_original = int(row['end_base'])
                if fragment_length != end_base_original:
                    current_result['needs_update'] = True
                    row['end_base'] = str(fragment_length)  # 更新内存中的值
                    needs_update = True
                    results['updates'].append({
                        'row': row_index + 1,
                        'original': end_base_original,
                        'calculated': fragment_length
                    })
            except ValueError:
                current_result['errors'].append(f"end_base值 '{row['end_base']}' 不是有效的整数")
            
            # 当前3项检验通过时，就算通过
            current_result['all_passed'] = True
            results['passed_rows'] += 1
        else:
            results['failed_rows'] += 1
        
        results['details'].append(current_result)
    
    # 如果需要更新，写回CSV文件
    if needs_update:
        try:
            with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter=delimiter)
                writer.writeheader()
                writer.writerows(all_rows)
        except Exception as e:
            error_msg = f"更新CSV文件时发生错误: {e}"
            results['errors'].append(error_msg)
            print(error_msg)
    
    # 生成报告文件
    generate_report(results, output_report_path)
    
    if results:
        print(f"检验完成: 共{results['total_rows']}行, "
          f"通过{results['passed_rows']}行, "
          f"失败{results['failed_rows']}行")
    
    if results.get('updates'):
        print(f"更新了 {len(results['updates'])} 行的 end_base 值")
    
    if results['failed_rows'] > 0:
        raise Exception(f"检验未全部通过，存在 {results['failed_rows']} 个错误。请先处理错误再执行后续操作。")

In [3]:
def generate_report(results, output_path):
    """
    生成详细的检验报告
    
    参数:
    results (dict): 检验结果字典
    output_path (str): 输出报告文件的路径
    """
    with open(output_path, 'w', encoding='utf-8') as report_file:
        # 写入报告头部
        report_file.write("=" * 80 + "\n")
        report_file.write("CSV文件序列检验与更新报告\n")
        report_file.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        report_file.write("=" * 80 + "\n\n")
        
        # 写入列名验证结果
        report_file.write("列名验证:\n")
        report_file.write(f"期望的列名: {results['column_validation']['expected_columns']}\n")
        report_file.write(f"实际的列名: {results['column_validation']['actual_columns']}\n")
        report_file.write(f"列名是否匹配: {'是' if results['column_validation']['columns_match'] else '否'}\n")
        
        if not results['column_validation']['columns_match']:
            missing_cols = set(results['column_validation']['expected_columns']) - set(results['column_validation']['actual_columns'])
            extra_cols = set(results['column_validation']['actual_columns']) - set(results['column_validation']['expected_columns'])
            report_file.write(f"缺失的列: {missing_cols}\n")
            report_file.write(f"多余的列: {extra_cols}\n")
        report_file.write("\n")
        
        # 写入统计信息
        report_file.write("总体统计:\n")
        report_file.write(f"总行数: {results['total_rows']}\n")
        report_file.write(f"通过行数: {results['passed_rows']}\n")
        report_file.write(f"失败行数: {results['failed_rows']}\n")
        if results['total_rows'] > 0:
            report_file.write(f"通过率: {results['passed_rows']/results['total_rows']*100:.2f}%\n")
        
        # 写入更新统计
        if results.get('updates'):
            report_file.write(f"更新行数: {len(results['updates'])}\n")
        report_file.write("\n")
        
        # 如果有错误信息，写入错误
        if results.get('errors'):
            report_file.write("全局错误:\n")
            for error in results['errors']:
                report_file.write(f"  - {error}\n")
            report_file.write("\n")
        
        # 写入更新详情
        if results.get('updates'):
            report_file.write("end_base更新详情:\n")
            for update in results['updates']:
                report_file.write(f"  行 {update['row']}: {update['original']} → {update['calculated']}\n")
            report_file.write("\n")
        
        # 写入每行的详细结果
        if results['details']:
            report_file.write("详细检验结果:\n")
            report_file.write("-" * 80 + "\n")
            
            for detail in results['details']:
                report_file.write(f"行号: {detail['row_number']}\n")
                report_file.write(f"barcode1: {detail['barcode1']}\n")
                report_file.write(f"barcode2: {detail['barcode2']}\n")
                report_file.write(f"spacer: {detail['spacer']}\n")
                report_file.write(f"end_base(原始): {detail['end_base_original']}\n")
                
                if detail['end_base_calculated'] is not None:
                    report_file.write(f"end_base(计算): {detail['end_base_calculated']}\n")
                
                if detail['needs_update']:
                    report_file.write("end_base状态: 已更新 ✓\n")
                
                # 写入检验结果
                report_file.write("检验结果: ")
                if detail['all_passed']:
                    report_file.write("所有检验通过 ✓\n")
                else:
                    report_file.write("存在错误 ✗\n")
                    
                # 写入具体检验项结果
                report_file.write("  - barcode1在开头: ")
                report_file.write("通过 ✓\n" if detail['checks']['barcode1_at_start'] else "失败 ✗\n")
                
                report_file.write("  - barcode2在末尾: ")
                report_file.write("通过 ✓\n" if detail['checks']['barcode2_at_end'] else "失败 ✗\n")
                
                report_file.write("  - spacer在template中: ")
                report_file.write("通过 ✓\n" if detail['checks']['spacer_in_template'] else "失败 ✗\n")
                
                # 写入错误信息（如果有）
                if detail['errors']:
                    report_file.write("错误信息:\n")
                    for error in detail['errors']:
                        report_file.write(f"  - {error}\n")
                
                report_file.write("-" * 80 + "\n")
        
        # 写入报告尾部
        report_file.write("\n报告结束\n")
        report_file.write("=" * 80 + "\n")
    
    print(f"检验报告已生成: {output_path}")

In [4]:
def strict_QC(input_r1, input_r2, output_dir, primer_f="", primer_r=""):
    """
    执行严格的NGS数据质控流程
    :param input_r1: Read1输入文件路径
    :param input_r2: Read2输入文件路径
    :param output_dir: 输出目录
    :param primer_f: 正向引物序列（可选）
    :param primer_r: 反向引物序列（可选）
    """
    # 定义接头序列
    adapter_r1 = "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC"  # P7 adapter for read1
    adapter_r2 = "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGT"  # P5 adapter for read2

    subprocess.run(["mkdir", "-p", f"{output_dir}/fastqc_raw"])
    subprocess.run(["mkdir", "-p", f"{output_dir}/fastqc_trimmed"])
    
    # FastQC原始数据质控
    subprocess.run(["fastqc", "-t", "14", input_r1, input_r2, "-o", f"{output_dir}/fastqc_raw"])
    
    # cutadapt切除接头与引物
    cutadapt_cmd = [
            "cutadapt",
            "-a", adapter_r2,  # R2的3'端接头（P5）
            "-A", adapter_r1,  # R1的3'端接头（P7）
            "-o", f"{output_dir}/F.fq.gz",
            "-p", f"{output_dir}/R.fq.gz",
            "--minimum-length", "50",
            "--max-n", "0",
            "--error-rate", "0.1",
            f"--json={output_dir}/cutadapt.json",
            "--cores=14"
        ]
        
    # 添加引物切除参数
    if primer_f and primer_r:
        cutadapt_cmd.extend(["-g", f"^{primer_f}", "-G", f"^{primer_r}"])
    
    cutadapt_cmd.extend([input_r1, input_r2])
    
    subprocess.run(cutadapt_cmd)
    
    # 切除后质控验证
    try:
        with open(f"{output_dir}/cutadapt.json") as f:
            log_data = json.load(f)
            
        total_pairs = log_data["read_counts"]["input"]
        kept_pairs = log_data["read_counts"]["output"]
        kept_ratio = kept_pairs / total_pairs * 100
        
        print(f"原始序列对: {total_pairs}")
        print(f"保留序列对: {kept_pairs} ({kept_ratio:.2f}%)")
        
        # 验证标准
        if kept_ratio < 90:
            print("\n⚠️ 警告: 保留率低于90%，建议检查接头/引物设计")
        else:
            print("\n✅ 保留率符合质控标准(>90%)")
                
    except json.JSONDecodeError as e:
        print(f"JSON解析失败: {e}")
    except FileNotFoundError:
        print("❌ cutadapt.json文件未生成，请检查命令执行")
    except KeyError as e:
        print(f"❌ JSON结构异常，缺失关键字段: {e}")
    
    # 额外质控：切除后FastQC验证
    subprocess.run([
        "fastqc", 
        "-t", "14",
        f"{output_dir}/F.fq.gz", 
        f"{output_dir}/R.fq.gz",
        "-o", f"{output_dir}/fastqc_trimmed"
    ])

In [5]:
def merget(input_forward, input_reverse, output_merged):
    """合并双端测序数据，若输出文件已存在则跳过"""
    # 检查输出文件是否已存在
    if os.path.exists(output_merged):
        print(f"文件 {output_merged} 已存在，跳过合并操作")
        return
    
    # 检查输入文件是否存在
    if not os.path.exists(input_forward):
        raise FileNotFoundError(f"正向文件不存在: {input_forward}")
    if not os.path.exists(input_reverse):
        raise FileNotFoundError(f"反向文件不存在: {input_reverse}")
    
    # 构建并执行VSEARCH命令
    vsearch_command = (
        f"vsearch --fastq_mergepairs {input_forward} "
        f"--reverse {input_reverse} "
        f"--fastqout {output_merged} "
        "--fastq_allowmergestagger"
    )
    subprocess.run(vsearch_command, shell=True, check=True)
    print(f"双端合并完成 → {output_merged}")

In [6]:
def QC_merger(input_fastq, output_dir, output_fastq):
    # 检查输出文件是否已存在（若存在则跳过）
    if os.path.exists(output_fastq):
        print(f"文件 {output_fastq} 已存在，跳过质量控制和修剪步骤")
        return
    
    # 创建输出目录（若不存在）
    subprocess.run(["mkdir", "-p", output_dir])
    
    # 执行FastQC质量控制
    subprocess.run([
        "fastqc", 
        "-t", "14", 
        input_fastq, 
        "-o", output_dir
    ])
    
    # 执行Trimmomatic修剪
    subprocess.run([
        "trimmomatic", "SE", 
        "-threads", "14", 
        "-phred33", 
        input_fastq, 
        output_fastq, 
        "LEADING:3", 
        "TRAILING:3", 
        "SLIDINGWINDOW:4:15"
    ])

In [7]:
class BarcodeClassifier:
    def __init__(self, input_fastq, csv_file, output_directory):
        self.input_fastq = input_fastq
        self.csv_file = csv_file
        self.output_directory = output_directory
        self.barcode_pairs = self.read_barcodes_from_csv()
        self.barcode_handles = {}
        # 创建输出目录（如果不存在）
        os.makedirs(output_directory, exist_ok=True)

    def read_barcodes_from_csv(self):
        """从CSV文件中读取条形码组合"""
        barcode_pairs = []
        with open(self.csv_file, 'r') as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader)  # 跳过标题行
            for row in csvreader:
                barcode1, barcode2 = row[0].strip(), row[1].strip()
                barcode_pairs.append((barcode1, barcode2))
        return barcode_pairs

    def correct_sequence(self, sequence):
        """校正反向互补序列"""
        base_F = "ATCG"
        base_R = "TAGC"
        complement = {f: r for f, r in zip(base_F, base_R)}
        return ''.join(complement.get(base, base) for base in reversed(sequence))

    def output_files_exist(self, missing_threshold_num = 0.5):
        """检查所有输出文件是否已存在，并生成缺失文件报告"""
        report_lines = []  # 用于收集报告内容
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        report_lines.append(f"文件存在性检查报告生成时间: {current_time}")
        report_lines.append(f"检查目录: {self.output_directory}")
        report_lines.append("-" * 50)
    
        # 检查未匹配序列文件
        unmatched_file = os.path.join(self.output_directory, "unmatched_output.fastq")
        if not os.path.exists(unmatched_file):
            report_lines.append("未匹配序列文件 'unmatched_output.fastq' 不存在。")
            self._generate_report(report_lines)  # 即使未匹配文件缺失也生成报告
            return False
    
        total_barcode_pairs = len(self.barcode_pairs)
        missing_threshold = missing_threshold_num * total_barcode_pairs  # 阈值
        missing_files = []  # 用于存储缺失的条形码对
    
        # 检查每个条形码的输出文件
        for barcode1, barcode2 in self.barcode_pairs:
            barcode_file = os.path.join(
                self.output_directory, 
                f"{barcode1}_{barcode2}_output.fastq"
            )
            if not os.path.exists(barcode_file):
                missing_files.append((barcode1, barcode2))
                report_lines.append(f"条形码文件 '{barcode1}_{barcode2}_output.fastq' 不存在。")
    
        # 计算缺失数量
        missing_count = len(missing_files)
        report_lines.append("-" * 50)
        report_lines.append(f"条形码对总数: {total_barcode_pairs}")
        report_lines.append(f"缺失文件数: {missing_count}")
        report_lines.append(f"缺失阈值 ({missing_threshold_num*100}%): {missing_threshold:.1f}")
    
        # 如果缺失数量超过阈值
        if missing_count > missing_threshold:
            report_lines.append(f"结论: 缺失数量超过阈值，建议检查。")
            self._generate_report(report_lines)  # 生成报告
            print(f"缺失的条形码文件数量 ({missing_count}) 超过总条形码对数量的10% ({missing_threshold:.1f})。")
            for barcode1, barcode2 in missing_files:
                print(f"缺失的条形码对: {barcode1} -- {barcode2}")
            return False
        else:
            # 如果缺失率不超过10%，则认为文件存在（即使有少量缺失）
            if missing_count > 0:
                report_lines.append(f"结论: 有{missing_count}个文件缺失，但未超过阈值，已忽略。缺失原因可能是原始序列中不存在能匹配 barcode1 开头且 barcode2 结尾的序列。")
            else:
                report_lines.append("结论: 所有文件均存在。")
            self._generate_report(report_lines)  # 生成报告
            if missing_count > 0:
                print(f"有{missing_count}个条形码文件缺失，但未超过阈值 ({missing_threshold:.1f})，忽略缺失。")
            return True
    
    def _generate_report(self, report_content):
        """生成并保存报告文件"""
        report_filename = os.path.join(self.output_directory, "file_validation_report.txt")
        try:
            with open(report_filename, 'w', encoding='utf-8') as report_file: 
                report_file.write("\n".join(report_content))
            print(f"详细报告已保存至: {report_filename}")
        except IOError as e:
            print(f"写入报告文件时出错: {e}")

    def classify_by_barcodes(self):
        """执行条形码分类（如果输出文件不存在）"""
        # 检查所有输出文件是否已存在
        if self.output_files_exist():
            print("输出文件未超过缺失阈值，跳过分类操作。")
            return
            
        print("开始处理序列...")
        # 打开未匹配序列文件
        unmatched_handle = open(os.path.join(self.output_directory, "unmatched_output.fastq"), "w")
        
        with open(self.input_fastq, "r") as handle:
            for record in tqdm.tqdm(SeqIO.parse(handle, "fastq"), desc="Processing sequences"):
                found_match = False
                
                # 正向匹配
                for barcode1, barcode2 in self.barcode_pairs:
                    if str(record.seq).startswith(barcode1) and str(record.seq).endswith(barcode2):
                        self._write_record(record, barcode1, barcode2)
                        found_match = True
                        break
                
                # 反向互补匹配
                if not found_match:
                    corrected_seq = self.correct_sequence(str(record.seq))
                    for barcode1, barcode2 in self.barcode_pairs:
                        if corrected_seq.startswith(barcode1) and corrected_seq.endswith(barcode2):
                            self._write_corrected_record(record, corrected_seq, barcode1, barcode2)
                            found_match = True
                            break
                
                # 未匹配序列
                if not found_match:
                    SeqIO.write(record, unmatched_handle, "fastq")

        # 关闭所有文件句柄
        for handle in self.barcode_handles.values():
            handle.close()
        unmatched_handle.close()
    
    def _write_record(self, record, barcode1, barcode2):
        """写入匹配的序列记录"""
        barcode_pair_name = f"{barcode1}_{barcode2}"
        if barcode_pair_name not in self.barcode_handles:
            file_path = os.path.join(self.output_directory, f"{barcode_pair_name}_output.fastq")
            self.barcode_handles[barcode_pair_name] = open(file_path, "w")
        SeqIO.write(record, self.barcode_handles[barcode_pair_name], "fastq")
    
    def _write_corrected_record(self, record, corrected_seq, barcode1, barcode2):
        """写入校正后的序列记录"""
        barcode_pair_name = f"{barcode1}_{barcode2}"
        if barcode_pair_name not in self.barcode_handles:
            file_path = os.path.join(self.output_directory, f"{barcode_pair_name}_output.fastq")
            self.barcode_handles[barcode_pair_name] = open(file_path, "w")
        
        # 创建校正后的记录（保留质量值）
        corrected_record = SeqRecord(
            Seq(corrected_seq),
            id=record.id,
            description=record.description,
            letter_annotations={"phred_quality": record.letter_annotations['phred_quality']}
        )
        SeqIO.write(corrected_record, self.barcode_handles[barcode_pair_name], "fastq")

In [8]:
def process_sequences(config_file):
    with open(config_file, 'r') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # 跳过标题
        for row in tqdm.tqdm(csvreader):
            barcode1 = row[0].strip()
            barcode2 = row[1].strip()
            template = row[2].strip().upper()
            spacer = row[3].strip().upper()
            start_base = int(row[4].strip())
            end_base = int(row[5].strip())
            ind = template.index(spacer)
            sequences = [] 
            file_path = os.path.join(file, f"{barcode1}_{barcode2}_output.fastq")
            try:
                with open(file_path, "r") as seq_file:
                    line_number = 0
                    for line in seq_file:
                        line_number += 1
                        if line_number % 4 == 2:
                            sequence = line.strip()
                            extracted_sequence = sequence[ind+start_base:ind+end_base]
                            sequences.append(extracted_sequence)
            except FileNotFoundError:
                continue
    
            sequence_counts = Counter(sequences)

            os.makedirs(os.path.join(file, "ExtractSeq"), exist_ok=True)
            output_file_path = os.path.join(file, "ExtractSeq", os.path.basename(file_path).replace('.fastq', '_counts.csv'))
            with open(output_file_path, 'w', newline='') as output_csvfile:
                writer = csv.writer(output_csvfile)
                writer.writerow(['Extracted Sequence', 'Count'])  
                for seq, count in sequence_counts.items():
                    writer.writerow([seq, count])

In [20]:
def process_base_counts(config_file, output_file):
    results = []
    with open(config_file, 'r') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # 跳过标题
        for row in tqdm.tqdm(csvreader):
            barcode1 = row[0].strip()
            barcode2 = row[1].strip()
            template = row[2].strip().upper()
            spacer = row[3].strip().upper()
            base_windows = int(row[6].strip())
            file_path = os.path.join(file, f"{barcode1}_{barcode2}_output.fastq")
            sequences = []
            try:
                with open(file_path, 'r') as countsfile:
                    line_number = 0
                    for line in countsfile:
                        line_number += 1
                        if line_number % 4 == 2:
                            sequences.append(line.strip())
                
            except FileNotFoundError:
                continue
            ind = template.index(spacer)
            base_windows_ind = ind + base_windows - 1

            num_A = 0
            num_T = 0
            num_C = 0
            num_G = 0

            for seq in sequences:
                if len(seq) > base_windows_ind:
                    if seq[base_windows_ind] == 'A':
                        num_A += 1
                    elif seq[base_windows_ind] == 'T':
                        num_T += 1
                    elif seq[base_windows_ind] == 'C':
                        num_C += 1
                    elif seq[base_windows_ind] == 'G':
                        num_G += 1

            results.append([file_path, spacer, base_windows, num_A, num_T, num_C, num_G])

    # 写入结果到CSV文件
    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([ 'Spacer', 'Base Windows', 'A', 'T', 'C', 'G'])
        writer.writerows(results)

In [10]:
def process_all_sequences(config_file):
    """处理序列数据，保存spacer定位后的完整序列"""
    with open(config_file, 'r') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # 跳过标题行
        for row in tqdm.tqdm(csvreader):
            # 解析配置参数
            barcode1 = row[0].strip()
            barcode2 = row[1].strip()
            template = row[2].strip().upper()
            spacer = row[3].strip().upper()
            
            # 定位spacer位置（用于验证序列有效性）
            try:
                ind = template.index(spacer)
            except ValueError:
                continue

            # 构建输入文件路径
            file_path = os.path.join(file,f"{barcode1}_{barcode2}_output.fastq")
            
            # 读取并保存完整序列
            sequences = []
            try:
                with open(file_path, "r") as seq_file:
                    for line_num, line in enumerate(seq_file, 1):
                        if line_num % 4 == 2:  # 序列行
                            sequences.append(line.strip())
            except FileNotFoundError:
                continue
                
            # 统计序列频率
            sequence_counts = Counter(sequences)
            
            # 输出结果到CSV
            os.makedirs(os.path.join(file, "AllSeq"), exist_ok=True)
            output_file_path = os.path.join(file, "AllSeq", os.path.basename(file_path).replace('.fastq', '_counts.csv'))
            with open(output_file_path, 'w', newline='') as output_csvfile:
                writer = csv.writer(output_csvfile)
                writer.writerow(['Full Sequence', 'Count'])
                for seq, count in sequence_counts.items():
                    writer.writerow([seq, count])

In [11]:
def process_surrounding_sequences(config_file, spacer_front=20, spacer_after=20):
    """处理序列数据，保存spacer定位区域前后特定bp的序列"""
    with open(config_file, 'r') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # 跳过标题行
        for row in tqdm.tqdm(csvreader, desc="Processing spacer surrounding sequences"):
            # 解析配置参数
            barcode1 = row[0].strip()
            barcode2 = row[1].strip()
            template = row[2].strip().upper()
            spacer = row[3].strip().upper()
            
            # 定位spacer位置
            try:
                ind = template.index(spacer)
            except ValueError:
                continue

            # 构建输入文件路径
            file_path = os.path.join(file, f"{barcode1}_{barcode2}_output.fastq")
            
            # 读取并处理序列
            sequences = []
            try:
                with open(file_path, "r") as seq_file:
                    for line_num, line in enumerate(seq_file, 1):
                        if line_num % 4 == 2:  # 序列行
                            sequence = line.strip()
                            # 计算截取范围（含边界保护）
                            start_index = max(0, ind - spacer_front)
                            end_index = min(len(sequence), ind + len(spacer) + spacer_after)
                            extracted_sequence = sequence[start_index:end_index]
                            sequences.append(extracted_sequence)
            except FileNotFoundError:
                continue
                
            # 统计序列频率
            sequence_counts = Counter(sequences)
            
            # 输出结果到CSV
            os.makedirs(os.path.join(file, "SurroundingSeq"), exist_ok=True)
            output_file_path = os.path.join(
                file, "SurroundingSeq", 
                os.path.basename(file_path).replace('.fastq', '_counts.csv')
            )
            with open(output_file_path, 'w', newline='') as output_csvfile:
                writer = csv.writer(output_csvfile)
                writer.writerow(['Surrounding Sequence', 'Count'])
                for seq, count in sequence_counts.items():
                    writer.writerow([seq, count])

In [12]:
def NGS(file,purpose,tempalte_CSV,spacer_front=20, spacer_after=20):    
    input_forward  = file + "/F.fq.gz"
    input_reverse = file + "/R.fq.gz"
    output_merged = file + "/merged.fasta"
    merget(input_forward,input_reverse,output_merged)
    input_fastq = output_merged
    output_dir = file + "/fastqc_output"
    output_fastq = file + "/output_trimmed.fastq"
    QC_merger(input_fastq,output_dir,output_fastq)
    input_fastq = output_fastq
    output_directory = file
    classifier = BarcodeClassifier(input_fastq, tempalte_CSV, output_directory)
    classifier.classify_by_barcodes()
    if purpose == 1:
        config_file = tempalte_CSV
        process_sequences(config_file)
    elif purpose == 2:
        config_file = tempalte_CSV
        output_file = file+ "/results.csv"
        process_base_counts(config_file, output_file)
    elif purpose == 3:
        config_file = tempalte_CSV
        process_all_sequences(config_file)
    elif purpose == 4:
        config_file = tempalte_CSV
        process_surrounding_sequences(config_file, spacer_front=20, spacer_after=20)

上述程序直接点击运行即可，不要进行任何修改，如果想修改的话请复制后进行修改
二代测序数据下载后请自行解压，并且把文件名修改为F.fq和R.fq
上传后确定文件夹后，在file中输入上传的路径，注意所有的后续文件都会有在这个文件夹中生成，建议每次都新建一个文件夹
purpose只有三个选项有意义，1,2,3或者4，1是针对spacer定位的区域进行扫描并保存截取部分，2是针对碱基编辑器，3是针对spacer定位的区域进行扫描并将全部序列保存，4是针对spacer定位的区域前后特定bp的序列进行保存

In [13]:
file = "./" # 生成文件的目录
tempalte_CSV = "tempalte_CSV.csv" # tempalte_CSV文件目录
origin_F = './1-LHH23167_L1_1.fq' # 正向原始文件目录
origin_R = './1-LHH23167_L1_2.fq' # 反向原始文件目录

In [14]:
validate_and_update_csv(tempalte_CSV)

检验报告已生成: validation_report.txt
检验完成: 共36行, 通过36行, 失败0行


In [15]:
strict_QC(origin_F, origin_R, file, primer_f="", primer_r="")

null
null


Started analysis of 1-LHH23167_L1_1.fq
Started analysis of 1-LHH23167_L1_2.fq
Approx 5% complete for 1-LHH23167_L1_1.fq
Approx 5% complete for 1-LHH23167_L1_2.fq
Approx 10% complete for 1-LHH23167_L1_1.fq
Approx 10% complete for 1-LHH23167_L1_2.fq
Approx 15% complete for 1-LHH23167_L1_1.fq
Approx 15% complete for 1-LHH23167_L1_2.fq
Approx 20% complete for 1-LHH23167_L1_1.fq
Approx 20% complete for 1-LHH23167_L1_2.fq
Approx 25% complete for 1-LHH23167_L1_1.fq
Approx 25% complete for 1-LHH23167_L1_2.fq
Approx 30% complete for 1-LHH23167_L1_1.fq
Approx 30% complete for 1-LHH23167_L1_2.fq
Approx 35% complete for 1-LHH23167_L1_1.fq
Approx 35% complete for 1-LHH23167_L1_2.fq
Approx 40% complete for 1-LHH23167_L1_1.fq
Approx 40% complete for 1-LHH23167_L1_2.fq
Approx 45% complete for 1-LHH23167_L1_1.fq
Approx 45% complete for 1-LHH23167_L1_2.fq
Approx 50% complete for 1-LHH23167_L1_1.fq
Approx 50% complete for 1-LHH23167_L1_2.fq
Approx 55% complete for 1-LHH23167_L1_1.fq
Approx 55% complete f

Analysis complete for 1-LHH23167_L1_1.fq
Analysis complete for 1-LHH23167_L1_2.fq
This is cutadapt 5.1 with Python 3.12.11
Command line parameters: -a AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGT -A AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC -o .//F.fq.gz -p .//R.fq.gz --minimum-length 50 --max-n 0 --error-rate 0.1 --json=.//cutadapt.json --cores=14 ./1-LHH23167_L1_1.fq ./1-LHH23167_L1_2.fq
Processing paired-end reads on 14 cores ...

=== Summary ===

Total read pairs processed:         15,485,535
  Read 1 with adapter:                  56,718 (0.4%)
  Read 2 with adapter:                  72,333 (0.5%)

== Read fate breakdown ==
Pairs that were too short:                   0 (0.0%)
Pairs with too many N:                 138,517 (0.9%)
Pairs written (passing filters):    15,347,018 (99.1%)

Total basepairs processed: 4,645,660,500 bp
  Read 1: 2,322,830,250 bp
  Read 2: 2,322,830,250 bp
Total written (filtered):  4,603,240,037 bp (99.1%)
  Read 1: 2,301,647,966 bp
  Read 2: 2,301,592,071 bp

=== First re

Started analysis of F.fq.gz
Started analysis of R.fq.gz
Approx 5% complete for F.fq.gz
Approx 5% complete for R.fq.gz
Approx 10% complete for F.fq.gz
Approx 10% complete for R.fq.gz
Approx 15% complete for F.fq.gz
Approx 15% complete for R.fq.gz
Approx 20% complete for F.fq.gz
Approx 20% complete for R.fq.gz
Approx 25% complete for F.fq.gz
Approx 25% complete for R.fq.gz
Approx 30% complete for F.fq.gz
Approx 30% complete for R.fq.gz
Approx 35% complete for F.fq.gz
Approx 35% complete for R.fq.gz
Approx 40% complete for F.fq.gz
Approx 40% complete for R.fq.gz
Approx 45% complete for F.fq.gz
Approx 45% complete for R.fq.gz
Approx 50% complete for F.fq.gz
Approx 50% complete for R.fq.gz
Approx 55% complete for F.fq.gz
Approx 55% complete for R.fq.gz
Approx 60% complete for F.fq.gz
Approx 60% complete for R.fq.gz
Approx 65% complete for F.fq.gz
Approx 65% complete for R.fq.gz
Approx 70% complete for F.fq.gz
Approx 70% complete for R.fq.gz
Approx 75% complete for F.fq.gz
Approx 75% complet

Analysis complete for F.fq.gz
Analysis complete for R.fq.gz


In [16]:
purpose = 1

In [17]:
NGS(file,purpose,tempalte_CSV)

vsearch v2.30.0_linux_x86_64, 503.5GB RAM, 104 cores
https://github.com/torognes/vsearch

Merging reads 100%
  15347018  Pairs
  15162346  Merged (98.8%)
    184672  Not merged (1.2%)

Pairs that failed merging due to various reasons:
     90249  too few kmers found on same diagonal
        73  multiple potential alignments
     15948  too many differences
     78377  alignment score too low, or score drop too high
        25  overlap too short

Statistics of all reads:
    149.97  Mean read length

Statistics of merged reads:
    214.10  Mean fragment length
     17.17  Standard deviation of fragment length
      0.15  Mean expected error in forward sequences
      0.17  Mean expected error in reverse sequences
      0.13  Mean expected error in merged sequences
      0.06  Mean observed errors in merged region of forward sequences
      0.06  Mean observed errors in merged region of reverse sequences
      0.12  Mean observed errors in merged region
Started analysis of merged.fasta


双端合并完成 → .//merged.fasta
null


Approx 5% complete for merged.fasta
Approx 10% complete for merged.fasta
Approx 15% complete for merged.fasta
Approx 20% complete for merged.fasta
Approx 25% complete for merged.fasta
Approx 30% complete for merged.fasta
Approx 35% complete for merged.fasta
Approx 40% complete for merged.fasta
Approx 45% complete for merged.fasta
Approx 50% complete for merged.fasta
Approx 55% complete for merged.fasta
Approx 60% complete for merged.fasta
Approx 65% complete for merged.fasta
Approx 70% complete for merged.fasta
Approx 75% complete for merged.fasta
Approx 80% complete for merged.fasta
Approx 85% complete for merged.fasta
Approx 90% complete for merged.fasta
Approx 95% complete for merged.fasta


Analysis complete for merged.fasta


TrimmomaticSE: Started with arguments:
 -threads 14 -phred33 .//merged.fasta .//output_trimmed.fastq LEADING:3 TRAILING:3 SLIDINGWINDOW:4:15
Input Reads: 15162346 Surviving: 15160897 (99.99%) Dropped: 1449 (0.01%)
TrimmomaticSE: Completed successfully


详细报告已保存至: ./file_validation_report.txt
开始处理序列...


Processing sequences: 15160897it [14:36, 17293.03it/s]
36it [00:09,  3.71it/s]


In [21]:
purpose = 2

In [22]:
NGS(file,purpose,tempalte_CSV)

文件 .//merged.fasta 已存在，跳过合并操作
文件 .//output_trimmed.fastq 已存在，跳过质量控制和修剪步骤
详细报告已保存至: ./file_validation_report.txt
有5个条形码文件缺失，但未超过阈值 (18.0)，忽略缺失。
输出文件未超过缺失阈值，跳过分类操作。


36it [00:09,  3.91it/s]


In [23]:
purpose = 3

In [24]:
NGS(file,purpose,tempalte_CSV)

文件 .//merged.fasta 已存在，跳过合并操作
文件 .//output_trimmed.fastq 已存在，跳过质量控制和修剪步骤
详细报告已保存至: ./file_validation_report.txt
有5个条形码文件缺失，但未超过阈值 (18.0)，忽略缺失。
输出文件未超过缺失阈值，跳过分类操作。


36it [00:11,  3.18it/s]


In [25]:
purpose = 4
spacer_front=20
spacer_after=20

In [26]:
NGS(file,purpose,tempalte_CSV,spacer_front,spacer_after)

文件 .//merged.fasta 已存在，跳过合并操作
文件 .//output_trimmed.fastq 已存在，跳过质量控制和修剪步骤
详细报告已保存至: ./file_validation_report.txt
有5个条形码文件缺失，但未超过阈值 (18.0)，忽略缺失。
输出文件未超过缺失阈值，跳过分类操作。


Processing spacer surrounding sequences: 36it [00:11,  3.08it/s]
