In [9]:
import stable_baselines3
from configparser import ConfigParser
import bug_lib as BL
import subprocess
import os
from datetime import date
import state_machine_rewarder as SMR
import re
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.stats import linregress

In [None]:
def parserConfig():
    cfg = ConfigParser()
    cfg.read('config.ini')
    config = {}
    config['root_dir'] = cfg.get('param','root_dir')
    # config['real_life'] = cfg.get('param','real_life')

    # config['bug_num'] = int(cfg.get('param','bug_num'))
    # config['specific_bug'] = cfg.get('param','specific_bug')
    
    temp = cfg.get('param','specified_bug_id')[1:-1]
    config['specified_bug_id'] = [int(t.strip()) for t in temp.split(',')]

    config['epoches'] = int(cfg.get('param','epoches'))
    config['rounds'] = int(cfg.get('param','rounds'))
    return config

In [None]:
# 使用方式
# run_python_and_capture_print('/path/to/your/python/file.py', '/path/to/your/log/file.txt')
def run_python_and_capture_print(python_file_path, log_file_path):
    command = f"python {python_file_path}"
    with open(log_file_path, 'w') as f:
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)

        for line in iter(process.stdout.readline, b''):
            line = line.decode('utf-8').rstrip()  # 解码输出并去除尾部空格
            print(line)  # 可以在控制台打印
            f.write(line + '\n')  # 写入到日志文件

def write_in_log_end(description, log_file_path):
    with open(log_file_path, 'a') as f:
        f.write('\n' + description)
    return

In [None]:
# for i in range(10):
#     os.chdir("..")
#     os.system('pwd')

### Inject Bugs & Run Experiment

In [None]:
def main(config):
    
    for round in range(config['rounds']):
        print("round: " + str(round) + "----")
        BL.recover_project
        BL.inject_bugs(config)
        
        # pip重新安装repository
        os.chdir("..")
        # os.system('ls')
        os.system('pip install -e .[docs,tests,extra]')
        
        log_dir = config['root_dir'] + '/logs/'
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        log_name = log_dir + 'time_' + date.today() + 'round_' + str(round)
        with open(log_name, 'w') as log_file:
            log_file.write(config)
            log_file.write("-------------")
        
        for epoch in range(config['epoches']):
            print("epoch: " + str(epoch) + "----")
            
            training_script = + '' # path of training script
            
            run_python_and_capture_print(training_script, log_name)

In [None]:
bug_version_list = [
    [0],
    [1],
    # ...,
    [1,2,3]
]

# create different config files using bug_version list

config = parserConfig()

for bug_version in bug_version_list:
    config['specific_bug'] = bug_version
    

### Validation Part

In [None]:
# actions_dict = state_machine_rewarder.ActionScoringStateMachine.state_action_dict
actions_dict = {
            (2, 2): 'up',
            (2, 1): 'right',
            (3, 1): 'down',
            (3, 2): 'down',
        }

# actions_dict = SMR.ActionScoringStateMachine.

def parse_log_cal_correct_action_percentage_by_epoch(file_path, start_epoch, end_epoch, actions_dict):
    with open(file_path, 'r') as log_file:
        log_data = log_file.read()

    log_divided_by_epoch = log_data.split("restart! --------")
    correct_actions_percentage_list = []
    for epoch_index in range(start_epoch, end_epoch):
        epoch_log = log_divided_by_epoch[epoch_index]
        # 使用正则表达式匹配日志中的坐标和行动
        matches = re.findall(r"\((\d+),\s*(\d+)\)\s(\w+)", epoch_log)

        correct_actions_count = 0
        total_actions_count = len(matches)

        # 对每个匹配项检查是否遵循给定的行动字典
        for match in matches:
            coord = (int(match[0]), int(match[1]))
            action = match[2]
    
            # 检查坐标是否在字典中，并且行动是否一致
            if coord in actions_dict and actions_dict[coord] == action:
                correct_actions_count += 1

        # 计算正确行动的百分比
        correct_actions_percentage = (correct_actions_count / total_actions_count) * 100

        # 打印结果
        # print(f"Correct actions percentage: {correct_actions_percentage:.2f}%")
        correct_actions_percentage_list.append(correct_actions_percentage)
    return correct_actions_percentage_list

In [None]:
def linelar_regression(data):
# data = [100, 102, 98, 97, 105, 110, 108, 112, 115, 118, 121, 125]
# data = result

    # 将数据转换成Pandas Series对象
    series = pd.Series(data)

    # 计算简单移动平均(SMA)和指数移动平均(EMA)
    sma = series.rolling(window=3).mean()
    ema = series.ewm(span=3, adjust=False).mean()

    # 使用线性回归判断趋势
    slope, intercept, r_value, p_value, std_err = linregress(range(len(data)), data)

    # 绘制数据和趋势线
    plt.figure(figsize=(12, 6))
    plt.plot(data, label='Original Data', marker='o')
    # plt.plot(sma, label='SMA 3-period', linestyle='--')
    # plt.plot(ema, label='EMA 3-period', linestyle='--')
    plt.plot(range(len(data)), intercept + slope * np.asarray(range(len(data))), 'r', label=f'Trend Line: slope={slope:.2f}')
    plt.legend()
    plt.title('Data Trend Analysis')
    plt.show()

    # 输出线性回归结果
    print(f"Slope of trend line: {slope:.2f}")
    print(f"R-squared: {r_value**2:.2f}")
    print(f"P-value of trend line: {p_value:.2f}")

    # 如果斜率显著大于0，我们可以认为存在上升趋势
    if p_value < 0.05 and slope > 0:
        print("The data shows a statistically significant increasing trend.")
    else:
        print("The data does not show a statistically significant increasing trend.")