diff --git a/annif/__init__.py b/annif/__init__.py index f4a5831f5..bb196b4ee 100644 --- a/annif/__init__.py +++ b/annif/__init__.py @@ -1,8 +1,11 @@ #!/usr/bin/env python3 +from __future__ import annotations + import logging import os import os.path +from typing import TYPE_CHECKING logging.basicConfig() logger = logging.getLogger("annif") @@ -10,8 +13,11 @@ import annif.backend # noqa +if TYPE_CHECKING: + from flask.app import Flask + -def create_flask_app(config_name=None): +def create_flask_app(config_name: str | None = None) -> Flask: """Create a Flask app to be used by the CLI.""" from flask import Flask @@ -23,7 +29,7 @@ def create_flask_app(config_name=None): return app -def create_app(config_name=None): +def create_app(config_name: str | None = None) -> Flask: """Create a Connexion app to be used for the API.""" # 'cxapp' here is the Connexion application that has a normal Flask app # as a property (cxapp.app) @@ -60,7 +66,7 @@ def create_app(config_name=None): return cxapp.app -def _get_config_name(config_name): +def _get_config_name(config_name: str | None) -> str: if config_name is None: config_name = os.environ.get("ANNIF_CONFIG") if config_name is None: diff --git a/annif/analyzer/__init__.py b/annif/analyzer/__init__.py index eacf3d001..a0f93ced3 100644 --- a/annif/analyzer/__init__.py +++ b/annif/analyzer/__init__.py @@ -1,12 +1,17 @@ """Collection of language-specific analyzers and analyzer registry for Annif""" +from __future__ import annotations import re +from typing import TYPE_CHECKING import annif from annif.util import parse_args from . import simple, simplemma, snowball +if TYPE_CHECKING: + from annif.analyzer.analyzer import Analyzer + _analyzers = {} @@ -14,7 +19,7 @@ def register_analyzer(analyzer): _analyzers[analyzer.name] = analyzer -def get_analyzer(analyzerspec): +def get_analyzer(analyzerspec: str) -> Analyzer: match = re.match(r"(\w+)(\((.*)\))?", analyzerspec) if match is None: raise ValueError("Invalid analyzer specification {}".format(analyzerspec)) diff --git a/annif/analyzer/analyzer.py b/annif/analyzer/analyzer.py index 37457069d..5ba876f9d 100644 --- a/annif/analyzer/analyzer.py +++ b/annif/analyzer/analyzer.py @@ -1,4 +1,5 @@ """Common functionality for analyzers.""" +from __future__ import annotations import abc import functools @@ -15,18 +16,18 @@ class Analyzer(metaclass=abc.ABCMeta): name = None token_min_length = 3 # default value, can be overridden in instances - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: if _KEY_TOKEN_MIN_LENGTH in kwargs: self.token_min_length = int(kwargs[_KEY_TOKEN_MIN_LENGTH]) - def tokenize_sentences(self, text): + def tokenize_sentences(self, text: str) -> list[str]: """Tokenize a piece of text (e.g. a document) into sentences.""" import nltk.tokenize return nltk.tokenize.sent_tokenize(text) @functools.lru_cache(maxsize=50000) - def is_valid_token(self, word): + def is_valid_token(self, word: str) -> bool: """Return True if the word is an acceptable token.""" if len(word) < self.token_min_length: return False @@ -36,7 +37,7 @@ def is_valid_token(self, word): return True return False - def tokenize_words(self, text, filter=True): + def tokenize_words(self, text: str, filter: bool = True) -> list[str]: """Tokenize a piece of text (e.g. a sentence) into words. If filter=True (default), only return valid tokens (e.g. 
not punctuation, numbers or very short words)""" diff --git a/annif/analyzer/simple.py b/annif/analyzer/simple.py index 46a8f92f3..4cc35e6f1 100644 --- a/annif/analyzer/simple.py +++ b/annif/analyzer/simple.py @@ -1,4 +1,5 @@ """Simple analyzer for Annif. Only folds words to lower case.""" +from __future__ import annotations from . import analyzer @@ -6,9 +7,9 @@ class SimpleAnalyzer(analyzer.Analyzer): name = "simple" - def __init__(self, param, **kwargs): + def __init__(self, param: None, **kwargs) -> None: self.param = param super().__init__(**kwargs) - def _normalize_word(self, word): + def _normalize_word(self, word: str) -> str: return word.lower() diff --git a/annif/analyzer/simplemma.py b/annif/analyzer/simplemma.py index 02976982b..e535b25de 100644 --- a/annif/analyzer/simplemma.py +++ b/annif/analyzer/simplemma.py @@ -1,4 +1,5 @@ """Simplemma analyzer for Annif, based on simplemma lemmatizer.""" +from __future__ import annotations import simplemma @@ -8,9 +9,9 @@ class SimplemmaAnalyzer(analyzer.Analyzer): name = "simplemma" - def __init__(self, param, **kwargs): + def __init__(self, param: str, **kwargs) -> None: self.lang = param super().__init__(**kwargs) - def _normalize_word(self, word): + def _normalize_word(self, word: str) -> str: return simplemma.lemmatize(word, lang=self.lang) diff --git a/annif/analyzer/snowball.py b/annif/analyzer/snowball.py index c13c4e904..57990c2a1 100644 --- a/annif/analyzer/snowball.py +++ b/annif/analyzer/snowball.py @@ -1,4 +1,5 @@ """Snowball analyzer for Annif, based on nltk Snowball stemmer.""" +from __future__ import annotations import functools @@ -8,7 +9,7 @@ class SnowballAnalyzer(analyzer.Analyzer): name = "snowball" - def __init__(self, param, **kwargs): + def __init__(self, param: str, **kwargs) -> None: self.param = param import nltk.stem.snowball @@ -16,5 +17,5 @@ def __init__(self, param, **kwargs): super().__init__(**kwargs) @functools.lru_cache(maxsize=500000) - def _normalize_word(self, word): + def _normalize_word(self, word: str) -> str: return self.stemmer.stem(word.lower()) diff --git a/annif/analyzer/spacy.py b/annif/analyzer/spacy.py index 212a3a5f6..b5e9cbc55 100644 --- a/annif/analyzer/spacy.py +++ b/annif/analyzer/spacy.py @@ -1,4 +1,5 @@ """spaCy analyzer for Annif which uses spaCy for lemmatization""" +from __future__ import annotations import annif.util from annif.exception import OperationFailedException @@ -11,7 +12,7 @@ class SpacyAnalyzer(analyzer.Analyzer): name = "spacy" - def __init__(self, param, **kwargs): + def __init__(self, param: str, **kwargs) -> None: import spacy self.param = param @@ -28,7 +29,7 @@ def __init__(self, param, **kwargs): self.lowercase = False super().__init__(**kwargs) - def tokenize_words(self, text, filter=True): + def tokenize_words(self, text: str, filter: bool = True) -> list[str]: lemmas = [ lemma for lemma in (token.lemma_ for token in self.nlp(text.strip())) diff --git a/annif/analyzer/voikko.py b/annif/analyzer/voikko.py index d111da25e..e6e693d65 100644 --- a/annif/analyzer/voikko.py +++ b/annif/analyzer/voikko.py @@ -1,4 +1,5 @@ """Voikko analyzer for Annif, based on libvoikko library.""" +from __future__ import annotations import functools @@ -10,12 +11,12 @@ class VoikkoAnalyzer(analyzer.Analyzer): name = "voikko" - def __init__(self, param, **kwargs): + def __init__(self, param: str, **kwargs) -> None: self.param = param self.voikko = None super().__init__(**kwargs) - def __getstate__(self): + def __getstate__(self) -> dict[str, str | None]: """Return the state of the 
object for pickling purposes. The Voikko instance is set to None because as a ctypes object it cannot be pickled.""" @@ -23,7 +24,7 @@ def __getstate__(self): return {"param": self.param, "voikko": None} @functools.lru_cache(maxsize=500000) - def _normalize_word(self, word): + def _normalize_word(self, word: str) -> str: if self.voikko is None: self.voikko = voikko.libvoikko.Voikko(self.param) result = self.voikko.analyze(word) diff --git a/annif/backend/__init__.py b/annif/backend/__init__.py index 80ede0720..cbeeb648e 100644 --- a/annif/backend/__init__.py +++ b/annif/backend/__init__.py @@ -1,20 +1,26 @@ """Registry of backend types for Annif""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Type + +if TYPE_CHECKING: + from annif.backend.backend import AnnifBackend # define functions for lazily importing each backend (alphabetical order) -def _dummy(): +def _dummy() -> Type[AnnifBackend]: from . import dummy return dummy.DummyBackend -def _ensemble(): +def _ensemble() -> Type[AnnifBackend]: from . import ensemble return ensemble.EnsembleBackend -def _fasttext(): +def _fasttext() -> Type[AnnifBackend]: try: from . import fasttext @@ -23,19 +29,19 @@ def _fasttext(): raise ValueError("fastText not available, cannot use fasttext backend") -def _http(): +def _http() -> Type[AnnifBackend]: from . import http return http.HTTPBackend -def _mllm(): +def _mllm() -> Type[AnnifBackend]: from . import mllm return mllm.MLLMBackend -def _nn_ensemble(): +def _nn_ensemble() -> Type[AnnifBackend]: try: from . import nn_ensemble @@ -46,7 +52,7 @@ def _nn_ensemble(): ) -def _omikuji(): +def _omikuji() -> Type[AnnifBackend]: try: from . import omikuji @@ -55,13 +61,13 @@ def _omikuji(): raise ValueError("Omikuji not available, cannot use omikuji backend") -def _pav(): +def _pav() -> Type[AnnifBackend]: from . import pav return pav.PAVBackend -def _stwfsa(): +def _stwfsa() -> Type[AnnifBackend]: try: from . import stwfsa @@ -70,19 +76,19 @@ def _stwfsa(): raise ValueError("STWFSA not available, cannot use stwfsa backend") -def _svc(): +def _svc() -> Type[AnnifBackend]: from . import svc return svc.SVCBackend -def _tfidf(): +def _tfidf() -> Type[AnnifBackend]: from . import tfidf return tfidf.TFIDFBackend -def _yake(): +def _yake() -> Type[AnnifBackend]: try: from . import yake @@ -108,7 +114,7 @@ def _yake(): } -def get_backend(backend_id): +def get_backend(backend_id: str) -> Type[AnnifBackend]: if backend_id in _backend_fns: return _backend_fns[backend_id]() else: diff --git a/annif/backend/backend.py b/annif/backend/backend.py index 754d66111..f35b0a312 100644 --- a/annif/backend/backend.py +++ b/annif/backend/backend.py @@ -1,13 +1,21 @@ """Common functionality for backends.""" +from __future__ import annotations import abc import os.path from datetime import datetime, timezone from glob import glob +from typing import TYPE_CHECKING, Any from annif import logger from annif.suggestion import SuggestionBatch +if TYPE_CHECKING: + from configparser import SectionProxy + + from annif.corpus.document import DocumentCorpus + from annif.project import AnnifProject + class AnnifBackend(metaclass=abc.ABCMeta): """Base class for Annif backends that perform analysis. 
The @@ -17,7 +25,12 @@ class AnnifBackend(metaclass=abc.ABCMeta): DEFAULT_PARAMETERS = {"limit": 100} - def __init__(self, backend_id, config_params, project): + def __init__( + self, + backend_id: str, + config_params: dict[str, Any] | SectionProxy, + project: AnnifProject, + ) -> None: """Initialize backend with specific parameters. The parameters are a dict. Keys and values depend on the specific backend type.""" @@ -26,22 +39,22 @@ def __init__(self, backend_id, config_params, project): self.project = project self.datadir = project.datadir - def default_params(self): + def default_params(self) -> dict[str, Any]: return self.DEFAULT_PARAMETERS @property - def params(self): + def params(self) -> dict[str, Any]: params = {} params.update(self.default_params()) params.update(self.config_params) return params @property - def is_trained(self): + def is_trained(self) -> bool: return bool(glob(os.path.join(self.datadir, "*"))) @property - def modification_time(self): + def modification_time(self) -> datetime | None: mtimes = [ datetime.utcfromtimestamp(os.path.getmtime(p)) for p in glob(os.path.join(self.datadir, "*")) @@ -51,23 +64,36 @@ def modification_time(self): return None return most_recent.replace(tzinfo=timezone.utc) - def _get_backend_params(self, params): + def _get_backend_params( + self, + params: dict[str, Any] | None, + ) -> dict[str, Any]: backend_params = dict(self.params) if params is not None: backend_params.update(params) return backend_params - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: """This method can be overridden by backends. It implements the train functionality, with pre-processed parameters.""" pass # default is to do nothing, subclasses may override - def train(self, corpus, params=None, jobs=0): + def train( + self, + corpus: DocumentCorpus, + params: dict[str, Any] | None = None, + jobs: int = 0, + ) -> None: """Train the model on the given document or subject corpus.""" beparams = self._get_backend_params(params) return self._train(corpus, params=beparams, jobs=jobs) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: """This method can be overridden by backends. It should cause the backend to pre-load all data it needs during operation. If parallel is True, the backend should expect to be used for @@ -80,7 +106,9 @@ def _suggest(self, text, params): document, with pre-processed parameters.""" pass # pragma: no cover - def _suggest_batch(self, texts, params): + def _suggest_batch( + self, texts: list[str], params: dict[str, Any] + ) -> SuggestionBatch: """This method can be implemented by backends to use batching of documents in their operations. 
This default implementation uses the regular suggest functionality.""" @@ -90,22 +118,26 @@ def _suggest_batch(self, texts, params): limit=int(params.get("limit")), ) - def suggest(self, texts, params=None): + def suggest( + self, + texts: list[str], + params: dict[str, Any] | None = None, + ) -> SuggestionBatch: """Suggest subjects for the input documents and return a list of subject sets represented as a list of SubjectSuggestion objects.""" beparams = self._get_backend_params(params) self.initialize() return self._suggest_batch(texts, params=beparams) - def debug(self, message): + def debug(self, message: str) -> None: """Log a debug message from this backend""" logger.debug("Backend {}: {}".format(self.backend_id, message)) - def info(self, message): + def info(self, message: str) -> None: """Log an info message from this backend""" logger.info("Backend {}: {}".format(self.backend_id, message)) - def warning(self, message): + def warning(self, message: str) -> None: """Log a warning message from this backend""" logger.warning("Backend {}: {}".format(self.backend_id, message)) @@ -119,7 +151,11 @@ def _learn(self, corpus, params): functionality, with pre-processed parameters.""" pass # pragma: no cover - def learn(self, corpus, params=None): + def learn( + self, + corpus: DocumentCorpus, + params: dict[str, Any] | None = None, + ) -> None: """Further train the model on the given document or subject corpus.""" beparams = self._get_backend_params(params) return self._learn(corpus, params=beparams) diff --git a/annif/backend/dummy.py b/annif/backend/dummy.py index 9d60b0798..5f62517a5 100644 --- a/annif/backend/dummy.py +++ b/annif/backend/dummy.py @@ -1,10 +1,15 @@ """Dummy backend for testing basic interaction of projects and backends""" +from __future__ import annotations +from typing import TYPE_CHECKING, Any from annif.suggestion import SubjectSuggestion from . import backend +if TYPE_CHECKING: + from annif.corpus.document import DocumentCorpus + class DummyBackend(backend.AnnifLearningBackend): name = "dummy" @@ -13,13 +18,13 @@ class DummyBackend(backend.AnnifLearningBackend): is_trained = True modification_time = None - def default_params(self): + def default_params(self) -> dict[str, int]: return backend.AnnifBackend.DEFAULT_PARAMETERS - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: self.initialized = True - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> list[SubjectSuggestion]: score = float(params.get("score", 1.0)) # Ensure tests fail if "text" with wrong type ends up here @@ -37,7 +42,11 @@ def _suggest(self, text, params): return [SubjectSuggestion(subject_id=subject_id, score=score)] - def _learn(self, corpus, params): + def _learn( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + ) -> None: # in this dummy backend we "learn" by picking up the subject ID # of the first subject of the first document in the learning set # and using that in subsequent analysis results diff --git a/annif/backend/ensemble.py b/annif/backend/ensemble.py index 918a41444..6f7f2eb04 100644 --- a/annif/backend/ensemble.py +++ b/annif/backend/ensemble.py @@ -1,5 +1,7 @@ """Ensemble backend that combines results from multiple projects""" +from __future__ import annotations +from typing import TYPE_CHECKING, Any import annif.eval import annif.parallel @@ -9,11 +11,20 @@ from . 
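Nearly every file touched in this diff follows the same typing pattern: `from __future__ import annotations` at the top of the module, plus an `if TYPE_CHECKING:` block for imports that only a type checker needs. A minimal sketch of the pattern, condensed from the `annif/__init__.py` hunk at the start of this diff (the function body is reduced to a bare `Flask(__name__)` purely for illustration):

from __future__ import annotations  # annotations are stored as strings, not evaluated

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers (mypy, pyright); skipped at runtime,
    # so merely importing this module does not pull in Flask.
    from flask.app import Flask


def create_flask_app(config_name: str | None = None) -> Flask:
    """The return annotation is never evaluated at runtime."""
    from flask import Flask  # the real import happens when the function is called

    return Flask(__name__)

The same postponed evaluation is what lets `str | None` and built-in generics such as `dict[str, Any]` or `list[str]` appear in annotations on Python versions older than 3.10 and 3.9 respectively.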
import backend, hyperopt +if TYPE_CHECKING: + from datetime import datetime + + from optuna.study.study import Study + from optuna.trial import Trial + + from annif.backend.hyperopt import HPRecommendation + from annif.corpus.document import DocumentCorpus + class BaseEnsembleBackend(backend.AnnifBackend): """Base class for ensemble backends""" - def _get_sources_attribute(self, attr): + def _get_sources_attribute(self, attr: str) -> list[bool | None]: params = self._get_backend_params(None) sources = annif.util.parse_sources(params["sources"]) return [ @@ -21,20 +32,27 @@ def _get_sources_attribute(self, attr): for project_id, _ in sources ] - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: # initialize all the source projects params = self._get_backend_params(None) for project_id, _ in annif.util.parse_sources(params["sources"]): project = self.project.registry.get_project(project_id) project.initialize(parallel) - def _suggest_with_sources(self, texts, sources): + def _suggest_with_sources( + self, texts: list[str], sources: list[tuple[str, float]] + ) -> dict[str, SuggestionBatch]: return { project_id: self.project.registry.get_project(project_id).suggest(texts) for project_id, _ in sources } - def _merge_source_batches(self, batch_by_source, sources, params): + def _merge_source_batches( + self, + batch_by_source: dict[str, SuggestionBatch], + sources: list[tuple[str, float]], + params: dict[str, Any], + ) -> SuggestionBatch: """Merge the given SuggestionBatches from each source into a single SuggestionBatch. The default implementation computes a weighted average based on the weights given in the sources tuple. Intended @@ -46,7 +64,9 @@ def _merge_source_batches(self, batch_by_source, sources, params): limit=int(params["limit"]) ) - def _suggest_batch(self, texts, params): + def _suggest_batch( + self, texts: list[str], params: dict[str, Any] + ) -> SuggestionBatch: sources = annif.util.parse_sources(params["sources"]) batch_by_source = self._suggest_with_sources(texts, sources) return self._merge_source_batches(batch_by_source, sources, params) @@ -55,7 +75,9 @@ def _suggest_batch(self, texts, params): class EnsembleOptimizer(hyperopt.HyperparameterOptimizer): """Hyperparameter optimizer for the ensemble backend""" - def __init__(self, backend, corpus, metric): + def __init__( + self, backend: EnsembleBackend, corpus: DocumentCorpus, metric: str + ) -> None: super().__init__(backend, corpus, metric) self._sources = [ project_id @@ -64,7 +86,7 @@ def __init__(self, backend, corpus, metric): ) ] - def _prepare(self, n_jobs=1): + def _prepare(self, n_jobs: int = 1) -> None: self._gold_batches = [] self._source_batches = [] @@ -89,16 +111,16 @@ def _prepare(self, n_jobs=1): self._source_batches.append(suggestions) self._gold_batches.append(gold_batch) - def _normalize(self, hps): + def _normalize(self, hps: dict[str, float]) -> dict[str, float]: total = sum(hps.values()) return {source: hps[source] / total for source in hps} - def _format_cfg_line(self, hps): + def _format_cfg_line(self, hps: dict[str, float]) -> str: return "sources=" + ",".join( [f"{src}:{weight:.4f}" for src, weight in hps.items()] ) - def _objective(self, trial): + def _objective(self, trial: Trial) -> float: eval_batch = annif.eval.EvaluationBatch(self._backend.project.subjects) proj_weights = { project_id: trial.suggest_uniform(project_id, 0.0, 1.0) @@ -114,7 +136,7 @@ def _objective(self, trial): results = eval_batch.results(metrics=[self._metric]) return 
results[self._metric] - def _postprocess(self, study): + def _postprocess(self, study: Study) -> HPRecommendation: line = self._format_cfg_line(self._normalize(study.best_params)) return hyperopt.HPRecommendation(lines=[line], score=study.best_value) @@ -125,17 +147,19 @@ class EnsembleBackend(BaseEnsembleBackend, hyperopt.AnnifHyperoptBackend): name = "ensemble" @property - def is_trained(self): + def is_trained(self) -> bool: sources_trained = self._get_sources_attribute("is_trained") return all(sources_trained) @property - def modification_time(self): + def modification_time(self) -> datetime | None: mtimes = self._get_sources_attribute("modification_time") return max(filter(None, mtimes), default=None) - def get_hp_optimizer(self, corpus, metric): + def get_hp_optimizer( + self, corpus: DocumentCorpus, metric: str + ) -> EnsembleOptimizer: return EnsembleOptimizer(self, corpus, metric) - def _train(self, corpus, params, jobs=0): + def _train(self, corpus: DocumentCorpus, params: dict[str, Any], jobs: int = 0): raise NotSupportedException("Training ensemble backend is not possible.") diff --git a/annif/backend/fasttext.py b/annif/backend/fasttext.py index 7b6e9e842..23c33539a 100644 --- a/annif/backend/fasttext.py +++ b/annif/backend/fasttext.py @@ -1,7 +1,9 @@ """Annif backend using the fastText classifier""" +from __future__ import annotations import collections import os.path +from typing import TYPE_CHECKING, Any import fasttext @@ -11,6 +13,12 @@ from . import backend, mixins +if TYPE_CHECKING: + from fasttext.FastText import _FastText + from numpy import ndarray + + from annif.corpus.document import DocumentCorpus + class FastTextBackend(mixins.ChunkingBackend, backend.AnnifBackend): """fastText backend for Annif""" @@ -48,14 +56,14 @@ class FastTextBackend(mixins.ChunkingBackend, backend.AnnifBackend): # defaults for uninitialized instances _model = None - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(mixins.ChunkingBackend.DEFAULT_PARAMETERS) params.update(self.DEFAULT_PARAMETERS) return params @staticmethod - def _load_model(path): + def _load_model(path: str) -> _FastText: # monkey patch fasttext.FastText.eprint to avoid spurious warning # see https://github.com/facebookresearch/fastText/issues/1067 orig_eprint = fasttext.FastText.eprint @@ -65,7 +73,7 @@ def _load_model(path): fasttext.FastText.eprint = orig_eprint return model - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: if self._model is None: path = os.path.join(self.datadir, self.MODEL_FILE) self.debug("loading fastText model from {}".format(path)) @@ -79,14 +87,14 @@ def initialize(self, parallel=False): ) @staticmethod - def _id_to_label(subject_id): + def _id_to_label(subject_id: int) -> str: return "__label__{:d}".format(subject_id) - def _label_to_subject_id(self, label): + def _label_to_subject_id(self, label: str) -> int: labelnum = label.replace("__label__", "") return int(labelnum) - def _write_train_file(self, corpus, filename): + def _write_train_file(self, corpus: DocumentCorpus, filename: str) -> None: with open(filename, "w", encoding="utf-8") as trainfile: for doc in corpus.documents: text = self._normalize_text(doc.text) @@ -98,17 +106,20 @@ def _write_train_file(self, corpus, filename): else: self.warning(f'no labels for document "{doc.text}"') - def _normalize_text(self, text): + def _normalize_text(self, text: str) -> str: return " 
".join(self.project.analyzer.tokenize_words(text)) - def _create_train_file(self, corpus): + def _create_train_file( + self, + corpus: DocumentCorpus, + ) -> None: self.info("creating fastText training file") annif.util.atomic_save( corpus, self.datadir, self.TRAIN_FILE, method=self._write_train_file ) - def _create_model(self, params, jobs): + def _create_model(self, params: dict[str, Any], jobs: int) -> None: self.info("creating fastText model") trainpath = os.path.join(self.datadir, self.TRAIN_FILE) modelpath = os.path.join(self.datadir, self.MODEL_FILE) @@ -123,7 +134,12 @@ def _create_model(self, params, jobs): self._model = fasttext.train_supervised(trainpath, **params) self._model.save_model(modelpath) - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: if corpus != "cached": if corpus.is_empty(): raise NotSupportedException( @@ -134,7 +150,9 @@ def _train(self, corpus, params, jobs=0): self.info("Reusing cached training data from previous run.") self._create_model(params, jobs) - def _predict_chunks(self, chunktexts, limit): + def _predict_chunks( + self, chunktexts: list[str], limit: int + ) -> tuple[list[list[str]], list[ndarray]]: return self._model.predict( list( filter( @@ -144,7 +162,9 @@ def _predict_chunks(self, chunktexts, limit): limit, ) - def _suggest_chunks(self, chunktexts, params): + def _suggest_chunks( + self, chunktexts: list[str], params: dict[str, Any] + ) -> list[SubjectSuggestion]: limit = int(params["limit"]) chunklabels, chunkscores = self._predict_chunks(chunktexts, limit) label_scores = collections.defaultdict(float) diff --git a/annif/backend/http.py b/annif/backend/http.py index a76dbbb6a..0fce7f8e4 100644 --- a/annif/backend/http.py +++ b/annif/backend/http.py @@ -1,8 +1,9 @@ """HTTP/REST client backend that makes calls to a web service and returns the results""" - +from __future__ import annotations import importlib +from typing import TYPE_CHECKING, Any import dateutil.parser import requests @@ -13,13 +14,16 @@ from . 
import backend +if TYPE_CHECKING: + from datetime import datetime + class HTTPBackend(backend.AnnifBackend): name = "http" _headers = None @property - def headers(self): + def headers(self) -> dict[str, str]: if self._headers is None: version = importlib.metadata.version("annif") self._headers = { @@ -28,17 +32,17 @@ def headers(self): return self._headers @property - def is_trained(self): + def is_trained(self) -> bool | None: return self._get_project_info("is_trained") @property - def modification_time(self): + def modification_time(self) -> datetime | None: mtime = self._get_project_info("modification_time") if mtime is None: return None return dateutil.parser.parse(mtime) - def _get_project_info(self, key): + def _get_project_info(self, key: str) -> bool | str | None: params = self._get_backend_params(None) try: req = requests.get( @@ -59,7 +63,7 @@ def _get_project_info(self, key): else: return None - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> list[SubjectSuggestion]: data = {"text": text} if "project" in params: data["project"] = params["project"] diff --git a/annif/backend/hyperopt.py b/annif/backend/hyperopt.py index 1bdce0aa4..2c2e7422c 100644 --- a/annif/backend/hyperopt.py +++ b/annif/backend/hyperopt.py @@ -1,14 +1,23 @@ """Hyperparameter optimization functionality for backends""" +from __future__ import annotations import abc import collections import warnings +from typing import TYPE_CHECKING, Callable import optuna import optuna.exceptions from .backend import AnnifBackend +if TYPE_CHECKING: + from click.utils import LazyFile + from optuna.study.study import Study + from optuna.trial import Trial + + from annif.corpus.document import DocumentCorpus + HPRecommendation = collections.namedtuple("HPRecommendation", "lines score") @@ -16,12 +25,12 @@ class TrialWriter: """Object that writes hyperparameter optimization trial results into a TSV file.""" - def __init__(self, results_file, normalize_func): + def __init__(self, results_file: LazyFile, normalize_func: Callable) -> None: self.results_file = results_file self.normalize_func = normalize_func self.header_written = False - def write(self, study, trial): + def write(self, study: Study, trial: Trial) -> None: """Write the results of one trial into the results file. On the first run, write the header line first.""" @@ -44,12 +53,14 @@ def write(self, study, trial): class HyperparameterOptimizer: """Base class for hyperparameter optimizers""" - def __init__(self, backend, corpus, metric): + def __init__( + self, backend: AnnifBackend, corpus: DocumentCorpus, metric: str + ) -> None: self._backend = backend self._corpus = corpus self._metric = metric - def _prepare(self, n_jobs=1): + def _prepare(self, n_jobs: int = 1): """Prepare the optimizer for hyperparameter evaluation. Up to n_jobs parallel threads or processes may be used during the operation.""" @@ -57,21 +68,23 @@ def _prepare(self, n_jobs=1): pass # pragma: no cover @abc.abstractmethod - def _objective(self, trial): + def _objective(self, trial: Trial) -> float: """Objective function to optimize""" pass # pragma: no cover @abc.abstractmethod - def _postprocess(self, study): + def _postprocess(self, study: Study) -> HPRecommendation: """Convert the study results into hyperparameter recommendations""" pass # pragma: no cover - def _normalize(self, hps): + def _normalize(self, hps: dict[str, float]) -> dict[str, float]: """Normalize the given raw hyperparameters. Intended to be overridden by subclasses when necessary. 
The default is to keep them as-is.""" return hps - def optimize(self, n_trials, n_jobs, results_file): + def optimize( + self, n_trials: int, n_jobs: int, results_file: LazyFile | None + ) -> HPRecommendation: """Find the optimal hyperparameters by testing up to the given number of hyperparameter combinations""" @@ -103,7 +116,7 @@ class AnnifHyperoptBackend(AnnifBackend): optimization""" @abc.abstractmethod - def get_hp_optimizer(self, corpus, metric): + def get_hp_optimizer(self, corpus: DocumentCorpus, metric: str): """Get a HyperparameterOptimizer object that can look for optimal hyperparameter combinations for the given corpus, measured using the given metric""" diff --git a/annif/backend/mixins.py b/annif/backend/mixins.py index 5161a947d..066d5d862 100644 --- a/annif/backend/mixins.py +++ b/annif/backend/mixins.py @@ -1,8 +1,9 @@ """Annif backend mixins that can be used to implement features""" - +from __future__ import annotations import abc import os.path +from typing import TYPE_CHECKING, Any import joblib from sklearn.feature_extraction.text import TfidfVectorizer @@ -10,23 +11,32 @@ import annif.util from annif.exception import NotInitializedException +if TYPE_CHECKING: + from collections.abc import Iterable + + from scipy.sparse._csr import csr_matrix + + from annif.suggestion import SubjectSuggestion + class ChunkingBackend(metaclass=abc.ABCMeta): """Annif backend mixin that implements chunking of input""" DEFAULT_PARAMETERS = {"chunksize": 1} - def default_params(self): + def default_params(self) -> dict[str, Any]: return self.DEFAULT_PARAMETERS @abc.abstractmethod - def _suggest_chunks(self, chunktexts, params): + def _suggest_chunks( + self, chunktexts: list[str], params: dict[str, Any] + ) -> list[SubjectSuggestion]: """Suggest subjects for the chunked text; should be implemented by the subclass inheriting this mixin""" pass # pragma: no cover - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> list[SubjectSuggestion]: self.debug( 'Suggesting subjects for text "{}..." (len={})'.format(text[:20], len(text)) ) @@ -49,7 +59,7 @@ class TfidfVectorizerMixin: vectorizer = None - def initialize_vectorizer(self): + def initialize_vectorizer(self) -> None: if self.vectorizer is None: path = os.path.join(self.datadir, self.VECTORIZER_FILE) if os.path.exists(path): @@ -61,7 +71,9 @@ def initialize_vectorizer(self): backend_id=self.backend_id, ) - def create_vectorizer(self, input, params={}): + def create_vectorizer( + self, input: Iterable[str], params: dict[str, Any] = {} + ) -> csr_matrix: self.info("creating vectorizer") self.vectorizer = TfidfVectorizer(**params) veccorpus = self.vectorizer.fit_transform(input) diff --git a/annif/backend/mllm.py b/annif/backend/mllm.py index 6954dadc3..f73bf8324 100644 --- a/annif/backend/mllm.py +++ b/annif/backend/mllm.py @@ -1,6 +1,8 @@ """Maui-like Lexical Matching backend""" +from __future__ import annotations import os.path +from typing import TYPE_CHECKING, Any import joblib import numpy as np @@ -13,11 +15,21 @@ from . 
import backend, hyperopt +if TYPE_CHECKING: + from collections.abc import Iterator + + from optuna.study.study import Study + from optuna.trial import Trial + + from annif.backend.hyperopt import HPRecommendation + from annif.corpus.document import DocumentCorpus + from annif.lexical.mllm import Candidate + class MLLMOptimizer(hyperopt.HyperparameterOptimizer): """Hyperparameter optimizer for the MLLM backend""" - def _prepare(self, n_jobs=1): + def _prepare(self, n_jobs: int = 1) -> None: self._backend.initialize() self._train_x, self._train_y = self._backend._load_train_data() self._candidates = [] @@ -29,7 +41,7 @@ def _prepare(self, n_jobs=1): self._candidates.append(candidates) self._gold_subjects.append(doc.subject_set) - def _objective(self, trial): + def _objective(self, trial: Trial) -> float: params = { "min_samples_leaf": trial.suggest_int("min_samples_leaf", 5, 30), "max_leaf_nodes": trial.suggest_int("max_leaf_nodes", 100, 2000), @@ -52,7 +64,7 @@ def _objective(self, trial): results = batch.results(metrics=[self._metric]) return results[self._metric] - def _postprocess(self, study): + def _postprocess(self, study: Study) -> HPRecommendation: bp = study.best_params lines = [ f"min_samples_leaf={bp['min_samples_leaf']}", @@ -80,15 +92,15 @@ class MLLMBackend(hyperopt.AnnifHyperoptBackend): "use_hidden_labels": False, } - def get_hp_optimizer(self, corpus, metric): + def get_hp_optimizer(self, corpus: DocumentCorpus, metric: str) -> MLLMOptimizer: return MLLMOptimizer(self, corpus, metric) - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(self.DEFAULT_PARAMETERS) return params - def _load_model(self): + def _load_model(self) -> MLLMModel: path = os.path.join(self.datadir, self.MODEL_FILE) self.debug("loading model from {}".format(path)) if os.path.exists(path): @@ -98,7 +110,7 @@ def _load_model(self): "model {} not found".format(path), backend_id=self.backend_id ) - def _load_train_data(self): + def _load_train_data(self) -> tuple[np.ndarray, np.ndarray]: path = os.path.join(self.datadir, self.TRAIN_FILE) if os.path.exists(path): return joblib.load(path) @@ -107,11 +119,16 @@ def _load_train_data(self): "train data file {} not found".format(path), backend_id=self.backend_id ) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: if self._model is None: self._model = self._load_model() - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: self.info("starting train") if corpus != "cached": if corpus.is_empty(): @@ -137,16 +154,20 @@ def _train(self, corpus, params, jobs=0): self.info("saving model") annif.util.atomic_save(self._model, self.datadir, self.MODEL_FILE) - def _generate_candidates(self, text): + def _generate_candidates(self, text: str) -> list[Candidate]: return self._model.generate_candidates(text, self.project.analyzer) - def _prediction_to_result(self, prediction, params): + def _prediction_to_result( + self, + prediction: list[tuple[np.float64, int]], + params: dict[str, Any], + ) -> Iterator: vector = np.zeros(len(self.project.subjects), dtype=np.float32) for score, subject_id in prediction: vector[subject_id] = score return vector_to_suggestions(vector, int(params["limit"])) - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> Iterator: candidates = self._generate_candidates(text) prediction = 
self._model.predict(candidates) return self._prediction_to_result(prediction, params) diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index 2ee5f89c4..658bd79be 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -1,10 +1,11 @@ """Neural network based ensemble backend that combines results from multiple projects.""" - +from __future__ import annotations import os.path import shutil from io import BytesIO +from typing import TYPE_CHECKING, Any import joblib import lmdb @@ -23,13 +24,18 @@ from . import backend, ensemble +if TYPE_CHECKING: + from tensorflow.python.framework.ops import EagerTensor + + from annif.corpus.document import DocumentCorpus + -def idx_to_key(idx): +def idx_to_key(idx: int) -> bytes: """convert an integer index to a binary key for use in LMDB""" return b"%08d" % idx -def key_to_idx(key): +def key_to_idx(key: memoryview | bytes) -> int: """convert a binary LMDB key to an integer index""" return int(key) @@ -47,7 +53,7 @@ def __init__(self, txn, batch_size): self._counter = 0 self._batch_size = batch_size - def add_sample(self, inputs, targets): + def add_sample(self, inputs: np.ndarray, targets: np.ndarray) -> None: # use zero-padded 8-digit key key = idx_to_key(self._counter) self._counter += 1 @@ -58,7 +64,7 @@ def add_sample(self, inputs, targets): buf.seek(0) self._txn.put(key, buf.read()) - def __getitem__(self, idx): + def __getitem__(self, idx: int) -> tuple[np.ndarray, np.ndarray]: """get a particular batch of samples""" cursor = self._txn.cursor() first_key = idx * self._batch_size @@ -73,7 +79,7 @@ def __getitem__(self, idx): target_arrays.append(target_csr.toarray().flatten()) return np.array(input_arrays), np.array(target_arrays) - def __len__(self): + def __len__(self) -> int: """return the number of available batches""" return int(np.ceil(self._counter / self._batch_size)) @@ -81,7 +87,7 @@ def __len__(self): class MeanLayer(Layer): """Custom Keras layer that calculates mean values along the 2nd axis.""" - def call(self, inputs): + def call(self, inputs: EagerTensor) -> EagerTensor: return K.mean(inputs, axis=2) @@ -106,12 +112,12 @@ class NNEnsembleBackend(backend.AnnifLearningBackend, ensemble.BaseEnsembleBacke # defaults for uninitialized instances _model = None - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(self.DEFAULT_PARAMETERS) return params - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: super().initialize(parallel) if self._model is not None: return # already initialized @@ -130,7 +136,12 @@ def initialize(self, parallel=False): model_filename, custom_objects={"MeanLayer": MeanLayer} ) - def _merge_source_batches(self, batch_by_source, sources, params): + def _merge_source_batches( + self, + batch_by_source: dict[str, SuggestionBatch], + sources: list[tuple[str, float]], + params: dict[str, Any], + ) -> SuggestionBatch: src_weight = dict(sources) score_vectors = np.array( [ @@ -153,7 +164,7 @@ def _merge_source_batches(self, batch_by_source, sources, params): self.project.subjects, ) - def _create_model(self, sources): + def _create_model(self, sources: list[tuple[str, float]]) -> None: self.info("creating NN ensemble model") inputs = Input(shape=(len(self.project.subjects), len(sources))) @@ -185,7 +196,12 @@ def _create_model(self, sources): self._model.summary(print_fn=summary.append) self.debug("Created model: \n" + "\n".join(summary)) - def 
_train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: sources = annif.util.parse_sources(self.params["sources"]) self._create_model(sources) self._fit_model( @@ -195,7 +211,12 @@ def _train(self, corpus, params, jobs=0): n_jobs=jobs, ) - def _corpus_to_vectors(self, corpus, seq, n_jobs): + def _corpus_to_vectors( + self, + corpus: DocumentCorpus, + seq: LMDBSequence, + n_jobs: int, + ) -> None: # pass corpus through all source projects sources = dict(annif.util.parse_sources(self.params["sources"])) @@ -236,7 +257,13 @@ def _open_lmdb(self, cached, lmdb_map_size): shutil.rmtree(lmdb_path) return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True) - def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1): + def _fit_model( + self, + corpus: DocumentCorpus, + epochs: int, + lmdb_map_size: int, + n_jobs: int = 1, + ) -> None: env = self._open_lmdb(corpus == "cached", lmdb_map_size) if corpus != "cached": if corpus.is_empty(): @@ -256,7 +283,11 @@ def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1): annif.util.atomic_save(self._model, self.datadir, self.MODEL_FILE) - def _learn(self, corpus, params): + def _learn( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + ) -> None: self.initialize() self._fit_model( corpus, int(params["learn-epochs"]), int(params["lmdb_map_size"]) diff --git a/annif/backend/omikuji.py b/annif/backend/omikuji.py index 99218b951..6c864b89e 100644 --- a/annif/backend/omikuji.py +++ b/annif/backend/omikuji.py @@ -1,7 +1,9 @@ """Annif backend using the Omikuji classifier""" +from __future__ import annotations import os.path import shutil +from typing import TYPE_CHECKING, Any import omikuji @@ -15,6 +17,11 @@ from . 
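Taken together, the annotated signatures in `annif/backend/__init__.py` and `annif/backend/backend.py` above spell out how a backend class is obtained and exercised. A usage sketch under stated assumptions: it presumes an already-configured `AnnifProject` instance named `project` (for example obtained from the project registry), which the snippet does not construct, and it uses the dummy backend purely for illustration:

from annif.backend import get_backend

backend_class = get_backend("dummy")   # returns Type[AnnifBackend]
backend = backend_class(
    backend_id="dummy",
    config_params={"limit": 10},       # dict[str, Any] | SectionProxy
    project=project,                   # assumed to exist, see note above
)
# suggest() accepts a batch of texts (list[str]) and returns a SuggestionBatch;
# per the hunk above it calls initialize() itself before _suggest_batch().
batch = backend.suggest(["A short test document."])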
import backend, mixins +if TYPE_CHECKING: + from scipy.sparse._csr import csr_matrix + + from annif.corpus.document import DocumentCorpus + class OmikujiBackend(mixins.TfidfVectorizerMixin, backend.AnnifBackend): """Omikuji based backend for Annif""" @@ -36,12 +43,12 @@ class OmikujiBackend(mixins.TfidfVectorizerMixin, backend.AnnifBackend): "collapse_every_n_layers": 0, } - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(self.DEFAULT_PARAMETERS) return params - def _initialize_model(self): + def _initialize_model(self) -> None: if self._model is None: path = os.path.join(self.datadir, self.MODEL_FILE) self.debug("loading model from {}".format(path)) @@ -58,11 +65,11 @@ def _initialize_model(self): "model {} not found".format(path), backend_id=self.backend_id ) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: self.initialize_vectorizer() self._initialize_model() - def _create_train_file(self, veccorpus, corpus): + def _create_train_file(self, veccorpus: csr_matrix, corpus: DocumentCorpus) -> None: self.info("creating train file") path = os.path.join(self.datadir, self.TRAIN_FILE) with open(path, "w", encoding="utf-8") as trainfile: @@ -89,7 +96,7 @@ def _create_train_file(self, veccorpus, corpus): trainfile.seek(0) print("{:08d}".format(n_samples), end="", file=trainfile) - def _create_model(self, params, jobs): + def _create_model(self, params: dict[str, Any], jobs: int) -> None: train_path = os.path.join(self.datadir, self.TRAIN_FILE) model_path = os.path.join(self.datadir, self.MODEL_FILE) hyper_param = omikuji.Model.default_hyper_param() @@ -104,7 +111,12 @@ def _create_model(self, params, jobs): shutil.rmtree(model_path) self._model.save(os.path.join(self.datadir, self.MODEL_FILE)) - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: if corpus != "cached": if corpus.is_empty(): raise NotSupportedException( @@ -122,7 +134,9 @@ def _train(self, corpus, params, jobs=0): self.info("Reusing cached training data from previous run.") self._create_model(params, jobs) - def _suggest_batch(self, texts, params): + def _suggest_batch( + self, texts: list[str], params: dict[str, Any] + ) -> SuggestionBatch: vector = self.vectorizer.transform(texts) limit = int(params["limit"]) diff --git a/annif/backend/pav.py b/annif/backend/pav.py index 5125cb8cd..da8a6e2c1 100644 --- a/annif/backend/pav.py +++ b/annif/backend/pav.py @@ -2,8 +2,10 @@ learns which concept suggestions from each backend are trustworthy using the PAV algorithm, a.k.a. isotonic regression, to turn raw scores returned by individual backends into probabilities.""" +from __future__ import annotations import os.path +from typing import TYPE_CHECKING, Any import joblib import numpy as np @@ -17,6 +19,10 @@ from . 
import backend, ensemble +if TYPE_CHECKING: + from annif.corpus.document import DocumentCorpus + from annif.project import AnnifProject + class PAVBackend(ensemble.BaseEnsembleBackend): """PAV ensemble backend that combines results from multiple projects""" @@ -30,12 +36,12 @@ class PAVBackend(ensemble.BaseEnsembleBackend): DEFAULT_PARAMETERS = {"min-docs": 10} - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(self.DEFAULT_PARAMETERS) return params - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: super().initialize(parallel) if self._models is not None: return # already initialized @@ -53,11 +59,16 @@ def initialize(self, parallel=False): backend_id=self.backend_id, ) - def _get_model(self, source_project_id): + def _get_model(self, source_project_id: str) -> dict[int, IsotonicRegression]: self.initialize() return self._models[source_project_id] - def _merge_source_batches(self, batch_by_source, sources, params): + def _merge_source_batches( + self, + batch_by_source: dict[str, SuggestionBatch], + sources: list[tuple[str, float]], + params: dict[str, Any], + ) -> SuggestionBatch: reg_batch_by_source = {} for project_id, batch in batch_by_source.items(): reg_models = self._get_model(project_id) @@ -82,7 +93,9 @@ def _merge_source_batches(self, batch_by_source, sources, params): return super()._merge_source_batches(reg_batch_by_source, sources, params) @staticmethod - def _suggest_train_corpus(source_project, corpus): + def _suggest_train_corpus( + source_project: AnnifProject, corpus: DocumentCorpus + ) -> tuple[csc_matrix, csc_matrix]: # lists for constructing score matrix data, row, col = [], [], [] # lists for constructing true label matrix @@ -114,7 +127,9 @@ def _suggest_train_corpus(source_project, corpus): ) return csc_matrix(scores), csc_matrix(true) - def _create_pav_model(self, source_project_id, min_docs, corpus): + def _create_pav_model( + self, source_project_id: str, min_docs: int, corpus: DocumentCorpus + ) -> None: self.info( "creating PAV model for source {}, min_docs={}".format( source_project_id, min_docs @@ -138,7 +153,12 @@ def _create_pav_model(self, source_project_id, min_docs, corpus): pav_regressions, self.datadir, model_filename, method=joblib.dump ) - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: if corpus == "cached": raise NotSupportedException( "Training pav project from cached data not supported." diff --git a/annif/backend/stwfsa.py b/annif/backend/stwfsa.py index d8217ee03..fdc962b11 100644 --- a/annif/backend/stwfsa.py +++ b/annif/backend/stwfsa.py @@ -1,4 +1,7 @@ +from __future__ import annotations + import os +from typing import TYPE_CHECKING, Any from stwfsapy.predictor import StwfsapyPredictor @@ -8,6 +11,9 @@ from . 
import backend +if TYPE_CHECKING: + from annif.corpus.document import DocumentCorpus + _KEY_CONCEPT_TYPE_URI = "concept_type_uri" _KEY_SUBTHESAURUS_TYPE_URI = "sub_thesaurus_type_uri" _KEY_THESAURUS_RELATION_TYPE_URI = "thesaurus_relation_type_uri" @@ -59,7 +65,7 @@ class StwfsaBackend(backend.AnnifBackend): _model = None - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: if self._model is None: path = os.path.join(self.datadir, self.MODEL_FILE) self.debug(f"Loading STWFSA model from {path}.") @@ -71,7 +77,7 @@ def initialize(self, parallel=False): f"Model not found at {path}", backend_id=self.backend_id ) - def _load_data(self, corpus): + def _load_data(self, corpus: DocumentCorpus) -> tuple[list[str], list[list[str]]]: if corpus == "cached": raise NotSupportedException( "Training stwfsa project from cached data not supported." @@ -93,7 +99,12 @@ def _load_data(self, corpus): ) return X, y - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: X, y = self._load_data(corpus) new_params = { key: self.STWFSA_PARAMETERS[key](val) @@ -114,7 +125,7 @@ def _train(self, corpus, params, jobs=0): lambda model, store_path: model.store(store_path), ) - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> list[SubjectSuggestion]: self.debug(f'Suggesting subjects for text "{text[:20]}..." (len={len(text)})') result = self._model.suggest_proba([text])[0] suggestions = [] diff --git a/annif/backend/svc.py b/annif/backend/svc.py index ad8939f5f..1e7932c3e 100644 --- a/annif/backend/svc.py +++ b/annif/backend/svc.py @@ -1,6 +1,8 @@ """Annif backend using a SVM classifier""" +from __future__ import annotations import os.path +from typing import TYPE_CHECKING, Any import joblib import numpy as np @@ -13,6 +15,11 @@ from . 
import backend, mixins +if TYPE_CHECKING: + from scipy.sparse._csr import csr_matrix + + from annif.corpus.document import DocumentCorpus + class SVCBackend(mixins.TfidfVectorizerMixin, backend.AnnifBackend): """Support vector classifier backend for Annif""" @@ -26,12 +33,12 @@ class SVCBackend(mixins.TfidfVectorizerMixin, backend.AnnifBackend): DEFAULT_PARAMETERS = {"min_df": 1, "ngram": 1} - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(self.DEFAULT_PARAMETERS) return params - def _initialize_model(self): + def _initialize_model(self) -> None: if self._model is None: path = os.path.join(self.datadir, self.MODEL_FILE) self.debug("loading model from {}".format(path)) @@ -42,11 +49,13 @@ def _initialize_model(self): "model {} not found".format(path), backend_id=self.backend_id ) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: self.initialize_vectorizer() self._initialize_model() - def _corpus_to_texts_and_classes(self, corpus): + def _corpus_to_texts_and_classes( + self, corpus: DocumentCorpus + ) -> tuple[list[str], list[int]]: texts = [] classes = [] for doc in corpus.documents: @@ -61,7 +70,7 @@ def _corpus_to_texts_and_classes(self, corpus): classes.append(doc.subject_set[0]) return texts, classes - def _train_classifier(self, veccorpus, classes): + def _train_classifier(self, veccorpus: csr_matrix, classes: list[int]) -> None: self.info("creating classifier") self._model = LinearSVC() self._model.fit(veccorpus, classes) @@ -69,7 +78,9 @@ def _train_classifier(self, veccorpus, classes): self._model, self.datadir, self.MODEL_FILE, method=joblib.dump ) - def _train(self, corpus, params, jobs=0): + def _train( + self, corpus: DocumentCorpus, params: dict[str, Any], jobs: int = 0 + ) -> None: if corpus == "cached": raise NotSupportedException( "SVC backend does not support reuse of cached training data." @@ -85,7 +96,9 @@ def _train(self, corpus, params, jobs=0): veccorpus = self.create_vectorizer(texts, vecparams) self._train_classifier(veccorpus, classes) - def _scores_to_suggestions(self, scores, params): + def _scores_to_suggestions( + self, scores: np.ndarray, params: dict[str, Any] + ) -> list[SubjectSuggestion]: results = [] limit = int(params["limit"]) for class_id in np.argsort(scores)[::-1][:limit]: @@ -96,7 +109,9 @@ def _scores_to_suggestions(self, scores, params): ) return results - def _suggest_batch(self, texts, params): + def _suggest_batch( + self, texts: list[str], params: dict[str, Any] + ) -> SuggestionBatch: vector = self.vectorizer.transform(texts) confidences = self._model.decision_function(vector) # convert to 0..1 score range using logistic function diff --git a/annif/backend/tfidf.py b/annif/backend/tfidf.py index 335fe53d1..1cca639ca 100644 --- a/annif/backend/tfidf.py +++ b/annif/backend/tfidf.py @@ -1,8 +1,10 @@ """Backend that returns most similar subjects based on similarity in sparse TF-IDF normalized bag-of-words vector space""" +from __future__ import annotations import os.path import tempfile +from typing import TYPE_CHECKING, Any import gensim.similarities from gensim.matutils import Sparse2Corpus @@ -13,19 +15,26 @@ from . 
import backend, mixins +if TYPE_CHECKING: + from collections.abc import Iterator + + from scipy.sparse._csr import csr_matrix + + from annif.corpus.document import DocumentCorpus + class SubjectBuffer: """A file-backed buffer to store and retrieve subject text.""" BUFFER_SIZE = 100 - def __init__(self, tempdir, subject_id): + def __init__(self, tempdir: str, subject_id: int) -> None: filename = "{:08d}.txt".format(subject_id) self._path = os.path.join(tempdir, filename) self._buffer = [] self._created = False - def flush(self): + def flush(self) -> None: if self._created: mode = "a" else: @@ -38,12 +47,12 @@ def flush(self): self._buffer = [] self._created = True - def write(self, text): + def write(self, text: str) -> None: self._buffer.append(text) if len(self._buffer) >= self.BUFFER_SIZE: self.flush() - def read(self): + def read(self) -> str: if not self._created: # file was never created - we can simply return the buffer content return "\n".join(self._buffer) @@ -62,7 +71,9 @@ class TFIDFBackend(mixins.TfidfVectorizerMixin, backend.AnnifBackend): INDEX_FILE = "tfidf-index" - def _generate_subjects_from_documents(self, corpus): + def _generate_subjects_from_documents( + self, corpus: DocumentCorpus + ) -> Iterator[str]: with tempfile.TemporaryDirectory() as tempdir: subject_buffer = {} for subject_id in range(len(self.project.subjects)): @@ -76,7 +87,7 @@ def _generate_subjects_from_documents(self, corpus): for sid in range(len(self.project.subjects)): yield subject_buffer[sid].read() - def _initialize_index(self): + def _initialize_index(self) -> None: if self._index is None: path = os.path.join(self.datadir, self.INDEX_FILE) self.debug("loading similarity index from {}".format(path)) @@ -88,11 +99,11 @@ def _initialize_index(self): backend_id=self.backend_id, ) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: self.initialize_vectorizer() self._initialize_index() - def _create_index(self, veccorpus): + def _create_index(self, veccorpus: csr_matrix) -> None: self.info("creating similarity index") gscorpus = Sparse2Corpus(veccorpus, documents_columns=False) self._index = gensim.similarities.SparseMatrixSimilarity( @@ -100,7 +111,12 @@ def _create_index(self, veccorpus): ) annif.util.atomic_save(self._index, self.datadir, self.INDEX_FILE) - def _train(self, corpus, params, jobs=0): + def _train( + self, + corpus: DocumentCorpus, + params: dict[str, Any], + jobs: int = 0, + ) -> None: if corpus == "cached": raise NotSupportedException( "Training tfidf project from cached data not supported." @@ -112,7 +128,7 @@ def _train(self, corpus, params, jobs=0): veccorpus = self.create_vectorizer(subjects) self._create_index(veccorpus) - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> Iterator: self.debug( 'Suggesting subjects for text "{}..." (len={})'.format(text[:20], len(text)) ) diff --git a/annif/backend/yake.py b/annif/backend/yake.py index bb684aaf5..1e6adfdd5 100644 --- a/annif/backend/yake.py +++ b/annif/backend/yake.py @@ -1,10 +1,12 @@ """Annif backend using Yake keyword extraction""" # For license remarks of this backend see README.md: # https://github.com/NatLibFi/Annif#license. +from __future__ import annotations import os.path import re from collections import defaultdict +from typing import TYPE_CHECKING, Any import joblib import yake @@ -16,6 +18,11 @@ from . 
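The `SubjectBuffer` annotations above (`write(text: str) -> None`, `read() -> str`) can be exercised in isolation; a quick sketch, assuming an installed Annif development environment so that `annif.backend.tfidf` and its gensim dependency import cleanly:

import tempfile

from annif.backend.tfidf import SubjectBuffer

with tempfile.TemporaryDirectory() as tempdir:
    buf = SubjectBuffer(tempdir, subject_id=0)
    buf.write("first text")
    buf.write("second text")
    # Below BUFFER_SIZE (100) writes nothing has been flushed to disk yet,
    # so read() simply returns the in-memory buffer joined with newlines.
    assert buf.read() == "first text\nsecond text"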
import backend +if TYPE_CHECKING: + from rdflib.term import URIRef + + from annif.corpus.document import DocumentCorpus + class YakeBackend(backend.AnnifBackend): """Yake based backend for Annif""" @@ -38,7 +45,7 @@ class YakeBackend(backend.AnnifBackend): "remove_parentheses": False, } - def default_params(self): + def default_params(self) -> dict[str, Any]: params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() params.update(self.DEFAULT_PARAMETERS) return params @@ -48,7 +55,7 @@ def is_trained(self): return True @property - def label_types(self): + def label_types(self) -> list[URIRef]: if type(self.params["label_types"]) == str: # Label types set by user label_types = [lt.strip() for lt in self.params["label_types"].split(",")] self._validate_label_types(label_types) @@ -56,17 +63,17 @@ def label_types(self): label_types = self.params["label_types"] # The defaults return [getattr(SKOS, lt) for lt in label_types] - def _validate_label_types(self, label_types): + def _validate_label_types(self, label_types: list[str]) -> None: for lt in label_types: if lt not in ("prefLabel", "altLabel", "hiddenLabel"): raise ConfigurationException( f"invalid label type {lt}", backend_id=self.backend_id ) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: self._initialize_index() - def _initialize_index(self): + def _initialize_index(self) -> None: if self._index is None: path = os.path.join(self.datadir, self.INDEX_FILE) if os.path.exists(path): @@ -78,12 +85,12 @@ def _initialize_index(self): self._save_index(path) self.info(f"Created index with {len(self._index)} labels") - def _save_index(self, path): + def _save_index(self, path: str) -> None: annif.util.atomic_save( self._index, self.datadir, self.INDEX_FILE, method=joblib.dump ) - def _create_index(self): + def _create_index(self) -> dict[str, set[str]]: index = defaultdict(set) skos_vocab = self.project.vocab.skos for concept in skos_vocab.concepts: @@ -95,21 +102,21 @@ def _create_index(self): index.pop("", None) # Remove possible empty string entry return dict(index) - def _normalize_label(self, label): + def _normalize_label(self, label: str) -> str: label = str(label) if annif.util.boolean(self.params["remove_parentheses"]): label = re.sub(r" \(.*\)", "", label) normalized_label = self._normalize_phrase(label) return self._sort_phrase(normalized_label) - def _normalize_phrase(self, phrase): + def _normalize_phrase(self, phrase: str) -> str: return " ".join(self.project.analyzer.tokenize_words(phrase, filter=False)) - def _sort_phrase(self, phrase): + def _sort_phrase(self, phrase: str) -> str: words = phrase.split() return " ".join(sorted(words)) - def _suggest(self, text, params): + def _suggest(self, text: str, params: dict[str, Any]) -> list[SubjectSuggestion]: self.debug(f'Suggesting subjects for text "{text[:20]}..." 
(len={len(text)})') limit = int(params["limit"]) @@ -132,7 +139,9 @@ def _suggest(self, text, params): ] return subject_suggestions - def _keyphrases2suggestions(self, keyphrases): + def _keyphrases2suggestions( + self, keyphrases: list[tuple[str, float]] + ) -> list[tuple[str, float]]: suggestions = [] not_matched = [] for kp, score in keyphrases: @@ -154,16 +163,18 @@ def _keyphrases2suggestions(self, keyphrases): ) return suggestions - def _keyphrase2uris(self, keyphrase): + def _keyphrase2uris(self, keyphrase: str) -> set[str]: keyphrase = self._normalize_phrase(keyphrase) keyphrase = self._sort_phrase(keyphrase) return self._index.get(keyphrase, []) - def _transform_score(self, score): + def _transform_score(self, score: float) -> float: score = max(score, 0) return 1.0 / (score + 1) - def _combine_suggestions(self, suggestions): + def _combine_suggestions( + self, suggestions: list[tuple[str, float]] + ) -> list[tuple[str, float]]: combined_suggestions = {} for uri, score in suggestions: if uri not in combined_suggestions: @@ -173,12 +184,12 @@ def _combine_suggestions(self, suggestions): combined_suggestions[uri] = self._combine_scores(score, old_score) return list(combined_suggestions.items()) - def _combine_scores(self, score1, score2): + def _combine_scores(self, score1: float, score2: float) -> float: # The result is never smaller than the greater input score1 = score1 / 2 + 0.5 score2 = score2 / 2 + 0.5 confl = score1 * score2 / (score1 * score2 + (1 - score1) * (1 - score2)) return (confl - 0.5) * 2 - def _train(self, corpus, params, jobs=0): + def _train(self, corpus: DocumentCorpus, params: dict[str, Any], jobs: int = 0): raise NotSupportedException("Training yake backend is not possible.") diff --git a/annif/cli_util.py b/annif/cli_util.py index 72da0d46c..bbfa96df4 100644 --- a/annif/cli_util.py +++ b/annif/cli_util.py @@ -1,10 +1,11 @@ """Utility functions for Annif CLI commands""" - +from __future__ import annotations import collections import itertools import os import sys +from typing import TYPE_CHECKING import click import click_log @@ -14,10 +15,24 @@ from annif.exception import ConfigurationException from annif.project import Access +if TYPE_CHECKING: + from datetime import datetime + from io import TextIOWrapper + + from click.core import Argument, Context, Option + + from annif.corpus.document import DocumentCorpus, DocumentList + from annif.corpus.subject import SubjectIndex + from annif.project import AnnifProject + from annif.suggestion import SuggestionResult + from annif.vocab import AnnifVocabulary + logger = annif.logger -def _set_project_config_file_path(ctx, param, value): +def _set_project_config_file_path( + ctx: Context, param: Option, value: str | None +) -> None: """Override the default path or the path given in env by CLI option""" with ctx.obj.load_app().app_context(): if value: @@ -66,7 +81,7 @@ def docs_limit_option(f): )(f) -def get_project(project_id): +def get_project(project_id: str) -> AnnifProject: """ Helper function to get a project by ID and bail out if it doesn't exist""" try: @@ -76,7 +91,7 @@ def get_project(project_id): sys.exit(1) -def get_vocab(vocab_id): +def get_vocab(vocab_id: str) -> AnnifVocabulary: """ Helper function to get a vocabulary by ID and bail out if it doesn't exist""" @@ -87,7 +102,7 @@ def get_vocab(vocab_id): sys.exit(1) -def make_list_template(*rows): +def make_list_template(*rows) -> str: """Helper function to create a template for a list of entries with fields of variable width. 
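As a side note on the YAKE hunk above: the conflation rule in _combine_scores is easy to sanity-check on its own. A minimal standalone sketch (not part of the patch; the function name here is just illustrative) showing that the combined score never falls below the larger input and that two agreeing sources reinforce each other:

    from math import isclose

    def combine_scores(score1: float, score2: float) -> float:
        # map [0, 1] scores into [0.5, 1.0], conflate, then map the result back
        score1 = score1 / 2 + 0.5
        score2 = score2 / 2 + 0.5
        confl = score1 * score2 / (score1 * score2 + (1 - score1) * (1 - score2))
        return (confl - 0.5) * 2

    assert isclose(combine_scores(0.6, 0.0), 0.6)  # equals the greater input
    assert combine_scores(0.6, 0.6) > 0.6          # agreement pushes the score up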
The width of each field is determined by the longest item in the field in the given rows.""" @@ -105,14 +120,19 @@ def make_list_template(*rows): ) -def format_datetime(dt): +def format_datetime(dt: datetime | None) -> str: """Helper function to format a datetime object as a string in the local time.""" if dt is None: return "-" return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S") -def open_documents(paths, subject_index, vocab_lang, docs_limit): +def open_documents( + paths: tuple[str, ...], + subject_index: SubjectIndex, + vocab_lang: str, + docs_limit: int | None, +) -> DocumentCorpus: """Helper function to open a document corpus from a list of pathnames, each of which is either a TSV file or a directory of TXT files. For directories with subjects in TSV files, the given vocabulary language @@ -140,7 +160,7 @@ def open_doc_path(path, subject_index): return docs -def open_text_documents(paths, docs_limit): +def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList: """ Helper function to read text documents from the given file paths. Returns a DocumentList object with Documents having no subjects. If a path is "-", the @@ -160,7 +180,12 @@ def _docs(paths): return annif.corpus.DocumentList(_docs(paths[:docs_limit])) -def show_hits(hits, project, lang, file=None): +def show_hits( + hits: SuggestionResult, + project: AnnifProject, + lang: str, + file: TextIOWrapper | None = None, +) -> None: """ Print subject suggestions to the console or a file. The suggestions are displayed as a table, with one row per hit. Each row contains the URI, label, possible notation, @@ -177,7 +202,9 @@ def show_hits(hits, project, lang, file=None): click.echo(line, file=file) -def parse_backend_params(backend_param, project): +def parse_backend_params( + backend_param: tuple[str, ...] 
| tuple[()], project: AnnifProject +) -> collections.defaultdict[str, dict[str, str]]: """Parse a list of backend parameters given with the --backend-param option into a nested dict structure""" backend_params = collections.defaultdict(dict) @@ -189,7 +216,7 @@ def parse_backend_params(backend_param, project): return backend_params -def _validate_backend_params(backend, beparam, project): +def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None: if backend != project.config["backend"]: raise ConfigurationException( 'The backend {} in CLI option "-b {}" not matching the project' @@ -197,13 +224,15 @@ def _validate_backend_params(backend, beparam, project): ) -def generate_filter_params(filter_batch_max_limit): +def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]: limits = range(1, filter_batch_max_limit + 1) thresholds = [i * 0.05 for i in range(20)] return list(itertools.product(limits, thresholds)) -def _get_completion_choices(param): +def _get_completion_choices( + param: Argument, +) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list: if param.name == "project_id": return annif.registry.get_projects() elif param.name == "vocab_id": @@ -212,7 +241,7 @@ def _get_completion_choices(param): return [] -def complete_param(ctx, param, incomplete): +def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]: with ctx.obj.load_app().app_context(): return [ choice diff --git a/annif/config.py b/annif/config.py index 589b337a3..ab8f0d568 100644 --- a/annif/config.py +++ b/annif/config.py @@ -1,5 +1,5 @@ """Configuration file handling""" - +from __future__ import annotations import configparser import os.path @@ -17,7 +17,7 @@ class AnnifConfigCFG: """Class for reading configuration in CFG/INI format""" - def __init__(self, filename): + def __init__(self, filename: str) -> None: self._config = configparser.ConfigParser() self._config.optionxform = annif.util.identity with open(filename, encoding="utf-8-sig") as projf: @@ -28,20 +28,20 @@ def __init__(self, filename): configparser.DuplicateOptionError, configparser.DuplicateSectionError, ) as err: - raise ConfigurationException(err) + raise ConfigurationException(err.message) @property - def project_ids(self): + def project_ids(self) -> list[str]: return self._config.sections() - def __getitem__(self, key): + def __getitem__(self, key: str) -> configparser.SectionProxy: return self._config[key] class AnnifConfigTOML: """Class for reading configuration in TOML format""" - def __init__(self, filename): + def __init__(self, filename: str) -> None: with open(filename, "rb") as projf: try: logger.debug(f"Reading configuration file {filename} in TOML format") @@ -55,14 +55,14 @@ def __init__(self, filename): def project_ids(self): return self._config.keys() - def __getitem__(self, key): + def __getitem__(self, key: str) -> dict[str, str]: return self._config[key] class AnnifConfigDirectory: """Class for reading configuration from directory""" - def __init__(self, directory): + def __init__(self, directory: str) -> None: files = glob(os.path.join(directory, "*.cfg")) files.extend(glob(os.path.join(directory, "*.toml"))) logger.debug(f"Reading configuration files in directory {directory}") @@ -74,7 +74,7 @@ def __init__(self, directory): self._check_duplicate_project_ids(proj_id, file) self._config[proj_id] = source_config[proj_id] - def _check_duplicate_project_ids(self, proj_id, file): + def _check_duplicate_project_ids(self, proj_id: str, file: str) -> None: 
if proj_id in self._config: # Error message resembles configparser's DuplicateSection message raise ConfigurationException( @@ -86,11 +86,11 @@ def _check_duplicate_project_ids(self, proj_id, file): def project_ids(self): return self._config.keys() - def __getitem__(self, key): + def __getitem__(self, key: str) -> dict[str, str] | configparser.SectionProxy: return self._config[key] -def check_config(projects_config_path): +def check_config(projects_config_path: str) -> str | None: if os.path.exists(projects_config_path): return projects_config_path else: @@ -104,7 +104,7 @@ def check_config(projects_config_path): return None -def find_config(): +def find_config() -> str | None: for path in ("projects.cfg", "projects.toml", "projects.d"): if os.path.exists(path): return path @@ -119,7 +119,9 @@ def find_config(): return None -def parse_config(projects_config_path): +def parse_config( + projects_config_path: str, +) -> AnnifConfigDirectory | AnnifConfigCFG | AnnifConfigTOML | None: if projects_config_path: projects_config_path = check_config(projects_config_path) else: diff --git a/annif/corpus/combine.py b/annif/corpus/combine.py index 48fc83ff5..75fcc7f55 100644 --- a/annif/corpus/combine.py +++ b/annif/corpus/combine.py @@ -1,19 +1,24 @@ """Class for combining multiple corpora so they behave like a single corpus""" +from __future__ import annotations import itertools +from typing import TYPE_CHECKING from .types import DocumentCorpus +if TYPE_CHECKING: + from annif.corpus.document import DocumentFile + class CombinedCorpus(DocumentCorpus): """Class for combining multiple corpora so they behave like a single corpus""" - def __init__(self, corpora): + def __init__(self, corpora: list[DocumentFile]) -> None: self._corpora = corpora @property - def documents(self): + def documents(self) -> itertools.chain: return itertools.chain.from_iterable( [corpus.documents for corpus in self._corpora] ) diff --git a/annif/corpus/document.py b/annif/corpus/document.py index 54a0a3ba6..09a80a309 100644 --- a/annif/corpus/document.py +++ b/annif/corpus/document.py @@ -1,29 +1,42 @@ """Clases for supporting document corpora""" +from __future__ import annotations import glob import gzip import os.path import re from itertools import islice +from typing import TYPE_CHECKING import annif.util from .subject import SubjectSet from .types import Document, DocumentCorpus +if TYPE_CHECKING: + from collections.abc import Iterator + + from annif.corpus.subject import SubjectIndex + logger = annif.logger class DocumentDirectory(DocumentCorpus): """A directory of files as a full text document corpus""" - def __init__(self, path, subject_index=None, language=None, require_subjects=False): + def __init__( + self, + path: str, + subject_index: SubjectIndex | None = None, + language: str | None = None, + require_subjects: bool = False, + ) -> None: self.path = path self.subject_index = subject_index self.language = language self.require_subjects = require_subjects - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, str] | tuple[str, None]]: """Iterate through the directory, yielding tuples of (docfile, subjectfile) containing file paths. 
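A brief usage sketch for the corpus classes above: CombinedCorpus simply chains the documents of several corpora into one iterable. The file paths below are assumptions, not taken from the patch:

    from annif.corpus.combine import CombinedCorpus
    from annif.corpus.document import DocumentFile
    from annif.corpus.subject import SubjectIndex

    subject_index = SubjectIndex.load("vocab/subjects.csv")  # hypothetical path
    corpus = CombinedCorpus([
        DocumentFile("train-a.tsv.gz", subject_index),  # gzipped TSV works too
        DocumentFile("train-b.tsv", subject_index),
    ])
    for doc in corpus.documents:  # documents from train-a first, then train-b
        print(doc.text[:40], doc.subject_set)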
If require_subjects is False, the subjectfile will be returned as None.""" @@ -42,7 +55,7 @@ def __iter__(self): yield (filename, None) @property - def documents(self): + def documents(self) -> Iterator[Document]: for docfilename, subjfilename in self: with open(docfilename, errors="replace", encoding="utf-8-sig") as docfile: text = docfile.read() @@ -59,12 +72,12 @@ def documents(self): class DocumentFile(DocumentCorpus): """A TSV file as a corpus of documents with subjects""" - def __init__(self, path, subject_index): + def __init__(self, path: str, subject_index: SubjectIndex) -> None: self.path = path self.subject_index = subject_index @property - def documents(self): + def documents(self) -> Iterator[Document]: if self.path.endswith(".gz"): opener = gzip.open else: @@ -73,7 +86,7 @@ def documents(self): for line in tsvfile: yield from self._parse_tsv_line(line) - def _parse_tsv_line(self, line): + def _parse_tsv_line(self, line: str) -> Iterator[Document]: if "\t" in line: text, uris = line.split("\t", maxsplit=1) subject_ids = { diff --git a/annif/corpus/skos.py b/annif/corpus/skos.py index f29eee32d..462a35241 100644 --- a/annif/corpus/skos.py +++ b/annif/corpus/skos.py @@ -1,8 +1,10 @@ """Support for subjects loaded from a SKOS/RDF file""" +from __future__ import annotations import collections import os.path import shutil +from typing import TYPE_CHECKING import rdflib import rdflib.util @@ -12,8 +14,13 @@ from .types import Subject, SubjectCorpus +if TYPE_CHECKING: + from collections.abc import Iterator, Sequence -def serialize_subjects_to_skos(subjects, path): + from rdflib.term import URIRef + + +def serialize_subjects_to_skos(subjects: Iterator, path: str) -> None: """Create a SKOS representation of the given subjects and serialize it into a SKOS/Turtle file with the given path name.""" import joblib @@ -51,7 +58,7 @@ class SubjectFileSKOS(SubjectCorpus): _languages = None - def __init__(self, path): + def __init__(self, path: str) -> None: self.path = path if path.endswith(".dump.gz"): import joblib @@ -62,7 +69,7 @@ def __init__(self, path): self.graph.parse(self.path, format=rdflib.util.guess_format(self.path)) @property - def languages(self): + def languages(self) -> set[str]: if self._languages is None: self._languages = { label.language @@ -73,7 +80,7 @@ def languages(self): } return self._languages - def _concept_labels(self, concept): + def _concept_labels(self, concept: URIRef) -> dict[str, str]: by_lang = self.get_concept_labels(concept, self.PREF_LABEL_PROPERTIES) return { lang: by_lang[lang][0] @@ -85,7 +92,7 @@ def _concept_labels(self, concept): } @property - def subjects(self): + def subjects(self) -> Iterator[Subject]: for concept in self.concepts: labels = self._concept_labels(concept) @@ -96,13 +103,17 @@ def subjects(self): yield Subject(uri=str(concept), labels=labels, notation=notation) @property - def concepts(self): + def concepts(self) -> Iterator[URIRef]: for concept in self.graph.subjects(RDF.type, SKOS.Concept): if (concept, OWL.deprecated, rdflib.Literal(True)) in self.graph: continue yield concept - def get_concept_labels(self, concept, label_types): + def get_concept_labels( + self, + concept: URIRef, + label_types: Sequence[URIRef], + ) -> collections.defaultdict[str | None, list[str]]: """return all the labels of the given concept with the given label properties as a dict-like object where the keys are language codes and the values are lists of labels in that language""" @@ -115,14 +126,14 @@ def get_concept_labels(self, concept, label_types): 
return labels_by_lang @staticmethod - def is_rdf_file(path): + def is_rdf_file(path: str) -> bool: """return True if the path looks like an RDF file that can be loaded as SKOS""" fmt = rdflib.util.guess_format(path) return fmt is not None - def save_skos(self, path): + def save_skos(self, path: str) -> None: """Save the contents of the subject vocabulary into a SKOS/Turtle file with the given path name.""" @@ -139,5 +150,5 @@ def save_skos(self, path): annif.util.atomic_save( self.graph, *os.path.split(path.replace(".ttl", ".dump.gz")), - method=joblib.dump + method=joblib.dump, ) diff --git a/annif/corpus/subject.py b/annif/corpus/subject.py index 06c33683b..a9ee06397 100644 --- a/annif/corpus/subject.py +++ b/annif/corpus/subject.py @@ -1,7 +1,9 @@ """Classes for supporting subject corpora expressed as directories or files""" +from __future__ import annotations import csv import os.path +from typing import TYPE_CHECKING, Any import annif import annif.util @@ -9,6 +11,11 @@ from .skos import serialize_subjects_to_skos from .types import Subject, SubjectCorpus +if TYPE_CHECKING: + from collections.abc import Generator, Iterator + + import numpy as np + logger = annif.logger.getChild("subject") logger.addFilter(annif.util.DuplicateFilter()) @@ -16,14 +23,14 @@ class SubjectFileTSV(SubjectCorpus): """A monolingual subject vocabulary stored in a TSV file.""" - def __init__(self, path, language): + def __init__(self, path: str, language: str) -> None: """initialize the SubjectFileTSV given a path to a TSV file and the language of the vocabulary""" self.path = path self.language = language - def _parse_line(self, line): + def _parse_line(self, line: str) -> Iterator[Subject]: vals = line.strip().split("\t", 2) clean_uri = annif.util.cleanup_uri(vals[0]) label = vals[1] if len(vals) >= 2 else None @@ -32,16 +39,16 @@ def _parse_line(self, line): yield Subject(uri=clean_uri, labels=labels, notation=notation) @property - def languages(self): + def languages(self) -> list[str]: return [self.language] @property - def subjects(self): + def subjects(self) -> Generator: with open(self.path, encoding="utf-8-sig") as subjfile: for line in subjfile: yield from self._parse_line(line) - def save_skos(self, path): + def save_skos(self, path: str) -> None: """Save the contents of the subject vocabulary into a SKOS/Turtle file with the given path name.""" serialize_subjects_to_skos(self.subjects, path) @@ -50,11 +57,11 @@ def save_skos(self, path): class SubjectFileCSV(SubjectCorpus): """A multilingual subject vocabulary stored in a CSV file.""" - def __init__(self, path): + def __init__(self, path: str) -> None: """initialize the SubjectFileCSV given a path to a CSV file""" self.path = path - def _parse_row(self, row): + def _parse_row(self, row: dict[str, str]) -> Iterator[Subject]: labels = { fname.replace("label_", ""): value or None for fname, value in row.items() @@ -73,7 +80,7 @@ def _parse_row(self, row): ) @property - def languages(self): + def languages(self) -> list[str]: # infer the supported languages from the CSV column names with open(self.path, encoding="utf-8-sig") as csvfile: reader = csv.reader(csvfile) @@ -86,19 +93,19 @@ def languages(self): ] @property - def subjects(self): + def subjects(self) -> Generator: with open(self.path, encoding="utf-8-sig") as csvfile: reader = csv.DictReader(csvfile) for row in reader: yield from self._parse_row(row) - def save_skos(self, path): + def save_skos(self, path: str) -> None: """Save the contents of the subject vocabulary into a SKOS/Turtle file with 
the given path name.""" serialize_subjects_to_skos(self.subjects, path) @staticmethod - def is_csv_file(path): + def is_csv_file(path: str) -> bool: """return True if the path looks like a CSV file""" return os.path.splitext(path)[1].lower() == ".csv" @@ -108,30 +115,30 @@ class SubjectIndex: """An index that remembers the associations between integers subject IDs and their URIs and labels.""" - def __init__(self): + def __init__(self) -> None: self._subjects = [] self._uri_idx = {} self._label_idx = {} self._languages = None - def load_subjects(self, corpus): + def load_subjects(self, corpus: SubjectCorpus) -> None: """Initialize the subject index from a subject corpus""" self._languages = corpus.languages for subject in corpus.subjects: self.append(subject) - def __len__(self): + def __len__(self) -> int: return len(self._subjects) @property - def languages(self): + def languages(self) -> list[str] | None: return self._languages - def __getitem__(self, subject_id): + def __getitem__(self, subject_id: int) -> Subject: return self._subjects[subject_id] - def append(self, subject): + def append(self, subject: Subject) -> None: if self._languages is None and subject.labels is not None: self._languages = list(subject.labels.keys()) @@ -142,10 +149,10 @@ def append(self, subject): self._label_idx[(label, lang)] = subject_id self._subjects.append(subject) - def contains_uri(self, uri): + def contains_uri(self, uri: str) -> bool: return uri in self._uri_idx - def by_uri(self, uri, warnings=True): + def by_uri(self, uri: str, warnings: bool = True) -> int | None: """return the subject ID of a subject by its URI, or None if not found. If warnings=True, log a warning message if the URI cannot be found.""" try: @@ -155,7 +162,7 @@ def by_uri(self, uri, warnings=True): logger.warning("Unknown subject URI <%s>", uri) return None - def by_label(self, label, language): + def by_label(self, label: str | None, language: str) -> int | None: """return the subject ID of a subject by its label in a given language""" try: @@ -164,7 +171,7 @@ def by_label(self, label, language): logger.warning('Unknown subject label "%s"@%s', label, language) return None - def deprecated_ids(self): + def deprecated_ids(self) -> list[int]: """return indices of deprecated subjects""" return [ @@ -174,7 +181,7 @@ def deprecated_ids(self): ] @property - def active(self): + def active(self) -> list[tuple[int, Subject]]: """return a list of (subject_id, subject) tuples of all subjects that are not deprecated""" @@ -184,7 +191,7 @@ def active(self): if subject.labels is not None ] - def save(self, path): + def save(self, path: str) -> None: """Save this subject index into a file with the given path name.""" fieldnames = ["uri", "notation"] + [f"label_{lang}" for lang in self._languages] @@ -200,7 +207,7 @@ def save(self, path): writer.writerow(row) @classmethod - def load(cls, path): + def load(cls, path: str) -> SubjectIndex: """Load a subject index from a CSV file and return it.""" corpus = SubjectFileCSV(path) @@ -212,7 +219,7 @@ def load(cls, path): class SubjectSet: """Represents a set of subjects for a document.""" - def __init__(self, subject_ids=None): + def __init__(self, subject_ids: Any | None = None) -> None: """Create a SubjectSet and optionally initialize it from an iterable of subject IDs""" @@ -224,23 +231,25 @@ def __init__(self, subject_ids=None): else: self._subject_ids = [] - def __len__(self): + def __len__(self) -> int: return len(self._subject_ids) - def __getitem__(self, idx): + def __getitem__(self, idx: int) 
-> int: return self._subject_ids[idx] - def __bool__(self): + def __bool__(self) -> bool: return bool(self._subject_ids) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if isinstance(other, SubjectSet): return self._subject_ids == other._subject_ids return False @classmethod - def from_string(cls, subj_data, subject_index, language): + def from_string( + cls, subj_data: str, subject_index: SubjectIndex, language: str + ) -> SubjectSet: subject_ids = set() for line in subj_data.splitlines(): uri, label = cls._parse_line(line) @@ -251,7 +260,9 @@ def from_string(cls, subj_data, subject_index, language): return cls(subject_ids) @staticmethod - def _parse_line(line): + def _parse_line( + line: str, + ) -> tuple[str | None, str | None]: uri = label = None vals = line.split("\t") for val in vals: @@ -265,7 +276,9 @@ def _parse_line(line): break return uri, label - def as_vector(self, size=None, destination=None): + def as_vector( + self, size: int | None = None, destination: np.ndarray | None = None + ) -> np.ndarray: """Return the hits as a one-dimensional NumPy array in sklearn multilabel indicator format. Use destination array if given (not None), otherwise create and return a new one of the given size.""" diff --git a/annif/corpus/types.py b/annif/corpus/types.py index fb607fdc7..e6cd4b252 100644 --- a/annif/corpus/types.py +++ b/annif/corpus/types.py @@ -1,4 +1,5 @@ """Basic types for document and subject corpora""" +from __future__ import annotations import abc import collections @@ -19,7 +20,7 @@ def documents(self): pass # pragma: no cover @property - def doc_batches(self): + def doc_batches(self) -> collections.abc.Iterator[list[Document]]: """Iterate through the document corpus in batches, yielding lists of Document objects.""" it = iter(self.documents) @@ -29,7 +30,7 @@ def doc_batches(self): return yield docs_batch - def is_empty(self): + def is_empty(self) -> bool: """Check if there are no documents to iterate.""" try: next(self.documents) diff --git a/annif/datadir.py b/annif/datadir.py index 314f685b1..752da32dd 100644 --- a/annif/datadir.py +++ b/annif/datadir.py @@ -1,4 +1,5 @@ """Mixin class for types that need a data directory""" +from __future__ import annotations import os import os.path @@ -7,11 +8,11 @@ class DatadirMixin: """Mixin class for types that need a data directory for storing files""" - def __init__(self, datadir, typename, identifier): + def __init__(self, datadir: str, typename: str, identifier: str) -> None: self._datadir_path = os.path.join(datadir, typename, identifier) @property - def datadir(self): + def datadir(self) -> str: if not os.path.exists(self._datadir_path): try: os.makedirs(self._datadir_path) diff --git a/annif/eval.py b/annif/eval.py index 264bcad43..5ec5bd17a 100644 --- a/annif/eval.py +++ b/annif/eval.py @@ -1,6 +1,8 @@ """Evaluation metrics for Annif""" +from __future__ import annotations import warnings +from typing import TYPE_CHECKING import numpy as np import scipy.sparse @@ -9,26 +11,38 @@ from annif.exception import NotSupportedException from annif.suggestion import SuggestionBatch, filter_suggestion +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator, Sequence + from io import TextIOWrapper -def true_positives(y_true, y_pred): + from click.utils import LazyFile + from scipy.sparse._arrays import csr_array + + from annif.corpus.subject import SubjectIndex, SubjectSet + from annif.suggestion import SubjectSuggestion + + +def true_positives(y_true: csr_array, y_pred: csr_array) -> int: """calculate the 
number of true positives using bitwise operations, emulating the way sklearn evaluation metric functions work""" return int((y_true.multiply(y_pred)).sum()) -def false_positives(y_true, y_pred): +def false_positives(y_true: csr_array, y_pred: csr_array) -> int: """calculate the number of false positives using bitwise operations, emulating the way sklearn evaluation metric functions work""" return int((y_true < y_pred).sum()) -def false_negatives(y_true, y_pred): +def false_negatives(y_true: csr_array, y_pred: csr_array) -> int: """calculate the number of false negatives using bitwise operations, emulating the way sklearn evaluation metric functions work""" return int((y_true > y_pred).sum()) -def dcg_score(y_true, y_pred, limit=None): +def dcg_score( + y_true: csr_array, y_pred: csr_array, limit: int | None = None +) -> np.float64: """return the discounted cumulative gain (DCG) score for the selected labels vs. relevant labels""" @@ -43,7 +57,7 @@ def dcg_score(y_true, y_pred, limit=None): return (gain / discount).sum() -def ndcg_score(y_true, y_pred, limit=None): +def ndcg_score(y_true: csr_array, y_pred: csr_array, limit: int | None = None) -> float: """return the normalized discounted cumulative gain (nDCG) score for the selected labels vs. relevant labels""" @@ -65,12 +79,18 @@ class EvaluationBatch: for a list of documents of the batch. Final results can be queried using the results() method.""" - def __init__(self, subject_index): + def __init__(self, subject_index: SubjectIndex) -> None: self._subject_index = subject_index self._suggestion_arrays = [] self._gold_subject_arrays = [] - def evaluate_many(self, suggestion_batch, gold_subject_batch): + def evaluate_many( + self, + suggestion_batch: list[list[SubjectSuggestion]] + | SuggestionBatch + | list[Iterator], + gold_subject_batch: Sequence[SubjectSet], + ) -> None: if not isinstance(suggestion_batch, SuggestionBatch): suggestion_batch = SuggestionBatch.from_sequence( suggestion_batch, self._subject_index @@ -86,7 +106,12 @@ def evaluate_many(self, suggestion_batch, gold_subject_batch): ar[idx, subject_id] = True self._gold_subject_arrays.append(ar.tocsr()) - def _evaluate_samples(self, y_true, y_pred, metrics=[]): + def _evaluate_samples( + self, + y_true: csr_array, + y_pred: csr_array, + metrics: Iterable[str] = [], + ) -> dict[str, float]: y_pred_binary = y_pred > 0.0 # define the available metrics as lazy lambda functions @@ -156,7 +181,9 @@ def _evaluate_samples(self, y_true, y_pred, metrics=[]): return {metric: all_metrics[metric]() for metric in metrics} - def _result_per_subject_header(self, results_file): + def _result_per_subject_header( + self, results_file: LazyFile | TextIOWrapper + ) -> None: print( "\t".join( [ @@ -174,11 +201,19 @@ def _result_per_subject_header(self, results_file): file=results_file, ) - def _result_per_subject_body(self, zipped_results, results_file): + def _result_per_subject_body( + self, zipped_results: zip, results_file: LazyFile | TextIOWrapper + ) -> None: for row in zipped_results: print("\t".join((str(e) for e in row)), file=results_file) - def output_result_per_subject(self, y_true, y_pred, results_file, language): + def output_result_per_subject( + self, + y_true: csr_array, + y_pred: csr_array, + results_file: TextIOWrapper | LazyFile, + language: str, + ) -> None: """Write results per subject (non-aggregated) to outputfile results_file, using labels in the given language""" @@ -208,7 +243,12 @@ def output_result_per_subject(self, y_true, y_pred, results_file, language): 
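The sparse-matrix comparison tricks in true_positives, false_positives and false_negatives above can be illustrated with a tiny example. A sketch assuming SciPy's csr_array, with made-up values:

    from scipy.sparse import csr_array

    y_true = csr_array([[1, 0, 1, 0]], dtype=bool)  # gold-standard subjects
    y_pred = csr_array([[1, 1, 0, 0]], dtype=bool)  # suggested subjects

    assert int(y_true.multiply(y_pred).sum()) == 1  # true positives
    assert int((y_true < y_pred).sum()) == 1        # false positives (predicted only)
    assert int((y_true > y_pred).sum()) == 1        # false negatives (gold only)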
self._result_per_subject_header(results_file) self._result_per_subject_body(zipped, results_file) - def results(self, metrics=[], results_file=None, language=None): + def results( + self, + metrics: Iterable[str] = [], + results_file: LazyFile | TextIOWrapper | None = None, + language: str | None = None, + ) -> dict[str, float]: """evaluate a set of selected subjects against a gold standard using different metrics. If metrics is empty, use all available metrics. If results_file (file object) given, write results per subject to it diff --git a/annif/exception.py b/annif/exception.py index efc2d4a3e..b4b9c6552 100644 --- a/annif/exception.py +++ b/annif/exception.py @@ -1,5 +1,5 @@ """Custom exceptions used by Annif""" - +from __future__ import annotations from click import ClickException @@ -9,7 +9,12 @@ class AnnifException(ClickException): that the CLI can automatically handle exceptions. This exception cannot be instantiated directly - subclasses should be used instead.""" - def __init__(self, message, project_id=None, backend_id=None): + def __init__( + self, + message: str, + project_id: str | None = None, + backend_id: str | None = None, + ) -> None: super().__init__(message) self.project_id = project_id self.backend_id = backend_id @@ -20,7 +25,7 @@ def __init__(self, message, project_id=None, backend_id=None): # subclasses should set this to a descriptive prefix prefix = None - def format_message(self): + def format_message(self) -> str: if self.project_id is not None: return "{} project '{}': {}".format( self.prefix, self.project_id, self.message diff --git a/annif/lexical/mllm.py b/annif/lexical/mllm.py index 8c9b59f79..37564a76d 100644 --- a/annif/lexical/mllm.py +++ b/annif/lexical/mllm.py @@ -1,9 +1,11 @@ """MLLM (Maui-like Lexical Matchin) model for Annif""" +from __future__ import annotations import collections import math from enum import IntEnum from statistics import mean +from typing import TYPE_CHECKING, Any import joblib import numpy as np @@ -22,6 +24,16 @@ make_relation_matrix, ) +if TYPE_CHECKING: + from collections import defaultdict + + from rdflib.graph import Graph + from rdflib.term import URIRef + + from annif.analyzer import Analyzer + from annif.corpus.document import DocumentCorpus + from annif.vocab import AnnifVocabulary + Term = collections.namedtuple("Term", "subject_id label is_pref") Match = collections.namedtuple("Match", "subject_id is_pref n_tokens pos ambiguity") @@ -45,7 +57,7 @@ ) -def conflate_matches(matches, doc_length): +def conflate_matches(matches: list[Match], doc_length: int) -> list[Candidate]: subj_matches = collections.defaultdict(list) for match in matches: subj_matches[match.subject_id].append(match) @@ -65,7 +77,12 @@ def conflate_matches(matches, doc_length): ] -def generate_candidates(text, analyzer, vectorizer, index): +def generate_candidates( + text: str, + analyzer: Analyzer, + vectorizer: CountVectorizer, + index: TokenSetIndex, +) -> list[Candidate]: sentences = analyzer.tokenize_sentences(text) sent_tokens = vectorizer.transform(sentences) matches = [] @@ -86,7 +103,9 @@ def generate_candidates(text, analyzer, vectorizer, index): return conflate_matches(matches, len(sentences)) -def candidates_to_features(candidates, mdata): +def candidates_to_features( + candidates: list[Candidate], mdata: "ModelData" +) -> np.ndarray: """Convert a list of Candidates to a NumPy feature matrix""" matrix = np.zeros((len(candidates), len(Feature)), dtype=np.float32) @@ -133,11 +152,11 @@ def candidates_to_features(cls, candidates): class 
MLLMModel: """Maui-like Lexical Matching model""" - def generate_candidates(self, text, analyzer): + def generate_candidates(self, text: str, analyzer: Analyzer) -> list[Candidate]: return generate_candidates(text, analyzer, self._vectorizer, self._index) @property - def _model_data(self): + def _model_data(self) -> ModelData: return ModelData( broader=self._broader_matrix, narrower=self._narrower_matrix, @@ -148,11 +167,11 @@ def _model_data(self): idf=self._idf, ) - def _candidates_to_features(self, candidates): + def _candidates_to_features(self, candidates: list[Candidate]) -> np.ndarray: return candidates_to_features(candidates, self._model_data) @staticmethod - def _get_label_props(params): + def _get_label_props(params: dict[str, Any]) -> tuple[list[URIRef], list[URIRef]]: pref_label_props = [SKOS.prefLabel] if annif.util.boolean(params["use_hidden_labels"]): @@ -162,7 +181,12 @@ def _get_label_props(params): return (pref_label_props, nonpref_label_props) - def _prepare_terms(self, graph, vocab, params): + def _prepare_terms( + self, + graph: Graph, + vocab: AnnifVocabulary, + params: dict[str, Any], + ) -> tuple[list[Term], list[int]]: pref_label_props, nonpref_label_props = self._get_label_props(params) terms = [] @@ -182,13 +206,18 @@ def _prepare_terms(self, graph, vocab, params): return (terms, subject_ids) - def _prepare_relations(self, graph, vocab): + def _prepare_relations(self, graph: Graph, vocab: AnnifVocabulary) -> None: self._broader_matrix = make_relation_matrix(graph, vocab, SKOS.broader) self._narrower_matrix = make_relation_matrix(graph, vocab, SKOS.narrower) self._related_matrix = make_relation_matrix(graph, vocab, SKOS.related) self._collection_matrix = make_collection_matrix(graph, vocab) - def _prepare_train_index(self, vocab, analyzer, params): + def _prepare_train_index( + self, + vocab: AnnifVocabulary, + analyzer: Analyzer, + params: dict[str, Any], + ) -> list[int]: graph = vocab.as_graph() terms, subject_ids = self._prepare_terms(graph, vocab, params) self._prepare_relations(graph, vocab) @@ -211,7 +240,9 @@ def _prepare_train_index(self, vocab, analyzer, params): return subject_ids - def _prepare_train_data(self, corpus, analyzer, n_jobs): + def _prepare_train_data( + self, corpus: DocumentCorpus, analyzer: Analyzer, n_jobs: int + ) -> tuple[list[list[Candidate]], list[bool]]: # frequency of subjects (by id) in the generated candidates self._doc_freq = collections.Counter() # frequency of manually assigned subjects ("domain keyphraseness") @@ -241,14 +272,18 @@ def _prepare_train_data(self, corpus, analyzer, n_jobs): return (train_x, train_y) - def _calculate_idf(self, subject_ids, doc_count): + def _calculate_idf( + self, subject_ids: list[int], doc_count: int + ) -> defaultdict[int, float]: idf = collections.defaultdict(float) for subj_id in subject_ids: idf[subj_id] = math.log((doc_count + 1) / (self._doc_freq[subj_id] + 1)) + 1 return idf - def _prepare_features(self, train_x, n_jobs): + def _prepare_features( + self, train_x: list[list[Candidate]], n_jobs: int + ) -> list[np.ndarray]: fc_args = {"mdata": self._model_data} jobs, pool_class = annif.parallel.get_pool(n_jobs) @@ -261,7 +296,14 @@ def _prepare_features(self, train_x, n_jobs): return features - def prepare_train(self, corpus, vocab, analyzer, params, n_jobs): + def prepare_train( + self, + corpus: DocumentCorpus, + vocab: AnnifVocabulary, + analyzer: Analyzer, + params: dict[str, Any], + n_jobs: int, + ) -> tuple[np.ndarray, np.ndarray]: # create an index from the vocabulary terms 
subject_ids = self._prepare_train_index(vocab, analyzer, params) @@ -276,7 +318,7 @@ def prepare_train(self, corpus, vocab, analyzer, params, n_jobs): return (np.vstack(features), np.array(train_y)) - def _create_classifier(self, params): + def _create_classifier(self, params: dict[str, Any]) -> BaggingClassifier: return BaggingClassifier( DecisionTreeClassifier( min_samples_leaf=int(params["min_samples_leaf"]), @@ -285,7 +327,12 @@ def _create_classifier(self, params): max_samples=float(params["max_samples"]), ) - def train(self, train_x, train_y, params): + def train( + self, + train_x: np.ndarray | list[tuple[int, int]], + train_y: list[bool] | np.ndarray, + params: dict[str, Any], + ) -> None: # fit the model on the training corpus self._classifier = self._create_classifier(params) self._classifier.fit(train_x, train_y) @@ -298,20 +345,22 @@ def train(self, train_x, train_y, params): + "data matches your vocabulary." ) - def _prediction_to_list(self, scores, candidates): + def _prediction_to_list( + self, scores: np.ndarray, candidates: list[Candidate] + ) -> list[tuple[np.float64, int]]: subj_scores = [(score[1], c.subject_id) for score, c in zip(scores, candidates)] return sorted(subj_scores, reverse=True) - def predict(self, candidates): + def predict(self, candidates: list[Candidate]) -> list[tuple[np.float64, int]]: if not candidates: return [] features = self._candidates_to_features(candidates) scores = self._classifier.predict_proba(features) return self._prediction_to_list(scores, candidates) - def save(self, filename): + def save(self, filename: str) -> list[str]: return joblib.dump(self, filename) @staticmethod - def load(filename): + def load(filename: str) -> MLLMModel: return joblib.load(filename) diff --git a/annif/lexical/tokenset.py b/annif/lexical/tokenset.py index ebd23e33f..07c15705d 100644 --- a/annif/lexical/tokenset.py +++ b/annif/lexical/tokenset.py @@ -1,6 +1,11 @@ """Index for fast matching of token sets.""" +from __future__ import annotations import collections +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from numpy import ndarray class TokenSet: @@ -8,19 +13,24 @@ class TokenSet: be matched with another set of tokens. 
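To make the MLLM prediction path above concrete: predict_proba yields one probability row per candidate, and since the training labels are booleans, column 1 is the positive ("relevant") class; _prediction_to_list pairs that probability with each candidate's subject_id and sorts descending. A small sketch with made-up scores and subject IDs:

    import numpy as np

    scores = np.array([[0.9, 0.1],   # candidate with subject_id 42
                       [0.3, 0.7]])  # candidate with subject_id 7
    candidate_ids = [42, 7]

    ranked = sorted(
        ((score[1], subject_id) for score, subject_id in zip(scores, candidate_ids)),
        reverse=True,
    )
    assert [subject_id for _, subject_id in ranked] == [7, 42]  # best match first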
A TokenSet can optionally be associated with a subject from the vocabulary.""" - def __init__(self, tokens, subject_id=None, is_pref=False): + def __init__( + self, + tokens: ndarray, + subject_id: int | None = None, + is_pref: bool = False, + ) -> None: self._tokens = set(tokens) self.key = tokens[0] if len(tokens) else None self.subject_id = subject_id self.is_pref = is_pref - def __len__(self): + def __len__(self) -> int: return len(self._tokens) def __iter__(self): return iter(self._tokens) - def contains(self, other): + def contains(self, other: TokenSet) -> bool: """Returns True iff the tokens in the other TokenSet are all included within this TokenSet.""" @@ -30,18 +40,18 @@ def contains(self, other): class TokenSetIndex: """A searchable index of TokenSets (representing vocabulary terms)""" - def __init__(self): + def __init__(self) -> None: self._index = collections.defaultdict(set) - def __len__(self): + def __len__(self) -> int: return len(self._index) - def add(self, tset): + def add(self, tset: TokenSet) -> None: """Add a TokenSet into this index""" if tset.key is not None: self._index[tset.key].add(tset) - def _find_subj_tsets(self, tset): + def _find_subj_tsets(self, tset: TokenSet) -> dict[int | None, TokenSet]: """return a dict (subject_id : TokenSet) of matches contained in the given TokenSet""" @@ -75,7 +85,7 @@ def _find_subj_ambiguity(self, tsets): return subj_ambiguity - def search(self, tset): + def search(self, tset: TokenSet) -> list[tuple[TokenSet, int]]: """Return the TokenSets that are contained in the given TokenSet. The matches are returned as a list of (TokenSet, ambiguity) pairs where ambiguity is an integer indicating the number of other TokenSets diff --git a/annif/lexical/util.py b/annif/lexical/util.py index a6d9931c7..28d21a141 100644 --- a/annif/lexical/util.py +++ b/annif/lexical/util.py @@ -1,13 +1,22 @@ """Utility methods for lexical algorithms""" +from __future__ import annotations import collections +from typing import TYPE_CHECKING from rdflib import URIRef from rdflib.namespace import SKOS from scipy.sparse import csc_matrix, lil_matrix +if TYPE_CHECKING: + from rdflib.graph import Graph -def get_subject_labels(graph, uri, properties, language): + from annif.vocab import AnnifVocabulary + + +def get_subject_labels( + graph: Graph, uri: str, properties: list[URIRef], language: str +) -> list[str]: return [ str(label) for prop in properties @@ -16,7 +25,9 @@ def get_subject_labels(graph, uri, properties, language): ] -def make_relation_matrix(graph, vocab, property): +def make_relation_matrix( + graph: Graph, vocab: AnnifVocabulary, property: URIRef +) -> csc_matrix: n_subj = len(vocab.subjects) matrix = lil_matrix((n_subj, n_subj), dtype=bool) @@ -29,7 +40,7 @@ def make_relation_matrix(graph, vocab, property): return csc_matrix(matrix) -def make_collection_matrix(graph, vocab): +def make_collection_matrix(graph: Graph, vocab: AnnifVocabulary) -> csc_matrix: # make an index with all collection members c_members = collections.defaultdict(list) for coll, member in graph.subject_objects(SKOS.member): diff --git a/annif/openapi/validation.py b/annif/openapi/validation.py index 3799a6126..7f920b35d 100644 --- a/annif/openapi/validation.py +++ b/annif/openapi/validation.py @@ -1,4 +1,5 @@ """Custom validator for the Annif API.""" +from __future__ import annotations import logging @@ -14,10 +15,14 @@ class CustomRequestBodyValidator(decorators.validation.RequestBodyValidator): """Custom request body validator that overrides the default error message for 
the 'maxItems' validator for the 'documents' property.""" - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) - def validate_schema(self, data, url): + def validate_schema( + self, + data: list | dict, + url: str, + ) -> None: """Validate the request body against the schema.""" if self.is_null_value_valid and is_null(data): diff --git a/annif/parallel.py b/annif/parallel.py index 3162a47c5..c6b293f8e 100644 --- a/annif/parallel.py +++ b/annif/parallel.py @@ -1,8 +1,19 @@ """Parallel processing functionality for Annif""" - +from __future__ import annotations import multiprocessing import multiprocessing.dummy +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections import defaultdict + from collections.abc import Iterator + from typing import Callable + + from annif.corpus import Document, SubjectSet + from annif.registry import AnnifRegistry + from annif.suggestion import SuggestionBatch, SuggestionResult + # Start method for processes created by the multiprocessing module. # A value of None means using the platform-specific default. @@ -22,7 +33,7 @@ class BaseWorker: args = None @classmethod - def init(cls, args): + def init(cls, args) -> None: cls.args = args # pragma: no cover @@ -31,14 +42,21 @@ class ProjectSuggestMap: provide a mapping method that converts Document objects to suggestions. Intended to be used with the multiprocessing module.""" - def __init__(self, registry, project_ids, backend_params, limit, threshold): + def __init__( + self, + registry: AnnifRegistry, + project_ids: list[str], + backend_params: defaultdict[str, Any] | None, + limit: int | None, + threshold: float, + ) -> None: self.registry = registry self.project_ids = project_ids self.backend_params = backend_params self.limit = limit self.threshold = threshold - def suggest(self, doc): + def suggest(self, doc: Document) -> tuple[dict[str, SuggestionResult], SubjectSet]: filtered_hits = {} for project_id in self.project_ids: project = self.registry.get_project(project_id) @@ -46,7 +64,9 @@ def suggest(self, doc): filtered_hits[project_id] = batch.filter(self.limit, self.threshold)[0] return (filtered_hits, doc.subject_set) - def suggest_batch(self, batch): + def suggest_batch( + self, batch + ) -> tuple[dict[str, SuggestionBatch], Iterator[SubjectSet]]: filtered_hit_sets = {} texts, subject_sets = zip(*[(doc.text, doc.subject_set) for doc in batch]) @@ -57,19 +77,19 @@ def suggest_batch(self, batch): return (filtered_hit_sets, subject_sets) -def get_pool(n_jobs): - """return a suitable multiprocessing pool class, and the correct jobs - argument for its constructor, for the given amount of parallel jobs""" +def get_pool(n_jobs: int) -> tuple[int | None, Callable]: + """return a suitable constructor for multiprocessing pool class, and the correct + jobs argument for it, for the given amount of parallel jobs""" ctx = multiprocessing.get_context(MP_START_METHOD) if n_jobs < 1: n_jobs = None - pool_class = ctx.Pool + pool_constructor: Callable = ctx.Pool elif n_jobs == 1: # use the dummy wrapper around threading to avoid subprocess overhead - pool_class = multiprocessing.dummy.Pool + pool_constructor = multiprocessing.dummy.Pool else: - pool_class = ctx.Pool + pool_constructor = ctx.Pool - return n_jobs, pool_class + return n_jobs, pool_constructor diff --git a/annif/project.py b/annif/project.py index b94eaf58e..83f7eda7c 100644 --- a/annif/project.py +++ b/annif/project.py @@ -1,8 +1,10 @@ """Project management functionality 
for Annif""" +from __future__ import annotations import enum import os.path from shutil import rmtree +from typing import TYPE_CHECKING import annif import annif.analyzer @@ -17,6 +19,22 @@ NotSupportedException, ) +if TYPE_CHECKING: + from collections import defaultdict + from configparser import SectionProxy + from datetime import datetime + + from click.utils import LazyFile + + from annif.analyzer import Analyzer + from annif.backend import AnnifBackend + from annif.backend.hyperopt import HPRecommendation + from annif.corpus.document import DocumentCorpus + from annif.corpus.subject import SubjectIndex + from annif.registry import AnnifRegistry + from annif.transform.transform import TransformChain + from annif.vocab import AnnifVocabulary + logger = annif.logger @@ -42,7 +60,13 @@ class AnnifProject(DatadirMixin): # default values for configuration settings DEFAULT_ACCESS = "public" - def __init__(self, project_id, config, datadir, registry): + def __init__( + self, + project_id: str, + config: dict[str, str] | SectionProxy, + datadir: str, + registry: AnnifRegistry, + ) -> None: DatadirMixin.__init__(self, datadir, "projects", project_id) self.project_id = project_id self.name = config.get("name", project_id) @@ -55,7 +79,7 @@ def __init__(self, project_id, config, datadir, registry): self.registry = registry self._init_access() - def _init_access(self): + def _init_access(self) -> None: access = self.config.get("access", self.DEFAULT_ACCESS) try: self.access = getattr(Access, access) @@ -65,7 +89,7 @@ def _init_access(self): project_id=self.project_id, ) - def _initialize_analyzer(self): + def _initialize_analyzer(self) -> None: if not self.analyzer_spec: return # not configured, so assume it's not needed analyzer = self.analyzer @@ -73,7 +97,7 @@ def _initialize_analyzer(self): "Project '%s': initialized analyzer: %s", self.project_id, str(analyzer) ) - def _initialize_subjects(self): + def _initialize_subjects(self) -> None: try: subjects = self.subjects logger.debug( @@ -82,7 +106,7 @@ def _initialize_subjects(self): except AnnifException as err: logger.warning(err.format_message()) - def _initialize_backend(self, parallel): + def _initialize_backend(self, parallel: bool) -> None: logger.debug("Project '%s': initializing backend", self.project_id) try: if not self.backend: @@ -92,7 +116,7 @@ def _initialize_backend(self, parallel): except AnnifException as err: logger.warning(err.format_message()) - def initialize(self, parallel=False): + def initialize(self, parallel: bool = False) -> None: """Initialize this project and its backend so that they are ready to be used. 
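For the AnnifProject constructor above, the config argument is just a mapping of setting names to strings (a configparser section or a plain dict). A hedged sketch with illustrative values; the project id, datadir path and registry stand-in are assumptions, and in practice the configuration comes from projects.cfg or projects.toml via parse_config:

    from annif.project import AnnifProject

    config = {
        "name": "Example project",
        "language": "en",
        "backend": "tfidf",
        "analyzer": "snowball(english)",
        "vocab": "example-vocab",
        "access": "public",
    }
    registry = None  # stand-in; in Annif this is an AnnifRegistry instance
    project = AnnifProject("example-en", config, datadir="data", registry=registry)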
If parallel is True, expect that the project will be used for parallel processing.""" @@ -108,14 +132,18 @@ def initialize(self, parallel=False): self.initialized = True - def _suggest_with_backend(self, texts, backend_params): + def _suggest_with_backend( + self, + texts: list[str], + backend_params: defaultdict[str, dict] | None, + ) -> annif.suggestion.SuggestionBatch: if backend_params is None: backend_params = {} beparams = backend_params.get(self.backend.backend_id, {}) return self.backend.suggest(texts, beparams) @property - def analyzer(self): + def analyzer(self) -> Analyzer: if self._analyzer is None: if self.analyzer_spec: self._analyzer = annif.analyzer.get_analyzer(self.analyzer_spec) @@ -126,7 +154,7 @@ def analyzer(self): return self._analyzer @property - def transform(self): + def transform(self) -> TransformChain: if self._transform is None: self._transform = annif.transform.get_transform( self.transform_spec, project=self @@ -134,7 +162,7 @@ def transform(self): return self._transform @property - def backend(self): + def backend(self) -> AnnifBackend | None: if self._backend is None: if "backend" not in self.config: raise ConfigurationException( @@ -154,7 +182,7 @@ def backend(self): ) return self._backend - def _initialize_vocab(self): + def _initialize_vocab(self) -> None: if self.vocab_spec is None: raise ConfigurationException( "vocab setting is missing", project_id=self.project_id @@ -164,22 +192,22 @@ def _initialize_vocab(self): ) @property - def vocab(self): + def vocab(self) -> AnnifVocabulary: if self._vocab is None: self._initialize_vocab() return self._vocab @property - def vocab_lang(self): + def vocab_lang(self) -> str: if self._vocab_lang is None: self._initialize_vocab() return self._vocab_lang @property - def subjects(self): + def subjects(self) -> SubjectIndex: return self.vocab.subjects - def _get_info(self, key): + def _get_info(self, key: str) -> bool | datetime | None: try: be = self.backend if be is not None: @@ -189,14 +217,18 @@ def _get_info(self, key): return None @property - def is_trained(self): + def is_trained(self) -> bool | None: return self._get_info("is_trained") @property - def modification_time(self): + def modification_time(self) -> datetime | None: return self._get_info("modification_time") - def suggest_corpus(self, corpus, backend_params=None): + def suggest_corpus( + self, + corpus: DocumentCorpus, + backend_params: defaultdict[str, dict] | None = None, + ) -> annif.suggestion.SuggestionResults: """Suggest subjects for the given documents corpus in batches of documents.""" suggestions = ( self.suggest([doc.text for doc in doc_batch], backend_params) @@ -206,7 +238,11 @@ def suggest_corpus(self, corpus, backend_params=None): return annif.suggestion.SuggestionResults(suggestions) - def suggest(self, texts, backend_params=None): + def suggest( + self, + texts: list[str], + backend_params: defaultdict[str, dict] | None = None, + ) -> annif.suggestion.SuggestionBatch: """Suggest subjects for the given documents batch.""" if not self.is_trained: if self.is_trained is None: @@ -216,7 +252,12 @@ def suggest(self, texts, backend_params=None): texts = [self.transform.transform_text(text) for text in texts] return self._suggest_with_backend(texts, backend_params) - def train(self, corpus, backend_params=None, jobs=0): + def train( + self, + corpus: DocumentCorpus, + backend_params: defaultdict[str, dict] | None = None, + jobs: int = 0, + ) -> None: """train the project using documents from a metadata source""" if corpus != "cached": corpus = 
self.transform.transform_corpus(corpus) @@ -225,7 +266,11 @@ def train(self, corpus, backend_params=None, jobs=0): beparams = backend_params.get(self.backend.backend_id, {}) self.backend.train(corpus, beparams, jobs) - def learn(self, corpus, backend_params=None): + def learn( + self, + corpus: DocumentCorpus, + backend_params: defaultdict[str, dict] | None = None, + ) -> None: """further train the project using documents from a metadata source""" if backend_params is None: backend_params = {} @@ -238,7 +283,14 @@ def learn(self, corpus, backend_params=None): "Learning not supported by backend", project_id=self.project_id ) - def hyperopt(self, corpus, trials, jobs, metric, results_file): + def hyperopt( + self, + corpus: DocumentCorpus, + trials: int, + jobs: int, + metric: str, + results_file: LazyFile | None, + ) -> HPRecommendation: """optimize the hyperparameters of the project using a validation corpus against a given metric""" if isinstance(self.backend, annif.backend.hyperopt.AnnifHyperoptBackend): @@ -250,7 +302,7 @@ def hyperopt(self, corpus, trials, jobs, metric, results_file): project_id=self.project_id, ) - def dump(self): + def dump(self) -> dict[str, str | dict | bool | datetime | None]: """return this project as a dict""" return { "project_id": self.project_id, @@ -261,7 +313,7 @@ def dump(self): "modification_time": self.modification_time, } - def remove_model_data(self): + def remove_model_data(self) -> None: """remove the data of this project""" datadir_path = self._datadir_path if os.path.isdir(datadir_path): diff --git a/annif/registry.py b/annif/registry.py index e0368b1e3..81bd541ef 100644 --- a/annif/registry.py +++ b/annif/registry.py @@ -1,9 +1,9 @@ """Registry that keeps track of Annif projects""" +from __future__ import annotations -import collections import re -from flask import current_app +from flask import Flask, current_app import annif from annif.config import parse_config @@ -28,7 +28,9 @@ class AnnifRegistry: _projects = {} _vocabs = {} - def __init__(self, projects_config_path, datadir, init_projects): + def __init__( + self, projects_config_path: str, datadir: str, init_projects: bool + ) -> None: self._rid = id(self) self._projects_config_path = projects_config_path self._datadir = datadir @@ -37,13 +39,13 @@ def __init__(self, projects_config_path, datadir, init_projects): for project in self._projects[self._rid].values(): project.initialize() - def _init_vars(self): + def _init_vars(self) -> None: # initialize the static variables, if necessary if self._rid not in self._projects: self._projects[self._rid] = self._create_projects() self._vocabs[self._rid] = {} - def _create_projects(self): + def _create_projects(self) -> dict: # parse the configuration config = parse_config(self._projects_config_path) @@ -52,14 +54,16 @@ def _create_projects(self): return {} # create AnnifProject objects from the configuration file - projects = collections.OrderedDict() + projects = dict() for project_id in config.project_ids: projects[project_id] = AnnifProject( project_id, config[project_id], self._datadir, self ) return projects - def get_projects(self, min_access=Access.private): + def get_projects( + self, min_access: Access = Access.private + ) -> dict[str, AnnifProject]: """Return the available projects as a dict of project_id -> AnnifProject. 
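The min_access filtering in AnnifRegistry.get_projects above compares each project's access level against the requested minimum. A short sketch, assuming the usual three Access levels in annif.project (private < hidden < public) and an existing registry instance:

    from annif.project import Access

    registry.get_projects(min_access=Access.public)   # only publicly listed projects
    registry.get_projects(min_access=Access.hidden)   # hidden and public projects
    registry.get_projects(min_access=Access.private)  # every project (the default)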
The min_access parameter may be used to set the minimum access level required for the returned projects.""" @@ -71,7 +75,9 @@ def get_projects(self, min_access=Access.private): if project.access >= min_access } - def get_project(self, project_id, min_access=Access.private): + def get_project( + self, project_id: str, min_access: Access = Access.private + ) -> AnnifProject: """return the definition of a single Project by project_id""" projects = self.get_projects(min_access) @@ -80,7 +86,9 @@ def get_project(self, project_id, min_access=Access.private): except KeyError: raise ValueError("No such project {}".format(project_id)) - def get_vocab(self, vocab_spec, default_language): + def get_vocab( + self, vocab_spec: str, default_language: str | None + ) -> tuple[AnnifVocabulary, None] | tuple[AnnifVocabulary, str]: """Return an (AnnifVocabulary, language) pair corresponding to the vocab_spec. If no language information is specified, use the given default language.""" @@ -101,14 +109,14 @@ def get_vocab(self, vocab_spec, default_language): return self._vocabs[self._rid][vocab_key], language -def initialize_projects(app): +def initialize_projects(app: Flask) -> None: projects_config_path = app.config["PROJECTS_CONFIG_PATH"] datadir = app.config["DATADIR"] init_projects = app.config["INITIALIZE_PROJECTS"] app.annif_registry = AnnifRegistry(projects_config_path, datadir, init_projects) -def get_projects(min_access=Access.private): +def get_projects(min_access: Access = Access.private) -> dict[str, AnnifProject]: """Return the available projects as a dict of project_id -> AnnifProject. The min_access parameter may be used to set the minimum access level required for the returned projects.""" @@ -118,7 +126,7 @@ def get_projects(min_access=Access.private): return current_app.annif_registry.get_projects(min_access) -def get_project(project_id, min_access=Access.private): +def get_project(project_id: str, min_access: Access = Access.private) -> AnnifProject: """return the definition of a single Project by project_id""" projects = get_projects(min_access) @@ -128,7 +136,7 @@ def get_project(project_id, min_access=Access.private): raise ValueError(f"No such project '{project_id}'") -def get_vocabs(min_access=Access.private): +def get_vocabs(min_access: Access = Access.private) -> dict[str, AnnifVocabulary]: """Return the available vocabularies as a dict of vocab_id -> AnnifVocabulary. The min_access parameter may be used to set the minimum access level required for the returned vocabularies.""" @@ -143,7 +151,7 @@ def get_vocabs(min_access=Access.private): return vocabs -def get_vocab(vocab_id, min_access=Access.private): +def get_vocab(vocab_id: str, min_access: Access = Access.private) -> AnnifVocabulary: """return a single AnnifVocabulary by vocabulary id""" vocabs = get_vocabs(min_access) diff --git a/annif/rest.py b/annif/rest.py index 0b3b87efe..f848117c8 100644 --- a/annif/rest.py +++ b/annif/rest.py @@ -1,7 +1,9 @@ """Definitions for REST API operations. 
 These are wired via Connexion to methods defined in the OpenAPI specification."""
+from __future__ import annotations
 
 import importlib
+from typing import TYPE_CHECKING, Any
 
 import connexion
 
@@ -10,8 +12,16 @@
 from annif.exception import AnnifException
 from annif.project import Access
 
+if TYPE_CHECKING:
+    from datetime import datetime
 
-def project_not_found_error(project_id):
+    from connexion.lifecycle import ConnexionResponse
+
+    from annif.corpus.subject import SubjectIndex
+    from annif.suggestion import SubjectSuggestion, SuggestionResults
+
+
+def project_not_found_error(project_id: str) -> ConnexionResponse:
     """return a Connexion error object when a project is not found"""
 
     return connexion.problem(
@@ -21,7 +31,9 @@ def project_not_found_error(project_id):
     )
 
 
-def server_error(err):
+def server_error(
+    err: AnnifException,
+) -> ConnexionResponse:
     """return a Connexion error object when there is a server error (project
     or backend problem)"""
 
     return connexion.problem(
@@ -30,13 +42,13 @@ def server_error(err):
     )
 
 
-def show_info():
+def show_info() -> dict[str, str]:
     """return version of annif and a title for the api according to OpenAPI spec"""
 
     return {"title": "Annif REST API", "version": importlib.metadata.version("annif")}
 
 
-def language_not_supported_error(lang):
+def language_not_supported_error(lang: str) -> ConnexionResponse:
     """return a Connexion error object when attempting to use unsupported language"""
 
     return connexion.problem(
@@ -46,7 +58,7 @@ def language_not_supported_error(lang):
     )
 
 
-def list_projects():
+def list_projects() -> dict[str, list[dict[str, str | dict | bool | datetime | None]]]:
     """return a dict with projects formatted according to OpenAPI spec"""
 
     return {
@@ -57,7 +69,9 @@ def list_projects():
     }
 
 
-def show_project(project_id):
+def show_project(
+    project_id: str,
+) -> dict | ConnexionResponse:
     """return a single project formatted according to OpenAPI spec"""
 
     try:
@@ -67,7 +81,9 @@ def show_project(project_id):
     return project.dump()
 
 
-def _suggestion_to_dict(suggestion, subject_index, language):
+def _suggestion_to_dict(
+    suggestion: SubjectSuggestion, subject_index: SubjectIndex, language: str
+) -> dict[str, str | float | None]:
     subject = subject_index[suggestion.subject_id]
     return {
         "uri": subject.uri,
@@ -77,21 +93,25 @@ def _suggestion_to_dict(suggestion, subject_index, language):
     }
 
 
-def _hit_sets_to_list(hit_sets, subjects, lang):
+def _hit_sets_to_list(
+    hit_sets: SuggestionResults, subjects: SubjectIndex, lang: str
+) -> list[dict[str, list]]:
     return [
         {"results": [_suggestion_to_dict(hit, subjects, lang) for hit in hits]}
        for hits in hit_sets
     ]
 
 
-def _is_error(result):
+def _is_error(result: list[dict[str, list]] | ConnexionResponse) -> bool:
     return (
         isinstance(result, connexion.lifecycle.ConnexionResponse)
         and result.status_code >= 400
     )
 
 
-def suggest(project_id, body):
+def suggest(
+    project_id: str, body: dict[str, Any]
+) -> dict[str, list] | ConnexionResponse:
     """suggest subjects for the given text and return a dict with results
     formatted according to OpenAPI spec"""
 
@@ -106,7 +126,11 @@ def suggest(project_id, body):
     return result[0]
 
 
-def suggest_batch(project_id, body, **query_parameters):
+def suggest_batch(
+    project_id: str,
+    body: dict[str, list],
+    **query_parameters,
+) -> list[dict[str, Any]] | ConnexionResponse:
     """suggest subjects for the given documents and return a list of dicts with
     results formatted according to OpenAPI spec"""
 
@@ -120,7 +144,11 @@ def suggest_batch(project_id, body, **query_parameters):
     return result
 
 
-def _suggest(project_id, documents, parameters):
+def _suggest(
+    project_id: str,
+    documents: list[dict[str, str]],
+    parameters: dict[str, Any],
+) -> list[dict[str, list]] | ConnexionResponse:
     corpus = _documents_to_corpus(documents, subject_index=None)
     try:
         project = annif.registry.get_project(project_id, min_access=Access.hidden)
@@ -146,7 +174,10 @@ def _suggest(project_id, documents, parameters):
     return _hit_sets_to_list(hit_sets, project.subjects, lang)
 
 
-def _documents_to_corpus(documents, subject_index):
+def _documents_to_corpus(
+    documents: list[dict[str, Any]],
+    subject_index: SubjectIndex | None,
+) -> annif.corpus.document.DocumentList:
     if subject_index is not None:
         corpus = [
             Document(
@@ -165,7 +196,10 @@ def _documents_to_corpus(documents, subject_index):
     return DocumentList(corpus)
 
 
-def learn(project_id, body):
+def learn(
+    project_id: str,
+    body: list[dict[str, Any]],
+) -> ConnexionResponse | tuple[None, int]:
     """learn from documents and return an empty 204 response if succesful"""
 
     try:
diff --git a/annif/suggestion.py b/annif/suggestion.py
index 9e967d4bf..ddf3ec2e5 100644
--- a/annif/suggestion.py
+++ b/annif/suggestion.py
@@ -1,15 +1,22 @@
 """Representing suggested subjects."""
+from __future__ import annotations
 
 import collections
 import itertools
+from typing import TYPE_CHECKING
 
 import numpy as np
 from scipy.sparse import csr_array
 
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Iterator, Sequence
+
+    from annif.corpus.subject import SubjectIndex
+
 SubjectSuggestion = collections.namedtuple("SubjectSuggestion", "subject_id score")
 
 
-def vector_to_suggestions(vector, limit):
+def vector_to_suggestions(vector: np.ndarray, limit: int) -> Iterator:
     limit = min(len(vector), limit)
     topk_idx = np.argpartition(vector, -limit)[-limit:]
     return (
@@ -17,7 +24,11 @@ def vector_to_suggestions(vector, limit):
     )
 
 
-def filter_suggestion(preds, limit=None, threshold=0.0):
+def filter_suggestion(
+    preds: csr_array,
+    limit: int | None = None,
+    threshold: float = 0.0,
+) -> csr_array:
     """filter a 2D sparse suggestion array (csr_array), retaining only the top
     K suggestions with a score above or equal to the threshold for each
     individual prediction; the rest will be left as zeros"""
@@ -43,7 +54,7 @@
 class SuggestionResult:
     """Suggestions for a single document, backed by a row of a sparse array."""
 
-    def __init__(self, array, idx):
+    def __init__(self, array: csr_array, idx: int) -> None:
         self._array = array
         self._idx = idx
@@ -57,10 +68,10 @@ def __iter__(self):
             sorted(suggestions, key=lambda suggestion: suggestion.score, reverse=True)
         )
 
-    def as_vector(self):
+    def as_vector(self) -> np.ndarray:
         return self._array[[self._idx], :].toarray()[0]
 
-    def __len__(self):
+    def __len__(self) -> int:
         _, cols = self._array[[self._idx], :].nonzero()
         return len(cols)
 
@@ -68,13 +79,18 @@ def __len__(self):
 class SuggestionBatch:
     """Subject suggestions for a batch of documents."""
 
-    def __init__(self, array):
+    def __init__(self, array: csr_array) -> None:
         """Create a new SuggestionBatch from a csr_array"""
         assert isinstance(array, csr_array)
         self.array = array
 
     @classmethod
-    def from_sequence(cls, suggestion_results, subject_index, limit=None):
+    def from_sequence(
+        cls,
+        suggestion_results: Sequence[Iterable[SubjectSuggestion]],
+        subject_index: SubjectIndex,
+        limit: int | None = None,
+    ) -> SuggestionBatch:
         """Create a new SuggestionBatch from a sequence where each item is
         a sequence of SubjectSuggestion objects."""
@@ -96,7 +112,9 @@ def from_sequence(cls, suggestion_results, subject_index, limit=None):
         )
 
     @classmethod
-    def from_averaged(cls, batches, weights):
+    def from_averaged(
+        cls, batches: list[SuggestionBatch], weights: list[float]
+    ) -> SuggestionBatch:
         """Create a new SuggestionBatch where the subject scores are the
         weighted average of scores in several SuggestionBatches"""
@@ -105,31 +123,35 @@ def from_averaged(cls, batches, weights):
         ) / sum(weights)
         return SuggestionBatch(avg_array)
 
-    def filter(self, limit=None, threshold=0.0):
+    def filter(
+        self, limit: int | None = None, threshold: float = 0.0
+    ) -> SuggestionBatch:
         """Return a subset of the hits, filtered by the given limit and score
         threshold, as another SuggestionBatch object."""
 
         return SuggestionBatch(filter_suggestion(self.array, limit, threshold))
 
-    def __getitem__(self, idx):
+    def __getitem__(self, idx: int) -> SuggestionResult:
         if idx < 0 or idx >= len(self):
             raise IndexError
         return SuggestionResult(self.array, idx)
 
-    def __len__(self):
+    def __len__(self) -> int:
         return self.array.shape[0]
 
 
 class SuggestionResults:
     """Subject suggestions for a potentially very large number of documents."""
 
-    def __init__(self, batches):
+    def __init__(self, batches: Iterable[SuggestionBatch]) -> None:
         """Initialize a new SuggestionResults from an iterable that provides
         SuggestionBatch objects."""
 
         self.batches = batches
 
-    def filter(self, limit=None, threshold=0.0):
+    def filter(
+        self, limit: int | None = None, threshold: float = 0.0
+    ) -> SuggestionResults:
         """Return a view of these suggestions, filtered by the given limit
         and/or threshold, as another SuggestionResults object."""
@@ -137,5 +159,5 @@ def filter(self, limit=None, threshold=0.0):
             (batch.filter(limit, threshold) for batch in self.batches)
         )
 
-    def __iter__(self):
+    def __iter__(self) -> itertools.chain:
         return iter(itertools.chain.from_iterable(self.batches))
diff --git a/annif/transform/__init__.py b/annif/transform/__init__.py
index 59317f3f6..716d874a2 100644
--- a/annif/transform/__init__.py
+++ b/annif/transform/__init__.py
@@ -1,6 +1,8 @@
 """Functionality for obtaining text transformation from string specification"""
+from __future__ import annotations
 
 import re
+from typing import TYPE_CHECKING
 
 import annif
 from annif.exception import ConfigurationException
@@ -8,8 +10,14 @@
 
 from . import inputlimiter, transform
 
+if TYPE_CHECKING:
+    from annif.project import AnnifProject
+    from annif.transform.transform import TransformChain
 
-def parse_specs(transform_specs):
+
+def parse_specs(
+    transform_specs: str,
+) -> list[tuple[str, list, dict]]:
     """Parse a transformation specification into a list of tuples, e.g.
     'transf_1(x),transf_2(y=42),transf_3' is parsed to
     [(transf_1, [x], {}), (transf_2, [], {y: 42}), (transf_3, [], {})]."""
@@ -27,7 +35,7 @@ def parse_specs(transform_specs):
     return parsed
 
 
-def get_transform(transform_specs, project):
+def get_transform(transform_specs: str, project: AnnifProject | None) -> TransformChain:
     transform_defs = parse_specs(transform_specs)
     transform_classes = []
     args = []
diff --git a/annif/transform/inputlimiter.py b/annif/transform/inputlimiter.py
index 6883c4c9b..229766864 100644
--- a/annif/transform/inputlimiter.py
+++ b/annif/transform/inputlimiter.py
@@ -1,23 +1,29 @@
 """A simple transformation that truncates the text of input documents to a given
 character length."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
 
 from annif.exception import ConfigurationException
 
 from . import transform
 
+if TYPE_CHECKING:
+    from annif.project import AnnifProject
+
 
 class InputLimiter(transform.BaseTransform):
     name = "limit"
 
-    def __init__(self, project, input_limit):
+    def __init__(self, project: AnnifProject | None, input_limit: str) -> None:
         super().__init__(project)
         self.input_limit = int(input_limit)
         self._validate_value(self.input_limit)
 
-    def transform_fn(self, text):
+    def transform_fn(self, text: str) -> str:
         return text[: self.input_limit]
 
-    def _validate_value(self, input_limit):
+    def _validate_value(self, input_limit: int) -> None:
         if input_limit < 0:
             raise ConfigurationException(
                 "input_limit in limit_input transform cannot be negative",
diff --git a/annif/transform/langfilter.py b/annif/transform/langfilter.py
index 8ee6285a7..6794eb370 100644
--- a/annif/transform/langfilter.py
+++ b/annif/transform/langfilter.py
@@ -1,5 +1,8 @@
 """Transformation filtering out parts of a text that are in a language
 different from the language of the project."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
 
 from simplemma.langdetect import in_target_language
 
@@ -7,6 +10,9 @@
 
 from . import transform
 
+if TYPE_CHECKING:
+    from annif.project import AnnifProject
+
 logger = annif.logger
 
 
@@ -14,14 +20,18 @@ class LangFilter(transform.BaseTransform):
     name = "filter_lang"
 
     def __init__(
-        self, project, text_min_length=500, sentence_min_length=50, min_ratio=0.5
-    ):
+        self,
+        project: AnnifProject,
+        text_min_length: int | str = 500,
+        sentence_min_length: int | str = 50,
+        min_ratio: float = 0.5,
+    ) -> None:
         super().__init__(project)
         self.text_min_length = int(text_min_length)
         self.sentence_min_length = int(sentence_min_length)
         self.min_ratio = float(min_ratio)
 
-    def transform_fn(self, text):
+    def transform_fn(self, text: str) -> str:
         if len(text) < self.text_min_length:
             return text
 
@@ -30,7 +40,7 @@ def transform_fn(self, text):
             if len(sent) < self.sentence_min_length:
                 retained_sentences.append(sent)
                 continue
-            proportion = in_target_language(sent, lang=self.project.language)
+            proportion = in_target_language(sent, lang=(self.project.language,))
             if proportion >= self.min_ratio:
                 retained_sentences.append(sent)
         return " ".join(retained_sentences)
diff --git a/annif/transform/transform.py b/annif/transform/transform.py
index 42123ab56..db71fef37 100644
--- a/annif/transform/transform.py
+++ b/annif/transform/transform.py
@@ -1,10 +1,16 @@
 """Common functionality for transforming text of input documents."""
+from __future__ import annotations
 
 import abc
+from typing import TYPE_CHECKING, Type
 
 from annif.corpus import TransformingDocumentCorpus
 from annif.exception import ConfigurationException
 
+if TYPE_CHECKING:
+    from annif.corpus.types import DocumentCorpus
+    from annif.project import AnnifProject
+
 
 class BaseTransform(metaclass=abc.ABCMeta):
     """Base class for text transformations, which need to implement the
@@ -12,7 +18,7 @@ class BaseTransform(metaclass=abc.ABCMeta):
 
     name = None
 
-    def __init__(self, project):
+    def __init__(self, project: AnnifProject | None) -> None:
         self.project = project
 
     @abc.abstractmethod
@@ -26,7 +32,7 @@ class IdentityTransform(BaseTransform):
 
     name = "pass"
 
-    def transform_fn(self, text):
+    def transform_fn(self, text: str) -> str:
         return text
 
 
@@ -34,11 +40,20 @@ class TransformChain:
     """Class instantiating and holding the transformation objects performing
     the actual text transformation."""
 
-    def __init__(self, transform_classes, args, project):
+    def __init__(
+        self,
+        transform_classes: list[Type[BaseTransform]],
+        args: list[tuple[list, dict]],
+        project: AnnifProject | None,
+    ) -> None:
         self.project = project
         self.transforms = self._init_transforms(transform_classes, args)
 
-    def _init_transforms(self, transform_classes, args):
+    def _init_transforms(
+        self,
+        transform_classes: list[Type[BaseTransform]],
+        args: list[tuple[list, dict]],
+    ) -> list[BaseTransform]:
         transforms = []
         for trans, (posargs, kwargs) in zip(transform_classes, args):
             try:
@@ -51,10 +66,10 @@ def _init_transforms(self, transform_classes, args):
             )
         return transforms
 
-    def transform_text(self, text):
+    def transform_text(self, text: str) -> str:
         for trans in self.transforms:
             text = trans.transform_fn(text)
         return text
 
-    def transform_corpus(self, corpus):
+    def transform_corpus(self, corpus: DocumentCorpus) -> TransformingDocumentCorpus:
         return TransformingDocumentCorpus(corpus, self.transform_text)
diff --git a/annif/util.py b/annif/util.py
index a664027f5..803aa8aea 100644
--- a/annif/util.py
+++ b/annif/util.py
@@ -1,10 +1,12 @@
 """Utility functions for Annif"""
+from __future__ import annotations
 
 import glob
 import logging
 import os
 import os.path
 import tempfile
+from typing import Any, Callable
 
 from annif import logger
 
@@ -12,11 +14,11 @@
 class DuplicateFilter(logging.Filter):
     """Filter out log messages that have already been displayed."""
 
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__()
         self.logged = set()
 
-    def filter(self, record):
+    def filter(self, record: logging.LogRecord) -> bool:
         current_log = hash((record.module, record.levelno, record.msg, record.args))
         if current_log not in self.logged:
             self.logged.add(current_log)
@@ -24,7 +26,9 @@ def filter(self, record):
         return False
 
 
-def atomic_save(obj, dirname, filename, method=None):
+def atomic_save(
+    obj: Any, dirname: str, filename: str, method: Callable | None = None
+) -> None:
     """Save the given object (which must have a .save() method, unless the
     method parameter is given) into the given directory with the given
     filename, using a temporary file and renaming the temporary file to the
@@ -44,14 +48,14 @@ def atomic_save(obj, dirname, filename, method=None):
     os.rename(fn, newname)
 
 
-def cleanup_uri(uri):
+def cleanup_uri(uri: str) -> str:
     """remove angle brackets from a URI, if any"""
     if uri.startswith("<") and uri.endswith(">"):
         return uri[1:-1]
     return uri
 
 
-def parse_sources(sourcedef):
+def parse_sources(sourcedef: str) -> list[tuple[str, float]]:
     """parse a source definition such as 'src1:1.0,src2' into a sequence of
     tuples (src_id, weight)"""
 
@@ -69,7 +73,7 @@ def parse_sources(sourcedef):
     return [(srcid, weight / totalweight) for srcid, weight in sources]
 
 
-def parse_args(param_string):
+def parse_args(param_string: str) -> tuple[list, dict]:
     """Parse a string of comma separated arguments such as '42,43,key=abc'
     into a list of positional args [42, 43] and a dict of keyword args
     {key: abc}"""
 
@@ -87,7 +91,7 @@ def parse_args(param_string):
     return posargs, kwargs
 
 
-def boolean(val):
+def boolean(val: Any) -> bool:
     """Convert the given value to a boolean True/False value, if it isn't already.
     True values are '1', 'yes', 'true', and 'on' (case insensitive), everything
     else is False."""
@@ -95,7 +99,7 @@
     return str(val).lower() in ("1", "yes", "true", "on")
 
 
-def identity(x):
+def identity(x: Any) -> Any:
     """Identity function: return the given argument unchanged"""
     return x
diff --git a/annif/vocab.py b/annif/vocab.py
index 14f6209ba..333fa0d69 100644
--- a/annif/vocab.py
+++ b/annif/vocab.py
@@ -1,6 +1,8 @@
 """Vocabulary management functionality for Annif"""
+from __future__ import annotations
 
 import os.path
+from typing import TYPE_CHECKING
 
 import annif
 import annif.corpus
@@ -8,6 +10,13 @@
 from annif.datadir import DatadirMixin
 from annif.exception import NotInitializedException
 
+if TYPE_CHECKING:
+    from rdflib.graph import Graph
+
+    from annif.corpus.skos import SubjectFileSKOS
+    from annif.corpus.subject import SubjectCorpus, SubjectIndex
+
+
 logger = annif.logger
 
 
@@ -23,18 +32,18 @@ class AnnifVocabulary(DatadirMixin):
     INDEX_FILENAME_TTL = "subjects.ttl"
     INDEX_FILENAME_CSV = "subjects.csv"
 
-    def __init__(self, vocab_id, datadir):
+    def __init__(self, vocab_id: str, datadir: str) -> None:
         DatadirMixin.__init__(self, datadir, "vocabs", vocab_id)
         self.vocab_id = vocab_id
         self._skos_vocab = None
 
-    def _create_subject_index(self, subject_corpus):
+    def _create_subject_index(self, subject_corpus: SubjectCorpus) -> SubjectIndex:
         subjects = annif.corpus.SubjectIndex()
         subjects.load_subjects(subject_corpus)
         annif.util.atomic_save(subjects, self.datadir, self.INDEX_FILENAME_CSV)
         return subjects
 
-    def _update_subject_index(self, subject_corpus):
+    def _update_subject_index(self, subject_corpus: SubjectCorpus) -> SubjectIndex:
         old_subjects = self.subjects
         new_subjects = annif.corpus.SubjectIndex()
         new_subjects.load_subjects(subject_corpus)
@@ -55,7 +64,7 @@ def _update_subject_index(self, subject_corpus):
         return updated_subjects
 
     @property
-    def subjects(self):
+    def subjects(self) -> SubjectIndex:
         if self._subjects is None:
             path = os.path.join(self.datadir, self.INDEX_FILENAME_CSV)
             if os.path.exists(path):
@@ -66,7 +75,7 @@ def subjects(self):
         return self._subjects
 
     @property
-    def skos(self):
+    def skos(self) -> SubjectFileSKOS:
         """return the subject vocabulary from SKOS file"""
         if self._skos_vocab is not None:
             return self._skos_vocab
@@ -94,14 +103,18 @@ def skos(self):
 
         raise NotInitializedException(f"graph file {path} not found")
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self.subjects)
 
     @property
-    def languages(self):
+    def languages(self) -> list[str]:
         return self.subjects.languages
 
-    def load_vocabulary(self, subject_corpus, force=False):
+    def load_vocabulary(
+        self,
+        subject_corpus: SubjectCorpus,
+        force: bool = False,
+    ) -> None:
         """Load subjects from a subject corpus and save them into one
         or more subject index files as well as a SKOS/Turtle file for later
         use. If force=True, replace the existing subject index completely."""
@@ -119,6 +132,6 @@ def load_vocabulary(self, subject_corpus, force=False):
         logger.info(f"saving vocabulary into SKOS file {skosfile}")
         subject_corpus.save_skos(skosfile)
 
-    def as_graph(self):
+    def as_graph(self) -> Graph:
         """return the vocabulary as an rdflib graph"""
         return self.skos.graph
diff --git a/setup.cfg b/setup.cfg
index cffe59417..bf3f116d6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -3,7 +3,7 @@ current_version = 1.0.0-dev
 commit = True
 tag = True
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?
-serialize = 
+serialize =
 	{major}.{minor}.{patch}-{release}
 	{major}.{minor}.{patch}
 
@@ -13,7 +13,7 @@ serialize = 
 
 [bumpversion:part:release]
 optional_value = prod
-values = 
+values =
 	dev
 	prod
 
@@ -23,3 +23,7 @@ test = pytest
 [flake8]
 max-line-length = 88
 ignore = E203 W503
+
+[coverage:report]
+exclude_also =
+    if TYPE_CHECKING:
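The annotations above consistently combine postponed evaluation with imports guarded by typing.TYPE_CHECKING. The following is a minimal sketch, not part of this diff or of Annif itself (the describe helper is hypothetical); it shows how the two pieces interact and why the new [coverage:report] exclude_also entry is needed: the guarded block never executes at runtime, so coverage would otherwise report it as missed lines.

# Sketch of the typing pattern applied throughout the diff above.
from __future__ import annotations  # annotations are kept as strings at runtime

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers; skipped at runtime, which is why
    # the added [coverage:report] exclude_also rule ignores this block.
    from annif.project import AnnifProject


def describe(project: AnnifProject) -> dict[str, str]:
    # The AnnifProject annotation is resolved lazily, so importing this module
    # does not require importing annif.project (or its heavier dependencies).
    return {"project_id": project.project_id, "name": project.name}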
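As a usage sketch (again not part of the diff), the newly annotated filter_suggestion can be exercised directly on a small csr_array; the expected behaviour follows its docstring, assuming Annif and SciPy are installed.

import numpy as np
from scipy.sparse import csr_array

from annif.suggestion import filter_suggestion

# one document (row) with scores for four candidate subjects (columns)
preds = csr_array(np.array([[0.1, 0.6, 0.9, 0.3]]))

# keep at most two suggestions per row, and only those scoring >= 0.5;
# all other entries are left as zeros in the returned csr_array
filtered = filter_suggestion(preds, limit=2, threshold=0.5)
print(filtered.toarray())  # expected to retain only the 0.6 and 0.9 scores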