In [1]:
import os
import pandas as pd

cwd = os.getcwd()  # get directory for storage

# This file automates the entire pipeline for assertion generation with chatgpt

## Step 1) Get Asserted Code From Github

### Step 1.1) Clean and process the code
### Step 1.2) Extract Ground-Truth Assertions & Relevant Statistics

In [16]:
from google.cloud import bigquery as bq

def get_asserted_code(num=100000, ext="%.py", verbose=True):
    query_string = """SELECT f.repo_name, c.content
FROM `bigquery-public-data.github_repos.files` AS f
JOIN `bigquery-public-data.github_repos.contents` AS c
ON f.id = c.id
WHERE
NOT c.binary
AND f.path LIKE '%.py'
AND REGEXP_CONTAINS(c.content, r'(?m)^\s*assert ')
LIMIT """ + str(num)
    
    if isinstance(num, int):
        secret_dir = "Data/secret/"
        api_key = cwd + "/" + secret_dir + os.listdir(secret_dir)[0]
        assert api_key[-5:] == ".json"  # confirm that it was found
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
        query_string = query_string.replace("%.py", ext)

        if verbose:
            print("*Running Query:")
            print(query_string)
            print()
        client = bq.Client()
        df = (
            client.query(query_string)
            .result()
            .to_dataframe(
                create_bqstorage_client=True,
            )
        )
    elif isinstance(num, str):
        # load data from file
        df = pd.read_csv(num)
        print("Found data at", num)
    else:
        print("first param type undefined, must be string signifying directory of csv or\
               int signifying number of records to scrib from bigquery...")
        assert False
    
    if verbose:
        print("*Handling Duplicates...")
    init_len = len(df)
    df.drop_duplicates(subset=["content"], keep="first", inplace=True)
    if verbose:
        print("Duplicate Ratio = ", (len(df)/init_len))
    return df

# small test
verilog_dir = cwd+"/Data/BigQuery/VerilogAssertions-ALL.csv"
python_dir = cwd+"/Data/BigQuery/PythonAssertions100k.csv"
tester_df = get_asserted_code(python_dir).head(1000)  # 10
tester_df

Found data at /Users/korahughes/Documents/GitHub/LLMCodeGen/Data/BigQuery/PythonAssertions100k.csv
*Handling Duplicates...
Duplicate Ratio =  1.0


Unnamed: 0,repo_name,content
0,tqchen/tvm,# Licensed to the Apache Software Foundation (...
1,Lujeni/ansible,# (c) 2017 Red Hat Inc.\n#\n# This file is par...
2,lukas-hetzenecker/home-assistant,"""""""The tests for the Pilight sensor platform.""..."
3,schnoebe/fedora-mock,import fcntl\nimport glob\nimport grp\nimport ...
4,samstav/fastfood,# -*- coding: utf-8 -*-\n# Copyright 2015 Rack...
...,...,...
995,tiran/freeipa,#\n# Copyright (C) 2018 FreeIPA Contributors ...
996,ioam/lancet,#\n# Work in progress. Only SimpleOptimization...
997,MarnuLombard/namebench,#!/usr/bin/python2.4\n#\n# Copyright 2008 Goog...
998,pandegroup/vs-utils,"""""""\nTests for serial.py.\n""""""\nimport cPickle..."


In [17]:
conditionals = dict([[cond, i] for i, cond in enumerate(["==", "!=", "<=", ">=", "<", ">"])])
compounding_statements = ["and"]
bad_statements = ["or"]  # TODO: properly account for OR
def get_assertions(func, is_split=True, verbose=True):
    """
    Format: "assert [expression], [return_string]"
    
    Exceptions to Handle:
    - 'in'/'not in' keyword
    - boolean functions - ex. isinstance(var, type)
    - separation of attributes - ex. len(var), var[i]
    """
    if verbose:
        print("\n*Extracting Assertions")
    out = []
    lines = []
    for temp in func.split('\n'):
        if "assert" in temp:
            bad_flag = False
            for bad in bad_statements:
                if bad in temp:
                    bad_flag = True
                    continue
            if not bad_flag:
                lines.append(temp.strip())
    # TODO: experiment with smaller content window for assertions
    ind = 0
    while ind < len(lines):
        data = lines[ind].strip()
        start = data.find('assert')
        if start != -1:
            # account for combination statements
            for statement in compounding_statements:
                add_statement = data.find(statement)
                if add_statement != -1:
                    extra_line = data[add_statement+len(statement):]
                    lines.insert(ind+1, "assert "+extra_line)
                    data = data[:add_statement].strip()
            
            com = data.find(',')   # parsing out return_string
            if com != -1:
                data = data[:com]

            if is_split:
                data = [var.strip() for var in data.split()]
                if data[0] != "assert":
                    print("something was found before the assertion in this line:\n", data)
                    continue
                data = data[1:]
                
                condition = True  # assertion [variable] == condition by default
                if data[0] == "not":  # accounting for not
                    condition = False
                    data = data[1:]
                    
                assert len(data) >= 1, "empty assertion found?: " + data
                if len(data) == 1:  # adding == to simlify
                    data = data + ["==", str(condition)]
                
                for i in range(len(data)):
                    if data[i] == "is":  # simplifying is to ==
                        data[i] = "=="
                    if data[i] in conditionals.keys():  # com
                        data = [' '.join(data[:i]), data[i], ' '.join(data[i+1:])]  # conditionals[data[i]]
                        continue
            
            if verbose and len(data) != 3:
                print("Weird assertion found:\n", data, '\n', '\n'.join(lines[ind-1:ind+2]))
                print()
#             assert len(data) == 3, "found conditional-less assertion:\n" + str(data) + '\n' + str(lines[ind-1:ind+2])
            else:
                out.append(data)
        ind += 1
    return out

# small test
tester_df["assertions"] = tester_df["content"].apply(lambda code: get_assertions(code))
tester_df

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [2]:
temp = pd.read_csv(cwd+"/Data/BigQuery/PythonAssertions100k.csv")
# temp = pd.read_csv(cwd+"/Data/BigQuery/VerilogAssertions-ALL.csv")
temp

Unnamed: 0,repo_name,content
0,tqchen/tvm,# Licensed to the Apache Software Foundation (...
1,Lujeni/ansible,# (c) 2017 Red Hat Inc.\n#\n# This file is par...
2,lukas-hetzenecker/home-assistant,"""""""The tests for the Pilight sensor platform.""..."
3,schnoebe/fedora-mock,import fcntl\nimport glob\nimport grp\nimport ...
4,samstav/fastfood,# -*- coding: utf-8 -*-\n# Copyright 2015 Rack...
...,...,...
33788,raphaelm/django-i18nfield,from i18nfield.admin import I18nModelAdmin\nfr...
33789,fniephaus/alfred-rworkflow,# The MIT License (MIT)\n#\n# Copyright (c) 20...
33790,bgris/ODL_bgris,# -*- coding: utf-8 -*-\r\n#\r\n# Copyright © ...
33791,chrsrds/scikit-learn,"""""""\nTesting for the base module (sklearn.ense..."


In [10]:
rand = temp.sample()
print(rand["content"].iloc[0])

# -*- coding: utf-8 -*-
from django.conf import settings
from django.http import HttpResponse
from django.test import TestCase
from django_auth_ldap.backend import _LDAPUser
from django.test.client import RequestFactory
from typing import Any, Callable, Dict

import mock
import re

from zerver.lib.actions import do_deactivate_realm, do_deactivate_user, \
    do_reactivate_realm, do_reactivate_user
from zerver.lib.initial_password import initial_password
from zerver.lib.session_user import get_session_dict_user
from zerver.lib.test_helpers import (
    ZulipTestCase
)
from zerver.models import \
    get_realm, get_user_profile_by_email, email_to_username, UserProfile

from zproject.backends import ZulipDummyBackend, EmailAuthBackend, \
    GoogleMobileOauth2Backend, ZulipRemoteUserBackend, ZulipLDAPAuthBackend, \
    ZulipLDAPUserPopulator, DevAuthBackend, GitHubAuthBackend

from social.strategies.django_strategy import DjangoStrategy
from social.storage.django_orm import BaseDjangoStor

## Step 2) Generate LLM Prompt & Query a GPT

In [None]:
def generate_prompt(asserted_code, verbose=True):
    ...
    
    
banned_vars = ['', '*', 'self']
def get_variables(func, verbose=False):
    out = []
    for line in func.split('\n'):
        line = line.strip()
        if "def " in line:  # add params if its a function
            start = line.find('(')
            end = line.find(')')
            for new_param in line[start+1:end].split(','):
                default = new_param.find("=")
                if default != -1:
                    new_param = new_param[:default]
                new_param = new_param.strip()
                if new_param not in out and new_param not in banned_vars:
                    if verbose:
                        print("*Found  {", new_param, "}  at:\n", line, '\n')
                    out.append(new_param)
        else: # add variables if equals operation
            find_var = line.find(' = ')
            if find_var != -1:
                new_var = line[:find_var].strip()
                
                if ',' in new_var: # handle tuple equalities edge case (ex: a, b, c = fn_output())
                    var_list = [tuple_var.strip() for tuple_var in new_var.split(',')]
                else:
                    var_list = [new_var]
                for new_var in var_list:
                    if new_var not in out and new_var not in banned_vars:
                        if verbose:
                            print("**Found  {", new_var, "}  at:\n", line, '\n')
                        out.append(new_var)
            # TODO: handle indexing
    return out

# out = get_variables(df.sample()["content"].iloc[0])
get_vars = lambda code: get_variables(code)
df["variables"] = df["content"].apply(get_vars)
df

In [None]:
# querying
import openai
import altair as alt
import json
from vega_datasets import data

def run_gpt4(messages):
    OPENAI_API_KEY = "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" # os.environ['OPENAI_API_KEY']
    openai.api_key = OPENAI_API_KEY
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages
    )
    return response["choices"][0]["message"]["content"]


# TODO: add coding language versatility
def gpt_oneshot(input_prompt, directive="You are a helpful bot that adds assertions to pieces of Python code.", verbose=False):
    message_hist = [{"role": "system", "content": directive},
                    {"role": "user", "content": input_prompt}]  # init
    response = run_gpt4(message_hist)
    if verbose:
        print("chat_gpt: ", response, '\n')
#     message_hist.append({"role": "system", "content": response})
    return response

print("\n\n", gpt_oneshot("what do you do?"))

## Step 3) Parse & Evaluate GPT's Response

### Step 3.1) Restore the assertion(s) generated to code and evaluate
> Metrics of evaluation, does it run? does it add to the code? is it ground-truth-like? human evaluator rank? gpt evaluator rank?

## Step 4) ...