# Build a set of training data

Use a set of (most) common peptides to create initial data sets

- based on `Counter` over all outputs from search (here: MaxQuant)

In [None]:
import yaml
import json
import random  # shuffle, seed
import functools
from pathlib import Path
import logging
import multiprocessing

import numpy as np
import pandas as pd

from tqdm.notebook import tqdm_notebook

import vaep
from vaep.io import data_objects

import config as config

In [None]:
from typing import List
def select_files_by_parent_folder(fpaths:List, years:List):
    """Return the paths whose parent folder name contains one of `years`.

    The result is grouped by the order of `years`; within one year the
    order of `fpaths` is kept. A path whose parent folder name matches
    several entries of `years` is returned once per match.
    """
    # one pass over fpaths per year -- not a bottleneck
    return [
        fpath
        for year in years
        for fpath in fpaths
        if year in fpath.parent.stem
    ]

## Setup

In [None]:
RANDOM_SEED: int = 42  # Random seed for reproducibility
FEAT_COMPLETNESS_CUTOFF = 0.25 # Minimal proportion of samples which have to share a feature
# Substrings matched against the parent folder name of each dump when selecting files
YEARS = ['2017','2018', '2019', '2020']
# Column holding the sample identifier in the long-format intensities
SAMPLE_COL = 'Sample ID'


Select a specific config file

In [None]:
# options = ['peptides', 'evidence', 'proteinGroups']
# Exactly one of the following imports has to be active; it is bound to `cfg`.
from config.training_data import peptides as cfg
# from config.training_data import evidence as cfg
# from config.training_data import proteinGroups as cfg

# Display all public (non-underscore) attributes of the selected configuration
{k: getattr(cfg, k) for k in dir(cfg) if not k.startswith('_')}

In [None]:
# All outputs of this notebook go to a sub-folder named after the selected config
out_folder = Path('data/selected/') / cfg.NAME
out_folder.mkdir(parents=True, exist_ok=True)

Set defaults from file (allows to potentially overwrite parameters)

In [None]:
# normal structure of config.py files
# Copy the settings into notebook-level names so they can be overwritten here
NAME = cfg.NAME
BASE_NAME = cfg.BASE_NAME  # prefix of the file name of the final long-format pickle

TYPES_DUMP = cfg.TYPES_DUMP  # dtype mapping applied to the collected dumps
TYPES_COUNT = cfg.TYPES_COUNT  # if truthy, dtypes of the counts table are converted

IDX_COLS_LONG = cfg.IDX_COLS_LONG  # index columns of the long-format intensities

LOAD_DUMP = cfg.LOAD_DUMP  # callable loading a single dump from its path

CounterClass = cfg.CounterClass  # instantiated with FNAME_COUNTER
FNAME_COUNTER = cfg.FNAME_COUNTER

## Selected IDs

- currently only `Sample ID` is used
- paths point to the `.raw` files, not to the output folder (could be changed)

In [None]:
fn_id_old_new: str = 'data/rename/selected_old_new_id_mapping.csv' # selected samples with pride and original id
# One row per selected sample; the number of rows defines the count threshold below
df_ids = pd.read_csv(fn_id_old_new)
df_ids

## Counter

In [None]:
# Load the previously computed feature counter from file
counter = CounterClass(FNAME_COUNTER)
# Table of features; the columns 'counts' and 'proportion' are used below
counts = counter.get_df_counts()
counts

In [None]:
if TYPES_COUNT:
    # infer the best possible dtypes, but force 'Charge' to plain Python int
    counts = counts.convert_dtypes().astype({'Charge': int})
# Features observed in at least the required proportion of samples.
# This mask is only displayed here; it is overwritten by the count-based mask below.
mask = counts['proportion'] >= FEAT_COMPLETNESS_CUTOFF
counts.loc[mask]

Based on selected samples, retain features that potentially could be in the subset

- if 1000 samples are selected, and given a threshold of 25%, one would need at least 250 observations

In [None]:
# Minimal number of observations a feature needs, relative to the number of
# selected samples (int() truncates the product towards zero)
treshold_counts = int(len(df_ids) * FEAT_COMPLETNESS_CUTOFF)
# Replaces the proportion-based mask; this one is used for all selections below
mask = counts['counts'] >= treshold_counts
counts.loc[mask]

In [None]:
# Index of the 'Sequence' values of all features passing the count threshold
IDX_selected = counts.loc[mask].set_index('Sequence').index
IDX_selected

### Collect in parallel

In [None]:
# Map every selected sample id to its dump as registered in the counter
# (going through a dict keeps each sample id only once), then flatten the
# mapping to a list of (sample id, dump) pairs.
sample_to_dump = {sample_id: counter.dumps[sample_id] for sample_id in df_ids["Sample ID"]}
selected_dumps = list(sample_to_dump.items())
selected_dumps[:10]

In [None]:
# Number of worker processes and of chunks the dumps are split into
N_WORKERS = 8
# Row index (selected sequences) of the tables assembled by `collect`
IDX = IDX_selected

def load_fct(path):
    """Load the presence/absence pattern of the intensities of one dump.

    Parameters
    ----------
    path : str or path-like
        CSV file with at least the columns "Sequence" and "Intensity".

    Returns
    -------
    pd.Series
        Indexed by "Sequence", dtype Int8: 1 if the intensity is available,
        0 if it is missing.
    """
    intensities = pd.read_csv(
        path, index_col="Sequence", usecols=["Sequence", "Intensity"]
    )
    # Squeeze the column axis only: a bare squeeze() would collapse a file
    # with a single row to a scalar, which cannot be cast to a pandas dtype.
    return intensities.notna().squeeze(axis="columns").astype(pd.Int8Dtype())


def collect(folders, index=IDX, load_fct=load_fct):
    """Load several dumps and join them as columns of one table.

    Parameters
    ----------
    folders : iterable of (sample id, path) pairs
        Each path is passed to `load_fct`; the sample id becomes the
        column name.
    index : pd.Index, optional
        Row index of the result. Rows of a dump which are not in `index`
        are dropped by the left join.
    load_fct : Callable, optional
        Called with a single path, has to return a pd.Series.
        Note: the default is bound when `collect` is defined, a later
        redefinition of `load_fct` has to be passed explicitly.

    Returns
    -------
    pd.DataFrame
        One column per successfully loaded dump. Missing and empty files
        are logged and skipped.
    """
    # Pool workers have a non-empty identity, the main process an empty one.
    # Falling back to position 0 allows calling `collect` without a pool.
    identity = multiprocessing.current_process()._identity
    position = identity[0] % N_WORKERS + 1 if identity else 0
    # NOTE(review): presumably a workaround so that progress bars of worker
    # processes are rendered in the notebook -- confirm before removing.
    print(" ", end="", flush=True)

    failed = []
    collected = pd.DataFrame(index=index)

    with tqdm_notebook(total=len(folders), position=position) as pbar:
        for sample_id, path in folders:
            try:
                series = load_fct(path)
            except FileNotFoundError:
                logging.warning(f"File not found: {path}")
                failed.append((sample_id, path))
            except pd.errors.EmptyDataError:
                logging.warning(f"Empty file: {path}")
                failed.append((sample_id, path))
            else:
                series.name = sample_id
                collected = collected.join(series, how='left')
            pbar.update(1)

    if failed:
        logging.warning(f"Could not load {len(failed)} of {len(folders)} files.")
    return collected

In [None]:
# Split the (sample id, path) pairs into N_WORKERS chunks and let each worker
# build the presence/absence table of its chunk with `collect`.
with multiprocessing.Pool(N_WORKERS) as p:
    all = list(
        tqdm_notebook(
            # imap yields the per-chunk tables in the order of the chunks
            p.imap(collect, np.array_split(selected_dumps, N_WORKERS)),
            total=N_WORKERS,
        )
    )
    
# Combine the per-chunk tables column-wise: features as rows, samples as columns
all = pd.concat(all, axis=1)
all

In [None]:
# Column sums: number of selected features observed in each sample
count_samples = all.sum()

In [None]:
# Store the number of observed features per sample
fname = out_folder / 'count_samples.json'
count_samples.to_json(fname)

vaep.plotting.make_large_descriptors(size='medium')

# Plot the sorted counts
ax = count_samples.sort_values().plot(rot=90, ylabel='observations')
# NOTE(review): savefig gets the path of the JSON file -- presumably it
# replaces the suffix; verify that the JSON file is not overwritten.
vaep.savefig(ax.get_figure(), fname)

In [None]:
%%time
# Transpose: samples become rows, features become columns
all = all.T
all

In [None]:
# Save the presence/absence table; `config.insert_shape` fills the `{}` of the
# file name (presumably with the shape of the table -- TODO confirm)
fname = out_folder / config.insert_shape(all,  'absent_present_pattern_selected{}.pkl')
all.to_pickle(fname)

In [None]:
# Column sums after the transpose: number of samples each feature is observed in
count_features = all.sum()
fname = out_folder / 'count_feat.json'
count_features.to_json(fname)

# Plot the sorted counts
ax = count_features.sort_values().plot(rot=90, ylabel='observations') 
# NOTE(review): savefig gets the path of the JSON file -- presumably it
# replaces the suffix; verify that the JSON file is not overwritten.
vaep.savefig(ax.get_figure(), fname)

In [None]:
%%time
# NOTE(review): if the cells are run in order, `fname` still points to
# 'count_feat.json', so the presence/absence table is written to
# 'count_feat.csv' -- confirm that this file name is intended.
all.to_csv(fname.with_suffix('.csv'), chunksize=1_000)

## Selected Features

- index names should also match!
- if they do not match -> rather use a list?

In [None]:
def load_fct(path):
    """Load the intensities of one dump.

    Parameters
    ----------
    path : str or path-like
        CSV file with at least the columns "Sequence" and "Intensity".

    Returns
    -------
    pd.Series
        Intensities indexed by "Sequence" as nullable Int64, missing
        values are kept as <NA>.
    """
    intensities = pd.read_csv(
        path, index_col="Sequence", usecols=["Sequence", "Intensity"]
    )
    # Squeeze the column axis only: a bare squeeze() would collapse a file
    # with a single row to a scalar, which cannot be cast to a pandas dtype.
    return intensities.squeeze(axis="columns").astype(pd.Int64Dtype())

# Drop the reference to the presence/absence table (presumably to release
# memory before the intensities are collected)
all = None

from functools import partial

# The default `load_fct` of `collect` was bound when `collect` was defined,
# so the redefined loader has to be passed explicitly.
collect_intensities = partial(collect, index=IDX, load_fct=load_fct)

# Same chunked, parallel collection as for the presence/absence table
with multiprocessing.Pool(N_WORKERS) as p:
    all = list(
        tqdm_notebook(
            p.imap(collect_intensities, np.array_split(selected_dumps, N_WORKERS)),
            total=N_WORKERS,
        )
    )  
    
# Combine the per-chunk tables column-wise: features as rows, samples as columns
all = pd.concat(all, axis=1)
all

In [None]:
# Total memory of the intensity table in MiB (bytes / 2**20)
all.memory_usage(deep=True).sum() / (2**20)

In [None]:
# Save the collected intensities; `config.insert_shape` fills the `{}` of the
# file name (presumably with the shape of the table -- TODO confirm)
fname = out_folder / config.insert_shape(all,  'intensities_wide_selected{}.pkl') 
all.to_pickle(fname)

In [None]:
# %%time
# all = all.T
# all

In [None]:
# NOTE(review): the transpose in the previous cell is commented out, so this
# writes the same table to the same file as two cells above.
all.to_pickle(fname)

In [None]:
# Features passing the count threshold (`mask`), as a sorted index built from
# the index columns defined by the counter
selected_features = counts.loc[mask].set_index(counter.idx_names).sort_index().index
# selected_features.name = 'Gene names' # needs to be fixed
selected_features

## Select Dumps

In [None]:
# Rebinds `selected_dumps`: from here on it is a list of dump paths (no longer
# (sample id, path) pairs), restricted to parent folders matching YEARS
selected_dumps = select_files_by_parent_folder(list(counter.dumps.values()), years=YEARS)
print("Total number of files:", len(selected_dumps))
selected_dumps[-10:]

## Load one dump

- check that this looks like you expect it


In [None]:
# Display the first selected dump as returned by the configured loader
LOAD_DUMP(selected_dumps[0])

## Process folders

- potentially in parallel, aggregating results
- if needed: debug using two samples

Design decisions
- long format of data with categorical features (to save memory)


In [None]:
from typing import List, Callable
from pandas.errors import EmptyDataError

def process_folders(fpaths: List[Path],
                    selected_features: pd.Index,
                    load_folder: Callable,
                    id_col='Sample ID',
                    dtypes: dict = None) -> pd.DataFrame:
    """Collect the intensities of selected features of several dumps in long format.

    Parameters
    ----------
    fpaths : List[Path]
        Paths passed one by one to `load_folder`. The stem of a path is used
        as sample name.
    selected_features : pd.Index
        Features to keep; intersected with the index of each loaded dump.
    load_folder : Callable
        Called with a single path. Has to return a DataFrame with an
        'Intensity' column which is indexed by the features.
    id_col : str, optional
        Name of the column holding the sample name, by default 'Sample ID'.
    dtypes : dict, optional
        Mapping of column name to dtype, applied to each dump and to the
        combined result. By default `id_col` and 'Sequence' are cast to
        'category'.

    Returns
    -------
    pd.DataFrame
        Long format: the index columns of the dumps, 'Intensity' and
        `id_col`. Empty or missing dumps are logged and skipped. If no dump
        could be loaded at all, an empty DataFrame is returned.
    """
    if dtypes is None:
        # built per call, so the default follows a custom `id_col`
        dtypes = {id_col: 'category', 'Sequence': 'category'}
    print(f"started new process with {len(fpaths)} files.")
    data_intensity = []
    for i, fpath in enumerate(fpaths):
        if not i % 10:
            print(f"File ({i}): {fpath}")
        sample_name = fpath.stem
        try:
            # does some filtering
            dump = load_folder(fpath)
        except EmptyDataError:
            logging.warning(f'Empty dump: {fpath}')
            continue
        except FileNotFoundError:
            logging.warning(f'Missing dump: {fpath}')
            continue

        # long data format
        sequences_available = dump.index.intersection(selected_features)
        dump = dump.loc[sequences_available, 'Intensity'].reset_index()
        dump[id_col] = sample_name
        # cast early to keep the memory footprint of the intermediate list low
        dump = dump.astype(dtypes)
        data_intensity.append(dump)

    if not data_intensity:
        # pd.concat raises on an empty list; return an empty table instead
        logging.warning(f'No dump could be loaded from {len(fpaths)} files.')
        columns = [name for name in selected_features.names if name is not None]
        columns += [col for col in ('Intensity', id_col) if col not in columns]
        empty = pd.DataFrame(columns=columns)
        return empty.astype(
            {col: dtype for col, dtype in dtypes.items() if col in columns})

    data_intensity = pd.concat(data_intensity, copy=False, ignore_index=True)
    # categories differing between dumps are lost on concatenation -> cast again
    data_intensity = data_intensity.astype(dtypes)
    return data_intensity

# # experiment
# process_folders(selected_dumps[:2],
#                 selected_features=selected_features,
#                 load_folder=LOAD_DUMP,
#                 dtypes=TYPES_DUMP)


In [None]:
%%time
# Fix everything except the paths, so the helper only has to pass a chunk of paths
process_folders_peptides = functools.partial(process_folders,
                                             selected_features=selected_features,
                                             load_folder=LOAD_DUMP,
                                             dtypes=TYPES_DUMP)
# NOTE(review): presumably applies the function to chunks of `selected_dumps`
# and returns one DataFrame per chunk -- confirm in vaep.io.data_objects
collected_dfs = data_objects.collect_in_chuncks(paths=selected_dumps,
                                                process_chunk_fct=process_folders_peptides,
                                                chunks=200,
                                                n_workers=1 # to debug, don't multiprocess
                                               )

# one would need to aggregate categories first to keep them during aggregation?
collected_dfs = pd.concat(collected_dfs, copy=False, ignore_index=True)
# re-apply the dtypes to the combined table
collected_dfs = collected_dfs.astype(TYPES_DUMP)
df_intensities = collected_dfs
df_intensities

Except for the intensities, all columns should be of dtype `category` in order to save memory

In [None]:
# Check the dtype of each column
df_intensities.dtypes

In [None]:
# Summary statistics for all columns, including the non-numeric ones
df_intensities.describe(include='all')

Check how many samples could be loaded, set total number of features

In [None]:
# N: number of distinct samples which were actually loaded
N = df_intensities[SAMPLE_COL].nunique()
# M: number of selected features (not all need to be observed)
M = len(selected_features)
N,M

Set the provided index columns and squeeze to a Series

In [None]:
# Move the id columns into the index; squeeze() then reduces a single
# remaining column to a Series
df_intensities = df_intensities.set_index(IDX_COLS_LONG).squeeze()

In [None]:
# File name: base name followed by the selected years, joined by underscores
base_name = f'{BASE_NAME}_' + '_'.join(YEARS)
# The shape (N, M) is passed explicitly as the data is in long format
fname = config.FOLDER_DATA / config.insert_shape(df_intensities,  base_name + '{}.pkl', shape=(N,M))
print(f"{fname = }")
df_intensities.to_pickle(fname)

- Only the binary pickle format works for now
- csv and reshaping the data need too much memory for a single erda instance with many samples