# Creating a Parquet dataset from a combined counts table

In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.dataset
import pyarrow.parquet as pq
from pathlib import Path
from collections import namedtuple
import re

## Definitions of file paths, names, and splits

In [2]:
DATA_ROOT = Path('../igvf-pm/K562/leave-one-out')
DB_BASE = Path('../data')

# datafile = "K562.combined.sample.tsv.gz"
datafile = "combined.input_and_output.gt_100.log2FC.sequence.txt.gz"
splits = ["train", "test"]
fastafiles = pd.Series(
    [f"data-normalized/downsampled-{split}.fasta" for split in splits],
    index=splits)

## Read combined counts table

We read the combined counts table into a Pandas dataframe.

In [3]:
df = pd.read_csv(DATA_ROOT / datafile, compression="gzip", delimiter="\t")

The 1-based row number seems to be used as an identifier, so we preserve it as a column.

In [4]:
df["index"] = range(1, df.shape[0]+1)

## Read sampling of rows for different splits

A sample consists of a counts table and a FASTA sequence bins file, both in the same order. We can recover the row identifier from the FASTA sequence bins file as the sequence identifier; the format of the sampled counts table no longer includes the row identifier.

To enable verifying that a sequence bin is the same as in the combined counts table, in addition to the row identifier we extract and return chromosome, start and end. We also convert the four values to their correct types (`int` for index, start and end), and code this as a generator that returns a named tuple for each row.

In [5]:
def fasta_bins(fastafile, compression=None):
    rowtuple = namedtuple('rowtuple',["index","chrom","start","end"])
    typeconvs = [int, str, int, int]
    fopen = open
    if compression:
        fopen = gzip.open
    with fopen(fastafile, "rt") as f:
        pat = re.compile(r'>(\d+)\s+.*=(chr[0-9MXY]+):(\d+)-(\d+)')
        for line in f:
            if line.startswith(">"):
                match = pat.match(line)
                yield rowtuple(*[conv(val) for conv, val in zip(typeconvs, match.groups())])

We can now directly create a dataframe from the generator, with columns correctly named and of the right type. For a given split:

In [6]:
def append_split_from_fasta(df, fastafile, split, compression=None, check_cols=["chrom","start","end"]):
    df_split = pd.DataFrame(fasta_bins(fastafile, compression=compression))
    df_split.index = df_split["index"] - 1
    if check_cols and len(check_cols) > 0:
        check = df.loc[df_split.index, check_cols].eq(df_split.loc[:, check_cols]).all(axis=0).all()
        if not check:
            raise Exception(f"Index mismatch with split '{split}' in {fastafile}")
    df[split] = False
    df.loc[df_split.index, split] = True

Now we can use this for each split we're dealing with, resulting in one additional column of type `bool` for each split.

In [8]:
for split in splits:
    print(f"Obtaining split '{split}'")
    append_split_from_fasta(df, DATA_ROOT / fastafiles[split], split=split)

Obtaining split 'train'
Obtaining split 'test'


Report absolute and relative sizes of the splits:

In [14]:
splitsizes = df.loc[:,splits].sum(axis=0)
for split in splits:
    print(f"Split '{split}' is size {splitsizes[split]} ({splitsizes[split] / len(df):.2}% of total)")

Split 'train' is size 2600332 (0.13% of total)
Split 'test' is size 2598989 (0.13% of total)


## Write Parquet dataset

To write the dataframe to a Parquet dataset, we first need to a convert a `pyarrow.Table` object, which we then write using HIVE partitioning by chromosome and the two splits.

In [15]:
def write_db(df, db_path, partition_cols=["chrom","train","test"]):
    dt = pa.Table.from_pandas(df)
    pq.write_to_dataset(dt, root_path=db_path, partition_cols=partition_cols)

In [16]:
write_db(df, db_path=DB_BASE / "K562db")

In [None]:
def normalize_by_lib(df, counts_per=1e6, dna_prefix="input", rna_prefix="output", rep_suffix="_rep"):
    col_regex = "^(" + dna_prefix + "|" + rna_prefix + ")" + rep_suffix
    libsizes = df.filter(regex=col_regex).sum(axis=0)
    dfnorm = df.filter(regex=col_regex) / libsizes * counts_per
    return dfnorm

In [None]:
dfnorm = normalize_by_lib(df).rename(columns=lambda x: x + "_norm")

In [None]:
def mean_fc(df, pseudocount=0, dna_prefix="input", rna_prefix="output", rep_suffix="_rep"):
    dna_cols = "^" + dna_prefix + rep_suffix
    rna_cols = "^" + rna_prefix + rep_suffix
    return ((df.filter(regex=rna_cols).mean(axis=1) + pseudocount) /
            (df.filter(regex=dna_cols).mean(axis=1) + pseudocount))