In [2]:
# Import packages
import pyspark
import dxpy
import dxdata
from pyspark.sql import SparkSession

In [3]:
dxdata.__version__

'0.41.0'

In [4]:
# Spark initialization (Done only once; do not rerun this cell unless you select Kernel -> Restart kernel).
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)


# spark = SparkSession.builder \
#     .appName("MyApp") \
#     .config("spark.sql.broadcastTimeout", "600") \
#     .config("spark.sql.autoBroadcastJoinThreshold", -1) \
#     .getOrCreate()

# spark = SparkSession.builder \
#     .appName("MyApp") \
#     .config("spark.driver.maxResultSize", "4g") \
#     .config("spark.kryoserializer.buffer.max", "2048m") \
#     .getOrCreate()

conf = pyspark.SparkConf().setAll([('spark.driver.maxResultSize', '2047'), ("spark.kryoserializer.buffer.max", "1g")])

In [5]:
# conf = spark.sparkContext.getConf()

# Print the values of the specific configurations
print("spark.kryoserializer.buffer.max:", conf.get("spark.kryoserializer.buffer.max"))
print("spark.driver.maxResultSize:", conf.get("spark.driver.maxResultSize"))

spark.kryoserializer.buffer.max: 1g
spark.driver.maxResultSize: 2047


In [6]:
# List all Spark configurations to verify settings
for key, value in conf.getAll():
    print(f"{key}: {value}")


spark.port.maxRetries: 16
spark.eventLog.enabled: true
spark.driver.defaultJavaOptions: -Ddnanexus.fs.output.committer.pendingdirname=job-GkkFpgjJfg3xyj9XpVy3ZF7Z
spark.blockManager.port: 44000
spark.driver.port: 42000
spark.driver.host: ip-10-60-42-98.eu-west-2.compute.internal
spark.executor.memory: 2800m
spark.kryo.registrator: is.hail.kryo.HailKryoRegistrator
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions: 5
spark.history.fs.logDirectory: hdfs://master:9000/eventlogs/
spark.repl.local.jars: local:/cluster/dnax/jars/dnax-common-1.0.jar,local:/cluster/dnax/jars/dnaxfilesystem-1.0.jar,local:/cluster/dnax/jars/hiveclient-1.0.jar,local:/cluster/dnax/jars/dnanexus-api-0.1.0-SNAPSHOT-jar-with-dependencies.jar
spark.executor.extraClassPath: /cluster/dnax/jars/dnax-common-1.0.jar:/cluster/dnax/jars/dnanexus-api-0.1.0-SNAPSHOT-jar-with-dependencies.jar:/cluster/dnax/jars/dnaxspark-1.0.jar:/cluster/spark/jars/hail-all-spark-0.2.116.jar
spark.executor

In [2]:
# Automatically discover dispensed database name and dataset id
dispensed_database = dxpy.find_one_data_object(
    classname='database', 
    name='app*', 
    folder='/', 
    name_mode='glob', 
    describe=True)
dispensed_database_name = dispensed_database['describe']['name']

dispensed_dataset = dxpy.find_one_data_object(
    typename='Dataset', 
    name='app*.dataset', 
    folder='/', 
    name_mode='glob')
dispensed_dataset_id = dispensed_dataset['id']
print('dispensed_dataset_id:', dispensed_dataset_id)

dispensed_dataset_id: record-GZ85K5jJYvKZJ6kG1yxfzbzJ


# Access dataset

In [3]:
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

In [7]:
dataset.entities

[<Entity "participant">,
 <Entity "death">,
 <Entity "hesin_oper">,
 <Entity "death_cause">,
 <Entity "hesin_diag">,
 <Entity "hesin_psych">,
 <Entity "hesin_maternity">,
 <Entity "hesin">,
 <Entity "hesin_delivery">,
 <Entity "hesin_critical">,
 <Entity "covid19_result_england">,
 <Entity "covid19_result_scotland">,
 <Entity "covid19_result_wales">,
 <Entity "gp_clinical">,
 <Entity "gp_registrations">,
 <Entity "gp_scripts">,
 <Entity "omop_visit_occurrence">,
 <Entity "omop_dose_era">,
 <Entity "omop_drug_era">,
 <Entity "omop_drug_exposure">,
 <Entity "omop_note">,
 <Entity "omop_observation">,
 <Entity "omop_death">,
 <Entity "omop_device_exposure">,
 <Entity "omop_condition_era">,
 <Entity "omop_condition_occurrence">,
 <Entity "omop_measurement">,
 <Entity "omop_procedure_occurrence">,
 <Entity "omop_specimen">,
 <Entity "omop_observation_period">,
 <Entity "omop_person">,
 <Entity "omop_visit_detail">,
 <Entity "olink_instance_0">,
 <Entity "olink_instance_2">,
 <Entity "olink_

In [7]:
participant = dataset['participant']

In [4]:
# Returns all field objects for a given UKB showcase field id

def fields_for_id(field_id):
    from distutils.version import LooseVersion
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

# Returns all field names for a given UKB showcase field id

def field_names_for_id(field_id):
    return [f.name for f in fields_for_id(field_id)]

# Extraction examples

In [13]:
# Participant sex
print(field_names_for_id('31'))

['p31']


  return sorted(fields, key=lambda f: LooseVersion(f.name))


In [16]:
# Age when attending assessment centre has multiple instances (visits) 
field_names_for_id('21003')

  return sorted(fields, key=lambda f: LooseVersion(f.name))


['p21003_i0', 'p21003_i1', 'p21003_i2', 'p21003_i3']

In [17]:
# Pulse rate has multiple instances and array indices (measured twice in each visit)
field_names_for_id('102')

  return sorted(fields, key=lambda f: LooseVersion(f.name))


['p102_i0_a0',
 'p102_i0_a1',
 'p102_i1_a0',
 'p102_i1_a1',
 'p102_i2_a0',
 'p102_i2_a1',
 'p102_i3_a0',
 'p102_i3_a1']

In [10]:
field_ids = ['31','21003', '102']
# sum flattens list of lists
field_ids = sum([field_names_for_id(field_id) for field_id in field_ids], []) 
field_ids

  return sorted(fields, key=lambda f: LooseVersion(f.name))


['p31',
 'p21003_i0',
 'p21003_i1',
 'p21003_i2',
 'p21003_i3',
 'p102_i0_a0',
 'p102_i0_a1',
 'p102_i1_a0',
 'p102_i1_a1',
 'p102_i2_a0',
 'p102_i2_a1',
 'p102_i3_a0',
 'p102_i3_a1']

In [12]:
field_ids = ['p31']

#subset_participants = participant_df.head(10)
#subset_participants

In [13]:
df = participant.retrieve_fields(names=field_ids, coding_values='replace', engine=dxdata.connect())

In [14]:
# df.show(5, truncate=False)

In [None]:
df.toPandas().to_csv('test_extraction.tsv', sep='\t', index=False)

In [None]:
%%bash
dx upload test_extraction.tsv --dest /Test_Project_Start/

# Blood and disease data extraction

In [5]:
common_info = ['eid', 'p31', 'p21022', 'p34', 'p52', 'p23165'] # sex, Age at recruitment, year, month of birth, 23165: Blood-type haplotype
conditions = ['21003', '2188', '2306', '20022', '20001', '20002', '3079'] # 21003: Age when attended assessment centre, 2188: Long-standing illness, disability or infirmity, 2306: Weight change compared with 1 year ago, 20022: Birth weight, 20001: cancer-code, self-reported; 20002: Non-cancer illness code, self-reported (https://biobank.ctsu.ox.ac.uk/crystal/field.cgi?id=20002); 3079: pace-maker
blood_count =  ['30160', '30220', '30150', '30210', '30030', '30020', '30300', '30290', '30280', '30120', '30180', '30050', '30060', '30040', '30100', '30260', '30270', '30130', '30190', '30140', '30200', '30170', '30230', '30080', '30090', '30110', '30010', '30070', '30250', '30240', '30000']
blood_biochemistry = ['30897', '30622', '30621', '30623', '30624', '30625', '30626', '30602', '30601', '30603', '30604', '30605', '30606', '30612', '30611', '30613', '30614', '30615', '30616', '30632', '30631', '30633', '30634', '30635', '30636', '30642', '30641', '30643', '30644', '30645', '30646', '30652', '30651', '30653', '30654', '30655', '30656', '30712', '30711', '30713', '30714', '30715', '30716', '30682', '30681', '30683', '30684', '30685', '30686', '30692', '30691', '30693', '30694', '30695', '30696', '30702', '30701', '30703', '30704', '30705', '30706', '30722', '30721', '30723', '30724', '30725', '30726', '30662', '30661', '30663', '30664', '30665', '30666', '30732', '30731', '30733', '30734', '30735', '30736', '30742', '30741', '30743', '30744', '30745', '30746', '30751', '30753', '30754', '30755', '30756', '30762', '30761', '30763', '30764', '30765', '30766', '30772', '30771', '30773', '30774', '30775', '30776', '30782', '30781', '30783', '30784', '30785', '30786', '30792', '30791', '30793', '30794', '30795', '30796', '30802', '30801', '30803', '30804', '30805', '30806', '30812', '30811', '30813', '30814', '30815', '30816', '30822', '30821', '30823', '30824', '30825', '30826', '30832', '30831', '30833', '30834', '30835', '30836', '30852', '30851', '30853', '30854', '30855', '30856', '30842', '30841', '30843', '30844', '30845', '30846', '30862', '30861', '30863', '30864', '30865', '30866', '30872', '30871', '30873', '30874', '30875', '30876', '30882', '30881', '30883', '30884', '30885', '30886', '30672', '30671', '30673', '30674', '30675', '30676', '30892', '30891', '30893', '30894', '30895', '30896']
metabolomics =  ['23474', '23475', '23476', '23477', '23460', '23479', '23440', '23439', '23441', '23433', '23432', '23431', '23484', '23526', '23561', '23533', '23498', '23568', '23540', '23505', '23575', '23547', '23512', '23554', '23491', '23519', '23580', '23610', '23635', '23615', '23590', '23640', '23620', '23595', '23645', '23625', '23600', '23630', '23585', '23605', '23485', '23418', '23527', '23417', '23562', '23534', '23499', '23569', '23541', '23506', '23576', '23548', '23513', '23416', '23555', '23492', '23520', '23581', '23611', '23636', '23616', '23591', '23641', '23621', '23596', '23646', '23626', '23601', '23631', '23586', '23606', '23473', '23404', '23481', '23430', '23523', '23429', '23558', '23530', '23495', '23565', '23537', '23502', '23572', '23544', '23509', '23428', '23551', '23488', '23516', '23478', '23443', '23450', '23457', '23486', '23422', '23528', '23421', '23563', '23535', '23500', '23570', '23542', '23507', '23577', '23549', '23514', '23420', '23556', '23493', '23521', '23582', '23612', '23637', '23617', '23592', '23642', '23622', '23597', '23647', '23627', '23602', '23632', '23587', '23607', '23470', '20280', '23461', '23462', '23480', '23406', '23463', '23465', '23405', '23471', '23466', '23449', '23456', '23447', '23454', '23444', '23451', '23445', '23459', '23452', '23468', '23437', '23434', '23483', '23414', '23525', '23413', '23560', '23532', '23497', '23567', '23539', '23504', '23574', '23546', '23511', '23412', '23553', '23490', '23518', '23579', '23609', '23634', '23614', '23589', '23639', '23619', '23594', '23644', '23624', '23599', '23629', '23584', '23604', '23446', '23458', '23453', '23472', '23402', '23448', '23455', '20281', '23438', '23400', '23401', '23436', '23464', '23427', '23415', '23442', '23419', '23482', '23426', '23524', '23425', '23559', '23531', '23496', '23423', '23566', '23538', '23503', '23573', '23545', '23510', '23424', '23552', '23489', '23517', '23411', '23407', '23487', '23410', '23529', '23409', '23564', '23536', '23501', '23571', '23543', '23508', '23578', '23550', '23515', '23408', '23557', '23494', '23522', '23435', '23583', '23613', '23638', '23618', '23593', '23643', '23623', '23598', '23648', '23628', '23603', '23633', '23588', '23608', '23469', '23403', '23467']
infectious = ['23000', '23001', '23049', '23048', '23026', '23039', '23043', '23018', '23030', '23031', '23006', '23004', '23042', '23016', '23017', '23025', '23024', '23023', '23022', '23010', '23011', '23027', '23015', '23029', '23032', '23014', '23028', '23019', '23041', '23037', '23013', '23044', '23003', '23040', '23005', '23002', '23034', '23033', '23012', '23020', '23038', '23009', '23008', '23007', '23021', '23035', '23036']

In [8]:

fields = common_info + sum([field_names_for_id(field_id) for field_id in conditions],[]) + \
sum([field_names_for_id(field_id) for field_id in blood_count],[]) + \
sum([field_names_for_id(field_id) for field_id in blood_biochemistry],[]) + \
sum([field_names_for_id(field_id) for field_id in metabolomics],[]) + \
sum([field_names_for_id(field_id) for field_id in infectious],[])


  return sorted(fields, key=lambda f: LooseVersion(f.name))


In [9]:
len(fields)

1078

In [12]:
with open('field_names.txt', 'w') as file:
    for item in fields:
        file.write("%s\n" % item)

In [13]:
%%bash
dx upload field_names.txt --path /

ID                          file-GkZv22QJfg3pGVz95v7VG0G7
Class                       file
Project                     project-GZ82qXQJfg3z7xv124xqBJpj
Folder                      /
Name                        field_names.txt
State                       closing
Visibility                  visible
Types                       -
Properties                  -
Tags                        -
Outgoing links              -
Created                     Wed Jun 12 15:38:50 2024
Created by                  alina_grf
 via the job                job-GkZqbkjJfg3Xg10VYVJ91614
Last modified               Wed Jun 12 15:38:51 2024
Media type                  
archivalState               "live"
cloudAccount                "cloudaccount-dnanexus"


## Grabbing fields into a Spark DataFrame

In [23]:
df = participant.retrieve_fields(names=fields, coding_values='replace', engine=dxdata.connect())

In [62]:
%%bash
dx run table-exporter df -output 'blood_disease_extraction.tsv' -output_format "TSV" -coding_option "REPLACE"

usage: dx [-h] [--version] command ...

DNAnexus Command-Line Client, API v1.0.0, client v0.351.0

dx is a command-line client for interacting with the DNAnexus platform.  You
can log in, navigate, upload, organize and share your data, launch analyses,
and more.  For a quick tour of what the tool can do, see

  https://documentation.dnanexus.com/getting-started/tutorials/cli-quickstart#quickstart-for-cli

For a breakdown of dx commands by category, run "dx help".

dx exits with exit code 3 if invalid input is provided or an invalid operation
is requested, and exit code 1 if an internal error is encountered.  The latter
usually indicate bugs in dx; please report them at

  https://github.com/dnanexus/dx-toolkit/issues

optional arguments:
  -h, --help  show this help message and exit
  --env-help  Display help message for overriding environment
              variables
  --version   show program's version number and exit

dx: error: unrecognized arguments: df -output blood_disease_extrac

CalledProcessError: Command 'b'dx run table-exporter df -output \'blood_disease_extraction.tsv\' -output_format "TSV" -coding_option "REPLACE"\n'' returned non-zero exit status 2.

In [63]:
df_repartitioned = df.repartition(5000).persist()

In [64]:
df_repartitioned.toPandas().to_csv('blood_disease_extraction.tsv', sep='\t', index=False)

Py4JJavaError: An error occurred while calling o16768.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 3767 tasks (1024.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2548)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2493)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2492)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2492)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1250)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1250)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1250)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2736)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2678)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2667)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:410)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
%%bash
dx upload blood_disease_extraction.tsv --path /