Skip to content

Commit

Permalink
remove multithreading from ExtractIlluminaBarcodes
Browse files Browse the repository at this point in the history
  • Loading branch information
akiezun committed Apr 29, 2015
1 parent de85dc5 commit e12e51b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,34 @@

import htsjdk.samtools.metrics.MetricBase;
import htsjdk.samtools.metrics.MetricsFile;
import htsjdk.samtools.util.IOUtil;
import htsjdk.samtools.util.Log;
import htsjdk.samtools.util.SequenceUtil;
import htsjdk.samtools.util.StringUtil;
import org.broadinstitute.hellbender.cmdline.Argument;
import org.broadinstitute.hellbender.cmdline.CommandLineProgram;
import org.broadinstitute.hellbender.cmdline.CommandLineProgramProperties;
import org.broadinstitute.hellbender.cmdline.PicardCommandLineProgram;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.cmdline.programgroups.IlluminaProgramGroup;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.ClusterData;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataProvider;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataProviderFactory;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataType;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.ReadDescriptor;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.ReadStructure;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.ReadType;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.*;
import org.broadinstitute.hellbender.tools.picard.illumina.parser.readers.BclQualityEvaluationStrategy;
import org.broadinstitute.hellbender.utils.illumina.IlluminaUtil;
import org.broadinstitute.hellbender.utils.text.parsers.TabbedTextFileWithHeaderParser;

import java.io.BufferedWriter;
import java.io.File;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static htsjdk.samtools.util.IOUtil.assertDirectoryIsWritable;
import static htsjdk.samtools.util.IOUtil.assertFileIsWritable;
import static htsjdk.samtools.util.IOUtil.openFileForBufferedWriting;
import static htsjdk.samtools.util.IOUtil.*;
import static htsjdk.samtools.util.Log.getInstance;
import static htsjdk.samtools.util.SequenceUtil.basesEqual;
import static htsjdk.samtools.util.SequenceUtil.isNoCall;
import static htsjdk.samtools.util.StringUtil.bytesToString;
import static htsjdk.samtools.util.StringUtil.join;
import static htsjdk.samtools.util.StringUtil.repeatCharNTimes;
import static htsjdk.samtools.util.StringUtil.stringToBytes;
import static htsjdk.samtools.util.StringUtil.*;
import static java.lang.Math.min;
import static java.lang.Runtime.getRuntime;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.text.NumberFormat.getNumberInstance;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions.LANE_SHORT_NAME;
import static org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions.METRICS_FILE_SHORT_NAME;
import static org.broadinstitute.hellbender.tools.picard.illumina.ExtractIlluminaBarcodes.BarcodeMetric.copy;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataType.BaseCalls;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataType.PF;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataType.QualityScores;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.IlluminaDataType.*;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.ReadStructure.PARAMETER_DOC;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.ReadType.Barcode;
import static org.broadinstitute.hellbender.tools.picard.illumina.parser.readers.BclQualityEvaluationStrategy.ILLUMINA_ALLEGED_MINIMUM_QUALITY;
Expand Down Expand Up @@ -87,7 +61,7 @@
usageShort = "Tool to determine the barcode for each read in an Illumina lane",
programGroup = IlluminaProgramGroup.class
)
public class ExtractIlluminaBarcodes extends PicardCommandLineProgram {
public final class ExtractIlluminaBarcodes extends PicardCommandLineProgram {

@Argument(doc = "The Illumina basecalls directory. ", shortName = "B")
public File BASECALLS_DIR;
Expand Down Expand Up @@ -132,11 +106,6 @@ public class ExtractIlluminaBarcodes extends PicardCommandLineProgram {
@Argument(shortName = "GZIP", doc = "Compress output s_l_t_barcode.txt files using gzip and append a .gz extension to the file names.")
public boolean COMPRESS_OUTPUTS = false;

@Argument(doc = "Run this many PerTileBarcodeExtractors in parallel. If NUM_PROCESSORS = 0, number of cores is automatically set to " +
"the number of cores available on the machine. If NUM_PROCESSORS < 0 then the number of cores used will be " +
"the number available on the machine less NUM_PROCESSORS.")
public int NUM_PROCESSORS = 1;

private static final Log LOG = getInstance(ExtractIlluminaBarcodes.class);

/**
Expand Down Expand Up @@ -176,18 +145,6 @@ protected Object doWork() {

final BarcodeMetric noMatchMetric = new BarcodeMetric(null, null, barcodeSeqsToString(noMatchBarcode), noMatchBarcode);

final int numProcessors;
if (NUM_PROCESSORS == 0) {
numProcessors = getRuntime().availableProcessors();
} else if (NUM_PROCESSORS < 0) {
numProcessors = getRuntime().availableProcessors() + NUM_PROCESSORS;
} else {
numProcessors = NUM_PROCESSORS;
}

LOG.info("Processing with " + numProcessors + " PerTileBarcodeExtractor(s).");
final ExecutorService pool = newFixedThreadPool(numProcessors);

// TODO: This is terribly inefficient; we're opening a huge number of files via the extractor constructor and we never close them.
final List<PerTileBarcodeExtractor> extractors = new ArrayList<>(factory.getAvailableTiles().size());
for (final int tile : factory.getAvailableTiles()) {
Expand All @@ -204,25 +161,7 @@ protected Object doWork() {
);
extractors.add(extractor);
}
try {
for (final PerTileBarcodeExtractor extractor : extractors) {
pool.submit(extractor);
}
pool.shutdown();
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(6, HOURS)) {
pool.shutdownNow(); // Cancel any still-executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, SECONDS))
LOG.error("Pool did not terminate");
return 1;
}
} catch (final Throwable e) {
// (Re-)Cancel if current thread also interrupted
LOG.error(e, "Parent thread encountered problem submitting extractors to thread pool or awaiting shutdown of threadpool. Attempting to kill threadpool.");
pool.shutdownNow();
return 2;
}
extractors.forEach(ExtractIlluminaBarcodes.PerTileBarcodeExtractor::run);

LOG.info("Processed " + extractors.size() + " tiles.");
for (final PerTileBarcodeExtractor extractor : extractors) {
Expand Down Expand Up @@ -366,39 +305,39 @@ protected String[] customCommandLineValidation() {
private static final String LIBRARY_NAME_COLUMN = "library_name";

private void parseBarcodeFile(final ArrayList<String> messages) {
final TabbedTextFileWithHeaderParser barcodesParser = new TabbedTextFileWithHeaderParser(BARCODE_FILE);
final String sequenceColumn = barcodesParser.hasColumn(BARCODE_SEQUENCE_COLUMN)
? BARCODE_SEQUENCE_COLUMN : barcodesParser.hasColumn(BARCODE_SEQUENCE_1_COLUMN)
? BARCODE_SEQUENCE_1_COLUMN : null;
if (sequenceColumn == null) {
messages.add(BARCODE_FILE + " does not have " + BARCODE_SEQUENCE_COLUMN + " or " +
BARCODE_SEQUENCE_1_COLUMN + " column header");
return;
}
final boolean hasBarcodeName = barcodesParser.hasColumn(BARCODE_NAME_COLUMN);
final boolean hasLibraryName = barcodesParser.hasColumn(LIBRARY_NAME_COLUMN);
final int numBarcodes = readStructure.barcodes.length();
final Set<String> barcodes = new HashSet<>();
for (final TabbedTextFileWithHeaderParser.Row row : barcodesParser) {
final String bcStrings[] = new String[numBarcodes];
int barcodeNum = 1;
for (final ReadDescriptor rd : readStructure.descriptors) {
if (rd.type != Barcode) continue;
final String header = barcodeNum == 1 ? sequenceColumn : "barcode_sequence_" + valueOf(barcodeNum);
bcStrings[barcodeNum - 1] = row.getField(header);
barcodeNum++;
try (final TabbedTextFileWithHeaderParser barcodesParser = new TabbedTextFileWithHeaderParser(BARCODE_FILE)) {
final String sequenceColumn = barcodesParser.hasColumn(BARCODE_SEQUENCE_COLUMN)
? BARCODE_SEQUENCE_COLUMN : barcodesParser.hasColumn(BARCODE_SEQUENCE_1_COLUMN)
? BARCODE_SEQUENCE_1_COLUMN : null;
if (sequenceColumn == null) {
messages.add(BARCODE_FILE + " does not have " + BARCODE_SEQUENCE_COLUMN + " or " +
BARCODE_SEQUENCE_1_COLUMN + " column header");
return;
}
final String bcStr = barcodeSeqsToString(bcStrings);
if (barcodes.contains(bcStr)) {
messages.add("Barcode " + bcStr + " specified more than once in " + BARCODE_FILE);
final boolean hasBarcodeName = barcodesParser.hasColumn(BARCODE_NAME_COLUMN);
final boolean hasLibraryName = barcodesParser.hasColumn(LIBRARY_NAME_COLUMN);
final int numBarcodes = readStructure.barcodes.length();
final Set<String> barcodes = new HashSet<>();
for (final TabbedTextFileWithHeaderParser.Row row : barcodesParser) {
final String bcStrings[] = new String[numBarcodes];
int barcodeNum = 1;
for (final ReadDescriptor rd : readStructure.descriptors) {
if (rd.type != Barcode) continue;
final String header = barcodeNum == 1 ? sequenceColumn : "barcode_sequence_" + valueOf(barcodeNum);
bcStrings[barcodeNum - 1] = row.getField(header);
barcodeNum++;
}
final String bcStr = barcodeSeqsToString(bcStrings);
if (barcodes.contains(bcStr)) {
messages.add("Barcode " + bcStr + " specified more than once in " + BARCODE_FILE);
}
barcodes.add(bcStr);
final String barcodeName = (hasBarcodeName ? row.getField(BARCODE_NAME_COLUMN) : "");
final String libraryName = (hasLibraryName ? row.getField(LIBRARY_NAME_COLUMN) : "");
final BarcodeMetric metric = new BarcodeMetric(barcodeName, libraryName, bcStr, bcStrings);
barcodeToMetrics.put(join("", bcStrings), metric);
}
barcodes.add(bcStr);
final String barcodeName = (hasBarcodeName ? row.getField(BARCODE_NAME_COLUMN) : "");
final String libraryName = (hasLibraryName ? row.getField(LIBRARY_NAME_COLUMN) : "");
final BarcodeMetric metric = new BarcodeMetric(barcodeName, libraryName, bcStr, bcStrings);
barcodeToMetrics.put(join("", bcStrings), metric);
}
barcodesParser.close();
}

/**
Expand Down Expand Up @@ -523,7 +462,7 @@ public void merge(final BarcodeMetric metric) {
/**
* Extracts barcodes and accumulates metrics for an entire tile.
*/
private static class PerTileBarcodeExtractor implements Runnable {
private static class PerTileBarcodeExtractor {
private final int tile;
private final File barcodeFile;
private final Map<String, BarcodeMetric> metrics;
Expand Down Expand Up @@ -581,55 +520,52 @@ public PerTileBarcodeExtractor(
}

// These methods return the results of the extraction
public synchronized Map<String, BarcodeMetric> getMetrics() {
public Map<String, BarcodeMetric> getMetrics() {
return this.metrics;
}

public synchronized BarcodeMetric getNoMatchMetric() {
public BarcodeMetric getNoMatchMetric() {
return this.noMatch;
}

public synchronized Exception getException() {
public Exception getException() {
return this.exception;
}

/**
* run method which extracts barcodes and accumulates metrics for an entire tile
*/
synchronized public void run() {
public void run() {
try {
LOG.info("Extracting barcodes for tile " + tile);

//Sometimes makeDataProvider takes a while waiting for slow file IO. The sets of files needed by different tiles
//do not overlap, so make the data providers in the individual threads for PerTileBarcodeExtractors
//so they are not all waiting for each other's file operations

//Most likely we have SKIPS in our read structure since we replace all template reads with skips in the input data structure
//(see customCommandLineValidation), therefore we must use the outputReadStructure to index into the output cluster data
final int[] barcodeIndices = outputReadStructure.barcodes.getIndices();
final BufferedWriter writer = openFileForBufferedWriting(barcodeFile);
final byte barcodeSubsequences[][] = new byte[barcodeIndices.length][];
final byte qualityScores[][] = usingQualityScores ? new byte[barcodeIndices.length][] : null;
while (provider.hasNext()) {
// Extract the barcode from the cluster and write it to the file for the tile
final ClusterData cluster = provider.next();
for (int i = 0; i < barcodeIndices.length; i++) {
barcodeSubsequences[i] = cluster.getRead(barcodeIndices[i]).getBases();
if (usingQualityScores) qualityScores[i] = cluster.getRead(barcodeIndices[i]).getQualities();
}
final boolean passingFilter = cluster.isPf();
final BarcodeMatch match = findBestBarcodeAndUpdateMetrics(barcodeSubsequences, qualityScores, passingFilter, metrics, noMatch);

final String yOrN = (match.matched ? "Y" : "N");

for (final byte[] bc : barcodeSubsequences) {
writer.write(bytesToString(bc));
try (final BufferedWriter writer = openFileForBufferedWriting(barcodeFile)) {
final int[] barcodeIndices = outputReadStructure.barcodes.getIndices();
final byte barcodeSubsequences[][] = new byte[barcodeIndices.length][];
final byte qualityScores[][] = usingQualityScores ? new byte[barcodeIndices.length][] : null;
while (provider.hasNext()) {
// Extract the barcode from the cluster and write it to the file for the tile
final ClusterData cluster = provider.next();
for (int i = 0; i < barcodeIndices.length; i++) {
barcodeSubsequences[i] = cluster.getRead(barcodeIndices[i]).getBases();
if (usingQualityScores)
qualityScores[i] = cluster.getRead(barcodeIndices[i]).getQualities();
}
final boolean passingFilter = cluster.isPf();
final BarcodeMatch match = findBestBarcodeAndUpdateMetrics(barcodeSubsequences, qualityScores, passingFilter, metrics, noMatch);

final String yOrN = (match.matched ? "Y" : "N");

for (final byte[] bc : barcodeSubsequences) {
writer.write(bytesToString(bc));
}
writer.write("\t" + yOrN + "\t" + match.barcode + "\t" + valueOf(match.mismatches) +
"\t" + valueOf(match.mismatchesToSecondBest));
writer.newLine();
}
writer.write("\t" + yOrN + "\t" + match.barcode + "\t" + valueOf(match.mismatches) +
"\t" + valueOf(match.mismatchesToSecondBest));
writer.newLine();
}
writer.close();
} catch (final Exception e) {
LOG.error(e, "Error processing tile ", this.tile);
this.exception = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
*
* @author alecw@broadinstitute.org
*/
public class TabbedTextFileWithHeaderParser implements Iterable<TabbedTextFileWithHeaderParser.Row> {
public class TabbedTextFileWithHeaderParser implements Iterable<TabbedTextFileWithHeaderParser.Row>, AutoCloseable{
public class Row {
private final String[] fields;
private final String currentLine;
Expand Down Expand Up @@ -142,8 +142,4 @@ public void close() {
public int getCurrentLineNumber() {
return parser.getCurrentLineNumber();
}

public Set<String> getColumnNames() {
return Collections.unmodifiableSet(this.columnLabelIndices.keySet());
}
}

0 comments on commit e12e51b

Please sign in to comment.