-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_sites.py
77 lines (63 loc) · 3.44 KB
/
get_sites.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from Bio import SeqIO
import os
import argparse
import gzip
import subprocess
def get_args():
parser = argparse.ArgumentParser(description="Use to extract AT/CG(nonCpG) & CpG sites.")
input_group = parser.add_argument_group("Input")
input_group.add_argument("-g", "--genome", required=True, help="The path of genome sequence file.")
input_group.add_argument("-c", "--chr", required=True, help="The target chromosome to extract.")
option_group = parser.add_argument_group("Optional parameters")
option_group.add_argument("--onlycpg", action="store_true", help="Only extract CpG sites.")
option_group.add_argument("--merge", action="store_true", help="Use bedtools to merge bed files.")
option_group.add_argument("--gzip", action="store_true", help="Use gzip to compress output.")
option_group.add_argument("--nosoftmask", action="store_true", help="Remove soft-masked where repeats are in lower-case text.")
output_group = parser.add_argument_group("Output")
output_group.add_argument("-n", "--name", help="The name of output file.")
output_group.add_argument("-p", "--path", help="The path of output file.", default='./')
args = parser.parse_args()
if not args.name:
args.name = os.path.splitext(os.path.basename(args.genome))[0]
return args
def process_command(command):
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
print(f"Error when executing command: {command}\n{result.stderr.decode()}")
def main():
args = get_args()
with gzip.open(args.genome, "rt") if args.genome.endswith(".gz") else open(args.genome, "r") as genome_file:
chr_num = "chr" + args.chr.replace("chr", "")
cpg_bed_name = os.path.join(args.path, f"{args.name}_{chr_num}_CpGsites.bed.tmp")
atcg_bed_name = os.path.join(args.path, f"{args.name}_{chr_num}_ATnonCpGsites.bed.tmp")
with open(cpg_bed_name, "w") as cpg_bed, open(atcg_bed_name, "w") as atcg_bed:
for record in SeqIO.parse(genome_file, "fasta"):
if record.id != args.chr:
continue
if not args.nosoftmask:
seq = str(record.seq).upper()
else:
seq = str(record.seq)
for i, nuc in enumerate(seq):
if nuc == "N":
continue
if nuc in ["A", "T"] and not args.onlycpg:
atcg_bed.write(f"{chr_num}\t{i}\t{i+1}\n")
elif nuc in ["C", "G"]:
if (nuc == "C" and i + 1 < len(seq) and seq[i + 1] == "G") or (nuc == "G" and i > 0 and seq[i - 1] == "C"):
cpg_bed.write(f"{chr_num}\t{i}\t{i+1}\n")
elif not args.onlycpg:
atcg_bed.write(f"{chr_num}\t{i}\t{i+1}\n")
for bed_file in [cpg_bed_name, atcg_bed_name] if not args.onlycpg else [cpg_bed_name]:
outfile = bed_file.replace(".tmp", "")
if args.merge:
command = f"bedtools merge -i {bed_file} > {outfile} && rm {bed_file}"
else:
command = f"mv {bed_file} {outfile}"
process_command(command)
if args.gzip:
process_command(f"gzip -f {outfile}")
if args.onlycpg:
os.remove(atcg_bed_name)
if __name__ == "__main__":
main()