##### Copyright 2023 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");

In [None]:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Utility decision diagrams: Details

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google-research/r_u_sure/r_u_sure/notebooks/Utility_decision_diagrams_details.ipynb)

This notebook shows some of the details involved in the full utility functions used in the main paper, and explains how to interpret them.

For an introduction to the concepts discussed here, see the companion notebook [`Utility_decision_diagrams_intro.ipynb`](https://colab.research.google.com/github/google-research/r_u_sure/r_u_sure/notebooks/Utility_decision_diagrams_intro.ipynb).

## Setup

### Installation

To run this notebook, you need a Python environment with `r_u_sure` installed. 

If you are running this from Colab, you can install it by running the following command:

In [None]:
try:
  import r_u_sure
except ImportError:
  try:
    import google.colab
    in_colab = True
  except ImportError:
    in_colab = False
  
  if in_colab:
    print("Installing r_u_sure from GitHub...")
    !pip install "r_u_sure @ git+https://github.com/google-research/r_u_sure"
  else:
    # Don't install in this case, to avoid messing up the python environment.
    print("WARNING: Not running in Colab and r_u_sure not found. "
          "Please install r_u_sure following the instructions in the README.")
    raise

### Imports

In [None]:
import textwrap

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from IPython import display

In [None]:
from r_u_sure.wrappers import parser_tools

from r_u_sure.tree_structure import packed_sequence_nodes
from r_u_sure.tree_structure import sequence_node_helpers
from r_u_sure.tree_structure import transforms

from r_u_sure.decision_diagrams import gated_state_dag
from r_u_sure.decision_diagrams import packed_dags

from r_u_sure.edit_distance_utility import region_decisions
from r_u_sure.edit_distance_utility import edit_dags
from r_u_sure.edit_distance_utility import constraint_dags
from r_u_sure.decision_diagrams import consistent_path_dual_solver

# from r_u_sure.wrappers import uncertainty_regions_wrapper

from r_u_sure.rendering import render_in_notebook

## Parsing source code into trees

For this notebook, we'll focus on the edit localization task with a few small hand-written programs.

In [None]:
example_programs = [
    """\
    print("hello world")
    def my_function_A(x):
      if not x:
        print(1)
    """,
    """\
    print("hello world")
    def my_function_B(x):
      if x is None:
        print(1)
    """,
    """\
    print("hello world")
    def my_function_C(x):
      if not x:
        print(2)
    """
]
example_programs = [textwrap.dedent(program) for program in example_programs]

The first step is to parse our model samples into trees using our pseudoparser.

In [None]:
parser_helper = parser_tools.ParserHelper(language="python")

In [None]:
parsed_programs = [
    parser_helper.parse_to_nodes(source_string)
    for source_string in example_programs
]

We can visualize the parsed tree structure to see what the parser produces. Note that our parser doesn't exactly correspond to the full language AST, and focuses on matching brackets, splitting statements (for example, by unbracketed newlines in Python) and (in Python) grouping indents.

In [None]:
for i, parsed_program in enumerate(parsed_programs):
  print(f"==== Example {i} ====")
  print(sequence_node_helpers.render_debug(parsed_program))
  print("\n\n")
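
To make the idea of bracket matching and statement splitting concrete, here is a minimal standalone sketch. It is not the `r_u_sure` pseudoparser: `toy_pseudoparse` is a hypothetical helper that tokenizes with a simple regular expression, nests tokens by matching brackets, and splits statements on newlines that are not inside brackets. It ignores indentation and string literals.

```python
import re


def toy_pseudoparse(source):
  """Nests tokens by matching brackets and splits on unbracketed newlines."""
  closers = {"(": ")", "[": "]", "{": "}"}
  tokens = re.findall(r"\w+|\n|[^\w\s]", source)
  statements = []
  stack = [[]]   # stack[0] is the current statement; later entries are open groups.
  expected = []  # Closing brackets we are still waiting for.
  for token in tokens:
    if token in closers:
      group = [token]
      stack[-1].append(group)
      stack.append(group)
      expected.append(closers[token])
    elif expected and token == expected[-1]:
      stack[-1].append(token)
      stack.pop()
      expected.pop()
    elif token == "\n" and not expected:
      # An unbracketed newline ends the current statement.
      if stack[0]:
        statements.append(stack[0])
      stack = [[]]
    else:
      stack[-1].append(token)
  if stack[0]:
    statements.append(stack[0])
  return statements


toy_statements = toy_pseudoparse("f(a, [b,\n c])\ng(d)\n")
for statement in toy_statements:
  print(statement)
```

The newline inside the square brackets stays inside its group, while the newline after the closing parenthesis starts a new statement.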

We will designate one model sample as the prototype. Before building our utility function, we need to specify what the set of possible annotations is. We do this by inserting special "control nodes" into the tree, which do not correspond to any output but instead denote locations where we need to make a decision during the search process.

Here, as a demonstration, we insert region start/end nodes and also early exit nodes, which correspond to searching over UNSURE annotations and optionally choosing a truncation point.

In [None]:
suggestion_prototype = parsed_programs[0]
suggestion_prototype = transforms.insert_region_options_around_subsequences(
    suggestion_prototype,
    allow_empty_regions=False,  # Usually True; False here keeps the graphs smaller.
    node_filter=parser_tools.allow_regions_around_pseudoparse_node,
)
suggestion_prototype = transforms.insert_early_exit(suggestion_prototype)

In [None]:
print("==== Prototype ====")
print(sequence_node_helpers.render_debug(suggestion_prototype))

We also need to specify how much of the sequence is the shared context (i.e. what the user already wrote) and where in the sequence the suggestion starts. Let's imagine that the user has just written `def`, and the rest of each program is a suggestion.

We handle this by first parsing the full program, and then deleting any node that comes before the user's "cursor position". This ensures that the result still respects the AST structure of the full program.

In [None]:
# Place the cursor just after "def " (the keyword plus the following space).
cursor_position = example_programs[0].find("def") + 4

parsed_programs_truncated = [
    transforms.truncate_prefix_at_offset(program, cursor_position)
    for program in parsed_programs
]
suggestion_prototype_truncated = transforms.truncate_prefix_at_offset(
    suggestion_prototype, cursor_position)

In [None]:
# Render just the first one, to show how the truncation works:
print("==== Target 0 ====")
print(sequence_node_helpers.render_debug(parsed_programs_truncated[0]))
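
The parse-then-delete strategy can be sketched on a toy nested-list tree. The helper `truncate_prefix` below is hypothetical and much simpler than `transforms.truncate_prefix_at_offset`: it walks the tree while tracking character offsets, drops every token that starts before the cursor, and removes groups that end up empty, so the nesting of the remaining tokens is unchanged.

```python
def truncate_prefix(tree, cursor, start=0):
  """Drops tokens that start before `cursor`, keeping the nesting intact.

  Returns the truncated tree and the offset just past the last token seen.
  """
  kept = []
  offset = start
  for child in tree:
    if isinstance(child, list):
      subtree, offset = truncate_prefix(child, cursor, offset)
      if subtree:
        kept.append(subtree)
    else:
      if offset >= cursor:
        kept.append(child)
      offset += len(child)
  return kept, offset


toy_source = "print(1)\ndef f(x):"
toy_source_tree = [
    "print", ["(", "1", ")"], "\n", "def ", "f", ["(", "x", ")"], ":"
]
toy_cursor = toy_source.find("def") + 4
toy_truncated, _ = truncate_prefix(toy_source_tree, toy_cursor)
print(toy_truncated)
```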

This representation is convenient to build and render, but not very fast to iterate over and difficult to index into. The next step is to convert it into a "packed" representation, which consists of a collection of flat arrays of indices.

In [None]:
packed_targets = [
    parser_tools.pack_sequence_from_pseudoparser(
        sequence, with_numba=True
    )
    for sequence in parsed_programs_truncated
]
packed_prototype = parser_tools.pack_sequence_from_pseudoparser(
    suggestion_prototype_truncated, with_numba=True
)

In [None]:
# Some of the packed contents:
for i, item in enumerate(packed_prototype.preorder_traversal[:30]):
  print(f"preorder_traversal[{i}] == {item}")
  if item.category == packed_sequence_nodes.PackedSequenceNodeCategory.GROUP_NODE:
    print(f"  group_nodes[{item.index_in_category}] == {packed_prototype.group_nodes[item.index_in_category]}")
  elif item.category == packed_sequence_nodes.PackedSequenceNodeCategory.TEXT_TOKEN_NODE:
    print(f"  text_token_nodes[{item.index_in_category}] == {packed_prototype.text_token_nodes[item.index_in_category]}")
  elif item.category == packed_sequence_nodes.PackedSequenceNodeCategory.TEXT_DECORATION_NODE:
    print(f"  text_decoration_nodes[{item.index_in_category}] == {packed_prototype.text_decoration_nodes[item.index_in_category]}")

print("...")
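
The following sketch shows one way a nested tree can be flattened into index arrays. It only mimics the general shape of the output above (a preorder list of category/index pairs plus one array per category); the function `toy_pack` and the exact fields it stores are assumptions for illustration, not the layout used by `packed_sequence_nodes`.

```python
def toy_pack(tree):
  """Flattens a nested-list tree into preorder and per-category arrays."""
  preorder = []  # (category, index_in_category) pairs, in preorder.
  groups = []    # For each group: number of preorder entries below it.
  tokens = []    # For each token: its text.

  def visit(node):
    if isinstance(node, list):
      index = len(groups)
      groups.append(None)  # Filled in once the subtree has been visited.
      position = len(preorder)
      preorder.append(("group", index))
      for child in node:
        visit(child)
      groups[index] = len(preorder) - position - 1
    else:
      preorder.append(("token", len(tokens)))
      tokens.append(node)

  for child in tree:
    visit(child)
  return preorder, groups, tokens


toy_preorder, toy_groups, toy_tokens = toy_pack(
    ["f", ["(", ["[", "x", "]"], ")"]]
)
for i, (category, index) in enumerate(toy_preorder):
  print(f"preorder[{i}] == ({category!r}, {index})")
```

Because each group records the size of its subtree, a consumer can skip over a whole subtree by advancing an integer index instead of following object references.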

## Constructing the edit DAG

An "edit DAG" is a decision diagram that encodes the utility (actually represented as a cost) of a suggestion based on how easy it is to modify into the (hypothetical) target code.

Before building an edit DAG, we need to first configure the utilities and costs associated with different types of prediction. In general, we award positive utility (negative cost) when suggesting things that are correct, and incur negative utility (positive cost) when suggesting things that are wrong.

In [None]:
utility_config = edit_dags.make_character_count_cost_config(
    # Utilities and costs per character in the suggestion.
    high_confidence_match_utility_per_char=1.0,
    high_confidence_delete_cost_per_char=1.0,
    low_confidence_match_utility_per_char=0.7,
    low_confidence_delete_cost_per_char=0.3,
    # Costs per character of the target we insert. Usually zero.
    insert_cost_per_char=0.0,
    # Cost for starting an edit. These costs also apply for inserts,
    # and ensure that locations of inserts are flagged as UNSURE as well.
    low_confidence_region_cost=0.75,
    high_confidence_start_editing_cost=5.0,
    low_confidence_start_editing_cost=2.5,
)
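
To get a feel for how these numbers trade off, here is a small worked calculation using the same values. It rests on simplifying assumptions that are not taken from the library: a wrong span is removed in a single edit, a low-confidence span forms its own region and pays `low_confidence_region_cost` once, and inserts are free. The helpers `toy_span_utility` and `toy_expected_utility` are hypothetical.

```python
toy_config = dict(
    high_confidence_match_utility_per_char=1.0,
    high_confidence_delete_cost_per_char=1.0,
    low_confidence_match_utility_per_char=0.7,
    low_confidence_delete_cost_per_char=0.3,
    low_confidence_region_cost=0.75,
    high_confidence_start_editing_cost=5.0,
    low_confidence_start_editing_cost=2.5,
)


def toy_span_utility(num_chars, is_correct, low_confidence):
  """Utility of one suggested span under the simplifying assumptions above."""
  level = "low" if low_confidence else "high"
  utility = 0.0
  if low_confidence:
    utility -= toy_config["low_confidence_region_cost"]
  if is_correct:
    utility += num_chars * toy_config[f"{level}_confidence_match_utility_per_char"]
  else:
    utility -= toy_config[f"{level}_confidence_start_editing_cost"]
    utility -= num_chars * toy_config[f"{level}_confidence_delete_cost_per_char"]
  return utility


def toy_expected_utility(num_chars, prob_correct, low_confidence):
  """Expected utility when the span is correct with probability `prob_correct`."""
  return (
      prob_correct * toy_span_utility(num_chars, True, low_confidence)
      + (1 - prob_correct) * toy_span_utility(num_chars, False, low_confidence)
  )


for prob in (0.5, 0.9):
  high = toy_expected_utility(10, prob, low_confidence=False)
  low = toy_expected_utility(10, prob, low_confidence=True)
  print(f"p={prob}: high confidence {high:.3f}, low confidence {low:.3f}")
```

Under these assumptions, a 10-character span that is right only half the time is better marked as low confidence, while one that is right 90% of the time is better left as high confidence.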

To speed up construction, we first set up a JIT-compiled version of the graph builder logic and specialize it to our configuration. The graph building logic can also run outside of JIT, but it's slower. (It will do the actual compilation the first time this is called.)

In [None]:
construct_edit_dag = edit_dags.make_edit_dag_builder(
    utility_config, with_numba=True
)

We can now build the decision diagram (called a DAG in our implementation) by providing our suggestion prototype along with a particular target.

In [None]:
example_target = packed_targets[1]
dag, render_data = construct_edit_dag(prototype=packed_prototype, target=example_target)

Our DAG can be rendered in Colab to show the details of its structure. For efficiency reasons, we only render the edges and nodes that can be reached from the starting and ending node. Try scrolling to zoom in and clicking and dragging to move.

A few things to notice:

- There are a number of named states at each position. These are used to track the current editing mode and confidence level.
- Edges between these states are sometimes labeled with decisions that must be made in order to take those edges. These decisions correspond to region start/end points, region membership, and early exiting.
- When we decide to truncate a suggestion, there is an edge that jumps straight to the final state.
- The nested blue boxes represent recursively matched subtrees.

In [None]:
annotator = edit_dags.EditDagGraphAnnotator(
    prototype=packed_prototype,
    target=example_target,
    render_data=render_data,
    render_config=edit_dags.EditDagRenderConfig(),
)
render_in_notebook.render_dag_in_notebook(
    gated_state_dag.prune_to_reachable(dag),
    annotator,
    hover_for_info=False,
    pan_and_zoom=True,
    max_width="100%",
)
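
The core idea of a decision diagram with labeled edges can be sketched in a few lines. The toy below is not built on `gated_state_dag`: the edge format `(source, dest, cost, decision)`, the function `toy_best_path`, and the numbers are all made up for illustration. It finds the cheapest path through a small DAG and, optionally, only uses edges that agree with a fixed set of decisions.

```python
import math


def toy_best_path(num_nodes, edges, constraints=None):
  """Cheapest path from node 0 to node num_nodes - 1.

  Assumes node indices are topologically ordered (source < dest for every
  edge). Edges whose decision conflicts with `constraints` are skipped.
  """
  constraints = constraints or {}
  best_cost = [math.inf] * num_nodes
  best_edge = [None] * num_nodes
  best_cost[0] = 0.0
  for edge in sorted(edges, key=lambda e: e[0]):
    source, dest, cost, decision = edge
    if decision is not None:
      key, value = decision
      if key in constraints and constraints[key] != value:
        continue
    if best_cost[source] + cost < best_cost[dest]:
      best_cost[dest] = best_cost[source] + cost
      best_edge[dest] = edge
  # Walk backwards from the final node to recover the path.
  path = []
  node = num_nodes - 1
  while best_edge[node] is not None:
    path.append(best_edge[node])
    node = best_edge[node][0]
  return best_cost[-1], path[::-1]


# Node 0: start. Node 1: high confidence. Node 2: low confidence. Node 3: end.
toy_edges = [
    (0, 1, 0.0, ("unsure", False)),
    (0, 2, 0.5, ("unsure", True)),
    (1, 3, 13.0, None),  # Edit a wrong span that was marked high confidence.
    (2, 3, 5.0, None),   # Edit a wrong span that was marked low confidence.
]
toy_cost, toy_path = toy_best_path(4, toy_edges)
toy_forced_cost, _ = toy_best_path(4, toy_edges, constraints={"unsure": False})
print(toy_cost, [edge[3] for edge in toy_path if edge[3] is not None])
print(toy_forced_cost)
```

Reading the decisions off the best path gives an assignment, and fixing an assignment in advance gives the cost of that particular choice for this target.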

The next step is to remove unreachable nodes and "pack" this DAG into a more efficient form. Packing transforms the list of edges into a decision-ordered representation, which divides states and edges into layers that affect different variables. We JIT-compile this operation as well.

(This packing step is usually the bulk of the runtime of the method. We have prioritized flexibility in the decision diagram implementation by making the states Python objects, which means we must do a lot of Python dictionary lookups and state comparisons while building the packed version.)

In [None]:
pack_edit_dag, _ = edit_dags.make_specialized_dag_packer()

In [None]:
reachable_dag = gated_state_dag.prune_to_reachable_jit(dag)
packed_dag, conversion_data = pack_edit_dag(reachable_dag)

We can repeat the process for the full set of targets:

In [None]:
packed_edit_dags_and_conversion_data = []

for target in packed_targets:
  dag, render_data = construct_edit_dag(prototype=packed_prototype, target=target)
  reachable_dag = gated_state_dag.prune_to_reachable_jit(dag)
  packed_dag, conversion_data = pack_edit_dag(reachable_dag)
  packed_edit_dags_and_conversion_data.append((packed_dag, conversion_data))

## Constructing the constraint DAG

The edit DAG ensures that subtrees match one-to-one, but it doesn't constrain where regions start or end. For this purpose, we also build a "constraint DAG" that depends only on the prototype.

The constraint DAG has the property that all complete paths from the initial to the final node correspond to valid sets of decisions. Here validity means that the corresponding uncertainty regions obey certain constraints that are designed to ensure that the uncertainty regions are semantically sensible, e.g. that they do not begin outside a particular sub-tree of the AST and then end within that sub-tree.

All of the edges in this DAG have cost zero.
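
As a concrete example of the kind of rule involved, the sketch below checks one region against a list of subtree spans. It is a hypothetical stand-in rather than the logic in `constraint_dags`: spans are half-open token index ranges, and the check rejects any region that partially overlaps a subtree. The source text only gives the "begin outside, end inside" case as an example, so treating the mirrored case as invalid too is an assumption.

```python
def toy_region_is_valid(region, subtree_spans):
  """Returns True if `region` never partially overlaps a subtree span."""
  region_start, region_end = region
  for span_start, span_end in subtree_spans:
    disjoint = region_end <= span_start or span_end <= region_start
    region_inside = span_start <= region_start and region_end <= span_end
    subtree_inside = region_start <= span_start and span_end <= region_end
    if not (disjoint or region_inside or subtree_inside):
      return False
  return True


# Tokens: f ( x , y )  with the bracketed group covering token indices 1..5.
toy_spans = [(1, 6)]
print(toy_region_is_valid((2, 3), toy_spans))  # Just "x", inside the group.
print(toy_region_is_valid((0, 3), toy_spans))  # "f ( x", crosses the boundary.
```

The constraint DAG plays the same role for the real search: instead of checking a finished set of regions, it only contains paths whose decisions pass such checks.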

In [None]:
construct_constraint_dag = constraint_dags.make_constraint_dag_builder(
    with_numba=True)

In [None]:
dag = construct_constraint_dag(prototype=packed_prototype)

We can visualize this DAG as well:

In [None]:
annotator = constraint_dags.ConstraintDagGraphAnnotator(
    prototype=packed_prototype,
    render_config=constraint_dags.ConstraintDagRenderConfig(),
)
render_in_notebook.render_dag_in_notebook(
    gated_state_dag.prune_to_reachable(dag),
    annotator,
    hover_for_info=False,
    pan_and_zoom=True,
    max_width="100%",
)

And pack it:

In [None]:
pack_constraint_dag, _ = constraint_dags.make_specialized_dag_packer()

In [None]:
reachable_dag = gated_state_dag.prune_to_reachable_jit(dag)
packed_constraint_dag_and_conversion_data = pack_constraint_dag(reachable_dag)

## Building and solving the combined system

All of these diagrams have a compatible set of decisions, so we can combine them all into a single system. We have one edit DAG per target, and a single constraint DAG as well.

In [None]:
all_dags_and_conversion_data = (
    packed_edit_dags_and_conversion_data +
    [packed_constraint_dag_and_conversion_data]
)

In [None]:
system = consistent_path_dual_solver.make_system(all_dags_and_conversion_data)

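The solver uses dual decomposition: each DAG is treated as a separate subproblem, and per-decision penalties encourage the subproblems to agree on the value of every decision. It updates these penalties one at a time to increase agreement between the subproblems. Although most of the progress is made in the first sweep, combining complex programs may require multiple passes to fully optimize the bound.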

In [None]:
opt_results = consistent_path_dual_solver.solve_system_with_sweeps(system.data)
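
For intuition, here is a tiny dual decomposition example with two subproblems that share two binary decisions. It uses a generic min-marginal averaging update and brute-force enumeration inside each subproblem; this is an illustrative scheme and not necessarily the update rule used by `consistent_path_dual_solver`. All names and cost tables are made up.

```python
import itertools


def toy_min_marginals(cost, penalties, var, num_vars):
  """Best penalized cost of one subproblem with decision `var` fixed to 0 or 1."""
  result = [float("inf"), float("inf")]
  for x in itertools.product((0, 1), repeat=num_vars):
    total = cost[x] + sum(penalties[j][x[j]] for j in range(num_vars))
    result[x[var]] = min(result[x[var]], total)
  return result


def toy_dual_bound(costs, all_penalties, num_vars):
  """Sum of each subproblem's independently minimized penalized cost."""
  return sum(
      min(toy_min_marginals(cost, penalties, 0, num_vars))
      for cost, penalties in zip(costs, all_penalties)
  )


def toy_sweep(costs, all_penalties, num_vars):
  """Updates the penalties for one decision at a time, in order."""
  for var in range(num_vars):
    marginals = [
        toy_min_marginals(cost, penalties, var, num_vars)
        for cost, penalties in zip(costs, all_penalties)
    ]
    for value in (0, 1):
      mean = sum(m[value] for m in marginals) / len(marginals)
      for penalties, m in zip(all_penalties, marginals):
        # Shifts sum to zero across subproblems, so the total cost is unchanged.
        penalties[var][value] += mean - m[value]


toy_costs = [
    {(0, 0): 0.0, (0, 1): 4.0, (1, 0): 2.0, (1, 1): 6.0},
    {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 1.0, (1, 1): 2.0},
]
toy_penalties = [[[0.0, 0.0] for _ in range(2)] for _ in toy_costs]

toy_bound_before = toy_dual_bound(toy_costs, toy_penalties, 2)
toy_sweep(toy_costs, toy_penalties, 2)
toy_bound_after = toy_dual_bound(toy_costs, toy_penalties, 2)
toy_optimum = min(
    sum(cost[x] for cost in toy_costs)
    for x in itertools.product((0, 1), repeat=2)
)
print(toy_bound_before, toy_bound_after, toy_optimum)
```

Before any updates, each subproblem picks its own favorite assignment and the bound is loose. In this toy example a single sweep is enough for the bound to meet the true optimum, although in general a gap can remain.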

In [None]:
(assignment_vector, cost_of_assignment) = consistent_path_dual_solver.greedy_extract(
    system.data,
    consistent_path_dual_solver.SweepDirection.FORWARD)

assignment = consistent_path_dual_solver.assignments_from_assignment_vector(
    system, assignment_vector)

In [None]:
_, axs = plt.subplots(1, 3, figsize=(15, 5))

# Dual bound at each solver step.
axs[0].plot(
    np.arange(opt_results.objective_at_step.shape[0]),
    opt_results.objective_at_step,
)
axs[0].set_xlabel("Solver iteration")
axs[0].set_ylabel("Dual bound")

axs[1].plot(opt_results.variable_at_step, opt_results.objective_at_step)
axs[1].set_xlabel("Index of updated variable")
axs[1].set_ylabel("Dual bound")

axs[2].plot(opt_results.time_at_sweep, opt_results.objective_at_sweep)
axs[2].set_xlabel("Wall clock time (seconds)")
axs[2].set_ylabel("Dual bound")

# The dashed gray line marks the cost of the greedily extracted assignment.
for ax in axs:
  ax.axhline(y=cost_of_assignment, color="gray", dashes=[1, 1])

plt.tight_layout()

## Extracting the solution

Our solver outputs the solution as a mapping from decision keys to their respective values.

In [None]:
assignment

Passing this assignment mapping into our various rendering helper functions allows us to reconstruct the actual suggestion. Here, the chosen solution has two uncertainty regions and is also truncated before a disagreement point.

In [None]:
display.HTML(region_decisions.render_regions_to_html(
    prototype=packed_prototype,
    assignments=assignment
))

In [None]:
for i, (target, (packed_edit_dag, conversion_data)) in enumerate(
    zip(packed_targets, packed_edit_dags_and_conversion_data)
):
  path, _ = packed_dags.constrained_best_path(
      packed_edit_dag, conversion_data,
      constraints=assignment
  )
  print(f"==== Target {i} ====")
  display.display(display.HTML(edit_dags.extract_edit_sequence_html(
      path,
      prototype=packed_prototype,
      target=target,
      prefix=example_programs[0][:cursor_position],
  )))
  # ፠ denotes the start of an edit, which corresponds to the
  # high_confidence_start_editing_cost and
  # low_confidence_start_editing_cost in the utility configuration.

## Comparing to ground truth

When we know the ground truth target state (e.g. from our test set), we can use similar machinery to score our suggestion.

As before, we convert the ground truth into a packed tree representation, treating it as if it were one of the targets.

In [None]:
example_ground_truth = textwrap.dedent(
    """\
    print("hello world")
    def my_function_from_ground_truth(x, y):
      if x or y:
        print(x + y)
    """
)

In [None]:
ground_truth_parsed = transforms.truncate_prefix_at_offset(
    parser_helper.parse_to_nodes(example_ground_truth),
    cursor_position)
packed_ground_truth = parser_tools.pack_sequence_from_pseudoparser(
    ground_truth_parsed, with_numba=True)

We construct the edit DAG, which allows us to compute the edit distance between the suggestion and the ground truth.

In [None]:
dag, render_data = construct_edit_dag(prototype=packed_prototype, target=packed_ground_truth)

In [None]:
reachable_dag = gated_state_dag.prune_to_reachable_jit(dag)
packed_dag, conversion_data = pack_edit_dag(reachable_dag)

Now we can find the shortest path in this DAG, but constrain it to be consistent with the assignment we computed already. This tells us the edit distance between the specific optimized suggestion and the true intent.

In [None]:
path, _ = packed_dags.constrained_best_path(
    packed_dag, conversion_data,
    constraints=assignment
)

In [None]:
print(sum(edge.cost for edge in path))

We can visualize the inferred edit sequence, and compute various metrics.

In [None]:
display.display(display.HTML(edit_dags.extract_edit_sequence_html(
    path,
    prototype=packed_prototype,
    target=packed_ground_truth,
    prefix=example_programs[0][:cursor_position],
)))

In [None]:
edit_dags.extract_edit_summary_metrics(
    path=path, prototype=packed_prototype, target=packed_ground_truth
)