# Summary statistics formatting
This notebook takes more than one collection of summary-statistics (sumstat) text files and produces a collection of per-gene `merged.rds` files that can serve as the input for both MASH and MVSuSiE analyses.

## Input
1. **Sumstats Files**: Bgzipped summary statistics for chromosomes 1-22, indexed by Tabix.
2. **Region File**: Defines genomic regions for each gene; enhanced cis regions available elsewhere.
3. **Susie List**: A text file listing the paths of the fine-mapped RDS files produced by the fine-mapping output.
## Output
**mashr input**: A list of 10

```
List of 10
 $ random.z: num [1:36, 1:2] -0.785 -0.785 -0.785 -0.785 -0.785 ...
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:36] "1:97960:A:G" "1:138565:G:A" "1:15112:C:T" "1:189947:G:A" ...
  .. ..$ : chr [1:2] "A" "B"
 $ null.z  : num [1:36, 1:2] -0.785 -0.785 -0.785 -0.785 -0.785 ...
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:36] "1:93692:C:T" "1:273645:A:G" "1:10442:CCTA:." "1:198942:A:C" ...
  .. ..$ : chr [1:2] "A" "B"
 $ random.b: num [1:36, 1:2] -0.123 -0.123 -0.123 -0.123 -0.123 ...
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:36] "1:97960:A:G" "1:138565:G:A" "1:15112:C:T" "1:189947:G:A" ...
  .. ..$ : chr [1:2] "A" "B"
 $ null.b  : num [1:36, 1:2] -0.123 -0.123 -0.123 -0.123 -0.123 ...
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:36] "1:93692:C:T" "1:273645:A:G" "1:10442:CCTA:." "1:198942:A:C" ...
  .. ..$ : chr [1:2] "A" "B"
 $ null.s  : num [1:36, 1:2] 0.157 0.157 0.157 0.157 0.157 ...
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:36] "1:93692:C:T" "1:273645:A:G" "1:10442:CCTA:." "1:198942:A:C" ...
  .. ..$ : chr [1:2] "A" "B"
 $ random.s: num [1:36, 1:2] 0.157 0.157 0.157 0.157 0.157 ...
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:36] "1:97960:A:G" "1:138565:G:A" "1:15112:C:T" "1:189947:G:A" ...
  .. ..$ : chr [1:2] "A" "B"
 $ strong.b:Classes ‘data.table’ and 'data.frame':	1 obs. of  2 variables:
  ..$ A: num -0.217
  ..$ B: num -0.217
  ..- attr(*, ".internal.selfref")=<externalptr> 
 $ strong.s:Classes ‘data.table’ and 'data.frame':	1 obs. of  2 variables:
  ..$ A: num 0.0481
  ..$ B: num 0.0481
  ..- attr(*, ".internal.selfref")=<externalptr> 
 $ strong.z:Classes ‘data.table’ and 'data.frame':	1 obs. of  2 variables:
  ..$ A: num -4.5
  ..$ B: num -4.5
  ..- attr(*, ".internal.selfref")=<externalptr> 
 $ XtX     : num [1:2, 1:2] 20.3 20.3 20.3 20.3
  ..- attr(*, "dimnames")=List of 2
  .. ..$ : chr [1:2] "A" "B"
  .. ..$ : chr [1:2] "A" "B"
  ```

### Example

In [None]:
# Generate the random and null variant sets only: runs the `processing` steps,
# whose final output is <cwd>/<name>.random.null.rds.
# NOTE(review): --sum_files presumably takes one list file per phenotype; the
# same example list is passed twice here for phenotypes A and B - confirm.
sos run pipeline/mash_preprocessing.ipynb processing \
    --name protocol_example_protein \
    --sum_files test_pQTL_asso_list \
               test_pQTL_asso_list \
    --region_file test.region \
    --pheno_names A,B 


In [None]:
# Generate the strong-signal set only: runs the `susie_signal` step, which reads
# the RDS paths listed in --susie_list and writes <cwd>/<name>.strong.rds.
sos run pipeline/mash_preprocessing.ipynb susie_signal \
    --name protocol_example_protein \
    --susie_list protocol_example_protein.susie_output.txt \
    --pheno_names A,B 


In [None]:
# Generate the mashr input directly: the `mash_input` step takes the
# random/null file and the strong file as inputs and writes
# <cwd>/<name>.mashr_input.rds. The arguments of both upstream workflows are
# passed here (presumably so missing inputs can be built on demand - confirm).
sos run pipeline/mash_preprocessing.ipynb mash_input \
    --name protocol_example_protein \
    --sum_files test_pQTL_asso_list \
               test_pQTL_asso_list \
    --region_file test.region \
    --susie_list protocol_example_protein.susie_output.txt \
    --pheno_names A,B 


In [None]:
[global]
import glob
# Prefix shared by all output file names (e.g. <cwd>/<name>.random.null.rds)
parameter: name = str

# Comma-separated condition/phenotype names; split on "," and used as the
# column names of the effect and standard-error tables
parameter: pheno_names = str
# Path to the work directory where outputs are written
parameter: cwd = path("./output")
# Container that provides the necessary packages
parameter: container = ''
# For cluster jobs, number of commands to run per job
parameter: job_size = 1
# Wall clock time expected
parameter: walltime = "5h"
# Memory expected
parameter: mem = "16G"
# Number of threads
parameter: numThreads = 8
# Number of per-gene RDS files grouped into one batch in processing_2
parameter: per_chunk =100
# Optional name of a list element to extract from each per-gene RDS right after
# it is read; the empty string uses the loaded object as-is
parameter: table_name = ""
# Name of the list element holding the effect estimates in each per-gene RDS
parameter: bhat = "bhat"
# Name of the list element used as the denominator when z is computed as
# bhat / sbhat (presumably the standard errors)
parameter: sbhat = "sbhat"
# When greater than 0, per-gene files whose matrices do not have exactly this
# many columns are skipped
parameter: expected_ncondition = 0
# Column indices of conditions to drop from the matrices if the need arises.
# Keep the default empty list when nothing should be excluded.
parameter: exclude_condition = []
# NOTE(review): not referenced by any step visible in this notebook
parameter: datadir = ""
# Seed passed to set.seed() before variants are sampled
parameter: seed = 999
# Maximum number of random variants sampled per gene
parameter: n_random = 4
# Maximum number of null variants (largest |z| below 2) sampled per gene
parameter: n_null = 4
# When True only the z-score tables (random.z, null.z) are kept; otherwise the
# matching b and s tables are kept as well
parameter: z_only = False
# Inserted verbatim into the R script as the na_remove argument; "TRUE" drops
# variants with missing values before sampling
parameter: na_remove = "TRUE"
# Named targets used as the inputs of the mash_input step
ran_null = file_target(f"{cwd}/{name}.random.null.rds")
strong = file_target(f"{cwd}/{name}.strong.rds")


## Get the random and null effects per analysis unit

In [None]:
[processing_1]
parameter: sum_files = paths
parameter: region_file = path
import re
import pandas as pd
def find_matching_files_for_region(chr_id):
    """Return the comma-joined sumstat paths listed for one chromosome.

    Every file in the workflow-level `sum_files` parameter is read as a plain
    text list, one entry per line. A line is selected when it contains the
    chromosome label surrounded by dots (for example ".1." for chromosome 1).

    Args:
        chr_id: chromosome label from the region file, with or without a
            leading "chr" (e.g. "chr1" or "1").

    Returns:
        str: the matching lines, stripped and joined with ","; an empty string
        when no line matches.
    """
    # Drop the "chr" prefix only when it is really present: slicing blindly
    # turns "1" into "" and the pattern would then look for "..".
    chr_number = chr_id[3:] if chr_id.lower().startswith("chr") else chr_id
    # Escape the label so it is always matched literally inside the regex.
    pattern = re.compile(r"\." + re.escape(chr_number) + r"\.")
    paths = []
    for sum_file in sum_files:
        with open(sum_file, 'r') as af:
            paths.extend(aline.strip() for aline in af if pattern.search(aline))
    return ",".join(paths)

# Build one metadata record (gene_id, path, region) per row of the region file.
# The file is read as tab-separated text: a header line followed by rows of
# chromosome, start, end, gene_id. The header must name the fourth column
# "gene_id" because the records are selected by column name below.
paths_by_chr = {}  # chromosome label -> comma-joined sumstat paths
updated_regions = []
with open(region_file, 'r') as regions:
    header = regions.readline().strip()
    updated_regions.append(header + "\tpath\tregion")
    for line_number, line in enumerate(regions, start=2):
        parts = line.strip().split("\t")
        if parts == ['']:
            # Tolerate blank lines instead of failing on the unpack below.
            continue
        if len(parts) != 4:
            raise ValueError(
                f"{region_file}, line {line_number}: expected 4 tab-separated "
                f"fields (chr, start, end, gene_id) but found {len(parts)}")
        chr_id, start, end, gene_id = parts
        # All genes on one chromosome share the same path list, so the list
        # files are scanned only the first time a chromosome is seen.
        if chr_id not in paths_by_chr:
            paths_by_chr[chr_id] = find_matching_files_for_region(chr_id)
        paths = paths_by_chr[chr_id]
        updated_regions.append(f"{chr_id}\t{start}\t{end}\t{gene_id}\t{paths}\t{chr_id}:{start}-{end}")

meta_df = pd.DataFrame([line.split("\t") for line in updated_regions[1:]], columns=updated_regions[0].split("\t"))
meta = meta_df[['gene_id', 'path', 'region']].to_dict(orient='records')

input: for_each='meta'
output: f'{cwd:a}/{name}_cache/{name}.{_meta["gene_id"]}.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", stderr = f'{_output}.stderr', stdout = f'{_output}.stdout', container = container
    # Read one sumstat file for a region and return it as a matrix that carries
    # a "chrom:pos:ref:alt" variant identifier column.
    extract_data <- function(path, region) {
        region_records <- tabix_region(path, region)
        # Variant identifier later used to align the phenotypes.
        region_records <- mutate(region_records, variant = paste(`#CHROM`, POS, REF, ALT, sep = ":"))
        # Drop columns 3 and 6-9 by position, then keep the first row per variant.
        region_records <- select(region_records, -c(3, 6:9))
        region_records <- distinct(region_records, variant, .keep_all = TRUE)
        as.matrix(region_records)
    }
    # Extract one ":"-separated component (1 = bhat, 2 = sbhat) from every
    # value column of the combined table.
    #
    # df:              table whose columns 6..ncol hold strings such as "bhat:sbhat".
    # component_index: position of the wanted component inside each string.
    #
    # Returns a numeric matrix with one row per row of df and one column per
    # value column (column names are kept).
    extract_component <- function(df, component_index) {
        value_cols <- as.data.frame(df[, 6:ncol(df), drop = FALSE], stringsAsFactors = FALSE)
        parsed <- lapply(value_cols, function(column) {
            fields <- strsplit(as.character(column), ":", fixed = TRUE)
            # Parse every row on its own. Taking only the first split result
            # would recycle the first variant's value down the whole column.
            vapply(fields, function(parts) as.numeric(parts[component_index]), numeric(1))
        })
        as.matrix(as.data.frame(parsed, check.names = FALSE, stringsAsFactors = FALSE))
    }

    # Load the summary statistics of every phenotype for one region and combine
    # them into effect and standard-error matrices.
    #
    # phenotype_path: character vector of sumstat files readable by tabix.
    # region:         region string handed to tabix.
    #
    # Returns list(bhat, sbhat): numeric matrices with variants in rows (named
    # by the variant identifier) and one column per entry of pheno_names.
    load_combined_matrix_data <- function(phenotype_path, region) {
        library(dplyr)
        if (length(phenotype_path) == 0) {
            stop("No summary statistics file is available for region ", region)
        }
        key_cols <- c("variant", "#CHROM", "POS", "REF", "ALT")
        Y <- lapply(phenotype_path, extract_data, region)

        # Keep only variants present in every file (merge performs an inner join).
        combined_matrix <- Reduce(function(x, y) merge(x, y, by = key_cols), Y)
        if (!is.data.frame(combined_matrix)) {
            # With a single file Reduce() returns the matrix untouched: convert it
            # and move the key columns first so it has the same layout as a merge.
            combined_matrix <- as.data.frame(combined_matrix, stringsAsFactors = FALSE)
            key_pos <- match(key_cols, colnames(combined_matrix))
            other_pos <- setdiff(seq_len(ncol(combined_matrix)), key_pos)
            combined_matrix <- combined_matrix[, c(key_pos, other_pos), drop = FALSE]
        }
        combined_matrix <- distinct(combined_matrix, variant, .keep_all = TRUE)

        dat <- list(
            bhat = extract_component(combined_matrix, 1),
            sbhat = extract_component(combined_matrix, 2)
        )

        condition_names <- unlist(strsplit("${pheno_names}", ","))
        if (ncol(dat$bhat) != length(condition_names)) {
            stop("Region ", region, ": found ", ncol(dat$bhat), " value column(s) but ",
                 length(condition_names), " phenotype name(s) were given")
        }
        rownames(dat$bhat) <- rownames(dat$sbhat) <- combined_matrix$variant
        colnames(dat$bhat) <- colnames(dat$sbhat) <- condition_names
        dat
    }
  
    # Run `tabix -h` on a file/region pair and return the parsed output as a tibble.
    tabix_region <- function(file, region){
        tabix_call <- paste0("tabix -h ", file, " ", region)
        region_table <- data.table::fread(cmd = tabix_call)
        as_tibble(region_table)
    }
  
    region <- "${_meta['region']}"
    phenotype_path <- unlist(strsplit("${_meta['path']}", ","))

    # First try the region exactly as written in the region file; on any failure
    # retry with every "chr" removed from the region string.
    dat <- tryCatch(
      {
        load_combined_matrix_data(phenotype_path = phenotype_path, region = region)
      },
      error = function(e) {
        # Report why the first attempt failed instead of discarding the error.
        message("Loading region ", region, " failed: ", conditionMessage(e))
        message("gsub chr in region id...")
        load_combined_matrix_data(phenotype_path = phenotype_path, region =  gsub("chr", "", region))
      }
    )
    saveRDS(dat, ${_output:r})

In [None]:
# extract data for MASH from summary stats
[processing_2]
input:  group_by = per_chunk
output: f"{cwd}/{name}_cache/{name}_batch{_index+1}.rds"
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container
    library(dplyr)
    library(stringr)
    set.seed(${seed})
    #library(huiiy)
    # Row/column position of the largest entry of a matrix, as a one-row index matrix.
    matxMax <- function(mtx) {
      peak_position <- which.max(mtx)
      arrayInd(peak_position, dim(mtx))
    }
    # Strip the row names from every named element of a list of matrices.
    remove_rownames <- function(x) {
        for (element_name in names(x)) {
            rownames(x[[element_name]]) <- NULL
        }
        x
    }
    # Replace NaN effects by 0 and NaN / infinite standard errors by 1000.
    handle_nan_etc <- function(x) {
      nan_effects <- which(is.nan(x$bhat))
      x$bhat[nan_effects] <- 0
      unusable_se <- which(is.nan(x$sbhat) | is.infinite(x$sbhat))
      x$sbhat[unusable_se] <- 1E3
      x
    }
    # Sample the "random" and "null" variants from one gene's summary statistics.
    #
    # dat:       list holding the effect and standard-error matrices (variants in
    #            rows, conditions in columns); NULL is passed through.
    # n_random:  maximum number of variants drawn from all variants.
    # n_null:    maximum number of variants drawn from those whose largest |z| is below 2.
    # infile:    input file name, only used in the warning message.
    # na_remove: when TRUE, variants with a missing value in either matrix are
    #            dropped before sampling.
    #
    # Returns list(random = ..., null = ...) after handle_nan_etc(); `null`
    # starts from an empty list when no variant qualifies.
    extract_one_data = function(dat, n_random, n_null, infile, na_remove = TRUE) {
        if (is.null(dat)) return(NULL)
        bhat <- dat$${bhat}
        sbhat <- dat$${sbhat}
        if (isTRUE(as.logical(na_remove))) {
          n_ori <- nrow(bhat)
          # One shared mask keeps the rows of both matrices aligned; filtering
          # each matrix on its own can leave them with different variants.
          keep <- complete.cases(bhat) & complete.cases(sbhat)
          bhat <- bhat[keep, , drop = FALSE]
          sbhat <- sbhat[keep, , drop = FALSE]
          msg = paste(c("Out of ", n_ori, " SNP, ", nrow(bhat), " was retained for analysis"), collapse = "")
          message(msg)
          if (nrow(bhat) == 0) {
            stop("None of the SNP was retained for analysis, skipping genes")
          }
        }
        z <- abs(bhat / sbhat)
        n_variants <- nrow(z)
        # random samples can include the real signals
        random_idx <- sample.int(n_variants, min(n_random, n_variants), replace = FALSE)
        random <- list(bhat = bhat[random_idx, , drop = FALSE], sbhat = sbhat[random_idx, , drop = FALSE])
        # null samples defined as |z| < 2
        null.id <- which(apply(z, 1, max) < 2)
        if (length(null.id) == 0) {
          warning(paste("Null data is empty for input file", infile))
          null <- list()
        } else {
          # Index through sample.int(): sample(x, ...) with a single number x
          # would draw from 1:x instead of returning x itself.
          picked <- sample.int(length(null.id), min(n_null, length(null.id)), replace = FALSE)
          null_idx <- null.id[picked]
          null <- list(bhat = bhat[null_idx, , drop = FALSE], sbhat = sbhat[null_idx, , drop = FALSE])
        }
        dat <- list(random = random, null = null)
        dat$random <- handle_nan_etc(dat$random)
        dat$null <- handle_nan_etc(dat$null)
        return(dat)
    }
    # Flatten the sampled data into the naming used by
    # https://github.com/stephenslab/gtexresults/blob/master/workflows/mashr_flashr_workflow.ipynb
    # Always returns random.z and null.z; the b and s tables are appended
    # unless z_only is TRUE.
    reformat_data = function(dat, z_only = TRUE) {
        random_part <- dat$random
        null_part <- dat$null
        res <- list(random.z = random_part$bhat / random_part$sbhat,
                    null.z = null_part$bhat / null_part$sbhat)
        if (z_only) {
          return(res)
        }
        effect_tables <- list(random.b = random_part$bhat,
                              null.b = null_part$bhat,
                              null.s = null_part$sbhat,
                              random.s = random_part$sbhat)
        c(res, effect_tables)
    }
    # Append the tables of one gene (one_data) to the accumulated tables (res).
    #
    # res:      list of tables accumulated so far; an empty list starts the accumulation.
    # one_data: list of tables with the same element names, or NULL.
    #
    # Returns res with every non-NULL element of one_data row-bound onto the
    # element of the same name.
    #
    # The former branch `is.null(is.null(res$random.b)|is.null(res$null.b))`
    # was always FALSE (is.null() of a logical value) and has been removed.
    merge_data = function(res, one_data) {
      if (length(res) == 0) {
          return(one_data)
      }
      if (is.null(one_data)) {
          return(res)
      }
      for (d in names(one_data)) {
        if (is.null(one_data[[d]])) {
          next
        }
        res[[d]] = as.matrix(rbind(res[[d]], as.data.frame(one_data[[d]])))
      }
      res
    }
    # Accumulate the sampled random/null variants of every gene in this batch.
    res = list()

    for (f in c(${_input:r,})) {
      # If cannot read the input for some reason then we just skip it, assuming we have other enough data-sets to use.
      dat = tryCatch(readRDS(f), error = function(e) return(NULL))${("$"+table_name) if table_name != "" else ""}
      if (is.null(dat)) {
          message(paste("Skip loading file", f, "due to load failure."))
          next
      }
      if (${expected_ncondition} > 0 && (ncol(dat$${bhat}) != ${expected_ncondition} || ncol(dat$${sbhat}) != ${expected_ncondition})) {
          message(paste("Skip loading file", f, "because it has", ncol(dat$${bhat}), "columns different from required", ${expected_ncondition}))
          next
      }
      if(length(c(${",".join([repr(x) for x in exclude_condition])})) > 0 ){
        message(paste("Excluding condition ${exclude_condition} from the analysis"))
        # drop = FALSE keeps a matrix even when a single condition remains.
        dat$bhat = dat$bhat[,-c(${",".join(exclude_condition)}), drop = FALSE]
        dat$sbhat = dat$sbhat[,-c(${",".join(exclude_condition)}), drop = FALSE]
        dat$Z = dat$Z[,-c(${",".join(exclude_condition)})]
      }

      sampled <- tryCatch(
        extract_one_data(dat, ${n_random}, ${n_null}, f, ${na_remove}),
        error = function(e) {
          message(paste("Skip sampling file", f, "because:", conditionMessage(e)))
          NULL
        }
      )
      if (is.null(sampled)) {
          next
      }
      res = tryCatch(
        merge_data(res, reformat_data(sampled, ${"TRUE" if z_only else "FALSE"})),
        error = function(e) {
          message("Skipping gene due to lack of SNPs")
          # Keep what has been accumulated so far; returning the value of
          # message() would reset res to NULL and lose every earlier gene.
          res
        }
      )
    }

    # Written once after the loop so the output exists even when every file was skipped.
    saveRDS(res, ${_output:r})


In [None]:
[processing_3]
input: group_by = "all"
output: ran_null = f"{cwd}/{name}.random.null.rds"
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container,stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', volumes = [f'{cwd:ad}:{cwd:ad}']
    # Row-bind every named table of one batch onto the accumulated tables.
    # An empty accumulator is simply replaced by the batch.
    merge_data = function(res, one_data) {
      if (length(res) == 0) {
          return(one_data)
      }
      for (component in names(one_data)) {
        stacked <- rbind(res[[component]], one_data[[component]])
        res[[component]] <- stacked
      }
      res
    }
    # Fold every batch file into one list of tables, starting from an empty list.
    batch_files <- c(${_input:r,})
    dat <- Reduce(
      function(accumulated, batch_file) merge_data(accumulated, readRDS(batch_file)),
      batch_files,
      list()
    )
    saveRDS(dat, ${_output:r})
 
bash: expand = "${ }", container = container,stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', volumes = [f'{cwd:ad}:{cwd:ad}']
    #rm -rf ${cwd}/${name}_cache/

In [None]:
[susie_signal]
parameter: susie_list = path
input: susie_list
output: strong = f"{cwd}/{name}.strong.rds"
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container,stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', volumes = [f'{cwd:ad}:{cwd:ad}']
    # data.table supplies as.data.table(), data.table() and rbindlist()
    library(data.table)

    # First column of the list file: one fine-mapping RDS path per line
    susie_paths <- read.table("${_input}")
    rds_files <- susie_paths$V1
    pheno_names <- unlist(strsplit("${pheno_names}", ","))

    # Read one fine-mapping RDS and collect, across its elements, the bhat and
    # sbhat columns of `qtl_identified` side by side. The variant identifiers
    # are taken from the first element.
    read_and_extract <- function(file) {
      per_condition <- readRDS(file)

      effect_tables <- lapply(per_condition, function(cond) {
        as.data.table(cond$qtl_identified)[, .(variants, bhat)]
      })
      se_tables <- lapply(per_condition, function(cond) {
        as.data.table(cond$qtl_identified)[, .(variants, sbhat)]
      })

      effect_columns <- lapply(effect_tables, function(tbl) tbl[["bhat"]])
      se_columns <- lapply(se_tables, function(tbl) tbl[["sbhat"]])

      list(
        bhat = do.call(cbind, effect_columns),
        sbhat = do.call(cbind, se_columns),
        variants = effect_tables[[1]]$variants
      )
    }

    results <- lapply(rds_files, read_and_extract)

    # One table per input file, with the variant identifiers as first column
    bhat_tables <- lapply(results, function(x) data.table(variants = x$variants, x$bhat))
    sbhat_tables <- lapply(results, function(x) data.table(variants = x$variants, x$sbhat))

    # Stack the files; fill = TRUE pads columns missing from a file with NA
    stacked_bhat <- rbindlist(bhat_tables, fill = TRUE)
    stacked_sbhat <- rbindlist(sbhat_tables, fill = TRUE)

    variants <- stacked_bhat$variants

    # Drop the identifier column so only the per-condition values remain
    out <- list(
      bhat = stacked_bhat[, -1, with = FALSE],
      sbhat = stacked_sbhat[, -1, with = FALSE]
    )

    # pheno_names comes from the workflow parameter split earlier in this script
    colnames(out$bhat) <- colnames(out$sbhat) <- pheno_names

    # z-scores: effect divided by its standard error
    out$z <- out$bhat/ out$sbhat
    rownames(out$bhat) <- rownames(out$sbhat) <- rownames(out$z) <- variants
    saveRDS(out, ${_output:r})


In [None]:
[mash_input]
input: ran_null, strong
output: f"{cwd}/{name}.mashr_input.rds"
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container,stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', volumes = [f'{cwd:ad}:{cwd:ad}']
    # Start from the random/null tables and add the strong-signal tables.
    out <- readRDS(${_input[0]:r})
    strong <- readRDS(${_input[1]:r})
    out$strong.b <- strong$bhat
    out$strong.s <- strong$sbhat
    out$strong.z <- strong$z

    # XtX: cross-product of the strong z-scores (missing values set to 0),
    # divided by the number of strong variants.
    X <- strong$z
    X[is.na(X)] = 0
    z_matrix <- as.matrix(X)
    out$XtX = t(z_matrix) %*% z_matrix / nrow(X)

    saveRDS(out, ${_output:r})
