In [1]:
from pathlib import Path
import socket
from enum import Enum


class Hosts(Enum):
    """Hostnames of machines that get their own data-directory layout."""

    IMAGEOMICS_SERVER = "cse-cnc196909s.coeit.osu.edu"


# Choose the four data roots according to the machine running the notebook:
# the Imageomics server has its own layout, every other host uses the fallback.
hostname = socket.gethostname()
_root_dirs = (
    (
        "/local/scratch/carlyn.1/dna/colors",
        "/local/scratch/carlyn.1/dna/colors/processed",
        "/local/scratch/carlyn.1/dna/vcfs",
        "/local/scratch/carlyn.1/dna/processed",
    )
    if hostname == Hosts.IMAGEOMICS_SERVER.value
    else (
        "/local/scratch/david/geno-pheno-data/colors",
        "/local/scratch/david/geno-pheno-data/colors/processed",
        "/local/scratch/david/geno-pheno-data/dna/",
        "/local/scratch/david/geno-pheno-data/dna/processed",
    )
)
(
    ROOT_PHENOTYPE_INPUT_DIR,
    ROOT_PHENOTYPE_OUTPUT_DIR,
    ROOT_GENOTYPE_INPUT_DIR,
    ROOT_GENOTYPE_OUTPUT_DIR,
) = (Path(root_dir) for root_dir in _root_dirs)

### Create Profiling Object

In [2]:
import time


class ExecutionTimer:
    """Small stopwatch built on ``time.perf_counter``.

    The timer starts running as soon as it is created. Call ``stop()`` to
    record the end of the interval, then ``get_elapsed_time()`` or
    ``print_elapsed_time()`` to read it. ``restart()`` clears the recorded end
    and starts timing again from the current moment.
    """

    def __init__(self, name=""):
        """Create the timer and start it immediately.

        Args:
            name: Label printed in front of the elapsed time.
        """
        self.name = name
        self.end_time = None
        self.start_time = time.perf_counter()

    def stop(self):
        """Record the current ``perf_counter`` value as the end of the interval."""
        self.end_time = time.perf_counter()

    def restart(self):
        """Discard any recorded end time and start timing again from now."""
        self.end_time = None
        self.start_time = time.perf_counter()

    def get_elapsed_time(self):
        """Return the time between start and stop, in fractional seconds.

        Raises:
            RuntimeError: If ``stop()`` has not been called since the timer was
                created or last restarted. Previously this case printed a
                warning and then failed with an unrelated ``TypeError``.
        """
        if self.end_time is None:
            raise RuntimeError(
                "Timer hasn't been stopped yet. Run the stop() method first."
            )
        return self.end_time - self.start_time

    def print_elapsed_time(self):
        """Print ``"<name>: <elapsed>"`` with the elapsed time to 4 decimals."""
        et = self.get_elapsed_time()
        print(f"{self.name}: {et:.4f}")

# Loading CSV for Phenotypes

### Inspect file sizes

In [None]:
import os
import math

# Collect [size_in_GB, directory] for every "states.csv" found under the
# erato genome output tree.
bytes_per_gb = math.pow(1024, 3)  # bytes to gigabytes
results = [
    [os.path.getsize(os.path.join(root, p)) / bytes_per_gb, root]
    for root, dirs, paths in os.walk(ROOT_GENOTYPE_OUTPUT_DIR / "genome/erato")
    for p in paths
    if p == "states.csv"
]

# Report the largest files first.
largest_first = sorted(results, key=lambda entry: entry[0], reverse=True)
for fsize, root in largest_first:
    print(f"({fsize} GB) {root}")

### Set initial path

In [3]:
# Phenotype benchmark file, resolved under the phenotype input root.
test_suffix = "erato_forewings_PCA/PCA_color_3_loadings.csv"
phenotype_loading_path = ROOT_PHENOTYPE_INPUT_DIR.joinpath(test_suffix)

# Genotype benchmark file, resolved under the genotype output root.
test_suffix = "genome/erato/Herato1411/states.csv"
genotype_loading_path = ROOT_GENOTYPE_OUTPUT_DIR.joinpath(test_suffix)

# Only the last expression of a cell is displayed, so show the genotype path.
genotype_loading_path

PosixPath('/local/scratch/carlyn.1/dna/processed/genome/erato/Herato1411/states.csv')

### Load with Pandas

In [None]:
import pandas as pd

# Time pd.read_csv on each input, in this order: the phenotype CSV, the
# genotype CSV with the default engine, the genotype CSV with engine="pyarrow".
pandas_read_runs = [
    ("pandas-phenotype", phenotype_loading_path, {}),
    ("pandas-genotype", genotype_loading_path, {}),
    ("pandas-arrow-genotype", genotype_loading_path, {"engine": "pyarrow"}),
]
for run_name, csv_path, read_kwargs in pandas_read_runs:
    pandas_timer = ExecutionTimer(name=run_name)
    df = pd.read_csv(csv_path, **read_kwargs)
    pandas_timer.stop()
    pandas_timer.print_elapsed_time()


So pandas takes about 20 minutes to read a 1GB file.
Using arrow as the engine (`engine="pyarrow"`) seems to break...

In [None]:
import polars as pl

# Time pl.read_csv on the phenotype CSV first, then on the genotype CSV.
# `df` ends up holding the genotype frame from the last run.
for run_name, csv_path in (
    ("polars-phenotype", phenotype_loading_path),
    ("polars-genotype", genotype_loading_path),
):
    pandas_timer = ExecutionTimer(name=run_name)
    df = pl.read_csv(csv_path)
    pandas_timer.stop()
    pandas_timer.print_elapsed_time()

Polars takes about 90 seconds!

In [None]:
# Time the conversion of the current `df` to a pandas DataFrame via .to_pandas().
conversion_timer = ExecutionTimer(name="polars-to-pandas-genotype")
pdf = df.to_pandas()
conversion_timer.stop()
conversion_timer.print_elapsed_time()

To convert from polars back to pandas, it takes about 2 minutes.

# What about the .parquet format?

In [None]:
# Time writing the pandas frame `pdf` out as a parquet file.
parquet_write_timer = ExecutionTimer(name="pandas-to-parquet-genotype")
pdf.to_parquet("../../tmp/tmp.parquet")
parquet_write_timer.stop()
parquet_write_timer.print_elapsed_time()

Writing a 1GB file to parquet takes ~15.5 minutes.

In [None]:
import pandas as pd

# Read the same parquet file back with each library and time both reads.
parquet_file = "../../tmp/tmp.parquet"

# pandas: the loaded frame is discarded; only the read time matters here.
pandas_parquet_timer = ExecutionTimer(name="pandas-from-parquet-genotype")
pd.read_parquet(parquet_file)
pandas_parquet_timer.stop()
pandas_parquet_timer.print_elapsed_time()

# polars: the loaded frame is kept in `df`.
polars_parquet_timer = ExecutionTimer(name="polars-from-parquet-genotype")
df = pl.read_parquet(parquet_file)
polars_parquet_timer.stop()
polars_parquet_timer.print_elapsed_time()

If we transition the data to a parquet file, then the read time becomes:
- pandas => 40 seconds
- polars => 2.5 seconds

# Common Data Operations

If we switch to polars and parquet files, then can we do common operations quicker?

### Loading Chromosome position metadata

In [None]:
# The column names are parsed as integer positions. A trailing column whose
# name contains "index_level" is dropped before parsing.
positions = df.columns
has_index_artifact = "index_level" in positions[-1]
positions = positions[:-1] if has_index_artifact else positions
new_positions = list(map(lambda name: int(name.replace('"', "")), positions))
new_positions

In [None]:
def _strip_index_artifact(names):
    """Return `names` without its last entry when that entry contains "index_level"."""
    # A trailing "index_level" column is a pandas artifact left behind by
    # incorrect processing, so it is ignored.
    return names[:-1] if "index_level" in names[-1] else names


def _as_int_positions(names):
    """Parse every name as an int after removing any double-quote characters."""
    return [int(name.replace('"', "")) for name in names]


# Original method: read only the header line of the CSV and parse it by hand.
raw_method_timer = ExecutionTimer(name="raw-metadata-loading")
with open(genotype_loading_path, "r") as f:
    columns = _strip_index_artifact(f.readline().split(","))
    positions = _as_int_positions(columns)
raw_method_timer.stop()
raw_method_timer.print_elapsed_time()

# Proposed method (pandas): load the parquet file and parse its column names.
pandas_method_timer = ExecutionTimer(name="pandas-metadata-loading")
df = pd.read_parquet("../../tmp/tmp.parquet")
positions = _strip_index_artifact(df.columns)
new_positions = _as_int_positions(positions)
pandas_method_timer.stop()
pandas_method_timer.print_elapsed_time()

# Proposed method (polars): same steps, loading the parquet file with polars.
polars_method_timer = ExecutionTimer(name="polars-metadata-loading")
df = pl.read_parquet("../../tmp/tmp.parquet")
positions = _strip_index_artifact(df.columns)
new_positions = _as_int_positions(positions)
polars_method_timer.stop()
polars_method_timer.print_elapsed_time()

In this case, the raw metadata loading is quicker (0.0758 seconds), whereas the polars method with parquet takes ~2 seconds.

# Loading .tsv

Much of the bottleneck in data processing is in the preprocessing step. Once we get data to an ML ready state, it's fast to read into memory. So, let's look at speeding up the preprocessing.

In [None]:
import os

import polars as pl
import pandas as pd

genotype_input_path = ROOT_GENOTYPE_INPUT_DIR.joinpath("erato/genome/Herato1411.tsv")
print(genotype_input_path)

# Proposed method: polars reads the tab-separated file with no header row and
# with quote handling disabled.
polars_read_options = {"separator": "\t", "has_header": False, "quote_char": None}
polars_method_timer = ExecutionTimer(name="polars-tsv-loading")
df = pl.read_csv(genotype_input_path, **polars_read_options)
polars_method_timer.stop()
polars_method_timer.print_elapsed_time()

# Current method: pandas reads the same file, also without a header row.
pandas_read_options = {"sep": "\t", "header": None}
pandas_method_timer = ExecutionTimer(name="pandas-tsv-loading")
pdf = pd.read_csv(genotype_input_path, **pandas_read_options)
pandas_method_timer.stop()
pandas_method_timer.print_elapsed_time()

Polars takes about .9 seconds while pandas takes about 17.1 seconds

In [None]:
import pandas as pd

genotype_input_path = ROOT_GENOTYPE_INPUT_DIR.joinpath("erato/genome/Herato1411.tsv")
print(genotype_input_path)

# The file is read without a header row (header=None), then previewed.
df = pd.read_csv(genotype_input_path, sep="\t", header=None)
df.head()

In [63]:
from gtp.dataloading.tools import butterfly_states_to_ml_ready
from gtp.tools.timing import profile_exe_time
import numpy as np
import pandas as pd


# The whole-pipeline timer starts before loading, so its total includes the TSV read.
original_genotype_processing_timer = ExecutionTimer(name="org-geno-pipeline")

# Time the pandas read of the headerless, tab-separated genotype file on its own.
tsv_read_options = {"sep": "\t", "header": None}
pandas_method_timer = ExecutionTimer(name="pandas-tsv-loading")
df = pd.read_csv(genotype_input_path, **tsv_read_options)
pandas_method_timer.stop()
pandas_method_timer.print_elapsed_time()


def extract_states(x):
    """Return the state part of each "<id>=<state>" entry as a new Series.

    For every entry in `x`, the text after the first "=" (up to the next "=",
    if any) is taken and every "/" in it is replaced by "|". The returned
    Series has a fresh default index rather than the index of `x`.
    """
    allele_states = []
    for entry in x.tolist():
        state = entry.split("=")[1]
        allele_states.append(state.replace("/", "|"))
    return pd.Series(allele_states)


def extract_camids(x):
    """Return the text before the first "=" in the first entry of `x`.

    Only the first row is inspected: every entry in `x` is assumed to carry
    the same camid.
    """
    first_entry = x.iloc[0]
    camid, _sep, _rest = first_entry.partition("=")
    return camid


def df_extract_states(df):
    """Run `extract_states` over `df` with `DataFrame.apply` and return the result."""
    state_frame = df.apply(extract_states)
    return state_frame


# Give the first four integer-labelled columns descriptive names; the
# remaining columns (label 4 onward) keep their integer labels for now.
df = df.rename(
    {
        0: "Scaffold",
        1: "Position",
        2: "Reference Allele",
        3: "Alternative Allele",
    },
    axis="columns",
)

# Remove duplicate positions. keep=False drops every row whose Position
# occurs more than once, so no copy of a duplicated position is retained.
df = df.drop_duplicates(subset=["Position"], keep=False)

# Derive one camid per data column (columns from position 4 onward) by
# applying extract_camids to each column.
step_timer = ExecutionTimer(name="org-extract-camids")
camids = df.iloc[:, 4:].apply(extract_camids)
step_timer.stop()
step_timer.print_elapsed_time()

# Overwrite the data columns in place with the output of df_extract_states.
step_timer = ExecutionTimer(name="org-extract-states")
df.iloc[:, 4:] = df_extract_states(df.iloc[:, 4:])
step_timer.stop()
step_timer.print_elapsed_time()

step_timer = ExecutionTimer(name="org-intermediates")
# Transpose the data columns into `states` (deep copy, independent of df).
states = df.iloc[:, 4:].T.copy(deep=True)
# NOTE(review): DataFrame.set_index returns a new frame by default and the
# result is not assigned, so this line appears to leave `states` unchanged.
# Confirm whether `states = states.set_index(camids)` was intended.
states.set_index(camids)
positions = df["Position"].values.tolist()
# Map the integer labels of the data columns (4, 5, ...) to their camids and
# rename df's columns accordingly.
column_dict = {i + 4: camids.values[i] for i in range(len(camids))}
df = df.rename(columns=column_dict)
# Label the columns of `states` with the position values.
states.columns = positions
step_timer.stop()
step_timer.print_elapsed_time()


@profile_exe_time(verbose=True)
def create_ml_ready(states):
    """Convert `states` with `butterfly_states_to_ml_ready` and cast the result to bool."""
    # The bool cast saves significant memory.
    return butterfly_states_to_ml_ready(states).astype(np.bool_)


# Time the conversion of the state table into its ML-ready form.
step_timer = ExecutionTimer(name="org-create-ML")
test_ml_ready = create_ml_ready(states)
step_timer.stop()
step_timer.print_elapsed_time()

# Bundle every product of the original pipeline into one dictionary.
genotype_data = dict(
    all_info=df,
    states=states,
    positions=np.array(positions),
    camids=np.array(camids.values.tolist()),
    ml_ready=test_ml_ready,
)

# Close out the whole-pipeline timer and report the total.
original_genotype_processing_timer.stop()
original_genotype_processing_timer.print_elapsed_time()


pandas-tsv-loading: 17.7991
org-extract-camids: 0.9018
org-extract-states: 38.2941
org-intermediates: 11.9993
create_ml_ready exe time: 00:01:54
org-create-ML: 114.4105
org-geno-pipeline: 185.0862


We ran the original pipeline on a specific file and found the following timings (in seconds, except where shown as hh:mm:ss):
- pandas-tsv-loading: 17.7991
- org-extract-camids: 0.9018
- org-extract-states: 38.2941
- org-intermediates: 11.9993
- create_ml_ready exe time: 00:01:54
- org-create-ML: 114.4105
- org-geno-pipeline: 185.0862

In [None]:
# Preview the first rows of whatever `df` currently holds; this depends on
# which earlier cell last assigned it.
df.head()

In [64]:
import polars as pl
from gtp.dataloading.tools import butterfly_states_to_ml_ready, get_ml_state_map
from gtp.tools.timing import profile_exe_time
import numpy as np

genotype_input_path = ROOT_GENOTYPE_INPUT_DIR / "erato/genome/Herato1411.tsv"

# Start the whole-pipeline timer BEFORE loading so the total is measured the
# same way as "org-geno-pipeline", which also includes reading the TSV.
# It has its own name so it is not confused with the TSV-loading timer.
proposed_genotype_processing_timer = ExecutionTimer(name="proposed-geno-pipeline")

# Proposed method: polars reads the headerless, tab-separated file with
# quote handling disabled.
polars_method_timer = ExecutionTimer(name="polars-tsv-loading")
df = pl.read_csv(genotype_input_path, separator="\t", has_header=False, quote_char=None)
polars_method_timer.stop()
polars_method_timer.print_elapsed_time()

# polars names headerless columns "column_1", "column_2", ...; give the
# first four descriptive names.
df = df.rename(
    {
        "column_1": "Scaffold",
        "column_2": "Position",
        "column_3": "Reference Allele",
        "column_4": "Alternative Allele",
    }
)

# Remove duplicate positions. keep="none" drops every row whose Position
# occurs more than once. maintain_order=True keeps the remaining rows in file
# order; without it polars does not guarantee row order, which would make the
# comparison against the pandas pipeline order-dependent.
df = df.unique(subset=["Position"], keep="none", maintain_order=True)

# Take the camid of each data column from the text before "=" in its first row.
step_timer = ExecutionTimer(name="proposed-extract-camids")
data_cols = df.columns[4:]
camids = [x.split("=")[0] for x in df[0, data_cols].rows()[0]]
step_timer.stop()
step_timer.print_elapsed_time()

# Replace every data column with the part after "=", with "/" turned into "|".
step_timer = ExecutionTimer(name="proposed-extract-states")
df = df.with_columns(
    pl.col(old_col)
    .str.split_exact("=", 1)
    .struct[1]
    .str.replace("/", "|")
    .alias(old_col)
    for old_col in data_cols
)
step_timer.stop()
step_timer.print_elapsed_time()

# Transpose the data columns, add a leading "camids" column, and name the
# remaining columns after the positions (as strings).
step_timer = ExecutionTimer(name="proposed-intermediates")
states = df.select(data_cols).transpose()
state_columns = states.columns
str_pos = df[:, "Position"].cast(pl.String).to_list()
states = (
    states.with_columns(
        pl.Series(camids).alias("camids"),
    )
    .rename(dict(zip(state_columns, str_pos)))
    .select(["camids"] + str_pos)
)
step_timer.stop()
step_timer.print_elapsed_time()

# Split each state string on "|", cast the pieces to integers and sum them per
# cell; then one-hot encode those sums along a new last axis.
step_timer = ExecutionTimer(name="proposed-create-ML")
values = states.select(
    pl.col(str_pos).str.split("|").cast(pl.List(pl.Int32)).list.sum()
).rows()
np_values = np.array(values)
one_hot_size = np_values.max() + 1
# Allocate the one-hot tensor directly as bool. The previous float64 buffer
# followed by astype(bool) gave the same values with a far larger temporary.
ml_ready = np.zeros(np_values.shape + (one_hot_size,), dtype=np.bool_)
# reshape() of the freshly allocated array is a view, so this writes into ml_ready.
ml_ready.reshape(-1, one_hot_size)[np.arange(np_values.size), np_values.reshape(-1)] = True
step_timer.stop()
step_timer.print_elapsed_time()

genotype_data = {
    "all_info": df,
    "states": states,
    "camids": np.array(camids),
    "positions": np.array(str_pos),
    "ml_ready": ml_ready,
}

# Report the total under the pipeline timer's own label. The earlier version
# stopped and printed the TSV-loading timer here, which mislabelled the total
# as "polars-tsv-loading".
proposed_genotype_processing_timer.stop()
proposed_genotype_processing_timer.print_elapsed_time()


polars-tsv-loading: 0.7218
proposed-extract-camids: 0.0031
proposed-extract-states: 0.8367
proposed-intermediates: 13.3661
proposed-create-ML: 28.8862
polars-tsv-loading: 58.5318


We ran the proposed pipeline on the same file and found the following timings (in seconds):
- polars-tsv-loading: 0.7218
- proposed-extract-camids: 0.0031
- proposed-extract-states: 0.8367
- proposed-intermediates: 13.3661
- proposed-create-ML: 28.8862
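- polars-tsv-loading: 58.5318 (this is the total pipeline time including loading; in the recorded run the final print reused the `polars-tsv-loading` timer, which was started before the TSV was read)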

In [65]:
# Total times in seconds, copied from the recorded outputs of the two pipeline cells.
original_total_seconds = 185.0862
proposed_total_seconds = 58.5318

# Relative speed-up: how much faster the proposed run is, as a fraction of its own time.
speed_up = (original_total_seconds / proposed_total_seconds) - 1
print(f"A {speed_up * 100:.2f}% speed up!")

A 216.21% speed up!


In [67]:
# Element-wise comparison of the proposed pipeline's `ml_ready` with the
# original pipeline's `test_ml_ready`, reduced with .all(). Both pipeline
# cells must have been run in this kernel for the two names to exist.
(ml_ready == test_ml_ready).all()

np.True_