Update decentriq SDK to 0.26
vemonet committed Apr 2, 2024
1 parent fdfd9c3 commit 3fa1a92
Showing 4 changed files with 98 additions and 75 deletions.
2 changes: 1 addition & 1 deletion backend/pyproject.toml
@@ -40,7 +40,7 @@ dependencies = [
     "oxrdflib",
     "SPARQLWrapper",
     "python-dotenv",
-    "decentriq_platform ==0.24.4", # TODO: conflict with pydantic 2
+    "decentriq_platform >=0.26.0", # TODO: conflict with pydantic 2
     "curies",
     # "pydantic >=2.0.0",
     # "pydantic-settings",
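Moving from the exact pin ==0.24.4 to the floor >=0.26.0 lets future releases in. If you want the backend to fail fast when an environment still carries the old SDK, a minimal startup guard could look like this sketch (standard library only; this check is not part of the commit):

from importlib.metadata import version

# Illustrative guard only, not part of this commit
sdk_version = version("decentriq_platform")
major, minor = (int(p) for p in sdk_version.split(".")[:2])
if (major, minor) < (0, 26):
    raise RuntimeError(f"decentriq_platform >=0.26.0 required, found {sdk_version}")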
6 changes: 3 additions & 3 deletions backend/src/auth.py
@@ -118,14 +118,14 @@ async def auth_callback(code: str) -> RedirectResponse:
 
     # Check in payload if logged in user has the required permissions
     if (
-        "https://explorer.icare4cvd.eu"
-        in access_payload["aud"]
+        "https://explorer.icare4cvd.eu" in access_payload["aud"]
        and "read:icare4cvd-dataset-descriptions" in access_payload["permissions"]
     ):
         # Reuse expiration time from decentriq Auth0 access token
         exp_timestamp = access_payload["exp"]
         jwt_token = create_access_token(
-            data={"email": id_payload["email"], "access_token": token["access_token"]}, expires_timestamp=exp_timestamp
+            data={"email": id_payload["email"], "access_token": token["access_token"]},
+            expires_timestamp=exp_timestamp,
         )
 
         # NOTE: Redirect to react frontend
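For context, the reformatted call above mints a backend JWT whose expiry mirrors the Auth0 token's exp claim. The real create_access_token helper lives elsewhere in src/auth.py; a minimal sketch with that signature might look as follows, assuming PyJWT, with the secret and algorithm as placeholders (none of this is part of the commit):

from datetime import datetime, timezone

import jwt  # PyJWT; an assumption, the real helper may use another library

def create_access_token(data: dict, expires_timestamp: int) -> str:
    """Mint a signed JWT that expires together with the upstream Auth0 access token."""
    payload = {
        **data,
        "iat": int(datetime.now(timezone.utc).timestamp()),
        "exp": expires_timestamp,  # reuse Auth0's expiry, as in the call above
    }
    return jwt.encode(payload, "change-me", algorithm="HS256")  # placeholder secret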
125 changes: 62 additions & 63 deletions backend/src/decentriq.py
@@ -2,7 +2,15 @@
 from typing import Any
 
 import decentriq_platform as dq
-import decentriq_platform.sql as dqsql
+
+# import decentriq_platform.sql as dqsql
+from decentriq_platform.analytics import (
+    AnalyticsDcrBuilder,
+    Column,
+    PrimitiveType,
+    PythonComputeNodeDefinition,
+    RawDataNodeDefinition,
+)
 from fastapi import APIRouter, Depends, HTTPException
 
 from src.auth import get_current_user
@@ -13,61 +21,56 @@
 router = APIRouter()
 
 
-def get_cohort_schema(cohort_dict: Cohort) -> list[tuple[str, dqsql.PrimitiveType, bool]]:
+def get_cohort_schema(cohort_dict: Cohort) -> list[Column]:
     """Convert cohort variables to Decentriq schema"""
     schema = []
     for variable_id, variable_info in cohort_dict.variables.items():
-        prim_type = dqsql.PrimitiveType.STRING
+        prim_type = PrimitiveType.STRING
         if variable_info.var_type == "FLOAT":
-            prim_type = dqsql.PrimitiveType.FLOAT64
+            prim_type = PrimitiveType.FLOAT
         if variable_info.var_type == "INT":
-            prim_type = dqsql.PrimitiveType.INT64
+            prim_type = PrimitiveType.INTEGER
         nullable = bool(variable_info.na != 0)
-        schema.append((variable_id, prim_type, nullable))
-
+        schema.append(Column(name=variable_id, format_type=prim_type, is_nullable=nullable))
+        # schema.append((variable_id, prim_type, nullable))
     return schema
 
 
 # https://docs.decentriq.com/sdk/python-getting-started
 def create_provision_dcr(user: Any, cohort: Cohort) -> dict[str, Any]:
     """Initialize a Data Clean Room in Decentriq when a new cohort is uploaded"""
-    users = [user["email"]]
 
     # Establish connection to Decentriq
     client = dq.create_client(settings.decentriq_email, settings.decentriq_token)
-    enclave_specs = dq.enclave_specifications.versions(
-        [
-            "decentriq.driver:v20",
-            "decentriq.sql-worker:v12",
-        ]
-    )
-    auth, _ = client.create_auth_using_decentriq_pki(enclave_specs)
-    session = client.create_session(auth, enclave_specs)
 
     # Creation of a Data Clean Room (DCR)
-    builder = dq.DataRoomBuilder(f"iCare4CVD DCR provision {cohort.cohort_id}", enclave_specs=enclave_specs)
+    dcr_title = f"iCARE4CVD DCR provision {cohort.cohort_id}"
+    builder = (
+        AnalyticsDcrBuilder(client=client)
+        .with_name(dcr_title)
+        .with_owner(user["email"])
+        .with_description(f"A data clean room to provision the data for the {cohort.cohort_id} cohort")
+    )
 
     # Create data node for cohort
-    data_node_builder = dqsql.TabularDataNodeBuilder(cohort.cohort_id, schema=get_cohort_schema(cohort))
-    data_node_builder.add_to_builder(builder, authentication=client.decentriq_pki_authentication, users=users)
-
-    builder.add_user_permission(
-        email=user["email"],
-        authentication_method=client.decentriq_pki_authentication,
-        permissions=[dq.Permissions.update_data_room_status()], # To delete the DCR
+    data_node_id = cohort.cohort_id.replace(" ", "-")
+    builder.add_node_definition(RawDataNodeDefinition(name=data_node_id, is_required=True))
+    # TODO: providing schema is broken in new SDK
+    # builder.add_node_definition(TableDataNodeDefinition(name=data_node_id, columns=get_cohort_schema(cohort), is_required=True))
+
+    builder.add_participant(
+        user["email"],
+        data_owner_of=[data_node_id],
     )
 
     # Build and publish DCR
-    data_room = builder.build()
-    data_room_id = session.publish_data_room(data_room)
-
-    dcr_desc = client.get_data_room_description(data_room_id, enclave_specs)
-    dcr_url = f"https://platform.decentriq.com/datarooms/p/{data_room_id}"
+    dcr_definition = builder.build()
+    dcr = client.publish_analytics_dcr(dcr_definition)
+    dcr_url = f"https://platform.decentriq.com/datarooms/p/{dcr.id}"
     return {
         "message": f"Data Clean Room for {cohort.cohort_id} provisioned at {dcr_url}",
         "identifier": cohort.cohort_id,
         "dcr_url": dcr_url,
-        "dcr_title": dcr_desc["title"],
-        "dcr": dcr_desc,
+        "dcr_title": dcr_title,
         **cohort.dict(),
     }
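For reference, the Column objects that get_cohort_schema now emits (and that the commented-out TableDataNodeDefinition would consume once the schema path works again) look like this; the variable names are invented for illustration:

from decentriq_platform.analytics import Column, PrimitiveType

# Invented example variables; real names and types come from the cohort's data dictionary
schema = [
    Column(name="age", format_type=PrimitiveType.INTEGER, is_nullable=False),
    Column(name="bmi", format_type=PrimitiveType.FLOAT, is_nullable=True),
    Column(name="sex", format_type=PrimitiveType.STRING, is_nullable=False),
]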

@@ -148,50 +151,46 @@ async def create_compute_dcr(
 
     # Establish connection to Decentriq
     client = dq.create_client(settings.decentriq_email, settings.decentriq_token)
-    enclave_specs = dq.enclave_specifications.versions(
-        [
-            "decentriq.driver:v20",
-            "decentriq.sql-worker:v12",
-            # "decentriq.python-ml-worker-32-64:v21",
-            # "decentriq.r-latex-worker-32-32:v16",
-        ]
-    )
-    auth, _ = client.create_auth_using_decentriq_pki(enclave_specs)
-    session = client.create_session(auth, enclave_specs)
 
     # Creation of a Data Clean Room (DCR)
+    data_nodes = []
     dcr_count = len(client.get_data_room_descriptions())
-    builder = dq.DataRoomBuilder(f"iCare4CVD DCR compute {dcr_count}", enclave_specs=enclave_specs)
+    dcr_title = f"iCARE4CVD DCR compute {dcr_count}"
+    builder = (
+        AnalyticsDcrBuilder(client=client)
+        .with_name(dcr_title)
+        .with_owner(user["email"])
+        .with_description("A data clean room to run computations on cohorts for the iCARE4CVD project")
+    )
 
+    # builder = dq.DataRoomBuilder(f"iCare4CVD DCR compute {dcr_count}", enclave_specs=enclave_specs)
+
     # Convert cohort variables to decentriq schema
     for cohort_id, cohort in selected_cohorts.items():
         # Create data node for cohort
-        data_node_builder = dqsql.TabularDataNodeBuilder(cohort_id, schema=get_cohort_schema(cohort))
-        data_node_builder.add_to_builder(builder, authentication=client.decentriq_pki_authentication, users=users)
-
-        # Add empty list of permissions
-        builder.add_user_permission(
-            email=user["email"],
-            authentication_method=client.decentriq_pki_authentication,
-            permissions=[
-                dq.Permissions.update_data_room_status(), # To delete the DCR
-                # dq.Permissions.leaf_crud(data_node_id),
-                # dq.Permissions.execute_compute(uppercase_text_node_id),
-                # dq.Permissions.retrieve_compute_result(uppercase_text_node_id),
-            ],
+        data_node_id = cohort_id.replace(" ", "-")
+        builder.add_node_definition(RawDataNodeDefinition(name=data_node_id, is_required=True))
+        # TODO: providing schema is broken in new SDK
+        # builder.add_node_definition(TableDataNodeDefinition(name=data_node_id, columns=get_cohort_schema(cohort), is_required=True))
+        data_nodes.append(data_node_id)
+
+    # Add python data preparation script
+    builder.add_node_definition(
+        PythonComputeNodeDefinition(name="prepare-data", script=pandas_script, dependencies=data_nodes)
     )
 
-    # Build and publish DCR
-    data_room = builder.build()
-    data_room_id = session.publish_data_room(data_room)
+    # Add users permissions
+    builder.add_participant(user["email"], data_owner_of=[data_node_id], analyst_of=["prepare-data"])
 
-    dcr_desc = client.get_data_room_description(data_room_id, enclave_specs)
-    dcr_url = f"https://platform.decentriq.com/datarooms/p/{data_room_id}"
+    # Build and publish DCR
+    dcr_definition = builder.build()
+    dcr = client.publish_analytics_dcr(dcr_definition)
+    dcr_url = f"https://platform.decentriq.com/datarooms/p/{dcr.id}"
     return {
         "message": f"Data Clean Room available for compute at {dcr_url}",
         "dcr_url": dcr_url,
-        "dcr_title": dcr_desc["title"],
-        "dcr": dcr_desc,
+        "dcr_title": dcr_title,
+        # "dcr": dcr_desc,
         "merge_script": pandas_script,
         **cohorts_request,
     }
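Both functions now follow the same 0.26 pattern: a fluent AnalyticsDcrBuilder replaces DataRoomBuilder plus the manual enclave-spec, auth, and session plumbing, and client.publish_analytics_dcr replaces session.publish_data_room. A self-contained sketch using only calls that appear in this diff (credentials, names, and the script are placeholders):

import decentriq_platform as dq
from decentriq_platform.analytics import (
    AnalyticsDcrBuilder,
    PythonComputeNodeDefinition,
    RawDataNodeDefinition,
)

client = dq.create_client("owner@example.com", "DECENTRIQ_API_TOKEN")  # placeholder credentials
builder = (
    AnalyticsDcrBuilder(client=client)
    .with_name("Demo DCR")
    .with_owner("owner@example.com")
    .with_description("Toy DCR mirroring the builder flow above")
)
builder.add_node_definition(RawDataNodeDefinition(name="cohort-a", is_required=True))
builder.add_node_definition(
    PythonComputeNodeDefinition(name="prepare-data", script="print('ok')", dependencies=["cohort-a"])
)
builder.add_participant("owner@example.com", data_owner_of=["cohort-a"], analyst_of=["prepare-data"])
dcr = client.publish_analytics_dcr(builder.build())
print(f"https://platform.decentriq.com/datarooms/p/{dcr.id}")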
40 changes: 32 additions & 8 deletions backend/src/upload.py
@@ -2,13 +2,13 @@
 import os
 import shutil
 from datetime import datetime
-from typing import Any
 from re import sub
+from typing import Any
 
 import pandas as pd
 import requests
 from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
-from rdflib import DCTERMS, XSD, Dataset, Graph, Literal, URIRef
+from rdflib import XSD, Dataset, Graph, Literal, URIRef
 from rdflib.namespace import DC, RDF, RDFS
 from SPARQLWrapper import SPARQLWrapper
@@ -146,10 +146,29 @@ def parse_categorical_string(s: str) -> list[dict[str, str]]:
     return result
 
 
-COLUMNS_LIST = ["VARIABLE NAME", "VARIABLE LABEL", "VAR TYPE", "UNITS", "CATEGORICAL", "COUNT", "NA", "MIN", "MAX", "Definition", "Formula", "ICD-10", "ATC-DDD", "LOINC", "SNOMED-CT", "OMOP", "Visits"]
+COLUMNS_LIST = [
+    "VARIABLE NAME",
+    "VARIABLE LABEL",
+    "VAR TYPE",
+    "UNITS",
+    "CATEGORICAL",
+    "COUNT",
+    "NA",
+    "MIN",
+    "MAX",
+    "Definition",
+    "Formula",
+    "ICD-10",
+    "ATC-DDD",
+    "LOINC",
+    "SNOMED-CT",
+    "OMOP",
+    "Visits",
+]
 ACCEPTED_DATATYPES = ["STR", "FLOAT", "INT", "DATETIME"]
 ID_COLUMNS_NAMESPACES = {"ICD-10": "icd10", "SNOMED-CT": "snomedct", "ATC-DDD": "atc", "LOINC": "loinc"}
 
+
 def create_uri_from_id(row):
     """Build concepts URIs from the ID provided in the various columns of the data dictionary"""
     uris_list = []
@@ -158,16 +177,18 @@ def create_uri_from_id(row):
         if row[column]:
             if "," in str(row[column]): # Handle list of IDs separated by comma
                 ids = str(row[column]).split(",")
-                uris_list.extend([converter.expand(f"{ID_COLUMNS_NAMESPACES[column]}:{identif.strip()}") for identif in ids])
+                uris_list.extend(
+                    [converter.expand(f"{ID_COLUMNS_NAMESPACES[column]}:{identif.strip()}") for identif in ids]
+                )
             else:
                 uris_list.append(converter.expand(f"{ID_COLUMNS_NAMESPACES[column]}:{str(row[column]).strip()}"))
     return ", ".join(uris_list)
 
 
 def to_camelcase(s: str) -> str:
-    s = sub(r"(_|-)+", " ", s).title().replace(" ", "")
-    return ''.join([s[0].lower(), s[1:]])
+    s = sub(r"(_|-)+", " ", s).title().replace(" ", "")
+    return "".join([s[0].lower(), s[1:]])
 
 
 def load_cohort_dict_file(dict_path: str, cohort_id: str, user_email: str) -> Dataset:
     """Parse the cohort dictionary uploaded as excel or CSV spreadsheet, and load it to the triplestore"""
@@ -405,7 +426,10 @@ def init_triplestore() -> None:
     g = init_graph(onto_graph_uri)
     ntriple_g = init_graph()
     # ntriple_g.parse("https://raw.githubusercontent.com/vemonet/omop-cdm-owl/main/omop_cdm_v6.ttl", format="turtle")
-    ntriple_g.parse("https://raw.githubusercontent.com/MaastrichtU-IDS/cohort-explorer/main/cohort-explorer-ontology.ttl", format="turtle")
+    ntriple_g.parse(
+        "https://raw.githubusercontent.com/MaastrichtU-IDS/cohort-explorer/main/cohort-explorer-ontology.ttl",
+        format="turtle",
+    )
     # Trick to convert ntriples to nquads with a given graph
     for s, p, o in ntriple_g.triples((None, None, None)):
         g.add((s, p, o, onto_graph_uri))
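The "ntriples to nquads" trick above relies on rdflib accepting a fourth element in add(): parse the Turtle into a plain Graph of triples, then re-add each triple with a named-graph identifier. A self-contained sketch (the graph URI is a placeholder; in the real code init_graph() returns the quad store):

from rdflib import Dataset, Graph, URIRef

triples = Graph()
triples.parse(
    "https://raw.githubusercontent.com/MaastrichtU-IDS/cohort-explorer/main/cohort-explorer-ontology.ttl",
    format="turtle",
)

quads = Dataset()  # stand-in for what init_graph() returns in the real code
graph_uri = URIRef("https://w3id.org/example/graph/ontology")  # placeholder named-graph URI
for s, p, o in triples:
    quads.add((s, p, o, graph_uri))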
