4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -10,7 +10,7 @@ orbs:
jobs:
build:
docker:
- image: circleci/python:3.6.9-stretch
- image: circleci/python:3.7.5-stretch

working_directory: ~/scp-ingest-pipeline

@@ -45,4 +45,4 @@ jobs:
pytest --cov-report=xml --cov=../ingest/

- codecov/upload:
file: tests/coverage.xml
file: tests/coverage.xml
2 changes: 0 additions & 2 deletions ingest/clusters.py
@@ -14,8 +14,6 @@
from .annotations import Annotations
from .monitor import setup_logger, log

# from ingest_pipeline import log


@dataclass
class DomainRanges(TypedDict):
6 changes: 6 additions & 0 deletions ingest/dense.py
@@ -14,17 +14,20 @@
try:
from expression_files import GeneExpression
from ingest_files import IngestFiles
from monitor import trace

except ImportError:
# Used when importing as external package, e.g. imports in single_cell_portal code
from .expression_files import GeneExpression
from .ingest_files import IngestFiles
from .monitor import trace


class Dense(GeneExpression, IngestFiles):
ALLOWED_FILE_TYPES = ["text/csv", "text/plain", "text/tab-separated-values"]

def __init__(self, file_path, study_file_id, study_id, **kwargs):
self.tracer = kwargs.pop("tracer")
GeneExpression.__init__(self, file_path, study_file_id, study_id)
IngestFiles.__init__(
self, file_path, allowed_file_types=self.ALLOWED_FILE_TYPES
@@ -33,6 +36,7 @@ def __init__(self, file_path, study_file_id, study_id, **kwargs):

self.preprocess()

@trace
def preprocess(self):
csv_file, open_file_object = self.open_file(self.file_path)
header = next(csv_file)
@@ -59,6 +63,7 @@ def preprocess(self):
# chunksize=100000, Save for when we chunk data
)[0]

@trace
def transform(self):
"""Transforms dense matrix into firestore data model for genes.
"""
@@ -85,6 +90,7 @@ def transform(self):
),
)

@trace
def set_data_array(
self,
linear_data_id,
6 changes: 3 additions & 3 deletions ingest/ingest_files.py
@@ -85,7 +85,7 @@ def __init__(self, file_path, allowed_file_types):
self.is_gzip_file = self.get_file_type(file_path)[1] == 'gzip'

self.verify_file_exists(file_path)

# Allowed files for a given file type (expression file, cluster files, etc.)
self.allowed_file_types = allowed_file_types
# Keeps tracks of lines parsed
self.amount_of_lines = 0
@@ -199,7 +199,7 @@ def delocalize_file(
)

def open_file(self, file_path, open_as=None, start_point: int = 0, **kwargs):
""" Opens txt (txt is expected to be tsv or csv), csv, or tsv formatted files"""
""" A wrapper function for opening txt (txt is expected to be tsv or csv), csv, or tsv formatted files"""
open_file, file_path = self.resolve_path(file_path)
file_connections = {
"text/csv": self.open_csv,
@@ -261,7 +261,7 @@ def get_file_type(self, file_path):
def open_txt(self, open_file_object, **kwargs):
"""Method for opening txt files that are expected be tab
or comma delimited"""
# Determined if file is tsv or csv
# Determine if file is tsv or csv
csv_dialect = csv.Sniffer().sniff(open_file_object.read(1024))
csv_dialect.skipinitialspace = True
open_file_object.seek(0)
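
The comment fix above sits next to the dialect-sniffing step in open_txt. As a standalone sketch of that pattern (the file name is a placeholder; the 1024-byte sample size matches the code above):

import csv

# Detect whether a plain-text matrix is comma- or tab-delimited, as open_txt does.
with open('matrix.txt', newline='') as handle:
    dialect = csv.Sniffer().sniff(handle.read(1024))
    dialect.skipinitialspace = True
    handle.seek(0)  # rewind so the reader starts at the header row
    for row in csv.reader(handle, dialect):
        print(row)
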
31 changes: 25 additions & 6 deletions ingest/ingest_pipeline.py
@@ -36,6 +36,7 @@
import argparse
from typing import Dict, Generator, List, Tuple, Union # noqa: F401
import ast
from contextlib import nullcontext

import sys
import json
@@ -50,6 +51,11 @@
from google.cloud.exceptions import NotFound
from bson.objectid import ObjectId

# For tracing
from opencensus.ext.stackdriver.trace_exporter import StackdriverExporter
from opencensus.trace.tracer import Tracer
from opencensus.trace.samplers import AlwaysOnSampler

# from google.cloud.logging.resource import Resource

try:
Expand All @@ -62,7 +68,7 @@
report_issues,
write_metadata_to_bq,
)
from monitor import setup_logger, log
from monitor import setup_logger, log, trace
from cell_metadata import CellMetadata
from clusters import Clusters
from dense import Dense
@@ -77,13 +83,12 @@
report_issues,
write_metadata_to_bq,
)
from .monitor import setup_logger, log
from .monitor import setup_logger, log, trace
from .cell_metadata import CellMetadata
from .clusters import Clusters
from .dense import Dense
from .mtx import Mtx


# Ingest file types
EXPRESSION_FILE_TYPES = ["dense", "mtx", "loom"]

@@ -124,6 +129,15 @@ def __init__(
self.cluster_file = cluster_file
self.kwargs = kwargs
self.cell_metadata_file = cell_metadata_file
if 'GOOGLE_CLOUD_PROJECT' in os.environ:
# instantiate trace exporter
exporter = StackdriverExporter(
project_id=os.environ['GOOGLE_CLOUD_PROJECT']
)
self.tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())

else:
self.tracer = nullcontext()
if matrix_file is not None:
self.matrix = self.initialize_file_connection(matrix_file_type, matrix_file)
if ingest_cell_metadata:
@@ -177,14 +191,20 @@ def initialize_file_connection(self, file_type, file_path):
"mtx": Mtx,
"loom": Loom,
}

return file_connections.get(file_type)(
file_path, self.study_id, self.study_file_id, **self.kwargs
file_path,
self.study_id,
self.study_file_id,
tracer=self.tracer,
**self.kwargs,
)

def close_matrix(self):
"""Closes connection to file"""
self.matrix.close()

@trace
def load(
self,
collection_name,
@@ -298,6 +318,7 @@ def upload_metadata_to_bq(self):
return 1
return 0

@trace
@my_debug_logger()
def ingest_expression(self) -> int:
"""Ingests expression files.
@@ -403,11 +424,9 @@ def ingest_cluster(self):
@my_debug_logger()
def subsample(self):
"""Method for subsampling cluster and metadata files"""

subsample = SubSample(
cluster_file=self.cluster_file, cell_metadata_file=self.cell_metadata_file
)

for data in subsample.subsample('cluster'):
load_status = self.load_subsample(
Clusters.COLLECTION_NAME, data, subsample.set_data_array, 'cluster'
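
The __init__ hunk above wires tracing into the pipeline: when GOOGLE_CLOUD_PROJECT is set, spans are exported to Stackdriver; otherwise a do-nothing context manager stands in. A condensed sketch of that wiring (build_tracer is a hypothetical helper — the PR does this inline — but the OpenCensus calls mirror the hunk):

import os
from contextlib import nullcontext

from opencensus.ext.stackdriver.trace_exporter import StackdriverExporter
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer


def build_tracer():
    # Export spans to Stackdriver only when a GCP project is configured;
    # otherwise fall back to a no-op context manager.
    if 'GOOGLE_CLOUD_PROJECT' in os.environ:
        exporter = StackdriverExporter(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])
        return Tracer(exporter=exporter, sampler=AlwaysOnSampler())
    return nullcontext()

Each parser then receives this object as a keyword argument (tracer=self.tracer in initialize_file_connection) and pops it in its own __init__, which is exactly what the Dense and Mtx hunks in this diff do.
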
20 changes: 20 additions & 0 deletions ingest/monitor.py
@@ -1,5 +1,7 @@
import logging
import os
import time
from contextlib import nullcontext


def setup_logger(logger_name, log_file, level=logging.DEBUG):
@@ -50,3 +52,21 @@ def wrap(*args, **kwargs):
return wrapper

return debug


def trace(fn):
"""Function decorator that enables tracing via stackdriver for performance
metrics."""

def trace_fn(*args, **kwargs):
span = args[0].tracer
if 'GOOGLE_CLOUD_PROJECT' in os.environ:
span_cm = span.span(name=f'{args[0].__class__.__name__} {fn.__name__}')
# If the environment variable is not set, use a nullcontext
# manager, which does nothing
else:
span_cm = nullcontext()
with span_cm:
return fn(*args, **kwargs)

return trace_fn
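
A minimal sketch of how a class hosts the trace decorator above — ExampleParser is hypothetical; the real consumers are Dense, Mtx, and IngestPipeline, each of which stores self.tracer in __init__ as the other hunks in this PR show:

from contextlib import nullcontext

from monitor import trace  # assumes ingest/monitor.py is importable, as in the try/except imports above


class ExampleParser:
    def __init__(self, tracer):
        # The decorator reads args[0].tracer, so the instance must expose it:
        # an OpenCensus Tracer when GOOGLE_CLOUD_PROJECT is set, else nullcontext().
        self.tracer = tracer

    @trace
    def transform(self):
        # With tracing enabled this call runs inside a span named
        # 'ExampleParser transform'; otherwise the nullcontext branch is a no-op.
        return 'transformed'


if __name__ == '__main__':
    # Local run without Stackdriver: the decorator falls through to nullcontext.
    print(ExampleParser(nullcontext()).transform())
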
6 changes: 6 additions & 0 deletions ingest/mtx.py
@@ -18,16 +18,19 @@
try:
from expression_files import GeneExpression
from ingest_files import IngestFiles
from monitor import trace
except ImportError:
# Used when importing as external package, e.g. imports in single_cell_portal code
from .expression_files import GeneExpression
from .ingest_files import IngestFiles
from .monitor import trace


class Mtx(GeneExpression):
ALLOWED_FILE_TYPES = ["text/csv", "text/plain", "text/tab-separated-values"]

def __init__(self, mtx_path: str, study_file_id: str, study_id: str, **kwargs):
self.tracer = kwargs.pop("tracer")
GeneExpression.__init__(self, mtx_path, study_file_id, study_id)

genes_path = kwargs.pop("gene_file")
@@ -46,13 +49,15 @@ def __init__(self, mtx_path: str, study_file_id: str, study_id: str, **kwargs):
self.matrix_params = kwargs
self.exp_by_gene = {}

@trace
def extract(self):
"""Sets relevant iterables for each file of the MTX bundle
"""
self.matrix_file = scipy.io.mmread(self.mtx_local_path)
self.genes = [g.strip() for g in self.genes_file.readlines()]
self.cells = [c.strip() for c in self.barcodes_file.readlines()]

@trace
def transform(self):
"""Transforms matrix gene data model
"""
@@ -92,6 +97,7 @@ def transform(self):
),
)

@trace
def set_data_array(
self,
linear_data_id,
6 changes: 6 additions & 0 deletions requirements.txt
@@ -23,4 +23,10 @@ pytest-cov==2.8.1

# Don't add this until we actually support Zarr
#zarr==2.2.0

# Logging and tracing
google-cloud-logging==1.14.0
opencensus==0.7.6
opencensus-context==0.1.1
opencensus-ext-stackdriver==0.7.2
google-cloud-trace==0.23.0
29 changes: 0 additions & 29 deletions tests/test_ingest_files.py

This file was deleted.