# ETL Using Minimal Amount of Data

This notebook shows how one can transform data when the data is defined using a dictionary of ontology terms.

# Installing Dependencies


In [1]:
%pip install duckdb

Note: you may need to restart the kernel to use updated packages.


# ETL: Loading data to Beacon-compliant JSON format


## Content of data

1. Dataset

   This contains the information related to the actual data collection event.

   You can explore this data here - [dataset.tsv](https://github.com/GSI-Xapiens-CSIRO/GASPI-ETL-notebooks/blob/main/ETL-for-beacon-data-ingestion/ingesting-with-minimal-data/dataset.tsv)

2. Information

   This contains individual information.

   You can explore this data here - [info.tsv](https://github.com/GSI-Xapiens-CSIRO/GASPI-ETL-notebooks/blob/main/ETL-for-beacon-data-ingestion/ingesting-with-minimal-data/info.tsv)

3. Data dictionary

   The data dictionary maps each label used in the data (for example, a disease name) to its ontology code. This ensures Beacon can perform ontology-based queries.

   This data is available here - [data_dictionary.tsv](https://github.com/GSI-Xapiens-CSIRO/GASPI-ETL-notebooks/blob/main/ETL-for-beacon-data-ingestion/ingesting-with-minimal-data/data_dictionary.tsv)


# ETL Logic

## Import necessary libraries

Note that DuckDB is the only third-party library we use. Read about it at [https://duckdb.org](https://duckdb.org).


In [2]:
import duckdb
from functools import lru_cache
from itertools import chain
import json

## Load metadata from CSV files to duckdb for querying

We load the data into three tables.

1. datasets - this table holds the records from `dataset.tsv`, one row per collected sample.
2. info - this table holds the individual-level information from `info.tsv`.
3. dict - the data dictionary, which maps the labels used in data gathering to ontology terms.

Note that we use a URL to fetch the data. This could be a presigned URL from S3, a local file you upload to the notebook instance, or a public URL.


In [3]:
# Load the TSV files into DuckDB tables
url = "https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/GASPI-ETL-notebooks/refs/heads/main/ETL-for-beacon-data-ingestion/ingesting-with-minimal-data"
con = duckdb.connect(database="./minimal-metadata.db")
con.execute(f"CREATE TABLE IF NOT EXISTS datasets AS SELECT * FROM read_csv('{url}/dataset.tsv', ALL_VARCHAR=TRUE, SEP='\\t')")
con.execute(f"CREATE TABLE IF NOT EXISTS info AS SELECT * FROM read_csv('{url}/info.tsv', ALL_VARCHAR=TRUE, SEP='\\t')")
con.execute(f"CREATE TABLE IF NOT EXISTS dict AS SELECT * FROM read_csv('{url}/data_dictionary.tsv', ALL_VARCHAR=TRUE, SEP='\\t')")
con.execute("SHOW TABLES").df()


Unnamed: 0,name
0,datasets
1,dict
2,info
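
Switching between a public URL, a presigned S3 URL, or a local file only changes the location passed to `read_csv`. A minimal sketch of generating the three statements from a mapping of table name to file location; `build_load_statements` and the local paths are hypothetical and not part of the notebook:

```python
def build_load_statements(sources):
    """Return one CREATE TABLE statement per (table, location) pair.

    `sources` maps a table name to a full file location: a public URL,
    a presigned URL, or a local path. Hypothetical helper for illustration.
    """
    statements = []
    for table, location in sources.items():
        # Double any single quote so the location is a valid SQL string literal
        safe_location = location.replace("'", "''")
        statements.append(
            f"CREATE TABLE IF NOT EXISTS {table} AS "
            f"SELECT * FROM read_csv('{safe_location}', ALL_VARCHAR=TRUE, SEP='\\t')"
        )
    return statements


# Example: the same three tables, loaded from local copies of the TSV files
local_sources = {
    "datasets": "./dataset.tsv",
    "info": "./info.tsv",
    "dict": "./data_dictionary.tsv",
}
load_statements = build_load_statements(local_sources)
for statement in load_statements:
    print(statement)
```

Each generated statement can then be passed to `con.execute(...)` in the same way as the cell above.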


Below is a helper function that fetches the ontology code for the English label of a condition recorded in the metadata.

It returns a dictionary of the form `{ "id": str, "label": str, "ontology": str }` from the `dict` table.

If your data dictionary has different columns, amend the function as required.


In [4]:
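@lru_cache(maxsize=1000)
def fetch_id(label):
    # an empty label maps to an empty term
    if not label:
        return {"id": "", "label": "", "ontology": ""}
    # bind the label as a query parameter, so no manual quote escaping is needed
    result = con.execute("SELECT * FROM dict WHERE LOWER(label) = LOWER(?)", [label]).df()
    if result.empty:
        raise KeyError(f"No entry in the data dictionary for label {label!r}")
    return result.iloc[0].to_dict()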
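
If your data dictionary uses different column names, one option is to rename each row into the `{id, label, ontology}` shape before it is used. A minimal sketch using only the standard library, assuming a hypothetical dictionary whose columns are named `code`, `term` and `source`; these names and the `SNOMED` source value are illustrative and are not taken from `data_dictionary.tsv`:

```python
import csv
import io

# Hypothetical data dictionary with non-standard column names
raw_tsv = (
    "code\tterm\tsource\n"
    "SNOMED:248153007\tMALE\tSNOMED\n"
    "SNOMED:248152002\tFEMALE\tSNOMED\n"
)

# Map the hypothetical column names onto the keys that fetch_id returns
COLUMN_MAP = {"code": "id", "term": "label", "source": "ontology"}


def normalise_row(row):
    """Rename the columns of one dictionary row to id/label/ontology."""
    return {COLUMN_MAP[column]: value for column, value in row.items()}


reader = csv.DictReader(io.StringIO(raw_tsv), delimiter="\t")
# Index by lower-cased label to mirror the case-insensitive match in fetch_id
terms_by_label = {row["term"].lower(): normalise_row(row) for row in reader}

print(terms_by_label["male"])
```

The same renaming could instead be done in SQL with column aliases when the `dict` table is created.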


We are now ready to construct our dataset entry.


In [5]:
# The values for dates can be changed as required
beacon_dataset = {
    "createDateTime": "2022-08-05T17:21:00+01:00",
    "dataUseConditions": {
      "duoDataUse": [
        {
          "id": "DUO:0000042",
          "label": "general research use",
          "version": "17-07-2016"
        }
      ]
    },
    "description": "Example description",
    "externalUrl": "Example URL",
    "info": {},
    "name": "Minimal Metadata",
    "updateDateTime": "2022-08-05T17:21:00+01:00",
    "version": "v1.0"
  }
print(json.dumps(beacon_dataset, indent=2))


{
  "createDateTime": "2022-08-05T17:21:00+01:00",
  "dataUseConditions": {
    "duoDataUse": [
      {
        "id": "DUO:0000042",
        "label": "general research use",
        "version": "17-07-2016"
      }
    ]
  },
  "description": "Example description",
  "externalUrl": "Example URL",
  "info": {},
  "name": "Minimal Metadata",
  "updateDateTime": "2022-08-05T17:21:00+01:00",
  "version": "v1.0"
}
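
The `createDateTime` and `updateDateTime` values are ISO 8601 timestamps with a UTC offset. A minimal sketch of generating one for the current moment with the standard library, should you prefer that to a hard-coded value (the variable names are illustrative):

```python
from datetime import datetime, timezone

# Current time in UTC, truncated to whole seconds to match the style used above
now = datetime.now(timezone.utc).replace(microsecond=0)
timestamp = now.isoformat()

print(timestamp)  # a string of the form YYYY-MM-DDTHH:MM:SS+00:00
```

The resulting string can be assigned to `createDateTime` or `updateDateTime` in `beacon_dataset`.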


In the following block we extract information related to the `individuals` entity type.

When a table has more fields than the `individuals` entity type needs, use SQL `SELECT <fields>` syntax to extract just the columns required. Here the `info` table is small, so we select every column.


In [6]:
individuals_df = con.execute("SELECT * FROM info").df()
individuals_df

Unnamed: 0,id_subject,age,sex
0,1,50,MALE
1,2,60,FEMALE
2,3,70,MALE
3,4,45,FEMALE


Now we are ready to create the `JSON` format entries for each of the individuals.


In [7]:
individuals = []

# replace missing values with empty strings before building the entries
for _, row in individuals_df.fillna("").iterrows():
    data = row.to_dict()
    individual = {
            "id": data["id_subject"],
            # conventional karyotype for the recorded sex: XY for MALE, XX otherwise
            "karyotypicSex": "XY" if data["sex"] == "MALE" else "XX",
            "sex": {
                "id": fetch_id(data["sex"])["id"],
                "label": data["sex"]
            }
        }
    individuals.append(individual)

print(json.dumps(individuals, indent=2))

[
  {
    "id": "1",
    "karyotypicSex": "XY",
    "sex": {
      "id": "SNOMED:248153007",
      "label": "MALE"
    }
  },
  {
    "id": "2",
    "karyotypicSex": "XX",
    "sex": {
      "id": "SNOMED:248152002",
      "label": "FEMALE"
    }
  },
  {
    "id": "3",
    "karyotypicSex": "XY",
    "sex": {
      "id": "SNOMED:248153007",
      "label": "MALE"
    }
  },
  {
    "id": "4",
    "karyotypicSex": "XX",
    "sex": {
      "id": "SNOMED:248152002",
      "label": "FEMALE"
    }
  }
]
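
Deriving `karyotypicSex` from the recorded sex is itself an assumption, since the karyotype is not measured in this dataset. A minimal sketch of an explicit lookup with a fallback for unexpected labels; `infer_karyotypic_sex` is a hypothetical helper, and the fallback value `UNKNOWN_KARYOTYPE` is assumed to be accepted by your `individual-schema.json`, which is worth checking:

```python
# Conventional karyotype for each recorded sex label (an assumption, not a measurement)
KARYOTYPE_BY_SEX = {"MALE": "XY", "FEMALE": "XX"}


def infer_karyotypic_sex(sex_label):
    """Return the conventional karyotype for a sex label, or UNKNOWN_KARYOTYPE."""
    return KARYOTYPE_BY_SEX.get(sex_label.strip().upper(), "UNKNOWN_KARYOTYPE")


for label in ["MALE", "female", ""]:
    print(repr(label), "->", infer_karyotypic_sex(label))
```

An explicit table avoids silently assigning a karyotype to empty or misspelled labels, which a two-way `if`/`else` would do.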


Creating biosample entries.


In [8]:
dataset_df = con.execute("SELECT * FROM datasets").df()
dataset_df

Unnamed: 0,id_subject,code_repository,code_box,code_position,date_received,date_enumerated,origin_biobank,origin_code_repository,origin_code_box,biosample_type,biosample_specimen,biosample_volume,biosample_status
0,1,CR123456,X1111111,X1111111-05,1/1/2020,1/1/2019,Example Biobank (TestBB),REPO000050,C50,healthy,mouth,100,active
1,2,CR123456,X1111111,X1111111-06,2/1/2020,2/1/2019,Example Biobank (TestBB),REPO000051,C51,healthy,throat,100,active
2,3,CR123456,X1111111,X1111111-07,3/1/2020,3/1/2019,Example Biobank (TestBB),REPO000052,C52,healthy,stomach,200,active
3,4,CR123456,X1111111,X1111111-08,4/1/2020,4/1/2019,Example Biobank (TestBB),REPO000053,C53,healthy,liver,300,active


In [9]:
biosamples = []

# replace missing values with empty strings before building the entries
for _, row in dataset_df.fillna("").iterrows():
    data = row.to_dict()
    # treat the date as day-first (D/M/YYYY) and rearrange it to YYYY-M-D
    date_parts = data["date_received"].split("/")
    collection_date = f"{date_parts[2]}-{date_parts[1]}-{date_parts[0]}"
    biosample = {
            "id": data["id_subject"],
            "individualId": data["id_subject"],
            "biosampleStatus": {
                "id": f'CUST:{data["biosample_status"]}',
                "label": data["biosample_status"]
            },
            "collectionDate": collection_date,
            "sampleOriginType": {
                "id": f'CUST:{data["origin_biobank"]}',
                "label": data["origin_biobank"]
            },
            "sampleOriginDetail": {
                "id": f'CUST:{data["origin_biobank"]}',
                "label": data["origin_biobank"]
            },
            "info": {
                "codeRepository": f'CUST:{data["code_repository"]}',
                "codeBox": f'CUST:{data["code_box"]}',
                "codePosition": f'CUST:{data["code_position"]}',
                "dateEnumerated": f'CUST:{data["date_enumerated"]}',
                "originCodeRespository": f'CUST:{data["origin_code_repository"]}',
                "originCodeBox": f'CUST:{data["origin_code_box"]}',
                "biosampleSpeciment": f'CUST:{data["biosample_specimen"]}',
                "biosampleVolume": f'CUST:{data["biosample_volume"]}',
            },
            "notes": ""
        }
    biosamples.append(biosample)

print(json.dumps(biosamples, indent=2))

[
  {
    "id": "1",
    "individualId": "1",
    "biosampleStatus": {
      "id": "CUST:active",
      "label": "active"
    },
    "collectionDate": "2020-1-1",
    "sampleOriginType": {
      "id": "CUST:Example Biobank (TestBB)",
      "label": "Example Biobank (TestBB)"
    },
    "sampleOriginDetail": {
      "id": "CUST:Example Biobank (TestBB)",
      "label": "Example Biobank (TestBB)"
    },
    "info": {
      "codeRepository": "CUST:CR123456",
      "codeBox": "CUST:X1111111",
      "codePosition": "CUST:X1111111-05",
      "dateEnumerated": "CUST:1/1/2019",
      "originCodeRespository": "CUST:REPO000050",
      "originCodeBox": "CUST:C50",
      "biosampleSpeciment": "CUST:mouth",
      "biosampleVolume": "CUST:100"
    },
    "notes": ""
  },
  {
    "id": "2",
    "individualId": "2",
    "biosampleStatus": {
      "id": "CUST:active",
      "label": "active"
    },
    "collectionDate": "2020-1-2",
    "sampleOriginType": {
      "id": "CUST:Example Biobank (TestBB)",


Creating runs entries.


In [10]:
runs = []

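# replace missing values with empty strings before building the entries
for _, row in dataset_df.fillna("").iterrows():
    data = row.to_dict()
    # treat the date as day-first (D/M/YYYY) and rearrange it to YYYY-M-D
    date_parts = data["date_enumerated"].split("/")
    run_date = f"{date_parts[2]}-{date_parts[1]}-{date_parts[0]}"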
    run = {
            "id": data["id_subject"],
            "biosampleId": data["id_subject"],
            "individualId": data["id_subject"],
            "runDate": run_date,
        }
    runs.append(run)

print(json.dumps(runs, indent=2))

[
  {
    "id": "1",
    "biosampleId": "1",
    "individualId": "1",
    "runDate": "2019-1-1"
  },
  {
    "id": "2",
    "biosampleId": "2",
    "individualId": "2",
    "runDate": "2019-1-2"
  },
  {
    "id": "3",
    "biosampleId": "3",
    "individualId": "3",
    "runDate": "2019-1-3"
  },
  {
    "id": "4",
    "biosampleId": "4",
    "individualId": "4",
    "runDate": "2019-1-4"
  }
]
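
The date strings above are rearranged without zero-padding, giving values such as `2019-1-2`. If your target requires strict ISO 8601 dates (`YYYY-MM-DD`), the conversion can be sketched with the standard library. This assumes the source dates are day-first (`D/M/YYYY`), which matches how the cells above split them; `to_iso_date` is a hypothetical helper:

```python
from datetime import datetime


def to_iso_date(value):
    """Convert a day-first D/M/YYYY string to YYYY-MM-DD, or "" if empty."""
    if not value:
        return ""
    # strptime accepts non-padded day and month numbers and rejects impossible dates
    return datetime.strptime(value, "%d/%m/%Y").date().isoformat()


print(to_iso_date("2/1/2019"))  # prints 2019-01-02
```

Parsing with `strptime` also raises a `ValueError` on malformed input instead of producing a silently wrong date.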


Creating analyses entries.


In [11]:
analyses = []

# replace missing values with empty strings before building the entries
for _, row in dataset_df.fillna("").iterrows():
    data = row.to_dict()
    # treat the date as day-first (D/M/YYYY) and rearrange it to YYYY-M-D
    date_parts = data["date_enumerated"].split("/")
    analysis_date = f"{date_parts[2]}-{date_parts[1]}-{date_parts[0]}"
    analysis = {
            "id": data["id_subject"],
            "individualId": data["id_subject"],
            "biosampleId": data["id_subject"],
            "runId": data["id_subject"],
            "analysisDate": analysis_date,
            "pipelineName": "CUSTOM",
            "vcfSampleId": data["id_subject"],
        }
    analyses.append(analysis)

print(json.dumps(analyses, indent=2))

[
  {
    "id": "1",
    "individualId": "1",
    "biosampleId": "1",
    "runId": "1",
    "analysisDate": "2019-1-1",
    "pipelineName": "CUSTOM",
    "vcfSampleId": "1"
  },
  {
    "id": "2",
    "individualId": "2",
    "biosampleId": "2",
    "runId": "2",
    "analysisDate": "2019-1-2",
    "pipelineName": "CUSTOM",
    "vcfSampleId": "2"
  },
  {
    "id": "3",
    "individualId": "3",
    "biosampleId": "3",
    "runId": "3",
    "analysisDate": "2019-1-3",
    "pipelineName": "CUSTOM",
    "vcfSampleId": "3"
  },
  {
    "id": "4",
    "individualId": "4",
    "biosampleId": "4",
    "runId": "4",
    "analysisDate": "2019-1-4",
    "pipelineName": "CUSTOM",
    "vcfSampleId": "4"
  }
]
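
Each entity refers to others by ID (`individualId`, `biosampleId`, `runId`), so a quick consistency check before assembling the submission can catch dangling references. A minimal sketch on small stand-in lists; the sample records and `find_dangling_references` are illustrative and not part of the notebook:

```python
def find_dangling_references(records, field, valid_ids):
    """Return (record id, missing reference) pairs for one reference field."""
    return [
        (record["id"], record[field])
        for record in records
        if record[field] not in valid_ids
    ]


# Stand-in data shaped like the lists built above
sample_individuals = [{"id": "1"}, {"id": "2"}]
sample_biosamples = [
    {"id": "1", "individualId": "1"},
    {"id": "2", "individualId": "3"},  # deliberately points at a missing individual
]

individual_ids = {individual["id"] for individual in sample_individuals}
dangling = find_dangling_references(sample_biosamples, "individualId", individual_ids)
print(dangling)  # prints [('2', '3')]
```

The same function can be applied to `runs` and `analyses` with `biosampleId` and `runId` as the reference field.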


## Final Submission Entry


In [12]:
submission = {
    "dataset": beacon_dataset,
    "assemblyId": "GRCH38",
    "individuals": individuals,
    "biosamples": biosamples,
    "runs": runs,
    "analyses": analyses
}

print(json.dumps(submission, indent=2))
# write the submission to disk, closing the file handle when done
with open("submission-minimal-data.json", "w") as f:
    json.dump(submission, f, indent=2)


{
  "dataset": {
    "createDateTime": "2022-08-05T17:21:00+01:00",
    "dataUseConditions": {
      "duoDataUse": [
        {
          "id": "DUO:0000042",
          "label": "general research use",
          "version": "17-07-2016"
        }
      ]
    },
    "description": "Example description",
    "externalUrl": "Example URL",
    "info": {},
    "name": "Minimal Metadata",
    "updateDateTime": "2022-08-05T17:21:00+01:00",
    "version": "v1.0"
  },
  "assemblyId": "GRCH38",
  "individuals": [
    {
      "id": "1",
      "karyotypicSex": "XY",
      "sex": {
        "id": "SNOMED:248153007",
        "label": "MALE"
      }
    },
    {
      "id": "2",
      "karyotypicSex": "XX",
      "sex": {
        "id": "SNOMED:248152002",
        "label": "FEMALE"
      }
    },
    {
      "id": "3",
      "karyotypicSex": "XY",
      "sex": {
        "id": "SNOMED:248153007",
        "label": "MALE"
      }
    },
    {
      "id": "4",
      "karyotypicSex": "XX",
      "sex": {
    

## Validating the Submission

You can now validate the generated submission against the Beacon schemas using the following scripts.

First, we install the Python library `jsonschema`, which will help us validate the created payloads.

We then download the latest schemas to be used with the `jsonschema` library.


In [13]:
%pip install jsonschema

Note: you may need to restart the kernel to use updated packages.


In [14]:
%mkdir schemas
!wget https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/analysis-schema.json -O schemas/analysis-schema.json    
!wget https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/biosample-schema.json -O schemas/biosample-schema.json
!wget https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/dataset-schema.json -O schemas/dataset-schema.json
!wget https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/individual-schema.json -O schemas/individual-schema.json
!wget https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/run-schema.json -O schemas/run-schema.json
!wget https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/submit-dataset-schema-new.json -O schemas/submit-dataset-schema-new.json

mkdir: schemas: File exists
--2025-02-03 09:46:51--  https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/analysis-schema.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8002::154, 2606:50c0:8003::154, 2606:50c0:8000::154, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8002::154|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3144 (3.1K) [text/plain]
Saving to: ‘schemas/analysis-schema.json’


2025-02-03 09:46:51 (30.0 MB/s) - ‘schemas/analysis-schema.json’ saved [3144/3144]

--2025-02-03 09:46:52--  https://raw.githubusercontent.com/GSI-Xapiens-CSIRO/sBeacon-BGSi/refs/heads/main/shared_resources/schemas/biosample-schema.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8002::154, 2606:50c0:8003::154, 2606:50c0:8000::154, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8002::

In [15]:
from jsonschema import Draft202012Validator, RefResolver
import os


In [16]:

def validate_request(parameters):
    # work on a shallow copy so the dummy values below do not leak into `submission`
    parameters = dict(parameters)
    # inject dummy values for missing fields - these will be added by the Dataportal during submission
    parameters["datasetId"] = "Dataset Id"
    parameters["projectName"] = "Project Name"
    parameters["vcfLocations"] = ["vcfLocation1", "vcfLocation2"]
    # load the top-level schema; the schemas it references are resolved from the same directory
    schema_path = "./schemas/submit-dataset-schema-new.json"
    schema_dir = os.path.dirname(os.path.abspath(schema_path))
    with open(schema_path) as f:
        new_schema = json.load(f)
    resolver = RefResolver(base_uri="file://" + schema_dir + "/", referrer=new_schema)
    validator = Draft202012Validator(new_schema, resolver=resolver)
    errors = []

    for error in sorted(validator.iter_errors(parameters), key=lambda e: e.path):
        # report each error as "<message> /path/to/field"
        location = "".join(f"/{part}" for part in error.path)
        errors.append(f"{error.message} {location}")
    return errors

validate_request(submission)

[]

If the output above is `[]` (an empty list), the submission has no validation errors.

Otherwise, each entry gives an error message followed by the JSON path of the offending field, which you can use to locate and fix the problem.
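
When many errors are returned, grouping them by top-level entity makes them easier to work through. A minimal sketch, assuming error strings in the `"<message> /<path>"` form produced by `validate_request`; `group_errors_by_entity` is a hypothetical helper and the sample messages below are made up for illustration:

```python
from collections import defaultdict


def group_errors_by_entity(errors):
    """Group error strings by the first segment of their trailing JSON path."""
    grouped = defaultdict(list)
    for error in errors:
        # The path follows the final space; it is empty for root-level errors
        _, _, path = error.rpartition(" ")
        entity = path.split("/")[1] if path.startswith("/") else "(root)"
        grouped[entity].append(error)
    return dict(grouped)


# Made-up examples in the same format as validate_request's output
sample_errors = [
    "'id' is a required property /individuals/0",
    "'runDate' is a required property /runs/2",
    "'assemblyId' is a required property ",
]
grouped_errors = group_errors_by_entity(sample_errors)
for entity, messages in grouped_errors.items():
    print(entity, len(messages))
```

This relies on path segments containing no spaces, which holds for the entity names and list indices used in this notebook.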
