1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -11,6 +11,7 @@ jobs:
build:
docker:
- image: circleci/python:3.7.5-stretch
resource_class: large

working_directory: ~/scp-ingest-pipeline

42 changes: 41 additions & 1 deletion README.md
@@ -128,9 +128,49 @@ pytest test_ingest.py
# Run all tests, show code coverage metrics
pytest --cov=../ingest/
```

For more, see <https://docs.pytest.org/en/stable/usage.html>.

## Testing in Docker
If you have difficulty installing and configuring `scp-ingest-pipeline` locally due to hardware issues (e.g. Apple M1 Macs),
you can instead test by building the Docker image and running commands inside the container.
This requires a few extra steps, but sidesteps the need to install packages locally.

### 1. Build the image
Run the following command to build the testing Docker image locally (make sure Docker is running first):
```
docker build -t gcr.io/broad-singlecellportal-staging/ingest-pipeline:test-candidate .
```
### 2. Set up environment variables
Run the following to pull database-specific secrets out of vault (passing in the path to your vault token):
```
source scripts/setup_mongo_dev.sh ~/.your-vault-token
```
Now run `env` to make sure you've set the following values:
```
MONGODB_USERNAME=single_cell
DATABASE_NAME=single_cell_portal_development
MONGODB_PASSWORD=<password>
DATABASE_HOST=<ip address>
```
### 3. Write out your service account keyfile
Run the following to export your default service account JSON keyfile to `/tmp/keyfile.json`:
```
vault read -format=json secret/kdux/scp/development/$(whoami)/scp_service_account.json | jq .data > /tmp/keyfile.json
```
### 4. Start the Docker container
Run the container, passing in the proper environment variables:
```
docker run --name scp-ingest-test -e MONGODB_USERNAME="$MONGODB_USERNAME" -e DATABASE_NAME="$DATABASE_NAME" \
-e MONGODB_PASSWORD="$MONGODB_PASSWORD" -e DATABASE_HOST="$DATABASE_HOST" \
-e GOOGLE_APPLICATION_CREDENTIALS=/tmp/keyfile.json --rm -it \
gcr.io/broad-singlecellportal-staging/ingest-pipeline:test-candidate bash
```
### 5. Copy keyfile to running container
In a separate terminal window, copy the JSON keyfile from above to the expected location:
```
docker cp /tmp/keyfile.json scp-ingest-test:/tmp
```
You can now run any `ingest_pipeline.py` command you wish inside the container.
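For example, a `render_expression_arrays` run (adapted from the examples in `ingest/expression_writer.py`; the study and file IDs are placeholder values, and this sketch assumes the container's working directory is the `ingest/` folder with the repository's `tests/` data available) might look like:
```
python3 ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 \
render_expression_arrays --matrix-file-path ../tests/data/dense_expression_matrix.txt \
--matrix-file-type dense \
--cluster-file ../tests/data/cluster_example.txt \
--cluster-name 'Dense Example' --render-expression-arrays
```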
# Use

Run this every time you start a new terminal to work on this project:
28 changes: 28 additions & 0 deletions ingest/cli_parser.py
@@ -347,6 +347,34 @@ def create_parser():
"--anndata-file", required=True, help="Path to AnnData file"
)

parser_expression_writer = subparsers.add_parser(
"render_expression_arrays",
help="Indicates preprocessing of cluster/expression files for image pipeline"
)

parser_expression_writer.add_argument(
'--render-expression-arrays', action="store_true", help='Invoke expression_writer.py', required=True
)

parser_expression_writer.add_argument(
'--cluster-file', help='path to cluster file', required=True
)
parser_expression_writer.add_argument(
'--cluster-name', help='name of cluster object', required=True
)
parser_expression_writer.add_argument(
'--matrix-file-path', help='path to matrix file', required=True
)
parser_expression_writer.add_argument(
'--matrix-file-type', help='type of matrix file (dense or mtx)', required=True, choices=['dense', 'mtx']
)
parser_expression_writer.add_argument(
'--gene-file', help='path to gene file (omit for dense matrix files)'
)
parser_expression_writer.add_argument(
'--barcode-file', help='path to barcode file (omit for dense matrix files)'
)

return parser


273 changes: 273 additions & 0 deletions ingest/expression_writer.py
@@ -0,0 +1,273 @@
"""expression_writer.py
Extract gene-level expression data from dense/sparse matrix files and process it in the
context of a cluster file's cells.
This mimics the `expression` data array in visualization API responses.

EXAMPLES (must be invoked via ingest_pipeline.py)

dense matrix:
python3 ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 \
render_expression_arrays --matrix-file-path ../tests/data/dense_expression_matrix.txt \
--matrix-file-type dense \
--cluster-file ../tests/data/cluster_example.txt \
--cluster-name 'Dense Example' --render-expression-arrays

Review comment (Member), on --study-file-id:
study-file-id ought to be extraneous. I imagine it's only included because of baked-in assumptions of Ingest Pipeline. Could you refactor things so this argument can be omitted, or open a ticket to resolve this tech debt?

Reply (Contributor Author):
This is definitely baked into ingest pipeline, and I don't feel confident removing this. My argument against doing so would be that what we're trying to do with image pipeline & DE falls outside of traditional "ingest", and perhaps we shouldn't be invoking those jobs through the ingest harness. But either way, I can create a ticket to do so.

Review comment (Member), on lines +10 to +13:
Needing to specify both the subparser (render_expression_arrays) and pass essentially the same value as a flag (--render-expression-arrays) is another case of pre-existing tech debt that makes these examples more complex than necessary. Could you refactor or open a tech debt ticket for that?

Reply (Contributor Author):
Same - see above, will ticket.

sparse matrix:
python3 ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 \
render_expression_arrays --matrix-file-path ../tests/data/mtx/matrix_with_header.mtx \
--matrix-file-type mtx \
--gene-file ../tests/data/mtx/sampled_genes.tsv \
--barcode-file ../tests/data/mtx/barcodes.tsv \
--cluster-file ../tests/data/mtx/cluster_mtx_barcodes.tsv \
--cluster-name 'Sparse Example' --render-expression-arrays
"""
from __future__ import annotations

import os
import re
import multiprocessing
import sys
import datetime
from dateutil.relativedelta import relativedelta
from functools import partial

try:
from writer_functions import round_exp, encode_cluster_name, open_file, make_data_dir, get_matrix_size, \
get_cluster_cells, load_entities_as_list, process_sparse_fragment, write_gene_scores, process_dense_line, \
COMMA_OR_TAB
from monitor import setup_logger
from ingest_files import IngestFiles
except ImportError:
from .writer_functions import round_exp, encode_cluster_name, open_file, make_data_dir, get_matrix_size, \
get_cluster_cells, load_entities_as_list, process_sparse_fragment, write_gene_scores, process_dense_line, \
COMMA_OR_TAB
from .monitor import setup_logger
from .ingest_files import IngestFiles
Review comment (Member), on lines +34 to +45:
I introduced this inelegant import technique to deal with a quirk of supporting CSFV in our public CLI / manage_study.py. We no longer support the public CLI, so it'd make sense to remove these try/except clauses in general. And (in particular) we wouldn't call this module from the public CLI, so we can certainly omit it here.

Suggested change (drop the try/except and import directly):
from writer_functions import round_exp, encode_cluster_name, open_file, make_data_dir, get_matrix_size, \
get_cluster_cells, load_entities_as_list, process_sparse_fragment, write_gene_scores, process_dense_line, \
COMMA_OR_TAB
from monitor import setup_logger
from ingest_files import IngestFiles


class ExpressionWriter:
DELOCALIZE_FOLDER = "_scp_internal/cache/expression_scatter/data"
Review comment (Member):
Ending folder paths with "/" has always struck me as clearer. I do this in my code, but we should probably land on some consensus.

Suggested change:
DELOCALIZE_FOLDER = "_scp_internal/cache/expression_scatter/data/"

This is my subjective preference; ideally I'd examine how such strings are ended more broadly.

denominator = 2 if re.match('darwin', sys.platform) else 1
num_cores = int(multiprocessing.cpu_count() / denominator) - 1
Review comment (Member):
An edge case (running on a 1-CPU machine) that wouldn't hurt to handle:

Suggested change:
num_cores_available = int(multiprocessing.cpu_count() / denominator) - 1
num_cores = num_cores_available if num_cores_available > 0 else 1


def __init__(
self, matrix_file_path, matrix_file_type, cluster_file_path, cluster_name, gene_file, barcode_file, **kwargs
):
self.matrix_file_path = matrix_file_path
self.matrix_file_type = matrix_file_type
self.cluster_file_path = cluster_file_path
self.cluster_name = cluster_name
self.gene_file = gene_file
self.barcode_file = barcode_file

# localize main files, if needed
self.local_matrix_path = open_file(self.matrix_file_path)[1]
self.local_cluster_path = open_file(self.cluster_file_path)[1]

# set storage bucket name, if needed
self.bucket = self.get_storage_bucket_name()

timestamp = datetime.datetime.now().isoformat(sep="T", timespec="seconds")
url_safe_timestamp = re.sub(':', '', timestamp)
log_name = f"expression_scatter_data_{url_safe_timestamp}_log.txt"
self.dev_logger = setup_logger(__name__, log_name, format="support_configs")

def get_storage_bucket_name(self):
"""
get GCS storage bucket name, if available
"""
path_header = self.matrix_file_path[:5]
if path_header == 'gs://':
path_segments = self.matrix_file_path[5:].split("/")
bucket_name = path_segments[0]
return f"{path_header}{bucket_name}"

def get_file_seek_points(self) -> list[list]:
"""
Determine start/stop points in a matrix to process in parallel
Will read in chunks and return a list of start/stop points
Ensures breaks on newlines

:returns: list[list]
"""
file_size = get_matrix_size(self.local_matrix_path)
chunk_size = int(file_size / self.num_cores)
self.dev_logger.info(
f" determining seek points for {self.local_matrix_path} with chunk size {chunk_size}"
)
with open_file(self.local_matrix_path)[0] as matrix_file:
# fast-forward through any comments if this is a sparse matrix
first_char = matrix_file.read(1)
header_lines = 3 if first_char == '%' else 1
for i in range(header_lines):
matrix_file.readline()
current_pos = matrix_file.tell()
current_seek = [current_pos]
seek_points = []
for index in range(self.num_cores):
seek_point = current_pos + chunk_size
matrix_file.seek(seek_point)
current_byte = matrix_file.read(1)
if current_byte == '': # eof
current_seek.append(file_size)
seek_points.append(current_seek)
break
while current_byte != "\n":
current_byte = matrix_file.read(1)
seek_point += 1
current_seek.append(seek_point)
seek_points.append(current_seek)
current_pos = seek_point + 1
current_seek = [current_pos]
return seek_points

def divide_sparse_matrix(self, genes, data_dir):
"""
Slice a sparse matrix into 1GB chunks and write out individual
gene-level files in parallel

:param genes: (list) gene names from features file
:param data_dir: (str) name of output dir
"""
self.dev_logger.info(f" loading sparse data from {self.local_matrix_path}")
slice_indexes = self.get_file_seek_points()
pool = multiprocessing.Pool(self.num_cores)
processor = partial(self.read_sparse_matrix_slice, genes=genes, data_dir=data_dir)
pool.map(processor, slice_indexes)

def read_sparse_matrix_slice(self, indexes, genes, data_dir):
"""
Read a sparse matrix using start/stop indexes and append to individual gene-level files

:param indexes: (list) start/stop index points to read from/to
:param genes: (list) gene names from features file
:param data_dir: (str) name of output dir
"""
start_pos, end_pos = indexes
self.dev_logger.info(f"reading {self.local_matrix_path} at index {start_pos}:{end_pos}")
with open_file(self.local_matrix_path)[0] as matrix_file:
current_pos = start_pos
matrix_file.seek(current_pos)
while current_pos < end_pos:
line = matrix_file.readline()
gene_idx = int(line.split()[0])
gene_name = genes[gene_idx - 1]
fragment_path = f"{data_dir}/gene_entries/{gene_name}__entries.txt"
with open(fragment_path, 'a+') as file:
file.write(line)
current_pos += len(line)

def process_sparse_data_fragments(self, barcodes, cluster_cells, data_dir):
"""
Find and process all generated single-gene sparse data fragments

:param barcodes: (list) list of cell barcodes
:param cluster_cells: (list) list of cells from cluster file
:param data_dir: (str) name of output dir
"""
fragments = os.listdir(f"{data_dir}/gene_entries")
self.dev_logger.info(f" subdivision complete, processing {len(fragments)} fragments")
pool = multiprocessing.Pool(self.num_cores)
processor = partial(process_sparse_fragment, barcodes=barcodes, cluster_cells=cluster_cells, data_dir=data_dir)
pool.map(processor, fragments)

def write_empty_sparse_genes(self, genes, num_cluster_cells, data_dir):
"""
Write out empty arrays of expression values for genes with no significant expression in a sparse matrix

:param genes: (list) gene names from features file
:param num_cluster_cells: (Integer) number of cells from cluster file
:param data_dir: (str) name of output dir
"""
gene_fragments = filter(lambda file: file[0] != '.', os.listdir(f"{data_dir}/gene_entries"))
significant_genes = set([gene.split('__')[0] for gene in gene_fragments])
missing_genes = [gene for gene in genes if gene not in significant_genes]
empty_expression = [0] * num_cluster_cells
pool = multiprocessing.Pool(self.num_cores)
processor = partial(write_gene_scores, exp_values=empty_expression, data_dir=data_dir)
pool.map(processor, missing_genes)

def process_dense_data(self, cluster_cells, data_dir):
"""
Main handler to read dense matrix data and process entries at the gene level

:param cluster_cells: (list) cell names from cluster file
:param data_dir: (str) name of output dir
"""
pool = multiprocessing.Pool(self.num_cores)
slice_indexes = self.get_file_seek_points()
matrix_file, local_matrix_path = open_file(self.matrix_file_path)
header = matrix_file.readline().rstrip()
values = re.split(COMMA_OR_TAB, header)
cells = values[1:]
processor = partial(
self.read_dense_matrix_slice, matrix_cells=cells, cluster_cells=cluster_cells, data_dir=data_dir
)
pool.map(processor, slice_indexes)

def read_dense_matrix_slice(self, indexes, matrix_cells, cluster_cells, data_dir):
"""
Read a dense matrix using start/stop indexes and write out individual gene-level files
:param indexes: (list) start/stop index points to read from/to
:param matrix_cells: (list) cell names from matrix file
:param cluster_cells: (list) cell names from cluster file
:param data_dir: (str) name of output dir
"""
start_pos, end_pos = indexes
self.dev_logger.info(f" reading {self.local_matrix_path} at index {start_pos}:{end_pos}")
with open_file(self.local_matrix_path)[0] as matrix_file:
current_pos = start_pos
matrix_file.seek(current_pos)
while current_pos < end_pos:
line = matrix_file.readline()
process_dense_line(line, matrix_cells, cluster_cells, data_dir)
current_pos += len(line)

def render_artifacts(self):
"""
Main handler, determines type of processing to execute (dense vs. sparse)
"""
start_time = datetime.datetime.now()
sanitized_cluster_name = encode_cluster_name(self.cluster_name)
self.dev_logger.info(f" creating data directory at {sanitized_cluster_name}")
make_data_dir(sanitized_cluster_name)
cluster_cells = get_cluster_cells(self.local_cluster_path)
if self.matrix_file_type == 'mtx' and self.gene_file is not None and self.barcode_file is not None:
self.dev_logger.info(f" reading {self.matrix_file_path} as sparse matrix")
self.dev_logger.info(f" reading entities from {self.gene_file}")
genes_file = open_file(self.gene_file)[0]
genes = load_entities_as_list(genes_file)
self.dev_logger.info(f" reading entities from {self.barcode_file}")
barcodes_file = open_file(self.barcode_file)[0]
barcodes = load_entities_as_list(barcodes_file)
self.divide_sparse_matrix(genes, sanitized_cluster_name)
self.write_empty_sparse_genes(genes, len(cluster_cells), sanitized_cluster_name)
self.process_sparse_data_fragments(barcodes, cluster_cells, sanitized_cluster_name)
elif self.matrix_file_type == 'dense':
self.dev_logger.info(f" reading {self.matrix_file_path} as dense matrix")
self.process_dense_data(cluster_cells, sanitized_cluster_name)
end_time = datetime.datetime.now()
time_diff = relativedelta(end_time, start_time)
self.dev_logger.info(
f" completed, total runtime: {time_diff.hours}h, {time_diff.minutes}m, {time_diff.seconds}s"
)
self.delocalize_outputs(sanitized_cluster_name)

def delocalize_outputs(self, cluster_name):
"""
Copy all output files to study bucket in parallel using gsutil (since there are usually ~25-30K files)

:param cluster_name: (str) encoded name of cluster
"""
if self.bucket is not None:
bucket_path = f"{self.DELOCALIZE_FOLDER}/{cluster_name}"
self.dev_logger.info(f" pushing all output files to {bucket_path}")
dir_files = os.listdir(cluster_name)
files_to_push = list(file for file in dir_files if 'gene_entries' not in file)
for file in files_to_push:
local_path = f"{cluster_name}/{file}"
IngestFiles.delocalize_file(None, None, self.matrix_file_path, local_path, f"{bucket_path}/{file}")
self.dev_logger.info(" push completed")
handler = self.dev_logger.handlers[0]
log_filename = handler.baseFilename.split("/").pop()
IngestFiles.delocalize_file(None, None, self.matrix_file_path, log_filename, f"parse_logs/{log_filename}")
