From 0cc17cb763ccd97038e268957866614fd4d939be Mon Sep 17 00:00:00 2001 From: Anjing Date: Fri, 25 Jul 2025 16:29:35 -0400 Subject: [PATCH] Changes in tensorqtl_postprocessor.R --- inst/code/tensorqtl_postprocessor.R | 327 ++++++++++++++++++++++++---- 1 file changed, 286 insertions(+), 41 deletions(-) diff --git a/inst/code/tensorqtl_postprocessor.R b/inst/code/tensorqtl_postprocessor.R index 5fee5fae..6aaafa6e 100644 --- a/inst/code/tensorqtl_postprocessor.R +++ b/inst/code/tensorqtl_postprocessor.R @@ -3,6 +3,7 @@ suppressPackageStartupMessages({ library(dplyr) library(tidyr) library(readr) + library(arrow) }) # Utility function for NULL coalescing @@ -46,6 +47,212 @@ safe_qvalue <- function(p, ...) { ) } +# Parse additional p-value columns parameter +parse_additional_pvalue_cols <- function(additional_pvalue_cols_str) { + if (is.null(additional_pvalue_cols_str) || additional_pvalue_cols_str == "" || additional_pvalue_cols_str == "NULL") { + return(character(0)) + } + # Split by comma and trim whitespace + cols <- trimws(strsplit(additional_pvalue_cols_str, ",")[[1]]) + return(cols[cols != ""]) +} + +# Check if qvalue column exists in file +check_qvalue_exists <- function(file_path, qvalue_pattern) { + if (grepl("\\.parquet$", file_path)) { + cols <- names(read_parquet(file_path, n_max = 1)) + } else { + if (grepl("\\.(gz|bz2|xz)$", file_path)) { + header <- system(paste0("zcat ", shQuote(file_path), " | head -1"), intern = TRUE) + } else { + header <- system(paste0("head -1 ", shQuote(file_path)), intern = TRUE) + } + cols <- strsplit(header, "\t")[[1]] + } + + q_cols <- grep(qvalue_pattern, cols, value = TRUE) + return(length(q_cols) > 0) +} + +# Extract chromosome and position from variant_id if needed +extract_chrom_pos_from_variant_id <- function(data) { + if ("chrom" %in% names(data)) { + message("Column 'chrom' already exists, no extraction needed") + return(data) + } + + if (!"variant_id" %in% names(data)) { + stop("Neither 'chrom' nor 'variant_id' column found in data") + } + + message("Extracting 'chrom' and 'pos' from 'variant_id' column") + + # Extract chrom and pos from variant_id (format: chr1:14677_G_A) + data <- data %>% + mutate( + chrom = gsub("^(chr[^:]+):.*$", "\\1", variant_id), + pos = as.integer(gsub("^[^:]+:([0-9]+)_.*$", "\\1", variant_id)) + ) + + # Check if extraction was successful + if (any(is.na(data$pos)) || any(data$chrom == data$variant_id)) { + warning("Some variant_id entries could not be parsed. Check format: expected 'chrX:position_ref_alt'") + # Show examples of problematic entries + problematic <- data$variant_id[data$chrom == data$variant_id | is.na(data$pos)] + if (length(problematic) > 0) { + message("Examples of problematic variant_id entries:") + print(head(problematic, 5)) + } + } + + message(sprintf("Successfully extracted chrom and pos for %d variants", nrow(data))) + return(data) +} + +# Compute qvalues for molecular traits +compute_and_save_qvalues <- function(params) { + setwd(params$workdir) + message("Computing q-values for QTL files...") + + molecular_id_col <- params$molecular_id_col %||% "molecular_trait_object_id" + + # Get all QTL files + qtl_files <- list.files(pattern = params$qtl_pattern, full.names = TRUE) + if (length(qtl_files) == 0) { + message("No QTL files found matching pattern") + return(list(files = qtl_files, data_list = list(), need_qvalue_computation = FALSE)) + } + + # Check if qvalue column exists in first file + qvalue_exists <- check_qvalue_exists(qtl_files[1], params$qvalue_pattern) + + if (qvalue_exists) { + message("Q-value column already exists, skipping q-value computation") + return(list(files = qtl_files, data_list = list(), need_qvalue_computation = FALSE)) + } + + message("Q-value column not found, computing q-values...") + + # Parse additional p-value columns + additional_pvalue_cols <- parse_additional_pvalue_cols(params$additional_pvalue_cols) + + # Get column info from first file + column_info <- extract_column_names(qtl_files[1], params$pvalue_pattern, params$qvalue_pattern) + main_pvalue_col <- column_info$p_col + main_qvalue_col <- column_info$q_col + + message(sprintf("Processing %d files with main p-value column: %s → %s", + length(qtl_files), main_pvalue_col, main_qvalue_col)) + + if (length(additional_pvalue_cols) > 0) { + additional_qvalue_cols <- gsub("^pval", "qval", additional_pvalue_cols) + message(sprintf("Additional p-value columns: %s → %s", + paste(additional_pvalue_cols, collapse = ", "), + paste(additional_qvalue_cols, collapse = ", "))) + } + + # Determine if we need p-value filtering + apply_pvalue_filter <- params$pvalue_cutoff < 1 + if (apply_pvalue_filter) { + message(sprintf("Will apply p-value filter < %g during processing to save memory", params$pvalue_cutoff)) + } + + # Process each file: read → compute qvalue → save complete → filter → store filtered + filtered_data_list <- list() + + for (i in seq_along(qtl_files)) { + file_path <- qtl_files[i] + message(sprintf("Processing file %d/%d: %s", i, length(qtl_files), basename(file_path))) + + # Step 1: Read file + if (grepl("\\.parquet$", file_path)) { + data <- read_parquet(file_path) + } else { + is_compressed <- grepl("\\.(gz|bz2|xz)$", file_path) + if (is_compressed) { + data <- data.table::fread(cmd = paste("zcat", shQuote(file_path))) + } else { + data <- data.table::fread(file_path) + } + } + + # Step 2: Compute q-values for each molecular trait + data_with_qvalues <- data %>% + # Extract chrom/pos from variant_id if needed + extract_chrom_pos_from_variant_id() %>% + group_by(!!sym(molecular_id_col)) %>% + do({ + trait_data <- . + + # Compute q-value for main p-value column + if (main_pvalue_col %in% names(trait_data)) { + trait_data[[main_qvalue_col]] <- safe_qvalue(trait_data[[main_pvalue_col]])$qvalues + } + + # Compute q-values for additional p-value columns + for (pval_col in additional_pvalue_cols) { + if (pval_col %in% names(trait_data)) { + # More flexible column name conversion: pval_* -> qval_* + qval_col <- gsub("^pval", "qval", pval_col) + trait_data[[qval_col]] <- safe_qvalue(trait_data[[pval_col]])$qvalues + } else { + warning(sprintf("P-value column '%s' not found in file %s", pval_col, basename(file_path))) + } + } + + trait_data + }) %>% + ungroup() + + # Print completion message once per file + if (length(additional_pvalue_cols) > 0) { + for (pval_col in additional_pvalue_cols) { + qval_col <- gsub("^pval", "qval", pval_col) + message(sprintf("Computed %s → %s for %s", pval_col, qval_col, basename(file_path))) + } + } + + # Step 3: Save complete data with qvalues (always as .tsv.gz) + message(sprintf("Saving computed q-values for file: %s", basename(file_path))) + + # Always save as .tsv.gz regardless of input format + if (grepl("\\.parquet$", file_path)) { + output_file <- sub("\\.parquet$", ".qvalue_computed.tsv.gz", file_path) + } else if (grepl("\\.gz$", file_path)) { + output_file <- sub("\\.gz$", ".qvalue_computed.tsv.gz", file_path) + } else { + output_file <- sub("\\.(tsv|txt)$", ".qvalue_computed.tsv.gz", file_path) + } + + write_delim(data_with_qvalues, gzfile(output_file), delim = "\t") + message(sprintf("Saved complete q-value computed file: %s", basename(output_file))) + + # Step 4: Apply p-value filtering immediately to save memory + if (apply_pvalue_filter) { + pre_filter_rows <- nrow(data_with_qvalues) + filtered_data <- data_with_qvalues %>% + filter(!!sym(main_pvalue_col) < params$pvalue_cutoff) + post_filter_rows <- nrow(filtered_data) + message(sprintf("Applied p-value filter: %d → %d rows", pre_filter_rows, post_filter_rows)) + + # Store the filtered data for rbind + filtered_data_list[[i]] <- filtered_data + } else { + # No filtering needed, store all data + filtered_data_list[[i]] <- data_with_qvalues + } + + # Step 5: Clean up memory + rm(data, data_with_qvalues) + if (apply_pvalue_filter && exists("filtered_data")) { + rm(filtered_data) + } + gc() # Force garbage collection to free memory + } + + return(list(files = qtl_files, data_list = filtered_data_list, need_qvalue_computation = TRUE)) +} + find_common_prefix <- function(files) { if (length(files) == 0) { return("") @@ -137,11 +344,14 @@ read_and_combine_files <- function(files, ...) { missing <- files[!file.exists(files)] if (length(missing) > 0) stop("Missing files: ", paste(missing, collapse = ", ")) - # Determine if files are compressed + # Determine file types is_compressed <- grepl("\\.(gz|bz2|xz)$", files) + is_parquet <- grepl("\\.parquet$", files) # Load first file - if (all(is_compressed)) { + if (is_parquet[1]) { + data <- read_parquet(files[1]) + } else if (is_compressed[1]) { data <- data.table::fread(cmd = paste("zcat", shQuote(files[1])), ...) } else { data <- data.table::fread(files[1], ...) @@ -150,18 +360,21 @@ read_and_combine_files <- function(files, ...) { # Load remaining files and append if (length(files) > 1) { for (i in 2:length(files)) { - if (is_compressed[i]) { + if (is_parquet[i]) { + next_data <- read_parquet(files[i]) + } else if (is_compressed[i]) { next_data <- data.table::fread( cmd = paste("zcat", shQuote(files[i])), skip = 1, header = FALSE, ... ) + # Set column names to match first file + data.table::setnames(next_data, names(data)) } else { next_data <- data.table::fread(files[i], skip = 1, header = FALSE, ...) + # Set column names to match first file + data.table::setnames(next_data, names(data)) } - # Set column names to match first file - data.table::setnames(next_data, names(data)) - # Append using rbindlist (faster than rbind) data <- data.table::rbindlist(list(data, next_data), use.names = TRUE, fill = TRUE) } @@ -188,6 +401,7 @@ load_regional_data <- function(params) { if (length(regional_files) > 0) { data <- read_and_combine_files(regional_files) %>% + extract_chrom_pos_from_variant_id() %>% { if ("chrom" %in% colnames(.)) mutate(., chrom = standardize_chrom(chrom)) else . } @@ -214,8 +428,19 @@ load_regional_data <- function(params) { } extract_column_names <- function(file_path, pvalue_pattern = "pvalue", qvalue_pattern = "qvalue") { - header <- system(paste0("zcat ", file_path, " | head -1"), intern = TRUE) - cols <- strsplit(header, "\t")[[1]] + if (grepl("\\.parquet$", file_path)) { + # For parquet files, read column names directly + cols <- names(read_parquet(file_path, n_max = 1)) + } else { + # For text files, use original method + if (grepl("\\.(gz|bz2|xz)$", file_path)) { + header <- system(paste0("zcat ", shQuote(file_path), " | head -1"), intern = TRUE) + } else { + header <- system(paste0("head -1 ", shQuote(file_path)), intern = TRUE) + } + cols <- strsplit(header, "\t")[[1]] + } + column_info <- list( all_columns = cols ) @@ -325,7 +550,9 @@ calculate_filtered_variant_counts <- function(filename, params, gene_coords) { shQuote(filename), paste(required_cols, collapse = "\t"), awk_cols ) message("Extracting minimal columns from file...") - qtl_data <- data.table::fread(cmd = cmd) %>% mutate(chrom = standardize_chrom(chrom)) + qtl_data <- data.table::fread(cmd = cmd) %>% + extract_chrom_pos_from_variant_id() %>% + mutate(chrom = standardize_chrom(chrom)) trait_chrom <- qtl_data %>% group_by(!!sym(molecular_id_col)) %>% summarize(chrom = first(chrom)) @@ -442,6 +669,7 @@ load_n_variants_data <- function(params, gene_coords) { if (length(n_variants_files) > 0) { message("Loading n_variants count data...") n_variants_data <- read_and_combine_files(n_variants_files) %>% + extract_chrom_pos_from_variant_id() %>% mutate(chrom = standardize_chrom(chrom)) } @@ -453,7 +681,10 @@ load_qtl_data <- function(params, load_n_variants = FALSE) { message('workdir is ', getwd()) gene_coords <- load_gene_coordinates(params) - files <- list.files(pattern = params$qtl_pattern, full.names = TRUE) + # Compute q-values first if qvalue column doesn't exist + qvalue_result <- compute_and_save_qvalues(params) + files <- qvalue_result$files + if (length(files) == 0) { stop("No pair files found") } @@ -461,24 +692,44 @@ load_qtl_data <- function(params, load_n_variants = FALSE) { # Get column names column_info <- extract_column_names(files[1], params$pvalue_pattern, params$qvalue_pattern) - # Only filter if pvalue_cutoff is less than 1 - filter_by_p <- params$pvalue_cutoff < 1 - - if (filter_by_p) { - # Filter rows with p-value < threshold for efficiency - files_str <- paste(shQuote(files), collapse = " ") - awk_cmd <- sprintf( - "awk 'NR==1 {print; next} $%d < %s'", - column_info$p_idx, params$pvalue_cutoff - ) - cmd <- sprintf("zcat %s | %s", files_str, awk_cmd) - - message(sprintf("Only loading QTL data with p-value < %g", params$pvalue_cutoff)) - data <- data.table::fread(cmd = cmd) %>% mutate(chrom = standardize_chrom(chrom)) + # Decide whether to use computed data or read from files + if (qvalue_result$need_qvalue_computation) { + message("Using pre-computed and pre-filtered q-value data") + # Combine all computed and filtered data + data <- data.table::rbindlist(qvalue_result$data_list, use.names = TRUE, fill = TRUE) + + # Extract chrom/pos from variant_id if needed, then standardize + data <- data %>% + extract_chrom_pos_from_variant_id() %>% + mutate(chrom = standardize_chrom(chrom)) + + message(sprintf("Combined data from %d files: %d total rows", + length(qvalue_result$data_list), nrow(data))) } else { - # Load all data (may be huge) - message("Loading all pair data (no p-value filtering)") - data <- read_and_combine_files(files) %>% mutate(chrom = standardize_chrom(chrom)) + # Read from original files + # Only filter if pvalue_cutoff is less than 1 + filter_by_p <- params$pvalue_cutoff < 1 + + if (filter_by_p) { + # Filter rows with p-value < threshold for efficiency + files_str <- paste(shQuote(files), collapse = " ") + awk_cmd <- sprintf( + "awk 'NR==1 {print; next} $%d < %s'", + column_info$p_idx, params$pvalue_cutoff + ) + cmd <- sprintf("zcat %s | %s", files_str, awk_cmd) + + message(sprintf("Only loading QTL data with p-value < %g", params$pvalue_cutoff)) + data <- data.table::fread(cmd = cmd) %>% + extract_chrom_pos_from_variant_id() %>% + mutate(chrom = standardize_chrom(chrom)) + } else { + # Load all data (may be huge) + message("Loading all pair data (no p-value filtering)") + data <- read_and_combine_files(files) %>% + extract_chrom_pos_from_variant_id() %>% + mutate(chrom = standardize_chrom(chrom)) + } } # Load n_variants data if requested @@ -540,7 +791,7 @@ prepare_local_qtl_data <- function(qtl_data, regional_data, params, should_filte molecular_id_col <- params$molecular_id_col %||% "molecular_trait_object_id" if (should_filter) { - # NEW: Check if cis_window is 0 to skip cis-window filtering + # Check if cis_window is 0 to skip cis-window filtering if (params$cis_window == 0) { message("cis_window is 0, applying only MAF filtering...") # Apply only MAF filtering when cis_window is 0 @@ -961,21 +1212,15 @@ identify_qvalue_snps <- function(data, params, base_data = NULL) { return(base_data) } - # Check if the q-value column exists in the data + # Use existing q-value column q_value_col <- data$qtl_data$column_info$q_col - if (q_value_col %in% names(snp_data)) { - message(sprintf("Use existing q-value column '%s' for qvalue-based QTL identification", q_value_col)) - significant_snps <- snp_data %>% - filter(!!sym(q_value_col) < params$fdr_threshold) - } else { - p_col <- data$qtl_data$column_info$p_col - message(sprintf("Computing q-values for each event's SNPs using p-value from '%s'...", p_col)) - significant_snps <- snp_data %>% - group_by(!!sym(molecular_id_col)) %>% - mutate(!!sym(q_value_col) := safe_qvalue(!!sym(p_col))$qvalues) %>% - filter(!!sym(q_value_col) < params$fdr_threshold) %>% - ungroup() + if (!q_value_col %in% names(snp_data)) { + stop(sprintf("Q-value column '%s' not found. Please ensure q-values are computed before analysis.", q_value_col)) } + + message(sprintf("Using existing q-value column '%s' for qvalue-based QTL identification", q_value_col)) + significant_snps <- snp_data %>% + filter(!!sym(q_value_col) < params$fdr_threshold) # Create result name based on method used base_data$method_name <- q_col