In [4]:
import re
import os
import numpy as np
import glob
from tqdm import tqdm
import pandas as pd
from subprocess import Popen, PIPE, call, check_call
import datetime
import shutil
from datetime import datetime
import time
import logging

from help_functions import * #user def
from lut import tests
from itertools import compress


## General definitions, paths, etc.

In [6]:
# Paths are taken from the environment. Later cells build file paths from
# transcript_path by plain string concatenation (e.g. transcript_path+'ram.srec'),
# so a trailing separator is enforced here; os.path.join(x, '') leaves a value
# that already ends with a separator unchanged.
transcript_path = os.path.join(os.environ['NOELV'], '') # Should point to the NOELV-generic directory
grlib_path = os.environ['GRLIB'] # Same usage as Gaisler
riscv_dv = os.environ['r_dv']+"/" # Points to the riscv-dv repository
finish_after = int(1e5) # Number of maximum instructions to execute
boot_loader_length = 41 # Number of boot loader instructions
instr_cnt = finish_after+boot_loader_length+2 # +boot_loader_length to account for bootloader, +2 for trailing counter
# NOTE: the `encoding` argument of basicConfig requires Python 3.9 or newer.
logging.basicConfig(format='%(asctime)s - %(message)s',level=logging.INFO, encoding='utf-8',datefmt='%Y-%m-%d %H:%M:%S')

In [7]:
#Each level corresponds to one cell/function
# All regular expressions are written as raw strings so that sequences such as
# \s, \d and \[ reach the regex engine unchanged without relying on Python's
# (deprecated) pass-through of invalid string escapes.
settings = {
    # Used by gen_test(): RISCV-DV generation + ISS run
    "gen_test":{
        # Substrings handed to find_error() together with the captured stderr
        "errors":["You do not have a valid license to run Riviera-PRO","Error: Simulation initialization failed."],
        "paths":{
            "test":riscv_dv+"out/ovpsim_sim/*.log",
            "srec_test":riscv_dv+"/out/asm_test/*.srec",
        },
        "test_param" : "python3 run.py --custom_target target/rv64_noelv/ --iss ovpsim\
             --simulator riviera --isa rv64gc --mabi lp64 -o "+riscv_dv+'out',
        "process_param" : 'cd '+riscv_dv+' && source ~/.bashrc && module load aldec/riviera/2021.10 &&',
        "timeout" : False,
        # Simulator options written into the Makefile by run_rtl(); the second
        # variant allows 10% more instructions than instr_cnt
        "arg_timeout" : "-gdisas=1 -gfinish_after_en=1 -gfinish_after="+str(instr_cnt),
        "arg_no_timeout" : "-gdisas=1 -gfinish_after_en=1 -gfinish_after="+str(int(instr_cnt*1.1)),
        "iteration":-1, #will be set when run
    },
    # Used by run_rtl(): RTL simulation of each generated program
    "run_rtl":{
        "paths":{
            "cp_dest":transcript_path+'ram.srec',
            "rtl_log":riscv_dv+"out/rtl_log/",
            "srec":riscv_dv+"out/asm_test/",
            "transcript":transcript_path+"transcript",
            "Makefile":grlib_path+"/designs/noelv-generic/Makefile"
        },
        "process_param":'cd '+transcript_path+' && source ~/.bashrc && module add mentor/questasim/2021.3 && make sim-run',
        # Substrings handed to find_success() together with the simulator stdout
        "test_sucess":["# ** Failure: *** IU in error mode, simulation halted ***","Test finished after","** Failure: Assertion violation."],
    },
    # Used by parse_rtl(): patterns applied to each transcript line
    "parse_rtl":{
                "reg_ex":{
                    "lookup":re.compile(r"#\s*\d*\s*ns\s*:\sC\d I\d : \d*\s*\[\d\] @0x[0-9a-fA-F]{16} \(0x[0-9a-fA-F]{4,8}\)\s*.*"),
                    "pc":re.compile(r"(?<=@0x)[0-9a-fA-F]{16}"),
                    # NOTE(review): the class [a0-z9] spans every character from '0' to 'z';
                    # it looks like [a-z0-9] was intended - confirm before tightening.
                    "binary" : re.compile(r"(?<=(\(0x))[a0-z9]{4,8}(?=\))"), #opcode
                    "instr_str" : re.compile(r"(?<!\])(?<=\)\s)[a-z0-9.,\- ()]*(?=\s*W)"),
                    "gpr" : re.compile(r"(?<=\[)[a-z0-9]*\s*=0x[0-9a-fA-FX]{16}(?=\]\[\d\]\sWF?)"),
                    "isFloat":re.compile(r"^f(?!ence)"),
                    "isFloatReg":re.compile(r"^f[sta]\d{1,2}"),
                    "csr" : re.compile(r"(?<=\[)[a-z0-9]*\s*=0x[0-9a-fA-F]{16}(?=\]\[\d\] IPC)"),
                    "FPU_id":re.compile(r"(?<=# FPU 0x)[0-9a-fA-F]{2}"),
                    "FPU_val":re.compile(r"(?<=# FPU 0x[0-9a-fA-F]{2} ).*"),
                    "mode" : re.compile(r"(?<=PRV\[)\d*"),
                    "valid" : re.compile(r"(?<=\[)\d(?=\]\s@)"),
                    "exception" : re.compile(r"(?<=\[)\d(?=\]\sPRV)"),
                },
                # Column order of the DataFrame built by parse_rtl()
                "columns":['pc','instr','gpr','csr','binary','mode','instr_str','valid','exception'],

    },
    # Used by convert_sim_log_to_dataframe(): ISS log -> CSV conversion
    "gen_csv":{
        "process":"python3 "+riscv_dv+"scripts/ovpsim_log_to_trace_csv.py --dont_truncate_after_first_ecall --log ",
        "paths":{
            "log":riscv_dv+"out/ovpsim_sim/"
        },
        "errors":["FileNotFoundError","error: unrecognized arguments:"],

    },
    # Used by the coverage cell
    "gen_cov":{
        "paths":{
            "logs": riscv_dv+"out/ovpsim_sim/",
        },
        "test_param" : "python3 cov.py --iss ovpsim -si riviera --custom_target target/rv64_noelv/",
        "process_param": 'cd '+riscv_dv+' && source ~/.bashrc && module load aldec/riviera/2021.10 && ',
        "test_sucess": "Coverage results are saved",
    },
    # Used by compare_traces(): column layout of error_log.csv
    "trace_compare":{
        "columns":["Test",'Iteration','Date','GPR Pass','CSR Pass','PC Pass','Binary Pass','Truncation','Verification Status'],

    }
}

## Generate RISCV-DV tests and run OVPsim


In [8]:
def gen_test():
    """Run the RISCV-DV generation flow and list the programs it produced.

    The ISS log files matching settings['gen_test']['paths']['test'] are
    globbed before and after the generator command runs; every log that only
    exists afterwards is treated as a newly generated test.

    Returns:
        (srec_after_run, timeouts): two lists of equal length. The first holds
        the .srec file name of each new test (the log path from 'riscv_'
        onward with '.log' replaced by '.srec'); the second holds the
        simulator argument string to use for that test.

    Raises:
        ValueError: if no new ISS log appeared.
        Whatever find_error() raises for the captured stderr (helper imported
        from help_functions; its behaviour is not visible here).
    """
    log_glob = settings['gen_test']['paths']["test"]
    paths_before_run = glob.glob(log_glob)
    srec_after_run = []
    timeouts = []
    logging.info('Generating Tests')
    process = Popen(settings['gen_test']['process_param']+settings["gen_test"]["test_param"], shell=True, stdout=PIPE, stderr=PIPE)
    stdout, stderr = process.communicate()
    # Tool output is not guaranteed to be pure ASCII; replace undecodable bytes
    # instead of aborting the whole run with UnicodeDecodeError.
    error = stderr.decode('ascii', errors='replace')
    with open(riscv_dv+'gen_test_output.log','a') as output_log:
        output_log.write(error)

    find_error(settings['gen_test']['errors'], error)

    paths_after_run = glob.glob(log_glob)
    if(len(paths_after_run) <= len(paths_before_run)):
        raise ValueError("No new tests were generated")

    # Presumably the ISS prints "Info <n>:" per executed instruction, so this
    # marker means the instruction limit was reached - TODO confirm.
    limit_marker = "Info "+str(finish_after)+":"
    # sorted() gives a reproducible processing order (set order is arbitrary)
    for path in sorted(set(paths_after_run)-set(paths_before_run)):
        try:
            iss_log = open(path, "r")
        except IOError:
            logging.exception('Warning: File not found '+path)
            continue
        with iss_log:
            reached_limit = limit_marker in iss_log.read()
        timeouts.append(settings["gen_test"]["arg_timeout"] if reached_limit else settings["gen_test"]["arg_no_timeout"])
        # removesuffix drops exactly '.log'; rstrip('.log') would also eat any
        # trailing '.', 'l', 'o' or 'g' characters of the test name itself.
        srec_after_run.append(path[path.find('riscv_'):].removesuffix('.log')+'.srec')

    return srec_after_run, timeouts



## Run RTL simulation on the newly generated tests


In [9]:
def run_rtl(srec_files, timeout_arg):
    """Run the RTL simulation once per generated program and archive its transcript.

    For every (srec, timeout) pair the VSIMOPT value in the NOEL-V Makefile is
    replaced by `timeout`, the .srec is copied to the simulation directory,
    the simulation command is run in a shell, and the resulting transcript is
    copied to the rtl_log directory as '<name>.rtl.log'. The Makefile is left
    modified on disk.

    Args:
        srec_files: .srec file names relative to settings['run_rtl']['paths']['srec'].
        timeout_arg: simulator option string per program, same length as srec_files.

    Returns:
        List of the archived transcript file names ('<name>.rtl.log').

    Raises:
        ValueError: if the two lists differ in length.
        RuntimeError: if the Makefile cannot be opened.
    """
    if(len(srec_files) != len(timeout_arg)):
        raise ValueError("Lists are not the same length")
    paths = settings['run_rtl']['paths']
    rtl_log_filenames = []
    logging.info('Running RTL simulation')
    for srec,timeout in zip(srec_files,timeout_arg):
        # removesuffix drops exactly '.srec'; rstrip('.srec') strips a character set
        name = srec[srec.find('riscv_'):].removesuffix('.srec')
        logging.debug('Running RTL : '+name)

        try:
            makefile = open(paths['Makefile'],"r")
        except IOError:
            raise RuntimeError('NOEL-V Makefile not found')
        with makefile:
            makefile_txt = makefile.read()
        # A callable replacement inserts `timeout` literally (no backslash/group expansion)
        patched_txt = re.sub("(?<=VSIMOPT=).*", lambda _match: timeout, makefile_txt)
        with open(paths['Makefile'],"w") as makefile:
            makefile.write(patched_txt)

        shutil.copy(paths['srec']+srec, paths['cp_dest']) #copy the current test to the sim dir
        logging.debug("Copied {} to the NOELV directory".format(srec))
        #launch sim
        process = Popen(settings['run_rtl']['process_param'], shell=True, stdout=PIPE, stderr=PIPE)
        stdout, stderr = process.communicate()
        # Replace undecodable bytes rather than aborting on non-ASCII simulator output
        rtl_out = stdout.decode('ascii', errors='replace')

        if(find_success(settings['run_rtl']['test_sucess'],rtl_out)):
            logging.debug("RTL Simulation Success\n")
        else:
            logging.warning("RTL Simulation Failed\n")

        os.makedirs(paths['rtl_log'], exist_ok=True)
        shutil.copy(paths['transcript'], paths['rtl_log']+name+".rtl.log")
        rtl_log_filenames.append(name+".rtl.log")
    return rtl_log_filenames

## Parse RTL transcript and create rtl_log

In [10]:
def replace_gpr(pd_series,gpr,re_gpr):
    """Rename one register in a series of 'name:value' strings.

    Each entry is split on ':'; when the first field equals `gpr` (and is not
    empty) it is replaced by `re_gpr`. The result is rebuilt from the first
    two fields only and returned as a new Series with a default index.
    """
    def _rename(entry):
        fields = entry.split(":")
        reg_name = fields[0]
        if len(reg_name) > 0 and reg_name == gpr:
            reg_name = re_gpr
        return reg_name+":"+fields[1]

    return pd.Series([_rename(entry) for entry in pd_series])

def parse_rtl(filenames):
    """Parse archived RTL transcripts into one trace DataFrame per file.

    Args:
        filenames: transcript file names relative to
            settings['run_rtl']['paths']['rtl_log'] (expected to end in '.rtl.log').

    Returns:
        List of DataFrames with the columns from settings['parse_rtl']['columns']
        plus 'filename'. Files that cannot be opened are logged and skipped,
        so the list may be shorter than `filenames`.

    Raises:
        AttributeError: if a line matches the 'lookup' pattern but one of the
            field patterns does not match (`.group` is called on None).
    """
    reg_ex = settings['parse_rtl']['reg_ex']
    rtl_data_frames = []
    logging.info('Parsing RTL')

    for filename in filenames:
        filename = settings['run_rtl']['paths']['rtl_log']+filename
        try:
            transcript_file = open(filename, "r")
        except IOError:
            logging.exception('Warning: File not found '+filename)
            continue
        with transcript_file:
            transcript = transcript_file.readlines()

        pc,binary,instr_str,gpr,csr,mode,exception,valid = [],[],[],[],[],[],[],[]
        for ln_idx,line in enumerate(transcript):
            if not re.search(reg_ex["lookup"],line): #check that line conforms to format then extract information
                continue
            pc.append(re.search(reg_ex["pc"], line).group(0))
            binary.append(re.search(reg_ex["binary"], line).group(0))
            instr_tmp = re.search(reg_ex["instr_str"], line).group(0).strip(" ")
            instr_str.append(instr_tmp)
            csr.append(re.search(reg_ex["csr"],line).group(0).replace(' ','').replace("=0x",":"))
            mode.append(re.search(reg_ex["mode"],line).group(0))
            exception.append(re.search(reg_ex["exception"],line).group(0))
            valid.append(re.search(reg_ex['valid'], line).group(0))

            gpr_tmp = re.search(reg_ex["gpr"], line).group(0).replace(" ","").replace("=0x",":")
            if re.search(reg_ex['isFloat'], instr_tmp) and re.search(reg_ex['isFloatReg'],gpr_tmp):
                #Look for the FPU's gpr value maximum 100 lines ahead
                bound = min(ln_idx+100, len(transcript))
                for fpu_line in transcript[ln_idx:bound]:
                    fpu_id = re.search(reg_ex['FPU_id'],fpu_line)
                    # The FPU line is matched to this instruction when its id equals
                    # the last two characters of the register string
                    if fpu_id and (gpr_tmp[-2:] == fpu_id.group(0)) and (gpr_tmp.split(':')[0].find('f') >= 0):
                        gpr_tmp = re.search(reg_ex['FPU_val'], fpu_line).group(0).replace(" ","").replace("=0x",":")
                        break
            gpr.append(gpr_tmp)

        columns = settings['parse_rtl']['columns']
        rtl_log = pd.DataFrame(columns=columns)
        rtl_log[columns[0]] = pc
        rtl_log[columns[1]] = [x.split(' ')[0] for x in instr_str ] #mnemonic only
        rtl_log[columns[2]] = gpr
        rtl_log[columns[3]] = csr
        rtl_log[columns[4]] = binary
        rtl_log[columns[5]] = mode
        rtl_log[columns[6]] = instr_str
        rtl_log[columns[7]] = valid
        rtl_log[columns[8]] = exception
        rtl_log = rtl_log[boot_loader_length:].reset_index(drop=True) #boot_loader_length is where the bootloader ends

        rtl_log.gpr = replace_gpr(rtl_log.gpr, 'fp','s0') #replace fp with s0 to keep convention consistent
        rtl_log['valid'] = rtl_log['valid'].astype('int32') #not true when we have an exception (ecall for instance)
        rtl_log = rtl_log[~((rtl_log['instr'] == "addi") & (rtl_log.shift(1)['instr'] == "mret") & (rtl_log['valid'] == 0))].reset_index(drop=True) #deal with gaislers addi injection after mret, this removes all of those lines
        # removesuffix drops exactly '.rtl.log'; rstrip('.rtl.log') strips a character set
        rtl_log['filename'] = filename.removesuffix(".rtl.log")
        rtl_log = rtl_log[~((rtl_log['instr'] == "unknown") & (rtl_log['valid'] == 0) & (rtl_log.shift(1)['valid'] == 0))].reset_index(drop=True)
        rtl_data_frames.append(rtl_log)
    return rtl_data_frames

## Convert OVPsim log to CSV

In [11]:
def convert_sim_log_to_dataframe(filenames):
    #expected .rtl.log extension
    sim_data_frames = []
    logging.info('Converting OVPsim log files to CSV')
    for file in filenames:
        logging.debug('Converting {} to csv'.format(file.rstrip('rtl.log')+'.log'))
        log = settings['gen_csv']['paths']['log']+file.rstrip('.rtl.log')+'.log'
        csv = settings['gen_csv']['paths']['log']+file.rstrip('.rtl.log')+".csv"
        process = Popen(settings['gen_csv']['process']+log+ " --csv "+csv, shell=True, stdout=PIPE, stderr=PIPE)
        stdout, stderr = process.communicate()
        error = stderr.decode('ascii')
        find_error(settings['gen_csv']['errors'], error)
        time.sleep(1) #try this to make sure that the csv file has time to be closed before being read        
        sim_log = pd.read_csv(csv)
        sim_log['mode'] = sim_log['mode'].astype('Int64')
        sim_log['gpr'] = sim_log['gpr'].astype('str')
        sim_log['csr'] = sim_log['csr'].astype('str')
        sim_log['filename'] = file.rstrip(".log")
        sim_data_frames.append(sim_log)
    return sim_data_frames


## Compare series to see if run was successful

In [31]:
def truncate_logs(rtl_log,sim_log,no_truncation,max_length_diff=100):
    """Cut the longer of two traces down to the length of the shorter one.

    Args:
        rtl_log: RTL trace DataFrame.
        sim_log: ISS trace DataFrame.
        no_truncation: flag passed through; set to False when the lengths
            differ by `max_length_diff` rows or more (in either direction).
        max_length_diff: length difference from which the pair is flagged.

    Returns:
        (rtl_log, sim_log, no_truncation) with both frames of equal length.

    Note: truncation uses DataFrame.truncate(after=<label>), so it relies on
    the frames having a default 0..n-1 index.
    """
    # abs() must wrap the difference only; wrapping the comparison (as before)
    # ignored the case where sim_log is the shorter trace.
    length_diff = abs(len(sim_log)-len(rtl_log))
    if(length_diff >= max_length_diff):
        logging.debug("Lengths of series differs with "+str(length_diff)+" elements")
        no_truncation = False
    if(len(sim_log) < len(rtl_log)):
        rtl_log=rtl_log.truncate(after=len(sim_log)-1)
        logging.debug("Truncating rtl_log to {} elements to meet size of sim_log".format(len(rtl_log)))
    elif(len(sim_log) == len(rtl_log)):
        logging.debug("Both logs are equally sized")
    else:
        sim_log=sim_log.truncate(after=len(rtl_log)-1)
        logging.debug("Truncating sim_log to {} elements to meet size of rtl_log".format(len(rtl_log)))
    return rtl_log,sim_log,no_truncation

#Truncation
def compare_traces(rtl_data_frames, sim_data_frames):
    """Compare each RTL trace against its ISS trace and record failures.

    The frames are paired positionally, truncated to equal length with
    truncate_logs(), and compared on pc, binary, gpr and csr. A pair passes
    when all four match and no large truncation was flagged; some generated
    files of a passing test are then deleted. For a failing pair one row is
    appended to '<riscv_dv>error_log.csv'.

    Args:
        rtl_data_frames: list of DataFrames from parse_rtl().
        sim_data_frames: list of DataFrames from convert_sim_log_to_dataframe().

    Raises:
        ValueError: if the two lists differ in length.
    """
    if(len(rtl_data_frames) != len(sim_data_frames)):
        raise ValueError('Lists of dataframes are not the same length')
    logging.info('Comparing Traces')
    for rtl_log, sim_log in zip(rtl_data_frames,sim_data_frames):
        no_truncation = True
        rtl_log, sim_log, no_truncation = truncate_logs(rtl_log,sim_log,no_truncation)
        #Csr match conditions
        mt_csr_0 = sim_log.csr != "nan"
        csr_differs = rtl_log.csr.ne(sim_log.csr) & mt_csr_0

        rtl_csr = rtl_log[csr_differs].csr #these are the ones we can't exclude directly
        sim_csr = sim_log[csr_differs].csr.str.split(";")
        # compare_csr / csr_information_present come from help_functions; their
        # exact semantics are not visible here.
        csr_match_count = compare_csr(sim_csr, rtl_csr)
        mt_csr_1 = csr_information_present(sim_csr,rtl_csr) #ignore csr comparisons where for example mstatus is present the iss but not in the rtl log

        # (OUTDATED COMMENT, CONFIGURING PMP_G IS POSSIBLE IN OVPSIM)Since we couldn't configure the iss granularity for PMP we do an unsafe ignore here based on the operand containing pmpaddr and the instruction being a csr instruction
        # We then check that the gpr content of the instructions only differs by one, however, this could potentially miss some gpr mismatches.
        gpr_pmp_missmatch_tol = 1 #connected to pmp_g in NOEL-V
        pmp_csr_access = (sim_log.gpr != 'nan') & (sim_log.operand.str.find('pmpaddr') >= 0) & (sim_log.instr.str.find('csr') >= 0)
        rtl_tmp = rtl_log[pmp_csr_access].gpr
        sim_tmp = sim_log[pmp_csr_access].gpr
        count = (np.abs(sim_tmp.apply(lambda x: int(x.split(':')[1],16)) - rtl_tmp.apply(lambda x: int(x.split(':')[1],16))) == gpr_pmp_missmatch_tol).value_counts()
        soft_match_gpr = 0
        if(np.where(count.index == True)[0].size > 0):
            soft_match_gpr = count[True]
        matching_gpr = len(sim_log[(sim_log.gpr != "nan") & (rtl_log.gpr == sim_log.gpr)])
        nan_cnt_gpr = len(sim_log[sim_log.gpr == "nan"])
        #these csrs only contain one update to the csr, however, where ovpsim reports more than one update we use csr_match_count
        matching_csr = sim_log[((sim_log.csr != "nan")) & (rtl_log.csr.eq(sim_log.csr))].csr.count() + csr_match_count
        nan_cnt_csr = sim_log[(sim_log.csr == "nan")].csr.count() + len(mt_csr_1)-csr_match_count#[mt_csr_1 == False].count()

        tot_cnt_gpr = matching_gpr + nan_cnt_gpr + soft_match_gpr
        gpr_match = (tot_cnt_gpr == len(rtl_log.gpr))
        binary_match = rtl_log.binary.eq(sim_log.binary).all()
        pc_match = rtl_log.pc.eq(sim_log.pc).all()

        csr_match = (matching_csr+nan_cnt_csr == len(rtl_log.csr))
        ig_csr_perc = 1-matching_csr/(rtl_log.csr.count()-sim_log.csr[(sim_log.csr=="nan")].count())

        if(gpr_match and binary_match and pc_match and csr_match and no_truncation):
            logging.debug("GPR, Binary, PC and CSR contents match\nIgnored {:.1%} CSR and {:.1%} of GPR comparisons".format(ig_csr_perc,nan_cnt_gpr/rtl_log.gpr.count()))
            filename = rtl_log.filename.values[0]
            filename = filename[filename.find('riscv_'):]
            #if test passes then remove all logs except the iss log, that is still needed for the coverage report
            files_to_rm = glob.glob(riscv_dv+'out/**/'+filename+'*')
            # Take the first 'sim_*' match if there is one; indexing [0] directly
            # raised IndexError when the file did not exist.
            files_to_rm.extend(glob.glob(riscv_dv+'out/sim_'+filename+'*')[:1])
            for file in files_to_rm:
                # As written this removes only paths that contain 'ovpsim' and do not
                # contain 'log' past position 0.
                # NOTE(review): confirm this matches the intent stated above.
                if(not (file.find('log') > 0) and (file.find('ovpsim') > 0)):
                    os.remove(file)
        else:
            logging.debug("PC match: {}\nBinary match: {}\nGPR match: {}\nCSR match: {}\nIgnored {:.1%} CSR and {:.1%} GPR comparisons".format(pc_match,binary_match,gpr_match,csr_match,ig_csr_perc,nan_cnt_gpr/rtl_log.gpr.count()))
            errors = pd.DataFrame(columns=settings["trace_compare"]["columns"])
            name = rtl_log.filename.values[0][rtl_log.filename.values[0].find('riscv_'):]
            logging.debug(name)
            errors['Test'] = [name]
            # Raw strings keep \d intact for the regex engine
            errors['Iteration'] = [re.search(r'(?<=:\d{2}_)\d*',name).group(0)]
            errors['Date'] = [re.search(r'\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}',name).group(0)]
            errors['GPR Pass'] = [gpr_match]
            errors['PC Pass'] = [pc_match]
            errors['Binary Pass'] = [binary_match]
            errors['CSR Pass'] = [csr_match]
            errors['Truncation'] = [not no_truncation]
            # The header is written only when the error log is created
            error_log_path = riscv_dv+'error_log.csv'
            errors.to_csv(error_log_path,mode='a',header=not os.path.exists(error_log_path),index=False)

In [None]:
#main
# Full verification loop: every pass generates a new batch of tests, simulates
# them in RTL, parses both traces and compares them. Failures are appended to
# the error log by compare_traces(). The number of passes (29) is a fixed
# choice; no reason for the value is visible in this notebook.
for _ in tqdm(range(29)):
    print('\n')
    # s: .srec file names of the new tests, t: simulator argument string per test
    s,t = gen_test()
    rtl_list = run_rtl(s,t)
    rtl_dataframes = parse_rtl(rtl_list)
    # The RTL log names are reused to locate the matching OVPsim logs
    sim_dataframes = convert_sim_log_to_dataframe(rtl_list)
    compare_traces(rtl_dataframes,sim_dataframes)


In [None]:
# Rebuild the list of RTL logs from the files already present in out/rtl_log
# and re-run parsing and comparison on them (no new simulation is started).
rtl_logs = glob.glob(riscv_dv+'out/rtl_log/*')
# Keep only paths that do not contain the timestamp 12:29:42, reduced to the
# part of the path starting at 'riscv_'.
rtl_list = [
    log_path[log_path.find('riscv_'):]
    for log_path in rtl_logs
    if re.search("^((?!12:29:42).)*$",log_path)
]
rtl_dataframes = parse_rtl(rtl_list)
sim_dataframes = convert_sim_log_to_dataframe(rtl_list)
compare_traces(rtl_dataframes,sim_dataframes)

## Diagnose potential problems put into the error log

In [52]:
#What are the errors?
# Re-parse one failed test from the error log and print the trace entries that
# differ between the RTL and the ISS for every check that failed.
errors = pd.read_csv(riscv_dv+'error_log.csv')
num = 17-2 #specify row of csv to display errors
test = errors[num:num+1]
name = [test.Test.values[0]+'.rtl.log']
# name = ['riscv_full_interrupt_test_2022-03-12_20:08:12_0.rtl.log']
rtl_log = parse_rtl(name)
sim_log = convert_sim_log_to_dataframe(name)
rtl_log,sim_log,_ = truncate_logs(rtl_log[0],sim_log[0],False)
# `test` is a one-row frame; the flag is read with .iloc[0] because
# Series.bool() is deprecated in recent pandas releases.
if(not bool(test['GPR Pass'].iloc[0])):
    print("GPR issue")
    # CSR accesses to pmpaddr are excluded, mirroring compare_traces()
    pmp_csr_access = (sim_log.gpr != 'nan') & (sim_log.operand.str.find('pmpaddr') >= 0) & (sim_log.instr.str.find('csr') >= 0)
    gpr_mismatch = (rtl_log.gpr != sim_log.gpr) & (sim_log.gpr != 'nan') & ~pmp_csr_access
    print(np.unique(rtl_log[gpr_mismatch].gpr))
    print(np.unique(sim_log[gpr_mismatch].gpr))
if(not bool(test['CSR Pass'].iloc[0])):
    print("CSR issue")
    csr_mismatch = (rtl_log.csr != sim_log.csr) & (sim_log.csr != 'nan') & (rtl_log.csr != 'ustatus:0000000000000000')
    print(rtl_log[csr_mismatch].csr)
    print(sim_log[csr_mismatch].csr)
if(not bool(test['Binary Pass'].iloc[0])):
    print("OPcode issue")
    binary_mismatch = (rtl_log['binary'] != sim_log['binary']) & (sim_log['binary'] != 'nan')
    print(rtl_log[binary_mismatch]['binary'])
    print(sim_log[binary_mismatch]['binary'])
if(not bool(test['PC Pass'].iloc[0])):
    print("PC issue")
    pc_mismatch = (rtl_log.pc != sim_log.pc) & (sim_log.pc != 'nan')
    print(rtl_log[pc_mismatch].pc)
    print(sim_log[pc_mismatch].pc)


2022-03-14 10:09:39 - Parsing RTL
2022-03-14 10:09:43 - Converting OVPsim log files to CSV


GPR issue
['s0:8000000a00086800']
['s0:0000000a00082800']


## Run the next cell to disassemble the program in which you had an error

In [None]:
# Disassemble the object file of the test selected in the diagnosis cell.
objdump = os.environ['RISCV_OBJDUMP']
# Deliberately NOT named `settings`: that name holds the notebook-wide
# configuration dict, and rebinding it to a string breaks every cell and
# function that is run afterwards.
objdump_flags = '-d'
program = riscv_dv+'out/asm_test/'+test.Test.values[0]+'.o'

process = Popen('{} {} {}'.format(objdump,objdump_flags,program),stdout=PIPE, shell=True)
# stderr is not piped, so the second element returned by communicate() is None
stdout, stderr = process.communicate()
disas_program = stdout.decode('utf-8')
print(disas_program)

In [23]:
## Coverage results
# Runs the coverage script on the OVPsim log directory and reports whether the
# success message was found in its stderr output.

path = glob.glob(settings["gen_cov"]["paths"]["logs"])
if not path:
    # Fail with an explicit message instead of an IndexError on path[0]
    raise FileNotFoundError("No OVPsim log directory found at "+settings["gen_cov"]["paths"]["logs"])
process = Popen(settings["gen_cov"]["process_param"]+settings["gen_cov"]["test_param"]+" --dir "+path[0], shell=True, stdout=PIPE, stderr=PIPE)

stdout, stderr = process.communicate()
# Replace undecodable bytes rather than aborting on non-ASCII tool output
cov_out = stdout.decode('ascii', errors='replace')
cov_err = stderr.decode('ascii', errors='replace')
# The success message is searched for in stderr, not stdout
if(find_success(settings['gen_cov']['test_sucess'],cov_err)):
    logging.info("Coverage results produced successfully")
else:
    logging.error(cov_err)


2022-03-11 12:29:50 - Coverage results produced successfully
