New routine: snappy pull-processed-data (#115)
* Implemented new routine: `snappy pull-processed-data`.

* Pinned setuptools version to v58; later versions break support for `use_2to3` (see the short illustration after the change summary below). Affects the test requirements only.
eudesbarbosa committed Aug 18, 2022
1 parent 467ab02 commit e2d9c14
Showing 12 changed files with 1,446 additions and 374 deletions.
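The setuptools pin exists because newer setuptools releases dropped the `use_2to3` option, so installing a test dependency whose `setup.py` still passes it fails at build time. A minimal sketch of the failing pattern, with a hypothetical package name (this is not one of the project's actual dependencies):

```python
# Illustration only: the kind of legacy setup.py that motivates the pin.
# Newer setuptools releases reject the ``use_2to3`` keyword, so any (test)
# dependency that still passes it cannot be installed unless setuptools is
# pinned to a version that accepts it.  The package name is hypothetical.
from setuptools import setup

setup(
    name="legacy-test-dependency",
    version="0.1.0",
    py_modules=["legacy_module"],
    use_2to3=True,  # rejected by setuptools versions without 2to3 support
)
```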
13 changes: 13 additions & 0 deletions cubi_tk/snappy/__init__.py
@@ -39,6 +39,7 @@
from .itransfer_variant_calling import setup_argparse as setup_argparse_itransfer_variant_calling
from .itransfer_step import setup_argparse as setup_argparse_itransfer_step
from .pull_sheets import setup_argparse as setup_argparse_pull_sheets
from .pull_processed_data import setup_argparse as setup_argparse_pull_processed_data
from .pull_raw_data import setup_argparse as setup_argparse_pull_raw_data
from .kickoff import setup_argparse as setup_argparse_kickoff
from .varfish_upload import setup_argparse as setup_argparse_varfish_upload
@@ -54,6 +55,7 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
help="Check consistency within local sample sheet and between local sheets and files",
)
)

setup_argparse_check_remote(
subparsers.add_parser(
"check-remote", help="Check consistency within remote sample sheet and files"
@@ -63,17 +65,20 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
setup_argparse_itransfer_raw_data(
subparsers.add_parser("itransfer-raw-data", help="Transfer FASTQs into iRODS landing zone")
)

setup_argparse_itransfer_ngs_mapping(
subparsers.add_parser(
"itransfer-ngs-mapping", help="Transfer ngs_mapping results into iRODS landing zone"
)
)

setup_argparse_itransfer_variant_calling(
subparsers.add_parser(
"itransfer-variant-calling",
help="Transfer variant_calling results into iRODS landing zone",
)
)

setup_argparse_itransfer_step(
subparsers.add_parser(
"itransfer-step", help="Transfer snappy step results into iRODS landing zone"
@@ -83,6 +88,14 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
setup_argparse_pull_sheets(
subparsers.add_parser("pull-sheets", help="Pull SODAR sample sheets into biomedsheet")
)

setup_argparse_pull_processed_data(
subparsers.add_parser(
"pull-processed-data",
help="Pull processed data from SODAR to specified output directory",
)
)

setup_argparse_pull_raw_data(
subparsers.add_parser(
"pull-raw-data", help="Pull raw data from SODAR to SNAPPY dataset raw data directory"
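The hunk above only registers the new sub-command; its argument definitions live in the new `pull_processed_data` module, which is not part of this excerpt. A minimal sketch of the registration pattern being extended, with hypothetical arguments (the real module's options and dispatch signature may differ):

```python
# Sketch of the setup_argparse pattern used for cubi-tk snappy sub-commands.
# The argument names (--sodar-url, --sodar-api-token, --output-dir,
# project_uuid) are illustrative assumptions, not the module's actual API.
import argparse
import typing


def run(args) -> typing.Optional[int]:
    # The real command queries SODAR/iRODS and downloads files; this stub
    # only echoes what it would do.
    print(f"Would pull processed data for {args.project_uuid} into {args.output_dir}")
    return 0


def setup_argparse(parser: argparse.ArgumentParser) -> None:
    parser.add_argument("--sodar-url", default="https://sodar.example.org", help="SODAR server URL")
    parser.add_argument("--sodar-api-token", help="SODAR API token")
    parser.add_argument("--output-dir", required=True, help="Directory to download the files into")
    parser.add_argument("project_uuid", help="UUID of the SODAR project")
    parser.set_defaults(cmd=run)
```

Under these assumptions, an invocation would look roughly like `cubi-tk snappy pull-processed-data --output-dir OUT PROJECT_UUID`.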
176 changes: 7 additions & 169 deletions cubi_tk/snappy/check_remote.py
@@ -8,32 +8,15 @@
from collections import defaultdict
import os
from pathlib import Path
import re
from types import SimpleNamespace
import typing

import attr
from biomedsheets import shortcuts
from logzero import logger
from sodar_cli import api

from .common import get_biomedsheet_path, load_sheet_tsv
from ..common import load_toml_config
from ..irods.check import IrodsCheckCommand, HASH_SCHEMES


#: Default hash scheme. Although iRODS provides alternatives, the whole `snappy` pipeline uses MD5.
DEFAULT_HASH_SCHEME = "MD5"


@attr.s(frozen=True, auto_attribs=True)
class IrodsDataObject:
"""iRODS data object - simplified version of data provided in iRODS Collections."""

file_name: str
irods_path: str
file_md5sum: str
replicas_md5sum: list
from .retrieve_irods_collection import RetrieveIrodsCollection, DEFAULT_HASH_SCHEME


class FindFilesCommon:
@@ -236,150 +219,6 @@ def run(self):
return step_to_file_structure_dict


class FindRemoteFiles(IrodsCheckCommand, FindFilesCommon):
"""Class finds and lists remote files associated with samples."""

def __init__(self, args, sheet, sodar_url, sodar_api_token, assay_uuid, project_uuid):
"""Constructor.
:param sodar_url: SODAR url.
:type sodar_url: str
:param sodar_api_token: SODAR API token.
:type sodar_api_token: str
:param assay_uuid: Assay UUID.
:type assay_uuid: str
:param project_uuid: Project UUID.
:type project_uuid: str
"""
IrodsCheckCommand.__init__(self, args=args)
FindFilesCommon.__init__(self, sheet=sheet)
self.sodar_url = sodar_url
self.sodar_api_token = sodar_api_token
self.assay_uuid = assay_uuid
self.project_uuid = project_uuid

def perform(self):
"""Perform class routines.
:return: Returns dictionary of dictionaries: key: library name (e.g., 'P001-N1-DNA1-WES1'); value: dictionary
with list of files (values) per remote directory (key).
"""
logger.info("Starting remote files search ...")

# Get assay iRODS path
assay_path = self.get_assay_irods_path(assay_uuid=self.assay_uuid)

# Get iRODS collection
irods_collection_dict = self.retrieve_irods_data_objects(irods_path=assay_path)

logger.info("... done with remote files search.")
return irods_collection_dict

def get_assay_irods_path(self, assay_uuid=None):
"""Get Assay iRODS path.
:param assay_uuid: Assay UUID.
:type assay_uuid: str [optional]
:return: Returns Assay iRODS path - extracted via SODAR API.
"""
investigation = api.samplesheet.retrieve(
sodar_url=self.sodar_url,
sodar_api_token=self.sodar_api_token,
project_uuid=self.project_uuid,
)
for study in investigation.studies.values():
if assay_uuid:
logger.info(f"Using provided Assay UUID: {assay_uuid}")
try:
assay = study.assays[assay_uuid]
return assay.irods_path
except KeyError:
logger.error("Provided Assay UUID is not present in the Study.")
raise
else:
# Assumption: there is only one assay per study for `snappy` projects.
# If multi-assay project it will only consider the first one and throw a warning.
assays_ = list(study.assays.keys())
if len(assays_) > 1:
self.multi_assay_warning(assays=assays_)
for _assay_uuid in assays_:
assay = study.assays[_assay_uuid]
return assay.irods_path
return None

@staticmethod
def multi_assay_warning(assays):
"""Display warning for multi-assay study.
:param assays: Assays UUIDs as found in Studies.
:type assays: list
"""
multi_assay_str = "\n".join(assays)
logger.warn(
f"Project contains multiple Assays, will only consider UUID '{assays[0]}'.\n"
f"All available UUIDs:\n{multi_assay_str}"
)

def retrieve_irods_data_objects(self, irods_path):
"""Retrieve data objects from iRODS.
:param irods_path:
:return:
"""
# Connect to iRODS
with self._get_irods_sessions() as irods_sessions:
try:
root_coll = irods_sessions[0].collections.get(irods_path)
s_char = "s" if len(irods_sessions) != 1 else ""
logger.info(f"{len(irods_sessions)} iRODS connection{s_char} initialized")
except Exception as e:
logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
raise

# Get files and run checks
logger.info("Querying for data objects")
irods_collection = self.get_data_objs(root_coll)
return self.parse_irods_collection(irods_collection=irods_collection)

@staticmethod
def parse_irods_collection(irods_collection):
"""
:param irods_collection: iRODS collection.
:type irods_collection: dict
:return: Returns dictionary version of iRODS collection information. Key: File name in iRODS (str);
Value: list of IrodsDataObject (attributes: 'file_name', 'irods_path', 'file_md5sum', 'replicas_md5sum').
"""
# Initialise variables
output_dict = defaultdict(list)
checksums = irods_collection["checksums"]

# Extract relevant info from iRODS collection: file and replicates MD5SUM
for data_obj in irods_collection["files"]:
chk_obj = checksums.get(data_obj.path + "." + DEFAULT_HASH_SCHEME.lower())
if not chk_obj:
logger.error(f"No checksum file for: {data_obj.path}")
continue
with chk_obj.open("r") as f:
file_sum = re.search(
HASH_SCHEMES[DEFAULT_HASH_SCHEME]["regex"], f.read().decode("utf-8")
).group(0)
output_dict[data_obj.name].append(
IrodsDataObject(
file_name=data_obj.name,
irods_path=data_obj.path,
file_md5sum=file_sum,
replicas_md5sum=[replica.checksum for replica in data_obj.replicas],
)
)
return output_dict


class Checker:
"""Class with common checker methods."""

@@ -493,22 +332,22 @@ def compare_md5_files(remote_dict, in_both_set):
local_md5 = local_md5.split(" ")[0]
# Compare to remote MD5
for irods_dat in remote_dict.get(original_file_name):
if local_md5 != irods_dat.file_md5sum:
if local_md5 != irods_dat.FILE_MD5SUM:
different_md5_list.append((md5_file.replace(".md5", ""), irods_dat.irods_path))
else:
same_md5_list.append(md5_file.replace(".md5", ""))
# BONUS - check remote replicas
if not all(
(
replica_md5 == irods_dat.file_md5sum
for replica_md5 in irods_dat.replicas_md5sum
replica_md5 == irods_dat.FILE_MD5SUM
for replica_md5 in irods_dat.REPLICAS_MD5SUM
)
):
logger.error(
f"iRODS metadata checksum not consistent with checksum file...\n"
f"File: {irods_dat.irods_path}\n"
f"File checksum: {irods_dat.file_md5sum}\n"
f"Metadata checksum: {', '.join(irods_dat.replicas_md5sum)}\n"
f"File checksum: {irods_dat.FILE_MD5SUM}\n"
f"Metadata checksum: {', '.join(irods_dat.REPLICAS_MD5SUM)}\n"
)

return same_md5_list, different_md5_list
@@ -846,9 +685,8 @@ def execute(self) -> typing.Optional[int]:

# Find all remote files (iRODS)
pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
library_remote_files_dict = FindRemoteFiles(
library_remote_files_dict = RetrieveIrodsCollection(
pseudo_args,
self.shortcut_sheet,
self.args.sodar_url,
self.args.sodar_api_token,
self.args.assay_uuid,
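For orientation, the lookup previously performed by the removed `FindRemoteFiles` class is now provided by `RetrieveIrodsCollection` from the new `retrieve_irods_collection` module (only its import and call site appear in this excerpt). Below is a rough, self-contained illustration of the data structure the removed docstrings describe; the field names follow the attrs class deleted above, and all values are made up:

```python
# Stand-in for the attrs class that was removed from check_remote.py; per the
# new import it now lives in retrieve_irods_collection.  Values below are
# invented for illustration only.
import attr


@attr.s(frozen=True, auto_attribs=True)
class IrodsDataObject:
    file_name: str
    irods_path: str
    file_md5sum: str
    replicas_md5sum: list


# Mapping as described by the parse_irods_collection() docstring:
# file name -> list of IrodsDataObject entries.
example_collection = {
    "P001-N1-DNA1-WES1.bam": [
        IrodsDataObject(
            file_name="P001-N1-DNA1-WES1.bam",
            irods_path="/exampleZone/example-project/ngs_mapping/P001-N1-DNA1-WES1.bam",
            file_md5sum="d41d8cd98f00b204e9800998ecf8427e",
            replicas_md5sum=["d41d8cd98f00b204e9800998ecf8427e"] * 2,
        )
    ]
}

# A file is consistent when every replica checksum matches the checksum-file
# MD5 (cf. compare_md5_files() above).
for objs in example_collection.values():
    for obj in objs:
        assert all(replica == obj.file_md5sum for replica in obj.replicas_md5sum)
```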
85 changes: 2 additions & 83 deletions cubi_tk/snappy/itransfer_common.py
@@ -21,6 +21,7 @@
from ..exceptions import MissingFileException, ParameterException, UserCanceledException
from ..common import check_irods_icommands, is_uuid, load_toml_config, sizeof_fmt
from .common import get_biomedsheet_path, load_sheet_tsv
from .parse_sample_sheet import ParseSampleSheet

#: Default number of parallel transfers.
DEFAULT_NUM_TRANSFERS = 8
@@ -84,7 +85,7 @@ def check_args(args):
_ = args


class SnappyItransferCommandBase:
class SnappyItransferCommandBase(ParseSampleSheet):
"""Base class for itransfer commands."""

#: The command name.
@@ -209,88 +210,6 @@ def check_args(self, args):

return res

@staticmethod
def _build_family_max_batch(sheet, batch_key, family_key):
family_max_batch = {}
for donor in sheet.bio_entities.values():
if batch_key in donor.extra_infos and family_key in donor.extra_infos:
family_id = donor.extra_infos[family_key]
batch_no = donor.extra_infos[batch_key]
family_max_batch[family_id] = max(family_max_batch.get(family_id, 0), batch_no)
return family_max_batch

def _batch_of(self, donor, family_max_batch, batch_key, family_key):
if batch_key in donor.extra_infos:
batch = donor.extra_infos[batch_key]
else:
batch = 0
if self.start_batch_in_family and family_key in donor.extra_infos:
family_id = donor.extra_infos[family_key]
batch = max(batch, family_max_batch.get(family_id, 0))
return batch

def yield_ngs_library_names(
self, sheet, min_batch=None, max_batch=None, batch_key="batchNo", family_key="familyId"
):
"""Yield all NGS library names from sheet.
When ``min_batch`` is given, only the donors for which ``extra_infos[batch_key]`` is greater than or equal to
``min_batch`` will be used.
This function can be overloaded, for example to only consider the indexes.
:param sheet: Sample sheet.
:type sheet: biomedsheets.models.Sheet
:param min_batch: Minimum batch number to be extracted from the sheet. All samples in batches below this value
will be skipped.
:type min_batch: int
:param max_batch: Maximum batch number to be extracted from the sheet. All samples in batches above this value
will be skipped.
:type max_batch: int
:param batch_key: Batch number key in sheet. Default: 'batchNo'.
:type batch_key: str
:param family_key: Family identifier key. Default: 'familyId'.
:type family_key: str
"""
family_max_batch = self._build_family_max_batch(sheet, batch_key, family_key)

# Process all libraries and filter by family batch ID.
for donor in sheet.bio_entities.values():
# Ignore below min batch number if applicable
if min_batch is not None:
batch = self._batch_of(donor, family_max_batch, batch_key, family_key)
if batch < min_batch:
logger.debug(
"Skipping donor %s because %s = %d < min_batch = %d",
donor.name,
batch_key,
batch,
min_batch,
)
continue
# Ignore above max batch number if applicable
if max_batch is not None:
batch = self._batch_of(donor, family_max_batch, batch_key, family_key)
if batch > max_batch:
logger.debug(
"Skipping donor %s because %s = %d > max_batch = %d",
donor.name,
batch_key,
batch,
max_batch,
)
# It would be tempting to add a `break`, but there is no guarantee that
# the sample sheet is sorted.
continue
for bio_sample in donor.bio_samples.values():
for test_sample in bio_sample.test_samples.values():
for library in test_sample.ngs_libraries.values():
yield library.name

def build_base_dir_glob_pattern(
self, library_name: str
) -> typing.Tuple[str, str]: # pragma: nocover
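The helpers removed above (`_build_family_max_batch`, `_batch_of`, `yield_ngs_library_names`) are not gone: `SnappyItransferCommandBase` now derives from the new `ParseSampleSheet` class, presumably so that other commands such as `pull-processed-data` can reuse the same sample-sheet filtering (the new module itself is not shown in this excerpt). A toy sketch of the min/max batch filter implemented by the removed code; the `Donor` stand-in and the omission of family-based batch handling are simplifications, not the real biomedsheets model:

```python
# Toy illustration of the batch filtering from yield_ngs_library_names():
# donors whose batch number falls outside [min_batch, max_batch] are skipped.
# Donor is a stand-in for the biomedsheets model; family-based batch promotion
# (_build_family_max_batch/_batch_of) is left out for brevity.
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional


@dataclass
class Donor:
    name: str
    extra_infos: Dict[str, int] = field(default_factory=dict)


def filter_by_batch(
    donors: List[Donor],
    min_batch: Optional[int] = None,
    max_batch: Optional[int] = None,
    batch_key: str = "batchNo",
) -> Iterator[str]:
    """Yield names of donors whose batch number passes the min/max filter."""
    for donor in donors:
        batch = donor.extra_infos.get(batch_key, 0)
        if min_batch is not None and batch < min_batch:
            continue
        if max_batch is not None and batch > max_batch:
            # No early break: the sheet is not guaranteed to be sorted by batch.
            continue
        yield donor.name


donors = [Donor("P001", {"batchNo": 1}), Donor("P002", {"batchNo": 2}), Donor("P003", {"batchNo": 3})]
print(list(filter_by_batch(donors, min_batch=2)))               # ['P002', 'P003']
print(list(filter_by_batch(donors, min_batch=2, max_batch=2)))  # ['P002']
```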
