From 32f36fac8ee4ef5f4b5a63dcee9ac9a18a8df816 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Tue, 19 Oct 2021 10:10:50 +0000 Subject: [PATCH] Remove assert statement from non-test files --- ifpd2/asserts.py | 50 ++++++++++++++++----------- ifpd2/chromosome.py | 30 ++++++++++------ ifpd2/database.py | 20 +++++++---- ifpd2/dataclasses.py | 63 ++++++++++++++++++++++------------ ifpd2/fasta.py | 3 +- ifpd2/io.py | 12 ++++--- ifpd2/logging.py | 6 ++-- ifpd2/oligo.py | 18 ++++++---- ifpd2/probe.py | 42 +++++++++++++++-------- ifpd2/probe_set.py | 9 +++-- ifpd2/region.py | 38 ++++++++++++-------- ifpd2/scripts/autocomplete.py | 3 +- ifpd2/scripts/db/check.py | 3 +- ifpd2/scripts/db/dump.py | 14 +++++--- ifpd2/scripts/db/make.py | 10 +++--- ifpd2/scripts/db/settings.py | 17 +++++---- ifpd2/scripts/extract_kmers.py | 6 ++-- ifpd2/scripts/query.py | 9 +++-- ifpd2/scripts/query2.py | 12 ++++--- ifpd2/tests/test_scripts_db.py | 18 ++++++---- ifpd2/walker.py | 31 ++++++++++------- ifpd2/walker2.py | 6 ++-- 22 files changed, 272 insertions(+), 148 deletions(-) diff --git a/ifpd2/asserts.py b/ifpd2/asserts.py index d0e9f7fe..6aeea9a7 100644 --- a/ifpd2/asserts.py +++ b/ifpd2/asserts.py @@ -10,48 +10,58 @@ def ert_type(x, stype, label): - assert isinstance(x, stype), f"{label} should be {stype}, {type(x)} instead" + if not isinstance(x, stype): + raise AssertionError(f"{label} should be {stype}, {type(x)} instead") def ert_multiTypes(x, types, label): cond = any(isinstance(x, t) for t in types) - assert cond, f"{label} should be one of {types}, {type(x)} instead" + if not cond: + raise AssertionError(f"{label} should be one of {types}, {type(x)} instead") def ert_nonNeg(x, label, include_zero=False): if not include_zero: - assert x > 0, f"{label} should be greater than 0" + if x <= 0: + raise AssertionError(f"{label} should be greater than 0") else: - assert x >= 0, f"{label} should be greater than or equal to 0" + if x < 0: + raise AssertionError(f"{label} should be greater than or equal to 0") def ert_inInterv(x, vmin, vmax, label, leftClose=False, rightClose=True): if leftClose: if rightClose: - assert x >= vmin and x <= vmax, f"expected {vmin}<={label}<={vmax}" + if not (x >= vmin and x <= vmax): + raise AssertionError(f"expected {vmin}<={label}<={vmax}") else: - assert x >= vmin and x < vmax, f"expected {vmin}<={label}<{vmax}" + if not (x >= vmin and x < vmax): + raise AssertionError(f"expected {vmin}<={label}<{vmax}") elif rightClose: - assert x > vmin and x <= vmax, f"expected {vmin}<{label}<={vmax}" + if not (x > vmin and x <= vmax): + raise AssertionError(f"expected {vmin}<{label}<={vmax}") else: - assert x > vmin and x < vmax, f"expected {vmin}<{label}<{vmax}" + if not (x > vmin and x < vmax): + raise AssertionError(f"expected {vmin}<{label}<{vmax}") def ert_in_dtype(x, dtype): if dtype.startswith("f"): - assert x <= np.finfo(dtype).max, " ".join( - [ - "expected to be lower than {dtype} max:", - f"{x} < {np.finfo(dtype).max}", - ] - ) + if x > np.finfo(dtype).max: + raise AssertionError(" ".join( + [ + "expected to be lower than {dtype} max:", + f"{x} < {np.finfo(dtype).max}", + ] + )) elif dtype.startswith("u") or dtype.startswith("i"): - assert x <= np.iinfo(dtype).max, " ".join( - [ - "expected to be lower than {dtype} max:", - f"{x} < {np.iinfo(dtype).max}", - ] - ) + if x > np.iinfo(dtype).max: + raise AssertionError(" ".join( + [ + "expected to be lower than {dtype} max:", + f"{x} < {np.iinfo(dtype).max}", + ] + )) else: logging.warning(f"assert not implemented for dtype '{dtype}'") diff --git a/ifpd2/chromosome.py b/ifpd2/chromosome.py index ba89d1b0..9623fac5 100644 --- a/ifpd2/chromosome.py +++ b/ifpd2/chromosome.py @@ -20,7 +20,8 @@ class ChromosomeIndex(object): def __init__(self, bin_size: int): super(ChromosomeIndex, self).__init__() - assert bin_size >= 1 + if bin_size < 1: + raise AssertionError self._bin_size = bin_size def __init_index(self, chrom_db: pd.DataFrame) -> None: @@ -63,7 +64,8 @@ def __populate_bins( for i in range(chrom_db.shape[0]): track[0].update(track[1], advance=1) position_in_nt = chrom_db["start"].values[i] - assert position_in_nt > current_position + if position_in_nt <= current_position: + raise AssertionError current_position = position_in_nt position_in_bytes = record_byte_size * i @@ -101,16 +103,19 @@ def build( track {Tuple[Progress, TaskID]} -- progress bar details """ for colname in ("chromosome", "start", "end"): - assert colname in chrom_db.columns, f"missing '{colname}' column" + if colname not in chrom_db.columns: + raise AssertionError(f"missing '{colname}' column") chromosome_set: Set[bytes] = set(chrom_db["chromosome"].values) - assert len(chromosome_set) == 1 + if len(chromosome_set) != 1: + raise AssertionError self.__init_index(chrom_db) self.__populate_bins(chrom_db, record_byte_size, track) self.__fill_empty_bins() def __getitem__(self, position_in_nt: int) -> int: - assert self._index is not None + if self._index is None: + raise AssertionError binned_to = position_in_nt // self._bin_size if binned_to not in self._index: return -1 @@ -145,12 +150,15 @@ def __init__( ): super(ChromosomeData, self).__init__() - assert "chromosome" in chromosome_db.columns + if "chromosome" not in chromosome_db.columns: + raise AssertionError selected_chrom = chromosome_db["chromosome"][0] - assert len(set(chromosome_db["chromosome"].values)) == 1 + if len(set(chromosome_db["chromosome"].values)) != 1: + raise AssertionError self._record_byte_size = get_dtype_length(dtype) - assert self._record_byte_size > 0 + if self._record_byte_size <= 0: + raise AssertionError self._name = selected_chrom self._recordno = chromosome_db.shape[0] @@ -195,7 +203,8 @@ def _build_index( index_bin_size {int} -- index bin size progress {Progress} -- progress instance for progress bar with rich """ - assert index_bin_size > 0 + if index_bin_size <= 0: + raise AssertionError indexing_track = progress.add_task( f"indexing {self._name.decode()}.bin", total=chromosome_db.shape[0], @@ -226,7 +235,8 @@ class ChromosomeDict(object): def __init__(self, index_bin_size: int = const.DEFAULT_DATABASE_INDEX_BIN_SIZE): super(ChromosomeDict, self).__init__() self._data = {} - assert index_bin_size > 0 + if index_bin_size <= 0: + raise AssertionError self._index_bin_size = index_bin_size def __len__(self) -> int: diff --git a/ifpd2/database.py b/ifpd2/database.py index a95304d6..d2110c44 100644 --- a/ifpd2/database.py +++ b/ifpd2/database.py @@ -38,7 +38,8 @@ def __parse_bytes(self, record: bytes, column_dtypes: Dict[str, str]) -> None: record {bytes} -- record bytes column_dtypes {Dict[str, str]} -- column dtypes """ - assert len(record) == get_dtype_length(column_dtypes) + if len(record) != get_dtype_length(column_dtypes): + raise AssertionError self._data = {} current_location = 0 for label in const.database_columns: @@ -137,9 +138,11 @@ def __init__(self, path: str): path {str} -- absolute path to database folder """ super(DataBase, self).__init__() - assert os.path.isdir(path), f"cannot find database folder '{path}'" + if not os.path.isdir(path): + raise AssertionError(f"cannot find database folder '{path}'") db_pickle_path = os.path.join(path, "db.pickle") - assert os.path.isfile(db_pickle_path), f"'db.pickle is missing in '{path}'" + if not os.path.isfile(db_pickle_path): + raise AssertionError(f"'db.pickle is missing in '{path}'") with open(db_pickle_path, "rb") as IH: details = pickle.load(IH) @@ -150,15 +153,18 @@ def __init__(self, path: str): else: self._args = dict(details["args"]) - assert isinstance(self._chromosomes, ChromosomeDict) + if not isinstance(self._chromosomes, ChromosomeDict): + raise AssertionError for chromosome in self.chromosome_list: chromosome_path = os.path.join(path, f"{chromosome.decode()}.bin") - assert os.path.isfile( + if not os.path.isfile( chromosome_path - ), f"missing expected chromosome file: '{chromosome_path}'" + ): + raise AssertionError(f"missing expected chromosome file: '{chromosome_path}'") self._record_byte_size = get_dtype_length(self._dtype) - assert self._record_byte_size > 0 + if self._record_byte_size <= 0: + raise AssertionError self._root = path diff --git a/ifpd2/dataclasses.py b/ifpd2/dataclasses.py index 58a43c1d..83cf7e55 100644 --- a/ifpd2/dataclasses.py +++ b/ifpd2/dataclasses.py @@ -19,9 +19,11 @@ class Folder: def __post_init__(self): if self.exists: - assert isdir(self.path) + if not isdir(self.path): + raise AssertionError else: - assert not path_exists(self.path) + if path_exists(self.path): + raise AssertionError @dataclass(frozen=True) @@ -31,9 +33,11 @@ class File: def __post_init__(self): if self.exists: - assert isfile(self.path) + if not isfile(self.path): + raise AssertionError else: - assert not path_exists(self.path) + if path_exists(self.path): + raise AssertionError @dataclass(frozen=True) @@ -41,7 +45,8 @@ class PositiveInteger: n: int def __post_init__(self): - assert self.n >= 1 + if self.n < 1: + raise AssertionError @dataclass(frozen=True) @@ -49,7 +54,8 @@ class NonNegativeFloat: n: float def __post_init__(self): - assert self.n >= 0 + if self.n < 0: + raise AssertionError @dataclass(frozen=True) @@ -57,7 +63,8 @@ class NonNegativeInteger: n: int def __post_init__(self): - assert self.n > 0 + if self.n <= 0: + raise AssertionError @dataclass(frozen=True) @@ -68,12 +75,15 @@ class PositiveFloat: def __post_init__(self): if self.limit is None: - assert 0 < self.n + if 0 >= self.n: + raise AssertionError elif self.limit_included: - assert 0 < self.n <= self.limit + if not 0 < self.n <= self.limit: + raise AssertionError else: - assert 0 < self.n < self.limit + if not 0 < self.n < self.limit: + raise AssertionError @dataclass(frozen=True) @@ -82,8 +92,10 @@ class GenomicRegion: end: int def __post_init__(self): - assert self.start >= 0 - assert self.end >= self.start or self.end == -1 + if self.start < 0: + raise AssertionError + if not (self.end >= self.start or self.end == -1): + raise AssertionError def astuple(self) -> Tuple[int, int]: return (self.start, self.end) @@ -95,8 +107,10 @@ class NonNegativeIntInterval: end: int def __post_init__(self): - assert self._from >= 0 - assert self.end >= self.first + if self._from < 0: + raise AssertionError + if self.end < self.first: + raise AssertionError def astuple(self) -> Tuple[int, int]: return (self.start, self.end) @@ -109,8 +123,10 @@ class QueryWindow: def __post_init__(self): if self.size is not None: - assert self.size >= 1 - assert 0 < self.shift <= 1 + if self.size < 1: + raise AssertionError + if not 0 < self.shift <= 1: + raise AssertionError def astuple(self) -> Tuple[Optional[int], Optional[float]]: return (self.size, self.shift) @@ -122,8 +138,10 @@ class QueryFocus: step: float def __post_init__(self): - assert self.size > 0 - assert self.step > 0 + if self.size <= 0: + raise AssertionError + if self.step <= 0: + raise AssertionError @dataclass(frozen=True) @@ -141,6 +159,9 @@ class GCRange: high: float def __post_init__(self): - assert 0 <= self.low <= 1 - assert 0 <= self.high <= 1 - assert self.low <= self.high + if not 0 <= self.low <= 1: + raise AssertionError + if not 0 <= self.high <= 1: + raise AssertionError + if self.low > self.high: + raise AssertionError diff --git a/ifpd2/fasta.py b/ifpd2/fasta.py index 45edb7bd..2334cc02 100644 --- a/ifpd2/fasta.py +++ b/ifpd2/fasta.py @@ -10,7 +10,8 @@ def extract_kmers(input_path: str, kmer_size: int) -> List[Tuple[str, str]]: - assert isfile(input_path) + if not isfile(input_path): + raise AssertionError with open(input_path) as FH: oligos_list: List[Tuple[str, str]] = [] for record_header, record_sequence in tqdm( diff --git a/ifpd2/io.py b/ifpd2/io.py index 96c17a19..b61e85fe 100644 --- a/ifpd2/io.py +++ b/ifpd2/io.py @@ -45,7 +45,8 @@ def parse_hush(path: str) -> pd.DataFrame: Returns: pd.DataFrame -- parsed HUSH data """ - assert os.path.isfile(path), f"cannot find file '{path}'" + if not os.path.isfile(path): + raise AssertionError(f"cannot find file '{path}'") logging.info(f"parsing: '{path}'") sequence_lengths: Set[int] = set() parsed_lines: List[Tuple[str, str, int]] = [] @@ -81,11 +82,13 @@ def parse_melting(path: str, sep: str = "\t", header: bool = True) -> pd.DataFra Returns: pd.DataFrame -- parsed oligo-melting data """ - assert os.path.isfile(path), f"cannot find file '{path}'" + if not os.path.isfile(path): + raise AssertionError(f"cannot find file '{path}'") logging.info(f"parsing: '{path}'") expected_columns = copy.copy(const.dtype_melting) melting_df = pd.read_csv(path, sep=sep, header=None, skiprows=1 if header else 0) - assert melting_df.shape[1] == len(expected_columns) + if melting_df.shape[1] != len(expected_columns): + raise AssertionError melting_df.columns = list(expected_columns.keys()) melting_df.set_index("name", inplace=True) expected_columns.pop("name", None) @@ -104,7 +107,8 @@ def parse_secondary(path: str) -> pd.DataFrame: Returns: pd.DataFrame -- parsed OligoArrayAux .ct data """ - assert os.path.isfile(path), f"cannot find file '{path}'" + if not os.path.isfile(path): + raise AssertionError(f"cannot find file '{path}'") logging.info(f"parsing: '{path}'") parsed_lines: List[Tuple[str, float]] = [] with open(path, "r") as IH: diff --git a/ifpd2/logging.py b/ifpd2/logging.py index 1995f193..92047969 100644 --- a/ifpd2/logging.py +++ b/ifpd2/logging.py @@ -11,9 +11,11 @@ def add_log_file_handler(path: str, logger_name: Optional[str] = None) -> None: - assert not os.path.isdir(path) + if os.path.isdir(path): + raise AssertionError log_dir = os.path.dirname(path) - assert os.path.isdir(log_dir) or log_dir == "" + if not (os.path.isdir(log_dir) or log_dir == ""): + raise AssertionError fh = RichHandler(console=Console(file=open(path, mode="w+")), markup=True) fh.setLevel(logging.INFO) logging.getLogger(logger_name).addHandler(fh) diff --git a/ifpd2/oligo.py b/ifpd2/oligo.py index 13bb2e3b..6d388689 100644 --- a/ifpd2/oligo.py +++ b/ifpd2/oligo.py @@ -37,7 +37,8 @@ class Oligo(object): def __init__(self, oligo, i): super(Oligo, self).__init__() ass.ert_type(i, int, "oligo id") - assert i >= 0 + if i < 0: + raise AssertionError self._raw_data = oligo.strip().split("\t") for i in [2, 3, 9, 10]: self._raw_data[i] = int(self._raw_data[i]) @@ -155,8 +156,10 @@ def focus_window(self): @focus_window.setter def focus_window(self, focus_window): ass.ert_type(focus_window, tuple, "focus window") - assert len(focus_window) == 2 - assert focus_window[1] > focus_window[0] + if len(focus_window) != 2: + raise AssertionError + if focus_window[1] <= focus_window[0]: + raise AssertionError self._focus_window = focus_window @property @@ -222,7 +225,8 @@ def set_focus_window(self, start, end, verbose=True): def expand_focus_to_n_oligos(self, n, verbose=True): # Expand the sub-window of interest to retrieve at least n oligos - assert not isinstance(self.focus_window, type(None)) + if isinstance(self.focus_window, type(None)): + raise AssertionError if n <= self.get_n_focused_oligos(): return @@ -245,7 +249,8 @@ def expand_focus_to_n_oligos(self, n, verbose=True): def expand_focus_by_step(self, step, verbose=True): # Expand the current focus window of a given step (in nt) - assert step < 0 + if step >= 0: + raise AssertionError if ( self.focus_window[0] <= self._data["start"].min() @@ -318,7 +323,8 @@ def expand_focus_to_closest(self): def apply_threshold(self, threshold): # Unfocuses oligos with score higher than the threshold - assert threshold <= 1 and threshold >= 0 + if not (threshold <= 1 and threshold >= 0): + raise AssertionError self._oligos_passing_score_filter = self._data["score"] <= threshold def reset_threshold(self): diff --git a/ifpd2/probe.py b/ifpd2/probe.py index 3e34587b..071e4019 100644 --- a/ifpd2/probe.py +++ b/ifpd2/probe.py @@ -25,9 +25,11 @@ def data(self): @data.setter def data(self, oligo_data): - assert isinstance(oligo_data, pd.DataFrame) + if not isinstance(oligo_data, pd.DataFrame): + raise AssertionError required_columns = ["start", "end", "Tm"] - assert all(col in oligo_data.columns for col in required_columns) + if not all(col in oligo_data.columns for col in required_columns): + raise AssertionError self._data = oligo_data self._range = (self._data["start"].min(), self._data["end"].max()) self._size = self._range[1] - self._range[0] @@ -108,7 +110,8 @@ def count_shared_oligos(self, probe): return np.intersect1d(self.path, probe.path).shape[0] def export(self, path): - assert not os.path.isfile(path) + if os.path.isfile(path): + raise AssertionError if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) @@ -155,7 +158,8 @@ def _assert(self): ass.ert_nonNeg(self.Tr, "Tr") ass.ert_type(self.Ps, int, "Ps") - assert self.Ps > 1 + if self.Ps <= 1: + raise AssertionError ass.ert_type(self.Ph, float, "Ph") ass.ert_inInterv(self.Ph, 0, 1, "Ph") @@ -318,28 +322,38 @@ def _assert(self): if not isinstance(self.k, type(None)): ass.ert_type(self.k, int, "k") ass.ert_nonNeg(self.k, "k") - assert (self.k + self.D) * self.N <= self.Ps + if (self.k + self.D) * self.N > self.Ps: + raise AssertionError ass.ert_type(self.F, list, "F") - assert len(self.F) == 2 + if len(self.F) != 2: + raise AssertionError for i in range(2): ass.ert_type(self.F[i], int, f"F[{i}]") - assert self.F[i] >= 0 - assert self.F[1] >= self.F[0] + if self.F[i] < 0: + raise AssertionError + if self.F[1] < self.F[0]: + raise AssertionError ass.ert_type(self.Gs, list, "Gs") - assert len(self.Gs) == 2 + if len(self.Gs) != 2: + raise AssertionError for i in range(2): ass.ert_type(self.Gs[i], float, f"Gs[{i}]") - assert self.Gs[i] <= 1 - assert all(np.array(self.Gs) < 0) or all(np.array(self.Gs) >= 0) + if self.Gs[i] > 1: + raise AssertionError + if not (all(np.array(self.Gs) < 0) or all(np.array(self.Gs) >= 0)): + raise AssertionError if self.Gs[0] >= 0: - assert self.Gs[1] >= self.Gs[0] + if self.Gs[1] < self.Gs[0]: + raise AssertionError else: - assert self.Gs[1] <= self.Gs[0] + if self.Gs[1] > self.Gs[0]: + raise AssertionError ass.ert_type(self.Ot, float, "Ot") - assert self.Ot > 0 and self.Ot <= 1 + if not (self.Ot > 0 and self.Ot <= 1): + raise AssertionError def get_prologue(self): s = "* OligoProbeBuilder *\n\n" diff --git a/ifpd2/probe_set.py b/ifpd2/probe_set.py index 8c51e250..539da7a9 100644 --- a/ifpd2/probe_set.py +++ b/ifpd2/probe_set.py @@ -28,7 +28,8 @@ def probe_list(self): @probe_list.setter def probe_list(self, probe_list): - assert all(isinstance(p, OligoProbe) for p in probe_list) + if not all(isinstance(p, OligoProbe) for p in probe_list): + raise AssertionError self._probe_list = sorted(probe_list, key=lambda p: p.range[0]) self._probe_tm_ranges = [p.tm_range for p in self.probe_list] self._tm_range = ( @@ -141,7 +142,8 @@ def featDF(self): return df def export(self, path): - assert not os.path.isfile(path) + if os.path.isfile(path): + raise AssertionError if os.path.isdir(path): shutil.rmtree(path) os.mkdir(path) @@ -177,7 +179,8 @@ class OligoProbeSetBuilder(object): def __init__(self, out_path): super(OligoProbeSetBuilder, self).__init__() - assert not os.path.isfile(out_path) + if os.path.isfile(out_path): + raise AssertionError self.out_path = out_path if os.path.isdir(out_path): shutil.rmtree(out_path) diff --git a/ifpd2/region.py b/ifpd2/region.py index 72fe772a..00b83a2a 100644 --- a/ifpd2/region.py +++ b/ifpd2/region.py @@ -36,13 +36,16 @@ def __init_region(self, chrom: bytes, chromStart: int, chromEnd: int) -> None: chromStart {int} -- chromosome start position (usually 0) chromEnd {int} -- chromosome end position """ - assert len(chrom) != 0, "chromosome cannot be empty" - assert ( - chromStart >= 0 - ), f"start should be greater than or equal to 0: {chromStart}" - assert ( - chromEnd > chromStart - ), f"end should be greater than start: {chromStart}-{chromEnd}" + if len(chrom) == 0: + raise AssertionError("chromosome cannot be empty") + if ( + chromStart < 0 + ): + raise AssertionError(f"start should be greater than or equal to 0: {chromStart}") + if ( + chromEnd <= chromStart + ): + raise AssertionError(f"end should be greater than start: {chromStart}-{chromEnd}") self.__chrom = chrom self.__chromStart = chromStart self.__chromEnd = chromEnd @@ -58,9 +61,11 @@ def __init_focus_size(self, focus_style: float) -> None: focus_style {float} -- focus region size in nt or as a fraction of the genomic region """ - assert focus_style > 0 + if focus_style <= 0: + raise AssertionError if focus_style > 1: - assert focus_style <= self.__chromEnd - self.__chromStart + if focus_style > self.__chromEnd - self.__chromStart: + raise AssertionError self.__focusSize = int(focus_style) else: self.__focusSize = int((self.__chromEnd - self.__chromStart) * focus_style) @@ -76,7 +81,8 @@ def __init_focus_step(self, step_style: float) -> None: step_style {float} -- focus region growth step in nt or as a fraction of the focus region """ - assert step_style > 0 + if step_style <= 0: + raise AssertionError if step_style > 1: self.__focusStep = int(step_style) else: @@ -161,8 +167,10 @@ def __init__( super(GenomicRegionBuilder, self).__init__() self.__chromosome = chromosome_data.name self.__chromosome_size_nt = chromosome_data.size_nt - assert focus_style > 0 - assert focus_step_style > 0 + if focus_style <= 0: + raise AssertionError + if focus_step_style <= 0: + raise AssertionError self.__focus_style = (focus_style, focus_step_style) def __build_overlapping(self, size: int, step: int) -> List[List[GenomicRegion]]: @@ -178,7 +186,8 @@ def __build_overlapping(self, size: int, step: int) -> List[List[GenomicRegion]] Returns: List[List[GenomicRegion]] -- generated region lists """ - assert step < size + if step >= size: + raise AssertionError region_set_list: List[List[GenomicRegion]] = [] for range_start in range(0, size, step): genomic_region_set: List[GenomicRegion] = [] @@ -205,7 +214,8 @@ def __build_non_overlapping( Returns: List[List[GenomicRegion]] -- generated region lists """ - assert step == size + if step != size: + raise AssertionError genomic_region_set: List[GenomicRegion] = [] for start in range(0, self.__chromosome_size_nt, step): end = start + size diff --git a/ifpd2/scripts/autocomplete.py b/ifpd2/scripts/autocomplete.py index 1fc5bdc1..adc4cd77 100644 --- a/ifpd2/scripts/autocomplete.py +++ b/ifpd2/scripts/autocomplete.py @@ -70,7 +70,8 @@ def autocomplete_fish(user_home_path: str, autocomplete_path: str) -> None: def autocomplete_bash_or_zsh( user_home_path: str, autocomplete_path: str, shell_type: str = "bash" ) -> None: - assert shell_type in {"bash", "zsh"} + if shell_type not in {"bash", "zsh"}: + raise AssertionError autocompletion_string = f". {autocomplete_path} # IFPD2-AUTOCOMPLETE\n" run_command_path = os.path.join(user_home_path, f".{shell_type}rc") diff --git a/ifpd2/scripts/db/check.py b/ifpd2/scripts/db/check.py index 00902802..eb6c99b7 100644 --- a/ifpd2/scripts/db/check.py +++ b/ifpd2/scripts/db/check.py @@ -27,6 +27,7 @@ def main(input_paths: str) -> None: desc=f"Checking sorting '{chromosome.decode()}'", total=DB.chromosome_recordnos[chromosome], ): - assert record["start"] > previous_position + if record["start"] <= previous_position: + raise AssertionError previous_position = record["start"] logging.info("That's all! :smiley:") diff --git a/ifpd2/scripts/db/dump.py b/ifpd2/scripts/db/dump.py index 827c56e8..3e904dec 100644 --- a/ifpd2/scripts/db/dump.py +++ b/ifpd2/scripts/db/dump.py @@ -51,14 +51,17 @@ def check_region( end: Optional[int] = None, ) -> Tuple[Optional[str], int, int]: if chrom is None: - assert ( + if not ( start is None and end is None - ), "cannot use --region-start or --region-end without --chrom" + ): + raise AssertionError("cannot use --region-start or --region-end without --chrom") elif start is not None: chrom_size = DB.chromosome_sizes_nt[chrom.encode()] - assert start < chrom_size, f"{start} larger than chromosome size: {chrom_size}" + if start >= chrom_size: + raise AssertionError(f"{start} larger than chromosome size: {chrom_size}") if end is not None: - assert start < end, "end location smaller than start" + if start >= end: + raise AssertionError("end location smaller than start") return ( chrom, @@ -70,6 +73,7 @@ def check_region( def get_chromosome_list(DB: DataBase, chrom: Optional[str]) -> List[bytes]: chromosome_list = DB.chromosome_list if chrom is not None: - assert chrom.encode() in chromosome_list, f"'{chrom}' not found" + if chrom.encode() not in chromosome_list: + raise AssertionError(f"'{chrom}' not found") chromosome_list = [chrom.encode()] return chromosome_list diff --git a/ifpd2/scripts/db/make.py b/ifpd2/scripts/db/make.py index 88de9341..fceafc21 100644 --- a/ifpd2/scripts/db/make.py +++ b/ifpd2/scripts/db/make.py @@ -100,10 +100,11 @@ def main( settings.bin_size = binsize settings.prefix = prefix - assert not ( + if ( len(settings.off_target_paths) == 0 and len(settings.melting_temperature_paths) != 0 - ), "please provide either --hush or --melting" + ): + raise AssertionError("please provide either --hush or --melting") os.mkdir(settings.output_path) dbdf = pd.DataFrame(columns=["name"]) @@ -219,9 +220,10 @@ def parse_record_headers( chromosome_list.append(f"{chromosome_prefix}{chromosome}") chromosome_length_set.add(len(f"{chromosome_prefix}{chromosome}")) start, end = [int(x) for x in extremes.split("-")] - assert (end - start) == len( + if (end - start) != len( record.sequence - ), f"{end - start} != {len(record.sequence)}" + ): + raise AssertionError(f"{end - start} != {len(record.sequence)}") start_list.append(start) end_list.append(end) diff --git a/ifpd2/scripts/db/settings.py b/ifpd2/scripts/db/settings.py index 310cdc61..6ea5521f 100644 --- a/ifpd2/scripts/db/settings.py +++ b/ifpd2/scripts/db/settings.py @@ -31,9 +31,10 @@ def output_path(self) -> str: @output_path.setter def output_path(self, output_path: str) -> None: - assert not isdir(output_path) and not isfile( + if not (not isdir(output_path) and not isfile( output_path - ), f"'{output_path}' already exists" + )): + raise AssertionError(f"'{output_path}' already exists") self._output_path = output_path @property @@ -42,7 +43,8 @@ def bin_size(self) -> int: @bin_size.setter def bin_size(self, bin_size: int) -> None: - assert bin_size >= 1 + if bin_size < 1: + raise AssertionError self._bin_size = bin_size @property @@ -58,7 +60,8 @@ def secondary_structure_paths(self) -> Set[str]: return copy(self._secondary_structure_paths) def add_off_target_path(self, path: str) -> None: - assert isfile(path), f"'{path}' not found." + if not isfile(path): + raise AssertionError(f"'{path}' not found.") self._off_target_paths.add(path) def add_off_target_path_list(self, path_list: List[str]) -> None: @@ -66,7 +69,8 @@ def add_off_target_path_list(self, path_list: List[str]) -> None: self.add_off_target_path(path) def add_melting_temperature_path(self, path: str) -> None: - assert isfile(path), f"'{path}' not found." + if not isfile(path): + raise AssertionError(f"'{path}' not found.") self._melting_temperature_paths.add(path) def add_melting_temperature_path_list(self, path_list: List[str]) -> None: @@ -74,7 +78,8 @@ def add_melting_temperature_path_list(self, path_list: List[str]) -> None: self.add_melting_temperature_path(path) def add_secondary_structure_path(self, path: str) -> None: - assert isfile(path), f"'{path}' not found." + if not isfile(path): + raise AssertionError(f"'{path}' not found.") self._secondary_structure_paths.add(path) def add_secondary_structure_path_list(self, path_list: List[str]) -> None: diff --git a/ifpd2/scripts/extract_kmers.py b/ifpd2/scripts/extract_kmers.py index 1edf5e38..492731c1 100644 --- a/ifpd2/scripts/extract_kmers.py +++ b/ifpd2/scripts/extract_kmers.py @@ -23,8 +23,10 @@ @click.argument("output_path", metavar="OUTPUT_DIRECTORY", type=click.Path(exists=True)) @click.argument("kmer_size", metavar="KMER_LENGTH", type=click.INT) def main(input_path: str, output_path: str, kmer_size: int) -> None: - assert isfile(input_path) - assert isdir(output_path) + if not isfile(input_path): + raise AssertionError + if not isdir(output_path): + raise AssertionError logging.info(f"Input : {input_path}") logging.info(f"Output : {output_path}") diff --git a/ifpd2/scripts/query.py b/ifpd2/scripts/query.py index 2dac009c..1481e635 100644 --- a/ifpd2/scripts/query.py +++ b/ifpd2/scripts/query.py @@ -181,7 +181,8 @@ def main( window_shift = 1.0 window_size = None - assert probes is not None or window_size is not None + if not (probes is not None or window_size is not None): + raise AssertionError if region[1] <= 0: probes = None logging.info( @@ -275,5 +276,7 @@ def assert_reusable(output_path: str): f"Provided path '{output_path}'", ] ) - assert not isfile(output_path), assert_msg + " leads to a file" - assert not isdir(output_path), assert_msg + " leads to a directory." + if isfile(output_path): + raise AssertionError(assert_msg + " leads to a file") + if isdir(output_path): + raise AssertionError(assert_msg + " leads to a directory.") diff --git a/ifpd2/scripts/query2.py b/ifpd2/scripts/query2.py index 2711821c..f3264591 100644 --- a/ifpd2/scripts/query2.py +++ b/ifpd2/scripts/query2.py @@ -189,7 +189,8 @@ def main( region_set_list = RB.build_by_number(PositiveInteger(probes).n) else: window_size, window_shift = QueryWindow(window_size, window_shift).astuple() - assert window_size is not None and window_shift is not None + if not (window_size is not None and window_shift is not None): + raise AssertionError region_set_list = RB.build_by_size(window_size, window_shift) walker = ChromosomeWalker(DB, chromosome.encode()) @@ -212,7 +213,8 @@ def check_input( window_shift = None window_size = None - assert probes is not None or window_size is not None + if not (probes is not None or window_size is not None): + raise AssertionError if probes is not None and window_size is not None: logging.warning("cannot combine -X and -W. Using -X.") window_size = None @@ -253,5 +255,7 @@ def assert_reusable(output_path: str): f"Provided path '{output_path}'", ] ) - assert not isfile(output_path), assert_msg + " leads to a file" - assert not isdir(output_path), assert_msg + " leads to a directory." + if isfile(output_path): + raise AssertionError(assert_msg + " leads to a file") + if isdir(output_path): + raise AssertionError(assert_msg + " leads to a directory.") diff --git a/ifpd2/tests/test_scripts_db.py b/ifpd2/tests/test_scripts_db.py index 9f4bdc6b..d6844f74 100644 --- a/ifpd2/tests/test_scripts_db.py +++ b/ifpd2/tests/test_scripts_db.py @@ -29,13 +29,16 @@ def test_db_make(): "chr", ] ) - assert b"Error" not in script_output + if b"Error" in script_output: + raise AssertionError test_db = open("test_data/test_db/chr16.bin", "rb").read() new_db = open("test_db/chr16.bin", "rb").read() - assert test_db == new_db + if test_db != new_db: + raise AssertionError test_pickle = pickle.load(open("test_data/test_db/db.pickle", "rb")) new_pickle = pickle.load(open("test_db/db.pickle", "rb")) - assert test_pickle == new_pickle + if test_pickle != new_pickle: + raise AssertionError def test_db_check(): @@ -47,7 +50,8 @@ def test_db_check(): "test_db", ] ) - assert b"Error" not in script_output, script_output + if b"Error" in script_output: + raise AssertionError(script_output) def test_db_dump(): @@ -60,7 +64,8 @@ def test_db_dump(): ] ) test_dump = open("test_data/test.dump.txt", "rb").read() - assert script_output == test_dump + if script_output != test_dump: + raise AssertionError def test_db_info(): @@ -72,7 +77,8 @@ def test_db_info(): "test_db", ] ) - assert b"Error" not in script_output + if b"Error" in script_output: + raise AssertionError def test_cleanup(): diff --git a/ifpd2/walker.py b/ifpd2/walker.py index cc1dc8d3..c68515c5 100644 --- a/ifpd2/walker.py +++ b/ifpd2/walker.py @@ -88,7 +88,8 @@ def _assert(self): ass.ert_nonNeg(self.S, "S", True) ass.ert_type(self.E, int, "E") ass.ert_nonNeg(self.E, "E", True) - assert self.S <= self.E + if self.S > self.E: + raise AssertionError ass.ert_multiTypes(self.X, [int, type(None)], "X") ass.ert_multiTypes(self.Ws, [int, type(None)], "Ws") @@ -110,13 +111,15 @@ def _assert(self): ass.ert_multiTypes(self.Rs, [int, float], "Rs") if isinstance(self.Rs, int): - assert self.Rs > 1 + if self.Rs <= 1: + raise AssertionError else: ass.ert_inInterv(self.Rs, 0, 1, "Rs") ass.ert_multiTypes(self.Rt, [int, float], "Rt") if isinstance(self.Rt, int): - assert self.Rt > 1 + if self.Rt <= 1: + raise AssertionError else: ass.ert_inInterv(self.Rt, 0, 1, "Rt") @@ -126,12 +129,13 @@ def _init_windows(self): os.mkdir(os.path.join(self.out_path, "window_sets")) if self.S == self.E: - assert not isinstance(self.Ws, type(None)), " ".join( - [ - "During full-chromosome search, provide a window size.", - "I.e., it is not possible to design X probes.", - ] - ) + if isinstance(self.Ws, type(None)): + raise AssertionError(" ".join( + [ + "During full-chromosome search, provide a window size.", + "I.e., it is not possible to design X probes.", + ] + )) if isinstance(self.X, int): self.Ws = ( @@ -332,7 +336,8 @@ def config(self): def _assert(self): GenomicWindowSet._assert(self) - assert os.path.isdir(self.out_path) + if not os.path.isdir(self.out_path): + raise AssertionError def start(self, *args, start_from_nt: int = 0, end_at_nt: int = -1, **kwargs): self._assert() @@ -656,7 +661,8 @@ def fparse(record, opb=None, *args, **kwargs): @staticmethod def fprocess(oGroup, window, *args, **kwargs): opb = copy.copy(kwargs["opb"]) - assert isinstance(opb, OligoProbeBuilder) + if not isinstance(opb, OligoProbeBuilder): + raise AssertionError logger = logging.getLogger(kwargs["loggerName"]) probe_list = opb.start(oGroup, window, kwargs["cfr_step"], logger) reduced_probe_list = opb.reduce_probe_list(probe_list) @@ -671,7 +677,8 @@ def fimport(path, *args, **kwargs): @staticmethod def fpost(results, opath, *args, **kwargs): - assert isinstance(kwargs["opb"], OligoProbeBuilder) + if not isinstance(kwargs["opb"], OligoProbeBuilder): + raise AssertionError logger = logging.getLogger(kwargs["loggerName"]) if len(results) == 0: logger.critical(f"Built {len(results)} oligo probe candidates") diff --git a/ifpd2/walker2.py b/ifpd2/walker2.py index c61c011c..4bb511e7 100644 --- a/ifpd2/walker2.py +++ b/ifpd2/walker2.py @@ -21,7 +21,8 @@ class ChromosomeWalker(object): def __init__(self, db: Union[DataBase, str], chromosome: bytes): super(ChromosomeWalker, self).__init__() self.__db = DataBase(db) if isinstance(db, str) else db - assert chromosome in self.__db._chromosomes.keys() + if chromosome not in self.__db._chromosomes.keys(): + raise AssertionError self.__IH = open( os.path.join(self.__db._root, f"{chromosome.decode()}.bin"), "rb" ) @@ -154,7 +155,8 @@ def buffer(self, start_from_nt: int = 0, end_at_nt: int = -1) -> Iterator[Record record = self.read_next_record() def walk_single_region(self, region: GenomicRegion) -> Iterator[List[Record]]: - assert region.chromosome == self.__chromosome + if region.chromosome != self.__chromosome: + raise AssertionError focus_start, focus_end = region.focus record_list = list(self.buffer(focus_start, focus_end)) yield record_list