## R accessing data notebook

The purpose of this notebook is to demonstrate how to access, explore and investigate the participant and record table data.


##### Run info

- Runtime: 20 mins
- Instance: mem1_hdd1_v2_x8
- Cost: £0.30

### This notebook depends on
- **A Spark instance**

"With Spark, only one-step is needed where data is read into memory, operations performed, and the results written back—resulting in a much faster execution. Spark also reuses data by using an in-memory cache to greatly speed up machine learning algorithms that repeatedly call a function on the same dataset" - [Amazon Web Services](https://aws.amazon.com/what-is/apache-spark/#:~:text=With%20Spark%2C%20only%20one%2Dstep,function%20on%20the%20same%20dataset.)

## 1. Install required packages 
The `p_load` function from `pacman` loads packages into R. If a package is missing, `p_load` installs it automatically; this can take a considerable amount of time for packages that need C or FORTRAN code to be compiled. The following packages are needed to run this notebook:

*Note*: If you wish to rerun this notebook without waiting for the packages to install again, create a [snapshot](https://documentation.dnanexus.com/user/jupyter-notebooks#environment-snapshots) of the environment.

- `sparklyr` – Gives access to Spark data through familiar R interfaces such as dplyr.
- `reticulate` – R-Python interface, required to use the `dxdata` Python package, which connects to the Spark database and allows retrieval of phenotypic data.
- `stringr` – Character string manipulation.
- `parallel` – Parallel computation in R.
- `dplyr` – Tabular data manipulation in R.
- `data.table` – Reads data into `data.table` format.
- `arrow` – R interface to Apache Arrow, used here to read Parquet files.


In [None]:
# Load required packages
if (!require(pacman)) install.packages("pacman")
install.packages("sparklyr")
pacman::p_load(sparklyr, reticulate, stringr, parallel, dplyr, data.table, arrow)

## 2. Create Spark connection and define database
The `sc` variable holds the connection to the Spark cluster.

To query the database, its path (as returned by `dx find data`) is converted into the database name format that Spark expects.

In [None]:
# Connect to the Spark master node, which orchestrates the analysis in Spark
port <- Sys.getenv("SPARK_MASTER_PORT") # port number used to connect to the Spark cluster
master <- paste0("spark://master:", port)
sc <- spark_connect(master)
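`Sys.getenv()` returns an empty string when a variable is unset, in which case `spark_connect()` would receive the incomplete URL `spark://master:`. A minimal sketch of a guard, using a hypothetical helper `spark_master_url()` (not part of sparklyr) and assuming the master host is named `master` as above:

```r
# Hypothetical helper (not part of sparklyr): build the Spark master URL
# from SPARK_MASTER_PORT and stop early if the variable is not set.
spark_master_url <- function(host = "master") {
  port <- Sys.getenv("SPARK_MASTER_PORT")  # "" when the variable is unset
  if (!nzchar(port)) {
    stop("SPARK_MASTER_PORT is not set; is this a Spark-enabled instance?")
  }
  paste0("spark://", host, ":", port)
}

# Usage: sc <- spark_connect(spark_master_url())
```

Failing with an explicit message makes a missing Spark environment easier to diagnose than a connection error from an incomplete URL.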

To improve the reproducibility of your notebooks and ensure they are portable across projects, it is better not to hardcode any database or dataset names. Instead, you can use the following code to automatically discover the database and dataset:

In [None]:
# import python packages
dxdata <- import("dxdata")

# connect to dataset
project_id <- system("dx env | grep project- | awk -F '\t' '{print $2}'", intern = TRUE)
record_id <- system("dx describe *dataset | grep  record- | awk -F ' ' '{print $2}' | head -n 1" , intern = TRUE)
DATASET_ID <- paste0(project_id, ":", record_id)
dataset <- dxdata$load_dataset(id=DATASET_ID)

# Derive the app id and the Spark database name (database_<id>__<app id>) from the database path
database_path <- system("dx find data --class database", intern =TRUE)
app_substring <- na.omit(str_extract(database_path, '(app\\d+_\\d+)'))
database_substring <- str_extract(database_path[str_detect(database_path, app_substring)], 'database-([A-Za-z0-9]+)') %>% tolower()  %>% str_replace("database-", "database_")
database <- paste0(database_substring, "__", app_substring)
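To see what the string manipulation above produces, the sketch below applies the same steps with base R to a single made-up line. The line format is an assumption for illustration only; the real output of `dx find data --class database` may differ, and the IDs are invented. The `example_*` names are used so the notebook's own variables are not overwritten.

```r
# Hypothetical single line of `dx find data --class database` output (invented IDs)
example_line <- "closed  2024-01-01 00:00:00 1.00 GB /app12345_20240101 (database-GabcDEF123)"

# Same steps as above: extract the app id and the database id,
# lowercase the id, swap "database-" for "database_", then join with "__"
example_app <- regmatches(example_line, regexpr("app[0-9]+_[0-9]+", example_line))
example_db_id <- regmatches(example_line, regexpr("database-[A-Za-z0-9]+", example_line))
example_db_id <- sub("database-", "database_", tolower(example_db_id), fixed = TRUE)
example_database <- paste0(example_db_id, "__", example_app)

print(example_database)  # "database_gabcdef123__app12345_20240101"
```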

## 3. Select the database for the session and explore tables

In [None]:
tbl_change_db(sc, database)

The database is split into several tables. These include the participant table (e.g. demographics, blood levels), Olink tables (proteomics data), OMOP tables (Observational Medical Outcomes Partnership), HESIN tables (Hospital Episode Statistics Inpatient database), etc.

In [None]:
# View tables within the database
tables <- DBI::dbGetQuery(sc, "SHOW TABLES")

# You may notice each table appearing twice, using a regular name and a versioned name, such as "hesin_critical" and "hesin_critical_v18_1_edeb6b8". 
# This naming scheme is part of the system's architecture, supporting data refreshes and participant withdrawals.
# Due to this behavior, please make sure to always use the regularly named tables e.g. "hesin_critical" 
tables %>% 
    filter(str_detect(tableName, "hesin")) # e.g. look at Hospital Episode Statistics Inpatient database (hesin) tables
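If you want to list only the regularly named tables, one option is to drop names that end in a version suffix. The sketch below assumes the suffix always looks like `_v18_1_edeb6b8` (`_v`, two numbers, then a hexadecimal hash), which is inferred from the single example above; check the pattern against your own `tables` before relying on it. The table names are mock values.

```r
# Mock table names; in the notebook these would come from tables$tableName
mock_tables <- c("hesin", "hesin_critical", "hesin_critical_v18_1_edeb6b8", "hesin_diag")

# Assumed versioned-name pattern: _v<number>_<number>_<hex hash> at the end of the name
is_versioned <- grepl("_v[0-9]+_[0-9]+_[0-9a-f]+$", mock_tables)
regular_tables <- mock_tables[!is_versioned]

print(regular_tables)  # "hesin" "hesin_critical" "hesin_diag"
```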

## 4. [Accessing record tables](https://biobank.ctsu.ox.ac.uk/crystal/docs.cgi?id=3)

#### Using sparklyr

`sparklyr` is a useful package for working with Spark DataFrames from R.
However, not all R functions can be applied to Spark DataFrames: you are restricted to a limited set of functions, primarily those from the dplyr, broom and DBI packages.

More information can be found on the [sparklyr](https://spark.posit.co/) website. You can browse the list of [functions available in sparklyr](https://rdrr.io/github/rstudio/sparklyr/man/) on rdrr.io.


`sdf_sql` runs a SQL query against the record table of interest. It returns a Spark DataFrame, not a standard R data frame.

**Note** that the Olink data is split across multiple tables. Please see the A108_Constructing-the-Olink-dataset_R notebook to access this data. 

In [None]:
# View column names within a table
sdf_sql(sc, "SHOW COLUMNS FROM hesin_critical") %>% pull(col_name)

In [None]:
hesin_critical_sdf <- sparklyr::sdf_sql(sc, "SELECT * FROM hesin_critical")
class(hesin_critical_sdf)

In [None]:
# Column data types: sdf_schema() returns one list(name, type) per column
schema <- sdf_schema(hesin_critical_sdf)
data.frame(
    variable  = vapply(schema, function(col) col$name, character(1)),
    data_type = vapply(schema, function(col) col$type, character(1))
)

In [None]:
# Example of dplyr verbs on a Spark DataFrame: they are translated to Spark SQL and evaluated lazily
hesin_critical_sdf_wrangled <- hesin_critical_sdf %>% 
    filter(cclev3days > 0) %>%
    select(eid, ins_index, cclev3days, unitbedconfig)

In [None]:
# Data wrangling can also be performed directly in the SQL query
hesin_critical_sdf_2 <- sparklyr::sdf_sql(sc, "SELECT eid, ins_index, cclev3days, unitbedconfig 
                                                FROM hesin_critical
                                                WHERE cclev3days > 0")


##### Conversion of a Spark DataFrame to an R data frame
For smaller tables, you can convert the Spark DataFrame to a standard R data frame with `collect()`, which pulls all rows into the session's memory. This gives access to R functions that do not work on Spark DataFrames and lets you save the data as a .csv file in the session environment.

In [None]:
hesin_critical_df <- hesin_critical_sdf %>% collect() # collect() allows the conversion to an R dataframe.
class(hesin_critical_df)

In [None]:
write.csv(hesin_critical_df, "hesin_critical_data.csv", row.names = FALSE)
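By default, `write.csv()` also writes R's row names as an extra, unnamed first column. The small sketch below writes a mock data frame to temporary files and compares the header line with and without `row.names = FALSE`.

```r
# Mock data frame standing in for hesin_critical_df
mock_df <- data.frame(eid = c(1L, 2L), cclev3days = c(0L, 3L))

with_rownames <- tempfile(fileext = ".csv")
without_rownames <- tempfile(fileext = ".csv")
write.csv(mock_df, with_rownames)                        # default: row names included
write.csv(mock_df, without_rownames, row.names = FALSE)  # data columns only

readLines(with_rownames)[1]     # "\"\",\"eid\",\"cclev3days\""
readLines(without_rownames)[1]  # "\"eid\",\"cclev3days\""
```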

##### Upload the file to the project
Saving a file in the current session's environment does not save it to the project: the file is lost when the session terminates.

If you wish to keep the data and reuse it in later analyses (including from RStudio), upload it from the session environment to the project.

This is possible with the dx command [`dx upload`](https://documentation.dnanexus.com/user/objects/uploading-and-downloading-files/small-sets-of-files/uploading-using-dx). 

In [None]:
system("dx upload hesin_critical_data.csv")

##### Disconnecting the sparklyr connection

Disconnecting frees the Spark session so that you can connect through PySpark, which is needed to access the phenotypic data.

In [None]:
spark_disconnect(sc)

## 5. Extract the Phenotypic data

Because the main participant data is split across multiple tables, each holding a subset of the fields, you may find SQL less suitable for querying those tables directly.

Functions from `dxdata`, together with the field metadata (schema), give access to the phenotypic data in the participant table.

*Some of the features presented below can also be used to explore record tables.* 

In [None]:
# Connect through PySpark. Unlike sparklyr, there is currently no known way to disconnect from PySpark.
# You cannot be connected to PySpark and sparklyr at the same time.
engine <- dxdata$connect(dialect="hive+pyspark")

###  Select the `participant` table

The following code selects the `participant` table. You can view the tables available in the dataset using `dataset$entities`. 

In [None]:
dataset <- dxdata$load_dataset(id = DATASET_ID)
participant_database <- dataset["participant"]

### Schema data
Importing the schema data from the Showcase metadata folder in the project allows you to explore the list of fields, and map field ids to names, making it easier to search within the data.

Data can also be accessed with the dx command [`dx extract_dataset`](https://documentation.dnanexus.com/user/helpstrings-of-sdk-command-line-utilities#extract_dataset).

In [None]:
# Schema data, contains searchable field titles 
field_schema <- fread("/mnt/project/Showcase metadata/field.tsv")

# Download the data dictionary into the session environment; it contains the field names
system(paste0("dx extract_dataset ", DATASET_ID, " -ddd"), intern = TRUE)
datadict <- data.table::fread(paste0(app_substring,".dataset.data_dictionary.csv"))

##### Field identifiers

Within the database, fields are identified by:
* Field id: the unique integer identifier of the field
* Field title: the title of the field
* Field name: the column name, which combines the entity prefix (`p` for participant), field id, instance (`i`) and array index (`a`)

e.g. Field 94: 
* Field id = 94
* Field title = Diastolic blood pressure, manual reading
* Field name = 'p94_i0_a0', 'p94_i0_a1', 'p94_i1_a0', 'p94_i1_a1', 'p94_i2_a0', 'p94_i2_a1', 'p94_i3_a0', 'p94_i3_a1'
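Field names that follow the `p<field id>_i<instance>_a<array>` pattern shown above can be split back into their parts with a regular expression. A minimal base R sketch on the Field 94 names listed above; any names that lack the `_i` or `_a` parts would not match this pattern and would need separate handling.

```r
# Field names for Field 94, as listed above
mock_field_names <- c("p94_i0_a0", "p94_i0_a1", "p94_i1_a0", "p94_i3_a1")

# Capture groups: 1 = field id, 2 = instance, 3 = array index
name_pattern <- "^p([0-9]+)_i([0-9]+)_a([0-9]+)$"
parsed_names <- data.frame(
  name     = mock_field_names,
  field_id = as.integer(sub(name_pattern, "\\1", mock_field_names)),
  instance = as.integer(sub(name_pattern, "\\2", mock_field_names)),
  array    = as.integer(sub(name_pattern, "\\3", mock_field_names))
)
print(parsed_names)
```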


#### Searching for field ids using titles

In [None]:
field_ids_of_interest <- field_schema %>% 
                         filter(title %in% c("Standing height", "Sex", "Weight method") | # searching using titles
                                str_detect(title, "blood pressure") | # searching using regex
                                main_category == 1020) %>% # searching category
                         pull(field_id)

The following function maps a field id to all of its corresponding field names. Note that this function is for the participant table.

In [None]:
# Some fields have multiple instances and arrays; this function finds all field names associated with a field id
fields_for_id <- function(field) {
    
    regex <- paste0('^p', field, '(?![0-9])')
    fields <- dplyr::filter(datadict, stringr::str_detect(name, regex)) %>%
        dplyr::pull(name)
    return(fields)
}
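The negative lookahead `(?![0-9])` in `fields_for_id` stops a field id from matching longer ids that start with the same digits (e.g. `94` must not match `p940...`). The sketch below reproduces the same regex in base R, where `perl = TRUE` is needed for lookahead, on a mock data dictionary whose names are invented for illustration. `fields_for_id_mock` is a hypothetical helper, not part of the notebook.

```r
# Mock data dictionary; the names below are invented for illustration
mock_datadict <- data.frame(
  name = c("eid", "p94_i0_a0", "p94_i0_a1", "p940_i0", "p9400_i0_a0", "p31"),
  stringsAsFactors = FALSE
)

# Same regex as fields_for_id(); perl = TRUE enables the (?![0-9]) lookahead
fields_for_id_mock <- function(field, dict) {
  regex <- paste0("^p", field, "(?![0-9])")
  dict$name[grepl(regex, dict$name, perl = TRUE)]
}

fields_for_id_mock(94, mock_datadict)  # "p94_i0_a0" "p94_i0_a1"
fields_for_id_mock(31, mock_datadict)  # "p31"

# Without the lookahead, longer ids starting with 94 would also match
mock_datadict$name[grepl("^p94", mock_datadict$name)]  # "p94_i0_a0" "p94_i0_a1" "p940_i0" "p9400_i0_a0"
```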

Once you have the list of field names you would like to extract, `find_fields` looks up the corresponding fields in the participant entity.

In [None]:
# 'eid' is added manually because it is not included within 'field_schema' which creates the list of 'field_ids_of_interest'
complete_field_ids <- c('eid', unlist(lapply(field_ids_of_interest, fields_for_id)))

# 'iterate()' is needed because the output of 'find_fields()' is an iterator object
# The resulting object is a list of `Field` objects (object specific to dxdata).
complete_field_ids <- iterate(participant_database$find_fields(names=complete_field_ids))

#### Assigning column names

Each `Field` object in `complete_field_ids` has a `title` attribute, which gives the title of the associated field.

The data can be extracted with the function `retrieve_fields`. This function takes the list of fields as the `fields` argument and has an optional `column_aliases` argument, which maps each field's column name to an alias (here, its title), giving readable column names if desired.

The following code removes special characters and spaces from the titles to avoid errors when they are passed as `column_aliases` to `retrieve_fields`.

In [None]:
complete_col_names_clean <- lapply(seq_along(complete_field_ids), function(i) {
    
    clean_title <- gsub(" ", "_", complete_field_ids[[i]]$title) # replace spaces with an _
    clean_title <- gsub("[^a-zA-Z0-9_]", "", clean_title) # remove special characters
    
    setNames(list(clean_title), complete_field_ids[[i]]$column_name)

})

`complete_col_names_clean` is a nested list; `do.call(c, ...)` flattens it into a single named list.

`dict()` (from `reticulate`) converts it to a Python dictionary, which is the data type expected by the `column_aliases` argument of `retrieve_fields()`.

In [None]:
complete_col_names_clean <- do.call(c, complete_col_names_clean)
complete_col_names_clean <- dict(complete_col_names_clean)
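To illustrate the cleaning and flattening steps without a live dataset, the sketch below uses plain R lists as hypothetical stand-ins for `Field` objects (only `title` and `column_name` are mimicked; the column names are illustrative). The titles follow the `Title | Instance 0` style used later in this notebook. Note that removing `|` leaves a double underscore in the aliases.

```r
# Hypothetical stand-ins for dxdata Field objects (plain R lists, not the real class)
mock_fields <- list(
  list(title = "Participant ID", column_name = "eid"),
  list(title = "Standing height | Instance 0", column_name = "p50_i0"),
  list(title = "Diastolic blood pressure, manual reading | Instance 0 | Array 1",
       column_name = "p94_i0_a1")
)

# Same cleaning rules as above: spaces -> "_", then drop anything not alphanumeric or "_"
clean_title <- function(title) {
  title <- gsub(" ", "_", title)
  gsub("[^a-zA-Z0-9_]", "", title)
}

# One single-element named list per field, then flatten with do.call(c, ...)
mock_aliases <- do.call(c, lapply(mock_fields, function(f) {
  setNames(list(clean_title(f$title)), f$column_name)
}))
str(mock_aliases)
```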

#### Extracting data

Use `retrieve_fields` to access the fields of interest.

**Note:** other arguments are available:
- `coding_values` – accepts "raw"/"exclude"/"replace"
- `limit` – integer; maximum number of table rows to return

In [None]:
# Returns a PySpark DataFrame
data_of_interest <- participant_database$retrieve_fields(engine = engine,
                                                         fields = complete_field_ids,
                                                         column_aliases = complete_col_names_clean)

**Side note:** record tables can also be accessed using the `retrieve_fields()` function from `dxdata`.


In [None]:
omop_death = dataset["omop_death"]$retrieve_fields(engine) 

## 6. Write the data to a temporary `parquet` file 
You can learn more about the _parquet_ file format here: [https://parquet.apache.org/](https://parquet.apache.org/)

In [None]:
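# Remove any previous copy of the output from the distributed file system
system('hadoop fs -rm -r -f data_of_interest.parquet', intern = TRUE)
# Write the PySpark DataFrame to the distributed file system in parquet format
data_of_interest$write$parquet('data_of_interest.parquet')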

#### Copy the temporary _parquet_ file from the distributed file system to the local file system

In [None]:
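# Remove any previous local copy, then copy the parquet directory from the distributed file system
if (dir.exists('data_of_interest.parquet')) unlink('data_of_interest.parquet', recursive = TRUE)
system('hadoop fs -copyToLocal data_of_interest.parquet', intern = TRUE)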

# This file can be uploaded to your project using system("dx upload -r data_of_interest.parquet") 
# This is useful if you have a large file and want to save some steps. You can download it later in another session and use open_dataset() and collect() within RStudio for instance.

#### Open the dataset in R using the Apache `arrow` package

In [None]:
data_of_interest_ds <- arrow::open_dataset('data_of_interest.parquet')

#### Collect the data from the dataset into R memory
Use `collect()` to convert the phenotypic data into a standard `tibble` object, which you can work with using `tidyverse` functions.

In [None]:
data_of_interest_tbl <- data_of_interest_ds %>% collect()

#### *Optional* - rename columns to the raw field titles

In [None]:
# Raw (uncleaned) title of each field; assumes the columns are in the same order as complete_field_ids
complete_col_names_raw <- vapply(complete_field_ids, function(field) field$title, character(1))

names(data_of_interest_tbl) <- complete_col_names_raw

### Write a csv in your local environment and then upload the data to your project

This may take some time - depending on how large your data is.

In [None]:
write.csv(data_of_interest_tbl, 'data_of_interest_tbl.csv', row.names = FALSE)

In [None]:
system("dx upload data_of_interest_tbl.csv")

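### Export data in PLINK phenotype format

The code below uses the raw column titles (e.g. `Participant ID`), so it assumes you ran the optional renaming step above.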

In [None]:
pheno_out <- data_of_interest_tbl %>% 
    transmute(
        FID=`Participant ID`, 
        IID=`Participant ID`, 
        Y1=as.double(`Standing height | Instance 0`)
)

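# write_delim() comes from readr, which is not loaded here; fwrite() from data.table is used instead
data.table::fwrite(pheno_out, file = 'ukb_phenotypes_height.txt', sep = ' ', na = 'NA')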
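PLINK phenotype files are whitespace-delimited, with family ID (`FID`) and individual ID (`IID`) as the first two columns followed by one or more phenotype columns. The sketch below writes a mock table (invented IDs and heights) with base R's `write.table()` and shows the resulting lines. Missing values are written as `NA`; check how your PLINK version treats them (PLINK 1.9's default missing phenotype code is `-9`).

```r
# Mock phenotype table with invented IDs and heights (cm); one missing value
mock_pheno <- data.frame(
  FID = c(1000001L, 1000002L, 1000003L),
  IID = c(1000001L, 1000002L, 1000003L),
  Y1  = c(172.5, NA, 160)
)

pheno_file <- tempfile(fileext = ".txt")
# Space-delimited, with a header line, no quotes and no row names
write.table(mock_pheno, pheno_file, sep = " ", quote = FALSE,
            row.names = FALSE, na = "NA")

readLines(pheno_file)
# "FID IID Y1" "1000001 1000001 172.5" "1000002 1000002 NA" "1000003 1000003 160"
```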