In [None]:
%pip install -q rdflib gdown==4.6.3

In [None]:
from pathlib import Path
from rdflib import Graph, Literal, Namespace, BNode
from rdflib.namespace import RDF, RDFS, OWL, XSD
import gdown

In [None]:
# Local folder holding the downloaded data and the knowledge base file inside it.
data_dir = Path('./data/')
kb_path = data_dir / Path('knowledge_base.ttl')


def download_data() -> None:
  """Download the knowledge base with gdown and store it at `kb_path`.

  The data folder is created first when it is missing.
  """
  # parents/exist_ok: do not fail when the folder (or only part of its
  # path) already exists, e.g. after an interrupted download.
  data_dir.mkdir(parents=True, exist_ok=True)

  gdown.download(
    id='1JJcOU_XKrwzPCx-BxKlKJm1uN9nIlgt0',
    output=str(kb_path),
    quiet=True
  )


# Check for the file itself rather than its folder: the folder may exist
# while the knowledge base is still missing.
if not kb_path.exists():
  download_data()

with open(kb_path, 'rb') as file:
  graph = Graph()
  # The format is stated explicitly instead of being guessed from the name.
  graph.parse(file, format='turtle')  # This takes ~ 9 minutes

# Bind the `ct:` prefix used by the queries built in the cells below.
ct = Namespace('http://example.org/ontologies/clinical_trials/')
graph.bind('ct', ct)

In [None]:
def get_clinical_trial_ids(triples):
  """Collect the clinical trial identifiers from query result rows.

  Every row is indexed with the key 'clinicalTrial' and only the last 11
  characters of that value are kept (presumably the 'NCTxxxxxxxx' suffix
  of the trial IRI - confirm against the knowledge base).

  Returns a list with one identifier per row, in iteration order.
  """
  trial_ids = []
  for row in triples:
    trial_reference = row['clinicalTrial']
    trial_ids.append(trial_reference[-11:])
  return trial_ids

## Gender + Age stage

Given a gender and age, retrieve the related clinical trials.

In [None]:
def get_gender_age_clinical_trials(graph, gender, age_value, age_type):
  """Retrieve the ids of the clinical trials matching a gender and an age.

  Args:
    graph: object exposing `query(sparql_string)`. The query relies on the
      `ct:` prefix, so the graph must be able to resolve it.
    gender: "male", "female", "all" or None (no gender constraint).
      "all" requires the trial to be typed as both male and female.
    age_value: non-negative int, or None when no age is given.
    age_type: unit of `age_value`: "years", "months", "weeks", "days",
      "hours", "minutes", or None (no age constraint).

  Returns:
    The list produced by `get_clinical_trial_ids` for the query result.

  Raises:
    RuntimeError: if any argument is invalid.
  """
  # == Validation ==
  # The age checks only apply when an age is actually given; None is a
  # legal value as long as no age_type is requested.
  if age_value is not None:
    if type(age_value) is not int:
      raise RuntimeError('age_value must be an integer or None')
    if age_value < 0:
      raise RuntimeError('age_value must be positive or equal to 0')
  elif age_type is not None:
    raise RuntimeError('age_value must not be None if age_type is not None')

  # Trial classes required for each accepted gender value.
  gender_classes = {
    "male": ("MaleClinicalTrial",),
    "female": ("FemaleClinicalTrial",),
    "all": ("MaleClinicalTrial", "FemaleClinicalTrial"),
  }
  # Singular unit name used to build the class and property names.
  age_units = {
    "years": "Year",
    "months": "Month",
    "weeks": "Week",
    "days": "Day",
    "hours": "Hour",
    "minutes": "Minute",
  }

  query = """
    SELECT DISTINCT ?clinicalTrial
    WHERE {
  """

  # == Gender ==
  if gender is not None:
    if not isinstance(gender, str) or gender not in gender_classes:
      raise RuntimeError('gender must be either "male", "female", "all" or None')
    for trial_class in gender_classes[gender]:
      query += f" ?clinicalTrial a ct:{trial_class} . "

  # == Age ==
  if age_type is not None:
    if not isinstance(age_type, str) or age_type not in age_units:
      raise RuntimeError('age_type must be either "years", "months", "weeks", "days", "hours", "minutes" or None')
    unit = age_units[age_type]
    query += f" ?clinicalTrial a ct:{unit}ClinicalTrial . "
    query += f" ?clinicalTrial ct:eligibilityMaximum{unit}s ?maximumAge . "
    query += f" ?clinicalTrial ct:eligibilityMinimum{unit}s ?minimumAge . "
    query += f" FILTER(?minimumAge <= {age_value} && ?maximumAge >= {age_value}) . "

  query += "}"
  rows = graph.query(query)
  return get_clinical_trial_ids(rows)


## Criteria stage

Given a list of criteria, retrieve the related clinical trials.

In [None]:
def get_inclusion_criteria_clinical_trials(graph, criteria):
  """Retrieve the ids of the clinical trials including any given criterion.

  Args:
    graph: object exposing `query(sparql_string)`; the query uses the
      `ct:` prefix.
    criteria: list of criterion names; each one is inserted in the query
      as `ct:<name>`.

  Returns:
    The list produced by `get_clinical_trial_ids` for the query result.

  Raises:
    RuntimeError: if `criteria` is not exactly a `list`.
  """
  if type(criteria) is not list:
    raise RuntimeError('criteria must be a list')

  select_clause = """
    SELECT DISTINCT ?clinicalTrial
    WHERE {
  """

  # One alternative per criterion; alternatives are combined with UNION so
  # that a trial matching any of them is selected.
  alternatives = " } UNION { ".join(
    f" ?clinicalTrial ct:eligibilityInclude ct:{criterion} . "
    for criterion in criteria
  )

  query_parts = [
    select_clause,
    " ?clinicalTrial a ct:ClinicalTrial . ",
    " { ",
    alternatives,
    " } ",
    "}",
  ]
  rows = graph.query("".join(query_parts))
  return get_clinical_trial_ids(rows)


In [None]:
def get_exclusion_criteria_clinical_trials(graph, criteria):
  """Retrieve the ids of the clinical trials excluding any given criterion.

  Args:
    graph: object exposing `query(sparql_string)`; the query uses the
      `ct:` prefix.
    criteria: list of criterion names; each one is inserted in the query
      as `ct:<name>`.

  Returns:
    The list produced by `get_clinical_trial_ids` for the query result.

  Raises:
    RuntimeError: if `criteria` is not exactly a `list`.
  """
  if type(criteria) is not list:
    raise RuntimeError('criteria must be a list')

  select_clause = """
    SELECT DISTINCT ?clinicalTrial
    WHERE {
  """

  # One alternative per criterion; alternatives are combined with UNION so
  # that a trial listing any of them as an exclusion is selected.
  alternatives = " } UNION { ".join(
    f" ?clinicalTrial ct:eligibilityExclude ct:{criterion} . "
    for criterion in criteria
  )

  query_parts = [
    select_clause,
    " ?clinicalTrial a ct:ClinicalTrial . ",
    " { ",
    alternatives,
    " } ",
    "}",
  ]
  rows = graph.query("".join(query_parts))
  return get_clinical_trial_ids(rows)

In [None]:
def get_criteria_clinical_trials(graph, criteria):
  """Retrieve the ids of the trials compatible with the given criteria.

  A trial is selected when it includes at least one of the criteria and
  none of the criteria appears among its exclusions.

  Args:
    graph: object exposing `query(sparql_string)`; the query uses the
      `ct:` prefix.
    criteria: list of criterion names; each one is inserted in the query
      as `ct:<name>`.

  Returns:
    The list produced by `get_clinical_trial_ids` for the query result.

  Raises:
    RuntimeError: if `criteria` is not exactly a `list`.

  NOTE(review): with an empty list the query contains an empty group and
  `FILTER NOT EXISTS { { } }`; presumably that filters out every trial -
  confirm whether an empty list is meant to be supported.
  """
  if type(criteria) is not list:
    raise RuntimeError('criteria must be a list')

  select_clause = """
    SELECT DISTINCT ?clinicalTrial
    WHERE {
  """
  union_separator = " } UNION { "

  # == Inclusion criteria ==
  included = union_separator.join(
    f" ?clinicalTrial ct:eligibilityInclude ct:{criterion} . "
    for criterion in criteria
  )

  # == Exclusion criteria ==
  excluded = union_separator.join(
    f" ?clinicalTrial ct:eligibilityExclude ct:{criterion} . "
    for criterion in criteria
  )

  query_parts = [
    select_clause,
    " ?clinicalTrial a ct:ClinicalTrial . ",
    " { ", included, " } ",
    " FILTER NOT EXISTS { { ", excluded, " } } ",
    "}",
  ]
  rows = graph.query("".join(query_parts))
  return get_clinical_trial_ids(rows)

## Final stage

Given the patient status, retrieve all related clinical trials.


In [None]:
def get_clinical_trials(graph, gender, age_value, age_type, criteria):
  """Retrieve the ids of the clinical trials matching a patient status.

  Combines the gender, age and criteria constraints in a single query: a
  trial is selected when it satisfies the gender and age constraints,
  includes at least one of the criteria and excludes none of them.

  Args:
    graph: object exposing `query(sparql_string)`. The query relies on the
      `ct:` prefix, so the graph must be able to resolve it.
    gender: "male", "female", "all" or None (no gender constraint).
      "all" requires the trial to be typed as both male and female.
    age_value: non-negative int, or None when no age is given.
    age_type: unit of `age_value`: "years", "months", "weeks", "days",
      "hours", "minutes", or None (no age constraint).
    criteria: list of criterion names; each one is inserted in the query
      as `ct:<name>`.

  Returns:
    The list produced by `get_clinical_trial_ids` for the query result.

  Raises:
    RuntimeError: if any argument is invalid.

  NOTE(review): with an empty `criteria` list the query contains an empty
  group and `FILTER NOT EXISTS { { } }`; presumably that filters out every
  trial - confirm whether an empty list is meant to be supported.
  """
  # == Validation ==
  # The age checks only apply when an age is actually given; None is a
  # legal value as long as no age_type is requested.
  if age_value is not None:
    if type(age_value) is not int:
      raise RuntimeError('age_value must be an integer or None')
    if age_value < 0:
      raise RuntimeError('age_value must be positive or equal to 0')
  elif age_type is not None:
    raise RuntimeError('age_value must not be None if age_type is not None')
  if type(criteria) is not list:
    raise RuntimeError('criteria must be a list')

  # Trial classes required for each accepted gender value.
  gender_classes = {
    "male": ("MaleClinicalTrial",),
    "female": ("FemaleClinicalTrial",),
    "all": ("MaleClinicalTrial", "FemaleClinicalTrial"),
  }
  # Singular unit name used to build the class and property names.
  age_units = {
    "years": "Year",
    "months": "Month",
    "weeks": "Week",
    "days": "Day",
    "hours": "Hour",
    "minutes": "Minute",
  }
  union_separator = " } UNION { "

  query = """
    SELECT DISTINCT ?clinicalTrial
    WHERE {
  """

  # == Gender ==
  if gender is not None:
    if not isinstance(gender, str) or gender not in gender_classes:
      raise RuntimeError('gender must be either "male", "female", "all" or None')
    for trial_class in gender_classes[gender]:
      query += f" ?clinicalTrial a ct:{trial_class} . "

  # == Age ==
  if age_type is not None:
    if not isinstance(age_type, str) or age_type not in age_units:
      raise RuntimeError('age_type must be either "years", "months", "weeks", "days", "hours", "minutes" or None')
    unit = age_units[age_type]
    query += f" ?clinicalTrial a ct:{unit}ClinicalTrial . "
    query += f" ?clinicalTrial ct:eligibilityMaximum{unit}s ?maximumAge . "
    query += f" ?clinicalTrial ct:eligibilityMinimum{unit}s ?minimumAge . "
    query += f" FILTER(?minimumAge <= {age_value} && ?maximumAge >= {age_value}) . "

  # == Inclusion criteria ==
  # Alternatives are combined with UNION: matching any criterion is enough.
  query += " { "
  query += union_separator.join(
    f" ?clinicalTrial ct:eligibilityInclude ct:{criterion} . "
    for criterion in criteria
  )
  query += " } "

  # == Exclusion criteria ==
  # A trial is dropped as soon as it excludes any of the criteria.
  query += " FILTER NOT EXISTS { { "
  query += union_separator.join(
    f" ?clinicalTrial ct:eligibilityExclude ct:{criterion} . "
    for criterion in criteria
  )
  query += " } } "

  query += "}"
  rows = graph.query(query)
  return get_clinical_trial_ids(rows)

## Final test

Test a query for a 28-year-old female patient who is pregnant and a regular smoker.

In [None]:
# Run the full query for a female patient aged 28 years with the criteria
# 'pregnant' and 'smoker'; the returned list of ids is the cell output.
get_clinical_trials(graph, "female", 28, "years", ['pregnant', 'smoker'])

['NCT00000828',
 'NCT00001275',
 'NCT00011622',
 'NCT00053651',
 'NCT00064597',
 'NCT00068185',
 'NCT00078143',
 'NCT00110630',
 'NCT00126971',
 'NCT00129506',
 'NCT00140114',
 'NCT00145561',
 'NCT00146783',
 'NCT00148473',
 'NCT00148577',
 'NCT00156000',
 'NCT00157521',
 'NCT00159536',
 'NCT00162812',
 'NCT00173758',
 'NCT00175318',
 'NCT00180219',
 'NCT00182325',
 'NCT00190320',
 'NCT00194142',
 'NCT00194324',
 'NCT00194974',
 'NCT00194987',
 'NCT00197587',
 'NCT00201370',
 'NCT00214331',
 'NCT00227903',
 'NCT00230022',
 'NCT00232713',
 'NCT00243815',
 'NCT00244738',
 'NCT00248209',
 'NCT00249457',
 'NCT00254800',
 'NCT00265421',
 'NCT00266825',
 'NCT00270192',
 'NCT00271219',
 'NCT00279370',
 'NCT00286364',
 'NCT00290173',
 'NCT00291044',
 'NCT00294892',
 'NCT00295659',
 'NCT00298116',
 'NCT00298480',
 'NCT00298519',
 'NCT00306007',
 'NCT00307320',
 'NCT00310349',
 'NCT00310882',
 'NCT00319176',
 'NCT00323401',
 'NCT00329290',
 'NCT00329511',
 'NCT00331695',
 'NCT00341640',
 'NCT003

# Save functionality

A series of functions to store and load results on disk.
This was done to overcome the RAM limitations of Google Colab.

In [None]:
def store_clinical_trials(trials: list, file_name: str) -> None:
  """Write the trials to './data/<file_name>', one trial per line.

  The file is opened in write mode, so existing content is replaced.
  """
  target_path = './data/' + file_name
  with open(target_path, 'w') as out_file:
    # Each trial is followed by a line break.
    out_file.writelines("%s\n" % trial for trial in trials)

In [None]:
def load_clinical_trials(file_name: str) -> list:
    """Read the trials stored in './data/<file_name>', one per line.

    Args:
        file_name: name of the file inside the './data/' folder.

    Returns:
        A list with the content of each line, without its line break.
    """
    with open('./data/' + file_name, 'r') as file:
        # Strip only the line break: slicing off the last character would
        # truncate a final line that does not end with a line break.
        return [line.rstrip('\n') for line in file]