Skip to content

Commit

Permalink
Merge 65b1c8a into 096af2f
Browse files Browse the repository at this point in the history
  • Loading branch information
Timothee Cezard committed Mar 18, 2019
2 parents 096af2f + 65b1c8a commit 1997fe0
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 46 deletions.
8 changes: 4 additions & 4 deletions analysis_driver/pipelines/demultiplexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ def _run(self):
])

# get the last cycle extracted
current_cycle = sorted(interop_metrics.get_cycles_extracted(self.dataset.input_dir))[-1]
current_cycle = interop_metrics.get_last_cycles_with_existing_bcls(self.dataset.input_dir)
while current_cycle < required_number_of_cycle and self.dataset.is_sequencing():
time.sleep(1200)
current_cycle = sorted(interop_metrics.get_cycles_extracted(self.dataset.input_dir))[-1]
current_cycle = interop_metrics.get_last_cycles_with_existing_bcls(self.dataset.input_dir)

# make sure the run is not aborted or errored before continuing with the rest of the pipeline
run_status = self.dataset.lims_run.udf.get('Run Status')
Expand Down Expand Up @@ -492,8 +492,8 @@ def stage(cls, **params):
previous_stages=[welldups, integrity_check, fastqc, seqtk, md5])

qc_output2 = stage(QCOutput, stage_name='qcoutput2', run_crawler_stage=RunCrawler.STAGE_MAPPING,
previous_stages=[stats_output, depth_output, md_output, is_output, gc_bias])
data_output = stage(DataOutput, previous_stages=[qc_output, qc_output2])
previous_stages=[qc_output, stats_output, depth_output, md_output, is_output, gc_bias])
data_output = stage(DataOutput, previous_stages=[qc_output2])
_cleanup = stage(Cleanup, previous_stages=[data_output])
review = stage(RunReview, previous_stages=[_cleanup])
return review
21 changes: 6 additions & 15 deletions analysis_driver/quality_control/bcl_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,18 @@ def get_bcls_to_check(self):
cycle (i.e. has a full set of tiles), and find all bcls for those cycles/tiles that haven't yet been
checked.
"""
all_cycles = self._all_cycles_from_interop()
ncycles = sum(Reads.num_cycles(r) for r in self.dataset.run_info.reads.reads)
if all_cycles and all_cycles[-1] > ncycles:
raise AnalysisDriverError('Number of cycles (%s) disagrees with RunInfo (%s)' % (all_cycles[-1], ncycles))

tile_ids = self.dataset.run_info.tiles
last_completed_cycle = 0
for c in sorted(set(all_cycles), reverse=True):
if all_cycles.count(c) >= len(tile_ids):
last_completed_cycle = c + 1 # compensate for zero-indexing
break

last_completed_cycle = interop_metrics.get_last_cycles_with_existing_bcls(self.run_dir)
if last_completed_cycle == 0: # no cycles are complete, so do nothing
return []

ncycles = sum(Reads.num_cycles(r) for r in self.dataset.run_info.reads.reads)
if last_completed_cycle > ncycles:
raise AnalysisDriverError('Number of cycles (%s) disagrees with RunInfo (%s)' % (last_completed_cycle, ncycles))

validated_bcls = self.read_valid_files()
bcls_to_check = []
for c in range(1, last_completed_cycle):
for c in range(1, last_completed_cycle + 1): # compensate for zero-indexing
cycle_id = 'C%s.1' % c
for t in tile_ids:
lane = t[0] # 1_1101
Expand Down Expand Up @@ -133,6 +127,3 @@ def read_valid_files(self):
return [bcl for bcl, exit_status in self.read_check_bcl_files() if int(exit_status) == 0]
else:
return []

def _all_cycles_from_interop(self):
return interop_metrics.get_cycles_extracted(self.run_dir)
22 changes: 19 additions & 3 deletions analysis_driver/quality_control/interop_metrics.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import glob
import os
from collections import defaultdict
from itertools import islice
from egcg_core.config import cfg
from egcg_core.app_logging import AppLogger
from egcg_core import util
from interop.py_interop_run_metrics import run_metrics as RunMetrics
from analysis_driver.reader.run_info import Reads

Expand Down Expand Up @@ -109,9 +112,22 @@ def detect_bad_cycles(self):
return bad_cycle_per_lanes


def get_cycles_extracted(run_dir):
def get_last_cycles_with_existing_bcls(run_dir):
    """
    Return the most recent extracted cycle whose bcl files are all present on the filesystem.

    The cycles reported by the InterOp extraction metrics are walked from the last one backwards. The first
    cycle for which the number of '*.bcl.gz' files found on disk equals the number of extraction metrics
    entries for that cycle is returned.

    :param run_dir: The location where the run is stored
    :returns: the last cycle of the run with existing bcls, or 0 if no cycle has all its bcls
    """
    run_metrics = RunMetrics()
    run_metrics.read(run_dir)
    extraction_metrics = run_metrics.extraction_metric_set()

    # Start from the last cycle and walk back until a cycle with all its bcls present is found.
    for cycle in sorted(extraction_metrics.cycles(), reverse=True):
        all_bcls = util.find_files(run_dir, 'Data', 'Intensities', 'BaseCalls', 'L00*', 'C%s.1' % cycle, '*.bcl.gz')
        if len(all_bcls) == extraction_metrics.metrics_for_cycle(cycle).size():
            return cycle

    # No cycle is fully backed by bcl files (or no cycle has been extracted yet).
    return 0
1 change: 0 additions & 1 deletion analysis_driver/report_generation/run_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ def _populate_unknown_elements(self, unknown_run_elements, reads_per_lane):
ELEMENT_NB_READS_PASS_FILTER: int(clust_count)
}


def _populate_from_mapping_stats(self, run_dir):
for run_element_id in self.barcodes_info:
barcode_info = self.barcodes_info.get(run_element_id)
Expand Down
14 changes: 7 additions & 7 deletions tests/test_pipelines/test_demultiplexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ def test_run(self):
lims_run=Mock(udf={'Run Status': 'RunStarted'}))

# cycle extracted states 310 cycles done
pcycles = patch('analysis_driver.quality_control.interop_metrics.get_cycles_extracted',
return_value=range(1, 311))
pcycles = patch('analysis_driver.quality_control.interop_metrics.get_last_cycles_with_existing_bcls',
return_value=310)

with pcycles as mcycle:
# No sleep time
Expand All @@ -118,8 +118,8 @@ def test_run(self):
mcycle.assert_called_once_with('path/to/input')

# cycle extracted states first 207 then 208 cycles done
pcycles = patch('analysis_driver.quality_control.interop_metrics.get_cycles_extracted',
side_effect=[range(1, 208), range(1, 209)])
pcycles = patch('analysis_driver.quality_control.interop_metrics.get_last_cycles_with_existing_bcls',
side_effect=[207, 208])
with pcycles as mcycle, patch('time.sleep') as msleep:
self.stage = dm.WaitForRead2(dataset=dataset)
assert self.stage._run() == 0
Expand All @@ -133,9 +133,9 @@ def test_run_aborted(self):
dataset = NamedMock(real_name='testrun', run_info=run_info, input_dir='path/to/input',
lims_run=Mock(udf={'Run Status': 'RunAborted'}))

# cycle extracted states 209 cycles done
pcycles = patch('analysis_driver.quality_control.interop_metrics.get_cycles_extracted',
return_value=range(1, 209))
# cycle extracted states 208 cycles done
pcycles = patch('analysis_driver.quality_control.interop_metrics.get_last_cycles_with_existing_bcls',
return_value=208)

with pcycles as mcycle:
with pytest.raises(SequencingRunError):
Expand Down
22 changes: 8 additions & 14 deletions tests/test_quality_control/test_bcl_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ def setUp(self):
if os.path.isfile(self.val.validation_log):
os.remove(self.val.validation_log)

@patch('analysis_driver.quality_control.BCLValidator._all_cycles_from_interop')
@patch('analysis_driver.quality_control.interop_metrics.get_last_cycles_with_existing_bcls')
def test_get_bcl_files_to_check(self, mocked_cycles):
mocked_cycles.return_value = [1, 1, 1] # no completed cycles
mocked_cycles.return_value = 0 # no completed cycles
assert self.val.get_bcls_to_check() == []

mocked_cycles.return_value.extend([1, 2, 2, 2]) # completed cycle 1, but not cycle 2
mocked_cycles.return_value = 1 # completed cycle 1
assert self.val.get_bcls_to_check() == [
'L001/C1.1/s_1_1101.bcl.gz', 'L002/C1.1/s_2_1101.bcl.gz',
'L001/C1.1/s_1_1102.bcl.gz', 'L002/C1.1/s_2_1102.bcl.gz'
]

mocked_cycles.return_value.extend([2, 3, 3, 3, 3])
mocked_cycles.return_value = 3 # completed cycle 3
obs = self.val.get_bcls_to_check()
exp = [
'L001/C1.1/s_1_1101.bcl.gz', 'L001/C1.1/s_1_1102.bcl.gz',
Expand All @@ -40,8 +40,8 @@ def test_get_bcl_files_to_check(self, mocked_cycles):

@patch('analysis_driver.quality_control.bcl_validation.executor.execute')
def test_run_bcl_check(self, mocked_execute):
with patch('analysis_driver.quality_control.BCLValidator._all_cycles_from_interop',
return_value=[1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]):
with patch('analysis_driver.quality_control.interop_metrics.get_last_cycles_with_existing_bcls',
return_value=3):
bcls = self.val.get_bcls_to_check()
validation_log_tmp = os.path.join(self.val.job_dir, 'tmp_checked_bcls.csv')
self.val.run_bcl_check(bcls, slice_size=2, max_job_number=5)
Expand Down Expand Up @@ -71,18 +71,12 @@ def test_run_bcl_check(self, mocked_execute):
)
mocked_execute.assert_has_calls([call_1, call().join(), call_2, call().join()])

def test_cycles_from_interop(self):
interop_dir = os.path.join(self.run_dir, 'InterOp')
os.makedirs(interop_dir, exist_ok=True)
assert self.val._all_cycles_from_interop() == [] # no ExtractionMetrics
open(os.path.join(interop_dir, 'ExtractionMetricsOut.bin'), 'w').close()
assert self.val._all_cycles_from_interop() == [] # empty ExtractionMetrics

@patch('analysis_driver.quality_control.BCLValidator.call_bcl_check')
def test_check_bcls(self, mocked_check_bcls):
patched_cycles = patch(
'analysis_driver.quality_control.BCLValidator._all_cycles_from_interop',
return_value=[1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
'analysis_driver.quality_control.interop_metrics.get_last_cycles_with_existing_bcls',
return_value=3
)
patched_execute = patch(
'analysis_driver.quality_control.bcl_validation.executor.execute',
Expand Down
18 changes: 16 additions & 2 deletions tests/test_quality_control/test_interop_metrics.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from unittest.mock import Mock
from unittest.mock import Mock, patch

from analysis_driver.quality_control.interop_metrics import BadTileCycleDetector
from analysis_driver.quality_control.interop_metrics import BadTileCycleDetector, get_last_cycles_with_existing_bcls
from tests.test_analysisdriver import TestAnalysisDriver


Expand Down Expand Up @@ -93,3 +93,17 @@ def test_detect_bad_cycles(self):
]
self.detector.all_lanes = {1: ({}, {'1': list_q_hist1, '2': list_q_hist2})}
assert dict(self.detector.detect_bad_cycles()) == {1: ['1']}

def test_get_last_cycles_with_existing_bcls(self):
    """Check the reported last cycle, first without InterOp data and then with mocked extraction metrics."""
    # no valid Interop files
    assert get_last_cycles_with_existing_bcls(self.job_dir) == 0

    # Every cycle reports 4 extraction metrics entries, so a cycle only counts as complete
    # when 4 bcl files are found for it.
    metrics_of_one_cycle = Mock()
    metrics_of_one_cycle.size.return_value = 4

    extraction_metric_set = Mock()
    extraction_metric_set.cycles.return_value = [1, 2, 3]
    extraction_metric_set.metrics_for_cycle.return_value = metrics_of_one_cycle

    target = 'analysis_driver.quality_control.interop_metrics.RunMetrics.extraction_metric_set'
    with patch(target) as mocked_metric_set:
        mocked_metric_set.return_value = extraction_metric_set
        # cycle 3 only has 3 bcls (per the test data under job_dir) so it should return cycle 2
        assert get_last_cycles_with_existing_bcls(self.job_dir) == 2

0 comments on commit 1997fe0

Please sign in to comment.