## Notebook - Extract record-level data 
Author: Dat


### [Required Spark session] Set up

In [1]:
import pyspark 
import dxpy # tools starting with 'dx' are from the DNANexus ecosystem
import dxdata
from pyspark.sql.functions import when, concat_ws
from re import sub

### [Run this only once]

In [2]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

### Dispense the dataset
Code to automatically discover the database name and dataset id

In [3]:
dispensed_database_name = dxpy.find_one_data_object(
    classname="database", 
    name="app*", 
    folder="/", 
    name_mode="glob", 
    describe=True)["describe"]["name"]

dispensed_dataset_id = dxpy.find_one_data_object(
    typename="Dataset", 
    name="app*.dataset", 
    folder="/", 
    name_mode="glob")["id"]

### Load data set

In [4]:
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

Check all the available entities

In [6]:
dataset.entities

[<Entity "participant">,
 <Entity "covid19_result_england">,
 <Entity "covid19_result_scotland">,
 <Entity "covid19_result_wales">,
 <Entity "gp_clinical">,
 <Entity "gp_scripts">,
 <Entity "gp_registrations">,
 <Entity "hesin">,
 <Entity "hesin_diag">,
 <Entity "hesin_oper">,
 <Entity "hesin_critical">,
 <Entity "hesin_maternity">,
 <Entity "hesin_delivery">,
 <Entity "hesin_psych">,
 <Entity "death">,
 <Entity "death_cause">,
 <Entity "omop_death">,
 <Entity "omop_device_exposure">,
 <Entity "omop_note">,
 <Entity "omop_observation">,
 <Entity "omop_drug_exposure">,
 <Entity "omop_observation_period">,
 <Entity "omop_person">,
 <Entity "omop_procedure_occurrence">,
 <Entity "omop_specimen">,
 <Entity "omop_visit_detail">,
 <Entity "omop_visit_occurrence">,
 <Entity "omop_dose_era">,
 <Entity "omop_drug_era">,
 <Entity "omop_condition_era">,
 <Entity "omop_condition_occurrence">,
 <Entity "omop_measurement">,
 <Entity "olink_instance_0">,
 <Entity "olink_instance_2">,
 <Entity "olink_

### Choose entities of each record-levels table
We need to choose "hesin" entity for hospital inpatient data 
"hesin_diag" enity

In [7]:
gp_clinical = dataset["gp_clinical"]
gp_script = dataset["gp_scripts"]
hesin = dataset["hesin"]
hesin_diag = dataset["hesin_diag"]
hesin_oper = dataset["hesin_oper"]
death = dataset["death"]
death_cause = dataset["death_cause"]


gp_clinical_data = gp_clinical.retrieve_fields(engine=dxdata.connect())
gp_script_data = gp_script.retrieve_fields(engine=dxdata.connect())
hesin_data = hesin.retrieve_fields(engine=dxdata.connect())
hesin_diag_data = hesin_diag.retrieve_fields(engine=dxdata.connect())
hesin_oper_data = hesin_oper.retrieve_fields(engine=dxdata.connect())
death_data = death.retrieve_fields(engine=dxdata.connect())
death_cause_data = death_cause.retrieve_fields(engine=dxdata.connect())

  self._context = ssl.SSLContext(ssl_version)


### Note from other notebook: (we need to check if other record level file need the same cleaning process)
A small proportion of episodes are missing an episode start date but have different dates associated with the episode. In an optional additional step, we make a new "dateepiimp" column which takes the value of "epistart", but when that is missing imputes them with "disdate" [NOTE - 2022-10-24, UK Biobank's own protocol when producing derived fields is to impute with admidate, epiend and disdate (see https://biobank.ndph.ox.ac.uk/showcase/ukb/docs/first_occurrences_outcomes.pdf, page 8). This notebook will be updated to match.] :

In [8]:
hesin_data = hesin_data.withColumn("dateepiimp",
                                   when(hesin_data["epistart"].isNotNull(), hesin_data["epistart"]).otherwise(hesin_data["disdate"]))

In [11]:
gp_clinical_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("gp_clinical_data")
gp_script_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("gp_script_data")
hesin_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("hesin_data")
hesin_diag_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("hesin_diag_data")
hesin_oper_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("hesin_oper_data")
death_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("death_data")
death_cause_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("death_cause_data")

### Write the file on the project storage

In [19]:
%%bash
# Define an array of dataset names
datasets=("gp_clinical_data" "gp_script_data" "hesin_data" "hesin_diag_data" "hesin_oper_data" "death_data" "death_cause_data")

# Loop through each dataset and copy it from HDFS
for dataset in "${datasets[@]}"; do
    hdfs dfs -copyToLocal "/user/root/$dataset" "$dataset" || { echo "Failed to copy $dataset"; exit 1; }
done

echo "All files copied successfully!"

2025-02-05 09:06:51,934 WARN metrics.MetricsReporter: Unable to initialize metrics scraping configurations from hive-site.xml. Message:InputStream cannot be null
2025-02-05 09:06:52,032 WARN service.DNAxApiSvc: Using default configurations. Unable to find dnanexus.conf.location=null
2025-02-05 09:06:52,032 INFO service.DNAxApiSvc: apiserver connection-pool config. MaxPoolSize=10, MaxPoolPerRoute=10,MaxWaitTimeout=60000
2025-02-05 09:06:52,032 INFO service.DNAxApiSvc: initializing http connection manager pools
2025-02-05 09:06:52,481 INFO service.DNAxApiSvc: Worker process - IdleConnectionMonitorThread disabled
2025-02-05 09:06:52,482 INFO service.DNAxApiSvc: Worker process - IdleConnectionMonitorThread disabled
2025-02-05 09:06:52,482 INFO service.DNAxApiSvc: initializing DNAxApiSvc
2025-02-05 09:07:10,720 WARN service.DNAxApiSvc: Shutting down Runtime service for Connection Pools
2025-02-05 09:07:10,721 INFO service.DNAxApiSvc: shutting down httpClientConnManager
2025-02-05 09:07:10,7

All files copied successfully!


In [23]:
%%bash
dx upload gp_clinical_data/*.csv --dest raw_data/record_level/gp_clinical_data.csv
dx upload gp_script_data/*.csv --dest raw_data/record_level/gp_script_data.csv
dx upload hesin_data/*.csv --dest raw_data/record_level/hesin_data.csv
dx upload hesin_diag_data/*.csv --dest raw_data/record_level/hesin_diag_data.csv
dx upload hesin_oper_data/*.csv --dest raw_data/record_level/hesin_oper_data.csv
dx upload death_data/*.csv --dest raw_data/record_level/death_data.csv
dx upload death_cause_data/*.csv --dest raw_data/record_level/death_cause_data.csv

ID                                file-GyVXgVjJYZjQxFG8g8Y8YZgG
Class                             file
Project                           project-GyQGxq8JYZjQPJQK6fv05B4B
Folder                            /raw_data/record_level
Name                              gp_clinical_data.csv
State                             closing
Visibility                        visible
Types                             -
Properties                        -
Tags                              -
Outgoing links                    -
Created                           Wed Feb  5 09:13:43 2025
Created by                        thiendattran
 via the job                      job-GyVVqz0JYZjZYPyQ4V8VkGV0
Last modified                     Wed Feb  5 09:13:50 2025
Media type                        
archivalState                     "live"
cloudAccount                      "cloudaccount-dnanexus"
ID                                file-GyVXgYjJYZjX370JyvJfbQ9v
Class                             file
Project                  