In [1]:
import os
import json
import pickle
import numpy as np
from pathlib import Path
from typing import Dict
from tqdm.auto import tqdm  

import os
from pathlib import Path

MATHLIB_DIR = Path("~/lean/mathlib4/Mathlib").expanduser()  
LeanDojo_DIR = Path("~/leandojo_benchmark_4/").expanduser()
Save_DIR = Path("./STP/SFT/").expanduser()  # Use Path for consistency

os.makedirs(Save_DIR, exist_ok=True)  # Now works with the full path

  from .autonotebook import tqdm as notebook_tqdm


# Read Lean Files

In [2]:
def read_lean_files(directory: Path) -> Dict[str, str]:
    """
    Recursively reads all `.lean` files in the specified directory and returns
    a dictionary mapping file paths to their contents, displaying a progress bar.

    Args:
        directory (Path): The root directory to start searching for `.lean` files.

    Returns:
        Dict[str, str]: A dictionary where keys are file paths (as strings) and
                        values are the contents of the files.
    """
    if not directory.exists():
        raise FileNotFoundError(f"The directory {directory} does not exist.")
    if not directory.is_dir():
        raise NotADirectoryError(f"The path {directory} is not a directory.")

    # Collect all .lean files first to determine the total number for the progress bar
    lean_files = list(directory.rglob('*.lean'))
    total_files = len(lean_files)
    print(f"Found {total_files} `.lean` files to process.")

    file_contents = {}

    # Initialize tqdm progress bar
    with tqdm(total=total_files, desc="Reading .lean files", unit="file") as pbar:
        for file_path in lean_files:
            try:
                # Read the file content with UTF-8 encoding
                with file_path.open() as f:
                    content = f.readlines()
                # Use the absolute path as the key
                file_contents[str(file_path.resolve())] = content
            except Exception as e:
                print(f"Error reading {file_path}: {e}")
            finally:
                pbar.update(1)  # Update the progress bar

    return file_contents


try:
    lean_file_dict = read_lean_files(MATHLIB_DIR)
    lean_file_dict = {k.split('mathlib4/')[-1]: v for k, v in lean_file_dict.items()}
    print(f"Total .lean files read: {len(lean_file_dict)}")
    # Example: Print the first file's path and its first 100 characters
    if lean_file_dict:
        first_path = next(iter(lean_file_dict))
        print(f"First file: {first_path}")
        print(f"Content snippet: {lean_file_dict[first_path][:10]}")
except Exception as error:
    print(f"An error occurred: {error}")

Found 4360 `.lean` files to process.


Reading .lean files: 100%|██████████| 4360/4360 [00:03<00:00, 1172.59file/s]

Total .lean files read: 4360
First file: Mathlib/Tactic.lean
Content snippet: ['import Mathlib.Tactic.Abel\n', 'import Mathlib.Tactic.AdaptationNote\n', 'import Mathlib.Tactic.ApplyAt\n', 'import Mathlib.Tactic.ApplyCongr\n', 'import Mathlib.Tactic.ApplyFun\n', 'import Mathlib.Tactic.ApplyWith\n', 'import Mathlib.Tactic.ArithMult\n', 'import Mathlib.Tactic.ArithMult.Init\n', 'import Mathlib.Tactic.Attr.Core\n', 'import Mathlib.Tactic.Attr.Register\n']





In [3]:
for x , y in lean_file_dict.items() :  # a dict of files with their lines in mathlib
    print(x) # the file name 
    print(len(y)) #Number of lines you have in the file
    break

Mathlib/Tactic.lean
217


# Read LeanDojo Data

In [4]:
train_path = LeanDojo_DIR / "random/train.json"
val_path = LeanDojo_DIR / "random/val.json"
test_path = LeanDojo_DIR / "random/test.json"
proofs_train = json.load(train_path.open())
proofs_val = json.load(val_path.open())
proofs_test = json.load(test_path.open())

In [5]:
print(len(proofs_train))
print(len(proofs_val))
print(len(proofs_test))

118517
2000
2000


In [6]:
for x in proofs_train : 
    print(x)
    print(x['file_path'])
    break

{'url': 'https://github.com/leanprover-community/mathlib4', 'commit': '29dcec074de168ac2bf835a77ef68bbe069194c5', 'file_path': 'Mathlib/RingTheory/IntegralRestrict.lean', 'full_name': 'Algebra.intTrace_eq_trace', 'start': [216, 1], 'end': [223, 81], 'traced_tactics': [{'tactic': 'ext x', 'annotated_tactic': ['ext x', []], 'state_before': 'A : Type u_1\nK : Type u_2\nL : Type u_3\nB : Type u_4\ninst✝³⁰ : CommRing A\ninst✝²⁹ : CommRing B\ninst✝²⁸ : Algebra A B\ninst✝²⁷ : Field K\ninst✝²⁶ : Field L\ninst✝²⁵ : Algebra A K\ninst✝²⁴ : IsFractionRing A K\ninst✝²³ : Algebra B L\ninst✝²² : Algebra K L\ninst✝²¹ : Algebra A L\ninst✝²⁰ : IsScalarTower A B L\ninst✝¹⁹ : IsScalarTower A K L\ninst✝¹⁸ : IsIntegralClosure B A L\ninst✝¹⁷ : FiniteDimensional K L\nAₘ : Type ?u.319125\nBₘ : Type ?u.319128\ninst✝¹⁶ : CommRing Aₘ\ninst✝¹⁵ : CommRing Bₘ\ninst✝¹⁴ : Algebra Aₘ Bₘ\ninst✝¹³ : Algebra A Aₘ\ninst✝¹² : Algebra B Bₘ\ninst✝¹¹ : Algebra A Bₘ\ninst✝¹⁰ : IsScalarTower A Aₘ Bₘ\ninst✝⁹ : IsScalarTower A B B

In [7]:
from collections import defaultdict
# What is leandojo dataset : extracted form of Mathlib, contain the tactics extracted / 
# load the leandojo dataseet
# use the train/eval and test splits
#  filter statemenets that they don't have tactics
entry_dict = defaultdict(list)
s=0
p=0
for entry in proofs_train + proofs_val + proofs_test:
    p+=1
    if len(entry['traced_tactics']) > 0:
        s+=1
        entry_dict[entry['file_path']].append(entry)
    
print(len(entry_dict))
print(sum(len(v) for v in entry_dict.values()))

3606
61544


In [8]:
print(s/p * 100)
print(s)
print(p)

50.23302888578728
61544
122517


In [None]:
for x,y in entry_dict.items() :
    print(x)
    for z in y :# z is a thoerem
        print(z['file_path'])
        print(z['full_name'])
        print(z['start'])
        print(z['end'])
        
        for tact in z['traced_tactics'] :   # z : dict_keys(['url', 'commit', 'file_path', 'full_name', 'start', 'end', 'traced_tactics'])
            print(tact['annotated_tactic']) # dict_keys(['tactic', 'annotated_tactic', 'state_before', 'state_after']) tactic : actual tactic  / annotated_tactic : the information regarding the theorems using in this tactic ( their position in the mathlib library)
        break #means that we have 16 statement in this file ? 
    break

Mathlib/RingTheory/IntegralRestrict.lean
Mathlib/RingTheory/IntegralRestrict.lean
Algebra.intTrace_eq_trace
[216, 1]
[223, 81]
['ext x', []]
['haveI : <a>IsIntegralClosure</a> B A (<a>FractionRing</a> B) :=\n    <a>IsIntegralClosure.of_isIntegrallyClosed</a> _ _ _', [{'full_name': 'IsIntegralClosure', 'def_path': 'Mathlib/RingTheory/IntegralClosure.lean', 'def_pos': [716, 7], 'def_end_pos': [716, 24]}, {'full_name': 'FractionRing', 'def_path': 'Mathlib/RingTheory/Localization/FractionRing.lean', 'def_pos': [289, 8], 'def_end_pos': [289, 20]}, {'full_name': 'IsIntegralClosure.of_isIntegrallyClosed', 'def_path': 'Mathlib/RingTheory/IntegrallyClosed.lean', 'def_pos': [223, 10], 'def_end_pos': [223, 56]}]]
['haveI : <a>IsLocalization</a> (<a>algebraMapSubmonoid</a> B A⁰) (<a>FractionRing</a> B) :=\n    <a>IsIntegralClosure.isLocalization</a> _ (<a>FractionRing</a> A) _ _', [{'full_name': 'IsLocalization', 'def_path': 'Mathlib/RingTheory/Localization/Basic.lean', 'def_pos': [96, 17], 'def_e

In [10]:
l1 = list(entry_dict.keys())
l2 = list(lean_file_dict.keys())

In [11]:
s = 0
for x in l2 : 
    if x not in l1 : 
        s+=1

print(s)
print(len(l1))
print(len(l2))

875
3606
4360


In [12]:
def get_statement(theorem_str):
    return theorem_str.split(':=')[0].strip()

In [13]:
mathlib_theorems = []
theorem_dict = {}

empty_file = 0
for file_name in tqdm(lean_file_dict.keys()):
    if (len(entry_dict[file_name]) == 0): # if file name does not exist in Leandojo
        empty_file += 1
        continue
    lines = lean_file_dict[file_name]
    entries = entry_dict[file_name]
    entries = sorted(entries, key = lambda x: x['start'][0])
#    print(entries[0]['start'][0])
 #   break
    for entry in entries:
        theorem_str = ''.join(lines[entry['start'][0] - 1: entry['end'][0]]) 
   # and this enable us 
        theorem_item = (file_name, theorem_str)
        mathlib_theorems.append(theorem_item)
        theorem_dict[entry['full_name']] = theorem_item # what is the full_name ? the theorem full name
    
print('[warning] # Files without theorem:', empty_file)
print('# Mathlib theorems:', len(mathlib_theorems))

  0%|          | 0/4360 [00:00<?, ?it/s]

100%|██████████| 4360/4360 [00:01<00:00, 3283.22it/s]

# Mathlib theorems: 59750





In [None]:
for x, y in theorem_dict.items() :  # therem dict contains pairs of (file_name, thorem str with proof )
    print(x)
    print(y[1])
    break

FirstOrder.Language.Term.listDecode_encode_list
theorem listDecode_encode_list (l : List (L.Term α)) :
    listDecode (l.bind listEncode) = l.map Option.some := by
  suffices h : ∀ (t : L.Term α) (l : List (Sum α (Σi, L.Functions i))),
      listDecode (t.listEncode ++ l) = some t::listDecode l by
    induction' l with t l lih
    · rfl
    · rw [cons_bind, h t (l.bind listEncode), lih, List.map]
  intro t
  induction' t with a n f ts ih <;> intro l
  · rw [listEncode, singleton_append, listDecode]
  · rw [listEncode, cons_append, listDecode]
    have h : listDecode (((finRange n).bind fun i : Fin n => (ts i).listEncode) ++ l) =
        (finRange n).map (Option.some ∘ ts) ++ listDecode l := by
      induction' finRange n with i l' l'ih
      · rfl
      · rw [cons_bind, List.append_assoc, ih, map_cons, l'ih, cons_append, Function.comp]
    have h' : ∀ i : Fin n,
        (listDecode (((finRange n).bind fun i : Fin n => (ts i).listEncode) ++ l)).get? ↑i =
          some (some (ts i)) := 

In [None]:
sum_premises = 0
conjecture_examples = [] # examples of the conjecture

try:
    for file_name in tqdm(lean_file_dict.keys()):
        if (len(entry_dict[file_name]) == 0):
            empty_file += 1
            continue
    
        theorems_in_file = []
        lines = lean_file_dict[file_name]
        entries = entry_dict[file_name]
        entries = sorted(entries, key = lambda x: x['start'][0]) # sort as a function of lines
        
        for entry in entries:
            theorem_str = ''.join(lines[entry['start'][0] - 1: entry['end'][0]])
            contexts = ''.join(lines[:entry['start'][0] - 1]).strip() # this all the theorems that were used before the current theorem + variables
            
            premises = set()
            for tactics in entry['traced_tactics']: # for every tactic
                for p in tactics['annotated_tactic'][1]: # for every theorem used in this tactic
                    if p['full_name'] in theorem_dict:
                        premises.add(p['full_name']) # we add the thorem that were used in the current proof
    
            for p_name in premises: # For all permises used in the proof,  
                for p_context, p_theorem, p_premises in reversed(theorems_in_file):
                    if p_name in p_premises: #  if there is a lemma used in this proof and another proof
                        conjecture_examples.append((file_name, p_context, p_theorem, theorem_dict[p_name][1], theorem_str))
                                    # we add file_name of the file of the current proof, context_of_the_last_theorem, last_theorem, commun lemma, current_theorem
            theorems_in_file.append((contexts, theorem_str, premises)) # (what is before the thorem, thoerem, list of lemma used in the proof)
            sum_premises += len(premises) 
except:
    pass
print(sum_premises)
print('# Conjecture examples:', len(conjecture_examples))

  0%|          | 0/4360 [00:00<?, ?it/s]

100%|██████████| 4360/4360 [00:01<00:00, 2787.82it/s]

79579
# Conjecture examples: 51369





In [17]:
conjecture_examples[-1][2:4]

('theorem rightDual_ρ [RightRigidCategory V] (h : H) : Xᘁ.ρ h = (X.ρ (h⁻¹ : H))ᘁ := by\n  rw [← SingleObj.inv_as_inv]; rfl\n',
 '  apply IsIso.inv_eq_of_hom_inv_id\n  rw [comp_as_mul, inv_mul_self, id_as_one]\n#align category_theory.single_obj.inv_as_inv CategoryTheory.SingleObj.inv_as_inv\n')

In [18]:
print(Save_DIR)

STP/SFT


# Construct dataset

In [None]:
START_LEMMA_STMT = '<easy theorem>'
START_THM = '<hard theorem>'
END_THM = '</hard theorem>'
INVOKED_LEMMA = '<lemma>'

def format_conjecture_example(context, easy_theorem, shared_lemma, hard_theorem):
    prompt = f'Complete the following Lean 4 code:\n\n```lean4\n{context.strip()}\n' \
          f'{INVOKED_LEMMA}\n{shared_lemma.strip()}\n{START_LEMMA_STMT}\n' \
          f'{easy_theorem.strip()}\n{START_THM}'
    target = f'\n{get_statement(hard_theorem).strip()}\n{END_THM}'
    return {'prompt': prompt, 'target': target}

conjecture_ds = []
for file_name, context, theorem, shared_lemma, theorem_str in conjecture_examples:
    conjecture_ds.append(format_conjecture_example(context, theorem, shared_lemma, theorem_str)) # context is too big and we don't have it 

print(len(conjecture_ds))

In [None]:
with open(os.path.join(Save_DIR, 'conjecture.json'), 'w') as f:
    json.dump(conjecture_ds, f)

In [None]:
print(conjecture_ds[10]['prompt'])
print(conjecture_ds[10]['target'])

Complete the following Lean 4 code:

```lean4
/-
Copyright (c) 2021 Bryan Gin-ge Chen. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Adam Topaz, Bryan Gin-ge Chen, Yaël Dillies
-/
import Mathlib.Order.BooleanAlgebra
import Mathlib.Logic.Equiv.Basic

#align_import order.symm_diff from "leanprover-community/mathlib"@"6eb334bd8f3433d5b08ba156b8ec3e6af47e1904"

/-!
# Symmetric difference and bi-implication

This file defines the symmetric difference and bi-implication operators in (co-)Heyting algebras.

## Examples

Some examples are
* The symmetric difference of two sets is the set of elements that are in either but not both.
* The symmetric difference on propositions is `Xor'`.
* The symmetric difference on `Bool` is `Bool.xor`.
* The equivalence of propositions. Two propositions are equivalent if they imply each other.
* The symmetric difference translates to addition when considering a Boolean algebra as a Boolean
  ring.

## Main de

In [None]:
with open(os.path.join(Save_DIR, 'theorem_dict.pkl'), 'wb') as f:
    pickle.dump(theorem_dict, f)

In [None]:
print(len(theorem_dict))

59742


In [None]:
theorem_dict['nat_sub_dvd_pow_sub_pow'] # example

('Mathlib/Algebra/GeomSum.lean',
 'theorem nat_sub_dvd_pow_sub_pow (x y n : ℕ) : x - y ∣ x ^ n - y ^ n := by\n  rcases le_or_lt y x with h | h\n  · have : y ^ n ≤ x ^ n := Nat.pow_le_pow_left h _\n    exact mod_cast sub_dvd_pow_sub_pow (x : ℤ) (↑y) n\n  · have : x ^ n ≤ y ^ n := Nat.pow_le_pow_left h.le _\n    exact (Nat.sub_eq_zero_of_le this).symm ▸ dvd_zero (x - y)\n')

## eval dataset

In [None]:
PROVER_PROMPT = 'Complete the following Lean 4 code:\n\n```lean4\n'
eval_theorems = []

empty_file = 0
for file_name in tqdm(lean_file_dict.keys()):
    if (len(entry_dict[file_name]) == 0):
        empty_file += 1
        continue
    lines = lean_file_dict[file_name]
    entries = entry_dict[file_name]
    entries = sorted(entries, key = lambda x: x['start'][0])
    
    for entry in entries:
        context_str = PROVER_PROMPT + ''.join(lines[:entry['start'][0] - 1]) # prompt + all the contexte
        theorem_str = ''.join(lines[entry['start'][0] - 1: entry['end'][0]]) # the theorem is in the target
        theorem_item = (file_name, context_str, theorem_str)
        eval_theorems.append(theorem_item)
print('# Mathlib theorems:', len(eval_theorems))

  0%|          | 0/4360 [00:00<?, ?it/s]

 49%|████▉     | 2151/4360 [00:00<00:00, 2434.59it/s]

: 

In [None]:
rng = np.random.default_rng(0)
rng.shuffle(eval_theorems)
cutoff = 4096
train_theorems = [{'prompt': entry[1], 'target': entry[2]} for entry in eval_theorems[cutoff:]]
eval_theorems = [{'prompt': entry[1], 'target': entry[2]} for entry in eval_theorems[:cutoff]]

In [None]:
with open(os.path.join(Save_DIR, 'eval.json'), 'w') as f:
    json.dump(eval_theorems, f)

## mathlib + Conjecture

In [None]:
train_ds = conjecture_ds + train_theorems
rng = np.random.default_rng(0)
rng.shuffle(train_ds)
print(len(train_ds))

106921


In [None]:
with open(os.path.join(Save_DIR, 'mathlib_conjecture.json'), 'w') as f:
    json.dump(train_ds, f)