# Gathering and consolidation of raw data files

This notebook processes the raw data in the `raw_data` folder and generates all the files in the `data` folder.

_Michał Denkiewicz 2024_

In [2]:
import os
import pickle
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
import pandas as pd
import igraph as ig
from tqdm import tqdm
import pyranges as pr

import datasources
from knots_tools import *

## Reading raw data files

In [None]:
RAW_DATA_DIR = './raw_data/'

# Converters for the named regex groups captured from raw file paths
# (presumably applied by datasources.DataSources to each match — confirm).
_parsers = dict(
    chromosome=parse_chromosome,
    min_petcount=int,
    ccd_id=int,
)


def _is_non_empty(path, keys):
    """Return True when the file at `path` has a non-zero size (`keys` is unused)."""
    return os.path.getsize(path) > 0


# File predicates passed to DataSources; only non-empty files are kept.
_filters = [_is_non_empty]

def _fix_clean_minors_numbering(keys):
    return type(keys)(keys.assay, keys.cell_line, keys.chromosome, keys.ccd_id + 1)


# Table of raw input files with one row per CCD, indexed by CCD_INDEX_NAMES.
# Columns:
#   raw_graph_file    -- input graph in cknots format (*.mp)
#   clean_minors_file -- cleaned minors (*.mp.raw_minors)
# A CCD may have only one of the two files; the missing one is presumably '' (confirm).
raw_data_files = datasources.DataSources(
    RAW_DATA_DIR,
    ['assay', 'cell_line', 'chromosome', 'ccd_id']
).add(  # Raw input graphs in cknots format (IS assay)
    'raw_graph_file', r'(?P<assay>IS)_chiapet/(?P<cell_line>\w+)/(?P<chromosome>chr_?\w+)/\w+\.bedpe\.(?P<ccd_id>\d+)\.chr\w+\.mp$',
    parsers=_parsers, filters=_filters,
).add(  # Raw input graphs in cknots format (LR assay)
    'raw_graph_file', r'(?P<assay>LR)_chiapet/(?P<cell_line>\w+)/(?P<chromosome>chr_?\w+)/\w+_pet(?P<min_petcount>\d+)_orig\.bedpe\.(?P<ccd_id>\d+)\.chr\w+\.mp$',
    parsers=_parsers, filters=_filters,
).add(  # Clean minors (IS assay)
    'clean_minors_file', r'(?P<assay>IS)_chiapet/(?P<cell_line>\w+)_cleaned/(?P<chromosome>chr_?\w+)/\w+\.(?P<ccd_id>\d+)\.chr\w+\.mp\.raw_minors$',
    parsers=_parsers, filters=_filters,
).add(  # Clean minors (LR assay); ccd_id is shifted by +1 via the fix hook
    # The dot before 'raw_minors' is escaped and the pattern anchored with '$',
    # consistent with the other three patterns.
    'clean_minors_file', r'(?P<assay>LR)_chiapet/(?P<cell_line>\w+)_cleaned/(?P<chromosome>chr_?\w+)/\w+_cleaned\.(?P<ccd_id>\d+)\.chr\w+\.mp\.raw_minors$',
    parsers=_parsers, filters=_filters, fix=_fix_clean_minors_numbering
).get_paths_as_dataframe()

# Re-index by CCD: 'dataset' is the cell line, suffixed with 'lr' for LR data,
# and the index columns are cast to the project dtypes before sorting.
raw_data_files = raw_data_files.reset_index()
raw_data_files['dataset'] = (raw_data_files['cell_line'] + np.where(raw_data_files['assay'] == 'LR', 'lr', '')).astype(DatasetDtype)
raw_data_files['chromosome'] = raw_data_files['chromosome'].astype(HumanChromosomeDtype)
raw_data_files = raw_data_files.set_index(CCD_INDEX_NAMES).sort_index()
raw_data_files

In [None]:
# Show only CCDs for which both a raw graph file and a cleaned-minors file exist.
raw_data_files[raw_data_files[['raw_graph_file', 'clean_minors_file']].ne('').all(axis=1)]

## Creating graphs

In [5]:
DATA_DIR = './data/'
# Subdirectories of DATA_DIR holding the pickled per-CCD graphs and minors.
GRAPHS_DIR, MINORS_DIR = (os.path.join(DATA_DIR, sub) for sub in ('graphs', 'minors'))

**WARNING: The cell below can take ~100 core-hours to execute (5–10 hours on a modern workstation).
It is recommended to use a machine with multiple cores.
You might want to use the pickle files with pre-computed results instead; by default the recomputation is commented out and the cell only loads those files.**

In [None]:
def add_graph_stats(graph: ig.Graph) -> None:
    """Annotate `graph` in place with centrality statistics; returns None.

    Vertex attributes set: 'degree', 'closeness' (igraph's normalized closeness)
    and 'betweenness'. Edge attribute set: 'edge_betweenness'.

    Betweenness values are normalized manually, because igraph's betweenness
    methods do not normalize:
      * vertex betweenness is divided by (n-1)(n-2)/2, the number of vertex
        pairs not involving the vertex;
      * edge betweenness is divided by n(n-1)/2, the number of vertex pairs.
    These factors follow the undirected-graph convention; presumably the input
    graphs are undirected — confirm.
    """
    # Clamp n to at least 3 so neither denominator can become zero.
    n = max(graph.vcount(), 3)
    graph.vs['degree'] = graph.degree()
    graph.vs['closeness'] = graph.closeness(normalized=True)
    graph.vs['betweenness'] = np.array(graph.betweenness()) * 2.0 / ((n - 1) * (n - 2))
    graph.es['edge_betweenness'] = np.array(graph.edge_betweenness()) * 2.0 / (n * (n - 1))


def create_graph(idx, graph_file, minors_file, save_graph_fmt=None, save_minors_fmt=None):
    """Load one CCD graph, attach its minors and statistics, and optionally pickle them.

    Parameters
    ----------
    idx : tuple
        CCD index tuple; its items fill the save format strings positionally.
    graph_file : str
        Path of the graph in cknots format.
    minors_file : str or other
        Path of the cleaned-minors file. Anything other than a non-empty string
        (e.g. '' or NaN) means the CCD has no minors.
    save_graph_fmt, save_minors_fmt : str or None
        Format strings for the pickle output paths; None skips saving.

    Returns
    -------
    tuple
        ``(idx, graph, minors)``.
    """
    graph, node_name_to_id = read_graph_from_cknots_file(graph_file)
    has_minors_file = isinstance(minors_file, str) and minors_file != ''
    minors = LinearMinor.read(minors_file, node_name_to_id) if has_minors_file else []
    LinearMinor.add_multiple_info_to_graph(minors, graph)
    add_graph_stats(graph)

    # Graph first, then minors — each only when its output format was given.
    for path_fmt, payload in ((save_graph_fmt, graph), (save_minors_fmt, minors)):
        if path_fmt is None:
            continue
        with open(path_fmt.format(*idx), 'wb') as out:
            pickle.dump(payload, out)

    return idx, graph, minors


def create_all_graphs_and_minors(n_workers=None, save_graph_fmt=None, save_minors_fmt=None, raw_files=None):
    """Build the graph and minors of every CCD in parallel processes.

    Parameters
    ----------
    n_workers : int or None
        Passed to ``ProcessPoolExecutor(max_workers=...)``; None uses its default.
    save_graph_fmt, save_minors_fmt : str or None
        Forwarded to `create_graph`; format strings for per-CCD pickle paths.
    raw_files : pandas.DataFrame or None
        Table with 'raw_graph_file' and 'clean_minors_file' columns, indexed by
        CCD. Defaults to the module-level `raw_data_files`.

    Returns
    -------
    (graphs, minors) : tuple of DataFrames
        Single-column frames ('graph' and 'minors') indexed by CCD_INDEX_NAMES,
        with index columns cast to CCD_INDEX_DTYPES and sorted.
    """
    if raw_files is None:
        raw_files = raw_data_files
    inputs = raw_files[['raw_graph_file', 'clean_minors_file']]

    ccd_keys = []
    records = []
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = [
            executor.submit(create_graph, idx, graph_file, minors_file, save_graph_fmt, save_minors_fmt)
            for idx, graph_file, minors_file in inputs.itertuples(index=True)
        ]
        # Results arrive in completion order; sort_index below restores CCD order.
        for future in tqdm(as_completed(futures), total=len(futures)):
            idx, graph, minors = future.result()
            ccd_keys.append(idx)
            records.append((graph, minors))

    graphs = pd.DataFrame.from_records(
        records, index=pd.MultiIndex.from_tuples(ccd_keys, names=raw_files.index.names),
        columns=['graph', 'minors'],
    ).reset_index()
    for col, dtype in CCD_INDEX_DTYPES.items():
        graphs[col] = graphs[col].astype(dtype)
    graphs = graphs.set_index(CCD_INDEX_NAMES).sort_index()
    return graphs[['graph']], graphs[['minors']]

    
def read_from_pickle(fn):
    """Return the object unpickled from the file at path `fn`.

    NOTE: unpickling can execute arbitrary code — only use on trusted files.
    """
    with open(fn, mode='rb') as handle:
        obj = pickle.load(handle)
    return obj

# UNCOMMENT TO RE-CALCULATE
# os.makedirs(GRAPHS_DIR, exist_ok=True)
# os.makedirs(MINORS_DIR, exist_ok=True)
# all_graphs, all_minors = create_all_graphs_and_minors(
#     save_graph_fmt=os.path.join(GRAPHS_DIR, 'ccd_graph_{0}_{1}_{2:04d}.pkl'),  # ['dataset', 'chromosome', 'ccd_id']
#     save_minors_fmt=os.path.join(MINORS_DIR, 'ccd_minors_{0}_{1}_{2:04d}.pkl')
# ) 


def _load_pickled_ccd_table(directory, file_prefix, file_col, value_col):
    """Load one pickle per CCD from `directory` into a CCD-indexed DataFrame.

    Files are matched as ``<file_prefix>_<dataset>_<chromosome>_<ccd_id>.pkl``.
    The temporary path column `file_col` is replaced by `value_col`, which
    holds the unpickled objects. The index is CCD_INDEX_NAMES, sorted.
    """
    table = datasources.DataSources(
        directory, ['dataset', 'chromosome', 'ccd_id']
    ).add(
        file_col,
        rf'{file_prefix}_(?P<dataset>\w+)_(?P<chromosome>\w+)_(?P<ccd_id>\d+)\.pkl$',
        parsers={'ccd_id': int},
    ).get_paths_as_dataframe().reset_index()
    table['chromosome'] = table['chromosome'].astype(HumanChromosomeDtype)
    table['dataset'] = table['dataset'].astype(DatasetDtype)
    table[value_col] = table[file_col].apply(read_from_pickle)
    return table.drop(columns=file_col).set_index(CCD_INDEX_NAMES).sort_index()


# Load from pickle
all_graphs = _load_pickled_ccd_table(GRAPHS_DIR, 'ccd_graph', 'graph_file', 'graph')
print('****** Graphs dataframe: ******')
all_graphs.info()

print()

all_minors = _load_pickled_ccd_table(MINORS_DIR, 'ccd_minors', 'minors_file', 'minors')
print('****** Minors dataframe: ')
all_minors.info()

In [None]:
def _make_nodes_df(g: ig.Graph) -> pd.DataFrame:
    """Return the vertex table of `g` with minor membership summarised.

    The per-vertex 'minors' list is replaced by its length ('n_minors') and a
    boolean 'in_minor' flag; 'idx_in_minor' is dropped. The index is named
    'node_id'.
    """
    return (
        g.get_vertex_dataframe()
        .assign(
            n_minors=lambda vs: vs['minors'].apply(len),
            in_minor=lambda vs: vs['n_minors'] > 0,
        )
        .drop(columns=['minors', 'idx_in_minor'])
        .rename_axis('node_id')
    )


# Stack the per-CCD vertex tables; the dict keys (CCD index tuples) become the outer index levels.
all_nodes = pd.concat(
    all_graphs['graph'].apply(_make_nodes_df).to_dict(),
    names=all_graphs.index.names,
)
all_nodes.info()
all_nodes.head()

In [None]:
def _make_edges_df(g: ig.Graph) -> pd.DataFrame:
    """Return the edge table of `g` with minor membership summarised.

    The per-edge 'minors' list is replaced by its length ('n_minors') and a
    boolean 'in_minor' flag; 'idx_in_minor' and 'loop_ids' are dropped. The
    index is named 'edge_id'.
    """
    return (
        g.get_edge_dataframe()
        .assign(
            n_minors=lambda es: es['minors'].apply(len),
            in_minor=lambda es: es['n_minors'] > 0,
        )
        .drop(columns=['minors', 'idx_in_minor', 'loop_ids'])
        .rename_axis('edge_id')
    )


# Stack the per-CCD edge tables; the dict keys (CCD index tuples) become the outer index levels.
all_edges = pd.concat(
    all_graphs['graph'].apply(_make_edges_df).to_dict(),
    names=all_graphs.index.names,
)
all_edges.info()
all_edges.head()

In [None]:
# Per-CCD summary of node attributes.
# observed=True (as used for minor_segments) keeps only CCDs that actually
# occur; otherwise categorical index levels would add empty combinations.
_node_tab = all_nodes.groupby(CCD_INDEX_NAMES, observed=True).agg({
    'coord': ['min', 'max', 'count'],  # count is just number of nodes
    'degree': ['min', 'max', 'mean'],
    'closeness': ['min', 'max', 'mean'],
    'betweenness': ['min', 'max', 'mean'],
    # Count nodes lying in at least one minor. Summing 'n_minors' would count a
    # node once per minor containing it, letting the ratio below exceed 1.
    'in_minor': 'sum',
})
_node_tab.columns = ['_'.join(col) for col in _node_tab.columns.to_flat_index()]
_node_tab = _node_tab.rename(columns={'coord_count': 'n_nodes', 'in_minor_sum': 'n_nodes_in_minors'})
_node_tab['nodes_in_minors_ratio'] = _node_tab['n_nodes_in_minors'] / _node_tab['n_nodes']
_node_tab

In [None]:
# Per-CCD summary of edge attributes.
# observed=True keeps only CCDs that actually occur (see _node_tab).
_edge_tab = all_edges.groupby(CCD_INDEX_NAMES, observed=True).agg({
    'source': 'count',  # count is just number of edges
    'petcount': ['min', 'max', 'mean'],
    'distance': ['min', 'max', 'mean'],
    'edge_betweenness': ['min', 'max', 'mean'],
    # Count edges lying in at least one minor. Summing 'n_minors' would count an
    # edge once per minor containing it, letting the ratio below exceed 1.
    'in_minor': 'sum',
})
_edge_tab.columns = ["_".join(c) for c in _edge_tab.columns.to_flat_index()]
_edge_tab = _edge_tab.rename(columns={'source_count': 'n_edges', 'in_minor_sum': 'n_edges_in_minors'})
_edge_tab['edges_in_minors_ratio'] = _edge_tab['n_edges_in_minors'] / _edge_tab['n_edges']
_edge_tab

## Reading all minors

In [None]:
# Parse every cleaned-minors file (one list of LinearMinor per CCD).
# Anything other than a non-empty string path (e.g. '' or NaN) yields an empty
# list — the same rule create_graph applies to its minors_file argument.
raw_minors = raw_data_files.loc[:, ['clean_minors_file']].map(
    lambda f: LinearMinor.read(f) if isinstance(f, str) and f != '' else []
)
raw_minors.columns = ['clean_minors']
raw_minors.info()
raw_minors

In [None]:
# Number of cleaned minors per CCD, plus a flag for CCDs that have at least one.
_minors_tab = (
    raw_minors['clean_minors']
    .apply(len)
    .to_frame('n_minors')
    .assign(has_minors=lambda tab: tab['n_minors'] > 0)
)
_minors_tab.info()
_minors_tab.head()

In [None]:
# One row per CCD: node, edge and minor summaries side by side (aligned on the CCD index).
ccds = pd.concat([_node_tab, _edge_tab, _minors_tab], axis=1).rename(
    columns={'coord_min': 'start', 'coord_max': 'end'}
)
# Density = edges / number of unordered node pairs.
_n_node_pairs = ccds['n_nodes'] * (ccds['n_nodes'] - 1) / 2
ccds['density'] = ccds['n_edges'] / _n_node_pairs
ccds['length'] = ccds['end'] - ccds['start']
del _node_tab, _edge_tab, _minors_tab, _n_node_pairs
ccds.info()
ccds.head()

In [None]:
# One row per minor; CCDs with an empty minor list become NaN under explode() and are dropped.
minors = (
    raw_minors['clean_minors']
    .explode()
    .dropna()
    .to_frame('minor')
    .reset_index()
)
minors['start'] = minors['minor'].map(lambda minor: minor.start)
minors['end'] = minors['minor'].map(lambda minor: minor.end)
minors['length'] = minors['end'] - minors['start']
minors['n_nodes'] = minors['minor'].map(lambda minor: len(list(minor.nodes())))
# minor_id = "<chromosome>-<ccd_id:03d>-<minor idx:03d>"
_chrom_part = minors['minor'].map(lambda minor: minor.chromosome)
_ccd_part = minors['ccd_id'].apply(lambda ccd: f'{ccd:03d}')
_idx_part = minors['minor'].apply(lambda minor: f'{minor.idx:03d}')
minors['minor_id'] = _chrom_part + '-' + _ccd_part + '-' + _idx_part
del _chrom_part, _ccd_part, _idx_part
minors = minors.set_index(['dataset', 'chromosome', 'ccd_id', 'minor_id'])
minors.info()
minors.head()

In [None]:
# One row per minor segment: its running index within the minor plus start/end coordinates.
minor_segments = (
    minors['minor']
    .apply(lambda minor: minor.segment_coords)
    .explode()
    .dropna()
    .to_frame('segment')
    .assign(
        segment_idx=lambda seg: seg.groupby(['dataset', 'chromosome', 'ccd_id', 'minor_id'], observed=True).cumcount(),
        start=lambda seg: seg['segment'].str[0],
        end=lambda seg: seg['segment'].str[1],
    )
    .drop(columns=['segment'])
)
minor_segments.info()
minor_segments.iloc[:10]

### Create output files

In [16]:
# Write all tables to DATA_DIR. The directory is created here because on the
# default (load-from-pickle) path nothing else creates it.
os.makedirs(DATA_DIR, exist_ok=True)
raw_data_files.to_csv(os.path.join(DATA_DIR, 'all_input_files.csv'))
ccds.to_csv(os.path.join(DATA_DIR, 'all_ccds.csv'))  # SD2
all_nodes.to_csv(os.path.join(DATA_DIR, 'all_nodes.csv'))
all_edges.to_csv(os.path.join(DATA_DIR, 'all_edges.csv'))
# The 'minor' column holds Python objects, so it is dropped before export.
minors.drop(columns=['minor']).to_csv(os.path.join(DATA_DIR, 'all_minors.csv'))  # SD1
minor_segments.to_csv(os.path.join(DATA_DIR, 'all_minor_segments.csv'))

# ChIA-Drop

In [17]:
import re

import read_chiadrop

RAW_CHIADROP_DATA_FOLDER = './raw_data/chiadrop'
# ChIA-Drop input files: names starting with '4DN' and ending in '.txt'.
# Sorted so the result does not depend on os.listdir's arbitrary ordering.
CHIADROP_RAW_FILES = sorted(
    filename for filename in os.listdir(RAW_CHIADROP_DATA_FOLDER)
    if re.match(r'^4DN.*\.txt$', filename)
)

In [None]:
# One row per ChIA-Drop fragment from all files; the file name without '.txt' becomes 'accession'.
fragments_df = (
    pd.concat(
        {
            fname[:-4]: read_chiadrop.from_txt(os.path.join(RAW_CHIADROP_DATA_FOLDER, fname)).drop(columns=['GEM_ID_long', 'gem_chromosome'])
            for fname in CHIADROP_RAW_FILES
        },
        names=['accession'],
    )
    .reset_index()
    .rename(columns={'GEM_span': 'gem_span', 'start': 'frag_start', 'end': 'frag_end'})
)
fragments_df['chromosome'] = fragments_df['chromosome'].astype(HumanChromosomeDtype)
# gem_id: consecutive integer per (accession, GEM_idx) group, numbered in sorted key order.
fragments_df['gem_id'] = fragments_df.groupby(['accession', 'GEM_idx']).ngroup()
fragments_df = (
    fragments_df
    .sort_values(['gem_id', 'frag_start'])
    .reset_index(drop=True)
    .drop(columns=['GEM_idx'])
)
fragments_df.info()
fragments_df.head()

In [19]:
# Export the fragment table without the positional row index.
fragments_df.to_csv(path_or_buf=os.path.join(DATA_DIR, 'chiadrop_fragments.csv'), index=False)