# SCRIPT TO GENERATE COVARIATES

## This script should be only run once

#### Initialization 
##### Load packages

Import to current directory: src/project_permed

In [None]:
import dxdata
import dxpy
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

from fields import fields_for_id

##### Spark and dataset configuration

In [None]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

dispensed_dataset_id = dxpy.find_one_data_object(
    typename="Dataset", name="app*.dataset", folder="/", name_mode="glob"
)["id"]
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

participant = dataset["participant"]

#### Data
##### Retrieve covariates and 20 first PCs. Retrieve also date to each target disease if necessary

In [None]:
# Add diagnosis fields (first diagnosis date fields)
# disease_fields = { "M47": "131916", "M13": "131864", "M19": "131876", "M79": "131960", "M54": "131928", "M51": "131924", "M25": "131888", "M16": "131870", "M75": "131954",  "M17": "131872"}

fields = [
    "21022",  # Age at recruitment
    "22001",  # Genetic sex
    "22009",  # Genetic principal components
]  # + list(disease_fields.values())

# Get names of given fields
field_names = [fields_for_id(id, participant) for id in fields]
field_names = ["eid"] + [field.name for fields in field_names for field in fields]

In [4]:
# Retrieve data
df = participant.retrieve_fields(
    names=field_names, engine=dxdata.connect(), coding_values="raw"
)
core_fields = ["p21022", "p22001"] + [f"p22009_a{i}" for i in range(1, 40)]
df = df.na.drop(subset=core_fields)
print(f"Number of entries {df.count()}")
df.drop("eid").show(3, truncate=False)

  self._context = ssl.SSLContext(ssl_version)


Number of entries 488006
+------+------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-------+----------+----------+----------+-------+-------+----------+-------+-------+----------+
|p21022|p22001|p22009_a1|p22009_a2|p22009_a3|p22009_a4|p22009_a5|p22009_a6|p22009_a7|p22009_a8|p22009_a9|p22009_a10|p22009_a11|p22009_a12|p22009_a13|p22009_a14|p22009_a15|p22009_a16|p22009_a17|p22009_a18|p22009_a19|p22009_a20|p22009_a21|p22009_a22|p22009_a23|p22009_a24|p22009_a25|p22009_a26|p22009_a27|p22009_a28|p22009_a29|p22009_a30|p22009_a31|p22009_a32|p22009_a33|p22009_a34|p22009_a35|p22009_a36|p22009_a37|p22009_a38|p22009_a3

In [None]:
# Rename PC columns
pcs = {f"p22009_a{i}": f"PC{i}" for i in range(1, 21)}
covs = ["FID", "IID", "SEX", "AGE", "AGE2", "AGESEX", "AGE2SEX"] + list(
    pcs.values()
)  # + [f"HAS_{k}" for k in disease_fields]

# Add flags for each disease when necessary
# for code, field_id in disease_fields.items():
#    df = df.withColumn(f"HAS_{code}", F.when(F.col(f"p{field_id}").isNotNull(), 1).otherwise(0))

# Final DataFrame with renamed and computed columns
df = (
    df.select([F.col(c).alias(pcs.get(c, c)) for c in df.columns])
    .withColumn("FID", F.col("eid"))
    .withColumn("IID", F.col("eid"))
    .withColumn("SEX", F.col("p22001").cast(IntegerType()))
    .withColumn("AGE", F.col("p21022").cast(IntegerType()))
    .withColumn("AGE2", (F.col("p21022") ** 2).cast(IntegerType()))
    .withColumn("AGESEX", (F.col("p21022") * F.col("p22001")).cast(IntegerType()))
    .withColumn(
        "AGE2SEX", ((F.col("p21022") ** 2) * F.col("p22001")).cast(IntegerType())
    )
    .select(*covs)
)

In [6]:
df.drop("eid").show(3, truncate=False)

+-------+-------+---+---+----+------+-------+--------+-------+--------+--------+--------+---------+---------+---------+---------+--------+--------+---------+---------+-------+---------+--------+---------+----------+-------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|FID    |IID    |SEX|AGE|AGE2|AGESEX|AGE2SEX|PC1     |PC2    |PC3     |PC4     |PC5     |PC6      |PC7      |PC8      |PC9      |PC10    |PC11    |PC12     |PC13     |PC14   |PC15     |PC16    |PC17     |PC18      |PC19   |PC20      |HAS_M47|HAS_M13|HAS_M19|HAS_M79|HAS_M54|HAS_M51|HAS_M25|HAS_M16|HAS_M75|HAS_M17|
+-------+-------+---+---+----+------+-------+--------+-------+--------+--------+--------+---------+---------+---------+---------+--------+--------+---------+---------+-------+---------+--------+---------+----------+-------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|1000146|1000146|0  |52 |2704|0     |0      |-12.417 |6

##### Save and Export

In [None]:
# Save DataFrame
df.coalesce(1).write.csv("/tmp/covariates.tsv", sep="\t", header=True)
# df.coalesce(1).write.csv("/tmp/covariates_with_target_diseases.tsv",sep="\t", header=True)
# df.coalesce(1).write.mode("overwrite").csv("/tmp/covariates_with_target_diseases.tsv",sep="\t",header=True)

In [None]:
# Upload DataFrame
!hadoop fs -getmerge /tmp/covariates.tsv ../tmp/covariates_with_target_diseases.tsv
!dx upload ../tmp/covariates.tsv --path WGS_Lucia/Data/Input_regenie/
#!hadoop fs -getmerge /tmp/covariates_with_target_diseases.tsv ../tmp/covariates_with_target_diseases.tsv
#!dx upload ../tmp/covariates_with_target_diseases.tsv --path WGS_Lucia/Data/Input_regenie/

2025-04-25 09:24:53,320 WARN metrics.MetricsReporter: Unable to initialize metrics scraping configurations from hive-site.xml. Message:InputStream cannot be null
2025-04-25 09:24:53,429 WARN service.DNAxApiSvc: Using default configurations. Unable to find dnanexus.conf.location=null
2025-04-25 09:24:53,429 INFO service.DNAxApiSvc: apiserver connection-pool config. MaxPoolSize=10, MaxPoolPerRoute=10,MaxWaitTimeout=60000
2025-04-25 09:24:53,429 INFO service.DNAxApiSvc: initializing http connection manager pools
2025-04-25 09:24:53,871 INFO service.DNAxApiSvc: Worker process - IdleConnectionMonitorThread disabled
2025-04-25 09:24:53,871 INFO service.DNAxApiSvc: Worker process - IdleConnectionMonitorThread disabled
2025-04-25 09:24:53,872 INFO service.DNAxApiSvc: initializing DNAxApiSvc
2025-04-25 09:24:54,715 WARN service.DNAxApiSvc: Shutting down Runtime service for Connection Pools
2025-04-25 09:24:54,723 INFO service.DNAxApiSvc: shutting down httpClientConnManager
2025-04-25 09:24:54,7