Constructing LeanDojo Benchmark (Lean 4)
===================================

This script uses [LeanDojo](https://leandojo.org/) to construct LeanDojo Benchmark 4, which is described in the appendix of our paper:

[LeanDojo: Theorem Proving with Retrieval-Augmented Language Models](https://leandojo.org/)      
NeurIPS 2023 (Datasets and Benchmarks Track)    
[Kaiyu Yang](https://yangky11.github.io/), [Aidan Swope](https://aidanswope.com/about), [Alex Gu](https://minimario.github.io/), [Rahul Chalamala](https://rchalamala.github.io/), [Peiyang Song](https://peiyang-song.github.io/), [Shixing Yu](https://billysx.github.io/), [Saad Godil](https://www.linkedin.com/in/saad-godil-9728353/), [Ryan Prenger](https://www.linkedin.com/in/ryan-prenger-18797ba1/), [Anima Anandkumar](http://tensorlab.cms.caltech.edu/users/anima/)

The dataset is constructed from [mathlib4](https://github.com/leanprover-community/mathlib4/tree/3c307701fa7e9acbdc0680d7f3b9c9fed9081740) (`3c307701fa7e9acbdc0680d7f3b9c9fed9081740`) and will be saved to `../leandojo_benchmark_4`. It includes 2000 theorems for validation, 2000 theorems for testing, and the rest for training. Please refer to our paper for details. For most use cases, you shouldn't need to generate the data and can directly use our official LeanDojo Benchmark 4 downloadable [here](https://zenodo.org/doi/10.5281/zenodo.8040109).

This script is for Lean 4. We also have a [version for Lean 3](https://github.com/lean-dojo/LeanDojo/blob/main/scripts/generate-benchmark-lean3.ipynb).


In [1]:
import json
import shutil
import random
import networkx as nx
from copy import copy
from pathlib import Path
from loguru import logger
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Union

import lean_dojo
from lean_dojo import *
from lean_dojo.constants import LEAN4_PACKAGES_DIR

random.seed(3407)  # https://arxiv.org/abs/2109.08203

URL = "https://github.com/leanprover-community/mathlib4"
COMMIT = "3c307701fa7e9acbdc0680d7f3b9c9fed9081740"
DST_DIR = Path("../leandojo_benchmark_4")
NUM_VAL = NUM_TEST = 2000

## Splitting the Theorems

We will split the theorems into train/val/test using two different strategies.

In [2]:
SPLIT_NAME = str  # train/val/test
SPLIT = Dict[SPLIT_NAME, List[TracedTheorem]]
SPLIT_STRATEGY = str

### Splitting Randomly

The first and the simplest strategy is splitting the theorems randomly, which can be implemented by a random shuffle followed by a sequential split.

In [3]:
def _split_sequentially(
    traced_theorems: List[TracedTheorem],
) -> SPLIT:
    """Split ``traced_theorems`` sequentially into train/val/test."""
    num_theorems = len(traced_theorems)
    num_train = num_theorems - NUM_VAL - NUM_TEST
    return {
        "train": traced_theorems[:num_train],
        "val": traced_theorems[num_train : num_train + NUM_VAL],
        "test": traced_theorems[num_train + NUM_VAL :],
    }


def split_randomly(
    traced_theorems: List[TracedTheorem],
) -> SPLIT:
    """Split ``traced_theorems`` randomly into train/val/test."""
    logger.info("Splitting the theorems randomly")
    traced_theorems = copy(traced_theorems)
    random.shuffle(traced_theorems)
    return _split_sequentially(traced_theorems)
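
As a small illustration of the shuffle-then-slice idea, the sketch below applies it to a toy list of strings. The names `toy_split`, `num_val`, `num_test`, and `seed` are hypothetical and exist only for this example; the notebook's own functions operate on `TracedTheorem` objects and use the module-level `NUM_VAL` and `NUM_TEST`.

```python
import random
from typing import Dict, List


def toy_split(
    items: List[str], num_val: int, num_test: int, seed: int = 0
) -> Dict[str, List[str]]:
    """Shuffle a copy of ``items`` and slice it into train/val/test."""
    rng = random.Random(seed)  # Local RNG, so the global random state is untouched.
    shuffled = list(items)  # Copy, so the caller's list keeps its order.
    rng.shuffle(shuffled)
    num_train = len(shuffled) - num_val - num_test
    return {
        "train": shuffled[:num_train],
        "val": shuffled[num_train : num_train + num_val],
        "test": shuffled[num_train + num_val :],
    }


toy_items = [f"thm_{i}" for i in range(10)]
toy = toy_split(toy_items, num_val=2, num_test=3)
print({name: len(part) for name, part in toy.items()})
```

Because the three slices partition the shuffled list, every item lands in exactly one split.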

### Splitting by Premise

The second strategy splits by premise. It tests the prover's ability to use novel premises, i.e., premises that are never used in training. Please see the implementation below. Note that validation and testing theorems may share premises, so **testing performance should be reported using models trained on the training set only, NOT training plus validation.**

In [4]:
def split_by_premise(
    traced_theorems: List[TracedTheorem],
) -> SPLIT:
    """
    Split theorems into train/val/test so that proofs in val/test rely on at
    least one novel premise that does not appear in train.
    """
    logger.info("Splitting the theorems by premises")

    # Figure out the number of theorems in train/val/test.
    num_theorems = len(traced_theorems)
    num_val_test = NUM_VAL + NUM_TEST
    num_train = num_theorems - num_val_test
    theorems_val_test = set()

    # Map each premise to a list of theorems using it.
    theorems_by_premises = defaultdict(list)
    for t in traced_theorems:
        for p in t.get_premise_full_names():
            theorems_by_premises[p].append(t)

    # Sort the premises by the number of theorems using them (in ascending order).
    theorems_by_premises = sorted(theorems_by_premises.items(), key=lambda x: len(x[1]))

    # For each premise, put all theorems using it into val_test so that it does not appear in train.
    for _, thms in theorems_by_premises:
        if len(theorems_val_test) < num_val_test:
            theorems_val_test.update(thms)

    # All other theorems go to train.
    theorems_train = [t for t in traced_theorems if t not in theorems_val_test]
    theorems_val_test = list(theorems_val_test)
    random.shuffle(theorems_val_test)

    return {
        "train": theorems_train,
        "val": theorems_val_test[:NUM_VAL],
        "test": theorems_val_test[NUM_VAL:],
    }
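
To make the effect of this strategy concrete, here is a minimal sketch of the same idea on toy data, where theorems and premises are plain strings. `toy_split_by_premise`, `uses_novel_premise`, and the toy dictionary are hypothetical names for illustration, and the sketch merges val and test into a single held-out set rather than shuffling and slicing them as `split_by_premise` does.

```python
from collections import defaultdict
from typing import Dict, List, Set


def toy_split_by_premise(
    premises_of: Dict[str, Set[str]], num_heldout: int
) -> Dict[str, List[str]]:
    """Hold out the theorems that use the least frequently used premises."""
    users = defaultdict(list)
    for thm, premises in premises_of.items():
        for p in premises:
            users[p].append(thm)

    heldout: Set[str] = set()
    # Visit premises from least used to most used.
    for _, thms in sorted(users.items(), key=lambda kv: len(kv[1])):
        if len(heldout) >= num_heldout:
            break
        heldout.update(thms)  # May overshoot num_heldout, as in the original.

    train = [t for t in premises_of if t not in heldout]
    return {"train": train, "heldout": sorted(heldout)}


def uses_novel_premise(
    thm: str, premises_of: Dict[str, Set[str]], train: List[str]
) -> bool:
    """Check whether ``thm`` uses a premise that no training theorem uses."""
    train_premises: Set[str] = set()
    for t in train:
        train_premises |= premises_of[t]
    return bool(premises_of[thm] - train_premises)


toy_premises = {
    "a": {"p_common", "p_rare1"},
    "b": {"p_common"},
    "c": {"p_common", "p_rare2"},
    "d": {"p_common"},
}
result = toy_split_by_premise(toy_premises, num_heldout=2)
print(result)
print(all(uses_novel_premise(t, toy_premises, result["train"]) for t in result["heldout"]))
```

Since every theorem using a selected premise is moved out of training together, each held-out theorem keeps at least one premise that training never sees.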

Given a traced repo, we can split the theorems using these strategies.

In [5]:
def split_data(traced_repo: TracedRepo) -> Dict[SPLIT_STRATEGY, SPLIT]:
    # Skip theorems in the Lean 4 repo itself.
    traced_theorems = [
        thm for thm in traced_repo.get_traced_theorems() if not thm.repo.is_lean4
    ]
    logger.info(f"{len(traced_theorems)} theorems in total")

    return {
        "random": split_randomly(traced_theorems),
        "novel_premises": split_by_premise(traced_theorems),
    }

## Exporting the Data
Once the theorems are split into train/val/test, we export them to JSON files that can be easily used in machine learning.

In [6]:
def _get_file_path(traced_repo: TracedRepo, thm: TracedTheorem) -> str:
    if thm.repo == traced_repo.repo:
        # The theorem belongs to the traced repo itself.
        return str(thm.theorem.file_path)
    else:
        # The theorem belongs to one of the dependencies.
        for name, dep in traced_repo.dependencies.items():
            if dep == thm.repo:
                return f"{LEAN4_PACKAGES_DIR}/{name}/{thm.theorem.file_path}"
        raise ValueError(f"Unable to find the dependency {thm.repo}")


def export_proofs(splits: Dict[SPLIT_STRATEGY, SPLIT], dst_path: Path, traced_repo: TracedRepo) -> None:
    """Export all proofs in a traced repo to ``dst_path``."""
    for strategy, split in splits.items():
        split_dir = dst_path / strategy
        split_dir.mkdir(parents=True)

        for name, theorems in split.items():
            data = []
            num_tactics = 0

            for thm in theorems:
                tactics = [
                    {
                        "tactic": t.tactic,
                        "annotated_tactic": t.get_annotated_tactic(),
                        "state_before": t.state_before,
                        "state_after": t.state_after,
                    }
                    for t in thm.get_traced_tactics()
                    if t.state_before != "no goals"
                    and "·" not in t.tactic  # Ignore "·".
                ]
                num_tactics += len(tactics)
                data.append(
                    {
                        "url": thm.repo.url,
                        "commit": thm.repo.commit,
                        "file_path": _get_file_path(traced_repo, thm),
                        "full_name": thm.theorem.full_name,
                        "start": list(thm.start),
                        "end": list(thm.end),
                        "traced_tactics": tactics,
                    }
                )
            oup_path = split_dir / f"{name}.json"
            # Use a context manager so the file is flushed and closed.
            with oup_path.open("wt") as oup:
                json.dump(data, oup)
            logger.info(
                f"{len(theorems)} theorems and {num_tactics} tactics saved to {oup_path}"
            )


def export_premises(traced_repo: TracedRepo, dst_path: Path) -> None:
    """Export all premise definitions in a traced repo to ``dst_path``."""
    oup_path = dst_path / "corpus.jsonl"
    num_premises = 0

    with oup_path.open("wt") as oup:
        G = traced_repo.traced_files_graph

        for tf_node in reversed(list(nx.topological_sort(G))):
            tf = G.nodes[tf_node]["traced_file"]
            imports = [str(_) for _ in G.successors(tf_node)]
            premises = tf.get_premise_definitions()
            num_premises += len(premises)
            oup.write(
                json.dumps(
                    {"path": str(tf.path), "imports": imports, "premises": premises}
                )
                + "\n"
            )
    logger.info(
        f"{num_premises} theorems/definitions from {len(traced_repo.traced_files)} files saved to {oup_path}"
    )


def export_licenses(traced_repo: TracedRepo, dst_path: Path) -> None:
    """Export the licenses of a traced repo and all its dependencies to ``dst_path``."""
    license_dir = dst_path / "licenses"
    license_dir.mkdir()
    all_repos = [traced_repo.repo] + list(traced_repo.dependencies.values())

    for repo in all_repos:
        lic = repo.get_license()
        if lic is None:
            continue
        with (license_dir / repo.name).open("wt") as oup:
            oup.write(lic)

    with (license_dir / "README.md").open("wt") as oup:
        oup.write(
            "This directory contains licenses of Lean repos used to generate this dataset. The dataset itself is released under [CC BY 2.0](https://creativecommons.org/licenses/by/2.0/)."
        )


def export_metadata(traced_repo: TracedRepo, dst_path: Path, **kwargs) -> None:
    """Export the metadata of a traced repo to ``dst_path``."""
    metadata = dict(kwargs)
    metadata["creation_time"] = str(datetime.now())
    metadata["from_repo"] = {
        "url": traced_repo.repo.url,
        "commit": traced_repo.repo.commit,
    }
    metadata["leandojo_version"] = lean_dojo.__version__
    # Use a context manager so the file is flushed and closed.
    with (dst_path / "metadata.json").open("wt") as oup:
        json.dump(metadata, oup)


def export_data(
    traced_repo: TracedRepo,
    splits: Dict[SPLIT_STRATEGY, SPLIT],
    dst_path: Union[str, Path],
    **kwargs,
) -> None:
    """Export a traced repo whose theorems have been split to ``dst_path``."""
    if isinstance(dst_path, str):
        dst_path = Path(dst_path)
    if dst_path.exists():
        logger.warning(f"{dst_path} already exists. Removing it now.")
        shutil.rmtree(dst_path)

    # Export the proofs.
    export_proofs(splits, dst_path, traced_repo)

    # Export the premises (theorems, definitions, etc.).
    export_premises(traced_repo, dst_path)

    # Export the licenses.
    export_licenses(traced_repo, dst_path)

    # Export metadata.
    export_metadata(traced_repo, dst_path, **kwargs)

In [7]:
repo = LeanGitRepo(URL, COMMIT)
traced_repo = trace(repo)
splits = split_data(traced_repo)
export_data(traced_repo, splits, DST_DIR, dataset_name="LeanDojo Benchmark 4")

2024-03-13 19:47:41.611 | INFO     | lean_dojo.data_extraction.trace:trace:116 - Loading the traced repo from /Users/yangky/.cache/lean_dojo/leanprover-community-mathlib4-3c307701fa7e9acbdc0680d7f3b9c9fed9081740/mathlib4
2024-03-13 19:47:45,594	INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
100%|██████████| 5042/5042 [33:51<00:00,  2.48it/s]
Following Github server redirection from /repos/mhuisi/lean4-cli to /repositories/341363356
2024-03-13 20:27:56.583 | INFO     | __main__:split_data:6 - 106446 theorems in total
2024-03-13 20:27:56.636 | INFO     | __main__:split_randomly:18 - Splitting the theorems randomly
2024-03-13 20:27:56.682 | INFO     | __main__:split_by_premise:8 - Splitting the theorems by premises

## Data Format

This is the resulting data directory:

```
├─corpus.jsonl
├─metadata.json
├─licenses
│ ├─lean4
│ ├─mathlib4
│ ├─doc-gen4
│ ├─aesop
│ ├─ProofWidgets4
│ ├─std4
│ └─README.md
├─random
│ ├─train.json
│ ├─val.json
│ └─test.json
└─novel_premises
  ├─train.json
  ├─val.json
  └─test.json
```

`corpus.jsonl` is a corpus of all theorems and definitions in mathlib4 that can potentially be used as premises. The sub-directories `random` and `novel_premises` correspond to the two strategies for splitting the theorems. For each strategy, we have `*.json` files for train/val/test. The sub-directory `licenses` contains license information.

### Corpus of Potential Premises

`corpus.jsonl` is in [JSON Lines format](https://jsonlines.org/); a line includes the potential premises defined in a single `*.lean` file.
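
Because each line is an independent JSON object, the corpus can be processed one file entry at a time without loading everything at once. The sketch below builds a lookup from premise names to the files defining them. `index_premises` is a hypothetical helper, and the two-line toy corpus is made up for illustration; it relies only on the `path`, `imports`, `premises`, and `full_name` fields.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


def index_premises(corpus_path: Path) -> Dict[str, str]:
    """Map each premise's full name to the path of the file that defines it."""
    index: Dict[str, str] = {}
    with corpus_path.open() as inp:
        for line in inp:
            if not line.strip():
                continue  # Tolerate blank lines.
            file_entry = json.loads(line)
            for premise in file_entry["premises"]:
                index[premise["full_name"]] = file_entry["path"]
    return index


# A made-up corpus with two files, written to a temporary directory.
toy_rows = [
    {"path": "A.lean", "imports": [], "premises": [{"full_name": "A.foo", "code": "def foo := 1"}]},
    {"path": "B.lean", "imports": ["A.lean"], "premises": [{"full_name": "B.bar", "code": "def bar := 2"}]},
]
with tempfile.TemporaryDirectory() as tmp:
    toy_corpus = Path(tmp) / "corpus.jsonl"
    toy_corpus.write_text("".join(json.dumps(row) + "\n" for row in toy_rows))
    premise_index = index_premises(toy_corpus)

print(premise_index)
```

If the same full name were to appear in more than one file, this sketch would keep only the last occurrence.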

In [8]:
!cat ../leandojo_benchmark_4/corpus.jsonl | wc -l

    5042


Let's look at one of them.

In [9]:
corpus_path = DST_DIR / "corpus.jsonl"
lines = list(corpus_path.open())
file_in_corpus = json.loads(lines[2000])
file_in_corpus.keys()

dict_keys(['path', 'imports', 'premises'])

We can check the file's path and other files it imports.

In [10]:
file_in_corpus["path"], file_in_corpus["imports"]

('Mathlib/Data/MvPolynomial/Equiv.lean',
 ['Mathlib/Data/MvPolynomial/Variables.lean',
  'Mathlib/Data/Polynomial/AlgebraMap.lean',
  'Mathlib/Data/MvPolynomial/Rename.lean',
  'Mathlib/Logic/Equiv/Fin.lean',
  'Mathlib/Algebra/BigOperators/Fin.lean',
  'Mathlib/Data/Finsupp/Fin.lean',
  '.lake/packages/lean4/src/lean/Init.lean'])
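
Assuming `imports` lists only the files imported directly (which is what `G.successors` in `export_premises` suggests), the full set of files a given file depends on can be recovered by following imports transitively. The sketch below does this with an iterative traversal over a made-up import table; `transitive_imports` and `toy_imports` are hypothetical names.

```python
from typing import Dict, List, Set


def transitive_imports(imports_of: Dict[str, List[str]], path: str) -> Set[str]:
    """Return every file reachable from ``path`` by following imports."""
    seen: Set[str] = set()
    stack = list(imports_of.get(path, []))
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # Already visited through another import chain.
        seen.add(current)
        stack.extend(imports_of.get(current, []))
    return seen


# Made-up import table: C imports B, and B imports A.
toy_imports = {"C.lean": ["B.lean"], "B.lean": ["A.lean"], "A.lean": []}
print(sorted(transitive_imports(toy_imports, "C.lean")))  # ['A.lean', 'B.lean']
```

With the real corpus, `imports_of` could be built by mapping each line's `path` to its `imports`.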

In [11]:
len(file_in_corpus["premises"])

40

We can inspect the first potential premise:

In [12]:
file_in_corpus["premises"][0]

{'full_name': 'MvPolynomial.pUnitAlgEquiv',
 'code': "@[simps]\ndef pUnitAlgEquiv : MvPolynomial PUnit R ≃ₐ[R] R[X] where\n  toFun := eval₂ Polynomial.C fun _ => Polynomial.X\n  invFun := Polynomial.eval₂ MvPolynomial.C (X PUnit.unit)\n  left_inv := by\n    let f : R[X] →+* MvPolynomial PUnit R := Polynomial.eval₂RingHom MvPolynomial.C (X PUnit.unit)\n    let g : MvPolynomial PUnit R →+* R[X] := eval₂Hom Polynomial.C fun _ => Polynomial.X\n    show ∀ p, f.comp g p = p\n    apply is_id\n    · ext a\n      dsimp\n      rw [eval₂_C, Polynomial.eval₂_C]\n    · rintro ⟨⟩\n      dsimp\n      rw [eval₂_X, Polynomial.eval₂_X]\n  right_inv p :=\n    Polynomial.induction_on p (fun a => by rw [Polynomial.eval₂_C, MvPolynomial.eval₂_C])\n    (fun p q hp hq => by rw [Polynomial.eval₂_add, MvPolynomial.eval₂_add, hp, hq]) fun p n _ => by\n      rw [Polynomial.eval₂_mul, Polynomial.eval₂_pow, Polynomial.eval₂_X, Polynomial.eval₂_C,\n        eval₂_mul, eval₂_C, eval₂_pow, eval₂_X]\n  map_mul' _ _ := e

Each premise has a fully qualified name, its definition (in the form of Lean code), and the exact location where it is defined.


### Theorems/Proofs Data

Now let's take a look at the theorems/proofs data, taking the `random` split as an example.

In [13]:
train_path = DST_DIR / "random/train.json"
proofs_train = json.load(train_path.open())
len(proofs_train)

102446

Each element in `proofs_train` represents a theorem. Let's check one of them.

In [15]:
for proof in proofs_train:
    if proof["traced_tactics"] != []:
        break
proof.keys()

dict_keys(['url', 'commit', 'file_path', 'full_name', 'start', 'end', 'traced_tactics'])

In [16]:
proof["url"], proof["commit"], proof["file_path"], proof["full_name"]

('https://github.com/leanprover-community/mathlib4',
 '3c307701fa7e9acbdc0680d7f3b9c9fed9081740',
 'Mathlib/CategoryTheory/Endofunctor/Algebra.lean',
 'CategoryTheory.Endofunctor.Algebra.Initial.right_inv')

We see the theorem's name and where it is defined. The theorem includes some traced tactics.

In [17]:
len(proof["traced_tactics"])

3

Let's look at a traced tactic.

In [18]:
proof["traced_tactics"][0]

{'tactic': 'rw [strInv, ← (h.to ⟨F.obj A.1, F.map A.str⟩).h, ← F.map_id, ← F.map_comp]',
 'annotated_tactic': ['rw [<a>strInv</a>, ← (h.to ⟨F.obj A.1, F.map A.str⟩).<a>h</a>, ← F.map_id, ← F.map_comp]',
  [{'full_name': 'CategoryTheory.Endofunctor.Algebra.Initial.strInv',
    'def_path': 'Mathlib/CategoryTheory/Endofunctor/Algebra.lean',
    'def_pos': [228, 5],
    'def_end_pos': [228, 11]},
   {'full_name': 'CategoryTheory.Endofunctor.Algebra.Hom.h',
    'def_path': 'Mathlib/CategoryTheory/Endofunctor/Algebra.lean',
    'def_pos': [68, 3],
    'def_end_pos': [68, 4]}]],
 'state_before': 'C : Type u\ninst✝ : Category.{v, u} C\nF : C ⥤ C\nA A₀ A₁ A₂ : Algebra F\nf : A₀ ⟶ A₁\ng : A₁ ⟶ A₂\nh : Limits.IsInitial A\n⊢ A.str ≫ strInv h = 𝟙 (F.toPrefunctor.obj A.a)',
 'state_after': 'C : Type u\ninst✝ : Category.{v, u} C\nF : C ⥤ C\nA A₀ A₁ A₂ : Algebra F\nf : A₀ ⟶ A₁\ng : A₁ ⟶ A₂\nh : Limits.IsInitial A\n⊢ F.toPrefunctor.map\n      ((Limits.IsInitial.to h { a := F.toPrefunctor.obj A.a, str :

`annotated_tactic` is the tactic with premises annotated by `<a> ... </a>`. For each premise, we know its fully qualified name and the exact location where it is defined, which is invaluable for training machine learning models for premise selection.
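
As a rough sketch of how a traced tactic might be turned into a training example for premise selection, the code below pairs the state before the tactic with the premises it uses. `to_example` and `ANNOTATION` are hypothetical names, and the toy record is an abbreviated, partly made-up version of the entry shown above (its goal states are placeholders).

```python
import re
from typing import Dict

# Matches the text wrapped in <a> ... </a> inside an annotated tactic.
ANNOTATION = re.compile(r"<a>(.*?)</a>")


def to_example(traced_tactic: Dict) -> Dict:
    """Collect the state, the tactic, and the premises used by the tactic."""
    annotated_text, provenances = traced_tactic["annotated_tactic"]
    return {
        "state": traced_tactic["state_before"],
        "tactic": traced_tactic["tactic"],
        # Fully qualified names, taken from the provenance records.
        "premises": [p["full_name"] for p in provenances],
        # The names as they are written in the tactic itself.
        "premise_spans": ANNOTATION.findall(annotated_text),
    }


toy_tactic = {
    "tactic": "rw [strInv, ← F.map_id]",
    "annotated_tactic": [
        "rw [<a>strInv</a>, ← F.map_id]",
        [
            {
                "full_name": "CategoryTheory.Endofunctor.Algebra.Initial.strInv",
                "def_path": "Mathlib/CategoryTheory/Endofunctor/Algebra.lean",
                "def_pos": [228, 5],
                "def_end_pos": [228, 11],
            }
        ],
    ],
    "state_before": "⊢ placeholder goal",  # Placeholder, not a real state.
    "state_after": "⊢ placeholder goal after rewriting",  # Placeholder.
}
example = to_example(toy_tactic)
print(example["premises"], example["premise_spans"])
```

Applying such a helper to every entry of `traced_tactics` across a split would yield (state, premises) pairs, with `def_path` and `def_pos` available for looking up each premise's definition.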