# Preparing and uploading data to Fair Data Point (FDP) compliant with the Health-RI core shapes using SeMPyRO

**Prerequirements:** To execute this notebook in full one needs to have a running FAIR Data Point (FDP) instance with an active write access account.

I used [sample](https://github.com/Health-RI/health-ri-metadata) data as a reference for the Health-RI core shapes. The shapes are available at the repo: [shapes](https://github.com/Health-RI/health-ri-metadata/tree/develop/Formalisation(shacl)/Core/PiecesShape). And the FDP instance I used is https://fdp-test.health-ri.nl. The sample data is also available in metadata repository

FDP requires each dataset to be a part of a catalogue, therefore we need to create a catalogue. Let's see what we need to provide for that:

In [93]:
from sempyro.hri_dcat import HRICatalog

catalog_fields = HRICatalog.annotate_model()
print(catalog_fields.mandatory_fields())

['description', 'publisher', 'title']


Let's create a minimum catalogue with an example title and description. We also need a URI to use as a graph subject at serialization. A FDP requirement is a link pointing to a parent object, in the case of a catalogue it is FDP itself and it should be a property `is_part_of` in the range `DCTERMS.isPartOf`. More about this in the other example Usage_example_FDP.ipynb. We used the reusable code from this example.  Let's use `example.com` domain for now for this purpose:

In [94]:
from sempyro import LiteralField
from sempyro.foaf import Agent
from sempyro.hri_dcat import HRICatalog
from rdflib import URIRef, DCTERMS
from pydantic import AnyHttpUrl, Field

EX = "https://example.com"
fdp_base=input("Enter base link to FDP: ")

class FDPCatalog(HRICatalog):
    is_part_of: [AnyHttpUrl] = Field(description="Link to parent object",
                                   rdf_term=DCTERMS.isPartOf,
                                   rdf_type="uri"
                                  )

# Create a class instance with the same data
fdp_catalog = FDPCatalog(
    title=[LiteralField(value="Inflammatory Bowel Disease catalogue", language="en")],
    description=[LiteralField(value="This catalogue describes the core metadata of AUMC Inflammatory Bowel Disease datasets", language="en")],
    publisher=[Agent(
                                 name=[
                                     LiteralField(value="Academic Medical Center")],
                                 identifier="https://ror.org/05wg1m734")],
    is_part_of=[fdp_base],
    dataset=[]
                        )

fdp_catalog_record = fdp_catalog.to_graph(URIRef(f"{EX}/test_catalog_1"))
print(fdp_catalog_record.serialize())




@prefix dcat: <http://www.w3.org/ns/dcat#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .

<https://example.com/test_catalog_1> a dcat:Catalog ;
    dcterms:description "This catalogue describes the core metadata of AUMC Inflammatory Bowel Disease datasets"@en ;
    dcterms:isPartOf <https://fdp-test.healthdata.nl> ;
    dcterms:publisher [ a foaf:Agent ;
            dcterms:identifier "https://ror.org/05wg1m734" ;
            foaf:name "Academic Medical Center" ] ;
    dcterms:title "Inflammatory Bowel Disease catalogue"@en .




  warn(


Now we need to define an FDP API client which can log into an FDP, get a token and perform basic calls.


In [95]:
import logging
import requests
from typing import Dict, Union
from requests import Response
from urllib.parse import urljoin
import urllib3
import sys

logger = logging.getLogger(__name__)

class FDPClient:

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = self.login_fdp()
        self.headers = self.get_headers()
        self.session = requests.session()
        self.session.headers.update(self.headers)
        self.ssl_verification = False
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        

    def login_fdp(self) -> str:
        token_response = requests.post(
            f"{self.base_url}/tokens",
            json={"email": self.username, "password": self.password},
        )
        token_response.raise_for_status()
        response = token_response.json()
        return response["token"]

    def get_headers(self):
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "text/turtle"}


    def _call_method(self, method, path, params: Dict = None, data=None):
        if method.upper() not in ["GET", "POST", "PUT", "DELETE"]:
            raise ValueError(f"Unsupported method {method}")
        url = urljoin(self.base_url, path)
        response = None
        try:
            response = self.session.request(
                method, url, params=params, data=data, verify=self.ssl_verification
            )
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(e)
            if response is not None:
                logger.error(response.text)
            sys.exit(1)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
            logger.error(e)
            sys.exit(1)

    def get(self, path: str, params: Dict = None) -> Response:
        return self._call_method("GET", path, params=params)

    def post(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("POST", path, params=params, data=data)

    def update(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("PUT", path, params=params, data=data)

    def delete(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("DELETE", path, params=params, data=data)

Publishing a record in FDP consists of two steps: creating a record and publishing. These two actions are performed as API calls with different content types, so we need to implement methods for changing content type, creating a record and publishing the record. After that, the client looks like this:

In [96]:
from rdflib import Graph


class FDPEndPoints:
    meta = "meta"
    state = f"{meta}/state"
    members = "members"
    expanded = "expanded"


class FDPClient:

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = self.login_fdp()
        self.headers = self.get_headers()
        self.session = requests.session()
        self.session.headers.update(self.headers)
        self.ssl_verification = False
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        

    def login_fdp(self) -> str:
        token_response = requests.post(
            f"{self.base_url}/tokens",
            json={"email": self.username, "password": self.password},
        )
        token_response.raise_for_status()
        response = token_response.json()
        return response["token"]

    def get_headers(self):
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "text/turtle"}


    def _call_method(self, method, path, params: Dict = None, data=None):
        if method.upper() not in ["GET", "POST", "PUT", "DELETE"]:
            raise ValueError(f"Unsupported method {method}")
        url = urljoin(self.base_url, path)
        response = None
        try:
            response = self.session.request(
                method, url, params=params, data=data, verify=self.ssl_verification
            )
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(e)
            if response is not None:
                logger.error(response.text)
            sys.exit(1)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
            logger.error(e)
            sys.exit(1)

    def get(self, path: str, params: Dict = None) -> Response:
        return self._call_method("GET", path, params=params)

    def post(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("POST", path, params=params, data=data)

    def update(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("PUT", path, params=params, data=data)

    def delete(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("DELETE", path, params=params, data=data)

    def _update_session_headers(self):
        self.session.headers.update(self.headers)

    def _change_content_type(self, content_type):
        self.headers["Content-Type"] = content_type
        self._update_session_headers()

    def post_serialised(self, resource_type: str, metadata: Graph) -> Union[requests.Response, None]:
        self._change_content_type("text/turtle")
        path = f"{self.base_url}/{resource_type}"
        response = self.post(path=path, data=metadata.serialize())
        return response

    def publish_record(self, record_url):
        self._change_content_type("application/json")
        path = f"{record_url}/{FDPEndPoints.state}"
        data = '{"current": "PUBLISHED"}'
        self.update(path=path, data=data)

    def create_and_publish(self, resource_type: str, metadata: Graph) -> URIRef:
        post_response = self.post_serialised(resource_type=resource_type, metadata=metadata)
        fdp_subject = URIRef(post_response.headers["Location"])
        self.publish_record(fdp_subject)
        return fdp_subject

In [97]:
username="albert.einstein@example.com"

In [98]:
from getpass import getpass
password = "teu@vky8jdz2jtf5PRY"

Now we can create a client instance and publish the catalogue record to FDP:

In [99]:
fdp_client = FDPClient(base_url=fdp_base, username=username, password=password)

catalog_fdp_id = fdp_client.create_and_publish(resource_type="catalog", metadata=fdp_catalog_record)
print(catalog_fdp_id)

https://fdp-test.healthdata.nl/catalog/0c1e5d32-823b-4b3b-b9e0-f994b0c7ef08


Now we need to add datasets to the catalogue. Data for example datasets will be used from [sample](https://github.com/Health-RI/health-ri-metadata) 

In [100]:
from sempyro.hri_dcat import HRIDataset
from sempyro.vcard import VCard
import dateutil.parser as parser

hri_dataset = HRIDataset(
    contact_point=[
        VCard(hasEmail=[URIRef("mailto:data-access-committee@xumc.nl")],
              full_name=["Data Access Committee of the x UMC"], hasUID="https://ror.org/05wg1m734")
    ],
    creator=[Agent(name=["Academic Medical Center"], identifier="https://ror.org/05wg1m734")],
    description=[LiteralField(value=
                              "The primary aim of the PRISMA study was to investigate the potential value of risk-tailored versus "
                              "traditional breast cancer screening protocols in the Netherlands. Data collection took place between "
                              "2014-2019, resulting in ∼67,000 mammograms, ∼38,000 surveys, ∼10,000 blood samples and ∼600 saliva "
                              "samples.")],
    issued=parser.isoparse("2024-07-01T11:11:11"),
    identifier=f"{EX}/dataset/ZLOYOJ",
    modified=parser.isoparse("2024-06-04T13:36:10.246Z"),
    publisher=[
        Agent(name=["Radboud University Medical Center"], identifier="https://ror.org/05wg1m734")],
    theme=[URIRef("http://publications.europa.eu/resource/authority/data-theme/HEAL")],
    title=[LiteralField(value="Questionnaire data of the Personalised RISk-based MAmmascreening Study (PRISMA)")],
    license=URIRef("https://creativecommons.org/licenses/by-sa/4.0/"),
    distribution=[]
)

#fdp_dataset_record = hri_dataset.to_graph(subject=URIRef(hri_dataset.identifier))

Make sure the dataset is correctly serialized link it to the catalogue and publish it:


In [92]:
fdp_dataset_record = hri_dataset.to_graph(subject=URIRef(hri_dataset.identifier))
fdp_dataset_record.add((URIRef(hri_dataset.identifier), DCTERMS.isPartOf, URIRef(catalog_fdp_id)))
dataset_fdp_id = fdp_client.create_and_publish(resource_type="dataset", metadata=fdp_dataset_record)

400 Client Error: Bad Request for url: https://fdp-test.healthdata.nl/dataset
Unable to read RDF (parse exception)


@prefix dcat: <http://www.w3.org/ns/dcat#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix v: <http://www.w3.org/2006/vcard/ns#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

<https://example.com/dataset/ZLOYOJ> a dcat:Dataset ;
    dcterms:creator [ a foaf:Agent ;
            dcterms:identifier "https://ror.org/05wg1m734" ;
            foaf:name "Academic Medical Center" ] ;
    dcterms:description "The primary aim of the PRISMA study was to investigate the potential value of risk-tailored versus traditional breast cancer screening protocols in the Netherlands. Data collection took place between 2014-2019, resulting in ∼67,000 mammograms, ∼38,000 surveys, ∼10,000 blood samples and ∼600 saliva samples." ;
    dcterms:identifier "https://example.com/dataset/ZLOYOJ" ;
    dcterms:isPartOf <https://fdp-test.healthdata.nl/catalog/1a4855b6-d417-41de-ae7a-4fc291bc2f13> ;
    dcterms:issued "2024-07-01T11:11:11"^^xsd:dateTime ;
   

AttributeError: 'tuple' object has no attribute 'tb_frame'

Now we can check the catalogue and the dataset in FDP. The next step is adding a distribution. Let's create a distribution for the dataset:

In [None]:
from sempyro.hri_dcat import HRIDistribution

hri_distribution = HRIDistribution(
    title=[LiteralField(value="CSV-distribution of the questionnaire data of the Personalised RISk-based MAmmascreening Study (PRISMA)")],
    description=[LiteralField(value="CSV file containing the questionnaire data of the PRISMA study")],
    access_url=[URIRef("https://example.com/dataset/PRISMA/questionnaire.csv")],
    media_type=URIRef("https://www.iana.org/assignments/media-types/text/csv")
)

The identifier of the distribution should be unique in the context of the dataset. Access URL is mandatory. Let's add the distribution to the dataset and publish it:

In [None]:
access_url_str = str(hri_distribution.access_url[0])
distribution_uri = URIRef(f"{EX}/distribution/{access_url_str.split('/')[-1]}")
fdp_distribution_record = hri_distribution.to_graph(subject=distribution_uri)
fdp_distribution_record.add((distribution_uri, DCTERMS.isPartOf, URIRef(dataset_fdp_id)))
distribution_fdp_id = fdp_client.create_and_publish(resource_type="distribution", metadata=fdp_distribution_record)


Now we can check the catalogue, dataset and distribution in FDP.