In [1]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [6]:
#% cd "/content/drive/My Drive/00_MY Projects and courses/reaction_prediction_seq2seq/evaluation"


/content/drive/My Drive/00_MY Projects and courses/reaction_prediction_seq2seq/evaluation
/content


In [7]:
# ! wget https://anaconda.org/rdkit/rdkit/2018.09.1.0/download/linux-64/rdkit-2018.09.1.0-py36h71b666b_1.tar.bz2 -O rdkit.tar.bz2 -q
# ! mkdir rdkit
# ! tar xjf rdkit.tar.bz2 -C rdkit

# ! cp -r rdkit/lib/python3.6/site-packages/* /usr/local/lib/python3.6/dist-packages/
# ! cp -r rdkit/lib/*.so.* /usr/lib/x86_64-linux-gnu/

# ! ln -s /usr/lib/x86_64-linux-gnu/libboost_python3-py36.so.1.65.1 /usr/lib/x86_64-linux-gnu/libboost_python3.so.1.65.1

ln: failed to create symbolic link '/usr/lib/x86_64-linux-gnu/libboost_python3.so.1.65.1': File exists


In [0]:
import os
import tempfile
import numpy as np
import networkx as nx
from rdkit.Chem import AllChem
from collections import Counter

In [0]:
# functions to reformat the google seq2seq beam files

def decode_single_example(predicted_ids, parent_ids, scores, vocab,
                          num_top_candidates, calc_num_completed=False):
  """
  Decodes a numpy array of predicted ids, parent_ids, scores, each with
  dimensions seq_length x k (# of beams), into a python list of tuples.
  Each tuple has the form ((chars), path log prob, path prob) and represents
  a possible beam candidate. Adapted
  from https://gist.github.com/MInner/5204e649f6a7b0541b232f1f0f9fc8ba. The
  number of beam candidates will be <= N
  :param predicted_ids: numpy array of predicted ids, dim seq_length x k
  :param parent_ids:  numpy array of predicted ids, dim seq_length x k
  :param scores:  numpy array of predicted ids, dim seq_length x k
  :param vocab: a list where the index corresponds to the char in the vocab
  :param num_top_candidates: the number of top candidates (N) to keep for each
  example, where N <= k (beam width used in seq2seq decoder). The actual
  number of candidates generated will always be less than or equal to N.
  :param calc_num_completed: if True, also returns the number of completed
  candidates (ie candidates that
  :return: a list of tuples, each of the form
  ((chars), path log prob, path prob). The prob is weighted by the number of
  candidates in the list. Eg if the list contains 1 candidate, then the prob
  of that candidate will be 1.0
  """
  k = predicted_ids.shape[1]
  if num_top_candidates > k:
    raise ValueError("num_top_candidates cannot be greater than k")

  def _add_graph_level(graph, level, parent_ids, names, scores):
    """Adds a level to the passed graph"""
    for i, parent_id in enumerate(parent_ids):
      new_node = (level, i)
      parent_node = (level - 1, parent_id)
      graph.add_node(new_node)
      graph.node[new_node]["name"] = names[i]
      graph.node[new_node]["score"] = str(scores[i])
      graph.node[new_node]["size"] = 100
      # Add an edge to the parent
      graph.add_edge(parent_node, new_node)

  def create_graph(predicted_ids, parent_ids, scores, vocab=None):
    def get_node_name(pred):
      return vocab[pred] if vocab else str(pred)

    seq_length = predicted_ids.shape[0]
    graph = nx.DiGraph()
    for level in range(seq_length):
      names = [get_node_name(pred) for pred in predicted_ids[level]]
      _add_graph_level(graph, level + 1, parent_ids[level], names,
                       scores[level])
    graph.node[(0, 0)]["name"] = "START"
    return graph

  def get_path_to_root(graph, node):
    p = graph.predecessors(node)
    assert len(p) <= 1
    self_seq = [graph.node[node]['name'].split('\t')[0]]
    if len(p) == 0:
      return self_seq
    else:
      return self_seq + get_path_to_root(graph, p[0])

  def _tree_node_predecessor(pos):
    return graph.node[graph.predecessors(pos)[0]]

  graph = create_graph(predicted_ids, parent_ids, scores, vocab)

  pred_end_node_names = {pos for pos, d in graph.node.items()
                         if d['name'] == 'SEQUENCE_END'
                         and len(graph.predecessors(pos)) > 0
                         and _tree_node_predecessor(pos)[
                           'name'] != 'SEQUENCE_END'}

  # if no suitable end nodes, occurs when all candidates have been cutoff
  # prematurely due to exceeding the max seq2seq decode length. We create a
  # dummy output that is an invalid molecule SMILES, with nan for both log
  # probs and scores
  if len(pred_end_node_names) == 0:
    if calc_num_completed:
      return [(("-"), float("nan"), float("nan"))], 0

    return [(("-"), float("nan"), float("nan"))]

  result = [(tuple(get_path_to_root(graph, pos)[1:-1][::-1]),
             float(graph.node[pos]['score']))
            for pos in pred_end_node_names]

  filtered_result = filter(lambda x: 'SEQUENCE_END' not in x[0], result)

  s_result = sorted(filtered_result, key=lambda x: x[1], reverse=True)

  # take the top N candidates, sorted by decreasing log prob
  top_N_result = s_result[:num_top_candidates]

  nn_probs = np.exp(np.array(list(zip(*top_N_result))[1]))
  probs = nn_probs / np.sum(nn_probs)
  result_w_prob = [(path, score, prob) for (path, score), prob in
                   zip(s_result, probs)]  #NB: the length of the
  # result_w_prob will be <= N

  if calc_num_completed:
    return result_w_prob, len(pred_end_node_names)

  return result_w_prob

def create_beam_source_target_parallel_txt_files(beam_path, vocab_path,
                                                 output_pred_path, input_target_path,
                                                 output_target_path,
                                                 input_source_path,
                                                 output_source_path,
                                                 num_top_candidates):
  """
  Converts the beam.npz data from the google seq2seq inference with a beam
  decoding width of k into a pred file containing the SMILES of the top k
  candidates, with each character separated by a space, for each example in
  order. NB: currently there is no guarantee that each example will have
  exactly k candidates. Also reformats the original target and source files so
  that each candidate in the beam source file corresponds to the correct
  target and source in parallel txt format.
  :param beam_path: path to the beam.npz file
  :param vocab_path: path to the vocab file
  :param output_pred_path: path to the output pred file
  :param input_target_path: path to original target file
  :param output_target_path:  path to reformatted target file
  :param input_source_path: path to original source file
  :param output_source_path:  path to reformatted source file
  :param num_top_candidates: the number of top candidates (N) to keep for each
  example, where N <= k (beam width used in seq2seq decoder)
  :return:
  """

  def _read_vocab_from_txt(vocab_path):
    """
    Reads the vocabulary file from the vocab path into a list, and also adds
    the special tokens from the google seq2seq model
    :param vocab_path:
    :return:  a list where the index corresponds to the char
    """
    with open(vocab_path, "r") as infile:
      vocab = infile.readlines()
    vocab = [_.strip() for _ in vocab]
    vocab += ["UNK", "SEQUENCE_START", "SEQUENCE_END"]

    return vocab

  vocab = _read_vocab_from_txt(vocab_path)
  beam_data = np.load(beam_path)

  all_predicted_ids = beam_data["predicted_ids"]
  all_parent_ids = beam_data["beam_parent_ids"]
  all_scores = beam_data["scores"]

  num_examples = all_predicted_ids.shape[0]

  with open(output_pred_path, "w") as out_pred, \
      open(input_target_path, "r") as in_target, \
      open(output_target_path, "w") as out_target, \
    open(input_source_path, "r") as in_source, \
    open(output_source_path, "w") as out_source:

    for i in range(num_examples):
      decoded = decode_single_example(all_predicted_ids[i],
                                      all_parent_ids[i], all_scores[i], vocab,
                                      num_top_candidates)

      target = in_target.readline() # target for this example, includes \n
      source = in_source.readline() # source for this example, includes \n

      for candidate in decoded:
        smiles_char_string = " ".join(candidate[0])  # SMILES string with each
        # char separated by a space
        out_pred.write(smiles_char_string + "\n")
        out_target.write(target)
        out_source.write(source)

In [0]:
# functions to evaluate the predictions

def single_exact_match_score(prediction, ground_truth, canonicalize=False):
  """
  Calculates the exact match score for a single prediction and ground truth
  pair, where they represent the reactant parts of a reaction SMILES string.
  Uses InChI comparison if canonicalize
  :param prediction:  a string of tokens either separated or not by space
  delimiters
  :param ground_truth:  a string of tokens separated or not by space delimiters
  :param canonicalize:  if True, canonicalizes the prediction and ground
  truth reactant SMILES prior to the exact match comparison if both the
  prediction and ground truth are valid reactant SMILES.
  :return:  1.0 if exact match, 0.0 otherwise
  """
  if canonicalize:
    prediction_smiles = "".join(prediction.split())
    ground_truth_smiles = "".join(ground_truth.split())
    pred_mol_obj = AllChem.MolFromSmiles(prediction_smiles)
    ground_truth_mol_obj = AllChem.MolFromSmiles(ground_truth_smiles)
    if pred_mol_obj and ground_truth_mol_obj: # if either is a None obj,
  # then that corresponding SMILES could not be parsed
      prediction = AllChem.MolToInchi(pred_mol_obj)
      ground_truth = AllChem.MolToInchi(ground_truth_mol_obj)

  return float(prediction == ground_truth)

def check_valid_smiles(input_smiles):
  """
  Returns True if the input species SMILES is overall valid gramatically
  :param input_smiles: a species smiles to check (can be multiple species
  separated by .)
  :return: True if input species SMILES is valid, False if otherwise
  """
  m = AllChem.MolFromSmiles(input_smiles)
  if m:
    return True
  else:
    return False

def nested_list_to_1d_array(nested_list):
    """
    Converts a list of lists into a 1d array. Guarantees that the resulting
    array will be 1d, even if the nested list contains lists of equal length.
    Addresses the inconsistent behaviour of np.array array creation, since a
    list of lists of unequal length results in a 1d array, while a list of
    lists of equal length results in a nd array.
    :param nested_list: a list of lists
    :return: a 1d numpy array of Dim (num of nested lists,)
    """
    len_list = len(nested_list) # num of nested lists
    result = np.empty(len_list, dtype=np.object)
    result[:] = nested_list

    return result

def evaluate_beam_by_rxn_class(beam_prediction_path,
                                    beam_ground_truth_path,
                               beam_source_path, result_path):
  """
  Evaluates a prediction file and ground truth file pair in parallel text
  format by reaction class, and writes the metrics and correct/incorrect
  predictions to a results file. The reaction
  class information is contained in the test source file that was used as
  input in the inference step to obtain the prediction file.
  :param beam_prediction_path: path to the beam prediction file, resulting
  from create_beam_source_target_parallel_txt_files
  :param beam_ground_truth_path: path to the reformatted target file,
  resulting from create_beam_source_target_parallel_txt_files
  :param beam_source_path:  path to the reformatted source file, resulting from
  create_beam_source_target_parallel_txt_files
  :param result_path: path to output file
  :return:
  """
  def _read_list_from_txt(input_path):
    """
    Reads a text file containing lines of chars into a list of list of the
    elements
    :param input_path: path to input file
    :return:  a list of list of the elements
    """
    with open(input_path, 'r') as infile:
      list_of_lists = []
      for line in infile:
        list_of_lists.append([i for i in line.rstrip().split()])
    return list_of_lists

  def _evaluate_array(prediction_array, ground_truth_array,
                      test_source_array, result_path, reaction_class=None):
    """
    Evaluates a prediction array and ground truth array pair in parallel text
    format, and appends the metrics and correct/incorrect predictions to a
    results file
    :param prediction_array: 1d numpy array of lists of elements
    :param ground_truth_array:  1d numpy array of lists of elements
    :param result_path: path to output file
    :param reaction_class:  a reaction class character
    :return:
    """
    # initialize global metrics
    total_em_score = 0  # if any of the beam candidates of an example is an
    # exact match, we score that example as correct. Theoretically, the max
    # possible total_em_score is num_examples.
    total_invalid_smiles = 0
    total_valid_smiles = 0

    # lists will eventually have len num_examples, where each entry
    # corresponds to the num of em predictions for each example
    num_em_per_example_list = []

    num_examples = 0

    # initialize lists for correct/incorrect predictions
    correct = []  # preds only (ie only reactants), not whole rxn smiles
    incorrect = []

    num_candidates = prediction_array.shape[0]  # each target can have
    # multiple candidates due to beam search

    current_target = " ".join(ground_truth_array[0])  # initialize current
    # target to the first target

    # initialize current target (not candidate) metrics
    current_em = 0
    for i in range(num_candidates):
      prediction = " ".join(prediction_array[i])  # to get into suitable
      # format for the single metric calculations, which take in a string of
      # elements separated by spaces
      truth = " ".join(ground_truth_array[i])

      # count valid and invalid prediction smiles
      if check_valid_smiles("".join(prediction_array[i])):
        total_valid_smiles += 1
      else:
        total_invalid_smiles += 1

      # when the current candidate is from a different example/target,
      # update current target and metrics for the previous example/target
      if truth != current_target:
        # update global metrics with current target metrics
        total_em_score += current_em
        num_em_per_example_list.append(current_em)
        # reinitialize current target metrics
        current_em = 0
        # update the current target
        current_target = truth
        num_examples += 1

      # calculate single example metrics and update
      single_em = single_exact_match_score(prediction, truth, canonicalize=True)
      # have this as a separate variable to be used for indicate
      # correct/incorrect pred
      current_em += single_em

      # format the product source SMILES without the reaction char, so that
      # later we can formulate the reaction SMILES. Also format the pred and
      # truth SMILES
      product_smiles = "".join(test_source_array[i][1:])
      pred_smiles = "".join(prediction_array[i])
      truth_smiles = "".join(ground_truth_array[i])

      # update correct or incorrect list
      if single_em:
        correct.append((pred_smiles,
                        truth_smiles,
                        product_smiles))
      else:
        incorrect.append((pred_smiles,
                          truth_smiles,
                          product_smiles))

      # once we reach the last candidate, need to update the metrics for the
      # this example/target. #TODO(Bowen): pretty hacky. A lot of repeated
      # code from the earlier check whether truth != current_target
      if i == range(num_candidates)[-1]:
        # update global metrics with current target metrics
        total_em_score += current_em
        num_em_per_example_list.append(current_em)
        num_examples += 1

    # calculate final metrics
    num_examples_with_em = sum(x > 0 for x in num_em_per_example_list)
    average_em = num_examples_with_em / num_examples  # this is the top k accuracy

    # write result file
    with open(result_path, "a") as result_file:
      if reaction_class:
        result_file.write("###### {} ######".format(reaction_class) + "\n")
      else:
        result_file.write("###### OVERALL ######" + "\n")
      result_file.write("METRICS" + "\n")
      result_file.write("Number of examples: {}".format(num_examples) + "\n")
      result_file.write(
        "Number of candidates: {}".format(num_candidates) + "\n")
      result_file.write("Number of examples with exact matches: {}".format((
        num_examples_with_em)) + "\n")
      result_file.write("Top-k accuracy: {}".format(average_em) + "\n")

      # record valid and invalid prediction smiles
      result_file.write("Number of valid SMILES: {}".format(
        total_valid_smiles) + "\n")
      result_file.write("Number of invalid SMILES: {}".format(
        total_invalid_smiles) + "\n")

      result_file.write("\n")

      result_file.write("INCORRECT PREDICTIONS" + "\n")
      for pred, truth, prod in incorrect:
        pred_rxn_smiles = pred + ">>" + prod
        truth_rxn_smiles = truth + ">>" + prod
        result_file.write("{} {}".format(pred_rxn_smiles, truth_rxn_smiles) + "\n")
        
      result_file.write("\n")

      result_file.write("CORRECT PREDICTIONS" + "\n")
      for pred, truth, prod in correct:
        pred_rxn_smiles = pred + ">>" + prod
        truth_rxn_smiles = truth + ">>" + prod
        result_file.write("{} {}".format(pred_rxn_smiles, truth_rxn_smiles) +
                          "\n")

      result_file.write("\n\n")

  def _get_rxn_class_indices(rxn_class, test_source_array):
    """
    Given a test source array of lists, in which each list contains a
    reaction class character in the first entry, returns an array that has
    entries of True where the corresponding list in the test source array
    contains a reaction class character equal to rxn_class, and False otherwise
    :param rxn_class: a reaction class character
    :param test_source_array: a 1d numpy array of lists, where each list
    contains a reaction class character in the first entry
    :return:  a 1d numpy boolean array
    """
    index_list = []
    for list in test_source_array:
      if list[0] == rxn_class:
        index_list.append(True)
      else:
        index_list.append(False)
    index_array = np.array(index_list)

    assert index_array.shape == test_source_array.shape

    return index_array

  # read files into 1d numpy arrays. Use read_list_from_txt and then convert
  # list of lists to array because the reaction class char is a str and
  # cannot be converted into int
  prediction_array = nested_list_to_1d_array(_read_list_from_txt(
    beam_prediction_path))
  ground_truth_array = nested_list_to_1d_array(_read_list_from_txt(
    beam_ground_truth_path))
  test_source_array = nested_list_to_1d_array(_read_list_from_txt(
    beam_source_path))

  # calculate metric and print correct/incorrect examples for overall array
  _evaluate_array(prediction_array, ground_truth_array, test_source_array,
                  result_path, reaction_class=None)

  # find array of unique reaction classes from test source
  rxn_classes = list(set([element[0] for element in test_source_array]))

  # find indices for each reaction class and then create new class array.
  # Calculate metrics and print correct/incorrect examples for each reaction
  # class array
  for rxn_class in rxn_classes:
    rxn_class_indices = _get_rxn_class_indices(rxn_class, test_source_array)
    class_prediction_array = prediction_array[rxn_class_indices]
    class_ground_truth_array = ground_truth_array[rxn_class_indices]
    class_test_source_array = test_source_array[rxn_class_indices]
    _evaluate_array(class_prediction_array, class_ground_truth_array,
                    class_test_source_array, result_path, rxn_class)

In [11]:
% cd "/content/drive/My Drive/00_MY Projects and courses/reaction_prediction_seq2seq"

/content/drive/My Drive/00_MY Projects and courses/reaction_prediction_seq2seq


In [0]:
# evaluate the seq2seq predictions

# define paths
cwd = os.getcwd()
prediction_dir = os.path.join(os.path.dirname(cwd), 'predictions')
processed_data_dir = os.path.join(os.path.dirname(cwd), 'processed_data')

vocab_path = os.path.join(processed_data_dir, 'vocab')
target_path = os.path.join(processed_data_dir, 'test_targets')
source_path = os.path.join(processed_data_dir, 'test_sources')

# no beam (ie beam width = 1)
pred_path = os.path.join(prediction_dir, 'predictions_44000_steps_beam_1_test.txt')
result_path = os.path.join(cwd, 'beam_1_test_results')

evaluate_beam_by_rxn_class(pred_path, target_path, source_path, result_path)

# beams (ie beam width != 1). Write the formatted beam target and beam source files in a temp dir
with tempfile.TemporaryDirectory() as temp_dir:
  for beam_width in [3, 5, 10, 20, 50]:
    beam_path = os.path.join(prediction_dir, 'beams_44000steps_' + str(beam_width) + '_test.npz')
    processed_pred_path = os.path.join(temp_dir, 'pred_beam_' + str(beam_width))
    processed_target_path = os.path.join(temp_dir, 'target_beam_' + str(beam_width))
    processed_source_path = os.path.join(temp_dir, 'source_beam_' + str(beam_width))
    result_path = os.path.join(cwd, 'beam_' + str(beam_width) + '_test_results')
    
    # reformat beam files
    create_beam_source_target_parallel_txt_files(beam_path, vocab_path,
                                               processed_pred_path,
                                               target_path,
                                               processed_target_path,
                                               source_path,
                                               processed_source_path, beam_width)
    # evaluate reformatted beam predictions
    evaluate_beam_by_rxn_class(processed_pred_path, processed_target_path, processed_source_path, result_path)

FileNotFoundError: ignored