Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions ingest/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ def validate_arguments(parsed_args):
raise ValueError(
f" Invalid argument: unable to connect to a BigQuery table called {parsed_args.bq_table}."
)
if (
"differential_expression" in parsed_args
and parsed_args.annotation_type != "group"
):
raise ValueError(
"Differential expression analysis restricted to group-type annotations,"
f" cannot run on data of type \"{parsed_args.annotation_type}\"."
)


def create_parser():
Expand Down Expand Up @@ -268,15 +276,23 @@ def create_parser():
)

parser_differential_expression.add_argument(
"--annotation", required=True, help="Name of annotation for DE analysis"
"--annotation-name", required=True, help="Name of annotation for DE analysis"
)

parser_differential_expression.add_argument(
"--annotation-type", required=True, help="Type of annotation for DE analysis"
)

parser_differential_expression.add_argument(
"--annotation-scope", required=True, help="Scope of annotation for DE analysis"
)

parser_differential_expression.add_argument(
"--method", default="wilcoxon", help="method for DE"
)

parser_differential_expression.add_argument(
"--name", required=True, help="study owner-specified cluster anem"
"--cluster-name", required=True, help="study owner-specified cluster name"
)

parser_differential_expression.add_argument(
Expand All @@ -286,9 +302,9 @@ def create_parser():
)

parser_differential_expression.add_argument(
"--cell-metadata-file",
"--annotation-file",
required=True,
help="Absolute or relative path to cell metadata file.",
help="Absolute or relative path to cell metadata or cluster file of annotations.",
)

parser_differential_expression.add_argument(
Expand Down
41 changes: 27 additions & 14 deletions ingest/de.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from email.headerregistry import Group
import logging
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -36,19 +37,20 @@ def __init__(
cell_metadata,
matrix_file_path,
matrix_file_type,
annotation,
annotation_name,
**kwargs,
):
DifferentialExpression.de_logger.info(
"Initializing DifferentialExpression instance"
)
self.cluster = cluster
self.metadata = cell_metadata
self.annotation = annotation
self.annotation = annotation_name
self.matrix_file_path = matrix_file_path
self.matrix_file_type = matrix_file_type
self.kwargs = kwargs
self.accession = self.kwargs["study_accession"]
self.annot_scope = self.kwargs["annotation_scope"]
# only used in output filename, replacing non-alphanumeric with underscores
self.cluster_name = re.sub(r'\W+', '_', self.kwargs["name"])
self.method = self.kwargs["method"]
Expand Down Expand Up @@ -175,6 +177,7 @@ def subset_adata(adata, de_cells):
return adata

def execute_de(self):
print(f'dev_info: Starting DE for {self.accession}')
try:
if self.matrix_file_type == "mtx":
DifferentialExpression.de_logger.info("preparing DE on sparse matrix")
Expand All @@ -184,6 +187,7 @@ def execute_de(self):
self.matrix_file_path,
self.matrix_file_type,
self.annotation,
self.annot_scope,
self.accession,
self.cluster_name,
self.method,
Expand All @@ -198,6 +202,7 @@ def execute_de(self):
self.matrix_file_path,
self.matrix_file_type,
self.annotation,
self.annot_scope,
self.accession,
self.cluster_name,
self.method,
Expand All @@ -222,18 +227,17 @@ def get_genes(genes_path):
"""
genes_df = pd.read_csv(genes_path, sep="\t", header=None)
if len(genes_df.columns) > 1:
# unclear if falling back to gene_id is useful (SCP-4283)
# print so we're aware of dups during dev testing
if genes_df[1].count() == genes_df[1].nunique():
return genes_df[1].tolist()
elif genes_df[0].count() == genes_df[0].nunique():
return genes_df[0].tolist()
msg = "dev_info: Features file contains duplicate identifiers (col 2)"
print(msg)
return genes_df[1].tolist()
else:
msg = "Features file contains duplicate identifiers"
print(msg)
log_exception(
DifferentialExpression.dev_logger, DifferentialExpression.de_logger, msg
)
raise ValueError(msg)
return genes
if genes_df[0].count() == genes_df[0].nunique():
msg = "dev_info: Features file contains duplicate identifiers (col 1)"
print(msg)
return genes_df[0].tolist()

@staticmethod
def get_barcodes(barcodes_path):
Expand Down Expand Up @@ -264,6 +268,7 @@ def run_scanpy_de(
matrix_file_path,
matrix_file_type,
annotation,
annot_scope,
study_accession,
cluster_name,
method,
Expand Down Expand Up @@ -315,15 +320,23 @@ def run_scanpy_de(
DifferentialExpression.dev_logger, DifferentialExpression.de_logger, msg
)
raise KeyError(msg)
# ToDo - detection and handling of annotations with only one sample (SCP-4282)
except ValueError as e:
print(e)
log_exception(
DifferentialExpression.dev_logger, DifferentialExpression.de_logger, e
)
raise KeyError(e)

DifferentialExpression.de_logger.info("Gathering DE annotation labels")
groups = np.unique(adata.obs[annotation]).tolist()
for group in groups:
group_filename = re.sub(r'\W+', '_', group)
clean_group = re.sub(r'\W+', '_', group)
clean_annotation = re.sub(r'\W+', '_', annotation)
DifferentialExpression.de_logger.info(f"Writing DE output for {group}")
rank = sc.get.rank_genes_groups_df(adata, key=rank_key, group=group)

out_file = f'{cluster_name}--{annotation}--{group_filename}--{method}.tsv'
out_file = f'{cluster_name}--{clean_annotation}--{clean_group}--{annot_scope}--{method}.tsv'
# Round numbers to 4 significant digits while respecting fixed point
# and scientific notation (note: trailing zeros are removed)
rank.to_csv(out_file, sep='\t', float_format='%.4g')
Expand Down
30 changes: 12 additions & 18 deletions ingest/ingest_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,30 +579,24 @@ def main() -> None:
parsed_args = create_parser().parse_args()
validate_arguments(parsed_args)
arguments = vars(parsed_args)
status = 0
status_cell_metadata = 0
ingest = None

if "differential_expression" in arguments:
# DE may use metadata or cluster file for annots BUT
# IngestPipeline initialization assumes a "cell_metadata_file"
arguments["cell_metadata_file"] = arguments["annotation_file"]
# IngestPipeline initialization expects "name" and not "cluster_name"
arguments["name"] = arguments["cluster_name"]
# Initialize global variables for current ingest job
config.init(
arguments["study_id"],
arguments["study_file_id"],
arguments["user_metrics_uuid"],
)

try:

ingest = IngestPipeline(**arguments)
status, status_cell_metadata = run_ingest(ingest, arguments, parsed_args)
# Print metrics properties
metrics_dump = config.get_metric_properties().get_properties()
for key in metrics_dump.keys():
print(f'{key}: {metrics_dump[key]}')

except Exception as e:
config.set_parent_event_name("ingest-pipeline:unhandled-exception:ingest")
log_exception(IngestPipeline.dev_logger, IngestPipeline.user_logger, e)
status = 1
ingest = IngestPipeline(**arguments)
status, status_cell_metadata = run_ingest(ingest, arguments, parsed_args)
# Print metrics properties
metrics_dump = config.get_metric_properties().get_properties()
for key in metrics_dump.keys():
print(f'{key}: {metrics_dump[key]}')

# Log Mixpanel events
MetricsService.log(config.get_parent_event_name(), config.get_metric_properties())
Expand Down
Loading