From 9fdd23338e9d6d2ce2d3cdea2911c331b8ce21b6 Mon Sep 17 00:00:00 2001
From: Vincent Llorens
Date: Tue, 17 Jan 2023 11:33:09 +0200
Subject: [PATCH] add first working version

---
 .gitignore         |   4 +
 README.md          |  20 ++++
 config.json.sample |  26 ++++
 go.mod             |   3 +
 mmp.go             | 288 +++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 341 insertions(+)
 create mode 100644 README.md
 create mode 100644 config.json.sample
 create mode 100644 go.mod
 create mode 100644 mmp.go

diff --git a/.gitignore b/.gitignore
index 66fd13c..73672b4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,7 @@
 # Dependency directories (remove the comment below to include it)
 # vendor/
+
+# MMP::io specific
+/config.json
+/mmp.tsv
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..9dc9dde
--- /dev/null
+++ b/README.md
@@ -0,0 +1,20 @@
+# MMP::io
+
+A tool to merge multiple GWAS summary statistics files into a single file ready to use in [MMP](https://geneviz.aalto.fi/MMP/dashboard/).
+
+
+## Usage
+
+**Set the configuration**
+
+1. Copy `config.json.sample` to `config.json`
+
+2. Edit `config.json` with the tag, file path and column names for each of your datasets
+
+**Run**
+
+```sh
+go run .
+```
+
+This writes a `mmp.tsv` file ready to upload to [MMP](https://geneviz.aalto.fi/MMP/dashboard/).
diff --git a/config.json.sample b/config.json.sample
new file mode 100644
index 0000000..ec59271
--- /dev/null
+++ b/config.json.sample
@@ -0,0 +1,26 @@
+[
+    {
+        "tag": "Dataset1",
+        "filepath": "path_to_dataset1_GWAS_summary_stats.gz",
+        "col_chrom": "#chrom",
+        "col_pos": "pos",
+        "col_ref": "ref",
+        "col_alt": "alt",
+        "col_pval": "pval",
+        "col_beta": "beta",
+        "col_af": "af_alt",
+        "pval_threshold": 1e-06
+    },
+    {
+        "tag": "Dataset2",
+        "filepath": "path_to_dataset2_GWAS_summary_stats.gz",
+        "col_chrom": "CHR",
+        "col_pos": "POS",
+        "col_ref": "REF",
+        "col_alt": "ALT",
+        "col_pval": "PVAL",
+        "col_beta": "BETA",
+        "col_af": "AF_ALT",
+        "pval_threshold": 1e-06
+    }
+]
\ No newline at end of file
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..dd509e5
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module github.com/FINNGEN/mmp
+
+go 1.19
diff --git a/mmp.go b/mmp.go
new file mode 100644
index 0000000..ae6ae66
--- /dev/null
+++ b/mmp.go
@@ -0,0 +1,288 @@
+// SPDX-License-Identifier: MIT
+package main
+
+import (
+	"compress/gzip"
+	"encoding/csv"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"strconv"
+	"sync"
+)
+
+const OutputPath = "mmp.tsv"
+
+// SumStatConf describes one GWAS summary statistics file: where to find it
+// and which columns hold the fields needed by MMP::io.
+type SumStatConf struct {
+	Tag           string  `json:"tag"`
+	Filepath      string  `json:"filepath"`
+	ColChrom      string  `json:"col_chrom"`
+	ColPos        string  `json:"col_pos"`
+	ColRef        string  `json:"col_ref"`
+	ColAlt        string  `json:"col_alt"`
+	ColPval       string  `json:"col_pval"`
+	ColBeta       string  `json:"col_beta"`
+	ColAf         string  `json:"col_af"`
+	PvalThreshold float64 `json:"pval_threshold"`
+}
+
+// Cpra identifies a variant by chromosome, position, reference and
+// alternate alleles.
+type Cpra struct {
+	Chrom string
+	Pos   string
+	Ref   string
+	Alt   string
+}
+
+type Stats struct {
+	Tag  string
+	Pval string
+	Beta string
+	Af   string
+}
+
+type CpraStats struct {
+	Cpra  Cpra
+	Stats Stats
+}
+
+func main() {
+	conf := readConf("config.json")
+
+	fmt.Println("[1/3] Checking variant selection...")
+	selectedVariants := checkVariantSelection(conf)
+
+	fmt.Println("[2/3] Getting variant statistics...")
+	statsVariants := findVariantStats(conf, selectedVariants)
+
+	fmt.Printf("[3/3] Writing output to %s ...\n", OutputPath)
+	writeMMPOutput(conf, statsVariants)
+}
+
+func logCheck(message string, err error) {
+	if err != nil {
+		log.Fatal(":: ", message, " :: ", err)
+	}
+}
+
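+// readConf reads the JSON configuration file and returns one SumStatConf
+// entry per GWAS summary statistics file to merge.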
:: ", err) + } +} + +func readConf(filePath string) []SumStatConf { + data, err := os.ReadFile(filePath) + logCheck("reading configuration file", err) + + var conf []SumStatConf + err = json.Unmarshal(data, &conf) + logCheck("parsing JSON conf", err) + + return conf +} + +func checkVariantSelection(conf []SumStatConf) map[Cpra]bool { + selectedVariants := make(map[Cpra]bool) + + var wg sync.WaitGroup + ch := make(chan Cpra) + + for _, ssConf := range conf { + wg.Add(1) + go func(ssConf SumStatConf) { + defer wg.Done() + streamVariants(ssConf, ch) + }(ssConf) + } + + go func() { + wg.Wait() + close(ch) + }() + + for cpra := range ch { + selectedVariants[cpra] = true + } + + return selectedVariants +} + +func streamVariants(ssConf SumStatConf, ch chan<- Cpra) { + fmt.Printf("- processing %s\n", ssConf.Tag) + + chRows := make(chan CpraStats) + go readGzTsv(ssConf, chRows) + + for rec := range chRows { + pvalUnparsed := rec.Stats.Pval + pval, err := strconv.ParseFloat(pvalUnparsed, 64) + logCheck("parsing p-value as float", err) + + if pval < ssConf.PvalThreshold { + ch <- rec.Cpra + } + } + + fmt.Printf("* done %s\n", ssConf.Tag) +} + +func findVariantStats(conf []SumStatConf, selectedVariants map[Cpra]bool) map[Cpra][]Stats { + stats := make(map[Cpra][]Stats) + + var wg sync.WaitGroup + ch := make(chan CpraStats) + + for _, ssConf := range conf { + wg.Add(1) + go func(ssConf SumStatConf) { + defer wg.Done() + streamStats(ssConf, selectedVariants, ch) + }(ssConf) + } + + go func() { + wg.Wait() + close(ch) + }() + + for variant := range ch { + multiStats, found := stats[variant.Cpra] + if !found { + var multiStats = []Stats{variant.Stats} + stats[variant.Cpra] = multiStats + } else { + multiStats = append(multiStats, variant.Stats) + stats[variant.Cpra] = multiStats + } + } + + return stats +} + +func streamStats(ssConf SumStatConf, selectedVariants map[Cpra]bool, ch chan<- CpraStats) { + fmt.Printf("- processing %s\n", ssConf.Tag) + + chRows := make(chan CpraStats) + go readGzTsv(ssConf, chRows) + + for rec := range chRows { + if _, found := selectedVariants[rec.Cpra]; found { + ch <- rec + } + } + + fmt.Printf("* done %s\n", ssConf.Tag) +} + +func parseCpra(record []string, fields map[string]int, ssConf SumStatConf) Cpra { + return Cpra{ + record[fields[ssConf.ColChrom]], + record[fields[ssConf.ColPos]], + record[fields[ssConf.ColRef]], + record[fields[ssConf.ColAlt]], + } +} + +func parseStats(record []string, fields map[string]int, ssConf SumStatConf) Stats { + pval := record[fields[ssConf.ColPval]] + beta := record[fields[ssConf.ColBeta]] + af := record[fields[ssConf.ColAf]] + + stats := Stats{ + ssConf.Tag, + pval, + beta, + af, + } + + return stats +} + +func writeMMPOutput(conf []SumStatConf, statsVariants map[Cpra][]Stats) { + var outRecords [][]string + + statsCols := []string{"pval", "beta", "af"} + headerFields := []string{ + "chrom", + "pos", + "ref", + "alt", + } + lenCpraFields := 4 + + for _, ssConf := range conf { + for _, suffix := range statsCols { + field := fmt.Sprintf("%s_%s", ssConf.Tag, suffix) + headerFields = append(headerFields, field) + } + } + outRecords = append(outRecords, headerFields) + + for cpra, cpraStats := range statsVariants { + record := make([]string, len(headerFields)) + record[0] = cpra.Chrom + record[1] = cpra.Pos + record[2] = cpra.Ref + record[3] = cpra.Alt + + for _, stats := range cpraStats { + var offset int + for ii, ssConf := range conf { + if ssConf.Tag == stats.Tag { + offset = lenCpraFields + ii*len(statsCols) + } + } + record[offset+0] = 
+func writeMMPOutput(conf []SumStatConf, statsVariants map[Cpra][]Stats) {
+	var outRecords [][]string
+
+	statsCols := []string{"pval", "beta", "af"}
+	headerFields := []string{
+		"chrom",
+		"pos",
+		"ref",
+		"alt",
+	}
+	lenCpraFields := 4
+
+	for _, ssConf := range conf {
+		for _, suffix := range statsCols {
+			field := fmt.Sprintf("%s_%s", ssConf.Tag, suffix)
+			headerFields = append(headerFields, field)
+		}
+	}
+	outRecords = append(outRecords, headerFields)
+
+	for cpra, cpraStats := range statsVariants {
+		record := make([]string, len(headerFields))
+		record[0] = cpra.Chrom
+		record[1] = cpra.Pos
+		record[2] = cpra.Ref
+		record[3] = cpra.Alt
+
+		for _, stats := range cpraStats {
+			var offset int
+			for ii, ssConf := range conf {
+				if ssConf.Tag == stats.Tag {
+					offset = lenCpraFields + ii*len(statsCols)
+				}
+			}
+			record[offset+0] = stats.Pval
+			record[offset+1] = stats.Beta
+			record[offset+2] = stats.Af
+		}
+		outRecords = append(outRecords, record)
+	}
+
+	outFile, err := os.Create(OutputPath)
+	logCheck("creating output file", err)
+	defer outFile.Close()
+
+	tsvWriter := csv.NewWriter(outFile)
+	tsvWriter.Comma = '\t'
+	err = tsvWriter.WriteAll(outRecords)
+	logCheck("writing TSV output", err)
+}
+
+// readGzTsv reads a gzip-compressed TSV file and emits one CpraStats per data
+// row on ch, closing ch once the whole file has been read.
+func readGzTsv(ssConf SumStatConf, ch chan<- CpraStats) {
+	// Open gzip file for reading
+	fReader, err := os.Open(ssConf.Filepath)
+	logCheck("opening file", err)
+	defer fReader.Close()
+
+	gzReader, err := gzip.NewReader(fReader)
+	logCheck("gunzip-ing file", err)
+	defer gzReader.Close()
+
+	// Parse as TSV
+	tsvReader := csv.NewReader(gzReader)
+	tsvReader.Comma = '\t'
+
+	// Keep track of the TSV header
+	rec, err := tsvReader.Read()
+	logCheck("parsing TSV header", err)
+	fields := make(map[string]int)
+	for ii, field := range rec {
+		fields[field] = ii
+	}
+
+	// Emit the TSV rows
+	for {
+		rec, err = tsvReader.Read()
+
+		// Can't read data if end of file or parsing error
+		if err == io.EOF {
+			break
+		}
+		logCheck("parsing TSV row", err)
+
+		// Emit the data if valid
+		cpra := parseCpra(rec, fields, ssConf)
+		stats := parseStats(rec, fields, ssConf)
+		ch <- CpraStats{cpra, stats}
+	}
+
+	close(ch)
+}