Commit
Fixes #93, updated to latest changes in metapool (#103)
* Fixes #93, updated to latest changes in metapool

* Update sequence_processing_pipeline/Pipeline.py

Co-authored-by: Daniel McDonald <d3mcdonald@eng.ucsd.edu>

* Update sequence_processing_pipeline/Pipeline.py

Co-authored-by: Daniel McDonald <d3mcdonald@eng.ucsd.edu>

* Update sequence_processing_pipeline/Pipeline.py

Co-authored-by: Daniel McDonald <d3mcdonald@eng.ucsd.edu>

* bugfix

* _validate_mapping_file(), is_mapping_file() rewritten

_validate_mapping_file() and is_mapping_file() rewritten to be more like
their sample-sheet counterparts. Fixes based on feedback.

* Updates based on feedback

* Updates based on feedback

* Updates based on feedback

---------

Co-authored-by: Daniel McDonald <d3mcdonald@eng.ucsd.edu>
charles-cowart and wasade committed Sep 13, 2023
1 parent 85d1149 commit ae0ec58
Showing 6 changed files with 2,869 additions and 2,845 deletions.
170 changes: 96 additions & 74 deletions sequence_processing_pipeline/Pipeline.py
@@ -11,6 +11,7 @@
from re import sub, findall, search
import sample_sheet
import pandas as pd
from collections import defaultdict


logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
@@ -33,6 +34,20 @@ class Pipeline:
None, 32.5, -117.25, 'control blank', 'metagenome', 256318,
None, 'adaptation', 'TRUE', 'UCSD', 'FALSE']

mapping_file_columns = {'barcode', 'library_construction_protocol',
'mastermix_lot', 'sample_plate',
'center_project_name', 'instrument_model',
'tm1000_8_tool', 'well_id', 'tm50_8_tool',
'well_description', 'run_prefix', 'run_date',
'center_name', 'tm300_8_tool', 'extraction_robot',
'experiment_design_description', 'platform',
'water_lot', 'project_name', 'pcr_primers',
'sequencing_meth', 'plating', 'orig_name',
'linker', 'runid', 'target_subfragment', 'primer',
'primer_plate', 'sample_name', 'run_center',
'primer_date', 'target_gene', 'processing_robot',
'extractionkit_lot', 'qiita_prep_id'}

METAGENOMIC_PTYPE = 'Metagenomic'
METATRANSCRIPTOMIC_PTYPE = 'Metatranscriptomic'
AMPLICON_PTYPE = 'Amplicon'
@@ -215,15 +230,20 @@ def add(self, job):
def _validate_sample_sheet(self, sample_sheet_path):
"""
Performs additional validation for sample-sheet on top of metapool.
:return: If successful, an empty list of strings and a valid
sample-sheet. If unsuccessful, a list of warning and error
messages and None.
:return: If successful, a valid sample-sheet. Raises descriptive
PipelineError() on all failures. Warning messages are
appended to self.warnings.
"""
# validate the sample-sheet using metapool package.
sheet = KLSampleSheet(sample_sheet_path)
msgs, val_sheet = quiet_validate_and_scrub_sample_sheet(sheet)

if val_sheet is None:
# msgs will contain both ErrorMessages and WarningMessages.
# we want to identify if there are any messages and if so, create
# a separate list for them. An Error should only be raised on
# Error messages and in this case, all error messages should be
# concatenated.
errors = [x for x in msgs if isinstance(x, ErrorMessage)]

if errors:
@@ -276,6 +296,68 @@ def _validate_sample_sheet(self, sample_sheet_path):
isinstance(x, WarningMessage)]
return val_sheet
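
For readers skimming the truncated hunk above: metapool's validator returns a mixed list of messages, and only the ErrorMessage instances are fatal. Below is a minimal, runnable sketch of that split; the classes are stand-ins for metapool's real ones, and the joined error format is an assumption since those lines are cut off in this view.

class ErrorMessage(str):
    """Stand-in for metapool's ErrorMessage."""

class WarningMessage(str):
    """Stand-in for metapool's WarningMessage."""

msgs = [WarningMessage('unexpected column will be ignored'),
        ErrorMessage('a Data section is required')]

errors = [x for x in msgs if isinstance(x, ErrorMessage)]
if errors:
    # assumed format: all error text concatenated into one message
    print('would raise PipelineError:', '; '.join(errors))

# everything that is not an error is kept as a warning for the caller
warnings = [str(x) for x in msgs if isinstance(x, WarningMessage)]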

def _validate_mapping_file(self, mapping_file_path):
"""
Performs validation for mapping-files.
:return: If successful, a valid mapping-file. Raises descriptive
PipelineError() on all failures. Warning messages are
appended to self.warnings.
"""
try:
df = pd.read_csv(mapping_file_path, delimiter='\t', dtype=str)
except pd.errors.ParserError:
raise PipelineError('Cannot parse mapping-file.')

# first, detect any duplicate column names, regardless of mixed
# capitalization, and notify the user.
d = defaultdict(list)
for column in df.columns:
d[column.lower()].append(column)

# group the original column names by their lower-cased form, then
# collect every group with more than one member so the duplicates can
# be reported to the user in their original case.
dupes = [d[column] for column in
[col for col in d.keys() if len(d[col]) > 1]]

if dupes:
# column-names are case-insensitive, and must be unique.
# return groups of duplicate column names (differentiated only by
# a different mixed-case) to the user.
raise PipelineError("Mapping-file contains duplicate columns: "
"%s" % ', '.join([str(tpl) for tpl in dupes]))

# if columns are unique, determine if any columns are missing and/or
# unexpected and notify the user.
obs = set(df.columns.str.lower())

# Note that Pipeline.mapping_file_columns is expected to be all lower-
# case.

# if an expected column is missing in observed, that is an error.
# Note that since a mapping-file is just a DataFrame, a mapping-file
# missing n expected columns cannot be distinguished from a DataFrame
# that is not a mapping-file at all. This method assumes an external
# test has already determined that the file is a mapping-file.
missing_columns = Pipeline.mapping_file_columns - obs
if missing_columns:
raise PipelineError("Mapping-file is missing columns: "
"%s" % ', '.join(missing_columns))

# if an observed column is unexpected, that is a warning.
unexpected_columns = obs - Pipeline.mapping_file_columns
if unexpected_columns:
self.warnings += [("Mapping-file contains additional columns: "
"%s" % ', '.join(unexpected_columns))]

# rename all columns to their lower-case versions.
# we will want to return this version to the user.
df.columns = df.columns.str.lower()

return df
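
To see the duplicate-column grouping above in action, here is a minimal, runnable sketch using the same defaultdict pattern as the method; the column names are hypothetical.

from collections import defaultdict

# hypothetical header row containing a case-insensitive duplicate
columns = ['Barcode', 'barcode', 'sample_name']

d = defaultdict(list)
for column in columns:
    d[column.lower()].append(column)

dupes = [names for names in d.values() if len(names) > 1]
print(dupes)  # [['Barcode', 'barcode']] -> _validate_mapping_file raises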

def generate_sample_info_files(self, addl_info=None):
"""
Generate sample-information files in self.output_path.
@@ -444,23 +526,20 @@ def get_project_info(self, short_names=False):
@staticmethod
def is_mapping_file(mapping_file_path):
'''
Returns true if file is a mapping file.
Returns True if file follows basic mapping-file format.
'''
try:
Pipeline._validate_mapping_file(mapping_file_path)
return True
except PipelineError as e:
# we want to distinguish between a file that is clearly not a
# mapping file e.g. sample-file, and a mapping-file that is perhaps
# missing a column or has duplicate sample-names.
messages = ['duplicate sample-names detected:', 'missing columns:',
'Column names are case-insensitive.']

for message in messages:
if str(e).startswith(message):
return True
df = pd.read_csv(mapping_file_path, delimiter='\t', dtype=str)
except pd.errors.ParserError:
return False

return False
# if the expected subset of columns required for a mapping-file
# are present, then consider this a mapping file, even if it's
# an invalid one.
exp_columns = frozenset({'barcode', 'tm1000_8_tool',
'extraction_robot', 'pcr_primers'})

return set(df.columns.str.lower()).issuperset(exp_columns)
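
A short usage sketch of the relaxed heuristic: because is_mapping_file() only tests for this four-column marker subset, a file is recognized as a mapping-file even when full validation would still reject it. The file name is hypothetical, and the import path assumes the package layout shown in this commit.

import pandas as pd
from sequence_processing_pipeline.Pipeline import Pipeline

# header-only TSV with only the four marker columns; the many other
# required columns are absent, so _validate_mapping_file() would fail
pd.DataFrame(columns=['barcode', 'tm1000_8_tool',
                      'extraction_robot', 'pcr_primers']
             ).to_csv('example_mapping.tsv', sep='\t', index=False)

print(Pipeline.is_mapping_file('example_mapping.tsv'))  # True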

@staticmethod
def is_sample_sheet(sample_sheet_path):
@@ -483,63 +562,6 @@ def is_sample_sheet(sample_sheet_path):

return False

@staticmethod
def _validate_mapping_file(mapping_file_path):
exp = {'barcode', 'library_construction_protocol', 'mastermix_lot',
'sample_plate', 'center_project_name', 'instrument_model',
'tm1000_8_tool', 'well_id', 'tm50_8_tool', 'well_description',
'run_prefix', 'run_date', 'center_name', 'tm300_8_tool',
'extraction_robot', 'experiment_design_description',
'platform', 'water_lot', 'project_name', 'pcr_primers',
'sequencing_meth', 'plating', 'orig_name', 'linker', 'runid',
'target_subfragment', 'primer', 'primer_plate', 'sample_name',
'run_center', 'primer_date', 'target_gene', 'processing_robot',
'extractionkit_lot'}

try:
df = pd.read_csv(mapping_file_path, delimiter='\t', dtype=str)

columns = [x.lower() for x in list(df.columns)]
if len(set(columns)) < len(columns):
raise PipelineError("Column names are case-insensitive. You "
"have one or more duplicate columns in "
"your mapping-file.")

# rename all columns to their lower-case versions.
# we will want to return this version to the user.
df.columns = df.columns.str.lower()

# if the two sets of headers are equal, verify there are no
# duplicate sample-names and then return the dataframe and no
# error message.
if len(exp - set(df.columns)) == 0:
# count the number of occurrences of each sample-name.
dupes = df['sample_name'].value_counts()
# filter for duplicate sample-names
dupes = dupes.loc[lambda x: x > 1]
dupes = dupes.index.tolist()

if dupes:
msg = ('duplicate sample-names detected: '
'%s' % ', '.join(dupes))
else:
return df
else:
if exp - set(df.columns) == exp:
# since pandas.read_csv() will successfully open a sample-
# sheet, if none of the expected columns exist then assume
# that this file is not a mapping file.
msg = "not a mapping-file"
else:
msg = 'missing columns: %s' % ', '.join(
exp - set(df.columns))
except pd.errors.ParserError:
# ignore parser errors as they obviously prove this is not a
# valid mapping file.
msg = 'could not parse file "%s"' % mapping_file_path

raise PipelineError(msg)

def _generate_dummy_sample_sheet(self, first_read, last_read,
indexed_reads, dummy_sample_id):
# create object and initialize header
