Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decouple genotype api #772

Merged
merged 21 commits into from Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions Pipfile
Expand Up @@ -14,6 +14,7 @@ docutils = "*"
# black
black = "*"
pre-commit = "*"
Cython = "*"

[packages]
alchy = "*"
Expand All @@ -36,14 +37,11 @@ cyvcf2 = "<0.10.0"
pymongo = "<3.7"
colorclass = "*"
cairocffi = "*"
pysam = "*"
genologics = "*"
trailblazer = "*"
housekeeper = "*"
genotype = "*"
scout-browser = "*"
cgstats = "*"
Cython = "*"
SQLAlchemy = "*"
Flask = "*"
Flask-Admin = "*"
Expand Down
229 changes: 131 additions & 98 deletions Pipfile.lock

Large diffs are not rendered by default.

97 changes: 49 additions & 48 deletions cg/apps/gt.py
@@ -1,18 +1,14 @@
import logging
"""Interactions with the genotype tool"""

from subprocess import CalledProcessError
import subprocess
import logging

from alchy import Manager
from genotype.store import api, models
from genotype.load.vcf import load_vcf
from cg.exc import CaseNotFoundError
from cg.utils.commands import Process

LOG = logging.getLogger(__name__)


class GenotypeAPI(Manager):

class GenotypeAPI:
"""Interface with Genotype app.

The config should contain a 'genotype' key:
Expand All @@ -21,60 +17,65 @@ class GenotypeAPI(Manager):
"""

def __init__(self, config: dict):
alchy_config = dict(SQLALCHEMY_DATABASE_URI=config["genotype"]["database"])
super(GenotypeAPI, self).__init__(config=alchy_config, Model=models.Model)
self.process = Process(
binary=config["genotype"]["binary_path"], config=config["genotype"]["config_path"]
)
self.dry_run = False

self.genotype_config = config["genotype"]["config_path"]
self.genotype_binary = config["genotype"]["binary_path"]
self.base_call = [self.genotype_binary, "--config", self.genotype_config]
def set_dry_run(self, dry_run: bool) -> None:
"""Set the dry run state"""
self.dry_run = dry_run

def upload(self, bcf_path: str, samples_sex: dict, force: bool = False):
def upload(self, bcf_path: str, samples_sex: dict, force: bool = False) -> None:
"""Upload genotypes for a family of samples."""
snps = api.snps()
analyses = load_vcf(bcf_path, snps)
for analysis_obj in analyses:
LOG.debug("loading VCF genotypes for sample: %s", analysis_obj.sample_id)
is_saved = api.add_analysis(self, analysis_obj, replace=force)
if is_saved:
LOG.info("loaded VCF genotypes for sample: %s", analysis_obj.sample_id)
else:
LOG.warning("skipped, found previous analysis: %s", analysis_obj.sample_id)

if is_saved or force:
analysis_obj.sex = samples_sex[analysis_obj.sample_id]["analysis"]
analysis_obj.sample.sex = samples_sex[analysis_obj.sample_id]["pedigree"]
self.commit()
upload_parameters = ["load", str(bcf_path)]
if force:
upload_parameters.append("--force")

LOG.debug("loading VCF genotypes for sample(s): %s", ", ".join(samples_sex.keys()))
self.process.run_command(parameters=upload_parameters, dry_run=self.dry_run)

for sample_id in samples_sex:
# This is the sample sex specified by the customer
sample_sex = samples_sex[sample_id]["pedigree"]
self.update_sample_sex(sample_id, sample_sex)
# This is the predicted sex based on variant calls from the pipeline
analysis_predicted_sex = samples_sex[sample_id]["analysis"]
self.update_analysis_sex(sample_id, sex=analysis_predicted_sex)

def update_sample_sex(self, sample_id: str, sex: str) -> None:
"""Update the sex for a sample in the genotype tool"""
sample_sex_parameters = ["add_sex", sample_id, "-s", sex]
LOG.debug("Set sex for sample %s to %s", sample_id, sex)
self.process.run_command(parameters=sample_sex_parameters, dry_run=self.dry_run)

def update_analysis_sex(self, sample_id: str, sex: str) -> None:
"""Update the predicted sex for a sample based on genotype analysis in the genotype tool"""
analysis_sex_parameters = ["add_sex", sample_id, "-a", "sequence", sex]
LOG.debug("Set predicted sex for sample %s to %s for the sequence analysis", sample_id, sex)
self.process.run_command(parameters=analysis_sex_parameters, dry_run=self.dry_run)

def export_sample(self, days: int = 0) -> str:
"""Export sample info."""
trending_call = self.base_call[:]
trending_call.extend(["export-sample", "-d", str(days)])
try:
LOG.info("Running Genotype API to get data.")
LOG.debug(trending_call)
output = subprocess.check_output(trending_call)
except CalledProcessError as error:
LOG.critical("Could not run command: %s", " ".join(trending_call))
raise error
output = output.decode("utf-8")
export_sample_parameters = ["export-sample", "-d", str(days)]

self.process.run_command(parameters=export_sample_parameters, dry_run=self.dry_run)
output = self.process.stdout
# If sample not in genotype db, stdout of genotype command will be empty.
if not output:
raise CaseNotFoundError("samples not found in genotype db")
return output

def export_sample_analysis(self, days: int = 0) -> str:
"""Export analysis."""
trending_call = self.base_call[:]
trending_call.extend(["export-sample-analysis", "-d", str(days)])
try:
LOG.info("Running Genotype API to get data.")
LOG.debug(trending_call)
output = subprocess.check_output(trending_call)
except CalledProcessError as error:
LOG.critical("Could not run command: %s", " ".join(trending_call))
raise error
output = output.decode("utf-8")
export_sample_analysis_parameters = ["export-sample-analysis", "-d", str(days)]

self.process.run_command(parameters=export_sample_analysis_parameters, dry_run=self.dry_run)
output = self.process.stdout
# If sample not in genotype db, stdout of genotype command will be empty.
if not output:
raise CaseNotFoundError("samples not found in genotype db")
return output

def __str__(self):
return f"GenotypeAPI(dry_run: {self.dry_run})"
3 changes: 1 addition & 2 deletions cg/cli/upload/genotype.py
Expand Up @@ -19,13 +19,12 @@ def genotypes(context, re_upload, family_id):
_suggest_cases_to_upload(context)
context.abort()

tb_api = context.obj["tb_api"]
gt_api = context.obj["genotype_api"]
hk_api = context.obj["housekeeper_api"]
status_api = context.obj["status"]
family_obj = status_api.family(family_id)

api = UploadGenotypesAPI(status_api, hk_api, tb_api, gt_api)
api = UploadGenotypesAPI(hk_api=hk_api, gt_api=gt_api)
results = api.data(family_obj.analyses[0])
if results:
api.upload(results, replace=re_upload)
79 changes: 61 additions & 18 deletions cg/meta/upload/gt.py
Expand Up @@ -3,53 +3,96 @@

import ruamel.yaml

from cg.apps import hk, gt, tb
from cg.store import models, Store
from cg.apps import hk, gt
from cg.store import models
from cg.apps.tb.api import TrailblazerAPI

LOG = logging.getLogger(__name__)


class UploadGenotypesAPI(object):
def __init__(
self,
status_api: Store,
hk_api: hk.HousekeeperAPI,
tb_api: tb.TrailblazerAPI,
gt_api: gt.GenotypeAPI,
):
self.status = status_api
self.hk = hk_api
self.tb = tb_api
self.gt = gt_api

def data(self, analysis_obj: models.Analysis) -> dict:
"""Fetch data about an analysis to load genotypes."""
"""Fetch data about an analysis to load genotypes.

Returns: dict on form

{
"bcf": path_to_bcf,
"samples_sex": [
"sample_id: {
"pedigree": "male",
"analysis": "male"
}
]
}

"""
analysis_date = analysis_obj.started_at or analysis_obj.completed_at
hk_version = self.hk.version(analysis_obj.family.internal_id, analysis_date)
hk_bcf = self.hk.files(version=hk_version.id, tags=["snv-gbcf"]).first()
hk_bcf = self.get_bcf_file(hk_version)
if hk_bcf is None:
LOG.warning("unable to find GBCF for genotype upload")
return None
data = {"bcf": hk_bcf.full_path, "samples_sex": {}}
analysis_sexes = self._analysis_sex(hk_version)
qc_metrics_file = self.get_qcmetrics_file(hk_version)
analysis_sexes = self.analysis_sex(qc_metrics_file)
for link_obj in analysis_obj.family.links:
data["samples_sex"][link_obj.sample.internal_id] = {
sample_id = link_obj.sample.internal_id
data["samples_sex"][sample_id] = {
"pedigree": link_obj.sample.sex,
"analysis": analysis_sexes[link_obj.sample.internal_id],
"analysis": analysis_sexes[sample_id],
}
return data

def _analysis_sex(self, hk_version: hk.models.Version) -> dict:
def analysis_sex(self, qc_metrics_file: Path) -> dict:
"""Fetch analysis sex for each sample of an analysis."""
hk_qcmetrics = self.hk.files(version=hk_version.id, tags=["qcmetrics"]).first()
with Path(hk_qcmetrics.full_path).open() as in_stream:
qcmetrics_raw = ruamel.yaml.safe_load(in_stream)
qcmetrics_data = self.tb.parse_qcmetrics(qcmetrics_raw)
qcmetrics_data = self.get_parsed_qc_metrics_data(qc_metrics_file)
samples = [sample["id"] for sample in qcmetrics_data["samples"]]
data = {}
for sample_data in qcmetrics_data["samples"]:
data[sample_data["id"]] = sample_data["predicted_sex"]
for sample_id in samples:
data[sample_id] = self.get_sample_predicted_sex(
sample_id=sample_id, parsed_qcmetrics_data=qcmetrics_data
)
return data

def get_bcf_file(self, hk_version_obj: hk.models.Version) -> hk.models.File:
"""Fetch a bcf file and return the file object"""

bcf_file = self.hk.files(version=hk_version_obj.id, tags=["snv-gbcf"]).first()
LOG.debug("Found bcf file %s", bcf_file.full_path)
return bcf_file

def get_qcmetrics_file(self, hk_version_obj: hk.models.Version) -> Path:
"""Fetch a qc_metrics file and return the path"""
hk_qcmetrics = self.hk.files(version=hk_version_obj.id, tags=["qcmetrics"]).first()
LOG.debug("Found qc metrics file %s", hk_qcmetrics.full_path)
return Path(hk_qcmetrics.full_path)

@staticmethod
def get_parsed_qc_metrics_data(qc_metrics: Path) -> dict:
"""Parse the information from a qc metrics file"""
with qc_metrics.open() as in_stream:
qcmetrics_raw = ruamel.yaml.safe_load(in_stream)
return TrailblazerAPI.parse_qcmetrics(qcmetrics_raw)

@staticmethod
def get_sample_predicted_sex(sample_id: str, parsed_qcmetrics_data: dict) -> str:
"""Get the predicted sex for a sample"""
predicted_sex = "unknown"
for sample_info in parsed_qcmetrics_data["samples"]:
if sample_info["id"] == sample_id:
return sample_info["predicted_sex"]

return predicted_sex

def upload(self, data: dict, replace: bool = False):
"""Upload data about genotypes for a family of samples."""
self.gt.upload(str(data["bcf"]), data["samples_sex"], force=replace)
1 change: 1 addition & 0 deletions cg/utils/commands.py
Expand Up @@ -34,6 +34,7 @@ def __init__(
"""
super(Process, self).__init__()
self.binary = binary
self.config = config
self.environment = environment
LOG.debug("Initialising Process with binary: %s", self.binary)
self.base_call = [self.binary]
Expand Down
2 changes: 0 additions & 2 deletions requirements.txt
Expand Up @@ -39,12 +39,10 @@ pymongo<3.7
colorclass
setuptools>=39.2.0 # due to WeasyPrint 45, tinycss2 1.0.1 and cairocffi file-.cairocffi-VERSION
cairocffi==0.9.0 # due to strange version number in package
pysam==0.15.2 # due to inability to rebuild newer pysam version

# apps
genologics
trailblazer>=6.7.0
housekeeper>=2.0.0b2
genotype>=2.2.0
scout-browser>=4.4.1
cgstats>=1.1.2
76 changes: 52 additions & 24 deletions tests/apps/gt/conftest.py
Expand Up @@ -3,33 +3,61 @@
"""

import pytest
from cg.apps.gt import GenotypeAPI
import json

CONFIG = {
"genotype": {
"database": "database",
"config_path": "config/path",
"binary_path": "gtdb",
}
}


@pytest.fixture(scope="function")
def genotype_config():
"""
genotype config fixture
"""

_config = CONFIG

return _config


@pytest.fixture(scope="function")
def genotypeapi():
@pytest.fixture(name="genotype_export_sample_output")
def fixture_genotype_export_sample_output(genotype_config: dict) -> str:
"""
genotype API fixture
"""
data = {
"ACC6987A15": {
"status": None,
"sample_created_in_genotype_db": "2020-07-15",
"sex": "female",
"comment": None,
},
"ACC6987A16": {
"status": None,
"sample_created_in_genotype_db": "2020-07-15",
"sex": "female",
"comment": None,
},
}

_genotype_api = GenotypeAPI(CONFIG)
return _genotype_api
return json.dumps(data)


@pytest.fixture(name="genotype_export_sample_analysis_output")
def fixture_genotype_export_sample_analysis_output() -> str:
"""Return some output from a sample analysis export"""
data = {
"ACC6987A15": {
"snps": {
"sequence": {
"rs10144418": ["T", "T"],
"rs1037256": ["G", "A"],
"rs1044973": ["C", "T"],
"rs1065772": ["C", "T"],
"rs11010": ["T", "C"],
"rs11789987": ["C", "T"],
"rs11797": ["C", "C"],
}
}
},
"ACC6987A16": {
"snps": {
"sequence": {
"rs10144418": ["T", "C"],
"rs1037256": ["G", "A"],
"rs1044973": ["C", "T"],
"rs1065772": ["C", "C"],
"rs11010": ["T", "T"],
"rs11789987": ["C", "T"],
"rs11797": ["T", "T"],
}
}
},
}
return json.dumps(data)