
Commit db0f113

Merge pull request #252 from DeepRank/gcroci2_231_reorganize_preprocess_func

refactor: move preprocess function to QueryDataset class and rename
gcroci2 committed Nov 23, 2022
2 parents c984630 + fe1a136 commit db0f113
Showing 9 changed files with 324 additions and 303 deletions.
15 changes: 6 additions & 9 deletions README.md
@@ -89,13 +89,12 @@ The documentation can be found [here](https://deeprankcore.rtfd.io/).
The process of generating graphs takes as input `.pdb` files representing protein-protein structural complexes and the corresponding Position-Specific Scoring Matrices (PSSMs) in the form of `.pssm` files. Query objects describe how the graphs should be built.

```python
from deeprankcore.preprocess import preprocess
from deeprankcore.query import ProteinProteinInterfaceResidueQuery
from deeprankcore.query import QueryCollection, ProteinProteinInterfaceResidueQuery

queries = []
queries = QueryCollection()

# Append data points
queries.append(ProteinProteinInterfaceResidueQuery(
queries.add(ProteinProteinInterfaceResidueQuery(
pdb_path = "1ATN_1w.pdb",
chain_id1 = "A",
chain_id2 = "B",
@@ -107,7 +106,7 @@ queries.append(ProteinProteinInterfaceResidueQuery(
"B": "1ATN.B.pdb.pssm"
}
))
queries.append(ProteinProteinInterfaceResidueQuery(
queries.add(ProteinProteinInterfaceResidueQuery(
pdb_path = "1ATN_2w.pdb",
chain_id1 = "A",
chain_id2 = "B",
@@ -119,7 +118,7 @@ queries.append(ProteinProteinInterfaceResidueQuery(
"B": "1ATN.B.pdb.pssm"
}
))
queries.append(ProteinProteinInterfaceResidueQuery(
queries.add(ProteinProteinInterfaceResidueQuery(
pdb_path = "1ATN_3w.pdb",
chain_id1 = "A",
chain_id2 = "B",
@@ -133,9 +132,7 @@ queries.append(ProteinProteinInterfaceResidueQuery(
))

# Generate graphs and save them in hdf5 files
# The default creates a number of hdf5 files equal to the cpu cores available
# See deeprankcore.preprocess.preprocess for more details
output_paths = preprocess(queries, "<output_folder>/<prefix_for_outputs>")
output_paths = queries.process("<output_folder>/<prefix_for_outputs>")

```
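`process` also accepts optional arguments; the following is a minimal sketch (argument values are placeholders) assuming the `process(prefix, process_count, combine_files, feature_modules)` signature introduced in this commit:

```python
from deeprankcore.query import QueryCollection, ProteinProteinInterfaceResidueQuery

queries = QueryCollection()
queries.add(ProteinProteinInterfaceResidueQuery(
    pdb_path = "1ATN_1w.pdb",
    chain_id1 = "A",
    chain_id2 = "B",
    pssm_paths = {
        "A": "1ATN.A.pdb.pssm",
        "B": "1ATN.B.pdb.pssm"
    }
))

# Limit the pool to 4 subprocesses and keep one hdf5 file per subprocess
output_paths = queries.process(
    prefix = "<output_folder>/<prefix_for_outputs>",
    process_count = 4,
    combine_files = False,
)
```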

109 changes: 0 additions & 109 deletions deeprankcore/preprocess.py

This file was deleted.

166 changes: 131 additions & 35 deletions deeprankcore/query.py
@@ -4,6 +4,13 @@
import tempfile
import pdb2sql
import pickle
from glob import glob
from types import ModuleType
from functools import partial
from multiprocessing import Pool
import importlib
from os.path import basename, isfile, join
import h5py
from deeprankcore.utils.graph import Graph
from deeprankcore.molstruct.aminoacid import AminoAcid
from deeprankcore.utils.buildgraph import (
@@ -16,6 +23,7 @@
from deeprankcore.utils.graph import build_residue_graph, build_atomic_graph
from deeprankcore.molstruct.variant import SingleResidueVariant


_log = logging.getLogger(__name__)


@@ -117,6 +125,129 @@ def __repr__(self) -> str:
return f"{type(self)}({self.get_query_id()})"


class QueryCollection:
"""
Represents the collection of data queries. Queries can be saved as a dictionary to easily navigate through their data.
"""

def __init__(self):
self._queries = []

def add(self, query: Query):
""" Adds new query to the collection of all generated queries.
Args:
query (Query): must be a Query object, either ProteinProteinInterfaceResidueQuery or SingleResidueVariantAtomicQuery.
"""
self._queries.append(query)

def export_dict(self, dataset_path: str):
""" Exports the colection of all queries to a dictionary file
Args:
dataset_path (str): the new path where the list of queries be saved to.
"""
with open(dataset_path, "wb") as pkl_file:
pickle.dump(self, pkl_file)

@property
def queries(self) -> List[Query]:
return self._queries

def __contains__(self, query: Query) -> bool:
return query in self._queries

def __iter__(self) -> Iterator[Query]:
return iter(self._queries)

def _process_one_query(
self,
prefix: str,
feature_names: List[str],
query: Query):

_log.info(f'\nProcessing query with process ID {os.getpid()}.')

# because only one process may access an hdf5 file at a time:
output_path = f"{prefix}-{os.getpid()}.hdf5"

feature_modules = [
importlib.import_module('deeprankcore.features.' + name) for name in feature_names]

graph = query.build(feature_modules)

graph.write_to_hdf5(output_path)

def process(
self,
prefix: Optional[str] = None,
process_count: Optional[int] = None,
combine_files: bool = True,
feature_modules: Union[List[ModuleType], str] = "all"
):

"""
Args:
prefix: prefix for the output files. ./processed-queries- by default.
process_count: how many subprocesses to be run simultaneously.
By default takes all available cpu cores.
combine_files: boolean for combining the hdf5 files generated by the subprocesses.
By default, the hdf5 files generated are combined into one, and then deleted.
feature_modules: list of features' modules used to generate features.
Each feature's module must implement the add_features function, and
features' modules can be found (or should be placed in case of a custom made feature)
in deeprankcore.features folder.
If "all", all available modules in deeprankcore.features are used to generate the features.
Defaults to "all".
"""

if process_count is None:
# sched_getaffinity returns the set of CPUs the process can run on, taking into
# account the sched_setaffinity Linux system call, which limits which CPUs a
# process and its children may use.
process_count = len(os.sched_getaffinity(0))

_log.info(f'\nNumber of CPU cores available: {process_count}.')

if prefix is None:
prefix = "processed-queries"

if feature_modules == "all":
feature_modules = glob(join('./deeprankcore/features/', "*.py"))
feature_names = [basename(f)[:-3] for f in feature_modules if isfile(f) and not f.endswith('__init__.py')]
else:
feature_names = [basename(m.__file__)[:-3] for m in feature_modules]

_log.info('Creating pool function to process the queries...')
pool_function = partial(self._process_one_query, prefix,
feature_names)

with Pool(process_count) as pool:
_log.info('Starting pooling...\n')
pool.map(pool_function, self.queries)

output_paths = glob(f"{prefix}-*.hdf5")

if combine_files:
dupl_ids = {}
for output_path in output_paths:
with h5py.File(f"{prefix}.hdf5",'a') as f_dest, h5py.File(output_path,'r') as f_src:
for key, value in f_src.items():
try:
f_src.copy(value,f_dest)
except RuntimeError:
if key not in dupl_ids:
dupl_ids[key] = 2
f_src.copy(f_src[key],f_dest,name=key+"_"+str(dupl_ids[key]))
_log.info(f'{key} group ID has already been added to the file. Renaming the new group as {key + "_" + str(dupl_ids[key])}')
dupl_ids[key] += 1
os.remove(output_path)
return glob(f"{prefix}.hdf5")

return output_paths
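
# Illustrative sketch (assumption, not from this diff): export_dict pickles the
# whole QueryCollection, so a saved collection can be reloaded with pickle.load:
#
#     queries = QueryCollection()
#     queries.export_dict("queries.pkl")
#     with open("queries.pkl", "rb") as pkl_file:
#         queries = pickle.load(pkl_file)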


class SingleResidueVariantResidueQuery(Query):
"creates a residue graph from a single residue variant in a pdb file"

@@ -536,38 +667,3 @@ def build(self, feature_modules: List, include_hydrogens: bool = False) -> Graph
feature_module.add_features(self._pdb_path, graph)

return graph
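
# Illustrative sketch (assumption): a single query's graph can also be built
# directly and written to hdf5, mirroring QueryCollection._process_one_query;
# the feature module name below is hypothetical:
#
#     from deeprankcore.features import contact  # hypothetical module name
#     graph = query.build([contact])
#     graph.write_to_hdf5("out.hdf5")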


class QueryDataset:
"""
Represents the collection of data queries. Queries can be saved as a dictionary to easily navigate through their data.
"""

def __init__(self):
self._queries = []

def add(self, query: Query):
""" Adds new query to the colection of all generated queries.
Args:
query (Query): must be a Query object, e.g. ProteinProteinInterfaceResidueQuery or SingleResidueVariantAtomicQuery.
"""
self._queries.append(query)

def export_dict(self, dataset_path: str):
""" Exports the colection of all queries to a dictionary file
Args:
dataset_path (str): the new path where the list of queries be saved to.
"""
with open(dataset_path, "wb") as pkl_file:
pickle.dump(self, pkl_file)

@property
def queries(self) -> List[Query]:
return self._queries

def __contains__(self, query: Query) -> bool:
return query in self._queries

def __iter__(self) -> Iterator[Query]:
return iter(self._queries)
