In [136]:
import pandas as pd
import ast
import sys
import math
import numpy as np
from typing import *

if '../src/' not in sys.path:
    sys.path.append('../src/')
if '../src/api/v6/' not in sys.path:
    sys.path.append('../src/api/v6/')

from representations.tree.tree import Tree
from representations.builders.ast.tearers.tearer_factory import TearerFactory

In [137]:

def build_test_code(
    code: str,
    imports: str,
    test: str,
    code_embed_str: str = "# end code block to test",
    fail_on_error: bool = False,
    verbose: str = "Fatal",
):
    try:
        code_insert_idx = test.find(code_embed_str)
        program_code = imports
        program_code += "\n"
        program_code += test[:code_insert_idx]
        program_code += code
        program_code += "\n"
        program_code += test[code_insert_idx:]
    except Exception as e:
        if verbose == "Error":
            print("[ERROR] Failed to unparse code rep to code\n", e)
        if fail_on_error:
            raise e
        program_code = ""
    finally:
        return program_code


def parse_code_rep_to_code(code_rep: str, rules_enabled: bool = False, verbose: str = "Fatal") -> str:
    try:
        tree = Tree.unparse(code_rep)
        tearer = TearerFactory().get_tearer(tree.root_node, rules_enabled=rules_enabled)
        asdl = tearer.tear(tree.root_node)
        code = ast.unparse(asdl)
    except Exception as e:
        if verbose == "Error":
            print(f"[Error] failed to prase code rep to code:\n", e)
        code = ""
    finally:
        return code
    

def eval_code(code: str):
    test_results = {}
    try:
        context = {}
        exec(code, context)
        test_results = context.get("test_results", {})
        test_results["execution_success"] = test_results.get("execution_success", 0) + 1
    except AssertionError as e:
        test_results["assertion_failure"] = test_results.get("assertion_failure", 0) + 1
    except Exception as e:
        test_results["execution_failure"] = test_results.get("execution_failure", 0) + 1

    code_failure = test_results.get("code_failure", 0)
    assertion_failure = test_results.get("assertion_failure", 0)
    execution_failure = test_results.get("execution_failure", 0)
    execution_success = test_results.get("execution_success", 0)
    correct = test_results.get("correct", 0)
    incorrect = test_results.get("incorrect", 0)
    total = (correct + incorrect) or math.inf
    accuracy = (1 - code_failure) * (correct / total)

    results = dict(
        code_failure=code_failure,
        execution_success=execution_success,
        execution_failure=execution_failure,
        assertion_failure=assertion_failure,
        correct=correct,
        incorrect=incorrect,
        accuracy=accuracy,
    )

    return results


def pass_at_k(n, c, k):
    """
    :param n: total number of samples
    :param c: number of correct samples
    :param k: k in pass@$k$
    """
    if (n - c) < k:
        return 1.0
    score =  1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))
    return score


def humaneval_accuracy_score(
    n,
    k,
    data: pd.DataFrame,
    code_column_name: str = "pred_code",
    score_id_labels1: Union[str, List[str]] = ["sample_id", "n"],
    score_id_labels2: Union[str, List[str]] = "sample_id",
    score_column_name: str = "accuracy",
):
    test_codes = data.apply(
        lambda x: build_test_code(
            code=x[code_column_name], imports=x["imports"], test=x["test"]
        ),
        axis=1,
    )
    test_results = test_codes.apply(lambda test_code: eval_code(test_code))
    test_results_df = pd.DataFrame.from_records(
        test_results.values, index=test_results.index
    )
    test_scores = (
        test_results_df.reset_index(drop=False)
        .groupby(score_id_labels1)[score_column_name]
        .mean()
    )
    scores = (
        test_scores.reset_index(drop=False)
        .groupby(score_id_labels2)[score_column_name]
        .max()
    )
    c = (scores == 1).sum()
    print(f"c = {c}, n= {n}, k = {k}")
    score = pass_at_k(n, c, k)
    return dict(score=score, results=test_results_df)


def model_eval(
    n,
    k,
    results_df=None,
    results_file_path=None,
    output_column="output",
    gold_column="code",
    parse_to_code=False,
    parse_rules_enabled=False,
    compute_humanval=True,
    compute_bleu=True,
):
    results_df = (
        pd.read_csv(results_file_path) if results_file_path else results_df.copy()
    )
    results_df["sample_id"] = results_df["sample_id"].astype(int)
    results_df.set_index(["sample_id", "sample_minor_id", "n"], inplace=True)
    results_df.sort_index(inplace=True)

    code_column = "generated_code"
    if parse_to_code:
        results_df[code_column] = results_df[output_column].apply(
            lambda x: parse_code_rep_to_code(x, rules_enabled=parse_rules_enabled)
        )
    else:
        results_df[code_column] = results_df[output_column]

    results_df["test"] = results_df["test"].str.replace(
        "= next(iterator)", "= next(iterator, None)", regex=True
    )
    results_df[code_column] = results_df[code_column].str.replace(
        " = ContentType.", " = MessageContentType.", regex=True
    )
    results_df[code_column] = results_df[code_column].str.replace(
        "Message.", "Messages.", regex=True
    )

    humaneval_results = (
        humaneval_accuracy_score(n=n, k=k, data=results_df, code_column_name=code_column)
        if compute_humanval
        else {}
    )

    bleu_results = {}

    results = dict(
        humaneval=humaneval_results,
        bleu=bleu_results
    )
    return results

In [145]:
file_path = '../dist/experiments_results/test-152-codet5p-220m-rep2rep-n100.csv.gz'
parse_to_code = True
parse_rules_enabled = True

In [164]:
model_eval(
    n=100,
    k=10,
    results_df=pd.read_csv(file_path),
    results_file_path=None,
    output_column="output",
    gold_column="code",
    parse_to_code=True,
    parse_rules_enabled=False,
    compute_humanval=True,
)

c = 1, n= 100, k = 10


{'humaneval': {'score': 0.09999999999999998,
  'results':                                 code_failure  execution_success   
  sample_id sample_minor_id n                                       
  0         NaN             0.0              0                  0  \
                            1.0              0                  0   
                            2.0              0                  0   
                            3.0              0                  0   
                            4.0              0                  0   
  ...                                      ...                ...   
  238       NaN             95.0             0                  0   
                            96.0             0                  0   
                            97.0             0                  0   
                            98.0             0                  0   
                            99.0             0                  0   
  
                                  execution_

In [146]:
df = pd.read_csv(file_path, compression='gzip', index_col=False)
df = df.loc[:, ~df.columns.str.contains('^Unnamed')]

In [159]:
results_df = df.copy()
results_df.head(3)

Unnamed: 0,sample_id,test_id,sample_minor_id,text,code,test,imports,lang_rep,code_rep,text_lang_rep,lang_rep_pretty,code_rep_pretty,output,target,n
0,78,78_a,a,"If I get a text message from my boss Tony, the...",message_content_type = MessageContentType(text...,# test data\ndata_model = DataModel(reset=True...,from entities.generic import *\nfrom entities....,[ root [ S [ Command [ Condition [ If [ Test [...,[ Module [ message_content_type = MessageConte...,Text: If I get a text message from my boss Ton...,[ root [ S [ Command [ Condition [ If [ Test [...,[ Module [ message_content_type = MessageConte...,[ Module [ contact = Contact.resolve_from_tex...,[ Module [ message_content_type = MessageConte...,0.0
1,78,78_a,a,"If I get a text message from my boss Tony, the...",message_content_type = MessageContentType(text...,# test data\ndata_model = DataModel(reset=True...,from entities.generic import *\nfrom entities....,[ root [ S [ Command [ Condition [ If [ Test [...,[ Module [ message_content_type = MessageConte...,Text: If I get a text message from my boss Ton...,[ root [ S [ Command [ Condition [ If [ Test [...,[ Module [ message_content_type = MessageConte...,[ Module [ contact = Contact.resolve_from_tex...,[ Module [ message_content_type = MessageConte...,1.0
2,78,78_a,a,"If I get a text message from my boss Tony, the...",message_content_type = MessageContentType(text...,# test data\ndata_model = DataModel(reset=True...,from entities.generic import *\nfrom entities....,[ root [ S [ Command [ Condition [ If [ Test [...,[ Module [ message_content_type = MessageConte...,Text: If I get a text message from my boss Ton...,[ root [ S [ Command [ Condition [ If [ Test [...,[ Module [ message_content_type = MessageConte...,[ Module [ contact = Contact.resolve_from_tex...,[ Module [ message_content_type = MessageConte...,2.0


In [161]:
code_column = "generated_code"
output_column = "output"

results_df["sample_id"] = results_df["sample_id"].astype(int)
results_df.set_index(["sample_id", "sample_minor_id", "n"], inplace=True)
results_df.sort_index(inplace=True)

if parse_to_code:
    results_df[code_column] = results_df[output_column].apply(
        lambda x: parse_code_rep_to_code(x, rules_enabled=parse_rules_enabled)
    )
else:
    results_df[code_column] = results_df[output_column]
    
# results_df["test"] = results_df["test"].str.replace(
#     "= next(iterator)", "= next(iterator, None)", regex=True
# )
# results_df[code_column] = results_df[code_column].str.replace(
#     " = ContentType.", " = MessageContentType.", regex=True
# )
# results_df[code_column] = results_df[code_column].str.replace(
#     "Message.", "Messages.", regex=True
# )

In [162]:
data = results_df.copy()
code_column_name = code_column

test_codes = data.apply(
    lambda x: build_test_code(
        code=x[code_column_name], imports=x["imports"], test=x["test"]
    ),
    axis=1,
)

test_results = test_codes.apply(lambda test_code: eval_code(test_code))

test_results_df = pd.DataFrame.from_records(
    test_results.values, index=test_results.index
)


score_column_name = "accuracy"
score_id_labels1 = ["sample_id", "n"]

test_scores = (
    test_results_df.reset_index(drop=False)
        .groupby(score_id_labels1)[score_column_name]
        .mean()
    )


score_id_labels2 = ["sample_id"]

scores = (
    test_scores.reset_index(drop=False)
        .groupby(score_id_labels2)[score_column_name]
        .max()
)

In [163]:
(scores == 1).sum()

3

In [123]:
test_results_df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,code_failure,execution_success,execution_failure,assertion_failure,correct,incorrect,accuracy
sample_id,sample_minor_id,n,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
0,,0.0,0,0,1,0,0,0,0.0
0,,1.0,0,0,1,0,0,0,0.0
0,,2.0,0,0,0,1,0,0,0.0
0,,3.0,0,0,1,0,0,0,0.0
0,,4.0,0,0,0,1,0,0,0.0


In [124]:
test_results_df[test_results_df['correct'] == 1]

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,code_failure,execution_success,execution_failure,assertion_failure,correct,incorrect,accuracy
sample_id,sample_minor_id,n,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
3,b,24.0,0,1,0,0,1,0,1.0
3,b,79.0,0,1,0,0,1,0,1.0
3,b,96.0,0,1,0,0,1,0,1.0
13,b,21.0,0,1,0,0,1,0,1.0
13,b,31.0,0,1,0,0,1,0,1.0
...,...,...,...,...,...,...,...,...,...
209,b,34.0,0,1,0,0,1,0,1.0
209,b,43.0,0,1,0,0,1,0,1.0
209,b,69.0,0,1,0,0,1,0,1.0
209,b,79.0,0,1,0,0,1,0,1.0


In [127]:
tr_df = test_results_df[test_results_df['correct'] == 1].reset_index()
tr_df['sample_id'].unique()

array([  3,  13,  15,  35,  40,  58,  63,  76,  85, 104, 112, 118, 124,
       209])

In [114]:
test_results_df[test_results_df["correct"] == 1]

Unnamed: 0,code_failure,execution_success,execution_failure,assertion_failure,correct,incorrect,accuracy
524,0,1,0,0,1,0,1.0
579,0,1,0,0,1,0,1.0
596,0,1,0,0,1,0,1.0
1421,0,1,0,0,1,0,1.0
1431,0,1,0,0,1,0,1.0
...,...,...,...,...,...,...,...
14434,0,1,0,0,1,0,1.0
14443,0,1,0,0,1,0,1.0
14469,0,1,0,0,1,0,1.0
14479,0,1,0,0,1,0,1.0


In [84]:
results_df.iloc[0]

test_id                                                            0
text               Check the availability of Pepsi at Walmart and...
code               product_name = ProductName.resolve_from_text("...
test               # test data\ndata_model = DataModel(reset=True...
imports            from entities.generic import *\nfrom entities....
lang_rep           [ root [ S [ Command [ Action [ hd [ Check ] ]...
code_rep           [ Module [ product_name = ProductName.resolve_...
text_lang_rep      Text: Check the availability of Pepsi at Walma...
lang_rep_pretty    [ root [ S [ Command [ Action [ hd [ Check ] ]...
code_rep_pretty    [ Module [ product_name = ProductName.resolve_...
output              [ Module [ product_name = ProductName.resolve...
target             [ Module [ product_name = ProductName.resolve_...
generated_code     product_name = ProductName.resolve_from_text('...
Name: (0, nan, 0.0), dtype: object

In [89]:
code_rep = results_df.iloc[0]["output"]
imports = results_df.iloc[0]["imports"]
test = results_df.iloc[0]["test"]

# tree = Tree.unparse(code_rep)
# tearer = TearerFactory().get_tearer(tree.root_node, rules_enabled=True)
# asdl = tearer.tear(tree.root_node)
# code = ast.unparse(asdl)
# print(code)


In [94]:
code = parse_code_rep_to_code(results_df.iloc[0]["code_rep"], rules_enabled=parse_rules_enabled)
print(code)

product_name = ProductName.resolve_from_text('Pepsi')
location = Location.resolve_from_text('Walmart')
products = Shopping.find_products(product_name=product_name, location=location)
Responder.respond(response=products)
location = Location.resolve_from_text('Walgreens')
products = Shopping.find_products(product_name=product_name, location=location)
Responder.respond(response=products)


In [95]:
test_code = build_test_code(
    code=code, 
    imports=imports, 
    test=test
)
print(test_code)

from entities.generic import *
from entities.calendar import *
from entities.home import *
from entities.map import *
from entities.message import *
from entities.music import *
from entities.navigation import *
from entities.reminder import *
from entities.shopping import *
from entities.weather import *
from actions.calendar import *
from actions.clock import *
from actions.calendar import *
from actions.home import *
from actions.map import *
from actions.messages import *
from actions.music import *
from actions.navigation import *
from actions.reminders import *
from actions.responder import *
from actions.shopping import *
from actions.weather import *
from providers.data_model import DataModel
from datetime import datetime, timedelta
import utils.api_utils as utils
from utils.test_utils import *

# test data
data_model = DataModel(reset=True)
data_product_name_pepsi = ProductName(text="Pepsi")
data_model.append(data_product_name_pepsi)
data_product_name_coca = ProductName(text="

In [109]:
eval_code(test_code)

{'code_failure': 0,
 'execution_success': 1,
 'execution_failure': 0,
 'assertion_failure': 0,
 'correct': 4,
 'incorrect': 0,
 'accuracy': 1.0}

In [194]:
import pandas as pd

def get_score(score, soft=True):
    if soft:
        return score
    return 1 if score == 1 else 0

# Assuming df is your DataFrame
# Example: 
df = pd.DataFrame({'sample_id': [1, 1, 2, 2, 3, 1], 'n': [1, 2, 1, 2, 1, 3], 'accuracy': [1.0, 0.95, 1.0, 0.85, 0.99, 1.0]})
df['accuracy2'] = df['accuracy'].apply(lambda x: get_score(x))
# Group by 'sample_id' and count the number of records with 'accuracy' equal to 1.0
accuracy_count = (
    df.groupby('sample_id')['accuracy2'].sum()# if (soft or (x['accuracy'] == 1.0)) else 0))
).mean()

print(accuracy_count)

1.9300000000000004
