In [None]:
import os
import subprocess

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
bam_dir = "/data/haocheng/data/bam/GM"  # directory holding the input BAM files
# Keep only *.bam entries (os.listdir returns every directory entry) and sort
# them: os.listdir order is arbitrary, and later cells pick files by index.
bam_files = sorted(f for f in os.listdir(bam_dir) if f.endswith(".bam"))
chromosomes = [f"chr{i}" for i in range(1, 23)] + ["chrX", "chrY"]  # chr1..chr22 plus chrX and chrY

In [None]:
# Function: compute the coverage of a BAM file
def get_coverage(bam_file):
    """Run ``samtools depth`` on ``bam_file`` and return its raw text output.

    Args:
        bam_file: Path of the BAM file passed to ``samtools depth``.

    Returns:
        The captured standard output of the command as a single string.

    Raises:
        subprocess.CalledProcessError: If ``samtools`` exits with a non-zero
            status; the captured stderr is available on the exception.
        FileNotFoundError: If the ``samtools`` executable cannot be found.
    """
    cmd = ["samtools", "depth", bam_file]
    # check=True: a failed run must not be mistaken for "no coverage".
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout

In [None]:
# Create the SparkSession (getOrCreate hands back an existing one if present)
spark = (
    SparkSession.builder
    .appName("Normalization")
    .getOrCreate()
)

In [None]:
# Initial values of m and n (used as the n/m and 1/m weights when normalising)
m, n = 2, 1
# Full paths of the first two entries of bam_files
bam_file1 = os.path.join(bam_dir, bam_files[0])
bam_file2 = os.path.join(bam_dir, bam_files[1])

In [None]:
# Coverage output for each of the two selected BAM files, in order
coverage1, coverage2 = map(get_coverage, (bam_file1, bam_file2))

In [None]:
# Convert coverage data into a DataFrame
def create_df(coverage_data):
    """Turn ``samtools depth`` text into a Spark DataFrame.

    Args:
        coverage_data: Either the whole output as one string or an iterable of
            its lines. Every non-blank line is split on whitespace into
            chrom, position and coverage.

    Returns:
        A DataFrame with string columns ``chrom`` and ``position`` and an
        integer column ``coverage``. Empty input yields an empty DataFrame.
    """
    lines = coverage_data.splitlines() if isinstance(coverage_data, str) else coverage_data
    # Blank lines would become zero-field rows that do not fit the schema.
    rows = [line.split() for line in lines if line.strip()]
    # An explicit all-string schema keeps position textual and lets Spark
    # build the frame even when there are no rows to infer types from.
    schema = "chrom string, position string, coverage string"
    return spark.createDataFrame(rows, schema=schema).withColumn(
        "coverage", F.col("coverage").cast("int")
    )

In [None]:
# One DataFrame per coverage dump, in the same order as the inputs
coverage1_df, coverage2_df = map(create_df, (coverage1, coverage2))

In [None]:
import subprocess
import os
import pandas as pd

# Function: compute the coverage of a BAM file and return the result
def get_coverage(bam_file):
    """Run ``samtools depth`` on ``bam_file`` and return its output lines.

    Args:
        bam_file: Path of the BAM file passed to ``samtools depth``.

    Returns:
        The captured standard output, stripped and split into a list of
        lines (an empty list when the command printed nothing).

    Raises:
        subprocess.CalledProcessError: If ``samtools`` exits with a non-zero
            status; the captured stderr is available on the exception.
        FileNotFoundError: If the ``samtools`` executable cannot be found.
    """
    cmd = ["samtools", "depth", bam_file]
    # check=True: a failed run must not be mistaken for "no coverage".
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.strip().splitlines()

# Collect every BAM file of the input directory, sorted by file name
bam_dir = "/data/haocheng/data/bam/GM/"
bam_files = sorted(entry for entry in os.listdir(bam_dir) if entry.endswith('.bam'))

# Initial values of m and n
m, n = 2, 1

# Per-file coverage tables, in the same order as bam_files[:2]
coverage_dfs = []

# Process the first two BAM files
for bam_file in bam_files[:2]:
    bam_path = os.path.join(bam_dir, bam_file)
    coverage = get_coverage(bam_path)

    # One row per coverage line: chrom, position, coverage
    df = pd.DataFrame([line.split() for line in coverage], columns=["chrom", "position", "coverage"])
    # Numeric positions make the merged table sort by coordinate, not as text
    df = df.astype({"position": int, "coverage": int})
    coverage_dfs.append(df)

if len(coverage_dfs) < 2:
    raise ValueError(f"expected at least two BAM files in {bam_dir}, found {len(coverage_dfs)}")
cov1_df, cov2_df = coverage_dfs

# Largest depth of each file (0 when the file produced no rows)
max_cov1 = cov1_df["coverage"].max() if len(cov1_df) else 0
max_cov2 = cov2_df["coverage"].max() if len(cov2_df) else 0

# Outer-join on (chrom, position) so that each file keeps its own coverage
# column; a position seen in only one file gets 0 for the other. Summing the
# two files first would lose the per-file values that the formula below needs.
combined_coverage = (
    cov1_df.merge(cov2_df, on=["chrom", "position"], how="outer", suffixes=("1", "2"))
    .fillna({"coverage1": 0, "coverage2": 0})
    .astype({"coverage1": int, "coverage2": int})
    .sort_values(["chrom", "position"], ignore_index=True)
)
# Summed depth of both files
combined_coverage["coverage"] = combined_coverage["coverage1"] + combined_coverage["coverage2"]

# Normalise: rescale each file to the common maximum, then mix them with the
# weights n/m (first file) and 1/m (second file). A file whose maximum is 0
# contributes 0 instead of dividing by zero.
norm_max = max(max_cov1, max_cov2)
scaled_cov1 = (combined_coverage["coverage1"] / max_cov1 * norm_max * (n / m)) if max_cov1 > 0 else 0.0
scaled_cov2 = (combined_coverage["coverage2"] / max_cov2 * norm_max * (1 / m)) if max_cov2 > 0 else 0.0
combined_coverage['normalized_coverage'] = scaled_cov1 + scaled_cov2

# Write the result as tab-separated chrom / position / value rows.
# NOTE(review): the file is named .bedgraph but has three columns; bedGraph
# readers usually expect chrom, start, end, value - confirm what the consumer
# of this file needs.
output_file = "/data/haocheng/data/bed/GM/normalized_coverage.bedgraph"
combined_coverage[['chrom', 'position', 'normalized_coverage']].to_csv(output_file, sep='\t', index=False, header=False)

print(f"所有 BAM 文件处理完成，结果将输出到 {output_file}")


In [1]:
import subprocess
import os
import numpy as np

# Function: compute the coverage of a BAM file and return the result
def get_coverage(bam_file):
    """Run ``samtools depth`` on ``bam_file`` and return its output lines.

    Args:
        bam_file: Path of the BAM file passed to ``samtools depth``.

    Returns:
        The captured standard output, stripped and split into a list of
        lines (an empty list when the command printed nothing).

    Raises:
        subprocess.CalledProcessError: If ``samtools`` exits with a non-zero
            status; the captured stderr is available on the exception.
        FileNotFoundError: If the ``samtools`` executable cannot be found.
    """
    cmd = ["samtools", "depth", bam_file]
    # check=True: a failed run must not be mistaken for "no coverage".
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.strip().splitlines()

# List the BAM files found in the input directory, ordered by file name
bam_dir = "/data/haocheng/data/bam/GM/"
bam_files = sorted(name for name in os.listdir(bam_dir) if name.endswith('.bam'))

# Starting values of m and n
m, n = 2, 1

# Per-position coverage of both files: (chrom, position) -> [cov1, cov2]
coverage_dict = {}

# Process the first two BAM files; file_index selects the slot to update
for file_index, bam_file in enumerate(bam_files[:2]):
    bam_path = os.path.join(bam_dir, bam_file)
    coverage = get_coverage(bam_path)

    for line in coverage:
        chrom, position, cov = line.split()
        key = (chrom, int(position))
        # setdefault creates the [cov1, cov2] pair the first time a position is seen
        coverage_dict.setdefault(key, [0, 0])[file_index] += int(cov)

# Largest depth per file; default=0 keeps max() from raising when no position
# was read at all
max_cov1 = max((cov[0] for cov in coverage_dict.values()), default=0)
max_cov2 = max((cov[1] for cov in coverage_dict.values()), default=0)

# Normalise and build the output lines
norm_max = max(max_cov1, max_cov2)

output_lines = []
# Iterate in (chrom, position) order: the dict is filled file by file, so
# positions found only in the second file would otherwise trail at the end.
for (chrom, position), cov in sorted(coverage_dict.items()):
    norm_cov1 = (cov[0] / max_cov1 * norm_max * (n / m)) if max_cov1 > 0 else 0
    norm_cov2 = (cov[1] / max_cov2 * norm_max * (1 / m)) if max_cov2 > 0 else 0
    norm_cov = norm_cov1 + norm_cov2
    output_lines.append(f"{chrom}\t{position}\t{norm_cov}")

# Write the result, one record per line, each terminated by a newline.
# NOTE(review): the file is named .bedgraph but has three columns; bedGraph
# readers usually expect chrom, start, end, value - confirm what the consumer
# of this file needs.
output_file = "/data/haocheng/data/bed/GM/normalized_coverage.bedgraph"
with open(output_file, 'w') as f:
    f.write("".join(f"{line}\n" for line in output_lines))

print(f"所有 BAM 文件处理完成，结果将输出到 {output_file}")


In [None]:

# Largest coverage of each input RDD; the value is read from index 1 of every record.
# NOTE(review): cov1_rdd and cov2_rdd are not defined in the cells visible here.
max_cov1 = cov1_rdd.map(lambda record: record[1]).max()
max_cov2 = cov2_rdd.map(lambda record: record[1]).max()

# Normalisation parameters
norm_max = max(max_cov1, max_cov2)
m, n = 2, 1  # set these according to your needs

def normalize_coverage(record):
    """Map a ``((chrom, position), total_cov)`` record to ``(chrom, position, value)``.

    The value is ``total_cov`` scaled by ``norm_max / max_cov1`` with weight
    ``n / m`` plus ``total_cov`` scaled by ``norm_max / max_cov2`` with weight
    ``1 / m``. ``max_cov1``, ``max_cov2``, ``norm_max``, ``n`` and ``m`` are
    read from the notebook's global namespace.

    NOTE(review): both terms use the same ``total_cov``, whereas the
    dict-based cell scales each file's own coverage - confirm this is intended.
    """
    (chrom, position), total_cov = record
    first_term = (total_cov / max_cov1) * norm_max * (n / m)
    second_term = (total_cov / max_cov2) * norm_max * (1 / m)
    return (chrom, position, first_term + second_term)

# Apply normalize_coverage to every record of combined_rdd.
# NOTE(review): combined_rdd is not defined in the cells visible here.
normalized_rdd = combined_rdd.map(normalize_coverage)

In [None]:
# Merge the two coverage tables. Each side's "coverage" column is renamed
# first: joining two frames that both carry a "coverage" column would yield
# duplicate column names, while the normalisation cell reads "coverage1" and
# "coverage2".
combined_df = (
    coverage1_df.withColumnRenamed("coverage", "coverage1")
    .join(
        coverage2_df.withColumnRenamed("coverage", "coverage2"),
        on=["chrom", "position"],
        how="outer",
    )
    # A position present in only one file gets coverage 0 for the other.
    .fillna(0)
)
# The maximum coverage is computed in the next cell


In [None]:
# Maximum coverage of each file, taken from that file's own table so the two
# values are independent. `or 0` covers an empty table, for which the
# aggregate is None.
max_cov1 = coverage1_df.agg(F.max("coverage")).first()[0] or 0
max_cov2 = coverage2_df.agg(F.max("coverage")).first()[0] or 0

In [None]:
# Normalise
norm_max = max(max_cov1 or 0, max_cov2 or 0)
m = 2  # set according to your needs
n = 1  # set according to your needs

# Each file is rescaled to the common maximum and weighted (n/m for the first
# file, 1/m for the second). A file whose maximum is 0 or missing contributes
# 0 instead of dividing by zero, matching the guard in the dict-based cell.
scaled_cov1 = (F.col("coverage1") / max_cov1 * norm_max * (n / m)) if max_cov1 else F.lit(0.0)
scaled_cov2 = (F.col("coverage2") / max_cov2 * norm_max * (1 / m)) if max_cov2 else F.lit(0.0)
normalized_df = combined_df.withColumn("normalized_coverage", scaled_cov1 + scaled_cov2)


In [None]:
# Collect the result locally as tab-separated text lines
result = (
    normalized_df
    .select("chrom", "position", "normalized_coverage")
    .rdd
    .map(lambda row: f"{row.chrom}\t{row.position}\t{row.normalized_coverage}")
    .collect()
)

In [None]:
# Write the result as a tab-separated file with a header row.
# NOTE(review): the target looks like a placeholder path - confirm before running.
(
    normalized_df
    .select("chrom", "position", "normalized_coverage")
    .write
    .csv("path/to/output/normalized_coverage.csv", sep="\t", header=True)
)


In [None]:

# Stop the SparkSession
spark.stop()


In [None]:
# Fold every file still listed in bam_files into the running result, one per pass.
# NOTE(review): normalize_with_spark is not defined in the cells visible here,
# and the meaning of its literal argument 1000 cannot be told from this cell.
# NOTE(review): the loop consumes bam_files from index 0 - confirm that files
# already merged in earlier cells were removed from the list beforehand.
while len(bam_files) > 0:
    next_bam = os.path.join(bam_dir, bam_files[0])
    print(f"现在正在处理 BAM 文件: {next_bam}")

    # Coverage of the file being added
    coverage2 = get_coverage(next_bam)

    # Merge it into the running result
    print("归一化覆盖度...")
    result = normalize_with_spark(result, coverage2, 1000, m, n)

    # Drop the file just consumed and advance both counters
    bam_files = bam_files[1:]
    m, n = m + 1, n + 1


In [None]:

# Write the final result to its output file
output_file = "/data/haocheng/data/bed/GM/GM_normalized_coverage.bedgraph"

print(f"所有 BAM 文件处理完成，结果将输出到 {output_file}")

# Nothing is written when the result is empty
if not result:
    print("结果为空，未写入文件。")
else:
    with open(output_file, 'w') as f:
        f.write("\n".join(result))
    print("结果已成功写入文件。")