In [1]:
import pyspark
import dxpy
import dxdata
from pyspark.sql.functions import array_join
import pyspark.sql.functions as F
import os

In [2]:
# Spark initialization.
# getOrCreate() reuses the SparkContext already running in this kernel, so the
# cell can be re-executed without raising "Cannot run multiple SparkContexts
# at once"; on a fresh kernel it creates a new context as before.
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

In [None]:
# Destination of the exported table:
#   OUTPUT_DIR      - folder passed to the upload call as the target folder
#   OUTPUT_FILENAME - name of the CSV file written locally and then uploaded
OUTPUT_DIR = '/path/to/output/directory'
OUTPUT_FILENAME = 'output_phenotype_filename.csv'

In [None]:
# Look up the dispensed database: first "app*" database object found under "/".
# describe=True is requested so the object's name is available in the result.
dispensed_database = dxpy.find_one_data_object(
    classname="database", name="app*", name_mode="glob", folder="/", describe=True
)
dispensed_database_name = dispensed_database["describe"]["name"]

# Look up the dispensed dataset record ("app*.dataset" under "/") and keep its id.
dispensed_dataset = dxpy.find_one_data_object(
    typename="Dataset", name="app*.dataset", name_mode="glob", folder="/"
)
dispensed_dataset_id = dispensed_dataset["id"]

# Load the dataset by id and select its 'participant' entity.
dataset = dxdata.load_dataset(id=dispensed_dataset_id)
participant = dataset["participant"]

In [None]:
# Readable label -> dataset field name for every column to retrieve.
# Insertion order is preserved and determines the order of `field_names`.
field_name_dict = {
    # Identifier and demographics
    'Sample': 'eid',
    'yob': 'p34',
    'sex': 'p22001',
    'white_british_genetic': 'p22006',
    'ethnic_background1': 'p21000_i0',
    'ethnic_background2': 'p21000_i1',
    'ethnic_background3': 'p21000_i2',
    'ethnic_background4': 'p21000_i3',
    # Quality-control flag and diagnosis codes
    'recommend_for_exclusion': 'p22010',
    'icd10': 'p41270',
    # Mood and sleep items
    'depressed_mood': 'p20446',
    'lost_interest': 'p20441',
    'sleep_trouble1': 'p1200_i0',
    'sleep_trouble2': 'p1200_i1',
    'sleep_trouble3': 'p1200_i2',
    'sleep_trouble4': 'p1200_i3',
    'mood_lability1': 'p1920_i0',
    'mood_lability2': 'p1920_i1',
    'mood_lability3': 'p1920_i2',
    'mood_lability4': 'p1920_i3',
    # Anxiety items
    'anxious_feeling': 'p20421',
    'worried_more': 'p20425',
    'drugs_for_anxiety': 'p20549',
    'anxiety_interferes_life': 'p20418',
    # Substance-use items
    'drinking_interferes_life': 'p20407',
    'unable_stop_drinking': 'p20413',
    'drug_addiction': 'p20456',
    # Unusual-experience items
    'unreal_sounds': 'p20463',
    'unreal_visions': 'p20471',
    'conspiracy': 'p20468',
}

# Append PC1..PC10, mapped to array elements p22009_a1..p22009_a10.
field_name_dict.update(
    {'PC' + str(pc): 'p22009_a' + str(pc) for pc in range(1, 11)}
)

# Fetch the listed fields with raw (undecoded) coding values.
field_names = [*field_name_dict.values()]
df = participant.retrieve_fields(
    names=field_names,
    engine=dxdata.connect(),
    coding_values="raw",
)

In [None]:
# Filter to restrict to samples that have all the information needed.
# Keep rows where both sex ('p22001') and yob ('p34') are present.
df = df.na.drop("any", subset=['p22001', 'p34'])
# Keep rows that are not recommended for exclusion based on genetic QC
# (i.e. the 'p22010' flag is null).
df = df.where(df.p22010.isNull())

# Drop any columns with all Null values.
# The per-column null counts and the total row count are computed in a single
# aggregation so the filtered data is scanned once rather than twice.
_ROW_COUNT_ALIAS = '__row_count__'
null_counts = df.select(
    [F.count(F.lit(1)).alias(_ROW_COUNT_ALIAS)]
    + [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
).collect()[0].asDict()
df_size = null_counts.pop(_ROW_COUNT_ALIAS)

# With zero rows every null count is 0 == df_size, which would wrongly mark all
# columns as "all null"; in that case keep the schema untouched.
to_drop = [k for k, v in null_counts.items() if v == df_size] if df_size > 0 else []
if to_drop:
    df = df.drop(*to_drop)

In [None]:
# Convert ICD10 codes to string.
# The all-null column pruning step may have removed 'p41270'; only cast the
# column when it is still present so this cell does not fail in that case.
if 'p41270' in df.columns:
    df = df.withColumn('p41270', df['p41270'].cast('string'))

In [None]:
# Save to csv: collect the Spark DataFrame to pandas, write it to a local CSV
# file, and upload that file to OUTPUT_DIR (parents=True is passed so missing
# parent folders are created).
pdf = df.toPandas()
try:
    pdf.to_csv(OUTPUT_FILENAME, index=False)
    dxpy.upload_local_file(OUTPUT_FILENAME, folder=OUTPUT_DIR, parents=True)
finally:
    # Always remove the temporary local copy, even if writing or uploading fails.
    if os.path.exists(OUTPUT_FILENAME):
        os.remove(OUTPUT_FILENAME)