Skip to content

Commit

Permalink
fix dcr participants
Browse files Browse the repository at this point in the history
  • Loading branch information
vemonet committed Apr 11, 2024
1 parent d1d6a93 commit 5684a84
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 127 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# 🫀 iCARE4CVD Cohort Explorer

Webapp enabling to upload and explore cohorts metadata built for the [iCARE4CVD project](https://icare4cvd.eu).
Webapp enabling to upload and explore cohorts metadata, built for the [iCARE4CVD project](https://icare4cvd.eu).

It interacts with a privacy computing platform ([Decentriq](https://www.decentriq.com/)) to create secure workspace where data scientists can run analysis on the selected cohorts (the cohorts data is uploaded only to Decentriq, the explorer only uses cohorts metadata).
It interacts with a privacy computing platform ([Decentriq](https://www.decentriq.com/)) to create secure workspace where data scientists can run analysis on the selected cohorts. The cohorts data is uploaded only to Decentriq, the explorer only uses cohorts metadata.

It aims to enable *data custodians* and *data scientists* to:

Expand Down Expand Up @@ -32,7 +32,7 @@ This platform is composed of 3 main components:
* **[Oxigraph](https://github.com/oxigraph/oxigraph) triplestore database** containing the cohorts and their variables metadata, exposing a SPARQL endpoint only available to the backend API.
* The data stored in the triplestore complies with the custom **[iCARE4CVD OWL ontology](https://maastrichtu-ids.github.io/cohort-explorer/)**. It contains 3 classes: Cohort, Variable, and Variable category. You can explore the ontology classes and properties [here](https://maastrichtu-ids.github.io/cohort-explorer/browse).
* Oxigraph has not yet reached release 1.0, but it is already stable enough for our currently expected use. It has the advantages of being open source (important for accountability and trust), and developed in Europe. If missing features appears to be blocking, consider migrating to [OpenLink Virtuoso](https://github.com/openlink/virtuoso-opensource), you'll only need to update the function that upload a RDFLib graph as file.

* **`backend/` server**, built with python, FastAPI and RDFLib.
* **`frontend/` web app** running in the client browser, built with TypeScript, NextJS, ReactJS, TailwindCSS, and DaisyUI.

Expand All @@ -52,6 +52,8 @@ This platform is composed of 3 main components:

## 🧑‍💻 Development

[![Update ontology documentation](https://github.com/MaastrichtU-IDS/cohort-explorer/actions/workflows/docs.yml/badge.svg)](https://github.com/MaastrichtU-IDS/cohort-explorer/actions/workflows/docs.yml)

### 📥 Install dependencies

1. Install [hatch](https://hatch.pypa.io/latest/) for managing python projects, and [pnpm](https://pnpm.io/installation) for TS/JS projects
Expand Down Expand Up @@ -154,7 +156,7 @@ Put the excel spreadsheet with all cohorts metadata in `data/iCARE4CVD_Cohorts.x

We currently use [nginx-proxy](https://github.com/nginx-proxy/nginx-proxy) for routing through environment variables in the `docker-compose.yml` file, you can change for the proxy of your liking.

## 🪄 Administration
## 🪄 Database administration

### 🗑️ Reset database

Expand Down
9 changes: 9 additions & 0 deletions backend/src/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ async def auth_callback(code: str) -> RedirectResponse:
"https://explorer.icare4cvd.eu" in access_payload["aud"]
and "read:icare4cvd-dataset-descriptions" in access_payload["permissions"]
):
# TODO: for LUCE blockchain: check if user email has a blockchain address
# Where? Either a JSON file on the server, or in the triplestore
# blockchain_addrs = json.load(settings.data_folder / "blockchain_addresses.json")
# if id_payload["email"] not in blockchain_addrs:
# blockchain_addrs[id_payload["email"]] = "0x1234567890"
# json.dump(blockchain_addrs, settings.data_folder / "blockchain_addresses.json")



# Reuse expiration time from decentriq Auth0 access token
exp_timestamp = access_payload["exp"]
jwt_token = create_access_token(
Expand Down
40 changes: 29 additions & 11 deletions backend/src/decentriq.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ async def create_compute_dcr(
.with_description("A data clean room to run computations on cohorts for the iCARE4CVD project")
)

participants = {}
participants[user["email"]] = {"data_owner_of": set(), "analyst_of": set()}
preview_nodes = []
# Convert cohort variables to decentriq schema
for cohort_id, cohort in selected_cohorts.items():
Expand All @@ -168,8 +170,7 @@ async def create_compute_dcr(
data_nodes.append(data_node_id)

# TODO: made airlock always True for testing
# if cohort.airlock:
if True:
if cohort.airlock:
# Add airlock node to make it easy to access small part of the dataset
preview_node_id = f"preview-{data_node_id}"
builder.add_node_definition(PreviewComputeNodeDefinition(
Expand All @@ -181,21 +182,24 @@ async def create_compute_dcr(

# Add data owners to provision the data
for owner in cohort.cohort_email:
builder.add_participant(owner, data_owner_of=[data_node_id])
if owner not in participants:
participants[owner] = {"data_owner_of": set(), "analyst_of": set()}
participants[owner]["data_owner_of"].add(owner)

# Add pandas preparation script
pandas_script = "import pandas as pd\nimport decentriq_util\n\n"
df_var = f"df_{cohort_id.replace('-', '_')}"
requested_vars = cohorts_request["cohorts"][cohort_id]

# Direct cohort variables list
if isinstance(requested_vars, list):
# Direct cohort variables list
pandas_script += f'{df_var} = decentriq_util.read_tabular_data("/input/{cohort_id}")\n'

if len(requested_vars) <= len(cohort.variables):
# Add filter variables to pandas script
pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"

# TODO: Merge operation, need to be implemented on the frontend
elif isinstance(requested_vars, dict):
# Merge operation, need to be implemented on the frontend
pandas_script += pandas_script_merge_cohorts(requested_vars, all_cohorts)
# TODO: add merged cohorts schema to selected_cohorts
else:
Expand All @@ -206,12 +210,26 @@ async def create_compute_dcr(
builder.add_node_definition(
PythonComputeNodeDefinition(name=f"prepare-{cohort_id}", script=pandas_script, dependencies=[data_node_id])
)
builder.add_participant(user["email"], analyst_of=[f"prepare-{cohort_id}"])
# builder.add_participant(user["email"], analyst_of=[f"prepare-{cohort_id}"])

# Add users permissions
if airlock:
builder.add_participant(user["email"], analyst_of=preview_nodes)
builder.add_participant(settings.decentriq_email, data_owner_of=data_nodes)
# Add the requester as analyst of prepare script
participants[user["email"]]["analyst_of"].add(f"prepare-{cohort_id}")

# Add users permissions for previews
for prev_node in preview_nodes:
participants[user["email"]]["analyst_of"].add(prev_node)

# TODO: we add the DQ admin as data owner just for testing for now
# Will need to be removed when the workflow has been tested
for data_node in data_nodes:
participants[user["email"]]["data_owner_of"].add(data_node)

for p_email, p_perm in participants.items():
builder.add_participant(
p_email,
data_owner_of=list(p_perm["data_owner_of"]),
analyst_of=list(p_perm["analyst_of"]),
)

# Build and publish DCR
dcr_definition = builder.build()
Expand Down
114 changes: 2 additions & 112 deletions backend/src/explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from typing import Any

import requests
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse

from src.auth import get_current_user
from src.config import settings
from src.models import Cohort
from src.utils import converter, retrieve_cohorts_metadata, run_query
from src.utils import retrieve_cohorts_metadata

router = APIRouter()

Expand All @@ -33,113 +33,3 @@ async def download_cohort_spreasheet(cohort_id: str, user: Any = Depends(get_cur

# If no file is found, return an error response
raise HTTPException(status_code=404, detail=f"No data dictionary found for cohort ID '{cohort_id}'")


@router.get("/search-concepts")
async def search_concepts(
query: str, domain: list[str] | None = Query(default=None), user: Any = Depends(get_current_user)
):
"""Search for concepts in the Athena API and check how many time those concepts are use in our KG."""
if not domain:
domain = []
vocabs = ["LOINC", "ATC", "SNOMED"] # "RxNorm"
try:
response = requests.get(
"https://athena.ohdsi.org/api/v1/concepts",
params={
"query": query,
"domain": domain,
"vocabulary": vocabs,
"standardConcept": ["Standard", "Classification"],
"pageSize": 15,
"page": 1,
},
headers={
# We need to fake the user agent to avoid a 403 error. What a bunch of douchebags, and incompetent with this! Try again losers!
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
},
timeout=60,
)
response.raise_for_status()
search_res = response.json().get("content", [])
except Exception:
raise HTTPException(status_code=response.status_code, detail="Error fetching data from OHDSI Athena API")

# print(search_res)
found_concepts = []
for res in search_res:
# Convert snomed CURIE to snomedct
concept_id = f"{'snomedct' if res.get('vocabulary').lower() == 'snomed' else res.get('vocabulary').lower()}:{res.get('id')}"
found_concepts.append(
{
"id": concept_id,
"uri": converter.expand(concept_id),
"label": res.get("name"),
"domain": res.get("domain"),
"vocabulary": res.get("vocabulary"),
"used_by": [],
}
)

found_concepts_filter = " ".join([f"<{concept['uri']}>" for concept in found_concepts])
sparql_query = f"""
PREFIX icare: <https://w3id.org/icare4cvd/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dc: <http://purl.org/dc/elements/1.1/>
PREFIX dcterms: <http://purl.org/dc/terms/>
SELECT DISTINCT ?cohortId ?varName ?varLabel ?omopDomain ?mappedId
WHERE {{
VALUES ?mappedId {{ {found_concepts_filter} }}
GRAPH ?cohortMetadataGraph {{
?cohort a icare:Cohort ;
dc:identifier ?cohortId .
}}
GRAPH ?cohortVarGraph {{
?cohort icare:hasVariable ?variable .
?variable a icare:Variable ;
dc:identifier ?varName ;
rdfs:label ?varLabel ;
icare:varType ?varType ;
icare:index ?index .
OPTIONAL {{ ?variable icare:omop ?omopDomain }}
}}
{{
GRAPH ?cohortMappingsGraph {{
?variable icare:mappedId ?mappedId .
}}
}} UNION {{
GRAPH ?cohortVarGraph {{
?variable icare:categories ?category.
}}
GRAPH ?cohortMappingsGraph {{
?category icare:mappedId ?mappedId .
}}
}}
OPTIONAL {{ ?mappedId rdfs:label ?mappedLabel }}
}}
ORDER BY ?cohort ?index
"""
# TODO also get mappings from categories?
# print(sparql_query)
for row in run_query(sparql_query)["results"]["bindings"]:
# print(row)
# Find the concept in the list and add the cohort and variable to the used_by list
for concept in found_concepts:
if concept["uri"] == row["mappedId"]["value"]:
used_by_entry = {
"cohort_id": row["cohortId"]["value"],
"var_name": row["varName"]["value"],
"var_label": row["varLabel"]["value"],
"omop_domain": row["omopDomain"]["value"] if "omopDomain" in row else None,
}
# NOTE: Normally the SPARQL query should note return duplicates, but in case it does:
# existing_entries = [entry for entry in concept["used_by"] if entry["cohort_id"] == used_by_entry["cohort_id"] and entry["var_name"] == used_by_entry["var_name"]]
# if not existing_entries:
concept["used_by"].append(used_by_entry)
break
found_concepts.sort(key=lambda x: len(x["used_by"]), reverse=True)
return found_concepts
2 changes: 2 additions & 0 deletions backend/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from src.config import settings
from src.decentriq import router as decentriq_router
from src.explore import router as explore_router
from src.mapping import router as mapping_router
from src.upload import init_triplestore
from src.upload import router as upload_router

Expand All @@ -17,6 +18,7 @@
)

app.include_router(explore_router, tags=["explore"])
app.include_router(mapping_router, tags=["mapping"])
app.include_router(upload_router, tags=["upload"])
app.include_router(decentriq_router, tags=["upload"])
app.include_router(auth_router, tags=["authentication"])
Expand Down
Loading

0 comments on commit 5684a84

Please sign in to comment.