# Imports

In [1]:
from dataclasses import dataclass, field
from typing import Optional

import torch
from datasets import load_dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_int8_training
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, HfArgumentParser, pipeline

from trl import AutoModelForCausalLMWithValueHead, PPOConfig, PPOTrainer, set_seed
from trl.core import LengthSampler

import json


Welcome to bitsandbytes. For bug reports, please run

python -m bitsandbytes

 and submit this information together with your error trace to: https://github.com/TimDettmers/bitsandbytes/issues
bin /home/chloe/mambaforge/envs/python3.9/lib/python3.9/site-packages/bitsandbytes/libbitsandbytes_cuda118.so
CUDA SETUP: CUDA runtime path found: /home/chloe/mambaforge/envs/python3.9/lib/libcudart.so
CUDA SETUP: Highest compute capability among GPUs detected: 8.6
CUDA SETUP: Detected CUDA version 118
CUDA SETUP: Loading binary /home/chloe/mambaforge/envs/python3.9/lib/python3.9/site-packages/bitsandbytes/libbitsandbytes_cuda118.so...


Either way, this might cause trouble in the future:
If you get `CUDA error: invalid device function` errors, the above might be the cause and the solution is to make sure only one ['libcudart.so', 'libcudart.so.11.0', 'libcudart.so.12.0'] in the paths that we search based on your env.
  warn(msg)


# Utils

In [2]:
import requests
import re

def coqc(v):
    '''
    Check Coq source `v` with the remote checker and locate the first error.

    Returns -2 when the checker reports status 0 (no errors), the number
    captured from the first "line N," in the checker log when one is present,
    and -1 when the log carries no such line number.
    '''
    reply = requests.post("https://coq.livecode.ch/check", data={'v': v}).json()
    if reply['status'] == 0:
        return -2

    located = re.search(r'line (\d+),', reply['log'])
    return int(located.group(1)) if located else -1

In [3]:
def cfeedback(v):
    '''
    Check Coq source `v` with the remote checker and return its log.

    Returns None when the checker reports status 0 (clean compile); otherwise
    returns the checker's `log` field, i.e. the compiler error text.
    '''
    reply = requests.post("https://coq.livecode.ch/check", data={'v': v}).json()
    return None if reply['status'] == 0 else reply['log']

In [4]:
def get_linenumber(cf):
    '''
    Extract the number from the first "line N," occurrence in the feedback
    text `cf`. Returns -1 when the text contains no such occurrence.
    '''
    located = re.search(r'line (\d+),', cf)
    if located is None:
        return -1
    return int(located.group(1))

In [5]:
def get_totallines(response):
    '''
    Count the newline-separated lines of `response`.

    An empty string counts as one line, and a trailing newline adds an
    (empty) final line.
    '''
    return response.count('\n') + 1

In [6]:
def get_line(line_number, response):
    '''
    Return the text of the 1-indexed line `line_number` of `response`.

    The lookup uses plain list indexing on `line_number - 1`, so 0 and
    negative values wrap around to lines counted from the end, and an
    out-of-range value raises IndexError.
    '''
    return response.split('\n')[line_number - 1]

In [7]:
# PPO configuration. Only `config.model_name` (tokenizer lookup in
# build_dataset) and `config.mini_batch_size` (below) are read in the visible
# part of this notebook.
config = PPOConfig(
    model_name="edbeeching/gpt-neo-125M-imdb-lora-adapter-merged",
    learning_rate=1.41e-5,
    log_with='wandb',
    mini_batch_size=1,# previously 16
    batch_size=1, # previously 256; lowered because only a very small number of samples is available for now
    # gradient_accumulation_steps=1, --> left disabled: reported as an unrecognized argument
)

# Keyword arguments intended for a sentiment-analysis pipeline call:
# `return_all_scores=True` requests every score and `function_to_apply="none"`
# is passed through unchanged.
# NOTE(review): `sent_kwargs` is not referenced anywhere else in the visible
# part of this notebook; it looks like a leftover and may be removable.
sent_kwargs = {"return_all_scores": True, "function_to_apply": "none", "batch_size": config.mini_batch_size}

def build_dataset(config, dataset_name="CoqLLMtrain.csv"):
    """
    Build the prompt dataset from a CSV file.

    Every row is turned into one prompt by concatenating its `specification`
    column with its `test_case_1`, `test_case_2` and `test_case_3` columns.
    The prompt is tokenized with the tokenizer of `config.model_name`.

    Args:
        config:
            Object exposing `model_name`, the checkpoint whose tokenizer is
            loaded.
        dataset_name (`str`):
            Path of the CSV file handed to `load_dataset` as `data_files`.
    Returns:
        The "train" split returned by `load_dataset`, with an `input_ids`
        column (token ids of the prompt) and a `query` column (the decoded
        prompt text) added, and its format set to "torch".
    """
    tokenizer = AutoTokenizer.from_pretrained(config.model_name)
    # Reuse the end-of-sequence token as the padding token.
    tokenizer.pad_token = tokenizer.eos_token
    # Honour the `dataset_name` argument instead of a hard-coded file name.
    ds = load_dataset("csv", data_files=dataset_name, split="train")

    def concat(sample):
        # Join the specification and the three test cases into one prompt.
        return (
            sample['specification'] + "Test case 1: " + sample['test_case_1']
            + ", test case 2: " + sample['test_case_2']
            + ", test case 3: " + sample['test_case_3']
        )

    def tokenize(sample):
        sample["input_ids"] = tokenizer.encode(concat(sample))
        # Decode again so `query` holds exactly the text the token ids stand for.
        sample["query"] = tokenizer.decode(sample["input_ids"])
        return sample

    ds = ds.map(tokenize, batched=False)
    ds.set_format(type="torch")
    return ds

# Few-shot prefix: two worked "stand-alone" Coq examples (`double` and
# `replicate`), each with its test-case lemmas and property theorems.
# run_tree prepends this text to the task description.
multishot = "(* Stand-alone Example 1: Write a function that doubles a number. Test case 1: double 3 = 6. Prove some formal properties. *) \nFixpoint double (n: nat): nat := match n with | 0 => 0 | S n => S (S (double n)) end. \n\nLemma example_double_3: double 3 = 6.\nProof. simpl. reflexivity. Qed. \n\n Theorem theorem_double_distribute: \nforall a b, double a + double b = double (a + b).\n Proof.\n intros.\n induction a.\n - simpl. reflexivity.\n - simpl. rewrite IHa. reflexivity. \n Qed. \n\n (* Stand-alone Example 2: Write a function that creates a list of n elements. Test case 1: replicate 1 0 = []. Test case 2: replicate 1 2 = [1; 1]. Prove some formal properties. *) \n Require Import Coq.Lists.List. \n Open Scope list_scope. \n Import ListNotations. \n Fixpoint replicate {X: Type} (x: X) (n: nat): list X := \n match n with \n | 0 => []\n | S n => x :: replicate x n \n end. \n Lemma example_replicate_0: replicate 1 0 = []. \n Proof. simpl. reflexivity. Qed.\n Lemma example_replicate_2: replicate 1 2 = [1; 1].\n Proof. simpl. reflexivity. Qed.\n\n Theorem replicate_length:\n\t forall n, length (replicate 1 n) = n.\n Proof. \n intros. \n induction n.\n - simpl. reflexivity. \n - simpl. rewrite IHn. reflexivity.\n Qed. \n Theorem replicate_length_any: \n\t forall (X: Type) (x: X) n, length (replicate x n) = n. \n Proof.\n intros. \n induction n.\n - simpl. reflexivity.\n- simpl. rewrite IHn. reflexivity.\n Qed."

# Build the dataset once at module level with the default CSV file; later
# cells read individual prompts through dataset[i]['query'].
dataset = build_dataset(config)

Found cached dataset csv (/home/chloe/.cache/huggingface/datasets/csv/default-c1deeeaa4744c13b/0.0.0/6954658bab30a358235fa864b05cf819af0e179325c740e4bc853bcc7ec513e1)
Loading cached processed dataset at /home/chloe/.cache/huggingface/datasets/csv/default-c1deeeaa4744c13b/0.0.0/6954658bab30a358235fa864b05cf819af0e179325c740e4bc853bcc7ec513e1/cache-dfdf8327a2824aea.arrow


In [8]:
import os
import openai
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    wait_fixed
)  # for exponential backoff


# Credentials are read from the environment so that no secret is stored in the
# notebook. Any key that was ever committed in source form must be treated as
# leaked and revoked.
# The organization id is optional; a missing variable leaves it unset (None).
openai.organization = os.environ.get("OPENAI_ORGANIZATION")
# A missing API key raises KeyError here, before any request is attempted.
openai.api_key = os.environ["OPENAI_API_KEY"]

# Generate GPT4 Samples

In [9]:
# Sanity check: pull the prompt text of dataset row 5 and print it.
q = dataset[5]['query']
# Original author's note: "look at prompt 3 and onwards".
# NOTE(review): the intent of that note cannot be determined from this cell.
print(q)

Write a function that computes the sum of a list of numbers.Test case 1: sum [1;2;3] = 6, test case 2: sum [1.5; 2.5; 3.0] = 7.0, test case 3: sum [] = 0


In [10]:
# System prompt for the chat model: the rules the assistant must follow,
# followed by one worked exchange (a first answer with an unbound type
# variable, the follow-up query carrying the compiler feedback, and the
# corrected answer). The example follow-up query mirrors the text run_trial
# builds from the failing line and the compiler feedback.
# The code fences are plain ``` so they match the patterns generate() uses to
# extract code from a reply.
systemText = """ You are an AI assistant helping users write Coq code in order to implement given function specifications. 
1. The program you write should only contain Coq code in response to the given function specification. 
2. Any step-by-step reasoning that is not Coq code should be written as a comment.
3. As the user provides compiler feedback, modify and update the program accordingly and keep the variables and the general program structure consistent.
4. In addition to implementing the function, give at least 2 properties as theorems with their proofs.

The following are examples.

Query from user:
Write a function that creates a list of n elements. Test case 1: replicate 1 0 = []. Test case 2: replicate 1 2 = [1; 1]. 

Response from assistant:
```
Require Import Coq.Lists.List. 
 Open Scope list_scope. 
 Import ListNotations. 
 Fixpoint replicate (x: X) (n: nat): list X := 
 match n with 
 | 0 => []
 | S n => x :: replicate x n 
 end. 
 Lemma example_replicate_0: replicate 1 0 = []. 
 Proof. simpl. reflexivity. Qed.
 Lemma example_replicate_2: replicate 1 2 = [1; 1].
 Proof. simpl. reflexivity. Qed.

 Theorem replicate_length:
	 forall n, length (replicate 1 n) = n.
 Proof. 
 intros. 
 induction n.
 - simpl. reflexivity. 
 - simpl. rewrite IHn. reflexivity.
 Qed. 
 Theorem replicate_length_any: 
	 forall (X: Type) (x: X) n, length (replicate x n) = n. 
 Proof.
 intros. 
 induction n.
 - simpl. reflexivity.
- simpl. rewrite IHn. reflexivity.
 Qed.
```

Query from user:
Your code produces an error in the line Fixpoint replicate (x: X) (n: nat): list X :=
File "./ex.v", line 4, characters 24-25:
Error: The reference X was not found in the current environment.
Can you please explain what this error means? Let's think step by step. Please rewrite all code if you rewrite any code.

Response from assistant:
```
Require Import Coq.Lists.List. 
 Open Scope list_scope. 
 Import ListNotations. 
 Fixpoint replicate {X: Type} (x: X) (n: nat): list X := 
 match n with 
 | 0 => []
 | S n => x :: replicate x n 
 end. 
 Lemma example_replicate_0: replicate 1 0 = []. 
 Proof. simpl. reflexivity. Qed.
 Lemma example_replicate_2: replicate 1 2 = [1; 1].
 Proof. simpl. reflexivity. Qed.

 Theorem replicate_length:
	 forall n, length (replicate 1 n) = n.
 Proof. 
 intros. 
 induction n.
 - simpl. reflexivity. 
 - simpl. rewrite IHn. reflexivity.
 Qed. 
 Theorem replicate_length_any: 
	 forall (X: Type) (x: X) n, length (replicate x n) = n. 
 Proof.
 intros. 
 induction n.
 - simpl. reflexivity.
- simpl. rewrite IHn. reflexivity.
 Qed.
```"""

# Shared conversation history; it starts with the system prompt only.
messages=[{"role": "system", "content": systemText}]

def generate(q):
    '''
    Send `q` as the next user turn of the shared conversation and return the
    model's reply, stripped of Markdown code fences when possible.

    The request consists of the module-level `messages` history plus the new
    user turn. The history is extended with the user turn and the assistant
    reply only after a request has succeeded, so failed attempts never leave
    duplicate or dangling turns behind. This function never resets `messages`.

    Transient API failures (connection problems, rate limits, timeouts,
    server-side errors) are retried indefinitely with exponential back-off.
    Every other exception, including KeyboardInterrupt and permanent API
    errors such as an invalid request, propagates to the caller.

    Returns the text inside the first ```coq ... ``` (or plain ``` ... ```)
    fence of the reply, or the whole reply when it contains no fence.
    '''
    import time  # local import: only needed for the retry back-off

    transient_errors = (
        openai.error.APIConnectionError,
        openai.error.APIError,
        openai.error.RateLimitError,
        openai.error.ServiceUnavailableError,
        openai.error.Timeout,
    )
    user_turn = {"role": "user", "content": q}

    delay = 1.0
    while True:
        try:
            completion = openai.ChatCompletion.create(
                model='gpt-4-0314',
                messages=messages + [user_turn])
            break
        except transient_errors as err:
            print("OpenAI request failed ({}); retrying in {:.0f}s".format(err, delay))
            time.sleep(delay)
            delay = min(delay * 2, 60.0)  # cap the back-off at one minute

    response = completion.choices[0].message.content
    messages.append(user_turn)
    messages.append({"role": "assistant", "content": response})

    # Clean the response if possible: prefer a ```coq fence, then any fence.
    c_response = response
    for fence in ('```coq(.*?)```', '```(.*?)```'):
        match = re.search(fence, c_response, re.DOTALL)
        if match:
            c_response = match.group(1)
    return c_response

def run_trial(q_core, pid, outfile, verbose=True, ntrials=10):
    '''
    Run the generate / compile / feedback loop for one prompt.

    - q_core: function spec with test cases; used as the first query
    - pid: the prompt id stored with every record
    - outfile: path of the JSON-lines file that one record per attempt is
      appended to
    - verbose: print the task, every response, the follow-up query and the
      compiled fraction
    - ntrials: maximum number of attempts

    Each attempt queries the model, checks the answer with the Coq checker and
    records the query, the answer, the compiler feedback and line statistics.
    When the checker reports an error, the next query quotes the failing line
    and the feedback. The loop stops early once an answer compiles cleanly.
    Always returns None.
    '''
    q = q_core
    if verbose:
        print("The task: {}".format(q))

    for t in range(ntrials):
        # generate model response
        response = generate(q)

        # get compiler feedback
        cf = cfeedback(response)
        total_lines = get_totallines(response)

        if verbose:
            print("-----Attempt {}---------".format(t))
            print(response)

        if cf is None:
            compiled_lines = total_lines
            percent_compiled = 1.0
            next_q = "The model solved the problem!"
        else:
            # get_linenumber yields the 1-indexed failing line, or -1 when the
            # feedback carries no line number.
            # NOTE(review): assumes the checker numbers lines relative to the
            # submitted text - confirm against the service.
            error_line = get_linenumber(cf)
            # Lines before the failing one count as compiled; clamp so that a
            # missing or out-of-range line number cannot give a negative or
            # >100% figure.
            compiled_lines = min(max(error_line - 1, 0), total_lines)
            percent_compiled = compiled_lines / total_lines
            # Quote the failing line itself (get_line is 1-indexed), or
            # nothing when it cannot be located.
            if 1 <= error_line <= total_lines:
                linetxt = get_line(error_line, response)
            else:
                linetxt = ""

            # get the model to reflect on the error
            next_q = "Your code produces an error in the line {}\n{}Can you please explain what this error means? Let's think step by step. Please rewrite all code if you rewrite any code."\
                .format(linetxt, cf)

        if verbose:
            print(next_q)
            print(percent_compiled)

        # append all data to json lines file
        out = {
            "prompt_id": pid,
            "iteration": t,
            "instruction": q,
            "output": response,
            "compiler_feedback": cf,
            "stats": {
                "total_lines": total_lines,
                "compiled_lines": compiled_lines,
                "percent_compiled": percent_compiled,
            },
        }
        with open(outfile, 'a') as file:
            file.write(json.dumps(out) + '\n')
        if verbose:
            print("recorded in {}".format(outfile))

        # don't continue if model has completely solved problem
        if cf is None:
            break
        q = next_q

    return None

def main():
  '''Placeholder entry point; it currently does nothing.'''
  pass

# Driver cell: runs the feedback loop on dataset rows 0, 1 and 2, using the
# row index as the prompt id.
if __name__ == "__main__":
  main()
  # Every attempt is appended to this JSON-lines file; `outfile` stays bound
  # at module level and is read again by the loading cell below.
  outfile = "gpt4_EasyMediumHard01.ndjson"
  # Single-prompt variant kept for manual runs:
  # run_trial(q, 0, outfile)
  for i in range(0,3):
    q = dataset[i]['query'] 
    run_trial(q, i, outfile)


The task: Write a function that reverses a list.Test case 1: reverse [1;2;3] = [3;2;1], test case 2: reverse ["hello"; "world"] = ["world"; "hello"], test case 3: reverse [] = []
-----Attempt 0---------

Require Import Coq.Lists.List.
Open Scope list_scope.
Import ListNotations.

Fixpoint my_reverse {X: Type} (l: list X) : list X :=
  match l with
  | [] => []
  | x :: xs => my_reverse xs ++ [x]
  end.

Example test_reverse_1: my_reverse [1; 2; 3] = [3; 2; 1].
Proof. simpl. reflexivity. Qed.

Example test_reverse_2: my_reverse ["hello"; "world"] = ["world"; "hello"].
Proof. simpl. reflexivity. Qed.

Example test_reverse_3: my_reverse ([]: list nat) = [].
Proof. simpl. reflexivity. Qed.

(* Properties *)
Theorem my_reverse_app_distr:
  forall (X: Type) (l1 l2: list X),
    my_reverse (l1 ++ l2) = my_reverse l2 ++ my_reverse l1.
Proof.
  intros. induction l1.
  - simpl. rewrite app_nil_r. reflexivity.
  - simpl. rewrite IHl1. rewrite <- app_assoc. reflexivity.
Qed.

Theorem my_reverse_my

In [11]:
# sample loading from the ndjson file

import json

# Collect every record of the NDJSON results file as a list of dicts.
data = []

with open(outfile, 'r') as file:
    # One JSON document per line; surrounding whitespace is stripped first.
    data.extend(json.loads(raw.strip()) for raw in file)

# Show everything that was loaded.
print(data)


[{'prompt_id': 0, 'iteration': 0, 'instruction': 'Write a function that computes the sum of a list of numbers.Test case 1: sum [1;2;3] = 6, test case 2: sum [1.5; 2.5; 3.0] = 7.0, test case 3: sum [] = 0', 'output': "\nRequire Import Coq.Lists.List.\nRequire Import Coq.Arith.PeanoNat.\nOpen Scope list_scope.\nImport ListNotations.\n\nFixpoint sum_nat (xs: list nat) : nat :=\n  match xs with\n  | [] => 0\n  | x :: xs' => x + sum_nat xs'\n  end.\n\nFixpoint sum_int (xs: list Z) : Z :=\n  match xs with\n  | [] => 0\n  | x :: xs' => x + sum_int xs'\n  end.\n\nLemma test_case_sum_nat_1: sum_nat [1; 2; 3] = 6.\nProof. simpl. reflexivity. Qed.\n\nLemma test_case_sum_nat_3: sum_nat [] = 0.\nProof. simpl. reflexivity. Qed.\n\n(* You'll need to import Coq's ZArith library for these test cases *)\nRequire Import Coq.ZArith.Z.\n\nLemma test_case_sum_int_1: sum_int [1; 2; 3] = 6.\nProof. simpl. reflexivity. Qed.\n\nLemma test_case_sum_int_2: sum_int [] = 0.\nProof. simpl. reflexivity. Qed.\n", 'com

In [13]:
# Inspect the first loaded record and print the instruction stored in it.
run0 = data[0]
print(run0["instruction"])

Write a function that computes the sum of a list of numbers.Test case 1: sum [1;2;3] = 6, test case 2: sum [1.5; 2.5; 3.0] = 7.0, test case 3: sum [] = 0


# Tree Search on GPT4

In [None]:
def generate(q):
    '''
    Ask the model for a single-turn completion of `q`.

    Returns a `(response, c_response)` pair: the raw reply, and the text
    inside its first ```coq ... ``` fence (the raw reply again when no such
    fence is present). No conversation history is kept between calls.
    '''
    completion = openai.ChatCompletion.create(
        model='gpt-4-0314',
        messages=[{"role": "user", "content": q}])
    response = completion.choices[0].message.content

    # Strip pre- and post-rambles when the code sits in a ```coq fence.
    fenced = re.search('```coq(.*?)```', response, re.DOTALL)
    c_response = fenced.group(1) if fenced is not None else response

    return response, c_response

def run_tree(q_core, verbose=True, depth=10, width=5, k=2):
    '''
    Work-in-progress search loop for one prompt.

    - q_core: function spec with test cases
    - verbose: print the follow-up instruction and the failing code
    - depth: maximum number of repair rounds
    - width: number of model samples drawn per round
    - k: reserved for top-k node selection; not used yet

    The first query is the few-shot prefix `multishot` followed by `q_core`.
    Each round samples `width` answers, prepends the list-library imports and
    checks them with the Coq checker. If the last sample of a round compiles,
    the search stops; otherwise the next query asks the model to fix the
    reported error and includes the failing code. At most `depth` rounds are
    run. Always returns None.
    '''
    q = multishot + "\n" + q_core
    openLst = []

    # generate the first 'width' model responses
    for _ in range(width):
        _, response = generate(q)
        cf = cfeedback(response)
        openLst.append({
            "code": response,
            "cf": cf,
            # Same value coqc(response) would report, derived from the
            # feedback already fetched instead of a second HTTP request.
            "lines": -2 if cf is None else get_linenumber(cf),
        })
    # NOTE(review): openLst is filled but never consumed, and choosing the
    # top `k` nodes from it is not implemented yet.

    # help the model by importing the libraries it will need
    library_imports = "Require Import Coq.Lists.List.\nOpen Scope list_scope.\nImport ListNotations."

    # Bound the search by `depth` rounds so an unsolved prompt cannot loop
    # forever.
    for _ in range(depth):
        cf = None  # stays None when width == 0, which ends the search
        # generate model responses
        # NOTE(review): only the last of the `width` samples is used below.
        for _sample in range(width):
            _, response = generate(q)
            response = library_imports + "\n\n" + response

            # get compiler feedback
            cf = cfeedback(response)

        # modify model query
        if cf is None:
            break
        q = q_core + "\n" + "(* Your code is wrong. Please rewrite it in Coq to fix the following compiler error. Please do not introduce any more       stand-alone examples. *) \n" + "(* " + cf + " *)"
        if verbose:
            print("### Instruction")
            print(q)

        q += "\n\n" + response

        if verbose:
            print("### Input")
            print(response)

    return None

def main():
  '''Placeholder entry point; it currently does nothing.'''
  pass

# Driver cell for the tree search: runs it on dataset rows 4 through 14.
if __name__ == "__main__":
    main()
    for i in range(4, 15):
        q = dataset[i]['query']
        # run_trial needs a prompt id and an output file, so calling it with
        # the query alone raises TypeError; this section drives run_tree.
        run_tree(q)
