"""Generate ``data_dictionary.json`` from the AACT column-definition endpoint.

The endpoint returns one JSON object per column, carrying (at least) the keys
``"table"``, ``"column"`` and ``"data type"``.  This script folds those rows
into ``{table_name: {column_name: data_type}}`` and writes the result to disk.
"""
from functools import reduce
import json

URL = "https://aact.ctti-clinicaltrials.org/definitions.json"
OUTPUT_PATH = "data_dictionary.json"


def merger(acc: dict, d: dict) -> dict:
    """Merge *d* into *acc* in place and return *acc* (a ``reduce`` step)."""
    acc |= d
    return acc


def create_data_dict(ds) -> dict:
    """Fold ``(table_name, mappings)`` pairs into one dict per table.

    Args:
        ds: Iterable of ``(table_name, iterable_of_dicts)`` pairs.  The same
            table name may appear any number of times, in any order.

    Returns:
        ``{table_name: merged_mapping}``; later values win on key clashes.
    """
    out = {}
    for table_name, grouper in ds:
        # Merge into the existing entry instead of assigning a fresh dict:
        # when a table shows up in several non-adjacent runs, a plain
        # assignment would silently discard everything seen earlier.
        reduce(merger, grouper, out.setdefault(table_name, {}))
    return out


def save_json(resp, path=OUTPUT_PATH):
    """Write the data dictionary derived from *resp* to *path* as JSON.

    Args:
        resp: Object exposing ``.json()`` that returns the list of column
            definition rows (e.g. an ``httpx.Response``).
        path: Destination file; defaults to ``data_dictionary.json``.
    """
    rows = resp.json()
    # One single-column mapping per row.  The table name is used only as the
    # grouping key, so it no longer leaks into the per-table column mapping,
    # and no pre-sorting is needed (unlike itertools.groupby).
    data_dict = create_data_dict(
        (row["table"], [{row["column"]: row["data type"]}]) for row in rows
    )
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data_dict, f, indent=4)


def fetch_definitions(url=URL):
    """Download the column definitions and return the HTTP response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    # Third-party import kept local so the pure helpers above can be imported
    # (and tested) without httpx installed and without creating a client at
    # import time.
    import httpx

    timeout = httpx.Timeout(10, read=60)
    with httpx.Client(timeout=timeout) as client:
        resp = client.get(url)
    # Fail loudly rather than writing an error page into the dictionary.
    resp.raise_for_status()
    return resp


if __name__ == "__main__":
    save_json(fetch_definitions())
"string", + "number_of_arms": "string", + "allocation": "string", + "masking": "string", + "phasec": "string", + "primary_purpose": null, + "sponsor_name": null, + "agency_classc": "string", + "collaborator_names": "string", + "funding": "string", + "responsible_party_type": "string", + "responsible_party_organization": "string", + "behavioral": "string", + "results": "string", + "resultsreceived_month": "string", + "resultsreceived_year": "string", + "firstreceived_results_dt": "string", + "t2result": "string", + "t2result_imp": "string", + "t2resmod": "string", + "results12": "string", + "delayed": "string", + "dr_received_dt": "string", + "mn2delay": "string", + "delayed12": "string", + "id": "integer", + "intervg1": "string", + "biological": "string", + "device": "string", + "dietsup": "string", + "drug": "string", + "genetic": "string", + "procedure": "string", + "radiation": "string", + "otherint": "string" + }, + "outcomes": { + "units_analyze": "string", + "table": "outcomes", + "dispersion_type": "string", + "param_type": "string" + }, + "browse_conditions": { + "mesh_term": "string", + "table": "browse_conditions" + }, + "pending_results": { + "event_date": "date", + "table": "pending_results" + }, + "ipd_information_types": { + "name": "string", + "table": "ipd_information_types" + }, + "design_group_interventions": { + "id": "integer", + "table": "design_group_interventions" + }, + "brief_summaries": { + "description": "text", + "table": "brief_summaries" + }, + "facility_contacts": { + "phone": "string", + "table": "facility_contacts" + }, + "interventions": { + "description": "text", + "table": "interventions" + }, + "browse_interventions": { + "id": "integer", + "table": "browse_interventions", + "mesh_term": "string" + }, + "overall_officials": { + "name": "string", + "table": "overall_officials", + "affiliation": "string" + }, + "reported_events": { + "assessment": "string", + "table": "reported_events" + }, + "outcome_analyses": { + "ci_n_sides": 
"string", + "table": "outcome_analyses" + }, + "calculated_values": { + "registered_in_calendar_year": "integer", + "table": "calculated_values" + }, + "central_contacts": { + "contact_type": "string", + "table": "central_contacts" + }, + "conditions": { + "name": "string", + "table": "conditions" + }, + "design_groups": { + "group_type": "string", + "table": "design_groups" + }, + "countries": { + "removed": "boolean", + "table": "countries", + "name": "string" + }, + "design_outcomes": { + "outcome_type": "string", + "table": "design_outcomes" + }, + "designs": { + "intervention_model": "string", + "table": "designs" + }, + "detailed_descriptions": { + "description": "text", + "table": "detailed_descriptions" + }, + "documents": { + "comment": "string", + "table": "documents", + "url": "string" + }, + "provided_documents": { + "has_icf": "boolean", + "table": "provided_documents" + }, + "eligibilities": { + "gender": "string", + "table": "eligibilities", + "gender_based": "boolean", + "healthy_volunteers": "string" + }, + "facilities": { + "status": "string", + "table": "facilities" + }, + "responsible_parties": { + "responsible_party_type": "string", + "table": "responsible_parties" + }, + "id_information": { + "id_type": "string", + "table": "id_information" + }, + "intervention_other_names": { + "name": "string", + "table": "intervention_other_names" + }, + "facility_investigators": { + "role": "string", + "table": "facility_investigators" + }, + "keywords": { + "name": "string", + "table": "keywords" + }, + "links": { + "url": "string", + "table": "links", + "description": "text" + }, + "sponsors": { + "agency_class": "string", + "table": "sponsors" + }, + "study_references": { + "reference_type": "string", + "table": "study_references" + }, + "search_results": { + "id": "integer", + "table": "search_results" + }, + "result_agreements": { + "pi_employee": "string", + "table": "result_agreements" + }, + "study_searches": { + "id": "integer", + "table": 
"study_searches" + }, + "result_contacts": { + "name": "string", + "table": "result_contacts", + "phone": "string", + "email": "string" + }, + "result_groups": { + "description": "text", + "table": "result_groups" + }, + "baseline_measurements": { + "category": "string", + "table": "baseline_measurements" + }, + "outcome_measurements": { + "param_type": "string", + "table": "outcome_measurements" + }, + "baseline_counts": { + "scope": "string", + "table": "baseline_counts", + "units": "string" + }, + "outcome_analysis_groups": { + "ctgov_group_code": "string", + "table": "outcome_analysis_groups" + }, + "outcome_counts": { + "scope": "string", + "table": "outcome_counts" + }, + "milestones": { + "count": "integer", + "table": "milestones", + "description": "text", + "title": "string", + "period": "string", + "ctgov_group_code": "string" + }, + "participant_flows": { + "recruitment_details": "string", + "table": "participant_flows", + "pre_assignment_details": "string" + }, + "drop_withdrawals": { + "period": "string", + "table": "drop_withdrawals" + }, + "tagged_terms": { + "id": "integer", + "table": "tagged_terms" + }, + "cdek_organizations": { + "id": "integer", + "table": "cdek_organizations" + }, + "cdek_synonyms": { + "name": "string", + "table": "cdek_synonyms", + "preferred_name": "string", + "downcase_name": "string", + "downcase_preferred_name": "string" + } +} \ No newline at end of file diff --git a/dvc.lock b/dvc.lock index f5a6be4..ab5fea2 100644 --- a/dvc.lock +++ b/dvc.lock @@ -1,36 +1,54 @@ schema: '2.0' stages: download: - cmd: Rscript stages/download.R + cmd: python3 stages/00_download.py deps: - path: https://aact.ctti-clinicaltrials.org/pipe_files hash: md5 - checksum: '270187926789710790283625292984097059272' - - path: stages/download.R + checksum: '286645490874361551640663643212643142456' + - path: stages/00_download.py hash: md5 - md5: 25796c42a85949526905b1c02c917ab7 - size: 586 + md5: 8d117a0f83077989efddea036576c11a + size: 1085 outs: - 
path: download/ hash: md5 - md5: 3be97a6b93b71c58a0f0a4195162ee9b.dir - size: 1805514254 + md5: 1aa7eedcafa09cf85dea3f26171d3099.dir + size: 1814330916 nfiles: 1 build: - cmd: Rscript stages/build.R + cmd: python3 stages/03_build.py + deps: + - path: stages/03_build.py + hash: md5 + md5: ca6f58e4fe1b01bb5a62a62c5a60e500 + size: 1420 + - path: unzip/ + hash: md5 + md5: c60563edc132bc7ca753bee8864f8285.dir + size: 12008296453 + nfiles: 47 + outs: + - path: brick/ + hash: md5 + md5: 81e5a88e9d89c4d4525551ce7f596d72.dir + size: 2732820598 + nfiles: 47 + unzip: + cmd: python3 stages/02_unzip.py deps: - path: download/ hash: md5 - md5: 3be97a6b93b71c58a0f0a4195162ee9b.dir - size: 1805514254 + md5: 1aa7eedcafa09cf85dea3f26171d3099.dir + size: 1814330916 nfiles: 1 - - path: stages/build.R + - path: stages/02_unzip.py hash: md5 - md5: 838d95032e611a7168262b12616d9d0f - size: 517 + md5: 3510bb5e24a50ec96768e9f375b69e31 + size: 552 outs: - - path: brick/ + - path: unzip/ hash: md5 - md5: 162aa0bcf28a151c2d6aef3bea897f47.dir - size: 3150743193 + md5: c60563edc132bc7ca753bee8864f8285.dir + size: 12008296453 nfiles: 47 diff --git a/dvc.yaml b/dvc.yaml index 8bef8d6..89679c0 100644 --- a/dvc.yaml +++ b/dvc.yaml @@ -1,15 +1,22 @@ stages: download: - cmd: Rscript stages/download.R + cmd: python3 stages/00_download.py deps: - - stages/download.R + - stages/00_download.py - "https://aact.ctti-clinicaltrials.org/pipe_files" outs: - download/ - build: - cmd: Rscript stages/build.R + unzip: + cmd: python3 stages/02_unzip.py deps: - - stages/build.R - download/ + - stages/02_unzip.py + outs: + - unzip/ + build: + cmd: python3 stages/03_build.py + deps: + - stages/03_build.py + - unzip/ outs: - brick/ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bba346b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +beautifulsoup4 +httpx +pandas +selenium \ No newline at end of file diff --git a/stages/00_download.py b/stages/00_download.py new file mode 
100644 index 0000000..0d9276c --- /dev/null +++ b/stages/00_download.py @@ -0,0 +1,40 @@ +import requests +from bs4 import BeautifulSoup +import os +import re + +# Set timeout for the requests +requests.options.timeout = 1800 # download timeout + +# read html from page and grab file to download +response = requests.get("https://aact.ctti-clinicaltrials.org/pipe_files") +page = response.content + +# Parse HTML and find all links +soup = BeautifulSoup(page, 'html.parser') +links = soup.find_all('a') + +# Detect the recent zip file link +recent = None +for link in links: + if re.search(r'\.zip$', link.text.strip(), re.IGNORECASE): + recent = link + break + +if recent: + # Get file name and create download directory + name = recent.get_text().strip() + if not os.path.exists('download'): + os.makedirs('download') + download_path = os.path.join('download', name) + + # Get the full URL for download + url = recent['href'] + print(f"Downloading {url} to {download_path}") + + # Download the file + response = requests.get(url) + with open(download_path, 'wb') as file: + file.write(response.content) +else: + print("No zip file found.") \ No newline at end of file diff --git a/stages/02_unzip.py b/stages/02_unzip.py new file mode 100644 index 0000000..1e2904b --- /dev/null +++ b/stages/02_unzip.py @@ -0,0 +1,21 @@ +from zipfile import ZipFile +from pathlib import Path +import glob +import os + +latest_zip = max(glob.glob('download/*.zip'), key=os.path.getctime) + +download_path = Path("download") +unzip_path = Path("unzip") + +def unzip_file(src, dest): + zip_root = ZipFile(src) + zip_root.extractall(dest) + +if __name__ == '__main__': + if download_path.exists() and unzip_path.exists(): + unzip_file(latest_zip, unzip_path) + else: + download_path.mkdir(exist_ok=True) + unzip_path.mkdir(exist_ok=True) + unzip_file(latest_zip, unzip_path) \ No newline at end of file diff --git a/stages/03_build.py b/stages/03_build.py new file mode 100644 index 0000000..352427c --- /dev/null +++ 
"""Convert the pipe-delimited AACT text files in ``unzip/`` to Parquet.

Column dtypes come from ``data_dictionary.json``, a mapping of
``{table_name: {column_name: type_name}}``.  Each file ``unzip/<table>.txt``
is written to ``brick/<table>.parquet``.
"""
import json
import os
from pathlib import Path

import pandas as pd

DATA_DICTIONARY_PATH = "data_dictionary.json"
UNZIP_DIR = Path("unzip")
BRICK_DIR = Path("brick")

# Data-dictionary type name -> pandas dtype alias.  Aliases are resolved by
# pandas when the file is read.  "boolean" is the nullable boolean dtype:
# a plain numpy bool column cannot represent missing values.
# NOTE(review): confirm the boolean literals used in the export are ones
# pandas can convert to the nullable boolean dtype.
pandas_mappings = {
    "string": "string[pyarrow]",
    "integer": "Int64",
    "boolean": "boolean",
    "text": "string[pyarrow]",
    "decimal": "string[pyarrow]",
    "date": "string[pyarrow]",
    "timestamps": "datetime64[ns]",
    "float 7:6": "string[pyarrow]",
}


def column_dtypes(columns, mappings=pandas_mappings):
    """Translate one table's ``{column: type_name}`` into pandas dtypes.

    Entries whose type name is not a key of *mappings* are omitted, which
    leaves those columns to pandas' own type inference.  This covers ``null``
    types and the ``"table": "<table name>"`` entry present in the committed
    dictionary, both of which would otherwise raise ``KeyError``.
    """
    return {
        name: mappings[kind]
        for name, kind in columns.items()
        if kind in mappings
    }


def load_data_dict(path=DATA_DICTIONARY_PATH):
    """Load the data dictionary at *path* as ``{table: {column: dtype}}``."""
    with open(path, "r", encoding="utf-8") as fh:
        raw = json.load(fh)
    return {table: column_dtypes(columns) for table, columns in raw.items()}


def convert_file(src, col_types=None, out_dir=BRICK_DIR):
    """Read the pipe-delimited file *src* and write it to *out_dir* as Parquet.

    Args:
        src: Path of the ``|``-separated text file.
        col_types: Optional ``{column: dtype}`` mapping; columns not listed
            (or all columns when empty/None) are type-inferred.
        out_dir: Directory receiving ``<stem>.parquet``.

    Returns:
        Path of the Parquet file written.
    """
    df = pd.read_csv(
        src,
        sep="|",
        engine="pyarrow",
        dtype=col_types or None,
        on_bad_lines="skip",
        na_values=[""],
    )
    parquet_path = Path(out_dir) / Path(src).with_suffix(".parquet").name
    df.to_parquet(parquet_path, engine="pyarrow")
    return parquet_path


def main():
    """Convert every regular file in ``unzip/`` into ``brick/``."""
    data_dict = load_data_dict()
    os.makedirs(BRICK_DIR, exist_ok=True)
    for f in sorted(UNZIP_DIR.iterdir()):
        if not f.is_file():
            continue
        # Files with no data-dictionary entry are still converted, using
        # inferred types, instead of aborting the whole stage.
        convert_file(f, data_dict.get(f.stem))


if __name__ == "__main__":
    main()
fs::path_ext_set(data/fs::path_file(f),"parquet")) -}) - -# delete temp directory -fs::dir_delete(tmp) - - diff --git a/stages/download.R b/stages/download.R deleted file mode 100644 index 61d506b..0000000 --- a/stages/download.R +++ /dev/null @@ -1,18 +0,0 @@ -library(rvest) -library(purrr) -library(fs) -library(stringr) - -options(timeout=1800) # download timeout - -# read html from page and grab file to download -page <- read_html("https://aact.ctti-clinicaltrials.org/pipe_files") -nodes <- page |> html_nodes("a") -recent <- detect(nodes, ~ grepl("*.zip", . |> html_text() )) - -# download file to download directory -name <- recent |> html_text() |> stringr::str_trim() -download <- fs::dir_create("download") |> fs::path(name) -url <- recent |> html_attr("href") -print(paste("Downloading", url, "to", download)) -download.file(url,download)