In [None]:
# Load the required packages and intialise Spark session - ONLY RUN THIS ONCE AT THE START
import pyspark;
import dxpy;
import dxdata;
import csv;
sc = pyspark.SparkContext();
spark = pyspark.sql.SparkSession(sc);

In [None]:
# Find the database file in your DNAnexus project - ONLY RUN THIS ONCE AT THE START
dispensed_database_name = dxpy.find_one_data_object(classname="database", name="app*", folder="/", name_mode="glob", describe=True)["describe"]["name"];
dispensed_dataset_id = dxpy.find_one_data_object(typename="Dataset", name="app*.dataset", folder="/", name_mode="glob")["id"];
rx_data_name = f"{dispensed_database_name}.gp_scripts"
spark.sql("USE " + dispensed_database_name); 
# spark.sql("SHOW TABLES").show(truncate=False); # Unblank if you want to see a list of available datasets
dataset = dxdata.load_dataset(id=dispensed_dataset_id);
participant = dataset["participant"];

### Analgesic extraction

In [None]:

drug_patterns = ("%pregabalin%", "%lyrica%", "%gabapentin%", "%neurontin%", "%gralise%", "%oxycodone%", "%oxycontin%", "%duloxetine%", "%cymbalta%", "%Valproate sodium%", "%divalproex%", "%depakote%", "%carbamazepine%", "%tegretol%", "%carbatrol%", "%venlafaxine%", "%effexor%", "%amitriptyline%", "%elavil%", "%desipramine%", "%norpramin%", "%capsaicin patch%", "%qutenza%", "%lidocaine patch%", "%lidoderm%", "%dextromethorphan%", "%tramadol%", "%ultram%", "imipramine%", "%tofranil%", "%baclofen%", "%gablofen%", "%lioresal%", "%tizanidine%", "%zanaflex%", "%cyclobenzaprine%", "%flexeril%", "%dantrolene%", "%dantrium%", "%revonto%")

# Format patterns for SQL LIKE clauses using LOWER()
like_clauses = [f"LOWER(drug_name) LIKE '{pattern}'" for pattern in drug_patterns]

# Combine into a single SQL WHERE clause
where_clause = " OR ".join(like_clauses)

# Run the Spark SQL query
query = f"""
SELECT eid, issue_date, drug_name
FROM {rx_data_name}
WHERE drug_name IS NOT NULL AND ({where_clause})
"""

df = spark.sql(query)

# Convert to list of dictionaries
data_dicts = [
    {"eid": row.eid, "issue_date": row.issue_date, "drug_name": row.drug_name}
    for row in df.rdd.toLocalIterator()
]

# Write to CSV
csv_file_path = "UKB_Rx_drugs.csv"

with open(csv_file_path, "w", newline="") as csv_file:
    fieldnames = ["eid", "issue_date", "drug_name"]
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    writer.writeheader()

    for idx, data_dict in enumerate(data_dicts):
        writer.writerow(data_dict)

print("Analgesic/Anti-neuropathic Rx CSV file written successfully.")



### General pain extraction

In [None]:
field_names = ["eid", "p21022", "p31", "p21000_i0", "p6159_i0", "p3799_i0", "p4067_i0", "p3404_i0", "p3571_i0", "p3741_i0", "p3414_i0", "p3773_i0", "p2956_i0", "p6154_i0", "p137_i0"]
engine = dxdata.connect(dialect="hive+pyspark")
df = participant.retrieve_fields(names=field_names, coding_values="raw", engine=engine)

# Convert DataFrame to a list of dictionaries
data_dicts = [
    {"eid": row.eid,
        "p21022": row.p21022,
        "p31": row.p31,
        "p21000_i0": row.p21000_i0,
        "p6159_i0": row.p6159_i0,
        "p3799_i0": row.p3799_i0,
        "p4067_i0": row.p4067_i0,
        "p3404_i0": row.p3404_i0,
        "p3571_i0": row.p3571_i0,
        "p3741_i0": row.p3741_i0,
        "p3773_i0": row.p3773_i0,
        "p2956_i0": row.p2956_i0,
        "p6154_i0": row.p6154_i0,
        "p137_i0": row.p137_i0
    }
    for row in df.rdd.toLocalIterator()
]

# Specify the CSV file path
csv_file_path = "UKB_general.csv"

# Write data to CSV file
with open(csv_file_path, "w", newline="") as csv_file:
    fieldnames = ["eid", "p21022", "p31", "p21000_i0", "p6159_i0", "p3799_i0", "p4067_i0", "p3404_i0", "p3571_i0", "p3741_i0", "p3414_i0", "p3773_i0", "p2956_i0", "p6154_i0", "p137_i0"]

    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)

    # Write header
    writer.writeheader()

    # Write rows with progress indication
    for idx, data_dict in enumerate(data_dicts):
        writer.writerow(data_dict)


### Neuropathic pain extraction

In [None]:
field_names = ["eid","p120000","p120001","p120002","p120003","p120004","p120005","p120006","p120007",
               "p120008","p120009","p120010","p120011","p120012","p120013","p120014","p120015","p120016","p120017",
               "p120018","p120019","p120020","p120021","p120022","p120023","p120024","p120025","p120026","p120027",
               "p120028","p120029","p120030","p120031","p120032","p120033","p120034","p120035","p120036","p120037",
               "p120038","p120040","p120041","p120042","p120043","p120044","p120045","p120046","p120047",
               "p120048","p120049","p120050","p120051","p120052","p120053","p120054","p120055","p120056","p120057",
               "p120058","p120059","p120060","p120061","p120062","p120063","p120064","p120065","p120066","p120067",
               "p120068","p120069","p120070","p120071","p120072","p120073","p120074","p120075","p120076","p120077",
               "p120078","p120079","p120080","p120081","p120082","p120083","p120084","p120085","p120086","p120087",
               "p120088","p120089","p120090","p120091","p120092","p120093","p120094","p120095","p120096","p120097",
               "p120098","p120099","p120100","p120101","p120102","p120103","p120104","p120105","p120106","p120107",
               "p120108","p120109","p120110","p120111","p120112","p120113","p120114","p120115","p120116","p120117",
               "p120118","p120119","p120120","p120121","p120122","p120123","p120124","p120125","p120126","p120127"]

engine = dxdata.connect(dialect="hive+pyspark")
df = participant.retrieve_fields(names=field_names, coding_values="raw", engine=engine)

# Convert DataFrame to a list of dictionaries
data_dicts = [
    {
        "eid": row.eid,
        "p120000": row.p120000,
        "p120001": row.p120001,
        "p120002": row.p120002,
        "p120003": row.p120003,
        "p120004": row.p120004,
        "p120005": row.p120005,
        "p120006": row.p120006,
        "p120007": row.p120007,
        "p120008": row.p120008,
        "p120009": row.p120009,
        "p120010": row.p120010,
        "p120011": row.p120011,
        "p120012": row.p120012,
        "p120013": row.p120013,
        "p120014": row.p120014,
        "p120015": row.p120015,
        "p120016": row.p120016,
        "p120017": row.p120017,
        "p120018": row.p120018,
        "p120019": row.p120019,
        "p120020": row.p120020,
        "p120021": row.p120021,
        "p120022": row.p120022,
        "p120023": row.p120023,
        "p120024": row.p120024,
        "p120025": row.p120025,
        "p120026": row.p120026,
        "p120027": row.p120027,
        "p120028": row.p120028,
        "p120029": row.p120029,
        "p120030": row.p120030,
        "p120031": row.p120031,
        "p120032": row.p120032,
        "p120033": row.p120033,
        "p120034": row.p120034,
        "p120035": row.p120035,
        "p120036": row.p120036,
        "p120037": row.p120037,
        "p120038": row.p120038,
        "p120040": row.p120040,
        "p120041": row.p120041,
        "p120042": row.p120042,
        "p120043": row.p120043,
        "p120044": row.p120044,
        "p120045": row.p120045,
        "p120046": row.p120046,
        "p120047": row.p120047,
        "p120048": row.p120048,
        "p120049": row.p120049,
        "p120050": row.p120050,
        "p120051": row.p120051,
        "p120052": row.p120052,
        "p120053": row.p120053,
        "p120054": row.p120054,
        "p120055": row.p120055,
        "p120056": row.p120056,
        "p120057": row.p120057,
        "p120058": row.p120058,
        "p120059": row.p120059,
        "p120060": row.p120060,
        "p120061": row.p120061,
        "p120062": row.p120062,
        "p120063": row.p120063,
        "p120064": row.p120064,
        "p120065": row.p120065,
        "p120066": row.p120066,
        "p120067": row.p120067,
        "p120068": row.p120068,
        "p120069": row.p120069,
        "p120070": row.p120070,
        "p120071": row.p120071,
        "p120072": row.p120072,
        "p120073": row.p120073,
        "p120074": row.p120074,
        "p120075": row.p120075,
        "p120076": row.p120076,
        "p120077": row.p120077,
        "p120078": row.p120078,
        "p120079": row.p120079,
        "p120080": row.p120080,
        "p120081": row.p120081,
        "p120082": row.p120082,
        "p120083": row.p120083,
        "p120084": row.p120084,
        "p120085": row.p120085,
        "p120086": row.p120086,
        "p120087": row.p120087,
        "p120088": row.p120088,
        "p120089": row.p120089,
        "p120090": row.p120090,
        "p120091": row.p120091,
        "p120092": row.p120092,
        "p120093": row.p120093,
        "p120094": row.p120094,
        "p120095": row.p120095,
        "p120096": row.p120096,
        "p120097": row.p120097,
        "p120098": row.p120098,
        "p120099": row.p120099,
        "p120100": row.p120100,
        "p120101": row.p120101,
        "p120102": row.p120102,
        "p120103": row.p120103,
        "p120104": row.p120104,
        "p120105": row.p120105,
        "p120106": row.p120106,
        "p120107": row.p120107,
        "p120108": row.p120108,
        "p120109": row.p120109,
        "p120110": row.p120110,
        "p120111": row.p120111,
        "p120112": row.p120112,
        "p120113": row.p120113,
        "p120114": row.p120114,
        "p120115": row.p120115,
        "p120116": row.p120116,
        "p120117": row.p120117,
        "p120118": row.p120118,
        "p120119": row.p120119,
        "p120120": row.p120120,
        "p120121": row.p120121,
        "p120122": row.p120122,
        "p120123": row.p120123,
        "p120124": row.p120124,
        "p120125": row.p120125,
        "p120126": row.p120126,
        "p120127": row.p120127
    }
    for row in df.rdd.toLocalIterator()
]

# Specify the CSV file path
csv_file_path = "UKB_EOP.csv"

# Write data to CSV file
with open(csv_file_path, "w", newline="") as csv_file:
    fieldnames = ["eid", "p120000", "p120001", "p120002", "p120003", "p120004", "p120005", "p120006", "p120007",
                  "p120008", "p120009", "p120010", "p120011", "p120012", "p120013", "p120014", "p120015", "p120016", "p120017",
                  "p120018", "p120019", "p120020", "p120021", "p120022", "p120023", "p120024", "p120025", "p120026", "p120027",
                  "p120028", "p120029", "p120030", "p120031", "p120032", "p120033", "p120034", "p120035", "p120036", "p120037",
                  "p120038", "p120040", "p120041", "p120042", "p120043", "p120044", "p120045", "p120046", "p120047",
                  "p120048", "p120049", "p120050", "p120051", "p120052", "p120053", "p120054", "p120055", "p120056", "p120057",
                  "p120058", "p120059", "p120060", "p120061", "p120062", "p120063", "p120064", "p120065", "p120066", "p120067",
                  "p120068", "p120069", "p120070", "p120071", "p120072", "p120073", "p120074", "p120075", "p120076", "p120077",
                  "p120078", "p120079", "p120080", "p120081", "p120082", "p120083", "p120084", "p120085", "p120086", "p120087",
                  "p120088", "p120089", "p120090", "p120091", "p120092", "p120093", "p120094", "p120095", "p120096", "p120097",
                  "p120098", "p120099", "p120100", "p120101", "p120102", "p120103", "p120104", "p120105", "p120106", "p120107",
                  "p120108", "p120109", "p120110", "p120111", "p120112", "p120113", "p120114", "p120115", "p120116", "p120117",
                  "p120118", "p120119", "p120120", "p120121", "p120122", "p120123", "p120124", "p120125", "p120126", "p120127"
                  ]

    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)

    # Write header
    writer.writeheader()

    # Write rows with progress indication
    for idx, data_dict in enumerate(data_dicts):
        writer.writerow(data_dict)


### ICD extraction

In [None]:
field_names = ["eid", "p41270"]
engine = dxdata.connect(dialect="hive+pyspark")
df = participant.retrieve_fields(names=field_names, coding_values="raw", engine=engine)

data_dicts = [
    {
        "eid":row.eid,
        "icd_code": row.p41270
    }
    for row in df.rdd.toLocalIterator()
]

csv_file_path = "UKB_icd.csv"

with open(csv_file_path, "w", newline="") as csv_file:
    fieldnames = ["eid", "p41270"]

    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)

    # Write header
    writer.writeheader()

    # Write rows with progress indication
    for idx, data_dict in enumerate(data_dicts):
        writer.writerow(data_dict)

In [None]:
! gzip *.csv
! dx upload *.csv.gz --path projects/CMT_NashBio/cohorts