In [1]:
import os
import re
import subprocess

import dxpy
import dxdata
import pandas as pd
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

# Record the dxdata client version in the cell output. A bare expression in the
# middle of a cell is evaluated and thrown away, so it has to be printed.
print(f"dxdata version: {dxdata.__version__}")
# Engine handle handed to `retrieve_fields(..., engine=connection)` in the export cells.
connection = dxdata.connect()
# Spark configuration consumed when the SparkContext is created in the next cell;
# it sets the "spark.kryoserializer.buffer.max" option to 2000m.
conf = pyspark.SparkConf().set("spark.kryoserializer.buffer.max", "2000m")

  self._context = ssl.SSLContext(ssl_version)


In [2]:
# Use getOrCreate so the cell can be re-run in a live kernel: constructing a
# second SparkContext in the same process raises ValueError.
# NOTE: if a context already exists it is returned as-is and `conf` is not
# re-applied, so restart the kernel after changing `conf`.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = pyspark.sql.SparkSession(sc)

In [3]:
# Discover the dispensed database: a 'database' object in the project root whose
# name matches the glob "app*". `describe=True` is requested so the name can be
# read from the returned describe payload.
dispensed_database = dxpy.find_one_data_object(
    classname="database",
    name="app*",
    name_mode="glob",
    folder="/",
    describe=True,
)
dispensed_database_name = dispensed_database["describe"]["name"]

# Discover the dispensed dataset the same way (type 'Dataset', glob "app*.dataset")
# and load it by its object id.
dispensed_dataset = dxpy.find_one_data_object(
    typename="Dataset",
    name="app*.dataset",
    name_mode="glob",
    folder="/",
)
dispensed_dataset_id = dispensed_dataset["id"]
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

# Last expression of the cell: shows the entities the dataset exposes.
dataset.entities

[<Entity "participant">,
 <Entity "covid19_result_england">,
 <Entity "covid19_result_scotland">,
 <Entity "covid19_result_wales">,
 <Entity "gp_clinical">,
 <Entity "gp_scripts">,
 <Entity "gp_registrations">,
 <Entity "hesin">,
 <Entity "hesin_diag">,
 <Entity "hesin_oper">,
 <Entity "hesin_critical">,
 <Entity "hesin_maternity">,
 <Entity "hesin_delivery">,
 <Entity "hesin_psych">,
 <Entity "death">,
 <Entity "death_cause">,
 <Entity "olink_instance_0">,
 <Entity "olink_instance_2">,
 <Entity "olink_instance_3">]

In [4]:
# Handle to the "participant" entity of the loaded dataset. The field-lookup
# helpers and every `retrieve_fields` call in the export cells go through it.
participant = dataset['participant']

In [5]:
def fields_for_id(field_id, entity=None):
    """Return all field objects for a given UKB showcase field id.

    Fields are matched by name against ``p<field_id>`` optionally followed by
    an ``_i<N>`` and/or ``_a<N>`` suffix, and returned in natural order
    (numeric parts compare as integers, so ``_a2`` sorts before ``_a10``).

    Args:
        field_id: Showcase field id; converted with ``str()`` and escaped
            before being placed in the regular expression.
        entity: Object exposing ``find_fields(name_regex=...)``. Defaults to
            the notebook-level ``participant`` entity.

    Returns:
        A list of field objects sorted by ``name``.
    """
    if entity is None:
        entity = participant
    name_regex = r'^p{}(_i\d+)?(_a\d+)?$'.format(re.escape(str(field_id)))

    def natural_key(field):
        # re.split with a capturing group alternates text / digit runs, so the
        # odd positions are always the numeric tokens. This replaces
        # distutils' LooseVersion, which was removed in Python 3.12.
        tokens = re.split(r'(\d+)', field.name)
        return [int(tok) if pos % 2 else tok for pos, tok in enumerate(tokens)]

    return sorted(entity.find_fields(name_regex=name_regex), key=natural_key)


def field_names_for_id(field_id, entity=None):
    """Return the names of all fields for a given UKB showcase field id.

    Takes the same arguments as ``fields_for_id`` and returns the ``name`` of
    each field it finds, in the same order.
    """
    return [f.name for f in fields_for_id(field_id, entity=entity)]

In [None]:
# Export the recommended fields in batches: each batch of showcase field ids is
# retrieved, written to a local TSV and uploaded with `dx upload`.
FIELDS_PER_GROUP = 20
# 1-based number of the last group to skip; the loop starts with the group after
# it. NOTE(review): presumably groups 1-84 were exported by an earlier run.
# Set to 0 to export every group.
LAST_SKIPPED_GROUP = 84

# Read the recommended field ids and cut them into groups of FIELDS_PER_GROUP.
merged_fields = pd.read_csv("merged_fields.tsv", sep="\t", header=0)
merge_field_list = merged_fields["field_id"].tolist()
merge_field_list = [
    merge_field_list[start:start + FIELDS_PER_GROUP]
    for start in range(0, len(merge_field_list), FIELDS_PER_GROUP)
]

# to_csv does not create missing parent directories, so make sure it exists.
os.makedirs("./core_category", exist_ok=True)

for group_number, group in enumerate(merge_field_list, start=1):
    if group_number <= LAST_SKIPPED_GROUP:
        continue
    print(f"Retrieving fields for group {group_number} with {len(group)} fields")
    # Field ids come out of the TSV as non-string values; normalise to str.
    group = [str(x) for x in group]
    # Expand every showcase id into its concrete column names (flat list).
    get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
    field_names = ['eid'] + get_ids
    df = participant.retrieve_fields(names=field_names, engine=connection)
    data = df.toPandas()
    display(data)
    # Save the group locally, then upload that same file.
    out_path = f'./core_category/fields_group_{group_number}.tsv'
    data.to_csv(out_path, sep='\t', index=False, header=True)
    # check=True aborts the loop if the upload command fails.
    subprocess.run([
        "dx", "upload", out_path,
        "-p", "--path", "/Output/Traits/core_category/", "--brief"
    ], check=True)

In [None]:
# Re-export selected groups in smaller parts: each listed group (1-based number
# into the FIELDS_PER_GROUP-wide batches) is cut into parts of FIELDS_PER_PART
# field ids, and every part is retrieved, saved and uploaded on its own.
split_group_list = ["29"]
FIELDS_PER_GROUP = 20
FIELDS_PER_PART = 5

# Rebuild the same grouping as the batch export so group numbers line up.
merged_fields = pd.read_csv("merged_fields.tsv", sep="\t", header=0)
merge_field_list = merged_fields["field_id"].tolist()
merge_field_list = [
    merge_field_list[start:start + FIELDS_PER_GROUP]
    for start in range(0, len(merge_field_list), FIELDS_PER_GROUP)
]

# to_csv does not create missing parent directories, so make sure it exists.
os.makedirs("./core_category", exist_ok=True)

for group_str in split_group_list:
    group_num = int(group_str) - 1  # group labels are 1-based, list is 0-based
    field_list = [str(x) for x in merge_field_list[group_num]]
    # Chunk generically instead of four fixed slices: a group with fewer than
    # 20 ids (e.g. the last one) then yields fewer parts rather than empty
    # ones that would be exported as eid-only files.
    parts = [
        field_list[start:start + FIELDS_PER_PART]
        for start in range(0, len(field_list), FIELDS_PER_PART)
    ]
    for part_number, group in enumerate(parts, start=1):
        print(f"Processing group {group_str} - part {part_number}")
        print(f"Field IDs: {group}")
        # Expand every showcase id into its concrete column names (flat list).
        get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
        field_names = ['eid'] + get_ids
        df = participant.retrieve_fields(names=field_names, engine=connection)
        data = df.toPandas()
        display(data)
        # Save the part locally, then upload that same file.
        out_path = f'./core_category/fields_group_{group_str}_{part_number}.tsv'
        data.to_csv(out_path, sep='\t', index=False, header=True)
        # check=True aborts the loop if the upload command fails.
        subprocess.run([
            "dx", "upload", out_path,
            "-p", "--path", "/Output/Traits/core_category/", "--brief"
        ], check=True)

In [None]:
# fields_group_13_4 -- marked as done by the author
group = ['3062', '3063', '3064']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_13_4.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

In [None]:
# fields_group_13_5 -- marked as done by the author
group = ['3065']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
display(data)
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_13_5.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

In [None]:
# fields_group_13_6 -- author's note: too large to download
group = ['3066']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
display(data)
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_13_6.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

In [6]:
# fields_group_26_2
group = ['5983', '5984', '5985']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
display(data)
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_26_2.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

Unnamed: 0,eid,p5983_i0_a0,p5983_i0_a1,p5983_i0_a2,p5983_i0_a3,p5983_i0_a4,p5983_i0_a5,p5983_i0_a6,p5983_i0_a7,p5983_i0_a8,...,p5985_i1_a104,p5985_i1_a105,p5985_i1_a106,p5985_i1_a107,p5985_i1_a108,p5985_i1_a109,p5985_i1_a110,p5985_i1_a111,p5985_i1_a112,p5985_i1_a113
0,1000122,,,,,,,,,,...,,,,,,,,,,
1,1000150,,,,,,,,,,...,,,,,,,,,,
2,1000163,,,,,,,,,,...,,,,,,,,,,
3,1000196,60.0,60.0,,,,,,,,...,,,,,,,,,,
4,1000221,,,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
502123,6021183,,,,,,,,,,...,,,,,,,,,,
502124,6021216,,,,,,,,,,...,,,,,,,,,,
502125,6021303,,,,,,,,,,...,,,,,,,,,,
502126,6021358,,,,,,,,,,...,,,,,,,,,,


CompletedProcess(args=['dx', 'upload', './core_category/fields_group_26_2.tsv', '-p', '--path', '/Output/Traits/core_category/', '--brief'], returncode=0)

In [7]:
# fields_group_26_3
group = ['5986', '5987']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
display(data)
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_26_3.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

Unnamed: 0,eid,p5986_i0_a0,p5986_i0_a1,p5986_i0_a2,p5986_i0_a3,p5986_i0_a4,p5986_i0_a5,p5986_i0_a6,p5986_i0_a7,p5986_i0_a8,...,p5987_i1_a104,p5987_i1_a105,p5987_i1_a106,p5987_i1_a107,p5987_i1_a108,p5987_i1_a109,p5987_i1_a110,p5987_i1_a111,p5987_i1_a112,p5987_i1_a113
0,1000122,,,,,,,,,,...,,,,,,,,,,
1,1000150,,,,,,,,,,...,,,,,,,,,,
2,1000163,,,,,,,,,,...,,,,,,,,,,
3,1000196,0.0,120.0,,,,,,,,...,,,,,,,,,,
4,1000221,,,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
502123,6021183,,,,,,,,,,...,,,,,,,,,,
502124,6021216,,,,,,,,,,...,,,,,,,,,,
502125,6021303,,,,,,,,,,...,,,,,,,,,,
502126,6021358,,,,,,,,,,...,,,,,,,,,,


CompletedProcess(args=['dx', 'upload', './core_category/fields_group_26_3.tsv', '-p', '--path', '/Output/Traits/core_category/', '--brief'], returncode=0)

In [8]:
# fields_group_26_4
group = ['5988', '5990', '5991', '5992', '5993']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
display(data)
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_26_4.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

Unnamed: 0,eid,p5988_i0_a0,p5988_i0_a1,p5988_i0_a2,p5988_i0_a3,p5988_i0_a4,p5988_i0_a5,p5988_i0_a6,p5988_i0_a7,p5988_i0_a8,...,p5992_i0_a2,p5992_i1_a0,p5992_i1_a1,p5992_i1_a2,p5993_i0_a0,p5993_i0_a1,p5993_i0_a2,p5993_i1_a0,p5993_i1_a1,p5993_i1_a2
0,1000122,,,,,,,,,,...,,,,,,,,,,
1,1000150,,,,,,,,,,...,,,,,,,,,,
2,1000163,,,,,,,,,,...,,,,,,,,,,
3,1000196,Steady,Steady,,,,,,,,...,0.0,,,,1.0,0.0,0.0,,,
4,1000221,,,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
502123,6021183,,,,,,,,,,...,,,,,,,,,,
502124,6021216,,,,,,,,,,...,,,,,,,,,,
502125,6021303,,,,,,,,,,...,,,,,,,,,,
502126,6021358,,,,,,,,,,...,,,,,,,,,,


CompletedProcess(args=['dx', 'upload', './core_category/fields_group_26_4.tsv', '-p', '--path', '/Output/Traits/core_category/', '--brief'], returncode=0)

In [9]:
# fields_group_26_5
group = ['6014', '6015', '6016', '6017', '6019']
# Expand each showcase id into its concrete column names (flat list).
get_ids = [name for field_id in group for name in field_names_for_id(field_id)]
field_names = ['eid', *get_ids]
df = participant.retrieve_fields(names=field_names, engine=connection)
data = df.toPandas()
display(data)
# Write the TSV locally, then upload that same file.
out_path = './core_category/fields_group_26_5.tsv'
data.to_csv(out_path, sep='\t', index=False, header=True)
# Kept as the last expression so the CompletedProcess is shown as the cell output.
subprocess.run(
    ["dx", "upload", out_path, "-p", "--path", "/Output/Traits/core_category/", "--brief"],
    check=True,
)

Unnamed: 0,eid,p6014_i0,p6014_i1,p6015_i0,p6015_i1,p6016_i0,p6016_i1,p6017_i0,p6017_i1,p6019_i0,p6019_i1
0,1451876,,0.0,,0.0,,0.0,,1.0,,1.0
1,3046819,,,,,,,,,,
2,5015491,,,,,,,,,,
3,1598419,,,,,,,,,,
4,3293483,0.0,,0.0,,1.0,,1.0,,2.0,
...,...,...,...,...,...,...,...,...,...,...,...
502123,2998069,,,,,,,,,,
502124,3834454,,,,,,,,,,
502125,5758738,,,,,,,,,,
502126,5115563,,,,,,,,,,


CompletedProcess(args=['dx', 'upload', './core_category/fields_group_26_5.tsv', '-p', '--path', '/Output/Traits/core_category/', '--brief'], returncode=0)