In [None]:
pip install -r requirements.txt

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *


import os
import yaml
import pyspark.pandas as ps
import logging

In [None]:
# Configure logging: route INFO-and-above records through the root handler,
# and obtain a logger named after this notebook's module.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)

In [None]:
# Point pyspark at the shared Spark 3.5.0 installation. This unconditionally
# overrides any SPARK_HOME already present in the environment; it must run
# before the Spark session is created, which is when the variable is read.
os.environ.update(
    SPARK_HOME="/external/rprshnas01/netdata_kcni/dflab/tools/general/distributed-computing/spark/3.5.0/"
)

In [None]:
# Load parameters from params.yaml (resolved relative to the working directory).
with open('params.yaml', 'r') as params_file:
    params = yaml.safe_load(params_file)

# Every setting used by this notebook lives under ukbiobank -> main.
ukb_main_params = params["ukbiobank"]["main"]
path_ukb_delta = ukb_main_params["delta"]
field_list = ukb_main_params["fields"]
instance_list = ukb_main_params["instances"]
path_dictionary = ukb_main_params["dictionary"]
path_output = ukb_main_params["output"]
# Optional key: evaluates to None when params.yaml does not define it.
path_exclude_ids = ukb_main_params.get("exclude_ids")

In [None]:
# Start a local Spark session with Delta Lake support.
logger.info("Starting Spark session...")
builder = SparkSession.builder.appName('Subsetting UK Biobank Data')
# Use every local core.
builder = builder.master("local[*]")
# Register Delta's SQL extension and make the Delta catalog the session catalog,
# so spark.read.format("delta") works.
builder = builder.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
builder = builder.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# configure_spark_with_delta_pip adds the Delta jars matching the pip-installed delta package.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
logger.info("Spark session started.")

In [None]:
# Load main dataset.
# This must run before any subsetting: the subset step reads df_ukb, so a
# subset cell placed ahead of this load fails with NameError on a fresh kernel.
# Subsetting by fields/instances is done in the dedicated subset cell that follows.
logger.info("Loading UK Biobank main dataset...")
df_ukb = spark.read.format("delta").load(path_ukb_delta)
logger.info("UK Biobank main dataset loaded.")

In [None]:
# Subset by fields and instances: keep the id/instance columns plus the
# configured fields, and only rows whose instance is in instance_list.
logger.info("Subset UK Biobank main dataset based on field and instance lists...")
df_subset = (
    df_ukb
    .select(['eid', 'instance'] + field_list)
    .filter(col('instance').isin(instance_list))
)
logger.info("UK Biobank main dataset subsetted based on field and instance lists.")

In [None]:
# Remove excluded IDs (only when an exclusion file is configured).
if path_exclude_ids:
    logger.info(f"Loading exclusion list from {path_exclude_ids}...")
    # spark.read.text yields one row per line in a column named "value";
    # presumably each line is a single participant ID — confirm against the file.
    # Lines that do not cast to long become null and never match in the join below.
    exclude_ids = (
        spark.read.text(path_exclude_ids)
        .withColumnRenamed("value", "eid")
        .distinct()
        .withColumn("eid", col("eid").cast("long"))
    )
    logger.info("Filtering out excluded IDs...")
    # left_anti keeps only df_subset rows whose eid has no match in exclude_ids.
    # NOTE(review): assumes df_subset.eid is an integral type — confirm.
    df_subset = df_subset.join(exclude_ids, on="eid", how="left_anti")
    logger.info("Excluded IDs removed from dataset.")


In [None]:
# Load the field dictionary: a tab-separated file whose first row is the header
# (it must provide at least the Field and FieldID columns used below).
# No schema inference is requested, so every column is read as a string.
logger.info("Loading dictionary...")
df_dictionary = (
    spark.read
    .option("sep", "\t")
    .option("header", True)
    .csv(path_dictionary)
)
logger.info("Dictionary loaded.")

In [None]:
# Create column name mapping {FieldID: "<Field> (FieldID: <FieldID>)"},
# restricted to the fields requested in field_list.
logger.info("Creating column name mapping dictionary...")
# concat yields null when either Field or FieldID is null.
df_dictionary = df_dictionary.withColumn(
    "NewColumn",
    concat(col('Field'), lit(' (FieldID: '), col('FieldID'), lit(')')),
)
df_field_name_map = (
    df_dictionary
    .select('FieldID', 'Field', 'NewColumn')
    .filter(col('FieldID').isin(field_list))
)
# Pull the filtered rows to the driver; if a FieldID repeats, the last row wins.
mapping = {row['FieldID']: row['NewColumn'] for row in df_field_name_map.collect()}


In [None]:
# Rename columns using the FieldID -> descriptive-name mapping.
# toDF renames positionally, so there is no pandas-on-Spark round-trip: that
# path attaches a default index which to_spark() immediately drops again.
# Columns with no mapping entry, or whose mapped name is null/empty (concat
# returns null when Field or FieldID is null), keep their original name.
logger.info("Renaming columns...")
df_subset = df_subset.toDF(*[mapping.get(name) or name for name in df_subset.columns])


In [None]:
# Convert to Pandas and export to CSV.
# The pandas frame gets its own name so df_subset stays a Spark DataFrame and
# this cell can be re-run (pandas frames have no toPandas()).
# NOTE(review): toPandas() collects the whole subset onto the driver; this
# assumes it fits in driver memory.
logger.info("Converting to Pandas DataFrame...")
pdf_subset = df_subset.toPandas()
# to_csv does not create missing directories, so create the parent first.
output_dir = os.path.dirname(path_output)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
logger.info(f"Saving output to {path_output}...")
pdf_subset.to_csv(path_output, index=False)
logger.info("Process completed successfully.")