Commit

handles blank sample_name and ensures names are unique.
marcmaxson committed Sep 24, 2019
1 parent 076a8c2 commit 217ba6a
Showing 2 changed files with 64 additions and 41 deletions.
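For orientation, the change lands in the sample-sheet parser and in run_pipeline, the entry point most users call. A minimal invocation sketch (the data directory is hypothetical; run_pipeline and the betas keyword are taken from the diff below):

from methylprep import run_pipeline

# hypothetical folder containing IDAT files plus a sample sheet CSV;
# after this commit, blank Sample_Name entries are filled in (e.g. Sample_1)
# and duplicate names get a numeric suffix before batching.
betas = run_pipeline('data/GSE55555/', betas=True)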
70 changes: 36 additions & 34 deletions methylprep/files/sample_sheets.py
@@ -265,6 +265,8 @@ def get_samples(self):
        return self.__samples

    def get_sample(self, sample_name):
        """ scans all samples for one matching sample_name, if provided.
        If no sample_name, then it returns all samples."""
        # this isn't automatically done, but needed here to work.
        null = self.get_samples()

@@ -278,10 +280,43 @@ def get_sample(self, sample_name):

        num_candidates = len(candidates)
        if num_candidates != 1:
            raise ValueError(f'Expected sample with name {sample_name}. Found {num_candidates}')
            raise ValueError(f'Expected sample with name `{sample_name}`. Found {num_candidates}')

        return candidates[0]

    def build_samples(self):
        """Builds Sample objects from the processed sample sheet rows.
        Added to Sample as class_method: if the idat file is not in the same folder, (check if exists!) looks recursively for that filename and updates the data_dir for that Sample.
        """

        self.__samples = []

        logging.info('Building samples')

        for _index, row in self.__data_frame.iterrows():
            sentrix_id = row['Sentrix_ID'].strip()
            sentrix_position = row['Sentrix_Position'].strip()

            if not (sentrix_id and sentrix_position):
                continue

            sample = Sample(
                data_dir=self.data_dir, # this assumes the .idat files are in the same folder with the samplesheet.
                sentrix_id=sentrix_id,
                sentrix_position=sentrix_position,
                **row,
            )

            self.__samples.append(sample)

    def contains_column(self, column_name):
        """ helper function to determine if sample_sheet contains a specific column, such as GSM_ID.
        SampleSheet must already have __data_frame in it."""
        if column_name in self.__data_frame:
            return True
        return False

    def read(self, sample_sheet_file):
        """Validates and reads a sample sheet file, building a DataFrame from the parsed rows.
@@ -354,36 +389,3 @@ def read(self, sample_sheet_file):
            dtype=str,
        )
        reset_file(sample_sheet_file)

    def build_samples(self):
        """Builds Sample objects from the processed sample sheet rows.
        Added to Sample as class_method: if the idat file is not in the same folder, (check if exists!) looks recursively for that filename and updates the data_dir for that Sample.
        """

        self.__samples = []

        logging.info('Building samples')

        for _index, row in self.__data_frame.iterrows():
            sentrix_id = row['Sentrix_ID'].strip()
            sentrix_position = row['Sentrix_Position'].strip()

            if not (sentrix_id and sentrix_position):
                continue

            sample = Sample(
                data_dir=self.data_dir, # this assumes the .idat files are in the same folder with the samplesheet.
                sentrix_id=sentrix_id,
                sentrix_position=sentrix_position,
                **row,
            )

            self.__samples.append(sample)

    def contains_column(self, column_name):
        """ helper function to determine if sample_sheet contains a specific column, such as GSM_ID.
        SampleSheet must already have __data_frame in it."""
        if column_name in self.__data_frame:
            return True
        return False
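For context, a short usage sketch of the SampleSheet methods shown above (the directory path is hypothetical; get_sample_sheet is the helper imported by pipeline.py below, and the exact call signature is an assumption):

from methylprep.files import get_sample_sheet

# hypothetical folder holding a sample sheet CSV and the matching .idat files
sample_sheet = get_sample_sheet('data/GSE55555/')

samples = sample_sheet.get_samples()          # builds Sample objects on first call
if sample_sheet.contains_column('GSM_ID'):    # e.g. GEO-style sheets
    print('sheet includes GSM_IDs')

sample = sample_sheet.get_sample('Sample_1')  # raises ValueError unless exactly one sample matches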
35 changes: 28 additions & 7 deletions methylprep/processing/pipeline.py
@@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import Counter
# App
from ..files import Manifest, get_sample_sheet, create_sample_sheet
from ..models import Channel
@@ -17,7 +18,6 @@
from .preprocess import preprocess_noob
from .raw_dataset import get_raw_datasets


__all__ = ['SampleDataContainer', 'get_manifest', 'run_pipeline', 'consolidate_values_for_sheet']


@@ -95,7 +95,11 @@ def run_pipeline(data_dir, array_type=None, export=False, manifest_filepath=None
            if True, will return a single data frame of m_factor values instead of a list of SampleDataContainer objects.
            Format is a "wide matrix": columns contain probes and rows contain samples.
        if batch_size is set to more than 200 samples, nothing is returned but, all the files are saved."""
        if batch_size is set to more than 200 samples, nothing is returned but, all the files are saved.
        Processing note:
            The sample_sheet parser will ensure every sample has a unique name and assign one (e.g. Sample_1) if missing, or append a number (e.g. _1) if not unique.
            This may cause sample_sheets and processed data in dataframes to not match up. Will fix in future version."""
    LOGGER.info('Running pipeline in: %s', data_dir)
    if sample_name:
        LOGGER.info('Sample names: {0}'.format(sample_name))
@@ -107,12 +111,23 @@ def run_pipeline(data_dir, array_type=None, export=False, manifest_filepath=None
    samples = sample_sheet.get_samples()
    batches = []
    batch = []
    sample_id_counter = 1
    if batch_size:
        if type(batch_size) != int or batch_size < 1:
            raise ValueError('batch_size must be an integer greater than 0')
        for sample in samples:
            if sample_name and sample.name not in sample_name:
                continue

            # batch uses Sample_Name, so ensure these exist
            if sample.name in (None,''):
                sample.name = f'Sample_{sample_id_counter}'
                sample_id_counter += 1
            # and are unique.
            if Counter((s.name for s in samples)).get(sample.name) > 1:
                sample.name = f'{sample.name}_{sample_id_counter}'
                sample_id_counter += 1

            if len(batch) < batch_size:
                batch.append(sample.name)
            else:
@@ -124,6 +139,16 @@ def run_pipeline(data_dir, array_type=None, export=False, manifest_filepath=None
        for sample in samples:
            if sample_name and sample.name not in sample_name:
                continue

            # batch uses Sample_Name, so ensure these exist
            if sample.name in (None,''):
                sample.name = f'Sample_{sample_id_counter}'
                sample_id_counter += 1
            # and are unique.
            if Counter((s.name for s in samples)).get(sample.name) > 1:
                sample.name = f'{sample.name}_{sample_id_counter}'
                sample_id_counter += 1

            batch.append(sample.name)
        batches.append(batch)

@@ -164,11 +189,7 @@ def run_pipeline(data_dir, array_type=None, export=False, manifest_filepath=None
                pkl_name = f'm_values_{batch_num}.pkl'
            pd.to_pickle(df, pkl_name)
            LOGGER.info(f"saved {pkl_name}")
            m_value_dfs.append(df)
        if export:
            # not using LOGGER because this should appear regardless of verbose flag.
            # print(f"[!] Exported results (csv) to: {export_paths}")
            # requires --verbose too.
            LOGGER.info(f"[!] Exported results (csv) to: {export_paths}")

        # consolidating data_containers this will break with really large sample sets, so skip here.
@@ -178,7 +199,7 @@ def run_pipeline(data_dir, array_type=None, export=False, manifest_filepath=None

    # batch processing done; consolidate and return data. This uses much more memory, but not called if in batch mode.
    if batch_size and batch_size >= 200:
        print("Because the batch size was >100 samples, files are saved but no data objects are returned.")
        print("Because the batch size was >200 samples, files are saved but no data objects are returned.")
        return
    elif betas:
        return consolidate_values_for_sheet(data_containers, postprocess_func_colname='beta_value')
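To make the naming rules added in this commit concrete, here is a small standalone sketch (plain Python mirroring the behavior described in the docstring above; it is not methylprep's own code):

from collections import Counter

def ensure_unique_names(names):
    # Blank names become Sample_<n>; names that appear more than once get a
    # numeric suffix, so every Sample_Name can safely be used as a batch key.
    counts = Counter(names)
    counter = 1
    result = []
    for name in names:
        if name in (None, ''):
            name = f'Sample_{counter}'
            counter += 1
        elif counts[name] > 1:
            name = f'{name}_{counter}'
            counter += 1
        result.append(name)
    return result

print(ensure_unique_names(['', 'A', 'A', 'B']))
# -> ['Sample_1', 'A_2', 'A_3', 'B']

As in the committed code, every occurrence of a duplicated name is renamed, so lookups by Sample_Name no longer collide.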
