In [2]:
import pathlib
import numpy as np
import matplotlib.pyplot as plt
from Bio.PDB import *
import multiprocessing as mp
import h5py
import logging
from math import floor
from tqdm.notebook import tqdm

In [3]:
# Biopython creates warnings for chains that are discontinuous. I recommend turning them off.
import warnings
from Bio import BiopythonWarning
warnings.simplefilter('ignore', BiopythonWarning)

In [4]:
logging.basicConfig(filename='data_generation.log',level=logging.ERROR)

In [5]:
#Global Variables
#----------------
# Requires absolute path
test_path = pathlib.Path("/home/collin/protein_gan/data/test/")
train_path = pathlib.Path("/home/collin/protein_gan/data/train/")

In [6]:
def split_matrix(matrix, res):
    if (len(matrix)>=res):
        for n in range(1,int(floor(len(matrix)/res))):
            # Creating RES x RES matrices by traversing the spine of input matrix
            matrix_chunk = matrix[res*(n-1):res*n, res*(n-1):res*n]
            yield matrix_chunk

In [7]:
def calc_dist_matrix(residues):
    """Returns a matrix of distances between residues of the same chain."""
    size = len(residues)
    answer = np.zeros((size, size), np.float)
    for row, residue_one in enumerate(residues):
        for col, residue_two in enumerate(residues):
            answer[row, col] = residue_one["CA"] - residue_two["CA"]
    return answer

In [8]:
def generate_maps(files):
    """
    Generate specified resolution a-carbon maps given a input directory
    """
    # Create A chain maps as matrices
    parser = PDBParser()
    io = PDBIO()
    # Get the initial structure of the protein
    try:    
        structure = parser.get_structure('X', files)
        for models in structure:
            residues = Selection.unfold_entities(models['A'], 'R')
            ca_residues = [residue for residue in residues if 'CA' in residue]
            distance_matrix = calc_dist_matrix(ca_residues)
            return (list(split_matrix(distance_matrix, 16)), 
                        list(split_matrix(distance_matrix, 64)), 
                        list(split_matrix(distance_matrix, 128)))
    except ValueError as err:
        logging.error(f'ValuError file :{files}, Error is:{err}')
    except TypeError as err:
        logging.error(f'TypeError file :{files}, Error is:{err}')


In [13]:
def main(files, desc):
    """
    Clean the generated maps using all cores in the process
    """
    p = mp.Pool()
    pdbs = [file for file in files.glob("*.pdb")]
    r = list(tqdm(p.imap(generate_maps, pdbs), total=len(pdbs), desc=desc))
    p.close()
    p.join()
    return r

In [10]:
def test_result(result):
    test_len = [len(x) for x in result]
    plt.hist(test_len)
    plt.show()
    plt.imshow(result[5], cmap='viridis')
    plt.colorbar()
    plt.show()

In [59]:
test_sets = main(test_path, "test_sets")
test_16 = []
test_64 = []
test_128 = []
for files in test_sets:
    for matrix in files[0]:
        test_16.append(matrix)
    for matrix in files[1]:
        test_64.append(matrix)
    for matrix in files[2]:
        test_128.append(matrix)
with h5py.File('dataset.hdf5', 'w') as f:
    f.create_dataset('test_16', data=test_16, compression="gzip")
    f.create_dataset('test_64', data=test_64, compression="gzip")
    f.create_dataset('test_128', data=test_128, compression="gzip")

In [None]:
train_sets = main(train_path, "train_sets")
train_16 = []
train_64 = []
train_128 = []
for files in train_sets:
    for matrix in files[0]:
        train_16.append(matrix)
    for matrix in files[1]:
        train_64.append(matrix)
    for matrix in files[2]:
        train_128.append(matrix)
with h5py.File('dataset.hdf5', 'a') as f:
    f.create_dataset('train_16', data=train_16, compression="gzip")
    f.create_dataset('train_64', data=train_64, compression="gzip")
    f.create_dataset('train_128', data=train_128, compression="gzip")

HBox(children=(FloatProgress(value=0.0, description='train_sets', max=115850.0, style=ProgressStyle(descriptio…