refactor: move preprocess function to QueryDataset class and rename #252

Merged
15 changes: 6 additions & 9 deletions README.md
@@ -89,13 +89,12 @@ The documentation can be found [here](https://deeprankcore.rtfd.io/).
 The process of generating graphs takes as input `.pdb` files representing protein-protein structural complexes and the corresponding Position-Specific Scoring Matrices (PSSMs) in the form of `.pssm` files. Query objects describe how the graphs should be built.
 
 ```python
-from deeprankcore.preprocess import preprocess
-from deeprankcore.query import ProteinProteinInterfaceResidueQuery
+from deeprankcore.query import QueryCollection, ProteinProteinInterfaceResidueQuery
 
-queries = []
+queries = QueryCollection()
 
 # Append data points
-queries.append(ProteinProteinInterfaceResidueQuery(
+queries.add(ProteinProteinInterfaceResidueQuery(
     pdb_path = "1ATN_1w.pdb",
     chain_id1 = "A",
     chain_id2 = "B",
@@ -107,7 +106,7 @@ queries.append(ProteinProteinInterfaceResidueQuery(
         "B": "1ATN.B.pdb.pssm"
     }
 ))
-queries.append(ProteinProteinInterfaceResidueQuery(
+queries.add(ProteinProteinInterfaceResidueQuery(
     pdb_path = "1ATN_2w.pdb",
     chain_id1 = "A",
     chain_id2 = "B",
@@ -119,7 +118,7 @@ queries.append(ProteinProteinInterfaceResidueQuery(
         "B": "1ATN.B.pdb.pssm"
    }
 ))
-queries.append(ProteinProteinInterfaceResidueQuery(
+queries.add(ProteinProteinInterfaceResidueQuery(
     pdb_path = "1ATN_3w.pdb",
     chain_id1 = "A",
     chain_id2 = "B",
@@ -133,9 +132,7 @@ queries.append(ProteinProteinInterfaceResidueQuery(
 ))
 
 # Generate graphs and save them in hdf5 files
-# The default creates a number of hdf5 files equals to the cpu cores available
-# See deeprankcore.preprocess.preprocess for more details
-output_paths = preprocess(queries, "<output_folder>/<prefix_for_outputs>")
+output_paths = queries.process("<output_folder>/<prefix_for_outputs>")
 
 ```
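For reference (not part of the diff): `process` writes one HDF5 file per worker process, named `<prefix>-<pid>.hdf5`, and with `combine_files=True` (the default) merges them into a single `<prefix>.hdf5` with one top-level group per query. A minimal sketch of inspecting the combined output, assuming the default `processed-queries` prefix and that processing has already run:

```python
import h5py

# Each top-level group in the combined file holds one processed query (graph).
with h5py.File("processed-queries.hdf5", "r") as f5:
    for query_id in f5.keys():
        print(query_id)
```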
109 changes: 0 additions & 109 deletions deeprankcore/preprocess.py

This file was deleted.

166 changes: 131 additions & 35 deletions deeprankcore/query.py
@@ -4,6 +4,13 @@
 import tempfile
 import pdb2sql
 import pickle
+from glob import glob
+from types import ModuleType
+from functools import partial
+from multiprocessing import Pool
+import importlib
+from os.path import basename, isfile, join
+import h5py
 from deeprankcore.utils.graph import Graph
 from deeprankcore.molstruct.aminoacid import AminoAcid
 from deeprankcore.utils.buildgraph import (
@@ -16,6 +23,7 @@
 from deeprankcore.utils.graph import build_residue_graph, build_atomic_graph
 from deeprankcore.molstruct.variant import SingleResidueVariant
 
+
 _log = logging.getLogger(__name__)
 
 
@@ -117,6 +125,129 @@ def __repr__(self) -> str:
         return f"{type(self)}({self.get_query_id()})"
 
 
+class QueryCollection:
+    """
+    Represents a collection of data queries. The collection can be pickled to a file, making it easy to save and reuse the queries.
+    """
+
+    def __init__(self):
+        self._queries = []
+
+    def add(self, query: Query):
+        """ Adds a new query to the collection of all generated queries.
+        Args:
+            query (Query): must be a Query object, either ProteinProteinInterfaceResidueQuery or SingleResidueVariantAtomicQuery.
+        """
+        self._queries.append(query)
+
+    def export_dict(self, dataset_path: str):
+        """ Exports the collection of all queries to a pickle file.
+        Args:
+            dataset_path (str): the path where the list of queries will be saved.
+        """
+        with open(dataset_path, "wb") as pkl_file:
+            pickle.dump(self, pkl_file)
+
+    @property
+    def queries(self) -> List[Query]:
+        return self._queries
+
+    def __contains__(self, query: Query) -> bool:
+        return query in self._queries
+
+    def __iter__(self) -> Iterator[Query]:
+        return iter(self._queries)
+
+    def _process_one_query(
+            self,
+            prefix: str,
+            feature_names: List[str],
+            query: Query):
+
+        _log.info(f'\nProcessing query with process ID {os.getpid()}.')
+
+        # Only one process may access an hdf5 file at a time, so each
+        # subprocess writes to its own file:
+        output_path = f"{prefix}-{os.getpid()}.hdf5"
+
+        feature_modules = [
+            importlib.import_module('deeprankcore.features.' + name) for name in feature_names]
+
+        graph = query.build(feature_modules)
+
+        graph.write_to_hdf5(output_path)
+
+    def process(
+            self,
+            prefix: Optional[str] = None,
+            process_count: Optional[int] = None,
+            combine_files: bool = True,
+            feature_modules: Union[List[ModuleType], str] = "all"
+    ):
+        """
+        Args:
+            prefix: prefix for the output files. Defaults to "processed-queries".
+
+            process_count: how many subprocesses to run simultaneously.
+                Defaults to the number of available CPU cores.
+
+            combine_files: whether to combine the hdf5 files generated by the subprocesses.
+                By default, the generated hdf5 files are combined into one and then deleted.
+
+            feature_modules: list of feature modules used to generate features.
+                Each module must implement the add_features function, and feature
+                modules can be found (or should be placed, in the case of a custom
+                feature) in the deeprankcore.features folder.
+                If "all", all available modules in deeprankcore.features are used.
+                Defaults to "all".
+        """
+
+        if process_count is None:
+            # Use the set of CPUs available according to the sched_setaffinity Linux
+            # system call, which limits which CPUs a process and its children can run on.
+            process_count = len(os.sched_getaffinity(0))
+
+        _log.info(f'\nNumber of CPU cores available: {process_count}.')
+
+        if prefix is None:
+            prefix = "processed-queries"
+
+        if feature_modules == "all":
+            feature_modules = glob(join('./deeprankcore/features/', "*.py"))
+            feature_names = [basename(f)[:-3] for f in feature_modules if isfile(f) and not f.endswith('__init__.py')]
+        else:
+            feature_names = [basename(m.__file__)[:-3] for m in feature_modules]
+
+        _log.info('Creating pool function to process the queries...')
+        pool_function = partial(self._process_one_query, prefix, feature_names)
+
+        with Pool(process_count) as pool:
+            _log.info('Starting pooling...\n')
+            pool.map(pool_function, self.queries)
+
+        output_paths = glob(f"{prefix}-*.hdf5")
+
+        if combine_files:
+            dupl_ids = {}
+            for output_path in output_paths:
+                with h5py.File(f"{prefix}.hdf5", 'a') as f_dest, h5py.File(output_path, 'r') as f_src:
+                    for key, value in f_src.items():
+                        try:
+                            f_src.copy(value, f_dest)
+                        except RuntimeError:
+                            # A group with this query id already exists in the
+                            # destination file; copy it under a suffixed name.
+                            if key not in dupl_ids:
+                                dupl_ids[key] = 2
+                            f_src.copy(f_src[key], f_dest, name=key + "_" + str(dupl_ids[key]))
+                            _log.info(f'{key} group id has already been added to the file. Renaming group as {key + "_" + str(dupl_ids[key])}')
+                            dupl_ids[key] += 1
+                os.remove(output_path)
+            return glob(f"{prefix}.hdf5")
+
+        return output_paths

 class SingleResidueVariantResidueQuery(Query):
     "creates a residue graph from a single residue variant in a pdb file"

@@ -536,38 +667,3 @@ def build(self, feature_modules: List, include_hydrogens: bool = False) -> Graph
             feature_module.add_features(self._pdb_path, graph)
 
         return graph


-class QueryDataset:
-    """
-    Represents the collection of data queries. Queries can be saved as a dictionary to easily navigate through their data
-
-    """
-
-    def __init__(self):
-        self._queries = []
-
-    def add(self, query: Query):
-        """ Adds new query to the colection of all generated queries.
-        Args:
-            query (Query): must be a Query object, either ProteinProteinInterfaceResidueQuery or SingleResidueVariantAtomicQuery.
-        """
-        self._queries.append(query)
-
-    def export_dict(self, dataset_path: str):
-        """ Exports the colection of all queries to a dictionary file
-        Args:
-            dataset_path (str): the new path where the list of queries be saved to.
-        """
-        with open(dataset_path, "wb") as pkl_file:
-            pickle.dump(self, pkl_file)
-
-    @property
-    def queries(self) -> List[Query]:
-        return self._queries
-
-    def __contains__(self, query: Query) -> bool:
-        return query in self._queries
-
-    def __iter__(self) -> Iterator[Query]:
-        return iter(self._queries)
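
Putting the renamed API together, a minimal end-to-end sketch (not part of the diff; paths are illustrative, and the keyword values restate the documented defaults except for `process_count`):

```python
import pickle

from deeprankcore.query import QueryCollection, ProteinProteinInterfaceResidueQuery

# Build a collection and add a query, mirroring the README example.
queries = QueryCollection()
queries.add(ProteinProteinInterfaceResidueQuery(
    pdb_path = "1ATN_1w.pdb",
    chain_id1 = "A",
    chain_id2 = "B",
    pssm_paths = {"A": "1ATN.A.pdb.pssm", "B": "1ATN.B.pdb.pssm"},
))

# export_dict pickles the whole QueryCollection, so it can be reloaded later.
queries.export_dict("queries.pkl")
with open("queries.pkl", "rb") as pkl_file:
    queries = pickle.load(pkl_file)

# Process with explicit options: four worker processes, merging the
# per-process hdf5 files into a single processed-queries.hdf5.
output_paths = queries.process(
    prefix = "processed-queries",
    process_count = 4,
    combine_files = True,
)

# To restrict feature generation, pass imported modules instead of "all";
# each module must implement add_features and live in deeprankcore.features.
# queries.process(feature_modules=[my_feature_module])  # my_feature_module is hypothetical
```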