-
Notifications
You must be signed in to change notification settings - Fork 5
/
civic.py
1503 lines (1265 loc) · 52.7 KB
/
civic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import requests
import importlib
import logging
import pandas as pd
import pickle
import os
from pathlib import Path
from collections import defaultdict, namedtuple
from civicpy import REMOTE_CACHE_URL, LOCAL_CACHE_PATH, CACHE_TIMEOUT_DAYS
import requests
from civicpy.exports import VCFWriter
from datetime import datetime, timedelta
from backports.datetime_fromisoformat import MonkeyPatch
MonkeyPatch.patch_fromisoformat()
# In-memory record cache. String keys hold metadata (e.g. 'full_cached',
# '<plural>_all_ids'); integer keys are hash(record) mapped to the record.
CACHE = dict()
# Coordinate lookup state; all unset at import time.
# NOTE(review): presumably populated by _build_coordinate_table — confirm.
COORDINATE_TABLE = None
COORDINATE_TABLE_START = None
COORDINATE_TABLE_STOP = None
COORDINATE_TABLE_CHR = None
HPO_TERMS = dict()
# Maximum age of the 'full_cached' timestamp before the cache counts as stale.
FRESH_DELTA = timedelta(days=CACHE_TIMEOUT_DAYS)
# Module handle used to resolve record classes by name (get_class) and to
# rebind CACHE (load_cache).
# NOTE(review): presumably this very module — confirm the package layout.
MODULE = importlib.import_module('civicpy.civic')
API_URL = 'https://civicdb.org/api'
LINKS_URL = 'https://civicdb.org/links'
# Element names that pluralize() turns into '<name>_items' instead of adding 's'.
UNMARKED_PLURALS = {'evidence'}
# Maps a 'type' value found in record data to the type name used by the
# Python classes (applied in CivicRecord.__init__).
CIVIC_TO_PYCLASS = {
    'evidence_items': 'evidence'
}
_CoordinateQuery = namedtuple('CoordinateQuery', ['chr', 'start', 'stop', 'alt', 'ref', 'build', 'key'])
# Defaults bind to the rightmost fields: alt=None, ref=None, build="GRCh37", key=None.
_CoordinateQuery.__new__.__defaults__ = (None, None, "GRCh37", None)
class CoordinateQuery(_CoordinateQuery):  # Thin subclass so the namedtuple can carry documentation
    """
    Namedtuple describing a genomic coordinate, used for coordinate-based
    queries of CIViC Variants.

    ``chr``, ``start`` and ``stop`` are required; the remaining fields fall
    back to the defaults installed on the underlying namedtuple
    (``alt=None``, ``ref=None``, ``build="GRCh37"``, ``key=None``).

    :param str chr: A chromosome of value 1-23, X, Y
    :param int start: The chromosomal start position in base coordinates (1-based)
    :param int stop: The chromosomal stop position in base coordinates (1-based)
    :param str optional alt: The alternate nucleotide(s) at the designated coordinates
    :param str optional ref: The reference nucleotide(s) at the designated coordinates
    :param GRCh37,GRCh38 build: The reference build version of the coordinates
    :param Any optional key: A user-defined object linked to the coordinate
    """
def pluralize(string):
    """
    Return the plural form of an element name.

    Names listed in ``UNMARKED_PLURALS`` receive an ``_items`` suffix, names
    already ending in ``s`` are returned unchanged, and every other name gets
    an ``s`` appended.

    :param str string: The element name to pluralize.
    :return: The pluralized name.
    """
    if string in UNMARKED_PLURALS:
        return '{}_items'.format(string)
    return string if string.endswith('s') else string + 's'
def singularize(string):
    """
    Return the singular form of an element name.

    Every trailing ``s`` is stripped (not just one), and the special case
    ``evidence_item`` is collapsed to ``evidence``.

    :param str string: The element name to singularize.
    :return: The singularized name.
    """
    stripped = string.rstrip('s')
    return 'evidence' if stripped == 'evidence_item' else stripped
def search_url(element, use_search_meta):
    """
    Build the API URL for an element collection.

    :param str element: Element name; it is pluralized and lower-cased.
    :param bool use_search_meta: When truthy, append the ``search`` path segment.
    :return: The URL string rooted at ``API_URL``.
    """
    plural = pluralize(element).lower()
    if use_search_meta:
        return '/'.join((API_URL, plural, 'search'))
    return '/'.join((API_URL, plural))
def snake_to_camel(snake_string):
    """
    Convert a ``snake_case`` string to ``CamelCase``.

    Each underscore-separated word is passed through ``str.capitalize`` (first
    character upper-cased, the rest lower-cased) and the words are joined
    without a separator.

    :param str snake_string: The underscore-delimited input.
    :return: The concatenated, capitalized words.
    """
    return ''.join(word.capitalize() for word in snake_string.split('_'))
def element_lookup_by_id(element_type, element_id):
    """
    Fetch a single element from the API by its ID.

    :param str element_type: Element name; lower-cased and pluralized to form the path.
    :param element_id: The element ID, converted with ``str`` for the URL.
    :return: The decoded JSON body of the response.
    :raises requests.HTTPError: If the response status indicates an error.
    """
    path_parts = (API_URL, pluralize(element_type.lower()), str(element_id))
    response = requests.get('/'.join(path_parts))
    response.raise_for_status()
    return response.json()
def get_class(element_type):
    """
    Resolve the record class for an element name.

    The name is singularized, converted to CamelCase and looked up as an
    attribute of ``MODULE``; :class:`CivicAttribute` is returned when no
    attribute of that name exists.

    :param str element_type: The element or field name.
    :return: The matching class, or ``CivicAttribute`` as a fallback.
    """
    class_name = snake_to_camel(singularize(element_type))
    return getattr(MODULE, class_name, CivicAttribute)
def download_remote_cache(remote_cache_url=REMOTE_CACHE_URL, local_cache_path=LOCAL_CACHE_PATH):
    """
    Download a remote cache file and write it to a local path.

    The parent directory of the destination is created when missing, and a
    warning is logged before the download starts.

    :param remote_cache_url:    URL string of the remote cache to retrieve.
                                This parameter defaults to REMOTE_CACHE_URL.
    :param local_cache_path:    Filepath string where the downloaded cache is written.
                                This parameter defaults to LOCAL_CACHE_PATH.
    :return:                    Returns True on success.
    :raises requests.HTTPError: If the response status indicates an error.
    """
    logging.warning('Downloading remote cache from {}.'.format(remote_cache_url))
    _make_local_cache_path_if_missing(local_cache_path)
    response = requests.get(remote_cache_url)
    response.raise_for_status()
    with open(local_cache_path, 'wb') as destination:
        destination.write(response.content)
    return True
def save_cache(local_cache_path=LOCAL_CACHE_PATH):
    """
    Pickle the in-memory ``CACHE`` dictionary to a local file.

    :param local_cache_path:    Filepath string where the cache is written.
                                This parameter defaults to LOCAL_CACHE_PATH.
    :return:                    Returns True on success.
    """
    with open(local_cache_path, 'wb') as cache_file:
        pickle.dump(CACHE, cache_file)
    return True
def cache_file_present(local_cache_path=LOCAL_CACHE_PATH):
    """
    Report whether a file exists at the given path.

    :param local_cache_path:    Filepath where the cache is expected.
                                This parameter defaults to LOCAL_CACHE_PATH.
    :return:                    True if ``os.path.isfile`` finds a file at the path, else False.
    """
    is_present = os.path.isfile(local_cache_path)
    return is_present
def delete_local_cache(local_cache_path=LOCAL_CACHE_PATH):
    """
    Delete the local cache file.

    :param local_cache_path:    Filepath string of the cache file to delete.
                                This parameter defaults to LOCAL_CACHE_PATH.
    :return:                    Returns True on success.
    :raises OSError:            Propagated from ``os.unlink`` when the file cannot be
                                removed (e.g. ``FileNotFoundError`` if it does not exist).
    """
    # os.unlink returns None, so its result must not be returned directly:
    # the documented contract is True on success.
    os.unlink(local_cache_path)
    return True
def load_cache(local_cache_path=LOCAL_CACHE_PATH, on_stale='auto'):
    """
    Load a pickled cache file into the in-memory cache.

    When ``local_cache_path`` is the default LOCAL_CACHE_PATH and no file
    exists there, the remote cache is downloaded first. The loaded entries
    replace ``MODULE.CACHE``; if the loaded cache turns out to be stale, the
    previous in-memory cache is restored unless ``on_stale`` is 'ignore'.

    :param local_cache_path:    A filepath destination string for loading the cache.
                                This parameter defaults to LOCAL_CACHE_PATH.
    :param on_stale:            ['auto', 'reject', 'ignore', 'update']
                                auto:   If cache_path matches the filepath in
                                        LOCAL_CACHE_PATH, follows 'update' behavior.
                                        Otherwise, follows 'reject' behavior.
                                reject: Clear loaded cache from memory if stale.
                                ignore: Keep loaded cache in memory if stale.
                                update: Run update_cache and save fresh cache to
                                        cache_path.
                                This parameter defaults to 'auto'.
    :return:                    Returns True if content is loaded to in-memory cache,
                                False if a stale cache was rejected.
    :raises FileNotFoundError:  If a non-default path is given and no file exists there.
    :raises ValueError:         If the loaded cache has a key that is neither str nor int.
    :raises SystemError:        If a freshly downloaded remote cache is itself stale.
    :raises NotImplementedError: If ``on_stale`` is not one of the supported values
                                and the loaded cache is stale.
    """
    downloaded_remote = False
    remote_url = REMOTE_CACHE_URL
    if local_cache_path == LOCAL_CACHE_PATH:
        if not cache_file_present():
            download_remote_cache(remote_cache_url=remote_url)
            downloaded_remote = True
    elif not cache_file_present(local_cache_path):
        raise FileNotFoundError("No cache found at {}".format(local_cache_path))
    # NOTE: unpickling can execute arbitrary code; only load cache files from
    # a trusted source.
    with open(local_cache_path, 'rb') as pf:
        loaded_cache = pickle.load(pf)
    c = dict()
    variants = set()
    for k, v in loaded_cache.items():
        if isinstance(k, str):
            # Metadata entries are copied verbatim.
            c[k] = v
        elif isinstance(k, int):
            # Record entries are re-keyed by recomputing hash(v) instead of
            # trusting the stored key: record hashes derive from a string hash,
            # which is not stable across interpreter runs by default.
            c[hash(v)] = v
            if v.type == 'variant':
                variants.add(v)
        else:
            raise ValueError
    # Keep the previous cache so it can be restored if the loaded one is stale.
    old_cache = MODULE.CACHE
    MODULE.CACHE = c
    for k, v in MODULE.CACHE.items():
        if isinstance(k, str):
            continue
        v.update()
    if _has_full_cached_fresh() or on_stale == 'ignore':
        _build_coordinate_table(variants)
        return True
    elif (on_stale == 'auto' and local_cache_path == LOCAL_CACHE_PATH) or on_stale == 'update':
        MODULE.CACHE = old_cache
        if downloaded_remote:
            # The remote cache was just fetched and is already stale, so
            # downloading it again would not help.
            logging.error(
                'Remote cache at {} is stale. Consider running `update_cache(from_remote_cache=False)` '
                "to create cache from API query (slow), or `load_cache(on_stale='ignore')` "
                "to load stale local cache (if present). "
                'Please create an issue at https://github.com/griffithlab/civicpy/issues '
                'if this is unexpected behavior.'.format(remote_url)
            )
            raise SystemError
        else:
            logging.warning(
                'Local cache at {} is stale, updating from remote.'.format(local_cache_path)
            )
            update_cache(local_cache_path=local_cache_path)
            return True
    elif on_stale == 'reject' or on_stale == 'auto':
        MODULE.CACHE = old_cache
        logging.warning(
            'Local cache at {} is stale and was not loaded. To load anyway, re-run '
            '`load_cache` with `on_stale` parameter set to desired behavior.'.format(local_cache_path)
        )
        return False
    raise NotImplementedError  # An unexpected condition occurred.
def update_cache(from_remote_cache=True, remote_cache_url=REMOTE_CACHE_URL,
                 local_cache_path=LOCAL_CACHE_PATH):
    """
    Update the local cache file.

    :param from_remote_cache:   If set to True, update_cache will first download the
                                remote cache designated by ``remote_cache_url``, store it
                                to ``local_cache_path``, and then load the downloaded cache
                                into memory. If False, the cache is rebuilt by querying
                                the API for all evidence, variants, genes, assertions and
                                variant groups, then saved to ``local_cache_path``.
                                This parameter defaults to True.
    :param remote_cache_url:    A URL string to a remote cache for retrieval.
                                This parameter defaults to REMOTE_CACHE_URL.
    :param local_cache_path:    A filepath destination string for the retrieved remote cache.
                                This parameter defaults to LOCAL_CACHE_PATH.
    :return:                    Returns True on success. When ``from_remote_cache`` is True
                                the result of ``load_cache`` is returned, which is False if
                                the downloaded cache was stale and not loaded.
    """
    _make_local_cache_path_if_missing(local_cache_path)
    if from_remote_cache:
        download_remote_cache(local_cache_path=local_cache_path, remote_cache_url=remote_cache_url)
        # Previously the result was discarded and the function returned None,
        # contradicting the documented return value.
        return load_cache(local_cache_path=local_cache_path)

    _get_elements_by_ids('evidence', allow_cached=False, get_all=True)
    variants = _get_elements_by_ids('variant', allow_cached=False, get_all=True)
    genes = _get_elements_by_ids('gene', allow_cached=False, get_all=True)
    # Refresh the variants referenced by each gene so they pick up the
    # complete records fetched above.
    for g in genes:
        for v in g._variants:
            v.update()
    _get_elements_by_ids('assertion', allow_cached=False, get_all=True)
    _get_elements_by_ids('variant_group', allow_cached=False, get_all=True)
    # Timestamp read by _has_full_cached_fresh to decide staleness.
    CACHE['full_cached'] = datetime.now()
    _build_coordinate_table(variants)
    save_cache(local_cache_path=local_cache_path)
    return True
def _make_local_cache_path_if_missing(local_cache_path):
    """
    Ensure the directory that will contain ``local_cache_path`` exists.

    Missing parent directories are created as needed. The cache file itself
    is not created.

    :param local_cache_path:    Filepath string (or path-like) of the cache file.
    :raises FileExistsError:    If the parent path exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race of testing is_dir()
    # first: another process creating the directory in between is harmless.
    Path(local_cache_path).parent.mkdir(parents=True, exist_ok=True)
class CivicRecord:
    """
    As a base class, :class:`CivicRecord` is used to define the characteristic of all records in CIViC. This class is not
    intended to be invoked directly by the end user, but provided for documentation of shared methods and variables in
    child classes.
    """

    # Fields assigned directly from keyword arguments.
    _SIMPLE_FIELDS = {'id', 'type'}
    # Fields whose values (dicts or lists of dicts) are wrapped in record/attribute classes.
    _COMPLEX_FIELDS = set()
    # Fields that may be absent even when the record is not partial.
    _OPTIONAL_FIELDS = set()

    def __init__(self, partial=False, **kwargs):
        """
        The record object may be initialized by the user, though the practice is discouraged. To do so, values for each
        of the object attributes (except ``type``) must be specified as keyword arguments, or the ``partial`` parameter must
        be set to **True**. If ``partial`` is set to **True**, the ``id`` keyword argument is still required.

        Users are encouraged to use the functions for `getting records`_ in lieu of directly initializing record
        objects.

        :param bool partial: Indicates whether the set of object attributes passed is incomplete. If set to **True** the ``id`` keyword is required.
        :raises AttributeError: If a required field is missing from the keyword arguments.
        """
        self._incomplete = set()
        self._partial = partial
        simple_fields = sorted(self._SIMPLE_FIELDS, reverse=True)
        # Stable second sort moves the base fields ('type', 'id') to the front,
        # so self.type is set before it is needed for error messages below.
        simple_fields = sorted(simple_fields, key=lambda x: x in CivicRecord._SIMPLE_FIELDS, reverse=True)
        for field in simple_fields:
            try:
                self.__setattr__(field, kwargs[field])
            except KeyError:
                try:
                    # The attribute may already exist (e.g. set by a subclass
                    # before calling this initializer); bypass __getattr__.
                    object.__getattribute__(self, field)
                except AttributeError:
                    if (partial and field not in CivicRecord._SIMPLE_FIELDS) or field in self._OPTIONAL_FIELDS:
                        self._incomplete.add(field)     # Allow for incomplete data when partial flag set
                    else:
                        raise AttributeError('Expected {} attribute for {}, none found.'.format(field, self.type))

        for field in self._COMPLEX_FIELDS:
            try:
                v = kwargs[field]
                if v is None:
                    v = dict()
            except KeyError:
                if partial or field in self._OPTIONAL_FIELDS:
                    self._incomplete.add(field)
                    continue
                else:
                    raise AttributeError('Expected {} attribute for {}, none found.'.format(field, self.type))
            is_compound = isinstance(v, list)
            cls = get_class(field)
            if is_compound:
                result = list()
                for data in v:
                    if isinstance(data, dict):
                        # Nested records are created partial; note that the
                        # input dict is mutated to carry a 'type' entry.
                        data['type'] = data.get('type', singularize(field))
                        result.append(cls(partial=True, **data))
                    else:
                        result.append(data)
                self.__setattr__(field, result)
            else:
                t = v.get('type', field)
                v['type'] = CIVIC_TO_PYCLASS.get(t, t)
                # A dict holding nothing but the injected 'type' is treated as empty.
                if v.keys() == {'type'}:
                    self.__setattr__(field, {})
                else:
                    self.__setattr__(field, cls(partial=True, **v))

        self._partial = bool(self._incomplete)
        # Only complete, concrete records (not attributes, not the base class) are cached.
        if not isinstance(self, CivicAttribute) and not self._partial and self.__class__.__name__ != 'CivicRecord':
            CACHE[hash(self)] = self

        self._include_status = ['accepted','submitted','rejected']

    def __dir__(self):
        """Return attribute names, hiding those that start with an underscore."""
        return [attribute for attribute in super().__dir__() if not attribute.startswith('_')]

    def __repr__(self):
        return '<CIViC {} {}>'.format(self.type, self.id)

    def __getattr__(self, item):
        """
        Fallback attribute lookup, invoked only when normal lookup fails.

        If the record is partial and ``item`` is one of its missing fields, the
        record is updated first; the attribute is then fetched again.
        """
        if self._partial and item in self._incomplete:
            self.update()
        return object.__getattribute__(self, item)

    def __hash__(self):
        """Hash of the string ``'<type>:<id>'``; this is the key used in ``CACHE``."""
        return hash('{}:{}'.format(self.type, self.id))

    def __eq__(self, other):
        """Records compare equal when their hashes are equal."""
        return hash(self) == hash(other)

    def __setstate__(self, state):
        """Restore instance state (used when unpickling) by replacing ``__dict__``."""
        self.__dict__ = state

    def update(self, allow_partial=True, force=False, **kwargs):
        """
        Updates the record object from the cache or the server.
        Keyword arguments may be passed to ``kwargs``, which will update the corresponding attributes of the
        :class:`CivicRecord` instance.

        :param bool allow_partial: Flag to indicate whether the record will be updated according to the contents of CACHE, without requiring all attributes to be assigned.
        :param bool force: Flag to indicate whether to force an update from the server, even if a full record exists in the cache.
        :return: True if record is complete after update, else False.
        """
        if kwargs:
            # Re-initialize from the supplied values; 'force' is forwarded as
            # an ordinary keyword argument.
            self.__init__(partial=allow_partial, force=force, **kwargs)
            return not self._partial

        if not force and CACHE.get(hash(self)):
            # Copy every declared field from the cached record onto this one.
            cached = CACHE[hash(self)]
            for field in self._SIMPLE_FIELDS | self._COMPLEX_FIELDS:
                v = getattr(cached, field)
                setattr(self, field, v)
            self._partial = False
            logging.info('Loading {} from cache'.format(str(self)))
            return True
        # Not cached (or forced): fetch the full record from the API.
        resp_dict = element_lookup_by_id(self.type, self.id)
        self.__init__(partial=False, **resp_dict)
        return True

    @property
    def site_link(self):
        """Returns a URL to the record on the CIViC web application."""
        return '/'.join([LINKS_URL, self.type, str(self.id)])
class Variant(CivicRecord):
    """
    A CIViC variant record.

    Besides the fields declared below, the class exposes filtered views of its
    evidence items and assertions (restricted to the statuses listed in
    ``_include_status``) and helpers for producing VCF coordinates and CSQ
    annotation strings.
    """

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS.union({
        'allele_registry_id',
        'civic_actionability_score',
        'description',
        'entrez_id',
        'entrez_name',
        'gene_id',
        'name'})
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS.union({
        'assertions',
        'clinvar_entries',
        'coordinates',
        # 'errors',
        'evidence_items',
        'hgvs_expressions',
        'lifecycle_actions',
        # 'provisional_values',
        'sources',
        'variant_aliases',
        'variant_groups',
        'variant_types'})

    def __init__(self, **kwargs):
        """
        Initialize the variant from keyword arguments.

        ``type`` is forced to ``'variant'``. A non-list ``evidence_items`` value
        is discarded, and empty or ``'-'`` reference/variant bases in
        ``coordinates`` are normalized to ``None`` (the passed dict is mutated).
        """
        # Handle overloaded evidence_items from some advanced search views
        evidence_items = kwargs.get('evidence_items')
        kwargs['type'] = 'variant'
        # Backing lists must exist before the base initializer runs, because
        # the corresponding properties read them.
        self._evidence_items = []
        self._assertions = []
        if evidence_items and not isinstance(evidence_items, list):
            del(kwargs['evidence_items'])
        coordinates = kwargs.get('coordinates')
        if coordinates:
            if coordinates.get('reference_bases') in ['', '-']:
                coordinates['reference_bases'] = None
            if coordinates.get('variant_bases') in ['', '-']:
                coordinates['variant_bases'] = None
        super().__init__(**kwargs)

    @property
    def evidence_sources(self):
        """Set of the non-None ``source`` values of the visible evidence items."""
        sources = set()
        for evidence in self.evidence_items:
            if evidence.source is not None:
                sources.add(evidence.source)
        return sources

    @property
    def aliases(self):
        """Alias for ``variant_aliases``."""
        return self.variant_aliases

    @property
    def groups(self):
        """Alias for ``variant_groups``."""
        return self.variant_groups

    @property
    def types(self):
        """Alias for ``variant_types``."""
        return self.variant_types

    @property
    def summary(self):
        """Alias for ``description``."""
        return self.description

    @summary.setter
    def summary(self, value):
        self.description = value

    @property
    def evidence(self):
        """Alias for ``evidence_items``."""
        return self.evidence_items

    @property
    def evidence_items(self):
        """Evidence items whose status is listed in ``_include_status``."""
        return [e for e in self._evidence_items if e.status in self._include_status]

    @evidence_items.setter
    def evidence_items(self, value):
        self._evidence_items = value

    @property
    def assertions(self):
        """Assertions whose status is listed in ``_include_status``."""
        return [a for a in self._assertions if a.status in self._include_status]

    @assertions.setter
    def assertions(self, value):
        self._assertions = value

    @property
    def gene(self):
        """The gene record looked up from ``gene_id``."""
        return _get_element_by_id('gene', self.gene_id)

    @property
    def is_insertion(self):
        """True when there is an alt allele and the ref is absent or shorter than it."""
        ref = self.coordinates.reference_bases
        alt = self.coordinates.variant_bases
        return (ref is None and alt is not None) or (ref is not None and alt is not None and len(ref) < len(alt))

    @property
    def is_deletion(self):
        """True when there is a ref allele and the alt is absent or shorter than it."""
        ref = self.coordinates.reference_bases
        alt = self.coordinates.variant_bases
        if alt is not None and (alt == '-' or alt == ''):
            alt = None
        return (ref is not None and alt is None) or (ref is not None and alt is not None and len(ref) > len(alt))

    def is_valid_for_vcf(self, emit_warnings=False):
        """
        Check whether the variant's coordinates can be written to a VCF.

        A variant is valid when it has no second coordinate set, has a
        chromosome, a start and at least one of reference/variant bases, and
        all bases are drawn from A, C, G, T, N.

        :param bool emit_warnings: Log the reason at warning level when invalid.
        :return: True if valid, else False.
        """
        if self.coordinates.chromosome2 or self.coordinates.start2 or self.coordinates.stop2:
            warning = "Variant {} has a second set of coordinates. Skipping".format(self.id)
        elif self.coordinates.chromosome and self.coordinates.start and (self.coordinates.reference_bases or self.coordinates.variant_bases):
            if self._valid_ref_bases():
                if self._valid_alt_bases():
                    return True
                else:
                    warning = "Unsupported variant base(s) for variant {}. Skipping.".format(self.id)
            else:
                warning = "Unsupported reference base(s) for variant {}. Skipping.".format(self.id)
        else:
            warning = "Incomplete coordinates for variant {}. Skipping.".format(self.id)
        if emit_warnings:
            logging.warning(warning)
        return False

    def _valid_ref_bases(self):
        """True if reference bases are absent or consist only of A, C, G, T, N (any case)."""
        if self.coordinates.reference_bases is not None:
            return all([c.upper() in ['A', 'C', 'G', 'T', 'N'] for c in self.coordinates.reference_bases])
        else:
            return True

    def _valid_alt_bases(self):
        """True if variant bases are absent or consist only of A, C, G, T, N (any case)."""
        if self.coordinates.variant_bases is not None:
            return all([c.upper() in ['A', 'C', 'G', 'T', 'N'] for c in self.coordinates.variant_bases])
        else:
            return True

    def vcf_coordinates(self):
        """
        Compute VCF-style ``(start, ref, alt)`` for the variant.

        For insertions and deletions one base is fetched from the Ensembl
        GRCh37 REST server and prepended to both alleles; for deletions the
        start is shifted one position to the left.

        :return: A ``(start, ref, alt)`` tuple, or None when the reference build
                 is not GRCh37 or an insertion/deletion has no representative
                 transcript.
        """
        ensembl_server = "https://grch37.rest.ensembl.org"
        if self.coordinates.reference_build != 'GRCh37':
            return
        if self.is_insertion:
            if not self.coordinates.representative_transcript:
                return
            else:
                start = self.coordinates.start
                ext = "/sequence/region/human/{}:{}-{}".format(self.coordinates.chromosome, start, start)
                r = requests.get(ensembl_server+ext, headers={ "Content-Type" : "text/plain"})
                if self.coordinates.reference_bases is None or self.coordinates.reference_bases == '-' or self.coordinates.reference_bases == '':
                    ref = r.text
                else:
                    ref = "{}{}".format(r.text, self.coordinates.reference_bases)
                alt = "{}{}".format(r.text, self.coordinates.variant_bases)
        elif self.is_deletion:
            if not self.coordinates.representative_transcript:
                return
            else:
                start = self.coordinates.start - 1
                ext = "/sequence/region/human/{}:{}-{}".format(self.coordinates.chromosome, start, start)
                r = requests.get(ensembl_server+ext, headers={ "Content-Type" : "text/plain"})
                ref = "{}{}".format(r.text, self.coordinates.reference_bases)
                if self.coordinates.variant_bases is None or self.coordinates.variant_bases == '-' or self.coordinates.variant_bases == '':
                    alt = r.text
                else:
                    alt = "{}{}".format(r.text, self.coordinates.variant_bases)
        else:
            start = self.coordinates.start
            ref = self.coordinates.reference_bases
            alt = self.coordinates.variant_bases
        return (start, ref, alt)

    def csq_alt(self):
        """
        Return the allele string used in CSQ entries.

        :return: The variant bases (``"-"`` for deletions), or None when the
                 reference build is not GRCh37 or an insertion/deletion has no
                 representative transcript.
        """
        if self.coordinates.reference_build != 'GRCh37':
            return
        if self.is_insertion:
            if not self.coordinates.representative_transcript:
                return
            else:
                return self.coordinates.variant_bases
        elif self.is_deletion:
            if not self.coordinates.representative_transcript:
                return
            else:
                return "-"
        else:
            return self.coordinates.variant_bases

    def hgvs_c(self):
        """
        Return the single ``:c.`` HGVS expression that mentions the
        representative transcript, or ``''`` when there is none or more than one.
        """
        if self.coordinates.representative_transcript:
            hgvs_cs = [e for e in self.hgvs_expressions if (':c.' in e) and (self.coordinates.representative_transcript in e)]
            return hgvs_cs[0] if len(hgvs_cs) == 1 else ''
        else:
            return ''

    def hgvs_p(self):
        """
        Return the single ``:p.`` HGVS expression that mentions the
        representative transcript, or ``''`` when there is none or more than one.
        """
        if self.coordinates.representative_transcript:
            hgvs_ps = [e for e in self.hgvs_expressions if (':p.' in e) and (self.coordinates.representative_transcript in e)]
            return hgvs_ps[0] if len(hgvs_ps) == 1 else ''
        else:
            return ''

    def _csq_common_fields(self, special_character_table):
        """Return the leading CSQ fields shared by evidence and assertion entries."""
        return [
            self.csq_alt(),
            '&'.join(map(lambda t: t.name, self.variant_types)),
            self.gene.name,
            str(self.gene.entrez_id),
            'transcript',
            str(self.coordinates.representative_transcript),
            self.hgvs_c(),
            self.hgvs_p(),
            self.name,
            str(self.id),
            '&'.join(map(lambda a: a.translate(special_character_table), self.variant_aliases)),
            '&'.join(map(lambda e: e.translate(special_character_table), self.hgvs_expressions)),
            str(self.allele_registry_id),
            '&'.join(self.clinvar_entries),
            str(self.civic_actionability_score),
        ]

    def csq(self, include_status=None):
        """
        Build the ``|``-delimited CSQ strings for this variant.

        One string is produced per visible evidence item, followed by one per
        visible assertion.

        :param include_status: Optional collection of statuses; records whose
                               status is not in it are skipped. None keeps all.
        :return: A list of CSQ strings; empty when ``csq_alt()`` is None.
        """
        if self.csq_alt() is None:
            return []
        # Built once, before both loops. It used to be assigned inside the
        # evidence loop only, so a variant whose evidence was empty or fully
        # filtered out raised NameError as soon as an assertion was processed.
        special_character_table = str.maketrans(VCFWriter.SPECIAL_CHARACTERS)
        csq = []
        for evidence in self.evidence:
            if include_status is not None and evidence.status not in include_status:
                continue
            csq.append('|'.join(self._csq_common_fields(special_character_table) + [
                "evidence",
                str(evidence.id),
                "https://civicdb.org/links/evidence/{}".format(evidence.id),
                "{} ({})".format(evidence.source.citation_id, evidence.source.source_type),
                str(evidence.variant_origin),
                evidence.status,
            ]))
        for assertion in self.assertions:
            if include_status is not None and assertion.status not in include_status:
                continue
            csq.append('|'.join(self._csq_common_fields(special_character_table) + [
                "assertion",
                str(assertion.id),
                "https://civicdb.org/links/assertion/{}".format(assertion.id),
                "",
                str(assertion.variant_origin),
                assertion.status,
            ]))
        return csq
class VariantGroup(CivicRecord):
    """A CIViC variant group record: a described, named collection of variants."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {'description', 'name'}
    # TODO: Add support for these fields in advanced search endpoint:
    #   'errors', 'lifecycle_actions', 'provisional_values', 'sources'
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS | {'variants'}
class Gene(CivicRecord):
    """A CIViC gene record, exposing the variants that currently have visible evidence."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {'description', 'entrez_id', 'name'}
    # TODO: Add support for these fields in advanced search endpoint:
    #   'errors', 'provisional_values', 'sources'
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS | {
        'aliases',
        'lifecycle_actions',
        'variants',
    }

    def __init__(self, **kwargs):
        # The backing list has to exist before the base initializer assigns fields.
        self._variants = []
        super().__init__(**kwargs)

    @property
    def variants(self):
        """
        Variants of this gene that have at least one visible evidence item.

        Each stored variant first inherits this gene's ``_include_status`` so
        that its evidence is filtered by the same statuses.
        """
        for candidate in self._variants:
            candidate._include_status = self._include_status
        return list(filter(lambda candidate: candidate.evidence, self._variants))

    @variants.setter
    def variants(self, value):
        self._variants = value
class Evidence(CivicRecord):
    """A CIViC evidence item record."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS.union({
        'clinical_significance',
        'description',
        'drug_interaction_type',
        'evidence_direction',
        'evidence_level',
        'evidence_type',
        'gene_id',
        'name',
        # 'open_change_count',
        'rating',
        'status',
        'variant_id',
        'variant_origin'})
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS.union({
        'assertions',
        'disease',
        'drugs',
        # 'errors',
        # 'fields_with_pending_changes',
        'lifecycle_actions',
        'phenotypes',
        'source'})

    def __init__(self, **kwargs):
        # NOTE(review): this sets `_assertion` (singular), while the
        # `assertions` property below reads and writes `_assertions`. Until the
        # setter runs, the getter raises AttributeError, which sends the lookup
        # through CivicRecord.__getattr__. Renaming this attribute would change
        # that behaviour for partial records — confirm intent before touching.
        self._assertion = []
        super().__init__(**kwargs)

    @property
    def variant(self):
        """The variant record looked up from ``variant_id``."""
        return get_variant_by_id(self.variant_id)

    @property
    def assertions(self):
        """Assertions whose status is listed in ``_include_status``."""
        return [a for a in self._assertions if a.status in self._include_status]

    @assertions.setter
    def assertions(self, value):
        self._assertions = value

    @property
    def statement(self):
        """Alias for ``description``."""
        return self.description

    @statement.setter
    def statement(self, value):
        self.description = value
class Assertion(CivicRecord):
    """A CIViC assertion record."""

    # Not included: 'evidence_item_count', 'open_change_count',
    # 'pending_evidence_count'
    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {
        'allele_registry_id',
        'amp_level',
        'clinical_significance',
        'description',
        'drug_interaction_type',
        'evidence_direction',
        'evidence_type',
        'fda_companion_test',
        'fda_regulatory_approval',
        'name',
        'nccn_guideline',
        'nccn_guideline_version',
        'status',
        'summary',
        'variant_origin',
    }
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS | {
        'acmg_codes',
        'disease',
        'drugs',
        'evidence_items',
        'gene',
        'lifecycle_actions',
        'phenotypes',
        'variant',
    }

    @property
    def evidence(self):
        """Alias for ``evidence_items``."""
        return self.evidence_items

    @property
    def hpo_ids(self):
        """List of the truthy ``hpo_id`` values of this assertion's phenotypes."""
        collected = []
        for phenotype in self.phenotypes:
            if phenotype.hpo_id:
                collected.append(phenotype.hpo_id)
        return collected
class User(CivicRecord):
    """A CIViC user record; ``created_at`` is exposed as a ``datetime``."""

    # Not included: 'accepted_license', 'signup_complete', 'affiliation'
    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {
        'name',
        'username',
        'role',
        'avatar_url',
        'area_of_expertise',
        'orcid',
        'display_name',
        'created_at',
        'url',
        'twitter_handle',
        'facebook_profile',
        'linkedin_profile',
        'bio',
        'featured_expert',
    }
    _OPTIONAL_FIELDS = CivicRecord._OPTIONAL_FIELDS | {
        'country',
        'organization',
        'conflict_of_interest',
    }
    # Every optional field of a user is also a complex field.
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS | _OPTIONAL_FIELDS

    def __init__(self, **kwargs):
        # Raw timestamp string; filled in by the created_at setter.
        self._created_at = None
        super().__init__(**kwargs)

    @property
    def created_at(self):
        """Creation time parsed from the stored string, which must end in ``'Z'``."""
        raw_timestamp = self._created_at
        assert raw_timestamp[-1] == 'Z'
        return datetime.fromisoformat(raw_timestamp[:-1])

    @created_at.setter
    def created_at(self, value):
        self._created_at = value
class Organization(CivicRecord):
    """A CIViC organization record."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {'name', 'url', 'description'}
    _COMPLEX_FIELDS = CivicRecord._COMPLEX_FIELDS | {'profile_image', 'parent'}
class CivicAttribute(CivicRecord, dict):
    """
    Lightweight record used for nested data that has no dedicated class.

    Every keyword argument is stored as an attribute, only ``type`` is
    required, and instances have no site link.
    """

    _SIMPLE_FIELDS = {'type'}
    _COMPLEX_FIELDS = set()

    def __repr__(self):
        try:
            identifier = self.id
        except AttributeError:
            return '<CIViC Attribute {}>'.format(self.type)
        return '<CIViC Attribute {} {}>'.format(self.type, identifier)

    def __init__(self, **kwargs):
        # Attributes are never treated as partial, whatever the caller passed.
        kwargs['partial'] = False
        for name, value in kwargs.items():
            setattr(self, name, value)
        super().__init__(**kwargs)

    def __hash__(self):
        """
        Hash like a record when an ``id`` is available.

        :raises NotImplementedError: If the attribute has no ``id``.
        :raises ValueError: If ``id`` is None.
        """
        try:
            identifier = self.id
        except AttributeError:
            raise NotImplementedError
        if identifier is None:
            raise ValueError
        return CivicRecord.__hash__(self)

    @property
    def site_link(self):
        """Attributes have no page on the CIViC web application."""
        return None

    def update(self):
        # Returns (does not raise) the exception class.
        return NotImplementedError
class Drug(CivicAttribute):
    """Drug attribute; requires ``id``, ``type`` and ``ncit_id``."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {'ncit_id'}
class Disease(CivicAttribute):
    """Disease attribute; requires ``id``, ``type``, ``display_name``, ``doid`` and ``url``."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {'display_name', 'doid', 'url'}
class Country(CivicAttribute):
    """Country attribute; requires ``id``, ``type``, ``iso`` and ``name``."""

    _SIMPLE_FIELDS = CivicRecord._SIMPLE_FIELDS | {'iso', 'name'}
class LifecycleAction(CivicAttribute):
    """Container for a record's lifecycle events; each event is optional."""

    _OPTIONAL_FIELDS = CivicAttribute._OPTIONAL_FIELDS | {
        'submitted',
        'last_modified',
        'last_reviewed',
        'accepted',
    }
    # Every lifecycle event is a nested (complex) field.
    _COMPLEX_FIELDS = CivicAttribute._COMPLEX_FIELDS | _OPTIONAL_FIELDS
class BaseLifecycleAction(CivicAttribute):
    """Shared base for lifecycle events: a ``timestamp`` and the acting ``user``."""

    _SIMPLE_FIELDS = CivicAttribute._SIMPLE_FIELDS | {'timestamp'}
    _COMPLEX_FIELDS = CivicAttribute._COMPLEX_FIELDS | {'user'}

    def __init__(self, **kwargs):
        # Raw timestamp string; filled in by the timestamp setter.
        self._timestamp = None
        super().__init__(**kwargs)

    @property
    def timestamp(self):
        """Event time parsed from the stored string, which must end in ``'Z'``."""
        raw_timestamp = self._timestamp
        assert raw_timestamp[-1] == 'Z'
        return datetime.fromisoformat(raw_timestamp[:-1])

    @timestamp.setter
    def timestamp(self, value):
        self._timestamp = value
class Submitted(BaseLifecycleAction):
    """Lifecycle event class that ``get_class`` resolves for the ``submitted`` field."""
class LastModified(BaseLifecycleAction):
    """Lifecycle event class that ``get_class`` resolves for the ``last_modified`` field."""
class LastReviewed(BaseLifecycleAction):
    """Lifecycle event class that ``get_class`` resolves for the ``last_reviewed`` field."""
class Accepted(BaseLifecycleAction):
    """Lifecycle event class that ``get_class`` resolves for the ``accepted`` field."""
def get_cached(element_type, element_id):
    """
    Look up a record in the in-memory cache.

    :param str element_type: The element name, used to resolve the record class.
    :param element_id: The element ID.
    :return: The cached record, or False when nothing is cached under its hash.
    """
    record_class = get_class(element_type)
    # A partial placeholder is built only to compute the cache key.
    probe = record_class(type=element_type, id=element_id, partial=True)
    return CACHE.get(hash(probe), False)
def _has_full_cached_fresh(delta=FRESH_DELTA):
    """
    Report whether the in-memory cache holds a full snapshot that is still fresh.

    :param timedelta delta: Maximum age of the ``'full_cached'`` timestamp.
                            This parameter defaults to FRESH_DELTA.
    :return: True if the timestamp exists and is younger than ``delta``, else False.
    """
    cached_at = CACHE.get('full_cached', False)
    if not cached_at:
        return False
    return cached_at + delta > datetime.now()
def _get_elements_by_ids(element, id_list=[], allow_cached=True, get_all=False):
if allow_cached:
if not CACHE:
load_cache()
if not get_all:
cached = [get_cached(element, element_id) for element_id in id_list]
if all(cached):
logging.info('Loading {} from cache'.format(pluralize(element)))
return cached
else:
cached = [get_cached(element, element_id) for element_id in CACHE['{}_all_ids'.format(pluralize(element))]]
logging.info('Loading {} from cache'.format(pluralize(element)))
return cached
if id_list and get_all:
raise ValueError('Please pass list of ids or use the get_all flag, not both.')
if get_all:
payload = _construct_get_all_payload()
logging.warning('Getting all {}. This may take a couple of minutes...'.format(pluralize(element)))
elif element == 'variant_group':
raise NotImplementedError("Bulk ID search for variant groups not supported. Use get_all=True instead.")
else:
payload = _construct_query_payload(id_list)
adv_search = (element != 'variant_group')
url = search_url(element, use_search_meta=adv_search)
if adv_search:
response = requests.post(url, json=payload)
container_key = 'results'
else:
response = requests.get(url)
container_key = 'records'
response.raise_for_status()
cls = get_class(element)