In [None]:
# Import packages
import pyspark
import dxpy
import dxdata

In [None]:
# Spark initialization.
# SparkContext.getOrCreate() returns the already-running context instead of
# raising "Cannot run multiple SparkContexts at once", so this cell can be
# re-run safely without restarting the kernel.
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

In [None]:
# Locate the dispensed database and dataset automatically.
# Both lookups search the project root ('/') using glob matching on 'app*'.
dispensed_database = dxpy.find_one_data_object(
    classname='database', name='app*', describe=True, folder='/', name_mode='glob'
)
dispensed_database_name = dispensed_database['describe']['name']

dispensed_dataset = dxpy.find_one_data_object(
    typename='Dataset', name='app*.dataset', folder='/', name_mode='glob'
)
dispensed_dataset_id = dispensed_dataset['id']

In [None]:
# Access the data through the saved cohort below.
# (Alternative without a cohort: dxdata.load_dataset(id=dispensed_dataset_id).)
COHORT_FOLDER = "/"
COHORT_NAME = "RestingData_1st"

all_info = dxdata.load_cohort(folder=COHORT_FOLDER, name=COHORT_NAME)
# Entities and fields are accessed through the cohort's underlying dataset.
dataset = all_info.dataset

In [None]:
# Dataset 'entities' are virtual tables linked to one another. Displaying them
# shows which entities (e.g. 'participant', used below) are available.
dataset.entities

In [None]:
# Access the main 'participant' entity. The field lookups (find_fields) and the
# data retrieval (retrieve_fields) later in the notebook go through this object.
participant = dataset['participant']

In [None]:
#field_names = ['eid', 'p31', 'p21022', 'p40005_i0', 'p93_i0_a0']

In [None]:
# Returns all field objects for a given UKB showcase field id

def fields_for_id(field_id, entity=None):
    """Return all field objects for a given UKB showcase field id.

    Matches ``p<field_id>`` plus any instance (``_i<n>``) and array
    (``_a<m>``) variants, e.g. ``p31016``, ``p31016_i2``, ``p31016_i2_a0``.

    Parameters
    ----------
    field_id : str or int
        Showcase field id (digits only, without instance/array suffix).
    entity : optional
        Entity to search with ``find_fields``; defaults to the module-level
        ``participant`` entity.

    Returns
    -------
    list
        Matching field objects sorted by name in natural order, so numeric
        parts compare as integers (``_i2`` sorts before ``_i10``).
    """
    import re

    if entity is None:
        entity = participant
    field_id = str(field_id)
    fields = entity.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))

    def natural_key(field):
        # re.split with a capturing group alternates non-digit / digit tokens,
        # so odd positions are always digit runs: 'p31016_i2_a0' ->
        # ['p', 31016, '_i', 2, '_a', 0, '']. This keeps str/int types aligned
        # per position. It replaces distutils.version.LooseVersion, because
        # distutils was removed from the standard library in Python 3.12.
        tokens = re.split(r'(\d+)', field.name)
        return [int(tok) if i % 2 else tok for i, tok in enumerate(tokens)]

    return sorted(fields, key=natural_key)

# Returns all field names for a given UKB showcase field id

def field_names_for_id(field_id):
    """Return the names of all fields for a UKB showcase field id.

    The names are taken from the field objects returned by ``fields_for_id``,
    in the same order.
    """
    return list(map(lambda field: field.name, fields_for_id(field_id)))

In [48]:
def fields_for_id_x(field_id):
    """Build the exact field name for a UKB showcase id with optional indices.

    Unlike ``fields_for_id``, this does not query the dataset. It splits
    ``field_id`` on commas, whitespace or underscores and formats one name:

    * ``'31'``      -> ``['p31']``
    * ``'21003_2'`` -> ``['p21003_i2']``   (instance 2)
    * ``'93_0_0'``  -> ``['p93_i0_a0']``   (instance 0, array index 0)

    Parameters
    ----------
    field_id : str or int
        Showcase field id, optionally followed by instance and array indices.

    Returns
    -------
    list of str
        A one-element list with the field name, or ``[]`` when ``field_id``
        is empty or has more than three parts.
    """
    # Imported locally: the notebook's import cell does not import `re`, so the
    # function would otherwise raise NameError on a fresh kernel.
    import re

    # Drop empty tokens so leading/trailing separators (e.g. ' 31') do not
    # shift the parts and produce names like 'p_i31'.
    parts = [part for part in re.split(r'[,\s_]+', str(field_id)) if part]
    templates = {1: 'p{}', 2: 'p{}_i{}', 3: 'p{}_i{}_a{}'}
    template = templates.get(len(parts))
    if template is None:
        return []
    return [template.format(*parts)]

In [45]:
# Field ids to retrieve. An entry '<id>_<n>' requests instance n of that field
# (fields_for_id_x turns '21003_2' into 'p21003_i2'); a bare '<id>' is used as-is.

# demographic factors: sex, age at the first scan, IQ and EA (4)
# ('eid' is not listed here; it is prepended when field_names is built)
demographic_fields = ['31','21003_2','20016_2','6138_2']
# current depression or anxiety status while scanning (3)
current_status_fields = ['2050_2','2060_2','2070_2']
# Self_Reported_Mental_Health (1)
Self_Reported_Mental_Health_fields = ['20002']
# Ever_Diagnosed_Mental_Health_Problem (1)
Ever_Diagnosed_Mental_Health_Problem_fields = ['20544']
# Self reported history depression: CIDI (11)
# NOTE(review): this comment previously said 13 fields; confirm none are missing.
history_depression_fields = ['20436','20439','20440','20446','20441','20449','20536','20532','20435','20450','20437']
# Self reported history anxiety: CIDI (18)
history_anxiety_fields = ['20421','20420','20538','20425','20542','20543','20540','20541','20539','20537','20418','20426','20423','20429','20419','20422','20417','20427']
# PHQ (9)
PHQ_fields = ['20514','20510','20517','20519','20511','20507','20508','20518','20513']
# GAD7 (7)
GAD7_fields = ['20506','20509','20520','20515','20516','20505','20512']
# hospital data: ICD10 and ICD9 (2)
hospital_data_fields = ['41270','41271']
# fMRI data (5). These are NOT part of all_fields_ids. They are resolved
# separately via field_names_for_id, which returns every instance/array column.
fMRI_fields = ['31016','31018','31019','31015','31014']

all_fields_ids = (
    demographic_fields
    + current_status_fields
    + Self_Reported_Mental_Health_fields
    + Ever_Diagnosed_Mental_Health_Problem_fields
    + history_depression_fields
    + history_anxiety_fields
    + PHQ_fields
    + GAD7_fields
    + hospital_data_fields
)
# An id listed twice would request the same column twice.
assert len(set(all_fields_ids)) == len(all_fields_ids), "duplicate field ids"
print("Number of  all fields needed: ", len(all_fields_ids))

Number of  all fields needed:  61


In [None]:
# sum flattens list of lists
#sum([field_names_for_id(field_id) for field_id in all_fields_ids], [])

In [49]:
# Columns to retrieve, in order:
#   * 'eid' first,
#   * one exact name per id in all_fields_ids (via fields_for_id_x),
#   * every instance/array column found in the dataset for each fMRI id
#     (via field_names_for_id).
# Flat comprehensions replace sum(list_of_lists, []), which re-copies the
# accumulator on every addition.
field_names = (
    ['eid']
    + [name for field_id in all_fields_ids for name in fields_for_id_x(field_id)]
    + [name for field_id in fMRI_fields for name in field_names_for_id(field_id)]
)

In [50]:
# Every requested column should appear exactly once; a duplicate would ask
# retrieve_fields for the same column twice.
assert len(field_names) == len(set(field_names)), "duplicate entries in field_names"
field_names

['eid', 'p31', 'p21003_i2', 'p20016_i2', 'p6138_i2', 'p2050_i2', 'p2060_i2', 'p2070_i2', 'p20002', 'p20544', 'p20436', 'p20439', 'p20440', 'p20446', 'p20441', 'p20449', 'p20536', 'p20532', 'p20435', 'p20450', 'p20437', 'p20421', 'p20420', 'p20538', 'p20425', 'p20542', 'p20543', 'p20540', 'p20541', 'p20539', 'p20537', 'p20418', 'p20426', 'p20423', 'p20429', 'p20419', 'p20422', 'p20417', 'p20427', 'p20514', 'p20510', 'p20517', 'p20519', 'p20511', 'p20507', 'p20508', 'p20518', 'p20513', 'p20506', 'p20509', 'p20520', 'p20515', 'p20516', 'p20505', 'p20512', 'p41270', 'p41271', 'p31016', 'p31018', 'p31019', 'p31015', 'p31014']


In [None]:
# Grab the fields into a Spark DataFrame, restricted to the cohort loaded above.
# `participant` comes from the cohort's underlying dataset (all_info.dataset),
# so the cohort's own selection is applied only when passed as filter_sql.
# NOTE(review): if the full dispensed dataset was intended, drop filter_sql.
df = participant.retrieve_fields(
    names=field_names,
    filter_sql=all_info.sql,
    engine=dxdata.connect(),
)

In [None]:
# See the first five entries as a Pandas DataFrame. limit(5) runs before
# toPandas(), so only these five rows are collected from Spark.
df.limit(5).toPandas()

In [None]:
# Save results
# Saving as CSV file 'participants.csv' (relative to the current working
# directory; the next cell uploads it). toPandas() collects the entire Spark
# DataFrame into driver memory before writing.
# NOTE(review): for large extracts consider writing with Spark instead.
df.toPandas().to_csv('participants.csv', index=False)

In [None]:
%%bash
# Writing results back to the project: upload the CSV saved above to the root folder.
# (%%bash must be the first line of the cell for IPython to treat it as a cell magic.)
dx upload participants.csv --dest /