In [1]:
# IMPORTS
import re

import dxdata
import dxpy
import pyspark
from pprint import pprint

import tomli
import hail as hl
from pathlib import Path
from pyspark.sql.types import StringType
import subprocess

# SET LOGFILE

# Parse the project configuration; the handle is only needed for the load itself.
with open("../config.toml", "rb") as config_fh:
    conf = tomli.load(config_fh)

# Absolute path (as a plain string) of the log file inside the configured
# IMPORT.LOG_DIR directory; passed to hl.init() further down.
LOG_FILE = str(Path(conf["IMPORT"]["LOG_DIR"], "prescriptions.log").resolve())


# INIT SPARK
# getOrCreate() reuses the context already running in this kernel, so executing
# this cell a second time no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once". On a fresh kernel it
# creates a new context exactly like SparkContext() did.
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

# INIT HAIL
# idempotent=True turns a repeated hl.init() into a no-op instead of an error,
# which keeps the cell re-runnable. The first call still binds Hail to `sc`,
# sets GRCh38 as the default reference and writes its log to LOG_FILE.
hl.init(sc=sc, default_reference="GRCh38", log=LOG_FILE, idempotent=True)

# DISPENSE DATASET
# Locate the database object (glob "app*") and the Dataset record
# (glob "app*.dataset") at the project root folder.
database_record = dxpy.find_one_data_object(
    classname="database",
    name="app*",
    name_mode="glob",
    folder="/",
    describe=True,
)
dispensed_database_name = database_record["describe"]["name"]

dataset_record = dxpy.find_one_data_object(
    typename="Dataset",
    name="app*.dataset",
    name_mode="glob",
    folder="/",
)
dispensed_dataset_id = dataset_record["id"]

# Make the discovered database the current one for subsequent Spark SQL.
spark.sql(f"USE {dispensed_database_name}")

# Load the dataset and pick its "gp_scripts" entity.
dataset = dxdata.load_dataset(id=dispensed_dataset_id)
scripts = dataset["gp_scripts"]

# Retrieve every field the entity declares through a dxdata connection.
field_names = [fld.name for fld in scripts.fields]
df = scripts.retrieve_fields(names=field_names, engine=dxdata.connect())

# MAKE TMPDIR

# Create ../tmp (relative to the working directory) together with any missing
# parents; an already existing directory is not an error. A later cell copies
# the exported file into this directory.
tmp_dir = Path("../tmp").resolve()
tmp_dir.mkdir(exist_ok=True, parents=True)

pip-installed Hail requires additional configuration options in Spark referring
  to the path to the Hail Python module directory HAIL_DIR,
  e.g. /path/to/python/site-packages/hail:
    spark.jars=HAIL_DIR/hail-all-spark.jar
    spark.driver.extraClassPath=HAIL_DIR/hail-all-spark.jar
    spark.executor.extraClassPath=./hail-all-spark.jar
Running on Apache Spark version 2.4.4
SparkUI available at http://ip-10-60-31-131.eu-west-2.compute.internal:8081
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.61-3c86d3ba497a
LOGGING: writing to /opt/notebooks/gogoGPCR/hail_logs/prescriptions.log


In [2]:
# Replace issue_date with its string representation before the frame is handed
# to Hail (presumably because the original column type does not convert
# cleanly; confirm against the dataset schema).
issue_date_as_str = df["issue_date"].cast(StringType())
df = df.withColumn("issue_date", issue_date_as_str)

In [3]:
# Convert the Spark DataFrame into a Hail Table keyed by participant id (eid).
key_fields = ["eid"]
ht = hl.Table.from_spark(df, key=key_fields)

2021-12-03 10:20:05 Hail: INFO: Ordering unsorted dataset with network shuffle


In [4]:
# Participant table on the mounted project; the file:// scheme is prepended at
# the call site so Hail reads it from the local filesystem.
participant_csv = "/mnt/project/Data/schizophrenia/Participant_table.csv"

# Key by "Participant ID" and force that column to str; every other column
# type is imputed from the data.
ids = hl.import_table(
    f"file://{participant_csv}",
    key="Participant ID",
    types={"Participant ID": "str"},
    impute=True,
    delimiter=",",
)

2021-12-03 10:20:06 Hail: INFO: Reading table to impute column types
2021-12-03 10:20:08 Hail: INFO: Finished type imputation
  Loading field 'Participant ID' as type str (user-supplied type)
  Loading field 'F20' as type str (imputed)
  Loading field 'F21' as type str (imputed)
  Loading field 'F25' as type str (imputed)
  Loading field 'Age' as type int32 (imputed)
  Loading field 'Sex' as type str (imputed)


In [None]:
# Keep only the prescription rows whose key is present in the participant table.
ht = ht.semi_join(ids)

# Look up each row's participant record by eid and splat its fields onto the
# row. The lookup must be built from the already-filtered table.
participant_fields = ids[ht.eid]
ht = ht.annotate(**participant_fields)

ht.show()

In [6]:
# Number of prescription rows.
total = ht.count()

# Grouping by eid yields one row per patient, so the grouped table's row count
# is the number of distinct patients (the per-patient tally itself is unused).
per_patient = ht.group_by("eid").aggregate(eids=hl.agg.count())
patients = per_patient.count()

summary = f"Total number of prescriptions: {total} from {patients} unique patients"
pprint(summary)

2021-12-03 10:21:02 Hail: INFO: Coerced sorted dataset
2021-12-03 10:22:01 Hail: INFO: Coerced sorted dataset


'Total number of prescriptions: 475215 from 641 unique patients'


In [10]:
# Destination of the exported table. The following cell fetches this same path
# with `hadoop fs -get`, so it is expected to resolve on the cluster's Hadoop
# filesystem rather than on the notebook's local disk.
write_path = "/tmp/schizophrenia_prescriptions.tsv.bgz"
# Write the filtered, annotated prescriptions table to that path (presumably
# tab-separated and block-gzipped, judging by the .tsv.bgz suffix; confirm
# against Hail's Table.export documentation).
ht.export(write_path)

In [11]:
# Copy the exported file from the Hadoop filesystem to the matching location
# under the parent directory ("../tmp/..."); check=True raises if the command
# exits with a non-zero status.
local_copy = f"..{write_path}"
hadoop_get_cmd = ["hadoop", "fs", "-get", write_path, local_copy]
subprocess.run(hadoop_get_cmd, shell=False, check=True)

CompletedProcess(args=['hadoop', 'fs', '-get', '/tmp/schizophrenia_prescriptions.tsv.bgz', '../tmp/schizophrenia_prescriptions.tsv.bgz'], returncode=0)

In [14]:
# Upload the local copy of the export with the `dx` command-line client to the
# given destination path; check=True raises if the upload exits with a
# non-zero status.
local_copy = f"..{write_path}"
project_destination = "Data/schizophrenia/schizophrenia_prescriptions.tsv.bgz"
subprocess.run(
    ["dx", "upload", local_copy, "--path", project_destination],
    shell=False,
    check=True,
)

CompletedProcess(args=['dx', 'upload', '../tmp/schizophrenia_prescriptions.tsv.bgz', '--path', 'Data/schizophrenia/schizophrenia_prescriptions.tsv.bgz'], returncode=0)