# UKB data extraction template
Generated for OxWearables group by Alaina Shreves 15 December 2022

Updated by Alaina Shreves 21 March 2023


## Introduction
This Python notebook walks through a simple example of extracting UK Biobank (UKB) data on the Research Analysis Platform (RAP).

### How to run this notebook

This notebook should be run in a *Spark in JupyterLab* session. Read how to set up a Spark cluster [here](https://dnanexus.gitbook.io/uk-biobank-rap/working-on-the-research-analysis-platform/using-spark-to-analyze-tabular-data).

Before each code chunk, there is text labeled 'edit code' and 'standard code.' Most of the included code is required for extracting UKB data using the RAP. To extract your variables of interest, you can run the chunks with <span style="color:blue">blue 'standard code'</span> as is and update the chunks with <span style="color:red">red 'edit code'</span> accordingly.

## Set up the session

### Load modules and set up the Spark session (<span style="color:blue">standard code</span>)

In [2]:
import pyspark
import dxpy # tools starting with 'dx' are from the DNAnexus ecosystem
import dxdata
from pyspark.sql.functions import when, concat_ws
from re import sub

In [3]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

Note - If you are pulling a large dataset and the session times out, you can increase the Kryo serializer buffer limit (`spark.kryoserializer.buffer.max`, 64m by default) and `spark.driver.maxResultSize`. To do so, run the following two chunks instead of the two chunks above that set up the Spark session.

***
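# Chunk 1
import pyspark
import dxpy
import dxdata
from pyspark.sql.functions import when, concat_ws  # concat_ws is needed later to flatten array columns
from re import sub
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

# Chunk 2
from pyspark.sql import SparkSession
# Raise the serializer buffer limit and the maximum size of results returned to the driver
conf = spark.sparkContext._conf.setAll([('spark.kryoserializer.buffer.max', '1g'), ('spark.driver.maxResultSize', '4g')])
# Stop the running context, then start a new session that uses the updated configuration
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()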

### Dispense a dataset (<span style="color:blue">standard code</span>)

In [None]:
dispensed_database = dxpy.find_one_data_object(
    classname="database", 
    name="app*", 
    folder="/", 
    name_mode="glob", 
    describe=True)
dispensed_database_name = dispensed_database["describe"]["name"]

dispensed_dataset = dxpy.find_one_data_object(
    typename="Dataset", 
    name="app*.dataset", 
    folder="/", 
    name_mode="glob")
dispensed_dataset_id = dispensed_dataset["id"]

### Load dataset (<span style="color:blue">standard code</span>)

In [5]:
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

### Load tabular participant data (<span style="color:blue">standard code</span> if using main database table, <span style="color:red">edit code</span> if using different database table)

The example below pulls data from the main database table, called 'participant'. You might need to access other database tables for your analysis, such as 'hesin' (hospital records) or 'death' (death records). You can check which database table holds your variables of interest [here](https://dnanexus.gitbook.io/uk-biobank-rap/working-on-the-research-analysis-platform/using-spark-to-analyze-tabular-data). Examples of pulling data from other database tables can be found in the first notebook of the [RAP Wearables tutorial](https://github.com/OxWearables/rap_wearables).

In [6]:
#If extracting your own data, change the database table name ('participant') as appropriate.

participant = dataset["participant"]

Next, we'll load a list of fields we might want for our analysis.

### Load utility function (<span style="color:blue">standard code</span>)

In [7]:
def load_column_list(file_name):
    """Load list of UK Biobank column IDs from file
    Column IDs can be obtained from the UK Biobank showcase: https://biobank.ndph.ox.ac.uk/showcase/index.cgi
    e.g. 90012 refers to the recommended variable for accelerometer measured physical activity
        https://biobank.ndph.ox.ac.uk/showcase/field.cgi?id=90012
        
    This function is due to Aiden Doherty.

    :param str file_name: Name of file listing UK Biobank column IDs
    :return: list of IDs
    :rtype: list
    :Example:
    >>> load_column_list("field_list.txt")
    ["31", "34", "52" ...]
    """
    
    column_IDs = []

    with open(file_name) as f:
        for line in f:
            # Drop any trailing comment, then surrounding whitespace
            li = line.split("#")[0].strip()
            # Skip blank lines and lines that held only a comment
            if li != "":
                column_IDs.append(li)
    return column_IDs
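
For reference, the function above expects one field ID per line and ignores blank lines and anything after a `#`. The sketch below applies the same rules to an in-memory example. The example lines and the helper name `parse_column_lines` are illustrative assumptions, not part of the original notebook or of any real field list file.

```python
def parse_column_lines(lines):
    """Apply the same parsing rules as load_column_list to an iterable of lines."""
    column_ids = []
    for line in lines:
        # Drop anything after a '#', then surrounding whitespace
        entry = line.split("#")[0].strip()
        if entry:
            column_ids.append(entry)
    return column_ids


# Hypothetical contents of a field list file
example_lines = [
    "# Demographics\n",
    "31   # sex\n",
    "34   # year of birth\n",
    "\n",
    "52   # month of birth\n",
]

example_ids = parse_column_lines(example_lines)
print(example_ids)
```

Keeping a short comment next to each ID in the file makes the list easier to review later, and the parser discards those comments.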

## Read in column list

There are two main ways of reading in column names:
- Using a candidate field list
- Directly in the Jupyter notebook

We present examples of both methods below. 

## Use a candidate field list to read in column names (<span style="color:red">edit code</span> to map to column list file)

In [8]:
#If extracting your own data, change the column list file location accordingly. 
column_list = load_column_list("/mnt/project/wearables-main/data_access_instructions/candidate_field_list_updated_example.txt") # This is a text file listing the field IDs we'll use

## Assign field names
### Use field numbers to work out the [field *names*](https://dnanexus.gitbook.io/uk-biobank-rap/frequently-asked-questions#how-are-column-names-determined-for-the-dispensed-database) <span style="color:blue">(standard code)</span>

*Note - UK Biobank fields sometimes have instance and array indices, which are [indicated in the field names](https://dnanexus.gitbook.io/uk-biobank-rap/frequently-asked-questions#how-are-column-names-determined-for-the-dispensed-database).*

In [9]:
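def fields_for_id(field_id):
    import re  # used instead of distutils.version.LooseVersion, which was removed in Python 3.12
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    # Sort on the numbers in each name, so that e.g. "_a2" comes before "_a10"
    return sorted(fields, key=lambda f: [int(n) for n in re.findall(r'\d+', f.name)])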

def field_names_for_id_instanced(field_id, instance="i0", include_non_instanced=True):
    candidate_fields = [f.name for f in fields_for_id(field_id)]
    # Compare whole name parts, so that e.g. instance "i1" does not also match "i10"
    return_fields = [f for f in candidate_fields if instance in f.split("_")]
    if include_non_instanced: # Fields without instancing (e.g. sex) are included by default
        return_fields += [f for f in candidate_fields if "_i" not in f]
    return return_fields
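
To see what the regular expression in `fields_for_id` selects, the sketch below applies the same pattern to a hand-written list of column names. The names in `example_names` are made up for illustration and the helper `matching_names` is hypothetical; on the RAP the matching is done by `participant.find_fields`.

```python
import re


def matching_names(field_id, names):
    """Return the names belonging to one field ID, using the same pattern as fields_for_id."""
    pattern = re.compile(r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return [name for name in names if pattern.match(name)]


# Illustrative column names only
example_names = [
    "eid", "p31", "p310",
    "p6150_i0", "p6150_i1", "p61500_i0",
    "p20002_i0_a0", "p20002_i0_a1",
]

sex_names = matching_names(31, example_names)
cvd_names = matching_names(6150, example_names)
illness_names = matching_names(20002, example_names)
print(sex_names, cvd_names, illness_names)
```

Because the pattern is anchored with `^` and `$`, field 31 does not pick up `p310`, and field 6150 does not pick up `p61500_i0`.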

## Build the field list from instance 0 and non-instanced fields (<span style="color:blue">standard code</span>)

In [10]:
field_list = ["eid"]
for col in column_list:
    field_list += field_names_for_id_instanced(col, instance="i0")

## Add any additional columns not listed in field name file (<span style="color:red">edit code</span> to add any additional columns fields)

The notation for column names is p(data-field #)_i(instance #). Column IDs can be obtained from the [UK Biobank showcase](https://biobank.ndph.ox.ac.uk/showcase/search.cgi). For example, p6150_i0 refers to ['Vascular/heart problems diagnosed by doctor'](https://biobank.ndph.ox.ac.uk/showcase/field.cgi?id=6150) at baseline. 
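
As an illustration of this notation, a small helper can assemble a column name from a field ID and optional instance and array indices. The function `make_column_name` is hypothetical and is not part of `dxdata`; it only shows how the pieces of a name fit together.

```python
def make_column_name(field_id, instance=None, array=None):
    """Build a column name such as 'p6150_i0' from its parts."""
    name = "p{}".format(field_id)
    if instance is not None:
        name += "_i{}".format(instance)
    if array is not None:
        name += "_a{}".format(array)
    return name


baseline_cvd = make_column_name(6150, instance=0)
sex = make_column_name(31)
print(baseline_cvd, sex)
```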

In [39]:
#If extracting your own data, change the field names below accordingly. If you are not using a field name file and want to list all variables manually, change the "+=" symbol to "=".
#In this example, we'll add three variables: "p6150_i0" (self_report_cvd_baseline), "p6150_i1" (self_report_cvd_inst_1), and "p6150_i2" (self_report_cvd_inst_2).

field_list += ["p6150_i0", "p6150_i1", "p6150_i2"]

## Rename variables (<span style="color:red">edit code</span> to add any additional column names)
Renaming variables is optional, but it can make the extracted data simpler to work with later, since the original field names can be hard to read.

*Note - We use two different approaches here to illustrate them. You can use either, neither (i.e. use the fields with their original field names), or a mix.*

In [40]:
#If extracting your own data, change the field codes and variable alias names accordingly.

# We'll handcraft some of the names to slot neatly into later code
field_list_aliases = {
    "eid" : "eid", 
    "p31" :  "sex",
    "p52" : "month_birth", 
    "p34" : "year_birth", 
    "p6150_i0": "self_report_cvd_baseline", 
    "p6150_i1": "self_report_cvd_inst_1", 
    "p6150_i2": "self_report_cvd_inst_2"
    }

# We'll then auto-add names for other fields (This is not necessary in this very simple example, but could be helpful in your code where you pull many variables)
dataset_fields = [participant.find_field(name=x) for x in field_list] # get the full field info, not just the name
for field in dataset_fields:
    if field.name not in field_list_aliases.keys():
        field_list_aliases[field.name] = field.title
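
Field titles can contain spaces and punctuation, which are awkward as column names. One option, sketched below, is to normalise each title before using it as an alias. The helper `clean_title` and the example titles are assumptions for illustration; the notebook itself uses `field.title` unchanged.

```python
from re import sub


def clean_title(title):
    """Lower-case a title and replace each run of non-alphanumeric characters with '_'."""
    return sub(r"[^0-9a-z]+", "_", title.lower()).strip("_")


# Hypothetical field titles, keyed by column name
example_titles = {
    "p21022": "Age at recruitment",
    "p6150_i0": "Vascular/heart problems diagnosed by doctor | Instance 0",
}

example_aliases = {name: clean_title(title) for name, title in example_titles.items()}
print(example_aliases)
```

If you adopt something like this, check that the cleaned names are still unique, since two different titles could collapse to the same alias.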

## Retrieve relevant data (<span style="color:blue">standard code</span>)

In [41]:
participant_data_example = participant.retrieve_fields(names=field_list, engine=dxdata.connect(), coding_values="replace", column_aliases=field_list_aliases)

*Note - Setting coding values to "replace" in this function means we get variables' values (rather than the coded format they are stored in).*

## Write the data out as a csv file

### Convert array variables to strings (<span style="color:red">edit code</span> to list all variables that are arrays)

Array variables need special handling. For any 'select all that apply' question (stored as an 'array'), the notebook will only pull the first selected option unless you specifically tell it to pull all of them. In this example, someone could have selected multiple responses to the question about 'vascular/heart problems diagnosed by doctor', and we are interested in pulling all of their responses.

In [42]:
#If extracting your own data, ensure that the list of array variables is complete. The csv file will not be generated if you fail to list all array variables. 
arrayed_cols = ["self_report_cvd_baseline", "self_report_cvd_inst_1", "self_report_cvd_inst_2"]

for col in arrayed_cols:
    participant_data_example = participant_data_example.withColumn(col, concat_ws("|", col))
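
To make the effect of `concat_ws("|", col)` concrete, the plain-Python sketch below mimics it for a single array value: the elements are joined into one string with `|` as the separator, and null elements are skipped. The helper `join_array` and the example values are illustrative assumptions; in the notebook the work is done by Spark.

```python
def join_array(values, sep="|"):
    """Join the non-null elements of a list into one delimited string."""
    return sep.join(str(v) for v in values if v is not None)


# Hypothetical responses for one participant
example_row = ["Heart attack", None, "High blood pressure"]

joined = join_array(example_row)
print(joined)
```

Choose a separator that cannot appear inside the values themselves, so that the string can be split back into its parts later.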

### Spark stores and works with the data in several parts, so we coalesce them into one part (<span style="color:blue">standard code</span>)

In [43]:
participant_data_example = participant_data_example.coalesce(1)

In [44]:
#This is a good time to pause and test your data. If you are interested, uncomment the line below to check that your dataframe is filled with the expected data.
#participant_data_example.show()

# Write data to a csv file

## Set options for writing to a csv file  (<span style="color:blue">standard code</span>)

In [45]:
participant_data_example.write.mode("overwrite").option("header", "true").csv("participant_data_example")
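
Spark writes the output as a directory (here `participant_data_example`) that holds one `part-*.csv` file per partition plus marker files such as `_SUCCESS`, which is why the later upload step uses a `*.csv` wildcard. The sketch below mimics that layout in a temporary directory and locates the part file. The file names are invented for illustration and `find_part_files` is a hypothetical helper.

```python
import glob
import os
import tempfile


def find_part_files(output_dir):
    """List the CSV part files inside a Spark-style output directory."""
    return sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))


with tempfile.TemporaryDirectory() as tmp:
    # Recreate the kind of directory layout a CSV write produces
    out_dir = os.path.join(tmp, "participant_data_example")
    os.makedirs(out_dir)
    for fname in ["_SUCCESS", "part-00000-example.csv"]:
        open(os.path.join(out_dir, fname), "w").close()

    part_files = [os.path.basename(p) for p in find_part_files(out_dir)]

print(part_files)
```

Because the dataframe was coalesced to one partition, a single part file is expected. More than one would suggest the coalesce step was skipped.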

## Copy file to temporary storage (<span style="color:blue">standard code</span>)
You can check that the file ended up in the right place by looking in the folder tab on the left side of your screen, above the tab labeled 'DNAnexus'.

In [None]:
%%bash
hdfs dfs -copyToLocal /user/root/participant_data_example participant_data_example # this gets it out of the hdfs

## Upload file to your permanent storage (<span style="color:red">edit code</span> with your file name and file path)
Note - If extracting your own data, change the destination of the output file (the text that follows "--dest" in the second line).

In [None]:
%%bash
dx upload participant_data_example/*.csv --dest data_access_instructions/participant_data_example.csv # this uploads to the permanent storage on the RAP