Constructing LeanDojo Benchmark (Lean 4)
===================================

This script uses [LeanDojo](https://leandojo.org/) to construct LeanDojo Benchmark 4 in the appendix of our paper:

[LeanDojo: Theorem Proving with Retrieval-Augmented Language Models](https://leandojo.org/)      
NeurIPS 2023 (Datasets and Benchmarks Track)    
[Kaiyu Yang](https://yangky11.github.io/), [Aidan Swope](https://aidanswope.com/about), [Alex Gu](https://minimario.github.io/), [Rahul Chalamala](https://rchalamala.github.io/), [Peiyang Song](https://peiyang-song.github.io/), [Shixing Yu](https://billysx.github.io/), [Saad Godil](https://www.linkedin.com/in/saad-godil-9728353/), [Ryan Prenger](https://www.linkedin.com/in/ryan-prenger-18797ba1/), [Anima Anandkumar](http://tensorlab.cms.caltech.edu/users/anima/)

The dataset is constructed from [mathlib4](https://github.com/leanprover-community/mathlib4/tree/29dcec074de168ac2bf835a77ef68bbe069194c5) (`29dcec074de168ac2bf835a77ef68bbe069194c5`) and will be saved to `DST_DIR` (`data/leandojo_benchmark_4` in the configuration cell below). It includes 2000 theorems for validation, 2000 theorems for testing, and the rest for training. Please refer to our paper for details. For most use cases, you don't need to generate the data yourself and can directly use our official LeanDojo Benchmark 4, downloadable [here](https://zenodo.org/doi/10.5281/zenodo.8040109).

This script is for Lean 4. We also have a [version for Lean 3](https://github.com/lean-dojo/LeanDojo/blob/main/scripts/generate-benchmark-lean3.ipynb).


In [1]:
import json
import shutil
import random
import networkx as nx
from copy import copy
from pathlib import Path
from loguru import logger
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Union

import lean_dojo
from lean_dojo import *
from lean_dojo.constants import LEAN4_PACKAGES_DIR

random.seed(3407)  # https://arxiv.org/abs/2109.08203

if 0:
    URL = "https://github.com/leanprover-community/mathlib4"
    COMMIT = "29dcec074de168ac2bf835a77ef68bbe069194c5"
    DST_DIR = Path("data/leandojo_benchmark_4")
else:
    URL = "https://github.com/ZongjingLi/Soulforge"
    COMMIT = "8fae6d99dfe7dcc0109ba552dc299bf44973660f"
    DST_DIR = Path("data/soulforge")
NUM_VAL = NUM_TEST = 2000

## Splitting the Theorems

We will split the theorems into train/val/test using two different strategies.

In [2]:
SPLIT_NAME = str  # train/val/test
SPLIT = Dict[SPLIT_NAME, List[TracedTheorem]]
SPLIT_STRATEGY = str

### Splitting Randomly

The first and the simplest strategy is splitting the theorems randomly, which can be implemented by a random shuffle followed by a sequential split.

In [3]:
def _split_sequentially(
    traced_theorems: List[TracedTheorem],
) -> SPLIT:
    """Split ``traced_theorems`` sequentially into train/val/test."""
    num_theorems = len(traced_theorems)
    num_train = num_theorems - NUM_VAL - NUM_TEST
    return {
        "train": traced_theorems[:num_train],
        "val": traced_theorems[num_train : num_train + NUM_VAL],
        "test": traced_theorems[num_train + NUM_VAL :],
    }


def split_randomly(
    traced_theorems: List[TracedTheorem],
) -> SPLIT:
    """Split ``traced_theorems`` randomly into train/val/test."""
    logger.info("Splitting the theorems randomly")
    traced_theorems = copy(traced_theorems)
    random.shuffle(traced_theorems)
    return _split_sequentially(traced_theorems)
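
To make the shuffle-then-slice idea concrete, here is a minimal self-contained sketch on toy data. Integer IDs stand in for `TracedTheorem` objects, and `toy_split_randomly`, `num_val`, `num_test`, and `seed` are illustrative names, not part of the script.

```python
import random


def toy_split_randomly(items, num_val, num_test, seed=0):
    """Shuffle a copy of ``items`` and slice it into train/val/test."""
    rng = random.Random(seed)  # Local RNG so the global seed is untouched.
    shuffled = list(items)
    rng.shuffle(shuffled)
    num_train = len(shuffled) - num_val - num_test
    return {
        "train": shuffled[:num_train],
        "val": shuffled[num_train : num_train + num_val],
        "test": shuffled[num_train + num_val :],
    }


toy_split = toy_split_randomly(range(10), num_val=2, num_test=3)
print({name: len(part) for name, part in toy_split.items()})
# {'train': 5, 'val': 2, 'test': 3}
```

The three parts are disjoint and together cover the input, because they are consecutive slices of one shuffled list.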

### Splitting by Premise

The second strategy is splitting by premise. We want to test the prover's capability in using novel premises, i.e., premises that have never been used in training. Please see the implementation below. Note that validation and testing theorems may share premises, so **testing performance should be reported using models trained on the training set only, NOT training plus validation.**

In [4]:
def split_by_premise(
    traced_theorems: List[TracedTheorem],
) -> SPLIT:
    """
    Split theorems into train/val/test so that proofs in val/test rely on at
    least one novel premise that does not appear in train.
    """
    logger.info("Splitting the theorems by premises")

    # Figure out the number of theorems in train/val/test.
    num_theorems = len(traced_theorems)
    num_val_test = NUM_VAL + NUM_TEST
    num_train = num_theorems - num_val_test
    theorems_val_test = set()

    # Map each premise to a list of theorems using it.
    theorems_by_premises = defaultdict(list)
    for t in traced_theorems:
        for p in t.get_premise_full_names():
            theorems_by_premises[p].append(t)

    # Sort the premises by the number of theorems using them (in ascending order).
    theorems_by_premises = sorted(theorems_by_premises.items(), key=lambda x: len(x[1]))

    # For each premise, put all theorems using it into val_test so that it does not appear in train.
    for _, thms in theorems_by_premises:
        if len(theorems_val_test) < num_val_test:
            theorems_val_test.update(thms)

    # All other theorems go to train.
    theorems_train = [t for t in traced_theorems if t not in theorems_val_test]
    theorems_val_test = list(theorems_val_test)
    random.shuffle(theorems_val_test)

    return {
        "train": theorems_train,
        "val": theorems_val_test[:NUM_VAL],
        "test": theorems_val_test[NUM_VAL:],
    }
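
The rarest-premise-first rule is easier to follow on a small example. The sketch below re-implements the same idea on hypothetical data, with theorem names mapped to the premises their proofs use; `toy_theorems` and `toy_split_by_premise` are made up for illustration.

```python
from collections import defaultdict

# Hypothetical toy data: theorem name -> names of the premises its proof uses.
toy_theorems = {
    "thm_a": ["p_common", "p_rare1"],
    "thm_b": ["p_common"],
    "thm_c": ["p_common", "p_rare2"],
    "thm_d": ["p_common"],
    "thm_e": ["p_common", "p_mid"],
    "thm_f": ["p_mid"],
}


def toy_split_by_premise(theorems, num_held_out):
    """Return (train, held_out) name lists using the rarest-premise-first rule."""
    users = defaultdict(list)
    for name, premises in theorems.items():
        for p in premises:
            users[p].append(name)

    held_out = set()
    # Visit premises from least used to most used.
    for _, names in sorted(users.items(), key=lambda kv: len(kv[1])):
        if len(held_out) < num_held_out:
            held_out.update(names)

    train = [name for name in theorems if name not in held_out]
    return train, sorted(held_out)


train, held_out = toy_split_by_premise(toy_theorems, num_held_out=2)
train_premises = set().union(*(toy_theorems[name] for name in train))
for name in held_out:
    # Premises of a held-out theorem that never occur in train.
    print(name, sorted(set(toy_theorems[name]) - train_premises))
# thm_a ['p_rare1']
# thm_c ['p_rare2']
```

Because all users of a premise are moved at once, the held-out set can overshoot the target size when a premise with many users is picked last.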

Given a traced repo, we can split the theorems using these strategies.

In [5]:
def split_data(traced_repo: TracedRepo) -> Dict[SPLIT_STRATEGY, SPLIT]:
    # Skip theorems in the Lean 4 repo itself.
    traced_theorems = [
        thm for thm in traced_repo.get_traced_theorems() if not thm.repo.is_lean4
    ]
    logger.info(f"{len(traced_theorems)} theorems in total")

    return {
        "random": split_randomly(traced_theorems),
        "novel_premises": split_by_premise(traced_theorems),
    }

## Exporting the Data
Once the theorems are split into train/val/test, we export them to JSON, which can be easily used in machine learning.

In [6]:
def _get_file_path(traced_repo: TracedRepo, thm: TracedTheorem) -> str:
    if thm.repo == traced_repo.repo:
        # The theorem belongs to the traced repo itself.
        return str(thm.theorem.file_path)
    else:
        # The theorem belongs to one of the dependencies.
        for name, dep in traced_repo.dependencies.items():
            if dep == thm.repo:
                return f"{LEAN4_PACKAGES_DIR}/{name}/{thm.theorem.file_path}"
        raise ValueError(f"Unable to find the dependency {thm.repo}")


def export_proofs(
    splits: Dict[SPLIT_STRATEGY, SPLIT], dst_path: Path, traced_repo: TracedRepo
) -> None:
    """Export all proofs in a traced repo to ``dst_path``."""
    for strategy, split in splits.items():
        split_dir = dst_path / strategy
        split_dir.mkdir(parents=True)

        for name, theorems in split.items():
            data = []
            num_tactics = 0

            for thm in theorems:
                tactics = [
                    {
                        "tactic": t.tactic,
                        "annotated_tactic": t.get_annotated_tactic(),
                        "state_before": t.state_before,
                        "state_after": t.state_after,
                    }
                    for t in thm.get_traced_tactics()
                    if t.state_before != "no goals"
                    and "·" not in t.tactic  # Ignore "·".
                ]
                num_tactics += len(tactics)
                data.append(
                    {
                        "url": traced_repo.repo.url,
                        "commit": traced_repo.repo.commit,
                        "file_path": _get_file_path(traced_repo, thm),
                        "full_name": thm.theorem.full_name,
                        "start": list(thm.start),
                        "end": list(thm.end),
                        "traced_tactics": tactics,
                    }
                )
            oup_path = split_dir / f"{name}.json"
            # Use a context manager so the file is flushed and closed.
            with oup_path.open("wt") as oup:
                json.dump(data, oup)
            logger.info(
                f"{len(theorems)} theorems and {num_tactics} tactics saved to {oup_path}"
            )


def export_premises(traced_repo: TracedRepo, dst_path: Path) -> None:
    """Export all premise definitions in a traced repo to ``dst_path``."""
    oup_path = dst_path / "corpus.jsonl"
    num_premises = 0

    with oup_path.open("wt") as oup:
        G = traced_repo.traced_files_graph

        for tf_node in reversed(list(nx.topological_sort(G))):
            tf = G.nodes[tf_node]["traced_file"]
            imports = [str(_) for _ in G.successors(tf_node)]
            premises = tf.get_premise_definitions()
            num_premises += len(premises)
            oup.write(
                json.dumps(
                    {"path": str(tf.path), "imports": imports, "premises": premises}
                )
                + "\n"
            )
    logger.info(
        f"{num_premises} theorems/definitions from {len(traced_repo.traced_files)} files saved to {oup_path}"
    )


def export_licenses(traced_repo: TracedRepo, dst_path: Path) -> None:
    """Export the licenses of a traced repo and all its dependencies to ``dst_path``."""
    license_dir = dst_path / "licenses"
    license_dir.mkdir()
    all_repos = [traced_repo.repo] + list(traced_repo.dependencies.values())

    for repo in all_repos:
        lic = repo.get_license()
        if lic is None:
            continue
        with (license_dir / repo.name).open("wt") as oup:
            oup.write(lic)

    with (license_dir / "README.md").open("wt") as oup:
        oup.write(
            "This directory contains licenses of Lean repos used to generate this dataset. The dataset itself is released under [CC BY 2.0](https://creativecommons.org/licenses/by/2.0/)."
        )


def export_metadata(traced_repo: TracedRepo, dst_path: Path, **kwargs) -> None:
    """Export the metadata of a traced repo to ``dst_path``."""
    metadata = dict(kwargs)
    metadata["creation_time"] = str(datetime.now())
    metadata["from_repo"] = {
        "url": traced_repo.repo.url,
        "commit": traced_repo.repo.commit,
    }
    metadata["leandojo_version"] = lean_dojo.__version__
    # Use a context manager so the file is flushed and closed.
    with (dst_path / "metadata.json").open("wt") as oup:
        json.dump(metadata, oup)


def export_data(
    traced_repo: TracedRepo,
    splits: Dict[SPLIT_STRATEGY, SPLIT],
    dst_path: Union[str, Path],
    **kwargs,
) -> None:
    """Export a traced repo whose theorems have been split to ``dst_path``."""
    if isinstance(dst_path, str):
        dst_path = Path(dst_path)
    if dst_path.exists():
        logger.warning(f"{dst_path} already exists. Removing it now.")
        shutil.rmtree(dst_path)

    # Export the proofs.
    export_proofs(splits, dst_path, traced_repo)

    # Export the premises (theorems, definitions, etc.).
    export_premises(traced_repo, dst_path)

    # Export the licenses.
    export_licenses(traced_repo, dst_path)

    # Export metadata.
    export_metadata(traced_repo, dst_path, **kwargs)

In [7]:
repo = LeanGitRepo(URL, COMMIT)
traced_repo = trace(repo)
splits = split_data(traced_repo)
export_data(traced_repo, splits, DST_DIR, dataset_name="LeanDojo Benchmark 4")

2025-03-11 11:39:27.795 | INFO     | lean_dojo.data_extraction.trace:get_traced_repo_path:210 - Tracing LeanGitRepo(url='https://github.com/ZongjingLi/Soulforge', commit='8fae6d99dfe7dcc0109ba552dc299bf44973660f')


✔ [2/8] Built Soulforge.Basic
✔ [3/8] Built Soulforge
ℹ [4/8] Built Main
info: ././././Main.lean:20:0: "Finished Building Malkyriss"
✔ [5/8] Built Main:c.o
✔ [6/8] Built Soulforge.Basic:c.o
✔ [7/8] Built Soulforge:c.o
✔ [8/8] Built soulforge
Build completed successfully.


 95%|█████████▌| 943/988 [03:50<00:09,  4.69it/s]

✔ [2/3] Built Lean4Repl
Build completed successfully.


2025-03-11 11:43:44,181	INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
100%|██████████| 988/988 [00:25<00:00, 38.03it/s]
2025-03-11 11:44:15,583	INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
100%|██████████| 988/988 [00:06<00:00, 153.53it/s]
2025-03-11 11:44:32.937 | INFO     | lean_dojo.data_extraction.trace:trace:248 - Loading the traced repo from /Users/sunyiqi/.cache/lean_dojo/ZongjingLi-Soulforge-8fae6d99dfe7dcc0109ba552dc299bf44973660f/Soulforge
2025-03-11 11:44:33,837	INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
100%|██████████| 988/988 [00:39<00:00, 25.16it/s]
2025-03-11 11:45:17.839 | INFO     | __main__:split_data:6 - 4 theorems in total
2025-03-11 11:45:17.839 | INFO

## Data Format

This is the resulting data directory:

```
├─corpus.jsonl
├─metadata.json
├─licenses
│ ├─lean4
│ ├─mathlib4
│ ├─doc-gen4
│ ├─aesop
│ ├─ProofWidgets4
│ ├─std4
│ └─README.md
├─random
│ ├─train.json
│ ├─val.json
│ └─test.json
└─novel_premises
  ├─train.json
  ├─val.json
  └─test.json
```

`corpus.jsonl` is a corpus of all theorems and definitions in mathlib4 that can potentially be used as premises. Sub-directories `random` and `novel_premises` correspond to the two strategies for splitting the theorems. For each strategy, we have `*.json` files for train/val/test. The sub-directory `licenses` contains license information.

### Corpus of Potential Premises

`corpus.jsonl` is in [JSON Lines format](https://jsonlines.org/); a line includes the potential premises defined in a single `*.lean` file.
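
Since each line is an independent JSON object, the file can be processed one record at a time. The following is a minimal sketch on a hypothetical two-file corpus written to a temporary directory; the record contents are invented and only mirror the `path`/`imports`/`premises` keys produced by `export_premises`.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical two-file corpus in the same shape as ``corpus.jsonl``.
toy_records = [
    {"path": "A.lean", "imports": [], "premises": [{"full_name": "A.foo"}]},
    {
        "path": "B.lean",
        "imports": ["A.lean"],
        "premises": [{"full_name": "B.bar"}, {"full_name": "B.baz"}],
    },
]

with tempfile.TemporaryDirectory() as tmp:
    toy_corpus_path = Path(tmp) / "corpus.jsonl"
    with toy_corpus_path.open("wt") as oup:
        for record in toy_records:
            oup.write(json.dumps(record) + "\n")

    # Read one JSON object per line instead of loading the whole file at once.
    premises_per_file = {}
    with toy_corpus_path.open() as inp:
        for line in inp:
            record = json.loads(line)
            premises_per_file[record["path"]] = len(record["premises"])

print(premises_per_file)
# {'A.lean': 1, 'B.lean': 2}
```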

In [8]:
!cat {DST_DIR}/corpus.jsonl | wc -l

988


Let's look at one of them.

In [10]:
corpus_path = DST_DIR / "corpus.jsonl"
lines = list(corpus_path.open())
print(len(lines))
file_in_corpus = json.loads(lines[1])
file_in_corpus.keys()

988


dict_keys(['path', 'imports', 'premises'])

We can check the file's path and other files it imports.

In [11]:
file_in_corpus["path"], file_in_corpus["imports"]

('.lake/packages/lean4/src/lean/Init/Coe.lean',
 ['.lake/packages/lean4/src/lean/Init/Prelude.lean'])
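
The `imports` field lists only the files a file imports directly. If every file reachable through imports is needed, one option is to walk the import graph. The sketch below does this on hypothetical records; `transitive_imports` and the file names are illustrative and not part of LeanDojo.

```python
def transitive_imports(imports_by_path, path):
    """Return every file reachable from ``path`` through ``imports`` edges."""
    seen = set()
    stack = list(imports_by_path.get(path, []))
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        stack.extend(imports_by_path.get(current, []))
    return seen


# Hypothetical import graph: C imports B, and B imports A.
toy_imports = {
    "A.lean": [],
    "B.lean": ["A.lean"],
    "C.lean": ["B.lean"],
}
print(sorted(transitive_imports(toy_imports, "C.lean")))
# ['A.lean', 'B.lean']
```

With the real corpus, `imports_by_path` could be built as `{r["path"]: r["imports"] for r in map(json.loads, lines)}`, reusing `lines` from the cell above.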

In [12]:
len(file_in_corpus["premises"])


19

In [13]:
for premise in file_in_corpus["premises"]:
    print(premise["full_name"])



Coe
CoeTC
CoeOut
CoeOTC
CoeHead
CoeHTC
CoeTail
CoeHTCT
CoeDep
CoeT
CoeFun
CoeSort
boolToProp
boolToSort
decPropToBool
optionCoe
subtypeCoe
Lean.Internal.liftCoeM
Lean.Internal.coeM


We can inspect the first potential premise:

In [14]:
file_in_corpus["premises"][0]

{'full_name': 'Coe',
 'code': 'class Coe (α : semiOutParam (Sort u)) (β : Sort v) where\n  \n  coe : α → β',
 'start': [120, 1],
 'end': [129, 14],
 'kind': 'commanddeclaration'}

Each premise has a fully qualified name, its definition (in the form of Lean code), and the exact location where it is defined.


### Theorems/Proofs Data

Now let's take a look at the theorems/proofs data, taking the `random` split as an example.

In [17]:
train_path = DST_DIR / "random/train.json"
proofs_train = json.load(train_path.open())
len(proofs_train)

0
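
The empty training split is consistent with the log above, which reports 4 theorems in total while `NUM_VAL = NUM_TEST = 2000`: `num_train` becomes negative, and Python clamps out-of-range slice bounds instead of raising an error. A minimal sketch of that slicing arithmetic, with strings standing in for theorems (the helper name `sequential_split_sizes` is illustrative):

```python
def sequential_split_sizes(num_theorems, num_val, num_test):
    """Sizes of train/val/test produced by the slicing in ``_split_sequentially``."""
    items = [f"thm_{i}" for i in range(num_theorems)]
    num_train = num_theorems - num_val - num_test
    return {
        "train": len(items[:num_train]),
        "val": len(items[num_train : num_train + num_val]),
        "test": len(items[num_train + num_val :]),
    }


small_repo = sequential_split_sizes(4, 2000, 2000)
print(small_repo)
# {'train': 0, 'val': 0, 'test': 4}
print(sequential_split_sizes(10, 2, 3))
# {'train': 5, 'val': 2, 'test': 3}
```

For a small repo, lowering `NUM_VAL` and `NUM_TEST` before calling `split_data` would be one way to obtain a non-empty training split.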

Each element in `proofs_train` represents a theorem. Let's check one of them.

In [16]:
for proof in proofs_train[:]:
    if proof["traced_tactics"] != []:
        break
proof.keys()

NameError: name 'proof' is not defined

In [18]:
proof["url"], proof["commit"], proof["file_path"], proof["full_name"]

NameError: name 'proof' is not defined

We see the theorem's name and where it is defined. The theorem includes some traced tactics.
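
For training a tactic generator, the theorem records are typically flattened into (state, tactic) pairs. The sketch below shows one way to do this on hypothetical records that mirror the keys written by `export_proofs`; the theorem names and states are invented for illustration.

```python
# Hypothetical records in the same shape as the entries of ``train.json``.
toy_proofs = [
    {
        "full_name": "toy.add_zero",
        "traced_tactics": [
            {
                "tactic": "simp",
                "state_before": "n : Nat\n⊢ n + 0 = n",
                "state_after": "no goals",
            },
        ],
    },
    # Theorems without traced tactics contribute no pairs.
    {"full_name": "toy.term_mode", "traced_tactics": []},
]


def state_tactic_pairs(proofs):
    """Flatten theorem records into (state_before, tactic) training pairs."""
    return [
        (t["state_before"], t["tactic"])
        for proof in proofs
        for t in proof["traced_tactics"]
    ]


pairs = state_tactic_pairs(toy_proofs)
print(len(pairs), pairs[0][1])
# 1 simp
```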

In [None]:
len(proof["traced_tactics"])

1

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


class MathlibDataset(Dataset):
    """Theorems and their traced tactics loaded from an exported ``*.json`` split."""

    def __init__(self, theorems_file):
        super().__init__()
        # All theorems are kept, including those without traced tactics.
        # TODO: filtering out theorems with empty "traced_tactics" here was
        # extremely slow on the training split.
        with theorems_file.open() as inp:
            self.proofs = json.load(inp)

    def __len__(self):
        return len(self.proofs)

    def __getitem__(self, idx):
        """Return the theorem at ``idx`` together with its traced tactics.

        name : the full name of the theorem
        tactics : a list of dicts, one per traced tactic
        init : the state before the first tactic (None if there are no tactics)
        """
        proof = self.proofs[idx]
        tactics = proof["traced_tactics"]
        return {
            "proof": proof,
            "name": proof["full_name"],
            "tactics": tactics,
            "init": tactics[0]["state_before"] if tactics else None,
        }

    def proof_finished(self, tactic_state):
        """Check whether a traced tactic closes all remaining goals."""
        return tactic_state["state_after"] == "no goals"

train_path = DST_DIR / "random/train.json"

dataset = MathlibDataset(train_path)
print(len(dataset))

print(dataset[6]["init"])

118517
C : Type u
inst✝¹ : Category.{v, u} C
X Y Z : Scheme
𝒰 : X.OpenCover
f : X ⟶ Z
g : Y ⟶ Z
inst✝ : ∀ (i : 𝒰.J), HasPullback (𝒰.map i ≫ f) g
s : PullbackCone f g
i : 𝒰.J
⊢ (pullbackP1Iso 𝒰 f g i).inv ≫ pullback.fst = (gluing 𝒰 f g).ι i


In [None]:
print(proof["full_name"])
print(proof["file_path"])
print(proof["traced_tactics"][0]["state_before"])
for tactics in proof["traced_tactics"]:
    print(tactics["tactic"])



AlgebraicGeometry.Scheme.Pullback.pullbackP1Iso_inv_snd
Mathlib/AlgebraicGeometry/Pullbacks.lean
C : Type u
inst✝¹ : Category.{v, u} C
X Y Z : Scheme
𝒰 : X.OpenCover
f : X ⟶ Z
g : Y ⟶ Z
inst✝ : ∀ (i : 𝒰.J), HasPullback (𝒰.map i ≫ f) g
s : PullbackCone f g
i : 𝒰.J
⊢ (pullbackP1Iso 𝒰 f g i).inv ≫ pullback.snd = pullback.fst
simp_rw [pullbackP1Iso, pullback.lift_snd]


In [None]:
def proof_finished(tactic_state):
    """Check whether a traced tactic closes all remaining goals."""
    return tactic_state["state_after"] == "no goals"


tactics = proof["traced_tactics"][0]

print(tactics["state_before"])
print("\nby {}\n".format(tactics["tactic"]))
print(tactics["state_after"])
print(proof_finished(tactics))


C : Type u
inst✝¹ : Category.{v, u} C
X Y Z : Scheme
𝒰 : X.OpenCover
f : X ⟶ Z
g : Y ⟶ Z
inst✝ : ∀ (i : 𝒰.J), HasPullback (𝒰.map i ≫ f) g
s : PullbackCone f g
i j : 𝒰.J
⊢ pullbackFstιToV 𝒰 f g i j ≫ pullback.fst = pullback.snd

by simp [pullbackFstιToV, p1]

no goals
True


Let's look at a traced tactic.

In [None]:
proof["traced_tactics"][1]

{'tactic': 'exact zero_mem _',
 'annotated_tactic': ['exact <a>zero_mem</a> _',
  [{'full_name': 'ZeroMemClass.zero_mem',
    'def_path': 'Mathlib/Algebra/Group/Submonoid/Basic.lean',
    'def_pos': [80, 3],
    'def_end_pos': [80, 11]}]],
 'state_before': 'R : Type u\nS : Type v\nA : Type w\nB : Type u₁\nM : Type v₁\ninst✝⁶ : Semiring R\ninst✝⁵ : Semiring S\ninst✝⁴ : AddCommMonoid A\ninst✝³ : Module R S\ninst✝² : Module S A\ninst✝¹ : Module R A\ninst✝ : IsScalarTower R S A\ns : Set S\nt : Set A\nk : S\nhks : k ∈ span R s\nx : A\nhx : x ∈ t\n⊢ 0 ∈ span R (s • t)',
 'state_after': 'no goals'}

`annotated_tactic` is the tactic with premises annotated by `<a> ... </a>`. For each premise, we know its fully qualified name and the exact location where it is defined, which is invaluable for training machine learning models for premise selection.
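
As a rough illustration of how these annotations might be consumed, the sketch below extracts the `<a> ... </a>` spans with a regular expression and pairs them with the provenance entries. It assumes the i-th span corresponds to the i-th entry of the list, which matches the example above but is an assumption about the format; `premises_in_tactic` is a hypothetical helper.

```python
import re

# The traced tactic shown above, reduced to its ``annotated_tactic`` field.
annotated_tactic = [
    "exact <a>zero_mem</a> _",
    [
        {
            "full_name": "ZeroMemClass.zero_mem",
            "def_path": "Mathlib/Algebra/Group/Submonoid/Basic.lean",
            "def_pos": [80, 3],
            "def_end_pos": [80, 11],
        }
    ],
]


def premises_in_tactic(annotated):
    """Pair each ``<a>...</a>`` span with the provenance entry at the same index."""
    text, provenances = annotated
    spans = re.findall(r"<a>(.*?)</a>", text)
    # Assumption: spans and provenance entries are listed in the same order.
    return [
        (span, prov["full_name"], prov["def_path"])
        for span, prov in zip(spans, provenances)
    ]


used_premises = premises_in_tactic(annotated_tactic)
print(used_premises)
# [('zero_mem', 'ZeroMemClass.zero_mem', 'Mathlib/Algebra/Group/Submonoid/Basic.lean')]
```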