Commit

Merge 886de4c into b4a23f9
alex-l-kong committed Sep 14, 2022
2 parents b4a23f9 + 886de4c commit 416ca5c
Showing 4 changed files with 66 additions and 36 deletions.
15 changes: 6 additions & 9 deletions ark/phenotyping/cell_cluster_utils.py
@@ -497,9 +497,8 @@ def train_cell_som(fovs, channels, base_dir, pixel_data_dir, cell_table_path,
print(output.strip())

if process.returncode != 0:
raise MemoryError(
"Process terminated: you likely have a memory-related error. Try increasing "
"your Docker memory limit."
raise OSError(
"Process terminated: please view error messages displayed above for debugging."
)

# read in the pixel channel averages table
@@ -602,9 +601,8 @@ def cluster_cells(base_dir, cluster_counts_norm_name='cluster_counts_norm.feathe
print(output.strip())

if process.returncode != 0:
raise MemoryError(
"Process terminated: you likely have a memory-related error. Try increasing "
"your Docker memory limit."
raise OSError(
"Process terminated: please view error messages displayed above for debugging."
)

# compute the average pixel SOM/meta counts per cell SOM cluster
@@ -726,9 +724,8 @@ def cell_consensus_cluster(fovs, channels, base_dir, pixel_cluster_col, max_k=20
print(output.strip())

if process.returncode != 0:
raise MemoryError(
"Process terminated: you likely have a memory-related error. Try increasing "
"your Docker memory limit."
raise OSError(
"Process terminated: please view error messages displayed above for debugging."
)

# compute the average pixel SOM/meta counts per cell meta cluster
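All three hunks in cell_cluster_utils.py converge on the same pattern: stream the R subprocess output, then raise an OSError (rather than the old MemoryError) when the script exits non-zero, deferring diagnosis to the messages already printed. Below is a minimal sketch of that pattern; the script path and arguments are placeholders, not names from this commit.

```python
import subprocess


def run_r_script(script_path, script_args):
    """Run an R script, stream its output, and raise OSError on a non-zero exit.

    A sketch of the error-handling pattern these hunks converge on; the
    script path and arguments are hypothetical placeholders.
    """
    process = subprocess.Popen(
        ['Rscript', script_path] + list(script_args),
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )

    # echo the R output line by line until the process finishes
    while True:
        output = process.stdout.readline()
        if not output and process.poll() is not None:
            break
        if output:
            print(output.decode().strip())

    # the root cause is in the R output printed above, so just surface the failure
    if process.returncode != 0:
        raise OSError(
            "Process terminated: please view error messages displayed above for debugging."
        )
```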
28 changes: 15 additions & 13 deletions ark/phenotyping/pixel_cluster_utils.py
@@ -895,17 +895,17 @@ def train_pixel_som(fovs, channels, base_dir,
print(output.strip())

if process.returncode != 0:
raise MemoryError(
"Process terminated: you likely have a memory-related error. Try increasing "
"your Docker memory limit."
raise OSError(
"Process terminated: please view error messages displayed above for debugging. "
"For pixel SOM training, you will likely need to decrease the pixel subset proportion."
)


def cluster_pixels(fovs, channels, base_dir, data_dir='pixel_mat_data',
norm_vals_name='post_rowsum_chan_norm.feather',
weights_name='pixel_weights.feather',
pc_chan_avg_som_cluster_name='pixel_channel_avg_som_cluster.csv',
batch_size=5):
batch_size=5, ncores=multiprocessing.cpu_count() - 1):
"""Uses trained weights to assign cluster labels on full pixel data
Saves data with cluster labels to `cluster_dir`. Computes and saves the average channel
expression across pixel SOM clusters.
@@ -927,6 +927,8 @@ def cluster_pixels(fovs, channels, base_dir, data_dir='pixel_mat_data',
The name of the file to save the average channel expression across all SOM clusters
batch_size (int):
The number of FOVs to process in parallel
ncores (int):
The number of cores desired for multiprocessing
"""

# define the paths to the data
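For callers, the new ncores argument (defaulting to multiprocessing.cpu_count() - 1) bounds how many workers the R side spins up. A hedged usage sketch follows; the FOV names, channel names, and base directory are placeholders, and the earlier pipeline outputs (SOM weights and normalization values) are assumed to already exist under base_dir.

```python
import multiprocessing

from ark.phenotyping import pixel_cluster_utils

# placeholder inputs for illustration only
fovs = ['fov0', 'fov1']
channels = ['CD45', 'CD8']
base_dir = '/data/example_cohort'

pixel_cluster_utils.cluster_pixels(
    fovs, channels, base_dir,
    batch_size=5,                            # FOVs processed in parallel per batch
    ncores=multiprocessing.cpu_count() - 1   # explicit, matching the new default
)
```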
@@ -1010,7 +1012,7 @@ def cluster_pixels(fovs, channels, base_dir, data_dir='pixel_mat_data',

# run the trained SOM on the dataset, assigning clusters
process_args = ['Rscript', '/run_pixel_som.R', ','.join(fovs_list),
data_path, norm_vals_path, weights_path, str(batch_size)]
data_path, norm_vals_path, weights_path, str(batch_size), str(ncores)]

process = subprocess.Popen(process_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

@@ -1026,9 +1028,8 @@ def cluster_pixels(fovs, channels, base_dir, data_dir='pixel_mat_data',
print(output.strip())

if process.returncode != 0:
raise MemoryError(
"Process terminated: you likely have a memory-related error. Try increasing "
"your Docker memory limit."
raise OSError(
"Process terminated: please view error messages displayed above for debugging."
)

# remove the data directory and rename the temp directory to the data directory
Expand Down Expand Up @@ -1059,7 +1060,7 @@ def pixel_consensus_cluster(fovs, channels, base_dir, max_k=20, cap=3,
pc_chan_avg_som_cluster_name='pixel_channel_avg_som_cluster.csv',
pc_chan_avg_meta_cluster_name='pixel_channel_avg_meta_cluster.csv',
clust_to_meta_name='pixel_clust_to_meta.feather',
batch_size=5, seed=42):
batch_size=5, ncores=multiprocessing.cpu_count() - 1, seed=42):
"""Run consensus clustering algorithm on pixel-level summed data across channels
Saves data with consensus cluster labels to `consensus_dir`. Computes and saves the
average channel expression across pixel meta clusters. Assigns meta cluster labels
@@ -1087,6 +1088,8 @@ def pixel_consensus_cluster(fovs, channels, base_dir, max_k=20, cap=3,
Name of file storing the SOM cluster to meta cluster mapping
batch_size (int):
The number of FOVs to process in parallel
ncores (int):
The number of cores desired for multiprocessing
seed (int):
The random seed to set for consensus clustering
"""
@@ -1133,7 +1136,7 @@ def pixel_consensus_cluster(fovs, channels, base_dir, max_k=20, cap=3,
process_args = ['Rscript', '/pixel_consensus_cluster.R',
','.join(fovs_list), ','.join(channels),
str(max_k), str(cap), data_path, som_cluster_avg_path,
clust_to_meta_path, str(batch_size), str(seed)]
clust_to_meta_path, str(batch_size), str(ncores), str(seed)]

process = subprocess.Popen(process_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

@@ -1149,9 +1152,8 @@ def pixel_consensus_cluster(fovs, channels, base_dir, max_k=20, cap=3,
print(output.strip())

if process.returncode != 0:
raise MemoryError(
"Process terminated: you likely have a memory-related error. Try increasing "
"your Docker memory limit."
raise OSError(
"Process terminated: please view error messages displayed above for debugging."
)

# remove the data directory and rename the temp directory to the data directory
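Both cluster_pixels and pixel_consensus_cluster hand ncores to R the same way: it is appended to the Rscript argument vector as a string, and the scripts read it back with strtoi (args[6] in run_pixel_som.R, args[9] in pixel_consensus_cluster.R, which is why seed shifts to args[10], as the R hunks below show). A sketch of the Python side of that hand-off, with placeholder paths and values:

```python
import multiprocessing
import subprocess

ncores = multiprocessing.cpu_count() - 1

# every element must be a string; the R script recovers the integers with strtoi()
process_args = [
    'Rscript', '/pixel_consensus_cluster.R',
    'fov0,fov1',                                # args[1]: comma-separated FOVs (placeholder)
    'CD45,CD8',                                 # args[2]: comma-separated channels (placeholder)
    str(20),                                    # args[3]: max_k
    str(3),                                     # args[4]: cap
    '/data/pixel_mat_data',                     # args[5]: data path (placeholder)
    '/data/pixel_channel_avg_som_cluster.csv',  # args[6]: SOM cluster averages (placeholder)
    '/data/pixel_clust_to_meta.feather',        # args[7]: cluster-to-meta mapping (placeholder)
    str(5),                                     # args[8]: batch_size
    str(ncores),                                # args[9]: the new core count
    str(42),                                    # args[10]: seed
]

process = subprocess.Popen(process_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
```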
29 changes: 23 additions & 6 deletions ark/phenotyping/pixel_consensus_cluster.R
@@ -51,8 +51,11 @@ clustToMetaPath <- args[7]
# retrieve the batch size to determine number of threads to run in parallel
batchSize <- strtoi(args[8])

# get the number of cores
nCores <- strtoi(args[9])

# set the random seed
seed <- strtoi(args[9])
seed <- strtoi(args[10])
set.seed(seed)

# read cluster averaged data
@@ -82,7 +85,20 @@ for (batchStart in seq(1, length(fovs), batchSize)) {
# define the parallel cluster for this batch of fovs
# NOTE: to prevent the occasional hanging first FOV issue, we need to log to an outfile
# to "force" a return out of the foreach loop in this case
parallelCluster <- parallel::makeCluster(nCores, type="FORK", outfile='log.txt')
parallelStatus <- tryCatch(
{
parallelCluster <- parallel::makeCluster(nCores, type="FORK", outfile='log.txt')
0
},
error=function(cond) {
1
}
)

if (parallelStatus == 1) {
print(paste0("Too many cores (", nCores, ") specifed, reduce this using the ncores parameter"))
quit(status=1)
}

# register parallel cluster for dopar
doParallel::registerDoParallel(cl=parallelCluster)
@@ -112,23 +128,24 @@ for (batchStart in seq(1, length(fovs), batchSize)) {
# this won't be displayed to the user but is used as a helper to break out
# in the rare first FOV hang issue
print(paste('Done writing fov', fovs[i]))
0
c(0, '')
},
error=function(cond) {
# this won't be displayed to the user but is used as a helper to break out
# in the rare first FOV hang issue
print(paste('Error encountered for fov', fovs[i]))
1
c(1, cond)
}
)

data.frame(fov=fovs[i], status=status)
data.frame(fov=fovs[i], status=status[1], errCond=status[2])
}

# report any erroneous feather files
for (i in 1:nrow(fovStatuses)) {
if (fovStatuses[i, 'status'] == 1) {
print(paste("The data for FOV", fovStatuses[i, 'fov'], "has been corrupted, removing"))
print(paste("Processing for FOV", fovStatuses[i, 'fov'], "failed, removing from pipeline. Error message:"))
print(fovStatuses[i, 'errCond'])
fovsProcessed <- fovsProcessed - 1
}
}
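Because the script now exits with status 1 when parallel::makeCluster cannot allocate the requested workers, an oversized ncores surfaces on the Python side as the OSError raised after the subprocess returns. A hypothetical client-side guard, not part of this commit, that avoids ever hitting that path:

```python
import multiprocessing


def clamp_ncores(requested):
    """Clamp a requested core count to what the host can provide.

    Hypothetical helper, not part of this commit: keeps at least one core
    free and never requests more workers than the machine reports.
    """
    available = multiprocessing.cpu_count()
    return max(1, min(requested, available - 1))


# e.g. asking for 32 cores on an 8-core machine yields 7
ncores = clamp_ncores(32)
```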
30 changes: 22 additions & 8 deletions ark/phenotyping/run_pixel_som.R
@@ -16,9 +16,6 @@ suppressPackageStartupMessages({
library(parallel)
})

# get the number of cores
nCores <- parallel::detectCores() - 1

# get the command line arguments
args <- commandArgs(trailingOnly=TRUE)

@@ -37,6 +34,9 @@ pixelWeightsPath <- args[4]
# retrieve the batch size to determine number of threads to run in parallel
batchSize <- strtoi(args[5])

# get the number of cores
nCores <- strtoi(args[6])

# read the weights
somWeights <- as.matrix(arrow::read_feather(pixelWeightsPath))

@@ -58,7 +58,20 @@ for (batchStart in seq(1, length(fovs), batchSize)) {
# define the parallel cluster for this batch of fovs
# NOTE: to prevent the occasional hanging first FOV issue, we need to log to an outfile
# to "force" a return out of the foreach loop in this case
parallelCluster <- parallel::makeCluster(nCores, type="FORK", outfile='log.txt')
parallelStatus <- tryCatch(
{
parallelCluster <- parallel::makeCluster(nCores, type="FORK", outfile='log.txt')
0
},
error=function(cond) {
1
}
)

if (parallelStatus == 1) {
print(paste0("Too many cores (", nCores, ") specified, reduce this using the ncores parameter"))
quit(status=1)
}

# register parallel cluster for dopar
doParallel::registerDoParallel(cl=parallelCluster)
@@ -97,23 +110,24 @@ for (batchStart in seq(1, length(fovs), batchSize)) {
# this won't be displayed to the user but is used as a helper to break out
# in the rare first FOV hang issue
print(paste('Done writing fov', fovs[i]))
0
c(0, '')
},
error=function(cond) {
# this won't be displayed to the user but is used as a helper to break out
# in the rare first FOV hang issue
print(paste('Error encountered for fov', fovs[i]))
1
c(1, cond)
}
)

data.frame(fov=fovs[i], status=status)
data.frame(fov=fovs[i], status=status[1], errCond=status[2])
}

# report any erroneous feather files
for (i in 1:nrow(fovStatuses)) {
if (fovStatuses[i, 'status'] == 1) {
print(paste("The data for FOV", fovStatuses[i, 'fov'], "has been corrupted, removing"))
print(paste("Processing for FOV", fovStatuses[i, 'fov'], "failed, removing from pipeline. Error message:"))
print(fovStatuses[i, 'errCond'])
fovsProcessed <- fovsProcessed - 1
}
}
