# Communicating with Lean


This is a hacky prototype showing how one can use Lean's server for communication between Lean and another program.  We have in mind implementing something like HOList for Lean.  The main idea is that theorem proving is like a puzzle, similar to a Rubik's Cube.  One has states and actions one can perform to move to other states.  Here the states are goals (or lists of goals) and the actions are tactics.

What we build here is an interface where one says to Lean "apply this tactic to this goal" and Lean says whether that tactic succeeds and, if so, what the new list of goals is.  It is written in a Python notebook, but the ideas transcend languages.

In [1]:
import subprocess
import tempfile
import json
from pprint import pprint
import time
from datetime import datetime
import collections
from IPython.display import display

## Lean Server
This class starts up a Lean server and communicates with it.  There are a number of requests one can make to the Lean server, but we only need one or two.  To understand the Lean server, it is important to know that it is not like a REPL, but more like an LSP (Language Server Protocol) server.  It is designed to partially compile and comment on files as they are being written.  The use here is more of a hack (and it is unfortunately quite slow, as we will see).

One communicates with Lean by sending JSON requests and getting back responses.  The first request we need is a "sync" request.  It sends a "file" to Lean to be synced.  (The file doesn't actually need to exist.)  The second request, an "info" request, asks Lean to provide more information about particular places in the code.  Unfortunately, the direct responses to these requests don't provide many answers.  Especially for the "sync" request, one has to wait for an "all_messages" response to get answers.  (And even then, it is difficult to determine that the response you got is up-to-date with the current Lean file.  We've employed a lot of tricks to make sure we are getting the most up-to-date information from Lean.)  Our class keeps the most recent "all_messages" response it sees for later processing.

Ultimately, the "info" requests turned out not to be as reliable as we hoped, so we rely on the "sync" request for results and use the "info" request only to keep the channel from blocking.
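As a minimal sketch of the wire format, assuming only the field names used by the requests in this notebook (`command`, `seq_num`, `file_name`, `content`, `line`, `column`), each request is one JSON object written on a single line.  The helper `build_request` is hypothetical and not part of Lean or of the class in this notebook.

```python
import json

def build_request(command, seq_num, **fields):
    """Serialize one request as a single JSON line (hypothetical helper)."""
    request = {"command": command, "seq_num": seq_num, **fields}
    return json.dumps(request)

# A "sync" request carries the whole file content; the file need not exist.
sync_line = build_request(
    "sync", 0,
    file_name="/tmp/fake_lean_file.lean",
    content="example (p : Prop) : p = p := begin sorry end",
)

# An "info" request points at a position in that file.
info_line = build_request(
    "info", 1, file_name="/tmp/fake_lean_file.lean", line=1, column=0)

print(sync_line)
print(info_line)
```

The sequence number is what lets the caller match a later "ok" response to the request that caused it.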

In [2]:
class LeanServer:
    """
    Open up a Lean server and communicate with it.  
    
    Right now it only has support for the "sync" and "info" requests (and their
    corresponding responses).  It does, however, keep the most recent
    "all_messages" response for later processing.
    """
    
    def __init__(self):
        self.cntr = -1
        self.cntr2 = -1
        self.all_messages = []
        self.all_messages_time = -1
        self.current_tasks = []
        self.current_tasks_time = -1
        self.proc = subprocess.Popen(['lean', '--server'], 
                    universal_newlines=True, 
                    stdin=subprocess.PIPE, # pipe STDIN and STDOUT to send and receive messages
                    stdout=subprocess.PIPE, 
                    #stderr=subprocess.PIPE
                )
    
    # make into a context manager so that it closes lean server automatically
    def __enter__(self):
        self.proc.__enter__()
        return self
    
    def __exit__(self, type, value, traceback):
        self.proc.__exit__(type, value, traceback)
    
    def seq_num(self):
        self.cntr += 1
        return self.cntr
    
    def send_request(self, request, expected_response, verbose=False, sleep=0.0):
        seq_num = self.seq_num()
        request1 = request.copy()
        request1['seq_num'] = seq_num
        j = json.dumps(request1)
        
        # TODO: Use logging instead
        if verbose:
            print()
            print("=>:", datetime.now().strftime('%H:%M:%S.%f'))
            pprint(j)
            print()
        
        # send
        print(j, file=self.proc.stdin, flush=True)
        
        # wait for response
        time.sleep(sleep)
        while True:
            self.cntr2 += 1
            raw_output = self.proc.stdout.readline()
            output = json.loads(raw_output)
            if verbose:
                print("<=:", datetime.now().strftime('%H:%M:%S.%f'))
                pprint(output)
                print()
            if 'response' in output:
                if output['response'] == expected_response and output['seq_num'] == seq_num:
                    return output
                # TODO: Handle error differently.  It means the JSON is bad
                elif output['response'] == 'error':
                    return output
                # record messages since they may point to errors in the lean code
                elif output['response'] == 'all_messages':
                    self.all_messages = output['msgs']
                    self.all_messages_time = self.cntr2
                # record tasks to know when Lean has stopped processing file
                elif output['response'] == 'current_tasks':
                    self.current_tasks = output['tasks']
                    self.current_tasks_time = self.cntr2
    
    def send_sync_request(self, file_name, content, verbose=False):
        request = {
            'command':'sync',
            'file_name': file_name,
            "content": content,
        }
        
        return self.send_request(request, expected_response='ok', verbose=verbose, sleep=0.0)
    
    def send_info_request(self, file_name, line, column, verbose=False):
        request = {            
            'command':'info',
            'file_name': file_name,
            'column': column,
            'line': line
        }
        
        return self.send_request(request, expected_response='ok', verbose=verbose, sleep=0.0)

In [3]:
# sync file (the "file" doesn't need to exist)
with LeanServer() as lean_server:
    response1 = lean_server.send_sync_request(
        file_name='/tmp/fake_lean_file.lean', 
        content='example (p: Prop) : p = p := begin sorry end')
    time.sleep(1)
    response2 = lean_server.send_info_request(
        file_name='/tmp/fake_lean_file.lean', 
        line=1, 
        column=0)
    
    # the responses themselves aren't always useful
    display(response1)
    display(response2)
    
    # the "all_messages" responses are more useful
    display(lean_server.all_messages)

{'message': 'file invalidated', 'response': 'ok', 'seq_num': 0}

{'response': 'ok', 'seq_num': 1}

[{'caption': '',
  'file_name': '/tmp/fake_lean_file.lean',
  'pos_col': 0,
  'pos_line': 1,
  'text': "declaration '[anonymous]' uses sorry"}]

Most of the methods accept an optional `verbose=True` parameter, which prints the message traffic.

In [4]:
# sync file (the "file" doesn't need to exist)
with LeanServer() as lean_server:
    lean_server.send_sync_request(
        file_name='/tmp/fake_lean_file.lean', 
        content='example (p: Prop) : p = p := begin sorry end',
        verbose=True)
    lean_server.send_info_request(
        file_name='/tmp/fake_lean_file.lean', 
        line=1,
        column=0,
        verbose=True)


=>: 20:39:04.576699
('{"content": "example (p: Prop) : p = p := begin sorry end", "file_name": '
 '"/tmp/fake_lean_file.lean", "seq_num": 0, "command": "sync"}')

<=: 20:39:04.821563
{'is_running': False, 'response': 'current_tasks', 'tasks': []}

<=: 20:39:05.023426
{'is_running': False, 'response': 'current_tasks', 'tasks': []}

<=: 20:39:05.152391
{'message': 'file invalidated', 'response': 'ok', 'seq_num': 0}


=>: 20:39:05.152593
('{"file_name": "/tmp/fake_lean_file.lean", "column": 0, "seq_num": 1, "line": '
 '1, "command": "info"}')

<=: 20:39:05.153266
{'response': 'ok', 'seq_num': 1}
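The log above shows that the reply to a request can arrive after unrelated `current_tasks` messages.  The following sketch replays the first three responses from that log offline (re-encoded as JSON) to illustrate the matching rule: skip everything until a response with the expected type and sequence number shows up.  The function `wait_for` is a hypothetical stand-in for the read loop, not the class's actual method.

```python
import json

# Responses copied from the verbose log, re-encoded as JSON lines.
raw_lines = [
    '{"is_running": false, "response": "current_tasks", "tasks": []}',
    '{"is_running": false, "response": "current_tasks", "tasks": []}',
    '{"message": "file invalidated", "response": "ok", "seq_num": 0}',
]

def wait_for(lines, expected_response, seq_num):
    """Return (matching reply, messages that arrived before it)."""
    side_channel = []
    for raw in lines:
        msg = json.loads(raw)
        if msg.get("response") == expected_response and msg.get("seq_num") == seq_num:
            return msg, side_channel
        side_channel.append(msg)  # e.g. current_tasks or all_messages
    return None, side_channel

reply, skipped = wait_for(raw_lines, "ok", 0)
print(reply["message"])  # file invalidated
print(len(skipped))      # 2
```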



## Testing tactics with the Lean Server

Again, tactic-based theorem proving is like a puzzle.  You go from state to state exploring.  What we need is a way to test if we can use a tactic on a goal and what the next goals are.  This is what the LeanTacticTestingInterface does.  (Note, it is much faster to ask the Lean server about multiple goal-tactic pairs instead of just one at a time.)

In [5]:
# For storing information about a Lean goal
LeanGoal = collections.namedtuple('LeanGoal', ['universes', 'assumptions', 'conclusion'])

In [307]:
class LeanTacticTestingInterface:
    """
    Interface to test a tactic on a goal (or multiple tactics on multiple goals)
    """
    def __init__(self):
        self.lean_server = LeanServer()
        
        self.msg_cntr = 0
        self.file_name = 'dummy.lean'   # the name of the "file" being checked (not a real file)
        self.dummy_file = 'dummy.lean'  # a file always kept empty 
        self.eval_line_pos = None       # used for checking that a file is complete
        
        # set up a dummy file for later use
        self.lean_server.send_sync_request(file_name=self.dummy_file, content="")
        
        # one can implement their own custom state parser.  This is the default.
        self.custom_parser = r"""
def join (sep : string) : list string → string
| [x]     := x
| []      := ""
| (x::xs) := x ++ sep ++ join xs

meta def expr_to_string (e : expr bool.tt) : tactic string :=
do
  tactic.set_options (options.mk.set_bool `pp.all tt),
  f ← tactic.pp e,
  --let f := e.to_raw_fmt,
  return $ to_string f
  

meta def local_cxt_to_string (v : expr bool.tt) : tactic string := 
do 
  tp ← tactic.infer_type v,
  v_str ← expr_to_string v,
  tp_str ← expr_to_string tp,
  return $ v_str ++ "\n\n" ++ tp_str

meta def goal_to_string (g : expr) : tactic string :=
do 
  tactic.set_goals [g],
  goal ← tactic.target,
  local_cxt ← tactic.local_context,
  let local_cxt_len := list.length local_cxt,
  goal_str ← expr_to_string goal,
  local_cxt_strs ← (list.mmap local_cxt_to_string local_cxt),
  let s1 := goal_str ++ "\n\n",
  let s2 := "Local Context Vars: " ++ (to_string local_cxt_len) ++ "\n\n",
  let s3 := join "\n\n" local_cxt_strs,
  return $ s1 ++ s2 ++ s3

meta def custom_parse_state : tactic string :=
do 
 gs ← tactic.get_goals,
 -- loop over all goals (has effect of resetting the goal each time)
 let gs_len := list.length gs,
 goal_strings ← gs.mmap goal_to_string,
 tactic.set_goals gs,  -- set goals back
 let s := "Goals: " ++ (to_string gs_len) ++ "\n\n" ++ (join "\n\n" goal_strings),
 return s
 
-- trace_custom_state is defined once, in preamble(), on top of custom_parse_state
"""
    
    # make into a context manager so that it closes lean server automatically
    def __enter__(self):
        self.lean_server.__enter__()
        return self
    
    def __exit__(self, type, value, traceback):
        self.lean_server.__exit__(type, value, traceback)
        
    def parse_state(self, universes, goal_state):
        lines = iter(goal_state.split("\n\n"))
        
        goal_cnt = int(next(lines).split()[-1])
        
        goals = []
        for _ in range(goal_cnt):
            target = next(lines)
            assump_cnt = int(next(lines).split()[-1])
            assumptions = []
            for _ in range(assump_cnt):
                v = next(lines)
                tp = next(lines)
                assumptions.append(v + " : " + tp)
            goal = LeanGoal(universes, assumptions, target)
            goals.append(goal)

        return goals
    
    def check_content(self, content, verbose):
        """
        Have Lean check the contents of a Lean "file". 
        """
        self.msg_cntr += 1
        
        # empty out the previous file for good measure
        self.lean_server.send_sync_request(file_name=self.file_name, content="", verbose=verbose)
        
        # use a few different file names to help avoid old data in the messages
        self.file_name = 'fake_lean_file{}.lean'.format(self.msg_cntr % 10)
        
        # put an eval at the end to make sure the response is not from an old file
        extended_content = content + "\n#eval " + str(self.msg_cntr)
        self.eval_line_pos = extended_content.count("\n") + 1
        
        if verbose:
            print("-- {} --------------".format(self.file_name))
            for i, l in enumerate(extended_content.split('\n')):
                print("{:3}   {}".format(i+1, l))
            print("-------------------------")
            
        # send the file
        self.lean_server.send_sync_request(file_name=self.file_name, content=extended_content, verbose=verbose)
        
    def get_messages(self, verbose):
        """
        Get messages for the current content being checked.
        Return all the messages for that "file"
        
        This is really, really hard to make robust.  How do I know Lean has finished
        or, worse, that Lean isn't still providing information from an old
        version of the file?
        """
        
        # wait for tasks to stop running
        tasks_stopped_time = None
        for _ in range(10000):  # lazy time-out mechanism
            
            # keep sending requests to a dummy file to collect more "all_messages" responses
            self.lean_server.send_info_request(file_name=self.dummy_file, line=1, column=0, verbose=verbose)
            
            # check that tasks are complete for file_name
            task_running = False
            for task in self.lean_server.current_tasks:
                if task['file_name'] == self.file_name:
                    task_running = True
                    break
            if not task_running:
                tasks_stopped_time = self.lean_server.current_tasks_time
                break
        else:
            raise TimeoutError
            
                
        # check that messages are complete by looking at eval at the end
        for _ in range(10000):  # lazy time-out mechanism
            
            # keep sending requests to a dummy file to collect more "all_messages" responses
            self.lean_server.send_info_request(self.dummy_file, line=1, column=0, verbose=verbose)
            
            # wait until messages are after current tasks
            #if self.lean_server.all_messages_time < tasks_stopped_time:
            #    continue
                
            # filter the messages to those for this "file" 
            # and also check for the final eval statement
            filtered_messages = []
            messages_are_complete = False
            for msg in self.lean_server.all_messages:
                if msg['file_name'] == self.file_name:
                    if msg['pos_line'] < self.eval_line_pos:
                        filtered_messages.append(msg)
                    elif msg['pos_line'] == self.eval_line_pos and msg['caption'] == "eval result":
                        if msg['text'] == str(self.msg_cntr):
                            messages_are_complete = True
                        else:
                            break

            # clear messages
            self.lean_server.all_messages = []
            
            if messages_are_complete:
                return filtered_messages
            
        raise TimeoutError
    
    def preamble(self):
        return self.custom_parser + r"""
        
meta def trace_custom_state : tactic unit :=
do 
 s ← custom_parse_state,
 tactic.trace s,
 return ()
 
set_option pp.all true  --more helpful output

"""
        
    def example_block(self, goal, tactic):
        block = ""
        # line: preamble + 9 * n + 0
        block += "section example_block\n"
        # line: preamble + 9 * n + 1
        if goal.universes:
            block += "universes " + " ".join(goal.universes) + "\n"
        else:
            block += "--no universes\n"
        # line: preamble + 9 * n + 2  
        block += "example "
        block += " ".join("(" + l + ")" for l in goal.assumptions)
        block += " : "
        block += goal.conclusion + " :=\n"
        # line: preamble + 9 * n + 3
        block += "begin\n"  
        # line: preamble + 9 * n + 4
        block += tactic + ",\n"
        # line: preamble + 9 * n + 5
        block += 'trace_custom_state,\n' # custom parsed goal state
        # line: preamble + 9 * n + 6
        block += 'trace_state,\n'        # pretty printed goal state
        # line: preamble + 9 * n + 7
        block += "end\n"
        # line: preamble + 9 * n + 8
        block += "end example_block\n"

        return block
            
    def apply_tactic_multiple(self, lean_goals, tactics, verbose=False):
        assert len(lean_goals) == len(tactics)
        
        # build content
        preamble = self.preamble()
        
        # this is the line (indexed from 1) that the first block will be on
        preamble_length = preamble.count("\n") + 1
        
        content = preamble
        for goal, tactic in zip(lean_goals, tactics):
            content += self.example_block(goal, tactic)
        
        # start checking the file
        self.check_content(content, verbose=verbose)
            
        # positions in the block
        theorem_line = 2
        tactic_line = 4
        custom_state_line = 5
        goals_line = 6
        block_length = 9
        
        for _ in range(1000):  # lazy timeout method
            # collect the messages Lean has produced so far
            msgs = self.get_messages(verbose=verbose)
        
            # extract data from messages
            results = [[None, None, [], []] for _ in lean_goals]
            for msg in msgs:
                block_line_pos = (msg['pos_line'] - preamble_length) % block_length
                block_num = (msg['pos_line'] - preamble_length) // block_length

                if msg['pos_line'] < preamble_length:
                    continue
                elif block_line_pos == goals_line and msg['severity'] == 'information':
                    results[block_num][0] = msg['text']
                elif block_line_pos == custom_state_line and msg['severity'] == 'information':
                    results[block_num][1] = msg['text']
                elif block_line_pos == tactic_line and msg['severity'] == 'error':
                    results[block_num][2].append(msg['text'])
                elif block_line_pos == theorem_line and msg['severity'] == 'error':
                    results[block_num][3].append(msg['text'])

            # parse data and check for completeness
            final_results = []
            complete = True
            for i, ((goal_state, custom_state, tactic_errors, theorem_errors), prev_goal) in enumerate(zip(results, lean_goals)):
                if theorem_errors:
                    result = ('theorem_error', theorem_errors, '')
                elif tactic_errors:
                    result = ('tactic_failure', tactic_errors, '')
                elif goal_state is not None and custom_state is not None:
                    result = ('success', self.parse_state(prev_goal.universes, custom_state), custom_state.strip())
                else:
                    complete = False  # need to get messages again
                    break

                final_results.append(result)

            if complete:
                return final_results
        
        raise TimeoutError
        
    def apply_tactic(self, lean_goal, tactic, verbose=False):
        """
        Apply tactic to a goal and see what you get.
        """
        
        result = self.apply_tactic_multiple([lean_goal], [tactic], verbose=verbose)
        return result[0]

In [308]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal([], ['p q : Prop'],  'p = p')
    display(lean_interface.apply_tactic(goal, 'refl'))

('success', [], 'Goals: 0')

In [312]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u', 'v'], ['a b: Type u'],  'a = a')
    display(lean_interface.apply_tactic(goal, 'refl'))

('success', [], 'Goals: 0')

To see what is going on, we show the logs with `verbose=True`.  This also prints the file being sent to Lean.  

The main idea is that we encode everything we want to check into a Lean file, send that via a "sync" request, and read off the response from the messages.
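Reading a result off the messages comes down to line arithmetic.  As a sketch, using the block length and line offsets that `apply_tactic_multiple` uses (9 lines per block, tactic on offset 4, custom state trace on offset 5), a message's line number can be mapped back to a block and an offset within it.  The function `locate` and the value of `first_block_line` are illustrative assumptions.

```python
BLOCK_LENGTH = 9        # lines emitted per example block
TACTIC_LINE = 4         # offset of the tactic inside a block
CUSTOM_STATE_LINE = 5   # offset of trace_custom_state inside a block

def locate(pos_line, first_block_line):
    """Map a 1-indexed message line to (block number, offset in block)."""
    if pos_line < first_block_line:
        return None  # the message belongs to the preamble
    return divmod(pos_line - first_block_line, BLOCK_LENGTH)

first_block_line = 60  # illustrative value; the real one depends on the preamble
print(locate(64, first_block_line))  # (0, 4): tactic line of the first block
print(locate(74, first_block_line))  # (1, 5): custom state of the second block
print(locate(10, first_block_line))  # None
```

An error on offset 4 is reported as a tactic failure, while an information message on offset 5 carries the new goal state.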

In [313]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u', 'v'], ['a b: Type u'],  'a = a')
    display(lean_interface.apply_tactic(goal, 'refl', verbose=True))


=>: 10:43:37.665250
'{"content": "", "file_name": "dummy.lean", "seq_num": 1, "command": "sync"}'

<=: 10:43:37.689380
{'message': 'file unchanged', 'response': 'ok', 'seq_num': 1}

-- fake_lean_file1.lean --------------
  1   import data.complex.basic
  2   def join (sep : string) : list string → string
  3   | [x]     := x
  4   | []      := ""
  5   | (x::xs) := x ++ sep ++ join xs
  6   
  7   meta def local_cxt_to_string (v : expr bool.tt) : tactic string := 
  8   do 
  9     tp ← tactic.infer_type v,
 10     tactic.set_options (options.mk.set_bool `pp.all tt),
 11     vf ← tactic.pp v,
 12     tpf  ← tactic.pp tp,
 13     return $ (to_string vf) ++ "\n" ++ (to_string tpf)
 14   
 15   meta def goal_to_string (g : expr) : tactic string :=
 16   do 
 17     tactic.set_goals [g],
 18     goal ← tactic.target,
 19     local_cxt ← tactic.local_context,
 20     let local_cxt_len := list.length local_cxt,
 21     tactic.set_options (options.mk.set_bool `pp.all tt),
 22     goal_str ← 

('success', [], 'Goals: 0')

In [311]:
# lots of examples
with LeanTacticTestingInterface() as lean_interface:
    goal1 = LeanGoal([], ['p q : Prop'],  'p = p')
    display(lean_interface.apply_tactic(goal1, 'refl'))
    display(lean_interface.apply_tactic(goal1, 'skip'))
    display(lean_interface.apply_tactic(goal1, 'done'))
    display(lean_interface.apply_tactic(goal1, 'abc123'))
    goal2 = LeanGoal([], ['n m : nat'],  'n = n')
    display(lean_interface.apply_tactic(goal2, 'induction n'))
    goal3 = LeanGoal([], [], 'abc = xyz')
    display(lean_interface.apply_tactic(goal3, 'skip'))
    display(lean_interface.apply_tactic(goal3, 'fgh'))

('success', [], 'Goals: 0')

StopIteration: 

In [11]:
# examples of executing in batch
with LeanTacticTestingInterface() as lean_interface:
    goal1 = LeanGoal([], ['p q : Prop'],  'p = p')
    goal2 = LeanGoal([], ['n m : nat'],  'n = n')
    goal3 = LeanGoal([], [], 'abc = xyz')
    
    goals = [goal1, goal1, goal1, goal1, goal2, goal3, goal3]
    tactics = ['refl', 'skip', 'done', 'abc123', 'induction n', 'skip', 'fgh']
    
    display(lean_interface.apply_tactic_multiple(goals, tactics))

[('success', [], ''),
 ('success',
  [LeanGoal(universes=[], assumptions=['p q : Prop'], conclusion='@eq.{1} Prop p p')],
  ''),
 ('tactic_failure',
  ['done tactic failed, there are unsolved goals\nstate:\np q : Prop\n⊢ @eq.{1} Prop p p'],
  ''),
 ('tactic_failure',
  ["unknown identifier 'abc123'",
   "don't know how to synthesize placeholder\ncontext:\np q : Prop\n⊢ Type ?l_1"],
  ''),
 ('success',
  [LeanGoal(universes=[], assumptions=['m : nat'], conclusion='@eq.{1} nat nat.zero nat.zero'),
   LeanGoal(universes=[], assumptions=['m n_n : nat', 'n_ih : @eq.{1} nat n_n n_n'], conclusion='@eq.{1} nat (nat.succ n_n) (nat.succ n_n)')],
  ''),
 ('theorem_error',
  ["unknown identifier 'abc'",
   "unknown identifier 'xyz'",
   "don't know how to synthesize placeholder\ncontext:\n⊢ Sort ?l_1"],
  ''),
 ('theorem_error',
  ["unknown identifier 'abc'",
   "unknown identifier 'xyz'",
   "don't know how to synthesize placeholder\ncontext:\n⊢ Sort ?l_1"],
  '')]

### Speed and robustness
Unfortunately, this is quite slow.

In [12]:
# time it
total_runs = 50
total_time = 0
goal = LeanGoal([], [],  'true')
with LeanTacticTestingInterface() as lean_interface:
    # give the system a chance to "warm up"
    lean_interface.apply_tactic(goal, 'refl')
    time.sleep(2)
    
    for i in range(total_runs):
        a = time.time()
        response = lean_interface.apply_tactic(goal, 'refl')
        b = time.time()
        total_time += b - a
"Each call to the Lean interface takes on average {} seconds".format(total_time / total_runs)

'Each call to the Lean interface takes on average 0.10722775936126709 seconds'

But sending many goal-tactic pairs to Lean in a single call amortizes this overhead and speeds things up.

In [13]:
# time running multiple goal-tactics at a time 
total_runs = 50
batch_size = 32
total_time = 0
goals = [LeanGoal([], [],  'true')] * batch_size
tactics = ['refl'] * batch_size
with LeanTacticTestingInterface() as lean_interface:
    # give the system a chance to "warm up"
    lean_interface.apply_tactic(LeanGoal([], [],  'true'), 'refl')
    time.sleep(2)
    
    for i in range(total_runs):
        a = time.time()
        response = lean_interface.apply_tactic_multiple(goals, tactics)
        b = time.time()
        total_time += b - a
display("Each call to the Lean interface takes on average {} seconds".format(total_time / total_runs))
display("Each goal/tactic pair (w/ batch size {}) takes on average {} seconds".format(batch_size, total_time / total_runs / batch_size))

'Each call to the Lean interface takes on average 0.2728519582748413 seconds'

'Each goal/tactic pair (w/ batch size 32) takes on average 0.008526623696088791 seconds'
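To put the two measurements side by side, here is the arithmetic on the numbers reported above (they come from one run on one machine, so treat the ratio as indicative only).

```python
single_call = 0.10722775936126709   # seconds per call, one pair per call
batched_call = 0.2728519582748413   # seconds per call, 32 pairs per call
batch_size = 32

per_pair = batched_call / batch_size   # cost of one pair inside a batch
speedup = single_call / per_pair       # unbatched cost relative to batched

print(round(per_pair, 4))  # 0.0085
print(round(speedup, 1))   # 12.6
```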

It is very difficult to make this process robust.  If one is not careful, the results returned by `apply_tactic_multiple` could belong to a previous call to the method.  There are ways around this, but some of them significantly increase Lean's memory usage over time.  (For example, using a new file name for every "sync" request would be bad, since Lean caches the results of each file.)  The following test checks that `apply_tactic_multiple` is robust.

In [14]:
# test robustness.  Make sure we are getting correct results back 
# and not results for a previous request.
rounds = 100
with LeanTacticTestingInterface() as lean_interface:
    for r in range(rounds):
        count = rounds - r
        goals = [LeanGoal([], ['p_{}_{} : Prop'.format(r, i)],  'p_{}_{}'.format(r, i)) for i in range(count)]
        
        if r % 3 == 0:
            tactics = ['skip' for _ in range(count)]
            results = lean_interface.apply_tactic_multiple(goals, tactics)
            assert len(results) == count
            for i, (response, new_goals, _) in enumerate(results):
                assert response == 'success'
                assert new_goals[0].conclusion == 'p_{}_{}'.format(r, i)
        else:
            tactics = ['simp' for _ in range(count)]
            results = lean_interface.apply_tactic_multiple(goals, tactics)
            assert len(results) == count
            for i, (response, _, _) in enumerate(results):
                assert response == 'tactic_failure', response
"Passed tests"

'Passed tests'
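The guard against stale results rests on the `#eval` sentinel appended to every file: messages are accepted only if the last line reports the current counter.  The sketch below isolates that check as a pure function.  The name `messages_complete` and the sample messages are made up for illustration; only the field names (`file_name`, `pos_line`, `caption`, `text`) follow the messages handled in `get_messages`.

```python
def messages_complete(all_messages, file_name, eval_line, counter):
    """Return this file's messages if the sentinel is present, else None."""
    kept, sentinel_seen = [], False
    for msg in all_messages:
        if msg["file_name"] != file_name:
            continue  # message from another (possibly older) file
        if msg["pos_line"] < eval_line:
            kept.append(msg)
        elif msg.get("caption") == "eval result" and msg["text"] == str(counter):
            sentinel_seen = True
    return kept if sentinel_seen else None

# Illustrative messages, not real Lean output.
msgs = [
    {"file_name": "fake_lean_file3.lean", "pos_line": 2, "caption": "", "text": "some message"},
    {"file_name": "fake_lean_file3.lean", "pos_line": 5, "caption": "eval result", "text": "13"},
    {"file_name": "fake_lean_file2.lean", "pos_line": 5, "caption": "eval result", "text": "12"},
]
print(messages_complete(msgs, "fake_lean_file3.lean", 5, 13) is not None)  # True
print(messages_complete(msgs, "fake_lean_file3.lean", 5, 14))              # None
```

Rotating through a small, fixed set of file names keeps Lean's per-file cache bounded while still making it unlikely that an old file's messages are mistaken for the current ones.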

### Stepping through a full proof

In [15]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "induction n"))

('success',
 [LeanGoal(universes=[], assumptions=['m : nat'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add m nat.zero) (@has_add.add.{0} nat nat.has_add nat.zero m)'),
  LeanGoal(universes=[], assumptions=['m n_n : nat', 'n_ih : @eq.{1} nat (@has_add.add.{0} nat nat.has_add m n_n) (@has_add.add.{0} nat nat.has_add n_n m)'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add m (nat.succ n_n)) (@has_add.add.{0} nat nat.has_add (nat.succ n_n) m)')],
 '')

In [16]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(
        LeanGoal([], ['m n_n : ℕ','n_ih : m + n_n = n_n + m'], 'm + nat.succ n_n = nat.succ n_n + m'), 
        "induction m"))

('success',
 [LeanGoal(universes=[], assumptions=['n_n : nat', 'n_ih : @eq.{1} nat (@has_add.add.{0} nat nat.has_add nat.zero n_n) (@has_add.add.{0} nat nat.has_add n_n nat.zero)'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add nat.zero (nat.succ n_n))'),
  LeanGoal(universes=[], assumptions=['n_n m_n : nat'], conclusion='@eq.{1} nat (@has_add.add.{0} nat nat.has_add (nat.succ m_n) (nat.succ n_n))')],
 '')

In [17]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(
        LeanGoal([], ['n_n : ℕ','n_ih : 0 + n_n = n_n + 0'],
         '0 + nat.succ n_n = nat.succ n_n + 0'), 
        "simp"))

('success', [], '')

In [18]:
with LeanTacticTestingInterface() as lean_interface:
    display(lean_interface.apply_tactic(
        LeanGoal([], ['n_n m_n : ℕ',
         'm_ih : m_n + n_n = n_n + m_n → m_n + nat.succ n_n = nat.succ n_n + m_n',
         'n_ih : nat.succ m_n + n_n = n_n + nat.succ m_n'],
         'nat.succ m_n + nat.succ n_n = nat.succ n_n + nat.succ m_n'), 
        "simp"))

('success', [], '')

## Creating a simple Lean solver (with breadth-first search) in Python

Here we put all of this together to create a (non-practical) BFS prover.  Some highlights:
- Some tactics like `apply` take a parameter.  We search the local context for all possible things to put in this parameter.  Note, we do no type checking before sending it to Lean.  The user inputs the tactic as `apply {}` to signify that something needs to be filled in from the context.
- Our setup assumes the tactic is only applied to the first goal.  This isn't true of all tactics, so we force this by "treeifying" our proof, that is adding brackets `{ ... }` where necessary.
- Sending multiple calls to Lean at the same time makes it much faster.  This is the only difference between the basic version and the "fast" version.
- We provide a way to verify a proof in the end (sending the whole proof as if it were a single tactic), and also a way to walk through the proof to get information (which might be useful for machine-learning-like training).
- This is just a proof-of-concept and not very useful in itself.
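To show the shape of the search without needing Lean, here is a toy sketch of breadth-first search over goal lists.  Everything in it is an assumption made for illustration: `bfs_prove` is not the class defined in this notebook, and `toy_step` stands in for the call to Lean, returning a tuple of new goals on success or `None` on failure.

```python
import collections

def bfs_prove(start_goals, tactics, step, max_nodes=1000):
    """Breadth-first search over goal lists; returns a tactic list or None."""
    queue = collections.deque([(tuple(start_goals), ())])
    for _ in range(max_nodes):
        if not queue:
            return None
        goals, proof = queue.popleft()
        if not goals:
            return list(proof)  # no goals left: proof found
        for tactic in tactics:
            # Tactics are only ever applied to the first open goal.
            new_goals = step(goals[0], tactic)
            if new_goals is not None:
                queue.append((new_goals + goals[1:], proof + (tactic,)))
    return None

# Toy "logic": a goal is an integer; "halve" needs a positive even number,
# "dec" subtracts one, and "done" closes the goal 0.
def toy_step(goal, tactic):
    if tactic == "halve" and goal > 0 and goal % 2 == 0:
        return (goal // 2,)
    if tactic == "dec" and goal > 0:
        return (goal - 1,)
    if tactic == "done" and goal == 0:
        return ()
    return None

proof = bfs_prove([6], ["halve", "dec", "done"], toy_step)
print(proof)  # ['halve', 'dec', 'halve', 'dec', 'done']
```

Because the queue is first-in, first-out, the first proof found is among the shortest ones, at the cost of exploring every shorter tactic sequence first.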

In [19]:
def premises_from_goal(goal):
    prems = []
    for l in goal.assumptions:
        # split at the first colon only, since the type itself may contain ':'
        a, _ = l.split(":", 1)
        for v in a.split():
            prems.append(v)
    return prems

def tactic_applications(tactics, prems):
    tacs = []
    for t in tactics:
        if "{}" in t:
            for p in prems:
                tacs.append(t.replace("{}", p))
        else:
            tacs.append(t)
    return tacs

def treeify_proof(proof_list):
    s = ""
    goal_cnts = [1]  # open goal counts at each depth
    for tactic, new_goal_cnt in proof_list:
        while True:
            # take goals
            goal_cnt = goal_cnts.pop()

            # if no goals at this depth, end block and take off stack again
            if goal_cnt == 0:
                s += ", }"
                continue
            
            if goal_cnt == 1:
                if s: # don't put comma in front
                    s += ", "
                
            # if more than one goal, take one of them, start block and put rest back on stack
            if goal_cnt > 1:
                goal_cnts.append(goal_cnt - 1)
                s += ", { "
            
            break
            
        # apply tactic
        s += tactic
        
        # put remaining goals on stack
        goal_cnts.append(new_goal_cnt)
            
    return s 

class BreathFirstProofSearch(LeanTacticTestingInterface):
        
    def proof_search(self, lean_goal, tactics, verbose=False):
        starting_goal = lean_goal
        starting_goal_list = (starting_goal,)
        empty_tactics = tuple()
        goal_list_queue = collections.deque()
        goal_list_queue.append((starting_goal_list, empty_tactics))

        while goal_list_queue:
            goal_list, ts = goal_list_queue.popleft()
            goal = goal_list[0]
            prems = premises_from_goal(goal)
            all_tactics = tactic_applications(tactics, prems)
            for t in all_tactics:
                response, new_goals, _ = self.apply_tactic(goal, t, verbose=verbose)
                if response == 'success':
                    new_goal_list = tuple(new_goals) + goal_list[1:]
                    new_ts = ts + ((t, len(new_goals)),)
                    if not new_goal_list:
                        return treeify_proof(new_ts)

                    goal_list_queue.append((new_goal_list, new_ts))
        return None
    
    def proof_search_fast(self, lean_goal, tactics, verbose=False):
        starting_goal = lean_goal
        starting_goal_list = (starting_goal,)
        empty_tactics = tuple()
        goal_list_queue = collections.deque()
        goal_list_queue.append((starting_goal_list, empty_tactics))

        while goal_list_queue:
            goal_list, ts = goal_list_queue.popleft()
            goal = goal_list[0]
            prems = premises_from_goal(goal)
            all_tactics = tactic_applications(tactics, prems)
            
            # faster to batch process all apply_tactic calls
            results = self.apply_tactic_multiple([goal] * len(all_tactics), all_tactics, verbose=verbose)
            for t, (response, new_goals, _) in zip(all_tactics, results):
                if response == 'success':
                    new_goal_list = tuple(new_goals) + goal_list[1:]
                    new_ts = ts + ((t, len(new_goals)),)
                    if not new_goal_list:
                        return treeify_proof(new_ts)

                    goal_list_queue.append((new_goal_list, new_ts))
        return None
    
    def is_valid_proof(self, goal, proof, verbose=False):
        t = proof.replace("\n", "")  # treat proof as a single tactic
        response, goals, _ = self.apply_tactic(goal, t, verbose=verbose)
        if response == 'success' and goals == []:
            return True
        return goals
    
    def walk_through_proof(self, lean_goal, proof):
        ts = proof.replace(" {", "").replace("}, ", "").split(", ")
        goals = [lean_goal]
        for t in ts:
            goal = goals[0]
            print(" Primary Goal:  ", goal)
            print(" Tactic applied:", t)
            _, new_goals, _ = self.apply_tactic(goal, t)
            goals = new_goals + goals[1:]
            print(" New Goals: ", new_goals)
            print(" All Goals: ", goals)
            print()

In [20]:
def example(goal, tactics, verbose=False):
    with BreathFirstProofSearch() as bfs:
        print("Goal:", goal)
        print("Tactics:", tactics)
        print()
        start = time.time()
        proof = bfs.proof_search_fast(goal, tactics, verbose=verbose)
        end = time.time()
        print("Time to proof:", end-start, "seconds")
        print("Proof:", proof)
        print()
        print("Verified:", bfs.is_valid_proof(goal, proof, verbose=verbose))
        print()
        print("Walk through proof:")
        bfs.walk_through_proof(goal, proof)

In [21]:
example(
    goal = LeanGoal([], ['p q r : Prop'], 'p ∧ q → q ∧ p'), 
    tactics=['apply {}', 'cases {}', 'intro', 'split', 'left', 'right'],
)

Goal: LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='p ∧ q → q ∧ p')
Tactics: ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

Time to proof: 2.8006479740142822 seconds
Proof: intro, cases a, split, { apply a_right, }, apply a_left

Verified: True

Walk through proof:
 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='p ∧ q → q ∧ p')
 Tactic applied: intro
 New Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a : and p q'], conclusion='and q p')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a : and p q'], conclusion='and q p')]

 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop', 'a : and p q'], conclusion='and q p')
 Tactic applied: cases a
 New Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a_left : p', 'a_right : q'], conclusion='and q p')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a_left : p', 'a_right : q'], conclusion='and q p')]
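
The treeified proof strings use `{ ... }` blocks to scope each tactic to a single goal, while the walk-through needs the flat sequence of tactics.  A minimal sketch of that flattening step follows; `flatten_proof` is a hypothetical helper (not part of the class above), and it assumes that no tactic contains braces or commas of its own, which holds for the tactics used in these examples but not for Lean tactics in general.

```python
import re

def flatten_proof(proof):
    """Drop the `{` / `}` block markers and return the tactics in order.

    Assumption: tactics themselves contain no braces or commas.
    """
    without_braces = re.sub(r"[{}]", "", proof)
    return [t.strip() for t in without_braces.split(",") if t.strip()]

steps = flatten_proof("intro, cases a, split, { apply a_right, }, apply a_left")
print(steps)
```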

In [22]:
example(
    goal = LeanGoal([], ['p q r : Prop'], '(p -> (q -> r)) <-> (p /\ q -> r)'), 
    tactics=['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']
)

Goal: LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(p -> (q -> r)) <-> (p /\\ q -> r)')
Tactics: ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

Time to proof: 7.008030891418457 seconds
Proof: split, { intro, intro, cases a_1, apply a, { apply a_1_left, }, apply a_1_right, }, intro, intro, intro, apply a, split, { apply a_1, }, apply a_2

Verified: True

Walk through proof:
 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(p -> (q -> r)) <-> (p /\\ q -> r)')
 Tactic applied: split
 New Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(p → q → r) → and p q → r'), LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(and p q → r) → p → q → r')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(p → q → r) → and p q → r'), LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='(and p q → r) → p → q → r')]

 Primary Goal:   LeanGoal(universes=[], ass

In [23]:
example(
    goal = LeanGoal([], ['p q r : Prop'], '¬(p ↔ ¬p)'), 
    tactics=['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']
)

Goal: LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='¬(p ↔ ¬p)')
Tactics: ['apply {}', 'cases {}', 'intro', 'split', 'left', 'right']

Time to proof: 38.70181894302368 seconds
Proof: intro, cases a, apply a_mp, { apply a_mpr, intro, apply a_mp, { apply a, }, apply a, }, apply a_mpr, intro, apply a_mp, { apply a, }, apply a

Verified: True

Walk through proof:
 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop'], conclusion='¬(p ↔ ¬p)')
 Tactic applied: intro
 New Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a : iff p (not p)'], conclusion='false')]
 All Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a : iff p (not p)'], conclusion='false')]

 Primary Goal:   LeanGoal(universes=[], assumptions=['p q r : Prop', 'a : iff p (not p)'], conclusion='false')
 Tactic applied: cases a
 New Goals:  [LeanGoal(universes=[], assumptions=['p q r : Prop', 'a_mp : p → not p', 'a_mpr : not p → p'], conclusion='false')]
 All Goals:  [LeanG

## Custom parsing of Lean expressions

Our `apply_tactic` returns a list of goals, parsed like `LeanGoal([], ['m n : nat'], "m + n = n + m")`.  This is easy for a human to read and easy to feed back into Lean, but it isn't very machine-parsable, and moreover it hides a lot of information.  It is, however, possible to have `apply_tactic` output other information about the goals.

For example, here we write a simple piece of code which gets all the goals (both target and local context), fetches the types of the local context, and prints them in a slightly more verbose and informative format which is nonetheless still valid Lean code.

With more work, we could reformat this however we like.

In [24]:
simple_custom_parser = r"""
meta def goal_to_string (g : expr) : tactic string :=
do 
  tactic.set_goals [g],
  goal ← tactic.target,
  local_cxt ← tactic.local_context,
  local_cxt_types ← list.mmap tactic.infer_type local_cxt,
  let s := "Goal: " ++ (to_string goal) ++ "\nLocal Context: "++ (to_string local_cxt) ++ "\nLocal Context Types: " ++ (to_string local_cxt_types) ++ "\n",
  return s

meta def custom_parse_state : tactic string :=
do 
 gs ← tactic.get_goals,
 -- loop over all goals (has effect of resetting the goal each time)
 goal_strings ← gs.mmap goal_to_string,
 tactic.set_goals gs,  -- set goals back
 let s := (to_string goal_strings),
 return s
"""

In [25]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_parser = simple_custom_parser
    _, _, custom_msg = (lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "induction n"))
print(custom_msg)

[Goal: eq.{1} nat (has_add.add.{0} nat nat.has_add m nat.zero) (has_add.add.{0} nat nat.has_add nat.zero m)
Local Context: [m]
Local Context Types: [nat]
, Goal: eq.{1} nat (has_add.add.{0} nat nat.has_add m (nat.succ n_n)) (has_add.add.{0} nat nat.has_add (nat.succ n_n) m)
Local Context: [m, n_n, n_ih]
Local Context Types: [nat, nat, eq.{1} nat (has_add.add.{0} nat nat.has_add m n_n) (has_add.add.{0} nat nat.has_add n_n m)]
]


Compare the custom output with what Lean prints.  It is the same information (except that we don't print the goal name).
```
2 goals
case nat.zero
m : ℕ
⊢ m + 0 = 0 + m
 
case nat.succ
m n_n : ℕ,
n_ih : m + n_n = n_n + m
⊢ m + nat.succ n_n = nat.succ n_n + m
```
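
On the Python side, the custom message shown earlier in this section can be split back into one record per goal.  The following is a rough sketch under stated assumptions: `parse_custom_message` is a hypothetical helper, it relies on the exact `Goal:` / `Local Context:` / `Local Context Types:` layout produced by `goal_to_string`, and it keeps the types as one raw string because individual types may themselves contain commas.

```python
import re

# Matches one goal block as printed by goal_to_string.
GOAL_RE = re.compile(
    r"Goal: (?P<target>.*?)\n"
    r"Local Context: \[(?P<names>.*?)\]\n"
    r"Local Context Types: \[(?P<types>.*?)\]\n",
    re.DOTALL,
)

def parse_custom_message(msg):
    goals = []
    for m in GOAL_RE.finditer(msg):
        names = [n.strip() for n in m.group("names").split(",") if n.strip()]
        goals.append({
            "target": m.group("target"),
            "names": names,
            "types_raw": m.group("types"),  # not split: types may contain commas
        })
    return goals

sample = (
    "[Goal: eq.{1} nat (has_add.add.{0} nat nat.has_add m nat.zero) "
    "(has_add.add.{0} nat nat.has_add nat.zero m)\n"
    "Local Context: [m]\n"
    "Local Context Types: [nat]\n"
    ", Goal: eq.{1} nat (has_add.add.{0} nat nat.has_add m (nat.succ n_n)) "
    "(has_add.add.{0} nat nat.has_add (nat.succ n_n) m)\n"
    "Local Context: [m, n_n, n_ih]\n"
    "Local Context Types: [nat, nat, eq.{1} nat (has_add.add.{0} nat nat.has_add m n_n) "
    "(has_add.add.{0} nat nat.has_add n_n m)]\n"
    "]"
)

parsed = parse_custom_message(sample)
for g in parsed:
    print(g["names"], "|-", g["target"])
```

A string format like this is fragile (a type containing `]` followed by a newline would confuse the pattern), which is one motivation for serializing the expressions in a structured way instead.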

Our printout is still Lean code, which is both good and bad.  It is good because it can be fed back into Lean.  However, it doesn't fully capture the abstract syntax tree (AST) of the expressions involved.  For that, it would be good to write code which serializes the AST of each expression into, say, s-expressions, JSON, or some other format.

Below is an example serializing Lean into s-expressions which capture the syntax tree for the most common constructors of the `expr` type.  (Also see Lewis and Wu's [interface between Mathematica and Lean](https://robertylewis.com/leanmm/) for an example of parsing Lean expressions into Mathematica code.)

In [26]:
lean_to_mathematica_parser = r"""
def join (sep : string) : list string → string
| [x]     := x
| []      := ""
| (x::xs) := x ++ sep ++ join xs

def escapec : char → string
| '\"' := "\\\""
| '\t' := "\\t"
| '\n' := "\\n"
| '\\' := "\\\\"
| c    := string.singleton c

def escape (s : string) : string :=
s.fold "" (λ s c, s ++ escapec c)

def wrap_in_parens (ss : list string) : string := "( " ++ (join " " ss) ++ " )"

meta def form_of_string (s : string) : string := "\"" ++ (escape s) ++ "\""

meta def form_of_nat (n : nat) : string := to_string n

meta def form_of_unsigned (n : unsigned) : string := to_string n

meta def form_of_name : name → string
| name.anonymous         := wrap_in_parens ["name.anonymous"]
| (name.mk_string s nm)  := wrap_in_parens ["name.mk_string", form_of_string s, form_of_name nm]
| (name.mk_numeral i nm) := wrap_in_parens ["name.mk_numeral", form_of_unsigned i, form_of_name nm]

meta def form_of_lvl : level → string
| level.zero         := wrap_in_parens ["level.zero"]
| (level.succ l)     := wrap_in_parens ["level.succ", form_of_lvl l]
| (level.max l1 l2)  := wrap_in_parens ["level.max", form_of_lvl l1, form_of_lvl l2]
| (level.imax l1 l2) := wrap_in_parens ["level.imax", form_of_lvl l1, form_of_lvl l2]
| (level.param nm)   := wrap_in_parens ["level.param", form_of_name nm]
| (level.mvar nm)    := wrap_in_parens ["level.mvar", form_of_name nm]

meta def form_of_lvl_list : list level → string
| []       := wrap_in_parens ["list.nil"]
| (h :: t) := wrap_in_parens ["list.cons", form_of_lvl h, form_of_lvl_list t]

meta def form_of_binder_info : binder_info → string
| binder_info.default             := wrap_in_parens ["binder_info.default"]
| binder_info.implicit            := wrap_in_parens ["binder_info.implicit"]
| binder_info.strict_implicit     := wrap_in_parens ["binder_info.strict_implicit"]
| binder_info.inst_implicit       := wrap_in_parens ["binder_info.inst_implicit"]
| other                           := wrap_in_parens ["<other>"]

meta def form_of_expr : expr → string
| (expr.var i)                     := wrap_in_parens ["expr.var", (format.to_string (to_fmt i) options.mk)]
| (expr.sort lvl)                  := wrap_in_parens ["expr.sort", form_of_lvl lvl]
| (expr.const nm lvls)             := wrap_in_parens ["expr.const", form_of_name nm, form_of_lvl_list lvls]
| (expr.mvar nm nm' tp)            := wrap_in_parens ["expr.mvar", form_of_name nm, form_of_name nm',form_of_expr tp]
| (expr.local_const nm ppnm bi tp) := wrap_in_parens ["expr.local_const", form_of_name nm, form_of_name ppnm, form_of_binder_info bi, form_of_expr tp]
| (expr.app f e)                   := wrap_in_parens ["expr.app", form_of_expr f, form_of_expr e]
| (expr.lam nm bi tp bod)          := wrap_in_parens ["expr.lam", form_of_name nm, form_of_binder_info bi, form_of_expr tp, form_of_expr bod]
| (expr.pi nm bi tp bod)           := wrap_in_parens ["expr.pi", form_of_name nm, form_of_binder_info bi, form_of_expr tp, form_of_expr bod]
| (expr.elet nm tp val bod)        := wrap_in_parens ["<expr.elet>"]
| (expr.macro mdf mlst)            := wrap_in_parens ["<expr.macro>"]

meta def custom_parse_state : tactic string :=
do 
 goal ← tactic.target,
 return (form_of_expr goal)
"""

In [27]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_parser = lean_to_mathematica_parser
    _, _, custom_msg = (lean_interface.apply_tactic(LeanGoal([], ['m n : nat'], "m + n = n + m"), "skip"))
print(custom_msg)

( expr.app ( expr.app ( expr.app ( expr.const ( name.mk_string "eq" ( name.anonymous ) ) ( list.cons ( level.succ ( level.zero ) ) ( list.nil ) ) ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list.nil ) ) ) ( expr.app ( expr.app ( expr.app ( expr.app ( expr.const ( name.mk_string "add" ( name.mk_string "has_add" ( name.anonymous ) ) ) ( list.cons ( level.zero ) ( list.nil ) ) ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list.nil ) ) ) ( expr.const ( name.mk_string "has_add" ( name.mk_string "nat" ( name.anonymous ) ) ) ( list.nil ) ) ) ( expr.local_const ( name.mk_numeral 678 ( name.mk_numeral 13 ( name.mk_string "_fresh" ( name.mk_numeral 0 ( name.anonymous ) ) ) ) ) ( name.mk_string "m" ( name.anonymous ) ) ( binder_info.default ) ( expr.const ( name.mk_numeral 1 ( name.anonymous ) ) ( list.nil ) ) ) ) ( expr.local_const ( name.mk_numeral 679 ( name.mk_numeral 13 ( name.mk_string "_fresh" ( name.mk_numeral 0 ( name.anonymous ) ) ) ) ) ( name.mk_string "

The above output is effectively just the AST for the expression `m + n = n + m`.
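
Once the expression is in this form, reading it back in Python is straightforward.  Here is a minimal sketch of an s-expression reader that turns the string into nested lists; `parse_sexpr` and `name_to_str` are hypothetical helpers written for this illustration, and the reader assumes well-formed, balanced input of the kind printed by `form_of_expr`.  String literals keep their surrounding quotes so they can be told apart from constructor names.

```python
import re

# "(" or ")" or a double-quoted string with escapes or a bare atom
TOKEN_RE = re.compile(r'\(|\)|"(?:\\.|[^"\\])*"|[^\s()"]+')

def parse_sexpr(text):
    """Parse one s-expression into nested Python lists of string tokens."""
    tokens = TOKEN_RE.findall(text)
    pos = 0

    def read():
        nonlocal pos
        tok = tokens[pos]
        pos += 1
        if tok == "(":
            items = []
            while tokens[pos] != ")":
                items.append(read())
            pos += 1  # consume the closing ")"
            return items
        if tok == ")":
            raise ValueError("unexpected ')'")
        return tok

    tree = read()
    if pos != len(tokens):
        raise ValueError("trailing tokens after the first expression")
    return tree

def name_to_str(node):
    """Rebuild a dotted Lean name from nested name.mk_string / name.mk_numeral nodes."""
    parts = []
    while node[0] != "name.anonymous":
        parts.append(node[1].strip('"'))  # node[1] is the string or numeral component
        node = node[2]                    # node[2] is the prefix of the name
    return ".".join(reversed(parts))

tree = parse_sexpr(
    '( expr.const ( name.mk_string "add" ( name.mk_string "has_add" ( name.anonymous ) ) ) '
    '( list.cons ( level.zero ) ( list.nil ) ) )'
)
print(tree[0], name_to_str(tree[1]))
```

The reader is recursive, so very deeply nested expressions could hit Python's recursion limit; an explicit stack would avoid that.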

## TODOs

- Code:
    - Is there a way to make this faster?
    - Is there a way to make this more robust?  (It works for now, but still feels rickety.)
    - Make this production-level code.  It is just a proof of concept and still quite hacky.
- Functionality
    - How should one enter definitions and notation?
    - Can we even enter all Lean theorems this way (assuming their definitions and notation are stored in mathlib)?
      - A next step would be to get a list of all theorems and see if we can apply the `skip` tactic to them.
    - How should one enter premises?  The easiest way is to add them as assumptions to the goal.
    - What about imports, etc?
    - We only apply tactics to a single goal and not a goal stack

## What I want from Lean 4

- Either a fast REPL-like interface,
- or a way to call Lean quickly (in a few milliseconds) on the text of a small file and get a direct (and unambiguous) response to my request.
- Better documentation of the communication interface.
- Better documentation of the tactic commands.

## Playing around

In [4]:
import pandas as pd
import numpy as np

In [15]:
df = pd.read_csv('../lean/lean_for_machine_learning_prototype/data/thm_dump2-2020-02-03.csv')
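# regex=False: "[" and "]" are meant literally, not as regular-expression syntax
df['universes'] = df['universes_raw'].str.replace("[", "", regex=False).str.replace("]", "", regex=False).str.replace(",", "", regex=False)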
df

Unnamed: 0.1,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes
0,0,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",heq.elim,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",⊢ Π {α : Sort u} {a : α} {p : α → Sort v} {b :...,Π {α : Sort u} {a : α} {p : α → Sort v} {b : α...,"[u, v]",u v
1,1,"( expr.pi ( name.mk_string ""s"" ( name.anonymou...","( expr.pi ( name.mk_string ""s"" ( name.anonymou...",string.mk_iterator.equations._eqn_1,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",no goals,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","∀ (s : list.{0} char), @eq.{1} string.iterator...",[],
2,2,"( expr.pi ( name.mk_string ""c"" ( name.anonymou...","( expr.pi ( name.mk_string ""c"" ( name.anonymou...",ordering.ite_eq_gt_distrib,"c : Prop,\n_inst_1 : decidable c,\na b : order...","c : Prop,\n_inst_1 : decidable c,\na b : order...",⊢ ∀ (c : Prop) [_inst_1 : decidable c] (a b : ...,∀ (c : Prop) [_inst_1 : decidable c] (a b : or...,[],
3,3,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",combinator.K.equations._eqn_1,"c : Prop,\n_inst_1 : decidable c,\na b : order...",no goals,"c : Prop,\n_inst_1 : decidable c,\na b : order...","∀ {α : Type u₁} {β : Type u₂} (a : α) (b : β),...","[u₁, u₂]",u₁ u₂
4,4,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.mem_append,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","⊢ ∀ {α : Type u} {a : α} {s t : list.{u} α},\n...","∀ {α : Type u} {a : α} {s t : list.{u} α},\n ...",[u],u
5,5,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.has_append.equations._eqn_1,"α : Type u,\na : α,\ns t : list.{u} α,\nα_1 : ...",no goals,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ ∀ {α ...","∀ {α : Type u},\n @eq.{u+1} (has_append.{u} (...",[u],u
6,6,( expr.app ( expr.app ( expr.app ( expr.const ...,,_private.4170278853.escapec.equations._eqn_4,,,,@eq.{1} string\n (escapec\n (char.of_nat\...,[],
7,7,"( expr.pi ( name.mk_string ""b"" ( name.anonymou...","( expr.pi ( name.mk_string ""b"" ( name.anonymou...",eq_ff_of_not_eq_tt,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","⊢ ∀ {b : bool}, not (@eq.{1} bool b bool.tt) →...","∀ {b : bool}, not (@eq.{1} bool b bool.tt) → @...",[],
8,8,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",psigma.sizeof._main.equations._eqn_1,"b : bool,\na : not (@eq.{1} bool b bool.tt),\n...",no goals,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...",∀ {α : Type u} {β : α → Type v} [_inst_1 : has...,"[u, v]",u v
9,9,( expr.app ( expr.app ( expr.app ( expr.const ...,( expr.app ( expr.app ( expr.app ( expr.const ...,and.symm.equations._eqn_1,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...",no goals,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...","@eq.{0} (∀ {a b : Prop}, and a b → and b a) an...",[],


In [29]:
df_save = df[['name', 'universes', 'thm_stmt']].copy().set_index('name')
df_save['theorem'] = df_save['thm_stmt'].str.replace("\n", " ").str.replace("  ", " ")
df_save = df_save.drop('thm_stmt', axis=1)
df_save.to_csv('theorems.tsv', sep='\t')

In [271]:
goals = []
for (us, thm) in zip(df['universes'], df['thm_stmt']):
    us = us.split()
    thm = thm.replace("\n", "")
    
    # not best way to handle this, but this means the goal is too long
    if "…" in thm:
        goals.append(None)
    else:
        goals.append(LeanGoal(us, [], thm))

In [272]:
import itertools
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

In [285]:
def run_all_goals(goals, tactic):
    results = [None] * len(goals)
    with LeanTacticTestingInterface() as lean_interface:
        # send the goals to Lean in batches of 32
        for batch_num, goal_group in enumerate(grouper(enumerate(goals), 32, None)):
            print(batch_num)  # progress indicator

            # drop the padding added by grouper and the goals that were too long (None)
            goal_group = [x for x in goal_group if x is not None and x[1] is not None]
            indices = [i for i, g in goal_group]
            goal_batch = [g for i, g in goal_group]
            tactics = [tactic] * len(goal_batch)
            results_batch = lean_interface.apply_tactic_multiple(goal_batch, tactics)

            # store each result at the position of its original goal
            for i, r in zip(indices, results_batch):
                results[i] = r
    return results

In [289]:
my_results = run_all_goals(goals, 'skip')

0
1
2
...
133


In [290]:
[(i, x) for i, x in enumerate(my_results) if x is None or x[0] != 'success']

[(6, ('theorem_error', ["unknown identifier 'escapec'"], '')),
 (9,
  ('theorem_error',
   ['type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)\nterm\n  @and.symm ?m_1 ?m_2\nhas type\n  and ?m_1 ?m_2 → and ?m_2 ?m_1\nbut is expected to have type\n  ∀ {a b : Prop}, and a b → and b a'],
   '')),
 (16, ('theorem_error', ["unknown identifier 'decidable_nonneg'"], '')),
 (26, ('theorem_error', ["unknown identifier 'escapec'"], '')),
 (29,
  ('theorem_error',
   ['type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)\nterm\n  @and.symm ?m_1 ?m_2\nhas type\n  and ?m_1 ?m_2 → and ?m_2 ?m_1\nbut is expected to have type\n  ∀ {a b : Prop}, and a b → and b a'],
   '')),
 (36, ('theorem_error', ["unknown identifier 'decidable_nonneg'"], '')),
 (40,
  ('theorem_error',
   ["unknown identifier 'to_nat_core._main'",
    "unknown identifier 'to_nat_core._main'"],
   '')),
 (57, None),
 (68, ('theorem_error', ["u

In [292]:
summary = [x[0] if x is not None else 'too_big' for x in my_results]
summary[:10]

['success',
 'success',
 'success',
 'success',
 'success',
 'success',
 'theorem_error',
 'success',
 'success',
 'theorem_error']

In [302]:
first_errors = [x[1][0] if x is not None and x[0] == 'theorem_error' else '' for x in my_results]
first_errors[:10]

['',
 '',
 '',
 '',
 '',
 '',
 "unknown identifier 'escapec'",
 '',
 '',
 'type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)\nterm\n  @and.symm ?m_1 ?m_2\nhas type\n  and ?m_1 ?m_2 → and ?m_2 ?m_1\nbut is expected to have type\n  ∀ {a b : Prop}, and a b → and b a']
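
To see at a glance which kinds of errors dominate, the first error messages can be bucketed by kind.  The sketch below uses only the standard library and the first ten entries shown above (with the long type-mismatch message shortened to its first two lines); `error_kind` is a hypothetical helper, and the bucketing rule (group `unknown identifier` errors by identifier, otherwise use the first line of the message) is just one reasonable choice.

```python
import re
from collections import Counter

UNKNOWN_ID_RE = re.compile(r"unknown identifier '([^']+)'")

def error_kind(msg):
    """Map a raw Lean error message to a short category label."""
    if not msg:
        return "no error"
    m = UNKNOWN_ID_RE.match(msg)
    if m:
        return "unknown identifier: " + m.group(1)
    # otherwise the first line of the message names the error
    return msg.split("\n", 1)[0]

first_errors_sample = [
    '', '', '', '', '', '',
    "unknown identifier 'escapec'",
    '', '',
    'type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)',
]

kind_counts = Counter(error_kind(m) for m in first_errors_sample)
for kind, count in kind_counts.most_common():
    print(count, kind)
```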

In [303]:
df_extended = df.copy()
df_extended['results_1'] = np.array(summary)
df_extended['results_1_first_error'] = np.array(first_errors)
df_extended['loadable_from_expr'] = df_extended['state_after_set_goal'].notnull()
df_extended['refl_provable'] = df_extended['state_after_refl'] == "no goals"
df_extended

Unnamed: 0.1,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes,results_1,results_1_first_error,loadable_from_expr,refl_provable
0,0,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",heq.elim,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",⊢ Π {α : Sort u} {a : α} {p : α → Sort v} {b :...,Π {α : Sort u} {a : α} {p : α → Sort v} {b : α...,"[u, v]",u v,success,,True,False
1,1,"( expr.pi ( name.mk_string ""s"" ( name.anonymou...","( expr.pi ( name.mk_string ""s"" ( name.anonymou...",string.mk_iterator.equations._eqn_1,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",no goals,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","∀ (s : list.{0} char), @eq.{1} string.iterator...",[],,success,,True,True
2,2,"( expr.pi ( name.mk_string ""c"" ( name.anonymou...","( expr.pi ( name.mk_string ""c"" ( name.anonymou...",ordering.ite_eq_gt_distrib,"c : Prop,\n_inst_1 : decidable c,\na b : order...","c : Prop,\n_inst_1 : decidable c,\na b : order...",⊢ ∀ (c : Prop) [_inst_1 : decidable c] (a b : ...,∀ (c : Prop) [_inst_1 : decidable c] (a b : or...,[],,success,,True,False
3,3,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",combinator.K.equations._eqn_1,"c : Prop,\n_inst_1 : decidable c,\na b : order...",no goals,"c : Prop,\n_inst_1 : decidable c,\na b : order...","∀ {α : Type u₁} {β : Type u₂} (a : α) (b : β),...","[u₁, u₂]",u₁ u₂,success,,True,True
4,4,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.mem_append,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","⊢ ∀ {α : Type u} {a : α} {s t : list.{u} α},\n...","∀ {α : Type u} {a : α} {s t : list.{u} α},\n ...",[u],u,success,,True,False
5,5,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.has_append.equations._eqn_1,"α : Type u,\na : α,\ns t : list.{u} α,\nα_1 : ...",no goals,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ ∀ {α ...","∀ {α : Type u},\n @eq.{u+1} (has_append.{u} (...",[u],u,success,,True,True
6,6,( expr.app ( expr.app ( expr.app ( expr.const ...,,_private.4170278853.escapec.equations._eqn_4,,,,@eq.{1} string\n (escapec\n (char.of_nat\...,[],,theorem_error,unknown identifier 'escapec',False,False
7,7,"( expr.pi ( name.mk_string ""b"" ( name.anonymou...","( expr.pi ( name.mk_string ""b"" ( name.anonymou...",eq_ff_of_not_eq_tt,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","⊢ ∀ {b : bool}, not (@eq.{1} bool b bool.tt) →...","∀ {b : bool}, not (@eq.{1} bool b bool.tt) → @...",[],,success,,True,False
8,8,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",psigma.sizeof._main.equations._eqn_1,"b : bool,\na : not (@eq.{1} bool b bool.tt),\n...",no goals,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...",∀ {α : Type u} {β : α → Type v} [_inst_1 : has...,"[u, v]",u v,success,,True,True
9,9,( expr.app ( expr.app ( expr.app ( expr.const ...,( expr.app ( expr.app ( expr.app ( expr.const ...,and.symm.equations._eqn_1,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...",no goals,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...","@eq.{0} (∀ {a b : Prop}, and a b → and b a) an...",[],,theorem_error,type mismatch at application\n @eq.{0} (∀ {a ...,True,True


In [298]:
df_extended.set_index(['refl_provable', 'loadable_from_expr', 'results_1']).index.value_counts()

(True, True, success)            1967
(False, True, success)           1766
(True, True, theorem_error)       279
(False, False, success)           134
(False, False, theorem_error)      67
(False, True, theorem_error)       33
(False, False, too_big)            12
dtype: int64

In [304]:
df_extended[(df_extended['refl_provable'] == False) & (df_extended['loadable_from_expr'] == True) & (df_extended['results_1'] == 'theorem_error')]

Unnamed: 0.1,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes,results_1,results_1_first_error,loadable_from_expr,refl_provable
176,176,"( expr.pi ( name.mk_string ""p"" ( name.anonymou...","( expr.pi ( name.mk_string ""p"" ( name.anonymou...",_private.4219602309.u_def,p : Prop\n⊢ U p (u p),p : Prop\n⊢ U p (u p),"⊢ ∀ (p : Prop), U p (u p)","∀ (p : Prop), U p (u p)",[],,theorem_error,unknown identifier 'U',True,False
537,537,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",dlist.to_list_concat,"a : int,\nα : Type u,\nx : α,\nl : dlist.{u} α...","a : int,\nα : Type u,\nx : α,\nl : dlist.{u} α...",a : int\n⊢ ∀ {α : Type u} (x : α) (l : dlist.{...,"∀ {α : Type u} (x : α) (l : dlist.{u} α),\n @...",[u],u,theorem_error,unknown identifier 'dlist',True,False
625,625,"( expr.pi ( name.mk_string ""a"" ( name.anonymou...","( expr.pi ( name.mk_string ""a"" ( name.anonymou...",nat.decidable_linear_ordered_semiring._proof_2,"a b c : nat,\nh₁ :\n @has_le.le.{0} nat\n ...","a b c : nat,\nh₁ :\n @has_le.le.{0} nat\n ...","⊢ ∀ (a b c : nat),\n @has_le.le.{0} nat\n ...","∀ (a b c : nat),\n @has_le.le.{0} nat\n (@...",[],,theorem_error,type mismatch at application\n @ordered_cance...,True,False
647,647,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",dlist.to_list_cons,"α : Type u,\nl₁ l₂ : list.{u} α,\na : α,\ns : ...","α : Type u,\nl₁ l₂ : list.{u} α,\na : α,\ns : ...","α : Type u,\nl₁ l₂ : list.{u} α,\na : α,\ns : ...","∀ {α : Type u} (x : α) (l : dlist.{u} α),\n @...",[u],u,theorem_error,unknown identifier 'dlist',True,False
683,683,"( expr.pi ( name.mk_string ""a"" ( name.anonymou...","( expr.pi ( name.mk_string ""a"" ( name.anonymou...",_private.4226664635.nonneg.elim,"a : int,\na_1 : nonneg a\n⊢ @Exists.{1} nat\n ...","a : int,\na_1 : nonneg a\n⊢ @Exists.{1} nat\n ...","⊢ ∀ {a : int},\n nonneg a →\n @Exists.{1...","∀ {a : int},\n nonneg a →\n @Exists.{1} nat\...",[],,theorem_error,unknown identifier 'nonneg',True,False
737,737,"( expr.pi ( name.mk_string ""p"" ( name.anonymou...","( expr.pi ( name.mk_string ""p"" ( name.anonymou...",_private.3494329727.v_def,"a : int,\np : Prop\n⊢ V p (v p)","a : int,\np : Prop\n⊢ V p (v p)","a : int\n⊢ ∀ (p : Prop), V p (v p)","∀ (p : Prop), V p (v p)",[],,theorem_error,unknown identifier 'V',True,False
913,913,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",dlist.of_list_to_list,"α : Type u,\nl : dlist.{u} α\n⊢ @eq.{u+1} (dli...","α : Type u,\nl : dlist.{u} α\n⊢ @eq.{u+1} (dli...","⊢ ∀ {α : Type u} (l : dlist.{u} α), @eq.{u+1} ...","∀ {α : Type u} (l : dlist.{u} α), @eq.{u+1} (d...",[u],u,theorem_error,unknown identifier 'dlist',True,False
1009,1009,"( expr.pi ( name.mk_string ""gen"" ( name.anonym...","( expr.pi ( name.mk_string ""gen"" ( name.anonym...",_private.4081065639.rand_nat_aux._main._pack.e...,"α : Type u,\n_inst_1 : decidable_linear_order....","α : Type u,\n_inst_1 : decidable_linear_order....","α : Type u,\n_inst_1 : decidable_linear_order....",∀ {gen : Type u} [_inst_1 : random_gen.{u} gen...,[u],u,theorem_error,unknown identifier 'random_gen',True,False
1211,1211,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",buffer.read_eq_read',"α : Type u,\n_inst_1 : inhabited.{u+1} α,\nb :...","α : Type u,\n_inst_1 : inhabited.{u+1} α,\nb :...",⊢ ∀ {α : Type u} [_inst_1 : inhabited.{u+1} α]...,∀ {α : Type u} [_inst_1 : inhabited.{u+1} α] (...,[u],u,theorem_error,unknown identifier 'buffer',True,False
1310,1310,"( expr.pi ( name.mk_string ""gen"" ( name.anonym...","( expr.pi ( name.mk_string ""gen"" ( name.anonym...",_private.4081065639.rand_nat_aux.equations._eqn_1,"p : Prop,\n_inst_1 : decidable p,\na : @eq.{1}...","p : Prop,\n_inst_1 : decidable p,\na : @eq.{1}...","p : Prop,\n_inst_1 : decidable p,\na : @eq.{1}...",∀ {gen : Type u} [_inst_1 : random_gen.{u} gen...,[u],u,theorem_error,unknown identifier 'random_gen',True,False


In [291]:
df.iloc[9]

Unnamed: 0                                                                  9
expr                        ( expr.app ( expr.app ( expr.app ( expr.const ...
expr_after_deserializing    ( expr.app ( expr.app ( expr.app ( expr.const ...
name                                                and.symm.equations._eqn_1
state_after_intros          ⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...
state_after_refl                                                     no goals
state_after_set_goal        ⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...
thm_stmt                    @eq.{0} (∀ {a b : Prop}, and a b → and b a) an...
universes_raw                                                              []
universes                                                                    
Name: 9, dtype: object

In [88]:
results = []
with LeanTacticTestingInterface() as lean_interface:
    for i, (us, thm) in enumerate(zip(df['universes'], df['thm_stmt'])):
        us = us.split()
        thm = thm.replace("\n", "")
        try:
            result = lean_interface.apply_tactic(LeanGoal(us, [], thm), "skip")
            results.append(result[0])
        except Exception:  # any failure, not only a real timeout, is recorded as "Timeout"
            results.append("Timeout")

In [89]:
df1 = df.copy()
df1['thm_stmt_results'] = np.array(results)
df1

Unnamed: 0.1,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes,thm_stmt_results
0,0,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",heq.elim,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",⊢ Π {α : Sort u} {a : α} {p : α → Sort v} {b :...,Π {α : Sort u} {a : α} {p : α → Sort v} {b : α...,"[u, v]",u v,success
1,1,"( expr.pi ( name.mk_string ""s"" ( name.anonymou...","( expr.pi ( name.mk_string ""s"" ( name.anonymou...",string.mk_iterator.equations._eqn_1,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",no goals,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","∀ (s : list.{0} char), @eq.{1} string.iterator...",[],,success
2,2,"( expr.pi ( name.mk_string ""c"" ( name.anonymou...","( expr.pi ( name.mk_string ""c"" ( name.anonymou...",ordering.ite_eq_gt_distrib,"c : Prop,\n_inst_1 : decidable c,\na b : order...","c : Prop,\n_inst_1 : decidable c,\na b : order...",⊢ ∀ (c : Prop) [_inst_1 : decidable c] (a b : ...,∀ (c : Prop) [_inst_1 : decidable c] (a b : or...,[],,success
3,3,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",combinator.K.equations._eqn_1,"c : Prop,\n_inst_1 : decidable c,\na b : order...",no goals,"c : Prop,\n_inst_1 : decidable c,\na b : order...","∀ {α : Type u₁} {β : Type u₂} (a : α) (b : β),...","[u₁, u₂]",u₁ u₂,success
4,4,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.mem_append,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","⊢ ∀ {α : Type u} {a : α} {s t : list.{u} α},\n...","∀ {α : Type u} {a : α} {s t : list.{u} α},\n ...",[u],u,success
5,5,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.has_append.equations._eqn_1,"α : Type u,\na : α,\ns t : list.{u} α,\nα_1 : ...",no goals,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ ∀ {α ...","∀ {α : Type u},\n @eq.{u+1} (has_append.{u} (...",[u],u,success
6,6,( expr.app ( expr.app ( expr.app ( expr.const ...,,_private.4170278853.escapec.equations._eqn_4,,,,@eq.{1} string\n (escapec\n (char.of_nat\...,[],,theorem_error
7,7,"( expr.pi ( name.mk_string ""b"" ( name.anonymou...","( expr.pi ( name.mk_string ""b"" ( name.anonymou...",eq_ff_of_not_eq_tt,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","⊢ ∀ {b : bool}, not (@eq.{1} bool b bool.tt) →...","∀ {b : bool}, not (@eq.{1} bool b bool.tt) → @...",[],,success
8,8,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",psigma.sizeof._main.equations._eqn_1,"b : bool,\na : not (@eq.{1} bool b bool.tt),\n...",no goals,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...",∀ {α : Type u} {β : α → Type v} [_inst_1 : has...,"[u, v]",u v,success
9,9,( expr.app ( expr.app ( expr.app ( expr.const ...,( expr.app ( expr.app ( expr.app ( expr.const ...,and.symm.equations._eqn_1,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...",no goals,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...","@eq.{0} (∀ {a b : Prop}, and a b → and b a) an...",[],,theorem_error


In [93]:
df1.groupby('thm_stmt_results').count()

Unnamed: 0_level_0,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes
thm_stmt_results,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
Timeout,12,12,0,12,0,0,0,12,12,12
success,3861,3861,3729,3861,3729,3729,3729,3861,3861,3861
theorem_error,385,385,316,385,316,316,316,385,385,385


In [94]:
df1.query('thm_stmt_results == "theorem_error"')

Unnamed: 0.1,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes,thm_stmt_results
6,6,( expr.app ( expr.app ( expr.app ( expr.const ...,,_private.4170278853.escapec.equations._eqn_4,,,,@eq.{1} string\n (escapec\n (char.of_nat\...,[],,theorem_error
9,9,( expr.app ( expr.app ( expr.app ( expr.const ...,( expr.app ( expr.app ( expr.app ( expr.const ...,and.symm.equations._eqn_1,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...",no goals,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...","@eq.{0} (∀ {a b : Prop}, and a b → and b a) an...",[],,theorem_error
16,16,"( expr.pi ( name.mk_string ""a"" ( name.anonymou...","( expr.pi ( name.mk_string ""a"" ( name.anonymou...",int.decidable_le.equations._eqn_1,a b : int\n⊢ @eq.{1} (decidable (@has_le.le.{0...,no goals,"⊢ ∀ (a b : int),\n @eq.{1} (decidable (@has...","∀ (a b : int),\n @eq.{1} (decidable (@has_le....",[],,theorem_error
26,26,( expr.app ( expr.app ( expr.app ( expr.const ...,,_private.4170278853.escapec.equations._eqn_4,,,,@eq.{1} string\n (escapec\n (char.of_nat\...,[],,theorem_error
29,29,( expr.app ( expr.app ( expr.app ( expr.const ...,( expr.app ( expr.app ( expr.app ( expr.const ...,and.symm.equations._eqn_1,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...",no goals,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...","@eq.{0} (∀ {a b : Prop}, and a b → and b a) an...",[],,theorem_error
36,36,"( expr.pi ( name.mk_string ""a"" ( name.anonymou...","( expr.pi ( name.mk_string ""a"" ( name.anonymou...",int.decidable_le.equations._eqn_1,a b : int\n⊢ @eq.{1} (decidable (@has_le.le.{0...,no goals,"⊢ ∀ (a b : int),\n @eq.{1} (decidable (@has...","∀ (a b : int),\n @eq.{1} (decidable (@has_le....",[],,theorem_error
40,40,"( expr.pi ( name.mk_string ""it"" ( name.anonymo...",,_private.11087437.to_nat_core._main.equations....,,,,"∀ (it : string.iterator) (i r : nat),\n @eq.{...",[],,theorem_error
68,68,"( expr.pi ( name.mk_string ""n"" ( name.anonymou...","( expr.pi ( name.mk_string ""n"" ( name.anonymou...",unsigned.of_nat'.equations._eqn_1,"α : Type u,\n_inst_1 : has_lt.{u} α,\na : α,\n...",no goals,"α : Type u,\n_inst_1 : has_lt.{u} α,\na : α,\n...","∀ (n : nat),\n @eq.{1} unsigned (unsigned.of_...",[],,theorem_error
88,88,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",parser.decorate_error.equations._eqn_1,"α : Type u,\n_inst_1 : no_zero_divisors.{u} α,...",no goals,"α : Type u,\n_inst_1 : no_zero_divisors.{u} α,...",∀ {α : Type} (msg : thunk.{0} string) (p : par...,[],,theorem_error
111,111,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",array.to_buffer.equations._eqn_1,"α : Sort u,\nφ : α → Sort v,\na a' : α,\np₁ : ...",no goals,"α : Sort u,\nφ : α → Sort v,\na a' : α,\np₁ : ...","∀ {α : Type u} {n : nat} (a : array.{u} n α),\...",[u],u,theorem_error


In [95]:
error_messages = []
df_bad = df1.query('thm_stmt_results == "theorem_error"')
with LeanTacticTestingInterface() as lean_interface:
    for i, (us, thm) in enumerate(zip(df_bad['universes'], df_bad['thm_stmt'])):
        us = us.split()
        thm = thm.replace("\n", "")
        try:
            result = lean_interface.apply_tactic(LeanGoal(us, [], thm), "skip")
            error_messages.append(result[1])
        except Exception:  # any failure, not only a real timeout, is recorded as "Timeout"
            error_messages.append("Timeout")

In [96]:
error_messages

[["unknown identifier 'escapec'"],
 ['type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)\nterm\n  @and.symm ?m_1 ?m_2\nhas type\n  and ?m_1 ?m_2 → and ?m_2 ?m_1\nbut is expected to have type\n  ∀ {a b : Prop}, and a b → and b a'],
 ["unknown identifier 'decidable_nonneg'"],
 ["unknown identifier 'escapec'"],
 ['type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)\nterm\n  @and.symm ?m_1 ?m_2\nhas type\n  and ?m_1 ?m_2 → and ?m_2 ?m_1\nbut is expected to have type\n  ∀ {a b : Prop}, and a b → and b a'],
 ["unknown identifier 'decidable_nonneg'"],
 ["unknown identifier 'to_nat_core._main'",
  "unknown identifier 'to_nat_core._main'"],
 ["unknown identifier 'zero_lt_unsigned_sz'"],
 ["unknown identifier 'parser'",
  "unknown identifier 'parser'",
  "unknown identifier 'parser.decorate_error'",
  "unknown identifier 'parser.decorate_errors'"],
 ["unknown identifier 'buffer'", "unknown identifier 'ar

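Most of the messages above have the form `unknown identifier '...'`. The following is a minimal sketch for tallying which names are missing. It assumes each entry of `error_messages` is either a list of message strings or the placeholder string "Timeout", as built in the cell above. The helper name `missing_identifiers` is hypothetical, and the sample data is copied from a few of the messages shown.

```python
import re
from collections import Counter

# Matches a whole message of the form: unknown identifier 'some.name'
# The greedy group keeps names that themselves end in a prime (').
UNKNOWN_ID = re.compile(r"^unknown identifier '(.+)'$")


def missing_identifiers(error_messages):
    """Count how often each unknown identifier is reported."""
    counts = Counter()
    for messages in error_messages:
        if isinstance(messages, str):
            # Placeholder such as "Timeout": no Lean message to inspect.
            continue
        for message in messages:
            match = UNKNOWN_ID.match(message)
            if match:
                counts[match.group(1)] += 1
    return counts


# Sample entries copied from the output above (not the full list).
sample_messages = [
    ["unknown identifier 'escapec'"],
    ["type mismatch at application\n  @eq.{0} (∀ {a b : Prop}, and a b → and b a) (@and.symm ?m_1 ?m_2)"],
    ["unknown identifier 'decidable_nonneg'"],
    ["unknown identifier 'parser'",
     "unknown identifier 'parser'",
     "unknown identifier 'parser.decorate_error'"],
    "Timeout",
]

for name, count in missing_identifiers(sample_messages).most_common():
    print(name, count)
```

Messages that do not match the pattern, such as the type mismatch for `and.symm`, are skipped by this tally and would need to be inspected separately.
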
In [101]:
df_bad.iloc[10]['universes'], df_bad.iloc[10]['thm_stmt'], df_bad.iloc[10]['expr']

('u',
 '∀ {α : Type u}, @eq.{u+1} (list.{u} α) (@dlist.to_list.{u} α (@dlist.empty.{u} α)) (@list.nil.{u} α)',
 '( expr.pi ( name.mk_string "α" ( name.anonymous ) ) ( binder_info.implicit ) ( expr.sort ( level.succ ( level.param ( name.mk_string "u" ( name.anonymous ) ) ) ) ) ( expr.app ( expr.app ( expr.app ( expr.const ( name.mk_string "eq" ( name.anonymous ) ) ( list.cons ( level.succ ( level.param ( name.mk_string "u" ( name.anonymous ) ) ) ) ( list.nil ) ) ) ( expr.app ( expr.const ( name.mk_string "list" ( name.anonymous ) ) ( list.cons ( level.param ( name.mk_string "u" ( name.anonymous ) ) ) ( list.nil ) ) ) ( expr.var 0 ) ) ) ( expr.app ( expr.app ( expr.const ( name.mk_string "to_list" ( name.mk_string "dlist" ( name.anonymous ) ) ) ( list.cons ( level.param ( name.mk_string "u" ( name.anonymous ) ) ) ( list.nil ) ) ) ( expr.var 0 ) ) ( expr.app ( expr.const ( name.mk_string "empty" ( name.mk_string "dlist" ( name.anonymous ) ) ) ( list.cons ( level.param ( name.mk_string "u"

In [102]:
df_bad.iloc[10]

Unnamed: 0                                                                117
expr                        ( expr.pi ( name.mk_string "α" ( name.anonymou...
expr_after_deserializing    ( expr.pi ( name.mk_string "α" ( name.anonymou...
name                                                      dlist.to_list_empty
state_after_intros          α : Type u\n⊢ @eq.{u+1} (list.{u} α) (@dlist.t...
state_after_refl                                                     no goals
state_after_set_goal        ⊢ ∀ {α : Type u}, @eq.{u+1} (list.{u} α) (@dli...
thm_stmt                    ∀ {α : Type u}, @eq.{u+1} (list.{u} α) (@dlist...
universes_raw                                                             [u]
universes                                                                   u
thm_stmt_results                                                theorem_error
Name: 117, dtype: object

In [104]:
df_bad.iloc[10]['expr'] == df_bad.iloc[10]['expr_after_deserializing']

True

In [117]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(['u'], [],  '∀ {α : Type u}, @eq.{u+1} (list.{u} α) (@dlist.to_list.{u} α (@dlist.empty.{u} α)) (@list.nil.{u} α)')
    display(lean_interface.apply_tactic(goal, 'intros'))

('success',
 [LeanGoal(universes=['u'], assumptions=['α : Type u'], conclusion='@eq.{u+1} (list.{u} α) (@dlist.to_list.{u} α (@dlist.empty.{u} α)) (@list.nil.{u} α)')],
 '')

In [202]:
results_skip = []
results_intros = []
results_intros_skip = []

with LeanTacticTestingInterface() as lean_interface:    
    #j = 10
    for i, (us, thm) in enumerate(zip(df['universes'], df['thm_stmt'])):
        #j -= 1
        #if j == 0: 
        #    break
        
        assert len(results_skip) == len(results_intros)
        assert len(results_skip) == len(results_intros_skip)
        us = us.split()
        thm = thm.replace("\n", "")
        try:
            goal = LeanGoal(us, [], thm)
            #print(goal)
            result = lean_interface.apply_tactic(goal, "skip")
            results_skip.append(result[0])
            #print(result)
            
            if result[0] == 'success':
                goal = result[1][0]
                #print(goal)
                result = lean_interface.apply_tactic(goal, "intros")
                results_intros.append(result[0])
                #print(result)
            else:
                results_intros.append('')
            
            if result[0] == 'success':
                goal = result[1][0]
                #print(goal)
                result = lean_interface.apply_tactic(goal, "skip")
                results_intros_skip.append(result[0]) 
                #print(result)
            else:
                results_intros_skip.append('')
                
        except Exception:  # any failure, not only a real timeout, is recorded as "Timeout"
            if len(results_intros) < len(results_skip):
                results_intros.append("Timeout")
                results_intros_skip.append("")
            elif len(results_intros_skip) < len(results_intros):
                results_intros_skip.append("Timeout")
            else:
                results_skip.append("Timeout")
                results_intros.append("")
                results_intros_skip.append("")
        #print()

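The bookkeeping in the loop above (three parallel lists kept the same length by hand) can be sketched more compactly. This is an illustrative sketch, not the notebook's implementation: `run_tactic_chain` and `fake_apply` are hypothetical names, and the only assumption about `apply_tactic` is what the outputs above show, namely that it returns a tuple whose first element is a status string and whose second element is a list of goals on success.

```python
from typing import Callable, List, Sequence


def run_tactic_chain(apply_tactic: Callable, goal, tactics: Sequence[str]) -> List[str]:
    """Apply tactics one after another, feeding the first resulting goal forward.

    Returns one status per tactic; steps that were never reached get ''.
    """
    statuses: List[str] = []
    try:
        for tactic in tactics:
            result = apply_tactic(goal, tactic)
            status, payload = result[0], result[1]
            statuses.append(status)
            if status != "success" or not payload:
                break  # error, or no goal left to continue with
            goal = payload[0]
    except Exception:
        # Same convention as the loop above: any exception counts as "Timeout".
        statuses.append("Timeout")
    # Pad so every row has exactly len(tactics) entries.
    statuses.extend([""] * (len(tactics) - len(statuses)))
    return statuses


def fake_apply(goal, tactic):
    """Stand-in for lean_interface.apply_tactic so the sketch runs without Lean."""
    if goal == "bad":
        return ("theorem_error", ["unknown identifier 'x'"], "")
    if goal == "slow":
        raise TimeoutError("no response")
    return ("success", [goal + " after " + tactic], "")


for g in ["ok", "bad", "slow"]:
    print(g, run_tactic_chain(fake_apply, g, ["skip", "intros", "skip"]))
```

One difference from the loop above: if a tactic succeeds but leaves no goals, this sketch stops and pads with empty strings, whereas indexing `result[1][0]` in the original raises and is recorded as "Timeout".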

In [204]:
df2 = df.copy()
df2['thm_stmt_skip'] = np.array(results_skip)
df2['thm_stmt_intros'] = np.array(results_intros)
df2['thm_stmt_intros_skip'] = np.array(results_intros_skip)
df2

Unnamed: 0.1,Unnamed: 0,expr,expr_after_deserializing,name,state_after_intros,state_after_refl,state_after_set_goal,thm_stmt,universes_raw,universes,thm_stmt_skip,thm_stmt_intros,thm_stmt_intros_skip
0,0,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",heq.elim,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",⊢ Π {α : Sort u} {a : α} {p : α → Sort v} {b :...,Π {α : Sort u} {a : α} {p : α → Sort v} {b : α...,"[u, v]",u v,success,success,success
1,1,"( expr.pi ( name.mk_string ""s"" ( name.anonymou...","( expr.pi ( name.mk_string ""s"" ( name.anonymou...",string.mk_iterator.equations._eqn_1,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...",no goals,"α : Sort u,\na : α,\np : α → Sort v,\nb : α,\n...","∀ (s : list.{0} char), @eq.{1} string.iterator...",[],,success,success,success
2,2,"( expr.pi ( name.mk_string ""c"" ( name.anonymou...","( expr.pi ( name.mk_string ""c"" ( name.anonymou...",ordering.ite_eq_gt_distrib,"c : Prop,\n_inst_1 : decidable c,\na b : order...","c : Prop,\n_inst_1 : decidable c,\na b : order...",⊢ ∀ (c : Prop) [_inst_1 : decidable c] (a b : ...,∀ (c : Prop) [_inst_1 : decidable c] (a b : or...,[],,Timeout,,
3,3,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",combinator.K.equations._eqn_1,"c : Prop,\n_inst_1 : decidable c,\na b : order...",no goals,"c : Prop,\n_inst_1 : decidable c,\na b : order...","∀ {α : Type u₁} {β : Type u₂} (a : α) (b : β),...","[u₁, u₂]",u₁ u₂,success,success,success
4,4,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.mem_append,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","α : Type u,\na : α,\ns t : list.{u} α\n⊢ iff\n...","⊢ ∀ {α : Type u} {a : α} {s t : list.{u} α},\n...","∀ {α : Type u} {a : α} {s t : list.{u} α},\n ...",[u],u,Timeout,,
5,5,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",list.has_append.equations._eqn_1,"α : Type u,\na : α,\ns t : list.{u} α,\nα_1 : ...",no goals,"α : Type u,\na : α,\ns t : list.{u} α\n⊢ ∀ {α ...","∀ {α : Type u},\n @eq.{u+1} (has_append.{u} (...",[u],u,Timeout,,
6,6,( expr.app ( expr.app ( expr.app ( expr.const ...,,_private.4170278853.escapec.equations._eqn_4,,,,@eq.{1} string\n (escapec\n (char.of_nat\...,[],,theorem_error,,
7,7,"( expr.pi ( name.mk_string ""b"" ( name.anonymou...","( expr.pi ( name.mk_string ""b"" ( name.anonymou...",eq_ff_of_not_eq_tt,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...","⊢ ∀ {b : bool}, not (@eq.{1} bool b bool.tt) →...","∀ {b : bool}, not (@eq.{1} bool b bool.tt) → @...",[],,success,success,success
8,8,"( expr.pi ( name.mk_string ""α"" ( name.anonymou...","( expr.pi ( name.mk_string ""α"" ( name.anonymou...",psigma.sizeof._main.equations._eqn_1,"b : bool,\na : not (@eq.{1} bool b bool.tt),\n...",no goals,"b : bool,\na : not (@eq.{1} bool b bool.tt)\n⊢...",∀ {α : Type u} {β : α → Type v} [_inst_1 : has...,"[u, v]",u v,Timeout,,
9,9,( expr.app ( expr.app ( expr.app ( expr.const ...,( expr.app ( expr.app ( expr.app ( expr.const ...,and.symm.equations._eqn_1,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...",no goals,"⊢ @eq.{0} (∀ {a b : Prop}, and a b → and b a) ...","@eq.{0} (∀ {a b : Prop}, and a b → and b a) an...",[],,theorem_error,,


In [205]:
df2.groupby(['thm_stmt_skip', 'thm_stmt_intros', 'thm_stmt_intros_skip'])[['thm_stmt']].count()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,thm_stmt
thm_stmt_skip,thm_stmt_intros,thm_stmt_intros_skip,Unnamed: 3_level_1
Timeout,,,2972
success,success,success,907
theorem_error,,,379

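To compare this run with the earlier single-`skip` run, the two sets of counts can be put side by side. This is a small sketch using only the numbers printed in the two `groupby` outputs above; the dictionary names and the `shares` helper are illustrative.

```python
# Counts copied from the groupby outputs above.
first_run = {"Timeout": 12, "success": 3861, "theorem_error": 385}    # In [93]
second_run = {"Timeout": 2972, "success": 907, "theorem_error": 379}  # In [205], thm_stmt_skip


def shares(counts):
    """Return each outcome's fraction of the total number of theorems."""
    total = sum(counts.values())
    return {outcome: n / total for outcome, n in counts.items()}


for label, counts in [("first run", first_run), ("second run", second_run)]:
    total = sum(counts.values())
    formatted = ", ".join(f"{k}: {v:.1%}" for k, v in shares(counts).items())
    print(f"{label} (n={total}): {formatted}")
```

Both runs cover the same number of theorem statements, so the difference lies in how outcomes are distributed: the second run records far more timeouts on the initial `skip` step.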

In [147]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_parser = simple_custom_parser
    goal = LeanGoal([], ['h : ∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}'],  '∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}')
    result = lean_interface.apply_tactic(goal, 'intros')
result[2]

'[Goal: eq.{1} string.iterator (string.mk_iterator (string_imp.mk s)) (string.iterator_imp.mk (list.nil.{0} char) s)\nLocal Context: [h, s]\nLocal Context Types: [Pi (s : list.{0} char), (eq.{1} string.iterator (string.mk_iterator (string_imp.mk s)) (string.iterator_imp.mk (list.nil.{0} char) s)), list.{0} char]\n]'

In [154]:
goal_parser = r"""
def join (sep : string) : list string → string
| [x]     := x
| []      := ""
| (x::xs) := x ++ sep ++ join xs

meta def local_cxt_to_string (v : expr bool.tt) : tactic string := 
do 
  tp ← tactic.infer_type v,
  return $ (to_string v) ++ "\n" ++ (to_string tp)

meta def goal_to_string (g : expr) : tactic string :=
do 
  tactic.set_goals [g],
  goal ← tactic.target,
  local_cxt ← tactic.local_context,
  let local_cxt_len := list.length local_cxt,
  let goal_str := (to_string goal),
  local_cxt_strs ← (list.mmap local_cxt_to_string local_cxt),
  let s1 := goal_str ++ "\n",
  let s2 := "Local Context Vars: " ++ (to_string local_cxt_len) ++ "\n",
  let s3 := join "\n" local_cxt_strs,
  return $ s1 ++ s2 ++ s3

meta def custom_parse_state : tactic string :=
do 
 gs ← tactic.get_goals,
 -- loop over all goals (has effect of resetting the goal each time)
 let gs_len := list.length gs,
 goal_strings ← gs.mmap goal_to_string,
 tactic.set_goals gs,  -- set goals back
 let s := "Goals: " ++ (to_string gs_len) ++ "\n" ++ (join "\n" goal_strings),
 return s
"""

In [196]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal([], ['h : ∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}'],  '∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}')
    result = lean_interface.apply_tactic(goal, 'skip')
print(result[1])

[LeanGoal(universes=[], assumptions=['h : ∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}'], conclusion='∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}')]


In [198]:
with LeanTacticTestingInterface() as lean_interface:
    goal = LeanGoal(universes=[], assumptions=['h : ∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}'], conclusion='∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}')
    result = lean_interface.apply_tactic(goal, 'intros')
print(result[1])

[LeanGoal(universes=[], assumptions=['h : ∀ (s : list.{0} char), @eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}', 's : list.{0} char'], conclusion='@eq.{1} string.iterator (string.mk_iterator {data := s}) {fst := @list.nil.{0} char, snd := s}')]


In [171]:
with LeanTacticTestingInterface() as lean_interface:
    lean_interface.custom_parser = goal_parser
    goal = LeanGoal([], ['m n : nat'], "m + n = n + m")
    result = lean_interface.apply_tactic(goal, 'induction n')
print(result[1])

[LeanGoal(universes=[], assumptions=['m : nat'], conclusion='eq.{1} nat (has_add.add.{0} nat nat.has_add m nat.zero) (has_add.add.{0} nat nat.has_add nat.zero m)'), LeanGoal(universes=[], assumptions=['m : nat', 'n_n : nat', 'n_ih : eq.{1} nat (has_add.add.{0} nat nat.has_add m n_n) (has_add.add.{0} nat nat.has_add n_n m)'], conclusion='eq.{1} nat (has_add.add.{0} nat nat.has_add m (nat.succ n_n)) (has_add.add.{0} nat nat.has_add (nat.succ n_n) m)')]

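The string produced by `custom_parse_state` in the `goal_parser` cell above has a simple line-based layout: a `Goals: n` header, then for each goal the target, a `Local Context Vars: k` line, and `k` pairs of name and type lines. The following is a minimal sketch of reading that layout back in Python. It is not the parsing code used by `LeanTacticTestingInterface`; `ParsedGoal` and `parse_state` are hypothetical names, and the sketch assumes that Lean's `to_string` never emits a newline inside an expression.

```python
from dataclasses import dataclass
from typing import List, Tuple

GOALS_PREFIX = "Goals: "
VARS_PREFIX = "Local Context Vars: "


@dataclass
class ParsedGoal:
    conclusion: str
    context: List[Tuple[str, str]]  # (variable name, type) pairs


def parse_state(text: str) -> List[ParsedGoal]:
    lines = text.split("\n")
    if not lines[0].startswith(GOALS_PREFIX):
        raise ValueError("missing 'Goals: n' header")
    n_goals = int(lines[0][len(GOALS_PREFIX):])
    pos = 1
    goals = []
    for _ in range(n_goals):
        conclusion = lines[pos]
        marker = lines[pos + 1]
        if not marker.startswith(VARS_PREFIX):
            raise ValueError("missing 'Local Context Vars: k' line")
        n_vars = int(marker[len(VARS_PREFIX):])
        pos += 2
        context = []
        for _ in range(n_vars):
            context.append((lines[pos], lines[pos + 1]))
            pos += 2
        if n_vars == 0:
            # join "\n" [] is "", which leaves one empty line after the marker.
            pos += 1
        goals.append(ParsedGoal(conclusion, context))
    return goals


# Text reconstructed under the assumed layout from the `induction n` result above;
# it is not a captured server response.
sample = "\n".join([
    "Goals: 2",
    "eq.{1} nat (has_add.add.{0} nat nat.has_add m nat.zero) (has_add.add.{0} nat nat.has_add nat.zero m)",
    "Local Context Vars: 1",
    "m", "nat",
    "eq.{1} nat (has_add.add.{0} nat nat.has_add m (nat.succ n_n)) (has_add.add.{0} nat nat.has_add (nat.succ n_n) m)",
    "Local Context Vars: 3",
    "m", "nat",
    "n_n", "nat",
    "n_ih", "eq.{1} nat (has_add.add.{0} nat nat.has_add m n_n) (has_add.add.{0} nat nat.has_add n_n m)",
])

for goal in parse_state(sample):
    print([name for name, _ in goal.context], "⊢", goal.conclusion)
```

Putting counts before variable-length sections means the reader never has to guess where one goal ends and the next begins, which is why the Lean side prints `Goals:` and `Local Context Vars:` first.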

In [32]:
df.set_index('name').loc['rand_nat_aux._main._pack._wf_rec_mk_dec_tactic._aux_1']['expr']

'( expr.pi ( name.mk_string "gen" ( name.anonymous ) ) ( binder_info.implicit ) ( expr.sort ( level.succ ( level.param ( name.mk_string "u" ( name.anonymous ) ) ) ) ) ( expr.pi ( name.mk_string "gen_lo" ( name.anonymous ) ) ( binder_info.default ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list.nil ) ) ( expr.pi ( name.mk_string "gen_mag" ( name.anonymous ) ) ( binder_info.default ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list.nil ) ) ( expr.pi ( name.mk_string "r" ( name.anonymous ) ) ( binder_info.default ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list.nil ) ) ( expr.pi ( name.mk_string "v" ( name.anonymous ) ) ( binder_info.default ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list.nil ) ) ( expr.pi ( name.mk_string "g" ( name.anonymous ) ) ( binder_info.default ) ( expr.var 4 ) ( expr.pi ( name.mk_string "x" ( name.anonymous ) ) ( binder_info.default ) ( expr.const ( name.mk_string "nat" ( name.anonymous ) ) ( list