Skip to content

Commit

Permalink
add first working version
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-octo committed Jan 17, 2023
1 parent 3c4d2ef commit 9fdd233
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -13,3 +13,7 @@

# Dependency directories (remove the comment below to include it)
# vendor/

# MMP::io specific
/config.json
/mmp.tsv
20 changes: 20 additions & 0 deletions README.md
@@ -0,0 +1,20 @@
# MMP::io

Tool to merge many GWAS summary statistics into one file ready to use in [MMP](https://geneviz.aalto.fi/MMP/dashboard/).


## Usage

**Set the configuration**

1. Copy `config.json.sample` to `config.json`

2. Edit `config.json` with name, paths and columns for each of your datasets

**Run**

```sh
go run .
```

This outputs a `mmp.tsv` file ready for upload on [MMP](https://geneviz.aalto.fi/MMP/dashboard/).
26 changes: 26 additions & 0 deletions config.json.sample
@@ -0,0 +1,26 @@
[
{
"tag": "Dataset1",
"filepath": "path_to_dataset1_GWAS_summary_stats.gz",
"col_chrom": "#chrom",
"col_pos": "pos",
"col_ref": "ref",
"col_alt": "alt",
"col_pval": "pval",
"col_beta": "beta",
"col_af": "af_alt",
"pval_threshold": 1e-06
},
{
"tag": "Dataset2",
"filepath": "path_to_dataset2_GWAS_summary_stats.gz",
"col_chrom": "CHR",
"col_pos": "POS",
"col_ref": "REF",
"col_alt": "ALT",
"col_pval": "PVAL",
"col_beta": "BETA",
"col_af": "AF_ALT",
"pval_threshold": 1e-06
}
]
3 changes: 3 additions & 0 deletions go.mod
@@ -0,0 +1,3 @@
module github.com/FINNGEN/mmp

go 1.19
288 changes: 288 additions & 0 deletions mmp.go
@@ -0,0 +1,288 @@
// SPDX-License-Identifier: MIT
package main

import (
"compress/gzip"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
)

const OutputPath = "mmp.tsv"

type SumStatConf struct {
Tag string `json:"tag"`
Filepath string `json:"filepath"`
ColChrom string `json:"col_chrom"`
ColPos string `json:"col_pos"`
ColRef string `json:"col_ref"`
ColAlt string `json:"col_alt"`
ColPval string `json:"col_pval"`
ColBeta string `json:"col_beta"`
ColAf string `json:"col_af"`
PvalThreshold float64 `json:"pval_threshold"`
}

type Cpra struct {
Chrom string
Pos string
Ref string
Alt string
}

type Stats struct {
Tag string
Pval string
Beta string
Af string
}

type CpraStats struct {
Cpra Cpra
Stats Stats
}

func main() {
conf := readConf("config.json")

fmt.Println("[1/3] Checking variant selection...")
selectedVariants := checkVariantSelection(conf)

fmt.Println("[2/3] Getting variant statistics...")
statsVariants := findVariantStats(conf, selectedVariants)

fmt.Printf("[3/3] Writing output to %s ...\n", OutputPath)
writeMMPOutput(conf, statsVariants)
}

func logCheck(message string, err error) {
if err != nil {
log.Fatal(":: ", message, " :: ", err)
}
}

func readConf(filePath string) []SumStatConf {
data, err := os.ReadFile(filePath)
logCheck("reading configuration file", err)

var conf []SumStatConf
err = json.Unmarshal(data, &conf)
logCheck("parsing JSON conf", err)

return conf
}

func checkVariantSelection(conf []SumStatConf) map[Cpra]bool {
selectedVariants := make(map[Cpra]bool)

var wg sync.WaitGroup
ch := make(chan Cpra)

for _, ssConf := range conf {
wg.Add(1)
go func(ssConf SumStatConf) {
defer wg.Done()
streamVariants(ssConf, ch)
}(ssConf)
}

go func() {
wg.Wait()
close(ch)
}()

for cpra := range ch {
selectedVariants[cpra] = true
}

return selectedVariants
}

func streamVariants(ssConf SumStatConf, ch chan<- Cpra) {
fmt.Printf("- processing %s\n", ssConf.Tag)

chRows := make(chan CpraStats)
go readGzTsv(ssConf, chRows)

for rec := range chRows {
pvalUnparsed := rec.Stats.Pval
pval, err := strconv.ParseFloat(pvalUnparsed, 64)
logCheck("parsing p-value as float", err)

if pval < ssConf.PvalThreshold {
ch <- rec.Cpra
}
}

fmt.Printf("* done %s\n", ssConf.Tag)
}

func findVariantStats(conf []SumStatConf, selectedVariants map[Cpra]bool) map[Cpra][]Stats {
stats := make(map[Cpra][]Stats)

var wg sync.WaitGroup
ch := make(chan CpraStats)

for _, ssConf := range conf {
wg.Add(1)
go func(ssConf SumStatConf) {
defer wg.Done()
streamStats(ssConf, selectedVariants, ch)
}(ssConf)
}

go func() {
wg.Wait()
close(ch)
}()

for variant := range ch {
multiStats, found := stats[variant.Cpra]
if !found {
var multiStats = []Stats{variant.Stats}
stats[variant.Cpra] = multiStats
} else {
multiStats = append(multiStats, variant.Stats)
stats[variant.Cpra] = multiStats
}
}

return stats
}

func streamStats(ssConf SumStatConf, selectedVariants map[Cpra]bool, ch chan<- CpraStats) {
fmt.Printf("- processing %s\n", ssConf.Tag)

chRows := make(chan CpraStats)
go readGzTsv(ssConf, chRows)

for rec := range chRows {
if _, found := selectedVariants[rec.Cpra]; found {
ch <- rec
}
}

fmt.Printf("* done %s\n", ssConf.Tag)
}

func parseCpra(record []string, fields map[string]int, ssConf SumStatConf) Cpra {
return Cpra{
record[fields[ssConf.ColChrom]],
record[fields[ssConf.ColPos]],
record[fields[ssConf.ColRef]],
record[fields[ssConf.ColAlt]],
}
}

func parseStats(record []string, fields map[string]int, ssConf SumStatConf) Stats {
pval := record[fields[ssConf.ColPval]]
beta := record[fields[ssConf.ColBeta]]
af := record[fields[ssConf.ColAf]]

stats := Stats{
ssConf.Tag,
pval,
beta,
af,
}

return stats
}

func writeMMPOutput(conf []SumStatConf, statsVariants map[Cpra][]Stats) {
var outRecords [][]string

statsCols := []string{"pval", "beta", "af"}
headerFields := []string{
"chrom",
"pos",
"ref",
"alt",
}
lenCpraFields := 4

for _, ssConf := range conf {
for _, suffix := range statsCols {
field := fmt.Sprintf("%s_%s", ssConf.Tag, suffix)
headerFields = append(headerFields, field)
}
}
outRecords = append(outRecords, headerFields)

for cpra, cpraStats := range statsVariants {
record := make([]string, len(headerFields))
record[0] = cpra.Chrom
record[1] = cpra.Pos
record[2] = cpra.Ref
record[3] = cpra.Alt

for _, stats := range cpraStats {
var offset int
for ii, ssConf := range conf {
if ssConf.Tag == stats.Tag {
offset = lenCpraFields + ii*len(statsCols)
}
}
record[offset+0] = stats.Pval
record[offset+1] = stats.Beta
record[offset+2] = stats.Af
}
outRecords = append(outRecords, record)
}

outFile, err := os.Create(OutputPath)
logCheck("creating output file", err)
defer outFile.Close()

tsvWriter := csv.NewWriter(outFile)
tsvWriter.Comma = '\t'
tsvWriter.WriteAll(outRecords)
err = tsvWriter.Error()
logCheck("writing TSV output", err)
}

func readGzTsv(ssConf SumStatConf, ch chan<- CpraStats) {
// Open gzip file for reading
fReader, err := os.Open(ssConf.Filepath)
logCheck("opening file", err)
defer fReader.Close()

gzReader, err := gzip.NewReader(fReader)
logCheck("gunzip-ing file", err)
defer gzReader.Close()

// Parse as TSV
tsvReader := csv.NewReader(gzReader)
tsvReader.Comma = '\t'

// Keep track of the TSV header
rec, err := tsvReader.Read()
logCheck("parsing TSV header", err)
fields := make(map[string]int)
for ii, field := range rec {
fields[field] = ii
}

// Emit the TSV rows
for {
rec, err = tsvReader.Read()

// Can't read data if end of file or parsing error
if err == io.EOF {
break
}
logCheck("parsing TSV row", err)

// Emit the data if valid
cpra := parseCpra(rec, fields, ssConf)
stats := parseStats(rec, fields, ssConf)
ch <- CpraStats{cpra, stats}
}

close(ch)
}

0 comments on commit 9fdd233

Please sign in to comment.