In [1]:
import vertexai # to interact with googles code chatbot ai
from vertexai.preview.language_models import CodeChatModel, ChatModel, InputOutputTextPair, ChatMessage,CodeGenerationModel, TextGenerationModel
import syfco # to convert between different form

import benchmark
from benchmark import Benchmark, build_prompt

import verify # used to verify verilog solutions
import prompting # prompting helper class
import json
import os

# both for handling async stuff
import asyncio

import csv # to write the benchmark results to a csv file
from utils import *

Used packages:
google-cloud-aiplatform
docker

In [2]:
vertexai.init(project="rg-finkbeiner-30141001", location="us-central1")
chat_model = ChatModel.from_pretrained("chat-bison")
parameters = {
    "temperature": 0.5,
    "max_output_tokens": 1024
}

In [3]:
benchmarks = [
    Benchmark(bm, "../../verilog/") for bm in json.loads(read_file("benchmarks.json"))
]
benchmarks_auto = [
    Benchmark(bm, "") for bm in json.loads(read_file("benchmarks_auto.json"))
]

In [5]:
timeouts = []
for bm in benchmarks_auto:
  try:
    benchmark.build_prompt(bm, mode="bosy")
  except EncodingWarning:
    print("timeout: " + bm.name)
    timeouts.append(bm)

TimeoutError: 

In [8]:
from prompting import _interpolate_string
import time

class ExamplesPrompt(prompting.DefaultPromptTemplate):
  def add_example(self, replacements: dict = {}):
    pair = InputOutputTextPair(replacements["SPEC"], "```verilog\n" + replacements["IMPL"] + "\n```")
    self.examples.append(pair)
  def build_prompt(self, replacements: dict = {}):
        return _interpolate_string(
            self._start + "\n" + self._question,
            replacements,
        ), self.examples

def get_module_signature(bm : Benchmark, params = None, module_name="fsm"):
  if params == None:
    params = bm.generate_params
  spec = read_file(bm.specification)
  inputs = syfco.inputs(spec, overwrite_params=params)
  outputs = syfco.outputs(spec, overwrite_params=params) 
  return f'''module {module_name} ({
    ", ".join(
      ["input " + inp for inp in inputs] + ["output reg " + out for out in outputs]
    )
  })''';

def improve_iteratively(response, bm):
  code = extract_normalized_verilog_code(response, bm.name)
  res = "NO_CODE" if code == None else verify.verify_code(
    bm.specification, code, overwrite_params=bm.generate_params
    ).name
  match res:
    case "NO_CODE":
      prompt = "This doesn't seem like a complete verilog module. Could you please give me the entire module?"
    case "ERROR_CONVERT_TO_VERILOG":
      prompt = "This code isn't syntactically valid verilog code. Could you please fix the syntax errors?"
    case "ERROR_COMBINE_AIGER":
      prompt = f"Could you please make sure to use the module signature `{get_module_signature(bm)}` in your code. Please also try to think about your code again using the new signature. Make sure it matches the specification!"
    case "FALSE_RESULT":
      prompt = "The code doesn't satisfy the specification. Please think extensively about how you need to change your code to satisfy the specification. If necessary, rewrite the module from ground up"
    case "SUCCESS":
      return (code, res)
    case _:
      return (None, res)
  return (prompt, res)


def run_single_iterative(bm, type):
  # bm.build_prompt might do some heavy work like synthesizing examples with bosy
  prompt, examples = build_prompt(bm, params=bm.generate_params, template=ExamplesPrompt, mode=type)
  chat = chat_model.start_chat(examples=examples)
  response = chat.send_message(prompt, **parameters)
  for i in range(0, 3):
    (prompt, res) = improve_iteratively(response.text, bm)
    if res == "SUCCESS" or res == None:
      return res
    response = chat.send_message(prompt, **parameters)
  return res

def run_single_explicit_examples(bm, type):
    print("Starting benchmark " + bm.name + "/" + type)
    # bm.build_prompt might do some heavy work like synthesizing examples with bosy
    prompt, examples = build_prompt(bm, params=bm.generate_params, template=ExamplesPrompt, mode=type)
    print("Finished generating prompt for " + bm.name + "/" + type)
    chat = chat_model.start_chat(examples=examples)
    response = chat.send_message(prompt, **parameters)
    code = extract_normalized_verilog_code(response.text, bm.name)
    print(code if code else "NO_CODE::\n" + response.text)
    if code == None:
      return "NO_CODE"
    else:
      res = verify.verify_code(bm.specification, code, overwrite_params=bm.generate_params)
      return res.name

def run_single_best_k(k):
  def run_single (bm, type):
    # bm.build_prompt might do some heavy work like synthesizing examples with bosy
    prompt, examples = build_prompt(bm, params=bm.generate_params, template=ExamplesPrompt, mode=type)
    best = "NO_CODE"
    for i in range(0, k):
      chat = chat_model.start_chat(examples=examples)
      response = chat.send_message(prompt, **parameters)
      code = extract_normalized_verilog_code(response.text, bm.name)
      if code == None:
        continue
      res = verify.verify_code(bm.specification, code, overwrite_params=bm.generate_params)
      if res ==  verify.ReturnCode.SUCCESS:
        return res.name
      elif res == verify.ReturnCode.FALSE_RESULT:
        best = res.name
      elif best != "FALSE_RESULT":
        best = res.name
    return best
  return run_single

def run_single_default(bm, type, template=prompting.DefaultPromptTemplate):
    print("Starting benchmark " + bm.name + "/" + type)
    # bm.build_prompt might do some heavy work like synthesizing examples with bosy
    prompt = build_prompt(bm, params=bm.generate_params, template=template, mode=type)
    print("Finished generating prompt for " + bm.name + "/" + type)
    chat = chat_model.start_chat()
    response = chat.send_message(prompt, **parameters)
    code = extract_normalized_verilog_code(response.text, bm.name)
    print(code if code else "NO_CODE::\n" + response.text)
    if code == None:
      return "NO_CODE"
    else:
      res = verify.verify_code(bm.specification, code, overwrite_params=bm.generate_params)
      return res.name

async def run_benchmarks(benchmarks, file, example_types = ["self", "bosy", "strix", "none"], run_single=run_single_default):
  # setting up csv writing
  f = open(file, 'w', newline='')
  csvwriter = csv.DictWriter(f, fieldnames=["benchmark"] + example_types, dialect='unix', quoting=csv.QUOTE_NONE)
  csvwriter.writeheader()

  # get event loop to be able to run the requests in parallel
  loop = asyncio.get_event_loop()
  
  count = 0
  def workaround(bm, type):
    try:
      time.sleep(10 * count) # needed because of quota
      count = count + 1
      print(count)
      return run_single(bm, type)
    except TimeoutError:
      return "TIMEOUT"

  async def _run_single(bm, type):
    try:
      return await loop.run_in_executor(None, workaround, bm, type)
    except TimeoutError:
      print("test2")
      return "TIMEOUT2"
  async def run_single_benchmark(bm):
    result = {
      "benchmark": bm.name
    }
    async_res = await asyncio.gather(
      *[_run_single(bm, type) for type in example_types]
    )
    for i, res in enumerate(async_res):
      result[example_types[i]] = res
    csvwriter.writerow(result)

  await asyncio.gather(
    *[run_single_benchmark(bm) for bm in benchmarks]
  )
  f.close()

await run_benchmarks(benchmarks_auto, example_types=["bosy", "strix", "none"],file = "res_auto.csv")

Starting benchmark amba_decomposed_arbiter/bosyStarting benchmark amba_decomposed_arbiter/strix
Starting benchmark amba_decomposed_lock/strix
Starting benchmark collector_v2/strix
Starting benchmark collector_v3/bosy
Starting benchmark amba_decomposed_encode/strix
Starting benchmark collector_v3/none
Starting benchmark detector/strix
Starting benchmark amba_decomposed_lock/bosy
Starting benchmark amba_decomposed_lock/none
Starting benchmark collector_v1/bosy
Starting benchmark amba_decomposed_arbiter/none
Starting benchmark collector_v1/none
Starting benchmark collector_v1/strix
Starting benchmark collector_v2/bosy
Starting benchmark amba_decomposed_encode/none
Starting benchmark collector_v2/none
Starting benchmark amba_decomposed_encode/bosy
Starting benchmark collector_v3/strix
Starting benchmark detector/bosy

Finished generating prompt for collector_v3/strix
Finished generating prompt for collector_v2/none
Finished generating prompt for collector_v2/strix
Finished generating promp

CancelledError: 

module amba_decomposed_arbiter (
    input HBUSREQ_0,
    input HBUSREQ_1,
    input HBUSREQ_2,
    input HBUSREQ_3,
    input HBUSREQ_4,
    input HBUSREQ_5,
    input HBUSREQ_6,
    input HBUSREQ_7,
    output HGRANT_0,
    output HGRANT_1,
    output HGRANT_2,
    output HGRANT_3,
    output HGRANT_4,
    output HGRANT_5,
    output HGRANT_6,
    output HGRANT_7,
    output BUSREQ,
    output ALLREADY,
    output DECIDE
);

    reg HGRANT_0_r, HGRANT_1_r, HGRANT_2_r, HGRANT_3_r, HGRANT_4_r, HGRANT_5_r, HGRANT_6_r, HGRANT_7_r;
    reg ALLREADY_r, DECIDE_r;

    always @(*) begin
        HGRANT_0_r <= HBUSREQ_0;
        HGRANT_1_r <= HBUSREQ_1;
        HGRANT_2_r <= HBUSREQ_2;
        HGRANT_3_r <= HBUSREQ_3;
        HGRANT_4_r <= HBUSREQ_4;
        HGRANT_5_r <= HBUSREQ_5;
        HGRANT_6_r <= HBUSREQ_6;
        HGRANT_7_r <= HBUSREQ_7;
        ALLREADY_r <= ! (HGRANT_0_r || HGRANT_1_r || HGRANT_2_r || HGRANT_3_r || HGRANT_4_r || HGRANT_5_r || HGRANT_6_r || HGRANT_7_r);
        DECI

In [40]:
import tree_of_thoughts

class GoogleTextModel(tree_of_thoughts.AbstractLanguageModel):
    def __init__(self):
        self.model = CodeGenerationModel.from_pretrained("code-bison@001")
    def generate_thoughts(self, state, k, initial_prompt, rejected_solutions=None):
        if (type(state) == str):
            state_text = state
        else:
            state_text = '\n'.join(state)

        # history = [ChatMessage(content=msg, author="user" if (i%2) == 0 else "bot") for i, msg in enumerate(state)]

        prompt = f"""You're an TreeofThoughts, an superintelligent AI model devoted to writing correct Verilog code. You're purpose is to generate a verilog module satisfying an LTL specification. You must generate solutions on the basis of determining the most reliable solution in the shortest amount of time, while taking rejected solutions into account and learning from them. 
        Considering the reasoning provided:\n\n
        ###'{state_text}'\n\n###
        Devise the best possible solution for the task: {initial_prompt}, Here are evaluated solutions that were rejected: 
        ###{rejected_solutions}###, 
        complete the {initial_prompt} without making the same mistakes you did with the evaluated rejected solutions. Be simple. Be direct. Provide intuitive solutions as soon as you think of them.
        Write down your observations in format 'Observation:xxxx', then write down your thoughts in format 'Thoughts:xxxx'."""
        thoughts = [
            self.model.predict(prefix=prompt, **parameters)
            for _ in range(0, k)
        ]

        return thoughts
    
    def evaluate_states(self, states):
        return super().evaluate_states(states)
def test():
    model = GoogleTextModel()
    model.generate_thoughts([], 3, initial_prompt="")

prompt = """
Verilog code fulfilling specifications:
n = 2. It satisfies the LTL specification G (F r_0) && G (F r_1) <-> G (F g):
```verilog
module fsm(r_0, r_1, g);
  input r_0;
  input r_1;
  output g;
  reg [0:0] state;

  assign g = ((state == 1) && (r_1 && r_0) || (state == 0) && r_0) ? 1 : 0;

  initial
  begin
    state = 0;
  end
  always @($global_clock)
  begin
    case(state)
      0: if (!(!r_1 && r_0))
           state = 0;
         else 
           state = 1;

      1: if ((r_1 && !r_0))
           state = 0;
         else 
           state = 1;

    endcase
  end
endmodule
```
n = 4. It satisfies the LTL specification G (F r_0) && G (F r_1) && G (F r_2) && G (F r_3) <-> G (F g):
Please write a Verilog module fulfilling the following expectations. Make sure the code is fully synthesizable.:
```verilog
module fsm(r_0, r_1, r_2, r_3, g);
"""
#print(CodeGenerationModel.from_pretrained("code-bison@001").predict(prefix=prompt, **parameters))
res = (TextGenerationModel.from_pretrained("text-bison").predict(prompt=prompt, **parameters)).text
code = (extract_normalized_verilog_code("""```verilog
module fsm(r_0, r_1, r_2, r_3, g);\n""" + res, "detector"))
verify.verify_code(benchmarks[0].specification, code, overwrite_params=benchmarks[0].generate_params)

<ReturnCode.FALSE_RESULT: 11>

In [None]:
async def run_benchmarks(benchmarks, file, example_types = ["self", "bosy", "strix", "none"], template = prompting.DefaultPromptTemplate):
  # setting up csv writing
  f = open(file, 'w', newline='')
  csvwriter = csv.DictWriter(f, fieldnames=["benchmark"] + example_types, dialect='unix')
  csvwriter.writeheader()

  # get event loop to be able to run the requests in parallel
  loop = asyncio.get_event_loop()

  def __run_single(bm, type):
    print("Starting benchmark " + bm.name + "/" + type)
    # bm.build_prompt might do some heavy work like synthesizing examples with bosy
    prompt = bm.build_prompt(params=bm.generate_params, template=template, mode=type)
    print("Finished generating prompt for " + bm.name + "/" + type)
    chat = chat_model.start_chat()
    response = chat.send_message(prompt, **parameters)
    code = extract_normalized_verilog_code(response.text, bm.name)
    print(code if code else "NO_CODE::\n" + response)
    if code == None:
      return "NO_CODE"
    else:
      res = verify.verify_code(bm.specification, code, overwrite_params=bm.generate_params)
      return res.name
  async def run_single(bm, type):
    return await loop.run_in_executor(None, __run_single, bm, type)
  async def run_single_benchmark(bm):
    result = {
      "benchmark": bm.name
    }
    async_res = await asyncio.gather(
      *[run_single(bm, type) for type in example_types]
    )
    for i, res in enumerate(async_res):
      result[example_types[i]] = res
    csvwriter.writerow(result)

  await asyncio.gather(
    *[run_single_benchmark(bm) for bm in benchmarks]
  )
    #  prompt = bm.build_prompt(params={"n": 8}, template=template, example=type)
    #  chat = chat_model.start_chat()
    #  response = chat.send_message(prompt, **parameters)
    #  code = extract_normalized_verilog_code(response.text, bm.name)
    #  if code == None:
    #    current_result[type] = "no_code"
    #  else:
    #    print(code)
    #    res = verify.verify_code(bm.specification, code, overwrite_params={"n": 8})
    #    current_result[type] = res.name

  # dont forget to close the file again
  f.close()

await run_benchmarks(benchmarks, file = "test.csv")

In [9]:
class TreeOfThoughtsPrompt(prompting.PromptTemplate):
  _start = "Simulate three brilliant, logical experts collaboratively answering a question. Each one verbosely explains their thought process in real-time, considering the prior explanations of others and openly acknowledging mistakes. At each step, whenever possible, each expert refines and builds upon the thoughts of others, acknowledging their contributions. They continue until there is a definitive answer to the question. For clarity, your entire response should be in a markdown table. The question is to generate a verilog module satisfying a specific LTL specification."
  _example = "Here is an example for %PARAMS%. It satisfies the LTL specification %SPEC%:\n%IMPL%"
  _question = "Please write a Verilog module fulfilling the following expectations. Make sure the code is fully synthesizable.:\n%SPEC%"

verify.verify_file("mux.tlsf", "examples/mux.vl", overwrite_params={"n": 2}, debug=True)

[32mConverting Verilog to AIGER[m

 /----------------------------------------------------------------------------\
 |                                                                            |
 |  yosys -- Yosys Open SYnthesis Suite                                       |
 |                                                                            |
 |  Copyright (C) 2012 - 2020  Claire Xenia Wolf <claire@yosyshq.com>         |
 |                                                                            |
 |  Permission to use, copy, modify, and/or distribute this software for any  |
 |  purpose with or without fee is hereby granted, provided that the above    |
 |  copyright notice and this permission notice appear in all copies.         |
 |                                                                            |
 |  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES  |
 |  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF          |
 | 

<ReturnCode.SUCCESS: 0>

In [None]:
prompt = benchmarks[0].build_prompt(template=prompting.DefaultPromptTemplate, mode="self")

print(prompt)
chat = chat_model.start_chat()
response = chat.send_message(prompt, **parameters)

print(response)
code = extract_code_block(response.text)

res = verify.verify_code("detector.tlsf", code, overwrite_params={"n": 8}, debug=False)
print(res)

You are an expert in writing correct verilog code, that fulfill certain formal properties specified in LTL.
Here is an example for n=2. It satisfies the LTL specification G (F r_0) && G (F r_1) <-> G (F g):
```
module detector(r_0, r_1, g);
  input r_0;
  input r_1;
  output g;
  reg [0:0] state;

  assign g = ((state == 1) && r_1 || (state == 0) && (!r_1 && r_0)) ? 1 : 0;

  initial
  begin
    state = 0;
  end
  always @($global_clock)
  begin
    case(state)
      0: if (!r_0)
           state = 0;
         else 
           state = 1;

      1: if (!(r_1 && !r_0))
           state = 1;
         else 
           state = 0;

    endcase
  end
endmodule

```
Here is an example for n=4. It satisfies the LTL specification G (F r_0) && G (F r_1) && G (F r_2) && G (F r_3) <-> G (F g):
```
module detector(r_0, r_1, r_2, r_3, g);
  input r_0;
  input r_1;
  input r_2;
  input r_3;
  output g;
  reg [1:0] state;

  assign g = ((state == 3) && (r_2 && r_0) || (state == 0) && r_0 || (state == 1

In [15]:
def generate_multiple(k = 3, message_history=[]):
    responses = []

    prompt = benchmarks[0].build_prompt(template=prompting.DefaultPromptTemplate, mode="bosy")
    for _ in range(0,3):
        chat = chat_model.start_chat(message_history=message_history)
        response = chat.send_message(prompt, **parameters)
        print(chat.message_history)
        responses.append(response)
    print(responses)
generate_multiple()

[ChatMessage(content='You are an expert in writing correct verilog code, that fulfill certain formal properties specified in LTL.\nHere is an example for n=2. It satisfies the LTL specification G (F r_0) && G (F r_1) <-> G (F g):\n```\nmodule detector(r_0, r_1, g);\n  input r_0;\n  input r_1;\n  output g;\n  reg [0:0] state;\n\n  assign g = ((state == 1) && r_1 || (state == 0) && (!r_1 && r_0)) ? 1 : 0;\n\n  initial\n  begin\n    state = 0;\n  end\n  always @($global_clock)\n  begin\n    case(state)\n      0: if (!r_0)\n           state = 0;\n         else \n           state = 1;\n\n      1: if (!(r_1 && !r_0))\n           state = 1;\n         else \n           state = 0;\n\n    endcase\n  end\nendmodule\n\n```\nHere is an example for n=4. It satisfies the LTL specification G (F r_0) && G (F r_1) && G (F r_2) && G (F r_3) <-> G (F g):\n```\nmodule detector(r_0, r_1, r_2, r_3, g);\n  input r_0;\n  input r_1;\n  input r_2;\n  input r_3;\n  output g;\n  reg [1:0] state;\n\n  assign g = ((

In [None]:

contents = read_file("detector.tlsf")
ltl = syfco.convert(contents, "ltl", overwrite_params={"n": 4})
bosy_f = syfco.convert(contents, "bosy", overwrite_params={"n": 4})
bosy_impl = bosy.synthesize(bosy_f)

template = prompting.PromptTemplate()
template.add_example({
    "SPEC": syfco.convert(contents, "ltl", overwrite_params={"n": 2}),
    "IMPL": read_file("../../verilog/detector/detector_2.vl"),
    "PARAMS": "n=2"
})
prompt = template.build_prompt({"SPEC": syfco.convert(contents, "ltl", overwrite_params={"n": 4})})
print(prompt)

You are an expert in writing correct verilog code, that fulfill certain formal properties specified in LTL.
Here is an example for n=2. It satisfies the LTL specification G (F r_0) && G (F r_1) <-> G (F g):
module detector(
  input [1:0] r,
  input clk,
  output reg g
);
  reg [1:0] state;
  initial state = '0;
  always @(posedge clk) begin
    state = state | r;
    g = 0;
    if(state == '1) begin
      g = 1;
      state = '0;
    end
  end
endmodule

Please write a Verilog module fulfilling the following expectations. Make sure the code is fully synthesizable.:
G (F r_0) && G (F r_1) && G (F r_2) && G (F r_3) <-> G (F g)


In [None]:

contents = read_file("detector.tlsf")
ltl = syfco.convert(contents, "ltl", overwrite_params={"n": 2})
bosy_f = syfco.convert(contents, "bosy", overwrite_params={"n": 2})
bosy_impl = rename_module(bosy.synthesize(bosy_f), "detector", "fsm")
template = prompting.PromptTemplate()
template.add_example({
    "SPEC": syfco.convert(contents, "ltl", overwrite_params={"n": 2}),
    "IMPL": bosy_impl,
    "PARAMS": "n=2"
})

prompt = template.build_prompt({"SPEC": syfco.convert(contents, "ltl", overwrite_params={"n": 4})})
print(prompt)

You are an expert in writing correct verilog code, that fulfill certain formal properties specified in LTL.
Here is an example for n=2. It satisfies the LTL specification G (F r_0) && G (F r_1) <-> G (F g):
module detector(r_0, r_1, g);
  input r_0;
  input r_1;
  output g;
  reg [0:0] state;

  assign g = ((state == 1) && (r_1 && r_0) || (state == 0) && r_0) ? 1 : 0;

  initial
  begin
    state = 0;
  end
  always @($global_clock)
  begin
    case(state)
      0: if (!(!r_1 && r_0))
           state = 0;
         else 
           state = 1;

      1: if ((r_1 && !r_0))
           state = 0;
         else 
           state = 1;

    endcase
  end
endmodule

Please write a Verilog module fulfilling the following expectations. Make sure the code is fully synthesizable.:
G (F r_0) && G (F r_1) && G (F r_2) && G (F r_3) <-> G (F g)
