### Introduction

This Jupyter Notebook can be divided into three parts: (1) a single case to demonstrate the entire Upbeat pipeline, (2) a small-scale test to show the capability of Upbeat, and (3) the evaluation presented in our paper.

The complete experiment in our paper included more test cases and ran for a longer duration. However, this notebook focuses on minimal examples to explore the capabilities of Upbeat at an affordable cost.

### Instructions for Running the Notebook

1. Copy the Notebook: Please make a copy of main.ipynb.
2. Restart and Clear Output: Open the copied notebook and select "Restart & Clear Output" from the Kernel menu.
3. Run the Notebook: Press the play button to execute each cell sequentially. Ensure you wait for each cell to complete before proceeding to the next one.

**Note:** It is crucial to wait for each step to finish. Moving to the next cell prematurely may cause errors due to incomplete data processing.

### Link to Paper

In the following sections, each heading gives the section number of our paper in which the relevant technique is described or the corresponding data is presented.

### 1 Case Study

First, we need to import the necessary tools and load the code segment database.

In [None]:
import random
import ast
import os
os.chdir("../src/Generate")

from basic_operation.dict_operation import get_rest_args
from DBOperation.dboperation_sqlite import DataBaseHandle
from combine_fragment import CodeFragmentGenerator, get_contained_cons
from cons_generator.cwvp import generate_if_cons_exist
from generate import Generate
from class_for_info.fragment_info import CodeFragmentInfo

generate = Generate("../config.json")
fragment = CodeFragmentGenerator(1, "CodeFragment_CW")
frag_db = DataBaseHandle("../../data/query/corpus-v3.db")
frag_list = frag_db.selectAll("select * from CodeFragment_CW")

class colors:
    RED = '\033[91m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    RESET = '\033[0m'
    BOLD = '\033[1m'

print("Done.")

#### 1.1 Generate Test Codes & Test Input Data (Section 3.2 & Section 3.3)

Then, we can select a code segment from the corpus. `pre-conditions` contains the set of variables necessary for the successful execution of the code segment, and `post-conditions` contains the variables that become available after it runs.

In [None]:
frag = random.choice(frag_list)
available_variables = ast.literal_eval(frag[2])
needful_variables = ast.literal_eval(frag[3])
print(colors.BOLD+"content of code segment:\n"+colors.RESET+frag[1]+
      colors.BOLD+"pre-conditions:"+colors.RESET,needful_variables,
      "\n"+colors.BOLD+"post-conditions:"+colors.RESET,available_variables)

Next, Upbeat synthesizes a test code by assembling type-directed code segments.
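To illustrate the idea of type-directed assembly, the sketch below chains toy segments so that every variable a segment needs (its pre-conditions) is provided, with a matching type, by a segment placed before it. The `Segment` class, the `assemble` function, and the toy Q# snippets are hypothetical illustrations under simplifying assumptions; they are not the actual `CodeFragmentGenerator` implementation.

```python
from dataclasses import dataclass, field

# Hypothetical, simplified model of a code segment (variable name -> type).
@dataclass
class Segment:
    code: str
    needs: dict = field(default_factory=dict)     # pre-conditions
    provides: dict = field(default_factory=dict)  # post-conditions

def assemble(target, corpus, max_depth=5):
    """Prepend segments until every variable `target` needs is provided
    with a matching type. Returns the ordered code lines, or None if
    the corpus cannot satisfy the pre-conditions."""
    if max_depth == 0:
        return None
    lines = []
    for name, typ in target.needs.items():
        # Find a segment whose post-conditions supply this (name, type) pair.
        supplier = next((s for s in corpus if s.provides.get(name) == typ), None)
        if supplier is None:
            return None
        prefix = assemble(supplier, corpus, max_depth - 1)
        if prefix is None:
            return None
        # Skip lines that an earlier dependency already contributed.
        lines.extend(l for l in prefix if l not in lines)
    lines.append(target.code)
    return lines

# Toy corpus: the snippets are illustrative only.
corpus = [
    Segment("let n = 3;", provides={"n": "Int"}),
    Segment("use qs = Qubit[n];", needs={"n": "Int"}, provides={"qs": "Qubit[]"}),
]
target = Segment("ApplyToEach(H, qs);", needs={"qs": "Qubit[]"})
print("\n".join(assemble(target, corpus)))
```

When no segment in the corpus can supply a needed variable, `assemble` returns `None`, which mirrors the "Generation failed" branch in the cell below.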

If constraints are present in the synthesized test code, Upbeat generates both valid and invalid values; otherwise, it generates a specified boundary value or a random value.
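As a rough illustration of constraint-guided input generation, the sketch below handles one simple constraint form, an integer range `low <= n <= high`. Valid candidates sit on the boundaries and invalid candidates sit just outside them. The functions `boundary_values` and `make_statements`, the variable name `nQubits`, and the exact shape of the emitted statements are assumptions made for this example; the real `generate_if_cons_exist` supports more constraint kinds and may behave differently.

```python
import random

def boundary_values(low, high):
    """For the constraint low <= n <= high, return (valid, invalid) candidates.
    Valid candidates sit on the boundaries; invalid ones sit just outside."""
    valid = [low, high]
    invalid = [low - 1, high + 1]
    return valid, invalid

def make_statements(var, low, high, rng=random):
    """Build one valid and one invalid declaration for `var` (hypothetical format)."""
    valid, invalid = boundary_values(low, high)
    # The "//valid" and "//invalid" markers record the expected behavior,
    # so a later oracle can compare it with the actual result.
    valid_stmt = f"//valid\nlet {var} = {rng.choice(valid)};\n"
    invalid_stmt = f"//invalid\nlet {var} = {rng.choice(invalid)};\n"
    return valid_stmt, invalid_stmt

valid_stmt, invalid_stmt = make_statements("nQubits", 1, 8)
print(valid_stmt + invalid_stmt)
```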

In [None]:
# Exclude variables that are related to constraints
bool_expr_list, func_ret_list, quanternion_list, needful_args, partial_reset_stmt = get_contained_cons(frag[1], needful_variables)
needful_variables = get_rest_args(needful_variables, needful_args)
# Start to assemble
this_fragment_info = CodeFragmentInfo(frag[1], available_variables, needful_variables, frag[4], frag[5], frag[6])
combined_fragment, combined_import = fragment.generate_a_code_frag(this_fragment_info)
if combined_fragment is None:
    print(colors.RED+"!!!Generation failed!"+colors.RESET)
else:
    combined_fragment = combined_fragment.replace("\n\n", "\n").replace("//no cons\n//no cons\n", "//no cons\n")
    # Print only on success; concatenating None with a string would raise a TypeError
    print(colors.BOLD+"==test code==\n"+colors.RESET+combined_fragment)
if len(bool_expr_list) > 0 or len(func_ret_list) > 0:
    print(colors.BLUE+"There exist constraints! Start to generate inputs."+colors.RESET)
    valid_stmt, invalid_stmt = \
        generate_if_cons_exist(bool_expr_list, func_ret_list, quanternion_list, needful_args)
    # print("==valid==\n"+valid_stmt+"\n==invalid==\n"+invalid_stmt)
else:
    valid_stmt, invalid_stmt = "//no cons\n", "//no cons\n"
# If generation failed
if valid_stmt is None:
    print(colors.RED+"!!!Generation failed!"+colors.RESET)
else:
    print(colors.BOLD+"valid_stmt:\n"+colors.RESET+valid_stmt+\
          colors.BOLD+"invalid_stmt:\n"+colors.RESET+invalid_stmt)

#### 1.3 Assemble Test Case (Section 3.4)

Finally, Upbeat inserts all generated elements into the template. This includes the key components (variable declarations, a test code, and diagnostic statements) and other necessary content (import statements, reset statements, and callable declarations).

If there are API constraints in the relevant code segments, Upbeat will generate two test cases (one for valid inputs and another for invalid inputs). Otherwise, Upbeat will generate a single test case. 

In [None]:
valid_testcase, invalid_testcase = "", ""
# Get self_defined_callables
defined_callables = ""
for item in fragment.self_defined_callables:
    if isinstance(item, str):
        if item not in defined_callables:
            defined_callables += item
    else:
        if item.content not in defined_callables:
            defined_callables += item.content
# Process valid test case
valid_fragment = valid_stmt+combined_fragment+partial_reset_stmt
valid_import = combined_import
valid_testcase = generate.assemble_testcase(valid_fragment, valid_import, [defined_callables])
print(colors.BOLD+"==valid_testcase==\n"+colors.RESET+valid_testcase)
# Process invalid test case
if invalid_stmt is None or invalid_stmt == valid_stmt:
    print(colors.BOLD+"==invalid_testcase==\nSame as above."+colors.RESET)
else:
    invalid_fragment = invalid_stmt+combined_fragment+partial_reset_stmt
    invalid_import = combined_import
    invalid_testcase = generate.assemble_testcase(invalid_fragment, invalid_import, [defined_callables])
    print(colors.BOLD+"==invalid_testcase==\n"+colors.RESET+invalid_testcase)

#### 1.4 Execute Test Cases (Section 3.5)
There are two test oracles in Upbeat: language-level testing via constraints and differential testing. 

In language-level testing, any result that deviates from the expected behavior, as well as any crash or timeout, is treated as anomalous.

In [None]:
os.chdir("../Fuzzing")

from Fuzzing.lib.Harness import *
from Fuzzing.lib.post_processor import *
from Generate.basic_operation.file_operation import initParams

# You can also change `valid_testcase` into `invalid_testcase` if they are different
testcase_content = valid_testcase

print(colors.BOLD+"==language-level testing=="+colors.RESET)
print("Please wait a few seconds until you see the message 'nothing happened' or '!!!find wrong cons'.")
susp_flag = False
# Record expected behavior
if "//wrong" in testcase_content or "//invalid" in testcase_content:
    flag = 0
elif "//correct" in testcase_content or "//valid" in testcase_content:
    flag = 1
else:
    flag = -1
# Execute test case
output = execute(0, testcase_content, ["dotnet", "run"], False)
print(colors.BOLD+"output1:\n"+colors.RESET+output.stdout)
# Detect anomalies
if  ((flag == 1 and output.returnCode != 0) or 
    (flag == 0 and output.returnCode == 0) or 
    output.outputClass in ["timout", "crash"]):
    print(colors.RED+"!!!find wrong cons"+colors.RESET)
    susp_flag = True
else:
    print(colors.GREEN+"nothing happened"+colors.RESET)

In differential testing, Upbeat will detect any inconsistency, crash, or timeout. 

**Note:** The voting scheme filters out some simple faulty behaviors. For example, ToffoliSimulator supports only a subset of the basic gates and throws a `NotImplementedException` if a test case contains unsupported callables. Upbeat does not compare this exception with other results.
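The filtering behavior described in the note can be pictured with the following minimal sketch of a majority vote. It is not the `vote` function imported from `Fuzzing.lib`; the function name `vote_sketch`, the tuple layout, and the simulator label `default` are assumptions made for illustration. Results that only report `NotImplementedException` are dropped before comparison, and every remaining simulator that disagrees with the most common outcome is reported.

```python
from collections import Counter

def vote_sketch(results):
    """results: list of (simulator_name, return_code, stdout) tuples.
    Returns the names of simulators that disagree with the majority outcome."""
    # Drop results where a simulator merely lacks support for a callable.
    comparable = {
        name: (code, out.strip())
        for name, code, out in results
        if "NotImplementedException" not in out
    }
    if len(comparable) < 2:
        return []  # nothing left to compare
    counts = Counter(comparable.values())
    majority, _ = counts.most_common(1)[0]
    return [name for name, outcome in comparable.items() if outcome != majority]

# Toy outputs, not real simulator logs.
results = [
    ("default", 0, "One\n"),
    ("SparseSimulator", 0, "One\n"),
    ("ToffoliSimulator", 1, "NotImplementedException: unsupported callable"),
]
print(vote_sketch(results))
```

In this toy run the Toffoli result is excluded rather than counted as an inconsistency, so the two remaining simulators agree and no dissenting simulator is reported.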

In [None]:
# differential testing activates only if language-level testing detects no anomalies
if not susp_flag and output.returnCode not in [134, 137]:
    outputs = [output]
    command_list = [["dotnet", "run", "-s", "SparseSimulator"],
                    ["dotnet", "run", "-s", "ToffoliSimulator"]]
    # execute on SparseSimulator and ToffoliSimulator
    for i, cmd in enumerate(command_list, start=2):
        tmp_output = execute(0, output.testcaseContent, cmd, False)
        outputs.append(tmp_output)
        print(colors.BOLD+"output"+str(i)+":\n"+colors.RESET+tmp_output.stdout)
    # voting scheme
    vote(outputs, output.testcaseContent)
else:
    print("Already anomalous in language-level testing; differential testing is skipped.")

#### Filter Anomalies (An Extra Module)

To reduce costs during manual analysis, we integrate a straightforward module in Upbeat. Upbeat classifies anomalies into three categories:
+ Bugs that have already been analyzed.
+ Faulty cases that have already been analyzed.
+ New anomalies that are awaiting verification.

In the following cell, `filter_boundary()` is used to filter anomalies in language-level testing, and `filter_differential()` is used to filter anomalies in differential testing.

**Note:** If differential testing was not performed, there are two options: (1) generate new test cases and run the test again, or (2) comment out the last line in the following cell (i.e., `filter_differential(result_db)`).
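The three-way classification described above can be sketched as follows. This is only an illustration under simplifying assumptions: anomalies are plain message strings, known cases are matched by substring, and the function name `classify_anomalies` and all signatures below are made up for the example. The actual `filter_boundary()` and `filter_differential()` work on the result and history databases.

```python
def classify_anomalies(anomalies, known_bugs, known_faulty):
    """Split anomaly messages into the three categories.
    A message matches a category if it contains any of that category's
    known signatures (plain substrings in this sketch)."""
    buckets = {"bug": [], "faulty": [], "new": []}
    for message in anomalies:
        if any(sig in message for sig in known_bugs):
            buckets["bug"].append(message)
        elif any(sig in message for sig in known_faulty):
            buckets["faulty"].append(message)
        else:
            buckets["new"].append(message)
    return buckets

# Toy signatures and messages, invented for this example.
known_bugs = ["index out of range"]
known_faulty = ["NotImplementedException"]
anomalies = [
    "Unhandled exception: index out of range",
    "NotImplementedException: unsupported callable",
    "Unexpected measurement result",
]
for category, messages in classify_anomalies(anomalies, known_bugs, known_faulty).items():
    print(category, messages)
```

Only the messages in the `new` bucket would need manual verification.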

In [None]:
os.chdir("../")

from Fuzzing.history_bug_filter import filter_boundary, filter_differential

if os.path.exists("new_anomalies.txt"):
    print("There already exists a version of filtered results. ")
    with open("bug.txt", "r") as f1:
        content1 = f1.read()
    print(colors.BOLD+"Bugs that have already been analyzed:"+colors.RESET)
    if content1 == "":
        print("Nothing")
    else:
        print(content1)
    with open("faulty.txt", "r") as f2:
        content2 = f2.read()
    print(colors.BOLD+"Faulty cases that have already been analyzed:"+colors.RESET)
    if content2 == "":
        print("Nothing")
    else:
        print(content2)
    with open("new_anomalies.txt", "r") as f3:
        content3 = f3.read()
    print(colors.BOLD+"New anomalies awaiting verification:"+colors.RESET)
    if content3 == "":
        print("Nothing")
    else:
        print(content3)    
else:
    print("Start to filter.")
    result_db = DataBaseHandle("../../data/result/UPBEAT.db")
    history_db = DataBaseHandle("../../data/query/history-bugs.db")
    filter_boundary(result_db, history_db)
    filter_differential(result_db)

### 2 Small-Scale Testing

To better demonstrate the capability of Upbeat, a small-scale test is performed here. It takes about 1 hour, so please wait for it to finish before moving to the next cell.

In [None]:
os.chdir("../Generate")
from main import main as generate_testcases

# Approximately 2 seconds for 100 test cases.
generate_testcases()

# Approximately 50 minutes for 100 test cases.
os.chdir("../Fuzzing")
from hybrid_testing import main as testing
from history_bug_filter import main as filtering

testing()

os.chdir("../")
filtering()

### 3 Experimental Results

In our paper, we design four RQs to evaluate Upbeat:
* RQ1: How effective is Upbeat at detecting boundary bugs in Q# libraries?
* RQ2: How does Upbeat compare with prior methods and baselines on bug detection?
* RQ3: How do individual components of Upbeat contribute to its overall performance?
* RQ4: How effective is Upbeat in extracting constraints from Q# libraries and API documents?

Please run the following cells to view our experiment results.

#### 3.1 Results for RQ1 (Section 5.2)

During our experiment period, Upbeat uncovered 16 implementation bugs and 4 API document errors. To review all the bugs detected by Upbeat during this period, please run the following cell.

In [None]:
os.chdir("/root/upbeat/jupyter")

import re

def extract_tables_from_md(md_file):
    with open(md_file, 'r', encoding='utf-8') as file:
        md_content = file.read()

    table_pattern = r'\|.*\|[\s\S]*?\n(?=\n|\Z)'
    tables = re.findall(table_pattern, md_content)

    return tables

def main():
    md_file = '../data/experiment/BugList.md'
    tables = extract_tables_from_md(md_file)

    for table in tables:
        print(table)

if __name__ == "__main__":
    main()

#### 3.2 Results for RQ2 (Section 5.2)

To answer RQ2, we compare Upbeat to the eight baselines, including QSharp-Fuzz, Quito, QSharpCheck, Muskit, QDiff, MorphQ, Upbeat-M and Upbeat-r. 

We use two metrics: code coverage and bug-exposing capability. Code coverage is the line and block coverage achieved on the Q# library APIs. Bug-exposing capability is the number of unique anomalies detected.

Upbeat outperforms the competing baselines by providing better code coverage and identifying more potential bugs within the same testing time. Execute the following two cells to observe the coverage and anomaly results.

In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
from scipy.interpolate import make_interp_spline, interp1d

from Fuzzing.calculate_code_coverage import calculate_coverage

color_list = ['#9EB3C2', '#AFCAD0', '#C0E0DE', '#8BC3D9', '#6EACC7', '#468FAF', '#297596', '#014F86', '#013A63']
tool_list = ['qsharpfuzz', 'quito', 'qsharpcheck', 'upbeat-m', 'muskit', 'qdiff', 'morphq', 'upbeat-r', 'upbeat']
# marker_list = [',', 'o', '^', 'v', 'D', '<', '>', 'p', '*']

def draw_one_line(y, label, color):
    x = range(0, 25)
    x_list = np.linspace(0, 24, 50)
    f = interp1d(x, y, kind='linear')
    y_list = f(x_list)
    plt.plot(x_list, y_list, label=label, color=color)

input_folder = "../data/experiment/cov-result-origin/"
output_folder = "../data/experiment/cov-result-calculated/"
# for input_file in os.listdir(input_folder):
#     print("processing "+input_file)
#     calculate_coverage(input_folder+input_file, output_folder+input_file)
line_cov_list, block_cov_list = [], []
for tool, color in zip(tool_list, color_list):
    line_cov, block_cov = [0.0], [0.0]
    output_file = tool+".txt"
    # print("drawing "+output_file)
    with open(output_folder+output_file, "r") as f:
        lines = f.readlines()
    for line in lines:
        # Lines read by readlines() keep their trailing newline, so strip before testing for blanks
        if not line.strip():
            continue
        block_cov.append(float(line.split(" ")[1]))
        line_cov.append(float(line.split(" ")[2]))
    line_cov_list.append(line_cov)
    block_cov_list.append(block_cov)
plt.figure(figsize=(6, 4))
for line_cov, tool, color in zip(line_cov_list, tool_list, color_list):
    draw_one_line(line_cov, tool, color)
plt.legend(fontsize='small')
plt.xticks(np.arange(0, 25, 1))
plt.yticks(np.arange(0, 60, 5))
plt.margins(x=0, y=0)
# plt.tight_layout()
plt.show()
plt.figure(figsize=(6, 4))
for block_cov, tool, color in zip(block_cov_list, tool_list, color_list):
    draw_one_line(block_cov, tool, color)
plt.legend(fontsize='small')
plt.xticks(np.arange(0, 25, 1))
plt.yticks(np.arange(0, 45, 5))
plt.margins(x=0, y=0)
# plt.tight_layout()
plt.show()

In [None]:
from tabulate import tabulate

regex = r"Can be detected by (.*)\."
lang_results, diff_results = {}, {}
lang_dir = "../data/experiment/anomalies-lang/"
for f in os.listdir(lang_dir):
    with open(lang_dir+f) as fi:
        second_line = fi.readlines()[1]
    # print("second_line:"+second_line)
    match = re.search(regex, second_line)
    tool = match.group(1)
    if tool in lang_results:
        lang_results[tool] += 1
    else:
        lang_results[tool] = 1
print(tabulate(lang_results.items(), headers=["Tool", "#Anomalies via language-level test"]))
print("\n")
diff_dir = "../data/experiment/anomalies-diff/"
for f in os.listdir(diff_dir):
    with open(diff_dir+f) as fi:
        second_line = fi.readlines()[1]
    match = re.search(regex, second_line)
    tool = match.group(1)
    if tool in diff_results:
        diff_results[tool] += 1
    else:
        diff_results[tool] = 1
print(tabulate(diff_results.items(), headers=["Tool", "#Anomalies via differential testing"]))

To observe these anomalous behaviors in detail, you can execute all test cases by running the following cell. This will take about 4 minutes.

In [None]:
import shutil
import subprocess

def run_one_testcase(cmd, dst_folder, testcase):
    for root, dirs, files in os.walk(dst_folder):
        for dirName in dirs:
            if dirName == "bin" or dirName == "obj":
                shutil.rmtree(os.path.join(dst_folder, dirName), ignore_errors=True)
    result = subprocess.run(cmd, capture_output=True, text=True, cwd=dst_folder)
    print("Output of "+" ".join(cmd)+f" for {testcase}:")
    print(result.stdout)
    print(result.stderr)

def run_all_testcases(src_folder, dst_folder, cmd_list):
    for testcase in os.listdir(src_folder):
        src_file = os.path.join(src_folder, testcase)
        dst_file = os.path.join(dst_folder, "Program.qs")
        shutil.copy(src_file, dst_file)
        for cmd in cmd_list:
            run_one_testcase(cmd, dst_folder, testcase)

proj_path = "../data/experiment/temp-q-project"

lang_path = "../data/experiment/anomalies-lang/"
print("==Start to run anomalies via language-level testing==")
run_all_testcases(lang_path, proj_path, [['dotnet', 'run']])

diff_path = "../data/experiment/anomalies-diff/"
print("==Start to run anomalies via differential testing==")
run_all_testcases(diff_path, proj_path, 
                  [['dotnet', 'run'], 
                  ['dotnet', 'run', '-s', 'ToffoliSimulator'], 
                  ['dotnet', 'run', '-s', 'SparseSimulator']])

#### 3.3 Results for RQ3 (Section 5.3)

In the ablation study, we evaluate two variants of Upbeat: Upbeat-A and Upbeat-B. Upbeat-A removes the input generator and keeps the other parts of Upbeat unchanged. Upbeat-B removes the code segment assembler and keeps the other components.

All Upbeat components contribute positively to the bug-exposing capability of the framework. Run the following cell to observe the bugs discovered by Upbeat-A, Upbeat-B and Upbeat.

In [None]:
abl_results = {}
abl_dir = "../data/experiment/ablation-study/"
for f in os.listdir(abl_dir):
    with open(abl_dir+f) as fi:
        second_line = fi.readlines()[1]
    match = re.search(regex, second_line)
    tool = match.group(1)
    if tool in abl_results:
        abl_results[tool] += 1
    else:
        abl_results[tool] = 1
print(tabulate(abl_results.items(), headers=["Tool", "#Bugs"]))

#### 3.4 Results for RQ4 (Section 5.4)

We use two metrics, Recall and Precision, to evaluate the completeness and correctness of Upbeat-extracted constraints. 

The first is the ratio of Upbeat-recognized constraints to the total number of constraints. The second is the ratio of correctly extracted constraint samples to the total number of constraints.

Upbeat is capable of extracting the majority of constraints from both source code and API documents with high accuracy. Execute the following cell to view the detailed analysis results.
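The evaluation cell below reports each metric as an average of per-namespace ratios, leaving out namespaces whose total is zero. The following minimal sketch shows that averaging on toy numbers; the function name `macro_average` and the counts are made up for illustration and are not taken from the experiment data.

```python
def macro_average(counts):
    """counts: list of (hits, total) pairs, one per namespace.
    Namespaces with a zero total are left out of the average."""
    rates = [hits / total for hits, total in counts if total != 0]
    return sum(rates) / len(rates) if rates else 0.0

# Toy counts: two namespaces with constraints, one without any.
toy_counts = [(8, 10), (3, 5), (0, 0)]
print(f"{macro_average(toy_counts):.0%}")
```

Averaging ratios per namespace gives every namespace the same weight, regardless of how many constraints it contains; this differs from pooling all counts into a single ratio.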

In [None]:
import json
from tabulate import tabulate


def get_rate(num1: int, num2: int):
    if num2 == 0:
        return 0.0
    else:
        return num1 / num2

def convert_to_percent(n):
    n = round(n, 2)
    # print("n:",n)
    return "%.0f%%" % (n * 100)

def calculate(d: dict):
    classical_id, classical_ex, quantum_id, quantum_ex = 0.0, 0.0, 0.0, 0.0
    classical_id_total, classical_ex_total, quantum_id_total, quantum_ex_total = 0, 0, 0, 0
    for namespace, properties in d.items():
        classical_id += get_rate(properties["classical-identified"], properties["classical-id-total"])
        classical_ex += get_rate(properties["classical-extracted"], properties["classical-ex-total"])
        quantum_id += get_rate(properties["quantum-identified"], properties["quantum-id-total"])        
        quantum_ex += get_rate(properties["quantum-extracted"], properties["quantum-ex-total"])
        if properties["classical-id-total"] != 0:
            classical_id_total += 1
        if properties["classical-ex-total"] != 0:
            classical_ex_total += 1
        if properties["quantum-id-total"] != 0:
            quantum_id_total += 1
        if properties["quantum-ex-total"] != 0:
            quantum_ex_total += 1
    # print("quantum_extracted:", quantum_ex)
    return convert_to_percent(classical_id / classical_id_total), convert_to_percent(classical_ex / classical_ex_total), \
           convert_to_percent(quantum_id / quantum_id_total), convert_to_percent(quantum_ex / quantum_ex_total)

with open("../data/experiment/constraint-extraction/source-code.json") as f1:
    code_dict = json.load(f1)
code_result = calculate(code_dict)
tab = [("Source Code", "classical", code_result[0], code_result[1]), ("", "quantum", code_result[2], code_result[3])]
with open("../data/experiment/constraint-extraction/api-document.json") as f2:
    doc_dict = json.load(f2)
doc_result = calculate(doc_dict)
tab.append(("API Document", "classical", doc_result[0], doc_result[1]))
tab.append(("", "quantum", doc_result[2], doc_result[3]))
print(tabulate(tab, headers=["Source", "Type", "Recall", "Precision"]))