diff --git a/.gitignore b/.gitignore index 34b69f0a..e4bda13c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ pcpostprocess/_version.py /tests/test_data /test_output +/output *__pycache__* *.egg-info *.DS_Store diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d3fe562..685a6de4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,13 @@ This page lists the main changes made to pcpostprocess in each release. ## Unreleased - Added + - [#81](https://github.com/CardiacModelling/pcpostprocess/pull/81) Added docstrings to the `hERGQC` class. - [#104](https://github.com/CardiacModelling/pcpostprocess/pull/104) Added a CHANGELOG.md and CONTRIBUTING.md - Changed + - [#81](https://github.com/CardiacModelling/pcpostprocess/pull/81) Changed the constructor arguments for `hERGQC`. - Deprecated - Removed + - [#81](https://github.com/CardiacModelling/pcpostprocess/pull/81) Removed `hERGQC.plot_dir`, `hERGQC.set_trace` and `hERGQC.set_debug`. - Fixed ## [0.2.0] - 2025-11-11 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a8770f83..15e1b12c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -149,6 +149,9 @@ For example, the first entry in our Changelog was: - [#104](https://github.com/CardiacModelling/pcpostprocess/pull/104) Added a CHANGELOG.md and CONTRIBUTING.md ``` +Changelog entries are intended for _users_, and so should focus on changes to the public API or command line interface. +Changes to internals are less likely to need reporting. + ## Packaging This project uses a minimal [`setup.py`](./setup.py), and instead uses [pyproject.toml](./pyproject.toml). diff --git a/pcpostprocess/hergQC.py b/pcpostprocess/hergQC.py index e248a3ca..9f4f9311 100644 --- a/pcpostprocess/hergQC.py +++ b/pcpostprocess/hergQC.py @@ -4,83 +4,89 @@ import matplotlib.pyplot as plt import numpy as np -import scipy.stats - -NOISE_LEN = 200 class QCDict(UserDict): """ Stores the results from QC checks. - Each entry is a label -> [(bool, value),...] mapping. + Each entry is a ``label -> [(bool, value),...]`` mapping. The bool in each tuple indicates whether the QC passed, and the value is the result that was checked (e.g. the SNR value). The list can contain multiple tuples if the QC checks multiple values, as QC1 does, or if the checks are run multiple times e.g. once per sweep. """ - labels = [ - "qc1.rseal", - "qc1.cm", - "qc1.rseries", - "qc2.raw", - "qc2.subtracted", - "qc3.raw", - "qc3.E4031", - "qc3.subtracted", - "qc4.rseal", - "qc4.cm", - "qc4.rseries", - "qc5.staircase", - "qc5.1.staircase", - "qc6.subtracted", - "qc6.1.subtracted", - "qc6.2.subtracted", - ] + labels = ( + 'qc1.rseal', # [before, after] + 'qc1.cm', # [before, after] + 'qc1.rseries', # [before, after] + 'qc2.raw', # [sweep[0], sweep[1], ...] + 'qc2.subtracted', # [sweep[0], sweep[1], ...] + 'qc3.raw', + 'qc3.E4031', + 'qc3.subtracted', + 'qc4.rseal', + 'qc4.cm', + 'qc4.rseries', + 'qc5.staircase', + 'qc5.1.staircase', + 'qc6.subtracted', + 'qc6.1.subtracted', + 'qc6.2.subtracted', + ) def __init__(self): super().__init__([(label, [(False, None)]) for label in QCDict.labels]) def __setitem__(self, key, value): if key not in QCDict.labels: - raise KeyError(f"Invalid QC key: {key}") + raise KeyError(f'Invalid QC key: {key}') super().__setitem__(key, value) def qc_passed(self, label): """Return whether a single QC passed.""" - return all([x for x, _ in self[label]]) + return all(x for x, _ in self[label]) def passed_list(self): """Return a list of booleans indicating whether each QC passed.""" - return [self.qc_passed(label) for label in QCDict.labels] + return [all(x[0] for x in tuples) for tuples in self.values()] def all_passed(self): """Return whether all QC passed.""" - return all(self.passed_list()) + return all(all(x[0] for x in tuples) for tuples in self.values()) class hERGQC: + """ + Performs hERG staircase quality control similar to Lei 2019. - def __init__(self, sampling_rate=5, plot_dir=None, voltage=np.array([]), - n_sweeps=None, removal_time=5): - # TODO docstring - - self._plot_dir = plot_dir - - self._n_qc = 16 - - self.removal_time = removal_time + @param voltage: A voltage trace, sampled at ``sampling_rate``. + @param sampling_rate: The number of samples per time unit. + @param removal_time: Number of time units to remove after each step in the protocol + @param noise_len: Number of initial samples during which the signal is flat, to use for noise estimates. + @param plot_dir: An optional directory to store plots in + """ + def __init__(self, voltage, sampling_rate=5, removal_time=5, noise_len=200, + plot_dir=None): self.voltage = np.array(voltage) + self.sampling_rate = sampling_rate + self.removal_time = removal_time + self.noise_len = int(noise_len) + # Passing in a plot dir enables debug mode + self._plot_dir = plot_dir self.logger = logging.getLogger(__name__) - self.logger.setLevel(logging.DEBUG) - - self.sampling_rate = sampling_rate + if self._plot_dir is not None: + self.logger.setLevel(logging.DEBUG) + # https://github.com/CardiacModelling/pcpostprocess/issues/42 + self._plot_dir = plot_dir # Define all thresholds + # TODO: These should be args? Or perhaps this is good so that this QC + # class can be extended? # qc1 self.rsealc = [1e8, 1e12] # in Ohm, converted from [0.1, 1000] GOhm self.cmc = [1e-12, 1e-10] # in F, converted from [1, 100] pF @@ -122,47 +128,28 @@ def __init__(self, sampling_rate=5, plot_dir=None, voltage=np.array([]), self.qc6_1_win = self.qc6_1_win.astype(int) self.qc6_2_win = self.qc6_2_win.astype(int) - self._debug = True - - @property - def plot_dir(self): - return self._plot_dir - - @plot_dir.setter - def plot_dir(self, path): - self._plot_dir = path - - def set_trace(self, before, after, qc_vals_before, qc_vals_after, n_sweeps): - self._before = before - self._qc_vals_before = qc_vals_before - self._after = after - self._qc_vals_after = qc_vals_after - - def set_debug(self, debug): - self._debug = debug - - def run_qc(self, voltage_steps, times, - before, after, qc_vals_before, + def run_qc(self, voltage_steps, times, before, after, qc_vals_before, qc_vals_after, n_sweeps=None): """Run each QC criteria on a single (before-trace, after-trace) pair. @param voltage_steps is a list of times at which there are discontinuities in Vcmd @param times is the array of observation times - @param before is the before-drug current trace - @param after is the post-drug current trace - @param qc_vals_before is an array of values for each pre-drug sweep where each row is (Rseal, Cm, Rseries) - @param qc_vals_after is an array of values for each post-drug sweep where each row is (Rseal, Cm, Rseries) - @n_sweeps is the number of sweeps to be considered + @param before are the before-drug current traces, in an array with + sweeps on the first index and values on the second. + @param after is the post-drug current traces, in an array with sweeps + on the first index and values on the second. + @param qc_vals_before is a sequence (Rseal, Cm, Rseries) + @param qc_vals_after is a sequence (Rseal, Cm, Rseries) + @n_sweeps is the number of sweeps to be run QC on. + @returns A :class:`QCDict` with the test results. """ + # TODO: Why doesn't each sweep have its own "qc_vals" ? if not n_sweeps: n_sweeps = len(before) - before = np.array(before) - after = np.array(after) - - before = self.filter_capacitive_spikes(before, times, voltage_steps) - after = self.filter_capacitive_spikes(after, times, voltage_steps) + before = self.filter_capacitive_spikes(np.array(before), times, voltage_steps) + after = self.filter_capacitive_spikes(np.array(after), times, voltage_steps) QC = QCDict() @@ -190,11 +177,10 @@ def run_qc(self, voltage_steps, times, qc3_1 = self.qc3(before[0, :], before[1, :]) qc3_2 = self.qc3(after[0, :], after[1, :]) - qc3_3 = self.qc3(before[0, :] - after[0, :], - before[1, :] - after[1, :]) + qc3_3 = self.qc3(before[0, :] - after[0, :], before[1, :] - after[1, :]) QC['qc3.raw'] = [qc3_1] - QC['qc3.E4031'] = [qc3_2] + QC['qc3.E4031'] = [qc3_2] # Change to 'control' and 'blocker' ? QC['qc3.subtracted'] = [qc3_3] rseals = [qc_vals_before[0], qc_vals_after[0]] @@ -207,9 +193,7 @@ def run_qc(self, voltage_steps, times, QC['qc4.rseries'] = [qc4[2]] # indices where hERG peaks - qc5 = self.qc5(before[0, :], after[0, :], - self.qc5_win) - + qc5 = self.qc5(before[0, :], after[0, :], self.qc5_win) qc5_1 = self.qc5_1(before[0, :], after[0, :], label='1') QC['qc5.staircase'] = [qc5] @@ -230,18 +214,18 @@ def run_qc(self, voltage_steps, times, QC['qc6.1.subtracted'] = [] QC['qc6.2.subtracted'] = [] for i in range(before.shape[0]): - qc6 = self.qc6((before[i, :] - after[i, :]), - self.qc6_win, label="0") - qc6_1 = self.qc6((before[i, :] - after[i, :]), - self.qc6_1_win, label="1") - qc6_2 = self.qc6((before[i, :] - after[i, :]), - self.qc6_2_win, label="2") + qc6 = self.qc6( + before[i, :] - after[i, :], self.qc6_win, label='0') + qc6_1 = self.qc6( + before[i, :] - after[i, :], self.qc6_1_win, label='1') + qc6_2 = self.qc6( + before[i, :] - after[i, :], self.qc6_2_win, label='2') QC['qc6.subtracted'].append(qc6) QC['qc6.1.subtracted'].append(qc6_1) QC['qc6.2.subtracted'].append(qc6_2) - if self.plot_dir and self._debug: + if self._plot_dir is not None: fig = plt.figure(figsize=(8, 5)) ax = fig.subplots() ax.plot(times, (before - after).T, label='subtracted') @@ -262,98 +246,118 @@ def run_qc(self, voltage_steps, times, by_label = OrderedDict(zip(labels, handles)) fig.legend(by_label.values(), by_label.keys()) - fig.savefig(os.path.join(self.plot_dir, 'qc_debug.png')) + fig.savefig(os.path.join(self._plot_dir, 'qc_debug.png')) plt.close(fig) return QC def qc1(self, rseal, cm, rseries): + """ + Checks that the given ``rseal``, ``cm`` and ``rseries`` are within the + desired range. + + @param rseal A scalar indicating a seal resistance in Ohm + @param cm A scalar cell capacitance in Farad + @param rseries A scalar series resistance in Ohm + @returns A tuple ``((qc11_passed, rseal), (qc12_passed, cm), (qc13_passed, rseries))``. + """ + # TODO Is there any reason these are public, other than that it was + # convenient for testing? + # TODO Could just be 1 method called 3 times + # TODO Should boundaries be arguments? + # Check R_seal, C_m, R_series within desired range - if ( - rseal is None - or rseal < self.rsealc[0] - or rseal > self.rsealc[1] - or not np.isfinite(rseal) - ): - self.logger.debug(f"rseal: {rseal}") - qc11 = False - else: - qc11 = True - - if ( - cm is None - or cm < self.cmc[0] - or cm > self.cmc[1] - or not np.isfinite(cm) - ): - self.logger.debug(f"cm: {cm}") - qc12 = False - else: - qc12 = True - - if ( - rseries is None - or rseries < self.rseriesc[0] - or rseries > self.rseriesc[1] - or not np.isfinite(rseries) - ): - self.logger.debug(f"rseries: {rseries}") - qc13 = False - else: - qc13 = True + qc11 = not (rseal is None + or rseal < self.rsealc[0] + or rseal > self.rsealc[1] + or not np.isfinite(rseal)) + qc12 = not (cm is None + or cm < self.cmc[0] + or cm > self.cmc[1] + or not np.isfinite(cm)) + qc13 = not (rseries is None + or rseries < self.rseriesc[0] + or rseries > self.rseriesc[1] + or not np.isfinite(rseries)) + + if not qc11: + self.logger.debug(f'rseal: {rseal}') + if not qc12: + self.logger.debug(f'cm: {cm}') + if not qc13: + self.logger.debug(f'rseries: {rseries}') return [(qc11, rseal), (qc12, cm), (qc13, rseries)] - def qc2(self, recording, method=3): - # Check SNR is good - if method == 1: - # Not sure if this is good... - snr = scipy.stats.signaltonoise(recording) - elif method == 2: - noise = np.std(recording[:NOISE_LEN]) - snr = (np.max(recording) - np.min(recording) - 2 * noise) / noise - elif method == 3: - noise = np.std(recording[:NOISE_LEN]) - snr = (np.std(recording) / noise) ** 2 - - if snr < self.snrc or not np.isfinite(snr): - self.logger.debug(f"snr: {snr}") - result = False - else: - result = True + def qc2(self, recording): + """ + Checks that the signal-to-noise ratio is below a certain threshold. - return (result, snr) - - def qc3(self, recording1, recording2, method=3): - # Check 2 sweeps similar - if method == 1: - rmsdc = 2 # A/F * F - elif method == 2: - noise_1 = np.std(recording1[:NOISE_LEN]) - peak_1 = (np.max(recording1) - noise_1) - noise_2 = np.std(recording2[:NOISE_LEN]) - peak_2 = (np.max(recording2) - noise_2) - rmsdc = max(np.mean([peak_1, peak_2]) * 0.1, - np.mean([noise_1, noise_2]) * 5) - elif method == 3: - noise_1 = np.std(recording1[:NOISE_LEN]) - noise_2 = np.std(recording2[:NOISE_LEN]) - rmsd0_1 = np.sqrt(np.mean((recording1) ** 2)) - rmsd0_2 = np.sqrt(np.mean((recording2) ** 2)) - rmsdc = max(np.mean([rmsd0_1, rmsd0_2]) * self.rmsd0c, - np.mean([noise_1, noise_2]) * 6) - rmsd = np.sqrt(np.mean((recording1 - recording2) ** 2)) - if rmsd > rmsdc or not (np.isfinite(rmsd) and np.isfinite(rmsdc)): - self.logger.debug(f"rmsd: {rmsd}, rmsdc: {rmsdc}") - result = False - else: - result = True + Here the signal-to-noise ratio is defined as + ``(std(recording) / std(noise))``, where ``noise`` is the initial part + of ``recording``. + + @param recording A 1-d numpy array containing recorded currents. + @returns a tuple ``(passed, signal-to-noise-ratio)``. + """ + snr = (np.std(recording) / np.std(recording[:self.noise_len])) ** 2 + passed = snr >= self.snrc and np.isfinite(snr) + if not passed: + self.logger.debug(f'snr: {snr}') + return (passed, snr) - return (result, rmsdc - rmsd) + def qc3(self, recording1, recording2): + """ + Checks that ``recording1`` and ``recording2`` are similar, using a + measure based on root-mean-square-distance. + + Particularly, it checks whether + ``RMSD(recording1, recording2) < threshold``, + where ``threshold`` is determined as the maximum of + ``w * (RMSD(recording1, 0) + RMSD(recording2, 0))`` and + ``6 * (std(noise1) + std(noise2))``. + + @param recording1 A 1-d numpy array containing recorded currents + @param recording2 The same, but for a repeat measurement + @returns a tuple ``(passed, RMSD - threshold)`` + """ + def rmsd(x, y=0): + return np.sqrt(np.mean((x - y)**2)) + + t1 = self.rmsd0c * np.mean((rmsd(recording1), rmsd(recording2))) + t2 = 6 * np.mean(( + np.std(recording1[:self.noise_len]), + np.std(recording2[:self.noise_len]))) + # TODO: This second cut-off is not documented in the paper + t = max(t1, t2) + r = rmsd(recording1, recording2) + passed = r <= t and np.isfinite(t) and np.isfinite(r) + if not passed: + self.logger.debug(f'rmsd: {r}, rmsdc: {t}') + + # TODO: Is this the best thing to return? Why not t _and_ r? + return (passed, t - r) def qc4(self, rseals, cms, rseriess): - # Check R_seal, C_m, R_series stability - # Require std/mean < x% + """ + Checks that Rseal, Cm, and Rseries values remain steady during an + experiment. + + For each quantity, it checks if ``std(x) / mean(x) <= cutoff``. + + @param rseals A list of Rseal values, for repeated experiments, in Ohm. + @param cms A list of Cm values, in Farad. + @param rseriess A list of Rseries values, in Ohm + @returns a tuple ``((qc41_passed, ex41), (qc42_passed, ex42), (qc43_passed, ex43))`` + where ``exY`` is the amount by which the criterion exceeded the threshold. + """ + # TODO: Run this on values for all sweeps, not 1 before and after ? + # Or, if using only 2, write as just |a-b|/(a+b) + + # TODO: This uses numpy's standard `std` call without the bias + # correction. Do we want to use the bias corrected one instead? + # matters for above, as becomes 2|a-b|/(a+b) ! + qc41 = True qc42 = True qc43 = True @@ -365,7 +369,7 @@ def qc4(self, rseals, cms, rseriess): d_rseal = np.std(rseals) / np.mean(rseals) if d_rseal > self.rsealsc or not ( np.isfinite(np.mean(rseals)) and np.isfinite(np.std(rseals))): - self.logger.debug(f"d_rseal: {d_rseal}") + self.logger.debug(f'd_rseal: {d_rseal}') qc41 = False if None in list(cms): @@ -375,7 +379,7 @@ def qc4(self, rseals, cms, rseriess): d_cm = np.std(cms) / np.mean(cms) if d_cm > self.cmsc or not ( np.isfinite(np.mean(cms)) and np.isfinite(np.std(cms))): - self.logger.debug(f"d_cm: {d_cm}") + self.logger.debug(f'd_cm: {d_cm}') qc42 = False if None in list(rseriess): @@ -386,124 +390,150 @@ def qc4(self, rseals, cms, rseriess): if d_rseries > self.rseriessc or not ( np.isfinite(np.mean(rseriess)) and np.isfinite(np.std(rseriess))): - self.logger.debug(f"d_rseries: {d_rseries}") + self.logger.debug(f'd_rseries: {d_rseries}') qc43 = False return [(qc41, d_rseal), (qc42, d_cm), (qc43, d_rseries)] - def qc5(self, recording1, recording2, win=None, label=''): - # Check pharma peak value drops after E-4031 application - # Require subtracted peak > 70% of the original peak - if win is not None: - i, f = win - else: - i, f = 0, None - - if self.plot_dir and self._debug: - if win is not None: - plt.axvspan(win[0], win[1], color='grey', alpha=.1) + def qc5(self, recording1, recording2, win, label=None): + """ + Checks whether the peak current in ``recording1`` during a given window + has been blocked in ``recording2``. + + First, find an ``i`` such that ``recording1[i]`` is the largest + (positive) current (if a window is given, only this window is + searched). Then, checks whether + ``(recording1[i] - recording2[i]) / recording1[i] >= 0.75`` (in other + words whether at least 75% of the peak current has been blocked). + + @param recording1 A staircase before blocker application + @param recording2 A staircase with a strong blocker, e.g. E-4031 + @param win A tuple ``(i, j)`` such that ``recording[i:j]`` is the window to search in + @param label An optional label for a plot + @returns a tuple ``(passed, 0.75 * recording1[i] - (recording1[i] - recording2[i]))``. + """ + if self._plot_dir is not None and label is not None: + plt.axvspan(win[0], win[1], color='grey', alpha=.1) plt.plot(recording1, label='recording1') plt.plot(recording2, label='recording2') - plt.savefig(os.path.join(self.plot_dir, f"qc5_{label}")) + plt.savefig(os.path.join(self._plot_dir, f'qc5_{label}')) plt.clf() + i, f = win wherepeak = np.argmax(recording1[i:f]) max_diff = recording1[i:f][wherepeak] - recording2[i:f][wherepeak] max_diffc = self.max_diffc * recording1[i:f][wherepeak] if (max_diff < max_diffc) or not (np.isfinite(max_diff) and np.isfinite(max_diffc)): - self.logger.debug(f"max_diff: {max_diff}, max_diffc: {max_diffc}") + self.logger.debug(f'max_diff: {max_diff}, max_diffc: {max_diffc}') result = False else: result = True + # TODO: More sensible to return max_diff / recording1[i:f][wherepeak] here? return (result, max_diffc - max_diff) - def qc5_1(self, recording1, recording2, win=None, label=''): - # Check RMSD_0 drops after E-4031 application - # Require RMSD_0 (after E-4031 / before) diff > 50% of RMSD_0 before - if win is not None: - i, f = win - else: - i, f = 0, -1 + def qc5_1(self, recording1, recording2, label=None): + """ + Checks whether root-mean-squared current in ``recording1`` has been + blocked in ``recording2``, - if self.plot_dir and self._debug: - if win is not None: - plt.axvspan(win[0], win[1], color='grey', alpha=.1) + The test passes if + ``(RMSD(recording2, 0) - RMSD(recording1, 0)) / RMSD(recording1, 0) <= 0.5`` + + @param recording1 A staircase before blocker application + @param recording2 A staircase with a strong blocker, e.g. E-4031 + @param label An optional label for a plot + @returns a tuple ``(passed, 0.5 * recording1[i] - (recording1[i] - recording2[i]))``. + """ + # TODO: Just rephrase this as RMSD(I2) / RMSD(I1) <= 0.5 + + if self._plot_dir is not None and label is not None: fig = plt.figure() ax = fig.subplots() ax.plot(recording1, label='recording1') ax.plot(recording2, label='recording2') - fig.savefig(os.path.join(self.plot_dir, f"qc5_{label}")) + fig.savefig(os.path.join(self._plot_dir, f'qc5_{label}')) plt.close(fig) - rmsd0_diff = np.sqrt(np.mean(recording1[i:f] ** 2)) \ - - np.sqrt(np.mean(recording2[i:f] ** 2)) + def rmsd(x, y=0): + return np.sqrt(np.mean((x - y)**2)) - rmsd0_diffc = self.rmsd0_diffc *\ - np.sqrt(np.mean(recording1[i:f] ** 2)) + rmsd0_diff = rmsd(recording1) - rmsd(recording2) + rmsd0_diffc = self.rmsd0_diffc * rmsd(recording1) if (rmsd0_diff < rmsd0_diffc) or not (np.isfinite(rmsd0_diff) and np.isfinite(rmsd0_diffc)): self.logger.debug( - f"rmsd0_diff: {rmsd0_diff}, rmsd0c: {rmsd0_diffc}") + f'rmsd0_diff: {rmsd0_diff}, rmsd0c: {rmsd0_diffc}') result = False else: result = True + # TODO Just return rmsd(I2) / RMSD(I1) return (result, rmsd0_diffc - rmsd0_diff) - def qc6(self, recording1, win=None, label=''): + def qc6(self, recording1, win, label=None): + """ + Checks that the current in a particular window is non-negative. + + Instead of comparing the whole current to 0, we compare the mean + current to -2 times the noise level (estimed from the start of the + trace). + + @param recording1 The full current trace + @param win A tuple ``(i, j)`` such that ``recording[i:j]`` is the window to check + @param label A label for an optional plot + @returns a tuple ``(passed, -2 * noise - + """ # Check subtracted staircase +40mV step up is non negative - if win is not None: - i, f = win - else: - i, f = 0, -1 - if self.plot_dir and self._debug: + if self._plot_dir is not None and label is not None: if win is not None: plt.axvspan(win[0], win[1], color='grey', alpha=.1) plt.plot(recording1, label='recording1') - plt.savefig(os.path.join(self.plot_dir, f"qc6_{label}")) + plt.savefig(os.path.join(self._plot_dir, f'qc6_{label}')) plt.clf() + i, f = win val = np.mean(recording1[i:f]) - - # valc = -0.005 * np.abs(np.sqrt(np.mean((recording1) ** 2))) # or just 0 - valc = self.negative_tolc * np.std(recording1[:NOISE_LEN]) + valc = self.negative_tolc * np.std(recording1[:self.noise_len]) if (val < valc) or not (np.isfinite(val) and np.isfinite(valc)): - self.logger.debug(f"qc6_{label} val: {val}, valc: {valc}") + self.logger.debug(f'qc6_{label} val: {val}, valc: {valc}') result = False else: result = True + # TODO Return val / valc here? return (result, valc - val) def filter_capacitive_spikes(self, current, times, voltage_step_times): """ - Set values to 0 where they lie less that self.removal time after a change in voltage + Set currents to 0 where they lie less than ``removal_time`` after a change in voltage. - @param current: The observed current + @param current: The observed current, as a 1 or 2-dimensional array. + If a 2-dimensional array is used, repeats must be + on the first axis, and time series values on the 2nd. @param times: the times at which the current was observed @param voltage_step_times: the times at which there are discontinuities in Vcmd + @returns the ``current`` with some samples set to 0 """ - - for tstart, tend in zip(voltage_step_times, - np.append(voltage_step_times[1:], np.inf)): + voltage_step_ends = np.append(voltage_step_times[1:], np.inf) + for tstart, tend in zip(voltage_step_times, voltage_step_ends): win_end = tstart + self.removal_time win_end = min(tend, win_end) i_start = np.argmax(times >= tstart) i_end = np.argmax(times > win_end) - if i_end == 0: break + if len(current.shape) == 2: current[:, i_start:i_end] = 0 elif len(current.shape) == 1: current[i_start:i_end] = 0 else: - raise ValueError("Current must have 1 or 2 dimensions") + raise ValueError('Current array must be 1 or 2-dimensional') return current diff --git a/pcpostprocess/scripts/run_herg_qc.py b/pcpostprocess/scripts/run_herg_qc.py index 9f618aec..fd9e106a 100644 --- a/pcpostprocess/scripts/run_herg_qc.py +++ b/pcpostprocess/scripts/run_herg_qc.py @@ -694,8 +694,8 @@ def extract_protocol(readname, savename, time_strs, selected_wells, args, savedi os.makedirs(plot_dir) hergqc = hERGQC(sampling_rate=before_trace.sampling_rate, - plot_dir=plot_dir, - n_sweeps=before_trace.NofSweeps) + voltage=voltage, + plot_dir=plot_dir) times = before_trace.get_times() voltage = before_trace.get_voltage() diff --git a/tests/test_herg_qc.py b/tests/test_herg_qc.py index 6ba2cf19..e2a4da54 100755 --- a/tests/test_herg_qc.py +++ b/tests/test_herg_qc.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import copy import os import string import unittest @@ -7,7 +6,7 @@ import numpy as np from syncropatch_export.trace import Trace -from pcpostprocess.hergQC import NOISE_LEN, hERGQC +from pcpostprocess.hergQC import hERGQC def all_passed(result): @@ -45,7 +44,8 @@ def setUp(self): for i in range(1, 25) ] - sampling_rate = int(1.0 / (self.times[1] - self.times[0])) # in kHz + # in kHz + self.sampling_rate = int(1 / (self.times[1] - self.times[0])) #  Assume that there are no discontinuities at the start or end of ramps voltage_protocol = trace_before.get_voltage_protocol() @@ -55,29 +55,38 @@ def setUp(self): if vend == vstart ] - plot_dir = os.path.join("test_output", self.__class__.__name__) - os.makedirs(plot_dir, exist_ok=True) + def create_hergqc(self, plot_dir=None): + """ + Creates and returns a hERGQC object. - self.hergqc = hERGQC( - sampling_rate=sampling_rate, + If a ``plot_dir`` is set this will be used for debug output + """ + # Set this to True to generate plot directories + debug = False + + if debug and plot_dir is not None: + plot_dir = os.path.join('test_output', plot_dir) + os.makedirs(plot_dir, exist_ok=True) + else: + plot_dir = None + + return hERGQC( + sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage, ) - self.hergqc._debug = False # Turn this on to output plots - - def clone_hergqc(self, plot_dir): - hergqc = copy.deepcopy(self.hergqc) - plot_dir = os.path.join(hergqc.plot_dir, plot_dir) - os.makedirs(plot_dir, exist_ok=True) - hergqc.plot_dir = plot_dir - return hergqc def test_qc_inputs(self): + hergqc = self.create_hergqc() times = self.times - voltage = self.hergqc.voltage - qc6_win = self.hergqc.qc6_win - qc6_1_win = self.hergqc.qc6_1_win - qc6_2_win = self.hergqc.qc6_2_win + + # TODO: This should work some nicer way, without accessing what should + # really be private properties. But first we probably need to + # stop hardcoding these windows + voltage = hergqc.voltage + qc6_win = hergqc.qc6_win + qc6_1_win = hergqc.qc6_1_win + qc6_2_win = hergqc.qc6_2_win self.assertTrue(np.all(np.isfinite(times))) self.assertTrue(np.all(np.isfinite(voltage))) @@ -89,7 +98,7 @@ def test_qc_inputs(self): def test_qc1(self): # qc1 checks that rseal, cm, rseries are within range - hergqc = self.clone_hergqc("test_qc1") + hergqc = self.create_hergqc('qc1') rseal_lo, rseal_hi = 1e8, 1e12 rseal_mid = (rseal_lo + rseal_hi) / 2 @@ -297,7 +306,7 @@ def test_qc1(self): def test_qc2(self): # qc2 checks that raw and subtracted SNR are above a minimum threshold - hergqc = self.clone_hergqc("test_qc2") + hergqc = self.create_hergqc('qc2') test_matrix = [ (10, True, 8082.1), @@ -310,7 +319,7 @@ def test_qc2(self): ] for i, ex_pass, ex_snr in test_matrix: - recording = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [i] * 500) + recording = np.asarray([0, 0.1] * 100 + [i] * 500) pass_, snr = hergqc.qc2(recording) self.assertAlmostEqual( snr, ex_snr, 1, f"QC2: ({i}) {snr} != {ex_snr}" @@ -349,7 +358,7 @@ def test_qc2(self): def test_qc3(self): # qc3 checks that rmsd of two sweeps are similar - hergqc = self.clone_hergqc("test_qc3") + hergqc = self.create_hergqc('qc3') # Test with same noise, different signal test_matrix = [ @@ -363,10 +372,10 @@ def test_qc3(self): (10, False, -0.8), ] - recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [40] * 500) + recording1 = np.asarray([0, 0.1] * 100 + [40] * 500) for i, ex_pass, ex_d_rmsd in test_matrix: recording2 = np.asarray( - [0, 0.1] * (NOISE_LEN // 2) + [40 + i] * 500 + [0, 0.1] * 100 + [40 + i] * 500 ) pass_, d_rmsd = hergqc.qc3(recording1, recording2) self.assertAlmostEqual( @@ -388,10 +397,10 @@ def test_qc3(self): (100, True, 11.4), ] - recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [40] * 500) + recording1 = np.asarray([0, 0.1] * 100 + [40] * 500) for i, ex_pass, ex_d_rmsd in test_matrix: recording2 = np.asarray( - [0, 0.1 * i] * (NOISE_LEN // 2) + [40] * 500 + [0, 0.1 * i] * 100 + [40] * 500 ) pass_, d_rmsd = hergqc.qc3(recording1, recording2) self.assertAlmostEqual( @@ -456,7 +465,7 @@ def test_qc3(self): def test_qc4(self): # qc4 checks that rseal, cm, rseries are similar before/after E-4031 change - hergqc = self.clone_hergqc("test_qc4") + hergqc = self.create_hergqc('qc4') r_lo, r_hi = 1e6, 3e7 c_lo, c_hi = 1e-12, 1e-10 @@ -617,7 +626,7 @@ def test_qc4(self): def test_qc5(self): # qc5 checks that the maximum current during the second half of the # staircase changes by at least 75% of the raw trace after E-4031 addition - hergqc = self.clone_hergqc("test_qc5") + hergqc = self.create_hergqc('qc5') test_matrix = [ (-1.0, True, -12.5), @@ -631,12 +640,12 @@ def test_qc5(self): (1.0, False, 7.5), ] - recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10] * 500) + recording1 = np.asarray([0, 0.1] * 100 + [10] * 500) for i, ex_pass, ex_d_max_diff in test_matrix: recording2 = np.asarray( - [0, 0.1] * (NOISE_LEN // 2) + [10 * i] * 500 + [0, 0.1] * 100 + [10 * i] * 500 ) - pass_, d_max_diff = hergqc.qc5(recording1, recording2) + pass_, d_max_diff = hergqc.qc5(recording1, recording2, (0, -1)) self.assertAlmostEqual( d_max_diff, ex_d_max_diff, @@ -689,10 +698,10 @@ def test_qc5(self): def test_qc5_1(self): # qc5_1 checks that the RMSD to zero of staircase protocol changes # by at least 50% of the raw trace after E-4031 addition. - hergqc = self.clone_hergqc("test_qc5_1") + hergqc = self.create_hergqc('qc5_1') test_matrix = [ - (-1.0, False, 4.22), + (-1.0, False, 4.23), (-0.5, False, 0), (-0.412, True, -0.74), (-0.4, True, -0.84), @@ -701,13 +710,13 @@ def test_qc5_1(self): (0.4, True, -0.84), (0.412, True, -0.74), (0.5, False, 0), - (1.0, False, 4.22), + (1.0, False, 4.23), ] - recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10] * 500) + recording1 = np.asarray([0, 0.1] * 100 + [10] * 500) for i, ex_pass, ex_d_max_diff in test_matrix: recording2 = np.asarray( - [0, 0.1] * (NOISE_LEN // 2) + [10 * i] * 500 + [0, 0.1] * 100 + [10 * i] * 500 ) pass_, d_max_diff = hergqc.qc5_1(recording1, recording2) self.assertAlmostEqual( @@ -765,7 +774,7 @@ def test_qc5_1(self): def test_qc6(self): # qc6 checks that the first step up to +40 mV, before the staircase, in # the subtracted trace is bigger than -2 x estimated noise level. - hergqc = self.clone_hergqc("test_qc6") + hergqc = self.create_hergqc('qc6') test_matrix = [ (-100, False, 9.9), @@ -779,9 +788,9 @@ def test_qc6(self): for i, ex_pass, ex_d_val in test_matrix: recording = np.asarray( - [0, 0.1] * (NOISE_LEN // 2) + [0.1 * i] * 500 + [0, 0.1] * 100 + [0.1 * i] * 500 ) - pass_, d_val = hergqc.qc6(recording, win=[NOISE_LEN, -1]) + pass_, d_val = hergqc.qc6(recording, win=[200, -1]) self.assertAlmostEqual( d_val, ex_d_val, @@ -888,7 +897,7 @@ def test_qc6(self): def test_run_qc(self): # Test all wells - hergqc = self.clone_hergqc("test_run_qc") + hergqc = self.create_hergqc('run_qc') failed_wells = [ 'A04', 'A05', 'A06', 'A07', 'A08', 'A10', 'A11', 'A12', 'A13', 'A15',