This repository has been archived by the owner on Nov 9, 2023. It is now read-only.
/
add_qiime_labels.py
executable file
·144 lines (112 loc) · 5.12 KB
/
add_qiime_labels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
from __future__ import division
__author__ = "William Walters"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["William Walters"]
__license__ = "GPL"
__version__ = "1.8.0"
__maintainer__ = "William Walters"
__email__ = "William.A.Walters@colorado.edu"
from os.path import join, basename
from string import letters, digits
from cogent.parse.fasta import MinimalFastaParser
from qiime.util import duplicates_indices
from qiime.check_id_map import process_id_map
def add_qiime_labels(mapping_f,
fasta_dir,
filename_column,
output_dir=".",
count_start=0):
""" Main function for combining fasta files, writing valid QIIME labels
mapping_f: open file object of the metadata mapping file
fasta_dir: Directory of fasta files to combine into a single file
filename_column: Column of metadata mapping file containing fasta filenames
output_dir: Directory to write output combined file to
count_start: Number to start enumeration of fasta labels with
"""
headers, mapping_data, run_description, errors, warnings= \
process_id_map(mapping_f, has_barcodes=False, \
disable_primer_check=True, added_demultiplex_field=None,
variable_len_barcodes=False)
fasta_name_to_sample_id = check_mapping_data(mapping_data, headers,
filename_column)
fasta_files = get_fasta_fps(fasta_dir, fasta_name_to_sample_id.keys())
write_combined_fasta(fasta_name_to_sample_id, fasta_files, output_dir,
counter=count_start)
def check_mapping_data(mapping_data, headers, filename_column):
""" Checks mapping data for MIMARKS SampleIDs, unique IDs, fasta file names
Also returns a dict of fasta file name: SampleID
mapping_data: list of lines of data from mapping file
headers: list of header strings
filename_column: Column of metadata mapping file containing fasta filenames
"""
# First make sure there is a SampleID and filename_column present
try:
sample_id_ix = headers.index("SampleID")
except ValueError:
raise ValueError,("SampleID column not found in mapping file, please "+\
"check mapping file with check_id_map.py")
try:
filename_col_ix = headers.index(filename_column)
except ValueError:
raise ValueError,("Specified column %s not found in mapping file." %\
filename_column)
valid_mimarks = letters + digits + "."
fasta_name_to_sample_id = {}
fasta_names = []
sample_ids = []
for line in mapping_data:
try:
fasta_name_to_sample_id[basename(line[filename_col_ix].strip())] =\
line[sample_id_ix]
except IndexError:
raise IndexError,("Missing filename column data in line %s " %\
line)
for curr_char in line[sample_id_ix]:
if curr_char not in valid_mimarks:
raise ValueError,("Found invalid character in line: %s\n" %\
line + "SampleIDs must be alphanumeric and . characters "+\
"only")
sample_ids.append(line[sample_id_ix].strip())
fasta_names.append(line[filename_col_ix].strip())
fasta_name_dups = duplicates_indices(fasta_names)
if fasta_name_dups:
raise ValueError,("Found duplicate fasta names: %s" %\
"\t".join([fasta_name for fasta_name in fasta_name_dups.keys()]))
sample_id_dups = duplicates_indices(sample_ids)
if sample_id_dups:
raise ValueError,("Found duplicate SampleID names: %s" %\
"\t".join([sample_id for sample_id in sample_id_dups.keys()]))
return fasta_name_to_sample_id
def get_fasta_fps(fasta_dir, fasta_files):
""" Returns list of fasta filepaths (only .fna, .fasta, and .fa files)
fasta_dir: Directory of fasta files to check
fasta_files: list of fasta filenames to open
"""
fasta_filepaths = []
for curr_file in fasta_files:
curr_fp = join(fasta_dir, curr_file)
try:
file_test = open(curr_fp, "U")
file_test.close()
except IOError:
raise IOError,("Unable to open %s" % curr_fp)
fasta_filepaths.append(curr_fp)
return fasta_filepaths
def write_combined_fasta(fasta_name_to_sample_id,
fasta_files,
output_dir=".",
counter=0):
""" Writes combined, enumerated fasta file
fasta_name_to_sample_id: dict of fasta file name to SampleID
fasta_files: list of filepaths to iterate through
output_dir: output directory to write combined file to
counter: Starting number to enumerate sequences with
"""
combined_file_out = open(join(output_dir + "/", "combined_seqs.fna"), "w")
for curr_fasta in fasta_files:
for label, seq in MinimalFastaParser(open(curr_fasta, "U")):
combined_file_out.write(">%s_%d %s\n" %\
(fasta_name_to_sample_id[basename(curr_fasta)], counter, label))
combined_file_out.write("%s\n" % seq)
counter += 1