In [1]:
import os
from openai import OpenAI
from dotenv import load_dotenv
# Populate the process environment before the client reads it
# (load_dotenv comes from python-dotenv; presumably it reads a local .env file — confirm).
load_dotenv()
# Shared OpenAI client used by the generation cells below. The API key is taken
# from the OPENAI_API_KEY environment variable and requests go to a custom base URL.
client = OpenAI(
  api_key=os.getenv('OPENAI_API_KEY'),
  base_url="https://test-cloudflare-7nq.pages.dev/v1/"
)

pymatgen-analysis-defects单元测试文件读取和分割

(Read the pymatgen-analysis-defects unit test files and split them into code segments)

In [None]:
import json
import uuid
from tree_sitter import Language, Parser

class PythonCodeParser:
    """Split a Python source file into its top-level segments with tree-sitter.

    Only direct children of the module node are inspected, so nested
    definitions stay inside the text of their enclosing class or function.
    """

    def __init__(self):
        self.parser = self.setup_parser()

    def setup_parser(self):
        """Create a tree-sitter parser bound to the pre-built Python grammar library."""
        PY_LANGUAGE = Language('build_python_parser/my-languages.so', 'python')
        parser = Parser()
        parser.set_language(PY_LANGUAGE)
        return parser

    def parse_code(self, file_path):
        """Parse ``file_path`` and group its top-level statements.

        Args:
            file_path: Path of the Python source file (read as UTF-8).

        Returns:
            dict with keys ``uuid`` (fresh random id), ``filename``, ``imports``,
            ``classes``, ``global functions`` and ``global variables``; the last
            four are lists of source snippets in file order.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            source_code = f.read()

        # tree-sitter reports byte offsets into the encoded buffer, so snippets
        # must be cut from the bytes, not from the str (the two diverge as soon
        # as the file contains a multi-byte character).
        source_bytes = source_code.encode('utf-8')
        tree = self.parser.parse(source_bytes)
        root_node = tree.root_node

        def snippet(start_byte, end_byte):
            return source_bytes[start_byte:end_byte].decode('utf-8').strip()

        imports, classes, functions, variables = [], [], [], []

        for node in root_node.children:
            if node.type in ('import_statement', 'import_from_statement'):
                imports.append(snippet(node.start_byte, node.end_byte))
            elif node.type == 'decorated_definition':
                # The last child is the decorated definition itself; keep the
                # decorators by starting at the wrapper node.
                definition_node = node.children[-1]
                text = snippet(node.start_byte, definition_node.end_byte)
                if definition_node.type == 'class_definition':
                    classes.append(text)
                else:
                    functions.append(text)
            elif node.type == 'class_definition':
                classes.append(snippet(node.start_byte, node.end_byte))
            elif node.type == 'function_definition':
                functions.append(snippet(node.start_byte, node.end_byte))
            elif node.type == 'expression_statement':
                text = snippet(node.start_byte, node.end_byte)
                # Heuristic: any expression statement containing '=' is treated
                # as a global variable assignment.
                if '=' in text:
                    variables.append(text)

        data = {
            "uuid": str(uuid.uuid4()),
            "filename": file_path,
            "imports": imports,
            "classes": classes,  # Store classes as a list
            "global functions": functions,  # Store functions as a list
            "global variables": variables  # Store variables as a list
        }

        return data

    def save_to_json(self, file_path, output_json_path):
        """Parse ``file_path`` and write the resulting dict to ``output_json_path`` as JSON."""
        data = self.parse_code(file_path)
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
            
            
def process_directory(root_dir, parser, output_root_dir=None):
    """Parse every Python module under ``root_dir`` into a mirrored tree of JSON files.

    Args:
        root_dir: Directory that is walked recursively for ``*.py`` files
            (``__init__.py`` files are skipped).
        parser: Object exposing ``save_to_json(file_path, output_json_path)``.
        output_root_dir: Destination root. Defaults to
            ``code_segments/pymatgen_analysis_defects/`` under the current
            working directory.
    """
    if output_root_dir is None:
        output_root_dir = os.path.join(os.getcwd(), 'code_segments/pymatgen_analysis_defects/')
    os.makedirs(output_root_dir, exist_ok=True)

    for subdir, _, files in os.walk(root_dir):
        for file in files:
            if not file.endswith('.py') or file == '__init__.py':
                continue
            file_path = os.path.join(subdir, file)

            # Mirror the source sub-directory structure below the output root.
            relative_subdir = os.path.relpath(subdir, root_dir)
            output_subdir = os.path.normpath(os.path.join(output_root_dir, relative_subdir))
            os.makedirs(output_subdir, exist_ok=True)

            # Swap only the extension; str.replace would also rewrite a '.py'
            # that appears earlier in the file name.
            json_name = os.path.splitext(file)[0] + '.json'
            json_output_path = os.path.join(output_subdir, json_name)

            # Save JSON file
            parser.save_to_json(file_path, json_output_path)

# Example usage: build the tree-sitter based parser and run it over the
# pymatgen-analysis-defects test directory, producing JSON segment files.
parser = PythonCodeParser()
process_directory('../tool_source_code/pymatgen-analysis-defects/tests', parser)

In [1]:
import json
import os

class JSONAssembler:
    def __init__(self, base_file_path):
        self.base_file_path = base_file_path

    def read_json(self, file_path: str) -> dict:
        with open(file_path, 'r') as file:
            data = json.load(file)
        return data

    def assemble_base(self):
        data = self.read_json(self.base_file_path)
        output = ""

        for key in ["imports", "global variables", "classes", "global functions"]:
            if key in data and data[key]:
                if isinstance(data[key], list):
                    output += "\n".join(data[key]) + "\n"
                else:
                    output += data[key] + "\n"

        return output.strip()

    def assemble_others(self, json_files):
        outputs = []

        for file_path in json_files:
            if file_path == self.base_file_path:
                continue  # Skip the base file

            data:dict = self.read_json(file_path)
            imports = "\n".join(data.get("imports", []))
            global_vars = "\n".join(data.get("global variables", []))

            for class_def in data.get("classes", []):
                assembled = "\n".join(filter(None, [imports, global_vars, class_def]))
                outputs.append(assembled)

            for function_def in data.get("global functions", []):
                assembled = "\n".join(filter(None, [imports, global_vars, function_def]))
                outputs.append(assembled)

        return outputs

    def gather_json_files(self, dir_path):
        json_files = []
        for root, _, files in os.walk(dir_path):
            for file in files:
                if file.endswith('.json'):
                    json_files.append(os.path.join(root, file))
        return json_files

    def process_and_output(self, other_json_dir):
        # Process base file
        base_output = self.assemble_base()
        # Gather other JSON files, excluding the base file
        other_json_files = self.gather_json_files(other_json_dir)
        # Assemble other files
        other_outputs = self.assemble_others(other_json_files)
        return base_output, other_outputs

In [2]:
# Usage example:
# conftest.json is the base file (assembled whole into base_output); every other
# JSON file under other_json_dir is expanded into one snippet per class/function.
# NOTE(review): these paths point at the parent directory, while process_directory
# writes below os.getcwd() — confirm the notebook's working directory matches.
base_file = '../code_segments/pymatgen_analysis_defects/conftest.json'
other_json_dir = '../code_segments/pymatgen_analysis_defects'
assembler = JSONAssembler(base_file)
base_output, other_output = assembler.process_and_output(other_json_dir)

## QA问题集生成Agent (QA question-set generation agent)

In [3]:
def _read_prompt(path):
    """Return the full text of the prompt template at ``path``, decoded as UTF-8.

    An explicit encoding keeps the prompts identical across platforms whose
    default locale encoding differs.
    """
    with open(path, "r", encoding="utf-8") as file:
        return file.read()


# System prompt for extracting the properties JSON from a unit test.
EXTRACT_PARAMS_PROMPT = _read_prompt("extract_params_from_test.txt")
# System prompt for generating a new unit test from the properties JSON.
GENERATE_UNIT_TEST_PROMPT = _read_prompt("generate_new_unit_test.txt")
# System prompt for building a question from a unit test.
BUILD_QUESTION_PROMPT = _read_prompt("build_question_from_test.txt")
# User-message template for the question prompt; filled in with str.format().
BUILD_QUESTION_PROMPT_USER = _read_prompt("build_question_from_test_user.txt")

In [None]:
import re
def _chat_and_extract(messages, pattern, model="gpt-4o"):
    """Send ``messages`` to the chat API and pull the first ``pattern`` match out of the reply.

    Returns:
        ``(extracted, content)`` where ``extracted`` is the stripped first
        capture group, or ``None`` when the pattern does not match, and
        ``content`` is the raw reply text ('' when the API returned no text).
    """
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
    )
    # message.content may be None (e.g. refusals); re.search would raise on it.
    content = response.choices[0].message.content or ""
    match = re.search(pattern, content, re.DOTALL)
    extracted = match.group(1).strip() if match else None
    return extracted, content


def extract_params_prompt(other_code, model="gpt-4o"):
    """Ask the model for the properties JSON describing a unit test.

    Args:
        other_code: Source text of the unit test (with its imports/globals).
        model: Chat model name.

    Returns:
        The contents of the first ```json fenced block in the reply, or
        ``None`` (after printing diagnostics) when no such block is present.
    """
    UNIT_TEST_PROMPT = "**Unit Test Code:**\n```python\n{unit_test_code}\n```".format(unit_test_code=other_code)
    messages = [
        {
            "role": "system",
            "content": EXTRACT_PARAMS_PROMPT
        },
        {
            "role": "user",
            "content": UNIT_TEST_PROMPT
        }
    ]
    extracted, content = _chat_and_extract(messages, r'```json(.*?)```', model)
    if extracted is None:
        print("No match found in the response.")
        print(other_code)
        print(content)
    return extracted


def build_question_from_test(base_code, other_code, properties_json, model="gpt-4o"):
    """Ask the model to phrase a question for a unit test.

    Args:
        base_code: Shared fixture/helper code passed as ``file_reading_functions``.
        other_code: Source text of the unit test.
        properties_json: Properties JSON string previously extracted for the test.
        model: Chat model name.

    Returns:
        The text inside the first ``<question>...</question>`` pair of the
        reply, or ``None`` (after printing diagnostics) when it is missing.
    """
    messages = [
        {
            "role": "system",
            "content": BUILD_QUESTION_PROMPT
        },
        {
            "role": "user",
            "content": BUILD_QUESTION_PROMPT_USER.format(unit_test_code=other_code, properties_json=properties_json, file_reading_functions=base_code)
        }
    ]
    extracted, content = _chat_and_extract(messages, r'<question>(.*?)</question>', model)
    if extracted is None:
        print("No match found in the response.")
        print(content)
    return extracted


def generate_new_unit_test(properties_json, model="gpt-4o"):
    """Ask the model to write a new unit test from the properties JSON.

    Args:
        properties_json: Properties JSON string previously extracted for a test.
        model: Chat model name.

    Returns:
        The contents of the first ```python fenced block in the reply, or
        ``None`` (after printing diagnostics) when no such block is present.
    """
    messages = [
        {
            "role": "system",
            "content": GENERATE_UNIT_TEST_PROMPT
        },
        {
            "role": "user",
            "content": "**Properties JSON:**\n```json\n{properties_json}\n```".format(properties_json=properties_json)
        }
    ]
    extracted, content = _chat_and_extract(messages, r'```python(.*?)```', model)
    if extracted is None:
        print("No match found in the response.")
        print(content)
    return extracted

In [None]:
from loguru import logger
logger.add("defect_questions.log", rotation=None)
# One entry per test snippet for which all three generation steps succeeded.
defect_questions = []
for index, test_code in enumerate(other_output):
    properties_json_str = extract_params_prompt(test_code)
    if not properties_json_str:
        # Without properties there is nothing to build on; skipping here also
        # prevents reusing question/new_unit_test from an earlier iteration.
        logger.warning(f"Skipping snippet {index}: no properties JSON extracted")
        continue

    question = build_question_from_test(base_output, test_code, properties_json_str)
    new_unit_test = generate_new_unit_test(properties_json_str)
    if not (question and new_unit_test):
        logger.warning(f"Skipping snippet {index}: question or unit test generation failed")
        continue

    entry = {
        "properties_json_str": properties_json_str,
        "question": question,
        "new_unit_test": new_unit_test
    }
    defect_questions.append(entry)
    # Log only what was actually appended in this iteration.
    logger.info(entry)

In [None]:
# Persist the generated entries so the cells below can reload them
# without re-running the model calls.
with open('defect_questions.json', 'w') as json_file:
    json.dump(defect_questions, json_file, indent=4)

加载数据

(Load and show the data)

In [3]:
import json
# Reload the saved entries; this rebinds defect_questions to the file contents.
with open('defect_questions.json', 'r') as json_file:
    defect_questions = json.load(json_file)

In [4]:
print(base_output)

from collections import defaultdict
from pathlib import Path
import pytest
from monty.serialization import loadfn
from pymatgen.analysis.defects.core import PeriodicSite, Substitution
from pymatgen.analysis.defects.thermo import DefectEntry, FormationEnergyDiagram
from pymatgen.analysis.phase_diagram import PhaseDiagram
from pymatgen.core import Element, Structure
from pymatgen.core.periodic_table import Specie
from pymatgen.io.vasp.outputs import WSWQ, Chgcar, Locpot, Procar, Vasprun
@pytest.fixture(scope="session")
def test_dir():
    return Path.cwd() / 'tool_source_code/pymatgen-analysis-defects/tests/test_files/'
@pytest.fixture(scope="session")
def gan_struct(test_dir):
    return Structure.from_file(test_dir / "GaN.vasp")
@pytest.fixture(scope="session")
def stable_entries_Mg_Ga_N(test_dir):
    return loadfn(test_dir / "stable_entries_Mg_Ga_N.json")
@pytest.fixture(scope="session")
def defect_Mg_Ga(gan_struct):
    ga_site = gan_struct[0]
    mg_site = PeriodicSite(Specie("Mg")

In [43]:
number = 0

In [44]:
print(other_output[number])

from pymatgen.analysis.defects.plotting.thermo import (
    plot_chempot_2d,
    plot_formation_energy_diagrams,
)
from pymatgen.core import Element
def test_chempot_plot(basic_fed) -> None:
    plot_chempot_2d(basic_fed, x_element=Element("Mg"), y_element=Element("Ga"))


In [None]:
print(defect_questions[number]['properties_json_str'])

In [None]:
print(defect_questions[number]['question'])

In [None]:
print(defect_questions[number]['new_unit_test'])

分割数据到文件夹里

(Split the properties/question/unit-test triplets into one folder each under `question_segments`)

In [11]:
import os

# Root folder that receives one sub-folder per generated question.
directory_path = 'question_segments/pymatgen_analysis_defects'
# exist_ok=True makes the cell idempotent and avoids the check-then-create race.
os.makedirs(directory_path, exist_ok=True)


In [16]:
import json
import os

# Write each entry into its own folder: properties.json, question.txt and
# new_unit_test.py. Entries that cannot be placed safely are printed and skipped.
output_root_abs = os.path.abspath(directory_path)
for defect_question in defect_questions:
    try:
        properties_json = json.loads(defect_question['properties_json_str'])
    except (TypeError, ValueError):
        # Not valid JSON (json.JSONDecodeError is a ValueError) or not a string:
        # skip instead of falling through with a stale/undefined properties_json.
        print(defect_question)
        continue

    folder_name = properties_json.get('JSON_File_Name') if isinstance(properties_json, dict) else None
    if not isinstance(folder_name, str) or not folder_name:
        print(defect_question)
        continue

    folder_path = os.path.join(directory_path, folder_name)
    # The folder name is model-generated text: refuse anything that would
    # resolve to the output root itself or to a location outside of it.
    if not os.path.abspath(folder_path).startswith(output_root_abs + os.sep):
        print(defect_question)
        continue

    os.makedirs(folder_path, exist_ok=True)
    with open(os.path.join(folder_path, 'properties.json'), 'w', encoding='utf-8') as json_file:
        json.dump(properties_json, json_file, indent=4)
    with open(os.path.join(folder_path, 'question.txt'), 'w', encoding='utf-8') as txt_file:
        txt_file.write(defect_question['question'])
    with open(os.path.join(folder_path, 'new_unit_test.py'), 'w', encoding='utf-8') as py_file:
        py_file.write(defect_question['new_unit_test'])