-
Notifications
You must be signed in to change notification settings - Fork 0
/
base.py
158 lines (131 loc) · 5.46 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Provides abstract base classes for source ETL procedures.
Generally, users shouldn't ever have to work directly with the classes contained within.
"""
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Set, Union
import click
from owlready2.rdflib_store import TripleLiteRDFlibGraph as RDFGraph
from wags_tails import CustomData, DataSource, DoData, MondoData, NcitData, OncoTreeData
from disease import ITEM_TYPES, SOURCES_FOR_MERGE
from disease.database import AbstractDatabase
from disease.schemas import Disease, SourceName
_logger = logging.getLogger(__name__)

# Source name -> wags-tails handler class used by ``Base._get_data_handler``.
# A source without an entry here must override ``_get_data_handler`` itself,
# otherwise the default lookup raises ``KeyError``.
DATA_DISPATCH: Dict[SourceName, type] = {
    SourceName.NCIT: NcitData,
    SourceName.ONCOTREE: OncoTreeData,
    SourceName.MONDO: MondoData,
    SourceName.DO: DoData,
}
class Base(ABC):
    """The ETL base class.

    Subclasses implement :meth:`_transform_data` and :meth:`_load_meta`. This class
    handles fetching source data, committing the database write transaction, and
    recording loaded concept IDs for sources listed in ``SOURCES_FOR_MERGE``.
    """

    def __init__(
        self,
        database: AbstractDatabase,
        data_path: Optional[Path] = None,
        silent: bool = True,
    ) -> None:
        """Extract from sources.

        :param database: database client
        :param data_path: location of data directory
        :param silent: if True, don't print ETL results to console
        :raise ValueError: if the subclass name is not a valid ``SourceName`` value
        """
        self._silent = silent
        # The concrete subclass's name doubles as its source identifier.
        self._src_name = SourceName(self.__class__.__name__)
        self._data_source: Union[
            NcitData, OncoTreeData, MondoData, DoData, CustomData
        ] = self._get_data_handler(data_path)
        self._database = database
        # Only sources that feed merged records need to report the IDs they loaded.
        self._store_ids = self.__class__.__name__ in SOURCES_FOR_MERGE
        if self._store_ids:
            self._added_ids = []

    def _get_data_handler(self, data_path: Optional[Path] = None) -> DataSource:
        """Construct data handler instance for source. Overwrite for edge-case sources.

        :param data_path: location of data storage
        :return: instance of wags_tails.DataSource to manage source file(s)
        """
        return DATA_DISPATCH[self._src_name](data_dir=data_path, silent=self._silent)

    def perform_etl(self, use_existing: bool = False) -> List:
        """Public-facing method to begin ETL procedures on given data.

        :param use_existing: if True, use local data instead of retrieving most recent
            version
        :return: List of concept IDs to be added to merge generation (empty if this
            source is not used for merging).
        """
        self._extract_data(use_existing)
        self._load_meta()
        if not self._silent:
            click.echo("Transforming and loading data to DB...")
        self._transform_data()
        self._database.complete_write_transaction()
        if self._store_ids:
            return self._added_ids
        return []

    def _extract_data(self, use_existing: bool = False) -> None:
        """Get source file from data directory.

        Sets ``self._data_file`` and ``self._version`` from the data handler.

        :param use_existing: if True, use local data regardless of whether it's up to
            date
        """
        self._data_file, self._version = self._data_source.get_latest(
            from_local=use_existing
        )

    @abstractmethod
    def _transform_data(self, *args, **kwargs) -> None:  # noqa: ANN002
        """Transform extracted source data and load it into the database.

        Called by :meth:`perform_etl` after :meth:`_load_meta`.
        """
        raise NotImplementedError

    @abstractmethod
    def _load_meta(self, *args, **kwargs) -> None:  # noqa: ANN002
        """Load metadata for this source.

        Called by :meth:`perform_etl` after extraction and before transformation.
        """
        raise NotImplementedError

    def _load_disease(self, disease: Dict) -> None:
        """Validate, normalize, and load an individual disease record.

        For each field in ``ITEM_TYPES``, the following steps are applied:
        * ``None`` or empty-list values are removed.
        * Non-string values are de-duplicated, keeping first-seen order.
        * Aliases are removed entirely when there are more than 20
          case-insensitively distinct values.

        A ``None`` ``pediatric_disease`` value is also removed before writing.

        :param disease: record to load; mutated in place
        :raise: propagates any error raised by ``Disease(**disease)`` validation
        """
        _ = Disease(**disease)
        concept_id = disease["concept_id"]
        for attr_type in ITEM_TYPES:
            if attr_type not in disease:
                continue
            value = disease[attr_type]
            if value is None or value == []:
                del disease[attr_type]
                continue
            if isinstance(value, str):
                items = [value.lower()]
            else:
                # dict.fromkeys keeps input order; a set's iteration order depends
                # on the per-process string hash seed, making stored output vary.
                disease[attr_type] = list(dict.fromkeys(value))
                items = {item.lower() for item in value}
            if attr_type == "aliases" and len(items) > 20:
                _logger.debug("%s has > 20 aliases.", concept_id)
                del disease[attr_type]
        if "pediatric_disease" in disease and disease["pediatric_disease"] is None:
            del disease["pediatric_disease"]
        self._database.add_record(disease, self._src_name)
        if self._store_ids:
            self._added_ids.append(concept_id)
class OWLBase(Base):
    """Base class for sources that use OWL files."""

    # Characters that may not appear raw inside a double-quoted SPARQL string
    # literal (STRING_LITERAL2), mapped to their ECHAR escape sequences.
    _SPARQL_STRING_ESCAPES = str.maketrans(
        {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\r": "\\r"}
    )

    def _get_subclasses(self, uri: str, graph: RDFGraph) -> Set[str]:
        """Retrieve URIs for all terms that are subclasses of given URI.

        The ``*`` path operator makes this transitive and includes ``uri`` itself.

        :param uri: URI for class
        :param graph: RDFLib graph of ontology default world
        :return: Set of URIs (strings) for all subclasses of `uri`
        """
        query = f"""
        SELECT ?c WHERE {{
            ?c rdfs:subClassOf* <{uri}>
        }}
        """
        return {item.c.toPython() for item in graph.query(query)}

    def _get_by_property_value(
        self, prop: str, value: str, graph: RDFGraph
    ) -> Set[str]:
        """Get all classes with given value for a specific property.

        :param prop: property URI
        :param value: property value, matched as a plain string literal; quotes,
            backslashes, and line breaks are escaped so they match literally
        :param graph: RDFLib graph of ontology default world
        :return: Set of URIs (as strings) matching given property/value
        """
        # Without escaping, a `"` or newline breaks the query and a `\` is read
        # as an escape sequence, matching the wrong literal.
        escaped_value = value.translate(self._SPARQL_STRING_ESCAPES)
        query = f"""
        SELECT ?c WHERE {{
            ?c <{prop}>
            "{escaped_value}"
        }}
        """
        return {item.c.toPython() for item in graph.query(query)}