In [None]:
!pip install datasets transformers[sentencepiece]

In [2]:
import torch
from torch.nn import CrossEntropyLoss
from torch.utils.data.dataloader import DataLoader
from datasets import load_dataset, Dataset

from transformers import AutoTokenizer, GPT2LMHeadModel, pipeline, AutoModel, AutoConfig

import json
import time
from google.colab import drive
from google.colab import runtime

In [3]:
# HYPERPARAMS AND GLOBAL VARS
# All tunable settings of the notebook live in this cell; the cells below read them as globals.
do_generate_proofs             = True     # set to True to generate proofs
sequence_length                = 1024     # input sequence length of the model (also the chunk size used when tokenizing the test dataset)
max_new_tokens                 = 256      # only needed if do_generate_proofs == True
batch_size                     = 2        # only needed if do_generate_proofs == True (theorems sent to the pipeline per call)
proofs_per_theorem             = 50       # only needed if do_generate_proofs == True
temperature                    = 1        # only needed if do_generate_proofs == True
do_sample                      = True     # only needed if do_generate_proofs == True
top_p                          = 0.95     # only needed if do_generate_proofs == True

# Hugging Face Hub repository and the exact revision of the model/tokenizer to load.
model_repo_name                = "Andrusyshyn/gpt2-pretrained-for-coq-pt-custom-train"
model_commit_hash              = "32c2695d0f5f0b6117529f2eaa7f240b95cc42eb"

drive_mounted                  = True                  # set to True to mount Google Drive before reading/writing files
drive_mounted_path             = "/content/gdrive/"    # only needed if drive_mounted == True
theorems_input_file            = drive_mounted_path + "My Drive/UCU/diploma/theorems/test_theorems_comp.json"
theorems_output_file           = drive_mounted_path + "My Drive/UCU/diploma/theorems/generated_comp_n06_k50.json"

do_test_loss                   = False    # set to True to calculate model loss on the test dataset
test_batch_size                = 4        # only needed if do_test_loss == True
test_data_archived             = True     # set to True if you need to extract the data from archive
raw_test_archive               = drive_mounted_path + "My Drive/UCU/diploma/datasets/dataset_test.zip"    # only needed if test_data_archived == True
raw_test_json                  = "./dataset_test.json"    # path to the test dataset (after extracting it will be in the current working directory)

# Seed torch's global RNG so that sampling during generation is repeatable.
torch_seed                     = 77
torch.manual_seed(torch_seed)

<torch._C.Generator at 0x7ca2a04e27d0>

In [4]:
# CUSTOM LOSS FUNCTION
def loss_function(inputs, logits):
    """Next-token cross-entropy, averaged per sequence and then over the batch.

    The labels are the input ids shifted one position to the left, so the
    logits at position ``t`` are scored against the token at position ``t + 1``;
    the logits of the last position have no target and are dropped.

    Args:
        inputs: tensor of token ids; the last axis is the sequence axis.
        logits: tensor of model outputs whose last axis is the vocabulary and
            whose second-to-last axis is the sequence axis.

    Returns:
        A scalar tensor: the mean over sequences of the mean token loss.
    """
    targets = inputs[..., 1:].contiguous()
    predictions = logits[..., :-1, :].contiguous()

    n_sequences = predictions.size(0)
    n_positions = predictions.size(1)
    vocab_size = predictions.size(-1)

    # reduction='none' keeps one loss value per token so that the averaging
    # can be done per sequence first.
    criterion = CrossEntropyLoss(reduction='none')
    token_losses = criterion(predictions.view(-1, vocab_size), targets.view(-1))

    per_sequence = token_losses.view(n_sequences, n_positions).mean(axis=1)
    return per_sequence.mean()

In [5]:
# LOSS EVALUATION FUNCTION
def test_loss(p_model, p_test_dataloader, p_device):
    """Compute the average loss and the perplexity of a model on a dataloader.

    The model is switched to evaluation mode and every batch is scored with
    ``loss_function`` without tracking gradients. The reported loss is the
    mean of the per-batch losses and the perplexity is its exponential.

    Args:
        p_model: callable model; ``p_model(input_ids)`` must return an object
            with a ``logits`` attribute.
        p_test_dataloader: iterable of batches; each batch is indexed with
            ``"input_ids"`` and the value is moved to ``p_device``.
        p_device: device on which the batches are evaluated.

    Returns:
        A ``(loss, perplexity)`` tuple of Python floats. The perplexity is
        ``inf`` when the exponential overflows.
    """
    if torch.cuda.is_available(): torch.cuda.empty_cache()

    p_model.eval()
    losses = []
    with torch.no_grad():
        for batch in p_test_dataloader:
            input_ids = batch["input_ids"].to(p_device)
            logits = p_model(input_ids).logits
            losses.append(loss_function(input_ids, logits).item())

    loss = torch.mean(torch.tensor(losses, dtype=torch.float32))
    # torch.exp saturates to inf on overflow instead of raising, so no
    # exception handling is needed (the previous `except OverflowError`
    # branch could never run and would have broken the `.item()` call below).
    perplexity = torch.exp(loss)

    if torch.cuda.is_available(): torch.cuda.empty_cache()
    return loss.item(), perplexity.item()

In [6]:
# TOKENIZE RAW DATASET
def get_tokenized_dataset(p_raw_dataset, p_context_length, p_tokenizer):
    """Tokenize a raw dataset and pack it into fixed-length chunks.

    Every sample's ``"content"`` field is tokenized without truncation and
    followed by the tokenizer's EOS token id. All samples are concatenated
    into one token stream, which is then cut into consecutive windows of
    ``p_context_length`` tokens. A trailing window that is shorter than
    ``p_context_length`` is discarded.

    Args:
        p_raw_dataset: iterable of samples, each indexable with ``"content"``.
        p_context_length: number of tokens per chunk.
        p_tokenizer: tokenizer; calling it must return a mapping with
            ``"input_ids"`` and it must expose ``eos_token_id``.

    Returns:
        A ``Dataset`` with a single ``"input_ids"`` column.
    """
    token_stream = []
    for record in p_raw_dataset:
        sample_ids = p_tokenizer(record["content"], truncation=False)["input_ids"]
        token_stream.extend(sample_ids + [p_tokenizer.eos_token_id])

    windows = (
        token_stream[start : start + p_context_length]
        for start in range(0, len(token_stream), p_context_length)
    )
    # Only full windows are kept; the short remainder at the end is dropped.
    full_chunks = [torch.tensor(window) for window in windows if len(window) == p_context_length]

    return Dataset.from_dict({"input_ids": full_chunks})

In [7]:
# EXTRACT THEOREM STATEMENT FROM WHOLE PROOF
def extract_theorem_statement(theorem_with_proof):
    """Return the part of a theorem that precedes its proof.

    The text is cut at the first occurrence of a newline followed by
    ``Proof.``. When that marker is absent a warning is printed and the
    input is returned unchanged.

    Args:
        theorem_with_proof: theorem statement followed by its proof.

    Returns:
        The text before the first ``"\\nProof."``, or the whole input when
        the marker is not found.
    """
    statement, marker, _ = theorem_with_proof.partition("\nProof.")
    if not marker:
        # An empty separator means the marker was not found.
        print("THEOREM PROOF DOES NOT START WITH 'Proof.'")
        return theorem_with_proof
    return statement

In [8]:
# TRUNCATE PROOF TILL STOP WORD
def truncate_on_Qed(generated_proof: str):
    """Cut a generated proof at the earliest proof-terminating stop word.

    The stop words are ``"Qed."``, ``"Defined."`` and ``"Save"``. The returned
    text ends right before the first stop word found; the stop word itself is
    not included.

    Args:
        generated_proof: text to truncate.

    Returns:
        A ``(text, found)`` tuple: the truncated text and ``True`` when a stop
        word was found, otherwise the unchanged input and ``False``.
    """
    stop_words = ("Qed.", "Defined.", "Save")

    positions = [generated_proof.find(word) for word in stop_words]
    found_positions = [position for position in positions if position != -1]

    if not found_positions:
        return (generated_proof, False)

    cut_at = min(found_positions)
    return (generated_proof[:cut_at], True)

In [9]:
# PROOF GENERATION
def generate_proofs(input_file, output_file, p_pipe, num_proofs):
    """Generate candidate proofs for every theorem of a JSON file.

    The input JSON must have a ``"projects"`` mapping from a project name to
    a list of theorem dicts with the keys ``"proof"``, ``"context"`` and
    ``"end_command"``. For each theorem the prompt is its context followed by
    the theorem statement (the part of ``"proof"`` before ``"\\nProof."``).
    Every generated continuation is truncated at the first stop word,
    prefixed with the theorem statement and suffixed with the theorem's
    ``"end_command"``. The resulting list is stored in the theorem dict under
    ``"generated_proofs"``.

    The data is written to ``output_file`` as a checkpoint every time at
    least 10 more theorems have been processed, and once more at the end
    together with a ``"hyperparams"`` entry describing the run.

    Reads the module-level settings ``batch_size``, ``max_new_tokens``,
    ``do_sample``, ``top_p`` and ``temperature`` for generation, and
    additionally ``sequence_length``, ``model_repo_name``,
    ``model_commit_hash`` and ``torch_seed`` for the hyperparameter record.

    Args:
        input_file: path of the JSON file with the theorems.
        output_file: path of the JSON file to write.
        p_pipe: text-generation pipeline; called with a list of prompts it
            must return, per prompt, a list of dicts with ``"generated_text"``.
        num_proofs: number of proofs to generate per theorem.
    """
    cuda_available = torch.cuda.is_available()
    with open(input_file, mode='r', encoding='utf-8') as json_input:
        new_json_data = json.load(json_input)

    checkpoint_every = 10    # theorems between two intermediate saves
    last_checkpoint = 0      # value of theorems_processed at the last save
    theorems_processed = 0
    proofs_with_end = 0
    for project_theorems in new_json_data["projects"].values():
        for i in range(0, len(project_theorems), batch_size):
            # The slice holds the same dict objects as the loaded data, so
            # results written into them end up in new_json_data.
            theorems = project_theorems[i:i+batch_size]
            theorem_declarations = [extract_theorem_statement(theorem["proof"]) for theorem in theorems]
            input_sequences = [theorem["context"] + declaration
                               for theorem, declaration in zip(theorems, theorem_declarations)]

            # Use the configured do_sample so that the generation matches the
            # hyperparameters recorded in the output file.
            generated_texts = p_pipe(input_sequences, num_return_sequences=num_proofs,
                                     max_new_tokens=max_new_tokens,
                                     return_full_text=False,
                                     do_sample=do_sample, top_p=top_p, temperature=temperature)
            if cuda_available:
                torch.cuda.empty_cache()

            for ind, candidates in enumerate(generated_texts):
                theorem = theorems[ind]
                declaration = theorem_declarations[ind]
                generated_proofs = []
                for candidate in candidates:
                    # Search for stop words in the generated text only: a stop
                    # word occurring inside the theorem statement must not cut
                    # the statement itself.
                    truncated_body, found_end = truncate_on_Qed(candidate['generated_text'])
                    if found_end:
                        proofs_with_end += 1
                    generated_proofs.append(declaration + truncated_body + theorem["end_command"])
                theorem["generated_proofs"] = generated_proofs
                theorems_processed += 1

            # Compare against the last checkpoint instead of testing
            # divisibility: the counter advances by up to batch_size per step
            # and can step over multiples of 10.
            if theorems_processed - last_checkpoint >= checkpoint_every:
                last_checkpoint = theorems_processed
                print(theorems_processed)
                with open(output_file, mode='w', encoding='utf-8') as json_output:
                    json.dump(new_json_data, json_output, indent=4)

    new_json_data["hyperparams"] = {
        "sequence_length": sequence_length,
        "max_new_tokens": max_new_tokens,
        "batch_size": batch_size,
        "proofs_per_theorem": num_proofs,    # the value actually used for generation
        "temperature": temperature,
        "do_sample": do_sample,
        "top_p": top_p,
        "model_repo_name": model_repo_name,
        "model_commit_hash": model_commit_hash,
        "torch_seed": torch_seed
    }
    with open(output_file, mode='w', encoding='utf-8') as json_output:
        json.dump(new_json_data, json_output, indent=4)
    print("Theorems Processed: ", theorems_processed)
    print("Proofs with end:    ", proofs_with_end)

In [10]:
# MOUNT GOOGLE DRIVE
# The input/output paths in the config cell are built on top of drive_mounted_path.
if drive_mounted:
    drive.mount(drive_mounted_path)

Mounted at /content/gdrive/


In [11]:
# UNPACK DATASETS
# Only needed for the test-loss evaluation: extract the archived test dataset,
# choosing the tool by the archive's file extension.
# NOTE(review): `gzip -dk` writes the decompressed file next to the archive, while
# raw_test_json points to the current working directory — confirm the .gz path works.
if do_test_loss:
    if test_data_archived:
        if raw_test_archive.endswith(".gz"):
            !gzip -dkv "{raw_test_archive}"
        if raw_test_archive.endswith(".zip"):
            !unzip "{raw_test_archive}"

In [12]:
# LOAD TEST DATASET
# The samples are read from the "data" field of the JSON file.
if do_test_loss:
    raw_test_dataset = load_dataset("json", data_files=raw_test_json, field="data")
    print(raw_test_dataset)

In [13]:
# LOAD MODEL AND TOKENIZER
# Both are loaded from the pinned revision of the Hub repository; the model is moved to the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
coq_tokenizer = AutoTokenizer.from_pretrained(model_repo_name, revision=model_commit_hash)
coq_model = GPT2LMHeadModel.from_pretrained(model_repo_name, revision=model_commit_hash).to(device)
print(f"Tokenizer vocab size:                              {len(coq_tokenizer)}")
print(f"Model size:                                        {coq_model.num_parameters()}")
print(f"Model size (only trainable params):                {coq_model.num_parameters(only_trainable=True)}")
print(f"Model size (only trainable non-embeddings params): {coq_model.num_parameters(only_trainable=True, exclude_embeddings=True)}")
# Text-generation pipeline used for proof generation (device 0 = first GPU, -1 = CPU).
pipe = pipeline("text-generation", model=coq_model, tokenizer=coq_tokenizer, batch_size=batch_size, device=0 if torch.cuda.is_available() else -1)
# Batched generation needs a pad token: reuse the model's EOS id and pad on the left
# so that every prompt in a batch ends at the last position.
pipe.tokenizer.pad_token_id = coq_model.config.eos_token_id
pipe.tokenizer.padding_side = 'left'

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/440 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/465k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/265k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.24M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/438 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/897 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.3M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/111 [00:00<?, ?B/s]

Tokenizer vocab size:                              30000
Model size:                                        22560768
Model size (only trainable params):                22560768
Model size (only trainable non-embeddings params): 10647552


In [14]:
# PREPARE TEST DATA (only when the test loss is requested)
if do_test_loss:
    # TOKENIZE RAW DATASET
    # load_dataset puts the data of a single file into the "train" split.
    test_dataset = get_tokenized_dataset(raw_test_dataset["train"], sequence_length, coq_tokenizer)
    test_dataset.set_format("torch")

    # CREATE DATALOADER
    test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size)
    print(test_dataset)
    print("len(test_dataloader): ", len(test_dataloader))

In [15]:
# EVALUATE TEST LOSS
# Reports the average loss over the test dataloader and the corresponding perplexity.
if do_test_loss:
    _loss,_perp = test_loss(coq_model, test_dataloader, device)
    print("Test Loss:       ", _loss)
    print("Test Perplexity: ", _perp)

In [None]:
# PROOF GENERATION
# Generates proofs_per_theorem proofs for every theorem of the input file and reports the wall-clock time.
if do_generate_proofs:
    time_start = time.perf_counter()
    generate_proofs(theorems_input_file, theorems_output_file, pipe, proofs_per_theorem)
    time_end = time.perf_counter()
    print(f"Total time: {time_end - time_start} seconds")

In [None]:
# Ask Colab to unassign this runtime once all cells above have finished.
runtime.unassign()