-
Notifications
You must be signed in to change notification settings - Fork 0
/
merge.py
135 lines (115 loc) · 5.42 KB
/
merge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Create concept groups and merged records."""
from timeit import default_timer as timer
from typing import Collection, Dict, List, Set, Tuple
from tqdm import tqdm
from disease import logger
from disease.database.database import AbstractDatabase
from disease.schemas import SourcePriority
class Merge:
    """Manage construction of record mergers for normalization."""

    def __init__(self, database: AbstractDatabase, silent: bool = True) -> None:
        """Initialize Merge instance.

        :param database: db instance to use for record retrieval and creation.
        :param silent: if ``True``, suppress console output
        """
        self._database = database
        self._silent = silent
        # (mondo_concept_id, set_of_ids) pairs built by ``create_merged_concepts``
        self._groups: List[Tuple[str, Set[str]]] = []

    def create_merged_concepts(self, record_ids: Collection[str]) -> None:
        """Create concept groups, generate merged concept records, and update database.

        Our normalization protocols only generate record ID sets that include Mondo
        terms, meaning only Mondo IDs should be submitted to this method.

        Groups left over from a previous call are discarded, so each call only
        merges the groups built from its own ``record_ids``.

        :param record_ids: concept identifiers from which groups should be generated.
        """
        # Reset so that repeated calls don't re-merge (and re-write) groups
        # built during an earlier call.
        self._groups = []

        # build groups
        logger.info(f"Generating record ID sets from {len(record_ids)} records")
        start = timer()
        for concept_id in tqdm(record_ids, ncols=80, disable=self._silent):
            try:
                record = self._database.get_record_by_id(concept_id)
            except AttributeError:
                # NOTE(review): presumably raised by the DB layer for malformed
                # IDs — confirm against the AbstractDatabase implementations.
                logger.error(
                    f"`create_merged_concepts` received invalid "
                    f"concept ID: {concept_id}"
                )
                continue
            if not record:
                logger.error(f"generate_merged_concepts couldn't find " f"{concept_id}")
                continue
            # A group is the concept itself plus every ID it cross-references.
            xrefs = record.get("xrefs", None)
            group = {*xrefs, concept_id} if xrefs else {concept_id}
            self._groups.append((concept_id, group))
        end = timer()
        self._database.complete_write_transaction()
        logger.debug(f"Built record ID sets in {end - start} seconds")

        # build merged concepts
        logger.info("Creating merged records and updating database...")
        start = timer()
        for record_id, group in tqdm(self._groups, ncols=80, disable=self._silent):
            try:
                merged_record, merged_ids = self._generate_merged_record(group)
            except (AttributeError, ValueError):
                # ValueError: no member of the group could be retrieved.
                logger.error(
                    "`create_merged_concepts` received invalid group: "
                    f"{group} for concept {record_id}"
                )
                continue
            self._database.add_merged_record(merged_record)
            merge_ref = merged_record["concept_id"]
            # Point every constituent record at the merged record's ID.
            for concept_id in merged_ids:
                self._database.update_merge_ref(concept_id, merge_ref)
        self._database.complete_write_transaction()
        end = timer()
        logger.info("merged concept generation successful.")
        logger.debug(f"Generated and added concepts in {end - start} seconds")

    def _generate_merged_record(self, record_id_set: Set[str]) -> Tuple[Dict, List]:
        """Generate merged record from provided concept ID group.

        Where attributes are sets, they should be merged, and where they are
        scalars, assign from the highest-priority source where that attribute
        is non-null.

        Priority is determined by the ``SourcePriority`` enum (lower value wins,
        ties broken by concept ID); it is documented as
        NCIt > Mondo > OMIM > OncoTree > DO.

        :param Set record_id_set: group of concept IDs
        :return: completed merged disease object to be stored in DB, as well as
            a list of the IDs ultimately included in said record
        :raise ValueError: if no record in ``record_id_set`` could be retrieved
        """
        records = []
        final_ids = []
        for record_id in record_id_set:
            record = self._database.get_record_by_id(record_id)
            if record:
                records.append(record)
                final_ids.append(record["concept_id"])
            else:
                logger.error(
                    f"generate_merged_record could not retrieve "
                    f"record for {record_id} in {record_id_set}"
                )
        if not records:
            # Without at least one record there is no concept ID to merge under.
            raise ValueError(f"No records could be retrieved for {record_id_set}")

        def record_order(record: Dict) -> Tuple:
            """Provide priority values of concepts for sort function."""
            src = record["src_name"].upper()
            source_rank = SourcePriority[src].value
            return source_rank, record["concept_id"]

        # After sorting, records[0] is the highest-priority record.
        records.sort(key=record_order)

        merged_properties = {
            "concept_id": records[0]["concept_id"],
            "aliases": set(),
            "associated_with": set(),
        }
        if len(records) > 1:
            merged_properties["xrefs"] = list({r["concept_id"] for r in records[1:]})

        set_fields = ["aliases", "associated_with"]
        scalar_fields = ["label", "pediatric_disease"]
        for record in records:
            for field in set_fields:
                values = record.get(field)
                # Skip missing/None/empty values rather than calling set(None).
                if values:
                    merged_properties[field] |= set(values)
            for field in scalar_fields:
                # Records are in priority order, so the first non-null value wins.
                # ``is not None`` keeps falsy-but-set values such as ``False``.
                if field not in merged_properties and record.get(field) is not None:
                    merged_properties[field] = record[field]

        # Convert accumulated sets to lists; drop fields that ended up empty.
        for field in set_fields:
            field_value = merged_properties[field]
            if field_value:
                merged_properties[field] = list(field_value)
            else:
                del merged_properties[field]
        return merged_properties, final_ids