In [1]:
import os
import openai

from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv()) # read local .env file
openai.api_key = os.environ['OPENAI_API_KEY']
from typing import List
from pydantic import BaseModel, Field
from langchain.utils.openai_functions import convert_pydantic_to_openai_function
from langchain.agents import tool

import pickle
import networkx as nx
import ast
import re
import os
from datasets import load_dataset
from dotenv import load_dotenv
from langchain_openai import OpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_community.callbacks import get_openai_callback
load_dotenv()
import commit

def neighbors_by_relation(G, node, relation_type):
    """Return the nodes linked to ``node`` by edges tagged ``relation_type``.

    Every edge yielded by ``G.edges(node, data=True)`` is inspected; an edge is
    kept when the ``'relation'`` entry of its data dict equals
    ``relation_type``. For each kept edge the endpoint on the other side of
    ``node`` is reported (whichever of the two endpoints is not ``node``).

    Args:
        G: Graph object exposing ``edges(node, data=True)``.
        node: Node whose incident edges are scanned.
        relation_type: Value the edge's ``'relation'`` attribute must equal.

    Returns:
        list: Matching neighbours, in edge-iteration order.
    """
    return [
        target if source == node else source
        for source, target, attrs in G.edges(node, data=True)
        if attrs.get('relation') == relation_type
    ]

def load_graph(pickle_path):
    """Unpickle and return the object stored at ``pickle_path``.

    The file is opened in binary mode and passed to ``pickle.load``. Callers in
    this notebook treat the result as a NetworkX graph; this function does not
    check the type of what it loads.
    """
    with open(pickle_path, "rb") as handle:
        return pickle.load(handle)

# Load the "train" split of the lahirum/SWE_Experimental dataset.
dataset = load_dataset(
    "lahirum/SWE_Experimental",
    split="train",
)
# To iterate on a small subset instead, select rows by index, e.g.:
# dataset = dataset.select([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])


  from .autonotebook import tqdm as notebook_tqdm


In [12]:
def extract_function_classes(file_path):
    """Return the names of the classes and functions defined in a Python file.

    The file is read as UTF-8 and parsed with ``ast``. Every node in the tree is
    visited, so nested classes, methods and inner functions are included
    alongside top-level definitions. Functions named ``__init__`` are skipped.

    Args:
        file_path: Path of the Python source file to inspect.

    Returns:
        list[str]: Class, function and async-function names in ``ast.walk``
        traversal order. If the file cannot be read or parsed, the error is
        printed and an empty list is returned, so the result always has the
        same shape.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            parsed_data = ast.parse(file.read())
    except Exception as e:  # best effort: unreadable or unparsable files yield no names
        print(f"Error in file {file_path}: {e}")
        return []

    info = []
    for node in ast.walk(parsed_data):
        if isinstance(node, ast.ClassDef):
            info.append(node.name)
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            # Constructors carry no distinguishing name, so they are left out.
            if node.name == "__init__":
                continue
            info.append(node.name)
    return info

import os

def get_file_structure(root_dir: str) -> dict:
    """Map each directory under ``root_dir`` to the ``.py`` files directly inside it.

    The tree is traversed with ``os.walk``. Directories that contain no ``.py``
    files are left out of the result.

    Args:
        root_dir: Directory at which the walk starts.

    Returns:
        dict: Keys are directory paths exactly as yielded by ``os.walk``; each
        value is a list of ``"<dirpath>/<filename>"`` strings (always joined
        with a forward slash) for the ``.py`` files in that directory.
    """
    file_structure = {}
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        python_files = [
            dirpath + "/" + filename
            for filename in filenames
            if filename.endswith('.py')
        ]
        if python_files:
            file_structure[dirpath] = python_files
    return file_structure


In [6]:
import prompts
import schema
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
# Chat model that every chain defined in this notebook is bound to.
# A `max_retries=2` override is currently disabled, so the library default applies.
llm = ChatOpenAI(temperature=0.1, model="gpt-4o-mini")


In [5]:
def _bind_function_schema(schema_cls):
    """Bind ``llm`` to a single OpenAI function derived from ``schema_cls``.

    The pydantic class is converted with ``convert_pydantic_to_openai_function``
    and offered to the model with ``function_call="auto"``.
    """
    return llm.bind(
        functions=[convert_pydantic_to_openai_function(schema_cls)],
        function_call="auto",
    )


parser = JsonOutputFunctionsParser()

# Each chain below pipes one prompt from `prompts` into the model bound to the
# matching output schema from `schema`.
model_extract = _bind_function_schema(schema.SuspiciousComponentOutput)
extract_chain = prompts.prompt_extract | model_extract

model_select = _bind_function_schema(schema.FileSuspicionOutput)
select_chain = prompts.file_path_filter_prompt | model_select

# Note: the next two models are bound to the same schema (SuspiciousFilesOutputList);
# only the prompts differ.
model_filter_list = _bind_function_schema(schema.SuspiciousFilesOutputList)
filter_list_chain = prompts.get_suspicious_file_list_from_list_of_files_prompt | model_filter_list

model_select_list = _bind_function_schema(schema.SuspiciousFilesOutputList)
select_list_chain = prompts.suspicious_files_filter_list_usingclfn_prompt | model_select_list

model_select_with_reason = _bind_function_schema(schema.SuspiciousFileReasoningOutput)
select_with_reason_chain = prompts.suspicious_files_with_reason_prompt | model_select_with_reason

model_select_directory = _bind_function_schema(schema.SuspiciousDirectoryOutput)
select_directory_chain = prompts.suspicious_directory_prompt | model_select_directory

  functions=[convert_pydantic_to_openai_function(schema.SuspiciousComponentOutput)],


In [8]:
# Pick one dataset row by index and derive the inputs the later cells rely on.
i = 0
record = dataset[i]
commit_id = record['base_commit']
# The part of the instance id before the first "__" names the graph pickle to load.
name = record['instance_id'].partition("__")[0]
problem_description = record['problem_statement']
graph = load_graph(f"graph_{name}.pkl")

In [11]:
# Run the extraction chain once inside the OpenAI callback context.
with get_openai_callback() as callback:
    result = extract_chain.invoke(
        {"problem_description": problem_description},
    )
    # Printing the callback shows the token and cost summary for the call above.
    print(callback)
    


Tokens Used: 436
	Prompt Tokens: 408
		Prompt Tokens Cached: 0
	Completion Tokens: 28
		Reasoning Tokens: 0
Successful Requests: 1
Total Cost (USD): $7.8e-05


AttributeError: 'dict' object has no attribute 'function_call'

In [None]:
from utils.compress import get_skeleton
import json
def get_most_suspicious_files(graph, file):
    """
    Narrow the graph neighbourhood of ``file`` down to the files the model finds suspicious.

    This function reads the module-level ``problem_description``; it is not a
    parameter, so that variable must be set before calling.

    Steps:
      1. Reduce ``file`` to a bare module name (text after the last "/" and
         before the first ".") and collect the graph neighbours of
         ``"module_" + name`` over ``'path'`` edges.
      2. Ask ``select_chain`` to choose one ``suspicious_file`` among them.
      3. For that file and its ``'imports'`` neighbours, list class/function
         names and ask ``select_list_chain`` for ``suspicious_files``.
      4. Build a skeleton of each of those files with ``get_skeleton`` and pass
         them to ``select_with_reason_chain``.

    Files that cannot be processed in steps 3 and 4 are skipped, and the reason
    is printed rather than silently discarded.

    Args:
        graph: Graph passed to ``neighbors_by_relation``.
        file: Path or file name identifying the starting module.

    Returns:
        The decoded JSON arguments of the function call returned by
        ``select_with_reason_chain``.

    Raises:
        KeyError: If a model reply carries no ``'function_call'`` entry, or the
            decoded arguments lack the expected key.
    """
    module_name = file.split("/")[-1].split(".")[0]
    suspicious_files = neighbors_by_relation(graph, "module_" + module_name, 'path')

    filtered = select_chain.invoke({"problem_description": problem_description, "file_list": suspicious_files})
    filtered = json.loads(filtered.additional_kwargs['function_call']['arguments'])
    selected_file = filtered['suspicious_file']

    candiate_structure = {}
    for neighbor in neighbors_by_relation(graph, selected_file, 'imports') + [selected_file]:
        try:
            candiate_structure[neighbor] = extract_function_classes(neighbor)
        except Exception as e:  # best effort: one bad file must not abort the search
            print(f"Skipping {neighbor}: {e}")

    filtered_list = select_list_chain.invoke({"problem_description": problem_description, "file_structure": candiate_structure})
    filtered_list = json.loads(filtered_list.additional_kwargs['function_call']['arguments'])
    filtered_list = filtered_list['suspicious_files']

    filtered_candidate_structure = {}
    for path in filtered_list:
        try:
            with open(path, "r", encoding="utf-8") as f:
                raw_code = f.read()
            filtered_candidate_structure[path] = get_skeleton(
                raw_code,
                keep_constant=False,
                keep_indent=True,
                total_lines=15,
                prefix_lines=5,
                suffix_lines=5,
            )
        except Exception as e:  # e.g. the model named a path that does not exist
            print(f"Skipping {path}: {e}")

    answer = select_with_reason_chain.invoke({"problem_description": problem_description, "file_structure": filtered_candidate_structure})
    answer = json.loads(answer.additional_kwargs['function_call']['arguments'])

    return answer

In [13]:
def get_most_suspicious_files_using_file_structure(graph, problem_description, name):
    """
    Find the most suspicious files for a problem by drilling down the directory tree.

    Steps:
      1. Scan ``name`` with ``get_file_structure`` and ask
         ``select_directory_chain`` to choose one ``suspicious_directory``.
      2. List class/function names for every ``.py`` file in that directory and
         ask ``select_list_chain`` for ``suspicious_files``.
      3. Build a skeleton of each of those files with ``get_skeleton`` and pass
         them to ``select_with_reason_chain``.

    The number of files in the chosen directory and the OpenAI usage summary of
    the last two model calls are printed. Files that cannot be processed are
    skipped, and the reason is printed.

    Args:
        graph: Unused here; kept so existing callers do not break.
        problem_description: Issue text forwarded to every prompt.
        name: Root directory of the repository to scan.

    Returns:
        The decoded JSON arguments of the function call returned by
        ``select_with_reason_chain``.

    Raises:
        KeyError: If a model reply carries no ``'function_call'`` entry, the
            decoded arguments lack the expected key, or the chosen directory is
            not one of the scanned directories.
    """
    file_structure = get_file_structure(name)
    # Pass a plain list: a dict_keys view would render as "dict_keys([...])" in the prompt.
    directories = list(file_structure)
    filtered = select_directory_chain.invoke({"problem_description": problem_description, "directory_list": directories})
    filtered = json.loads(filtered.additional_kwargs['function_call']['arguments'])
    selected_directory = filtered['suspicious_directory']
    if selected_directory not in file_structure:
        raise KeyError(
            f"Model selected directory {selected_directory!r}, which is not one of the scanned directories"
        )
    suspicious_files = file_structure[selected_directory]

    print(len(suspicious_files))
    candiate_structure = {}
    for candidate in suspicious_files:
        try:
            candiate_structure[candidate] = extract_function_classes(candidate)
        except Exception as e:  # best effort: one bad file must not abort the search
            print(f"Skipping {candidate}: {e}")
    with get_openai_callback() as callback:
        filtered_list = select_list_chain.invoke({"problem_description": problem_description, "file_structure": candiate_structure})
        print(callback)
    filtered_list = json.loads(filtered_list.additional_kwargs['function_call']['arguments'])
    filtered_list = filtered_list['suspicious_files']

    filtered_candidate_structure = {}
    for path in filtered_list:
        try:
            with open(path, "r", encoding="utf-8") as f:
                raw_code = f.read()
            filtered_candidate_structure[path] = get_skeleton(
                raw_code,
                keep_constant=False,
                keep_indent=True,
                total_lines=15,
                prefix_lines=5,
                suffix_lines=5,
            )
        except Exception as e:  # e.g. the model named a path that does not exist
            print(f"Skipping {path}: {e}")

    with get_openai_callback() as callback:
        answer = select_with_reason_chain.invoke({"problem_description": problem_description, "file_structure": filtered_candidate_structure})
        print(callback)
    answer = json.loads(answer.additional_kwargs['function_call']['arguments'])

    return answer