-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
211 lines (173 loc) · 7.13 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""
REST backend for thehonestgene
"""
import hug
import genotype
import settings
from thehonestgenepipeline import imputation as imp
from thehonestgenepipeline import ancestry as anc
from thehonestgenepipeline import riskprediction as risk
from ancestor.core import ancestry
import cloud
OAUTH_PROVIDERS = {}
for prov, opts in settings.GENOTYPE_PROVIDERS.items():
if opts['has_oauth']:
OAUTH_PROVIDERS[prov] = cloud.CloudResource(opts['client_secret'],
opts['client_id'],
opts['redirect_url'],
opts['scope'],
opts['oauth_url'])
def check_cloud_provider(provider):
'''checks if the provider is available'''
if provider not in OAUTH_PROVIDERS:
raise Exception('Cloud provider %s not found' % provider)
return OAUTH_PROVIDERS[provider]
@hug.post('/id')
def generate_id():
'''Generate a unique id for identifiying an analysis run'''
return {'id':genotype.generate_id()}
@hug.post('/genotype/{id}')
def upload_genotype(body, id):
'''Upload a genotype'''
data = body['file']
return genotype.get_genotype_infos(genotype.upload_genotype(data.decode("utf-8"), id))
@hug.get('/genotype/{id}')
def get_genotype_infos(id):
'''Retrieve information about a specific genotype'''
return genotype.get_genotype_infos(id)
@hug.post('/imputation/{id}/cancel/{task_id}')
def cancel_imputation(id, task_id):
'''Cancel a running process'''
res = imp.imputation.AsyncResult(task_id)
res.revoke(terminate=True)
return {}
@hug.post('/imputation/{id}')
def run_imputation(id):
'''Start the imputation process'''
res = imp.imputation.delay(id)
return {'id':res.id, 'state':res.state}
@hug.get('/imputation/{id}/state/{task_id}')
def get_imputation_state(id, task_id, wait=False):
'''Get the current state of the imputation'''
res = imp.imputation.AsyncResult(task_id)
state = _retrieveTaskState(res, wait)
if state['state'] == 'SUCCESS' and 'data' in state:
state['data']['imputation']['num_imputed_snps'] = sum(state['data']['imputation']['chr_stats'].values())
return state
@hug.post('/ancestry/{id}')
def run_ancestry(id):
'''Start the ancestry'''
res = anc.analysis.delay(id, 'world', check_population='EUR')
return {'id':res.id, 'state':res.state}
@hug.get('/ancestry/{id}/state/{task_id}')
def get_ancestry_state(id, task_id, wait=False):
'''Returns the current state of the ancestry analysis'''
res = anc.analysis.AsyncResult(task_id)
state = _retrieveTaskState(res, wait)
return state
@hug.post('/ancestry/{id}/cancel/{task_id}')
def cancel_ancestry(id, task_id):
'''Cancel a running ancestry'''
res = risk.run.AsyncResult(task_id)
res.revoke(terminate=True)
return {}
@hug.get('/traits')
def get_available_traits():
'''Returns the available traits for risk prediction'''
return settings.TRAITS
@hug.post('/riskprediction/{id}/cancel/{task_id}')
def cancel_prediction(id, task_id):
'''Cancel a running prediction'''
res = risk.run.AsyncResult(task_id)
res.revoke(terminate=True)
return {}
@hug.post('/riskprediction/{id}/{trait}')
def run_prediction(id, trait):
'''Returns the available traits for risk prediction'''
# TODO check if trait is available
res = risk.run.delay(id, trait)
return {'id':res.id, 'state':res.state}
@hug.get('/riskprediction/{id}/state/{task_id}')
def get_prediction_state(id, task_id, wait=False):
'''Returns the current state of the risk prediction analysis'''
res = risk.run.AsyncResult(task_id)
state = _retrieveTaskState(res, wait)
return state
@hug.get('/pcs')
def get_pcs_for_population(platform, region='world', population=None):
'''Returns the PCS for a given population'''
# TODO implement
#pcs = _transform_pcs(ancestry.load_pcs_from_file('%s/AN_DATA/hapmap_%s_%s_pcs.hdf5' % (settings.DATA_PATH, platform, region)))
pcs = {}
if population is not None:
return pcs[population]
return pcs
@hug.get('/plotpcs')
def get_pcs_forplotting(platform, pc1, pc2, region='world'):
#pcs = ancestry.load_pcs_from_file('%s/AN_DATA/hapmap_%s_%s_pcs.hdf5' % (settings.DATA_PATH, platform, region))
# TODO implement
pcs = {}
num_pop = len(pcs['populations'].keys())
header = ['PC1']
data = []
data.append(header)
for population, mask in pcs['populations'].items():
header.append(population)
pc2_ix = len(header)-1
pop_pcs = pcs['pcs'][mask]
for pc in pop_pcs:
value = [pc[0]]
value.extend([None]*(pc2_ix-1))
value.append(pc[1])
value.extend([None]*(num_pop - pc2_ix +1))
data.append(value)
header.append('YOU')
value = [None]*(num_pop+2)
value[0] = float(pc1)
value[-1] = float(pc2)
data.append(value)
return data
def _transform_pcs(pcs):
transformed_pcs = {}
for population, mask in pcs['populations'].items():
transformed_pcs[population] = pcs['pcs'][mask].tolist()
return transformed_pcs
@hug.get('/cloud')
def get_available_cloud_providers():
'''Returns available cloud providers'''
return [{'name':provider, 'title':opts.get('title', provider),
'logoUrl':opts.get('logo_url', ''), 'description':opts.get('description', ''),
'webpage':opts.get('webpage', ''), 'clientId':opts.get('client_id'),
'redirectUrl':opts.get('redirect_url'), 'tokenurl':opts.get('token_url', ''),
'scope':opts.get('scope', ''),
'oauthSupported':opts.get('has_oauth', False)} for (provider, opts) in settings.GENOTYPE_PROVIDERS.items()]
@hug.post('/cloud/{provider}/token')
def get_token_for_provider(provider, request):
'''Returns access token for specified provider'''
provider = check_cloud_provider(provider)
token_result = provider.get_token(request.headers['CODE'])
token_result['userInfo'] = provider.get_genotypes(token_result['access_token'])
return token_result
@hug.get('/cloud/{provider}/genotypes')
def get_genotypes_for_provider(provider, request):
'''Retrieves genotypes for specified provider'''
provider = check_cloud_provider(provider)
return provider.get_genotypes(request.headers['ACCESS-TOKEN'])
@hug.post('/cloud/{provider}/genome/{genotypeid}/{id}')
def transfer_genome(provider, genotypeid, id, request):
'''Uploads genotype data for specified provider'''
source = provider
provider = check_cloud_provider(provider)
data = provider.get_genotype_data(request.headers['ACCESS-TOKEN'], genotypeid)
columns = data.columns.values.tolist()
data.sort([columns[1], columns[2]], inplace=True)
return genotype.get_genotype_infos(genotype.upload_genotype(data.to_csv(sep='\t', index=False, header=False), id, source))
def _retrieveTaskState(res, wait=False):
state = {'id':res.id, 'state':res.state}
if wait:
state['data'] = res.get(timeout=60)
state['state'] = res.state
else:
if state['state'] == 'SUCCESS':
state['data'] = res.get(no_ack=False)
return state