In [1]:
# Fix up pvs conversion
from pathlib import Path
import pandas as pd
import collections
import numpy as np

from coprover import DATA_ROOT, RSC_ROOT, PROJ_ROOT
from coprover.feats.featurize_cmdpred import *

In [2]:
# Load the command-prediction table from the resource tree. The file is
# tab-separated and the column names are supplied explicitly via `names`.
cmdpred_df = pd.read_csv(
    Path(RSC_ROOT, "pvs_cmd_pred", "data", "cmdpred_N3.pvslib.tsv.gz"),
    sep='\t',
    names=['sequent', 'command', 'cmd_history', 'uri', 'depth'],
)
# Disabled alternative: read the same file name from PROJ_ROOT/results instead.
# cmdpred_df = pd.read_csv(Path(PROJ_ROOT, "results", "cmdpred_N3.pvslib.tsv.gz"), sep='\t', names=['sequent', 'command', 'cmd_history', 'uri', 'depth'])

In [3]:
# Show the loaded frame; as the last expression of the cell it is rendered as the cell output.
cmdpred_df

Unnamed: 0,sequent,command,cmd_history,uri,depth
0,<ANT> <CONS> s-formula forall variable None va...,skosimp*,"NOOP,NOOP,NOOP",vect2_cont_comp-proofs/comp_rr_vr_cont#0,0
1,<ANT> <CONS> s-formula apply constant None app...,typepred,"NOOP,NOOP,skosimp*",vect2_cont_comp-proofs/comp_rr_vr_cont#1,1
2,<ANT> s-formula apply constant None apply cons...,typepred,"NOOP,skosimp*,typepred",vect2_cont_comp-proofs/comp_rr_vr_cont#2,2
3,<ANT> s-formula apply constant None apply cons...,expand,"skosimp*,typepred,typepred",vect2_cont_comp-proofs/comp_rr_vr_cont#3,3
4,<ANT> s-formula apply constant None apply cons...,expand,"typepred,typepred,expand",vect2_cont_comp-proofs/comp_rr_vr_cont#4,4
...,...,...,...,...,...
167070,<ANT> s-formula apply constant None apply cons...,typepred,"typepred,skosimp,replace",inverse_image_Union-proofs/inverse_image_Union#21,9
167071,<ANT> s-formula apply constant None apply cons...,expand,"skosimp,replace,typepred",inverse_image_Union-proofs/inverse_image_Union#22,10
167072,<ANT> s-formula apply constant None apply cons...,expand,"replace,typepred,expand",inverse_image_Union-proofs/inverse_image_Union#23,11
167073,<ANT> s-formula apply constant None apply cons...,expand,"typepred,expand,expand",inverse_image_Union-proofs/inverse_image_Union#24,12


In [4]:
# Identify proofs by name
# Placeholders, all initialised to None.
# NOTE(review): none of these is referenced by the other visible cells — confirm before removing.
start_state = last_state = last_proofname = None

def get_proofname(uri):
    """Return the portion of ``uri`` that precedes its first ``#``.

    When ``uri`` contains no ``#`` the whole string is returned unchanged.
    """
    name, _sep, _rest = uri.partition('#')
    return name


# Tag every row with the proof it belongs to. Mapping over the `uri` column
# yields the same values as looping with `iterrows()` but avoids building a
# Series object for each of the rows.
cmdpred_df['proofname'] = cmdpred_df['uri'].map(get_proofname)

# Group rows by proof so that individual proofs can be fetched with
# `grp_obj.get_group(name)`.
grp_obj = cmdpred_df.groupby('proofname')

# One entry per distinct proof, taken from the groupby keys.
proofnames = list(grp_obj.groups.keys())
print(f"Total unique proofs={len(proofnames)}")

Total unique proofs=10855


In [5]:
"""
For each proof, we segment the proof trace into distinct branches.  For now, each branch is treated as its own independent sequence and is not attached to its parent.  This is because the first branch is the main proof, while the rest are TCCs or obligations that are required for completion.  This may be subject to change.

Note that this requires the command histories to be rewritten, which is performed here.

TODO: Consider if we should move this into the main code
"""
def sanity_episode(episode):
    """Check that depth strictly increases from each row of ``episode`` to the next.

    The comparison is strict because repeated depths are not valid inside a
    single episode: the episode splitter starts a new episode whenever depth
    fails to increase, and ``CTuple`` refuses to pair rows of equal depth.
    The first row is compared against an initial depth of -1.

    Args:
        episode: iterable of rows, each exposing a ``depth`` attribute.

    Returns:
        True when every row passes the check (including for an empty episode).

    Raises:
        AssertionError: if a row's depth does not exceed that of the row before it.
    """
    prev_depth = -1
    for position, row in enumerate(episode):
        assert row.depth > prev_depth, (
            f"episode row {position}: depth {row.depth} does not exceed "
            f"previous depth {prev_depth}"
        )
        prev_depth = row.depth
    return True

def get_episodes(proofname, history_len=3):
    """Split one proof's rows into episodes of strictly increasing depth.

    Rows of the proof are walked in frame order. Whenever a row's depth is not
    greater than the previous row's depth, the current episode is closed and a
    new one is started. Each row's ``cmd_history`` is rewritten so that it
    holds only the most recent commands issued within its own episode, padded
    on the left with ``NOOP_CMD``.

    Args:
        proofname: key of the proof in the module-level ``grp_obj`` groupby.
        history_len: number of commands kept in the rewritten history
            (non-negative). The default of 3 matches the previously
            hard-coded window size.

    Returns:
        A list of episodes; each episode is a list of row Series in order.

    Raises:
        KeyError: if ``proofname`` is not a group of ``grp_obj``.
        AssertionError: if an episode fails ``sanity_episode``.
    """
    def fresh_window():
        # A bounded deque drops its oldest entry on append, so no manual
        # popleft is needed to keep the window at `history_len` commands.
        return collections.deque([NOOP_CMD] * history_len, maxlen=history_len)

    episodes = []
    curr_episode = []
    prev_depth = -1
    cmd_window = fresh_window()
    for _, row in grp_obj.get_group(proofname).iterrows():
        if row.depth <= prev_depth:
            # Depth did not increase: splinter off the previous episode and
            # start a new one with an empty (all-NOOP) history.
            episodes.append(curr_episode)
            curr_episode = []
            cmd_window = fresh_window()
        # The rewrite is applied to the per-row Series yielded by iterrows().
        row["cmd_history"] = ",".join(cmd_window)
        prev_depth = row.depth
        curr_episode.append(row)
        cmd_window.append(row.command)
    episodes.append(curr_episode)
    for episode in episodes:
        sanity_episode(episode)
    return episodes

In [10]:
# Keys of the record dictionaries produced by CTuple.as_row().
PROOFNAME = "proofname"
STATE = "source_text"
LABEL = "target_text"
# Label values assigned by CTuple: POS when row1 is shallower than row2, NEG otherwise.
POS = "p"
NEG = "n"
CMD_HISTORY = "cmd_history"


class CTuple:
    """Comparison tuple: an ordered pair of rows plus a depth-order label.

    The label is ``POS`` when ``row1.depth < row2.depth`` and ``NEG``
    otherwise. The command history used in the textual forms is always the
    one stored on ``row2``.
    """

    def __init__(self, proofname, row1, row2):
        """Pair two rows and derive the label from their depths.

        Args:
            proofname: identifier stored on the tuple and returned by ``str()``.
            row1, row2: rows exposing ``depth``, ``sequent`` and ``cmd_history``.

        Raises:
            AssertionError: if both rows have the same string rendering or
                the same depth.
        """
        assert str(row1) != str(row2)
        assert row1.depth != row2.depth
        self.proofname = proofname
        self.row1, self.row2 = row1, row2
        self.label = POS if row1.depth < row2.depth else NEG

    def __str__(self):
        # Takes no argument besides self; an extra parameter makes str(obj) fail.
        return str(self.proofname)

    def _cmd_history_text(self):
        """Return row2's comma-separated command history, space-separated instead."""
        return self.row2.cmd_history.replace(",", " ")

    def _statestr(self, add_cmd_hist=True):
        """Build the model input text: optional history, then both sequents.

        Args:
            add_cmd_hist: when True, prefix the text with row2's command history.

        Returns:
            The space-joined text with leading/trailing whitespace stripped.
        """
        cmdhist_str = self._cmd_history_text() if add_cmd_hist else " "
        # First naive formulation: plain concatenation of history and sequents.
        return "{} {} {}".format(cmdhist_str, self.row1.sequent, self.row2.sequent).strip()

    def as_row(self):
        """Return this tuple as a flat dict keyed by the module-level column constants."""
        return {
            PROOFNAME: self.proofname,
            STATE: self._statestr(),
            CMD_HISTORY: self._cmd_history_text(),
            LABEL: self.label,
        }

In [14]:
# Seeded generator so that the sampled pairs are the same on every run.
rng = np.random.default_rng(343)
# Only episodes with strictly more rows than this are sampled.
# NOTE(review): the name reads like an inclusive minimum, but the test below is `>` — confirm intent.
MIN_EP_LEN = 2
# Upper bound on the number of adjacent-row pairs drawn from one episode.
SAMPLES_PER_EP = 3

exp_tuples = []

# NOTE(review): `tqdm` is not imported explicitly in the visible cells;
# presumably it arrives through the star import at the top — confirm.
for proofname in tqdm(proofnames):
    episodes = get_episodes(proofname)
    for eidx, episode in enumerate(episodes):
        episode_name = f"{proofname}#{eidx}"
        if len(episode) <= MIN_EP_LEN:
            continue
        # Shuffle every position that has a successor and keep the first few.
        row1_idxes = rng.permutation(range(len(episode) - 1))[:SAMPLES_PER_EP]
        for row1_idx in row1_idxes:
            row1, row2 = episode[row1_idx], episode[row1_idx + 1]
            # Emit the adjacent pair once in each order.
            exp_tuples.append(CTuple(episode_name, row1, row2))
            exp_tuples.append(CTuple(episode_name, row2, row1))
                

100%|██████████████████████████████████████████████████████████████████████████████████| 10855/10855 [01:41<00:00, 106.80it/s]


In [15]:
# Tally how many comparison tuples carry each label.
label_hist = collections.Counter(ctuple.label for ctuple in exp_tuples)
print(label_hist)

Counter({'p': 54673, 'n': 54673})


In [16]:
# Flatten every comparison tuple into a record dict, build the table in one
# step, and write it (with a header row) to a gzip-suffixed CSV file.
inst_records = [ctuple.as_row() for ctuple in exp_tuples]
inst_df = pd.DataFrame(inst_records)
inst_df.to_csv("compare_pred.v1.csv.gz", header=True)