# 구현해야 할 것
1. 업로드 한 코드의 파일 tree를 추출하는 것
2. LLM을 이용해 봐야 할 코드 파일 후보를 추림
3. 각 코드 별로 답변 생성에 적합한지 판단
4. 만약 다른 코드를 찾아봐야 한다면, 다시 파일트리를 주고 판단하게 하든, 아니면 import 문으로 판단하게 하든 새롭게 봐야 할 파일을 정해야 함.

### 파일 tree 추출

In [1]:
import os
import json
import utils
import requests
import tempfile
import subprocess

from collections import defaultdict

language_map = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".java": "java",
    ".cpp": "cpp",
    ".c": "c",
    ".h": "c",
    ".hpp": "cpp",
    ".html": "html",
    ".css": "css",
    ".json": "json",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".md": "markdown",
    ".txt": "text",
}
codebase_path = "code_snippets/trl-main"

if codebase_path.endswith("zip"):
    temp_dir = tempfile.mkdtemp()
    # subprocess.call()
    subprocess.run(
        ["unzip", "-q", codebase_path, "-d", temp_dir],
        check=True,  # 실패 시 CalledProcessError 발생
    )
    codebase_path = temp_dir

filetree = {} # key: file_path, value: is_unseen
for path, dirs, files in os.walk(codebase_path):
    for file in files:
        ext = file.split(".")[-1]
        if not f".{ext}" in language_map.keys(): continue
        filetree[f"{path}/{file}"] = True

### 파일 선택

In [None]:
def select_file(user_query: str, filetree: dict):
    unseen_files = []
    for file, is_unseen in filetree.items():
        if is_unseen:
            unseen_files.append(file)

    prompt = """### 할 일
- 사용자 질의와 파일 목록을 보고, 어떤 파일에서 사용자 질의를 만족할 수 있는 내용이 있을지 파일 경로 및 이름을 통해 유추하고 판단한다.

### 유의사항
- 사용자 질의 의도를 만족할 수 있는 파일의 경로 최대 5가지 만을, python list 형식으로 return 한다.
- 다른 부연설명이나 추가적인 설명은 절대로 return하지 않는다.
- "파일 목록"에 없는 내용을 절대로 return하지 않는다.
"""
    user_input = f"""- 사용자 질의: {user_query}\n- 파일 목록: {unseen_files}"""

    llm_out = utils.call_llm(prompt=prompt, user_input=user_input)["content"].strip()
    # print(llm_out)
    try:
        selected_files = json.loads(llm_out.replace("'", "\""))
    except:
        if "```" in llm_out:
            llm_out = llm_out.strip("```").strip("python")
            selected_files = json.loads(llm_out.replace("'", "\""))
    for file in selected_files:
        filetree[file] = False
        
    return selected_files
    print(llm_out)

### 파일 내용 보고 답변 생성

In [None]:
def create_answer(user_query: str, selected_files: list):
    """
    Get user's query and file paths(selected_files). 
    LLM traverses files and create candidate answers if content of the file matches or is related to user's query.

    Finally, gather and summarize candidate answers into **final_answer**.
    """
    
    identify_code_prompt = """### 할 일
- 주어진 사용자 질의와 코드 내용을 보고, 사용자의 질의 의도를 충분히 만족할 수 있는 내용인지 판단한 후, 만족할 수 있다면 해당하는 부분의 코드 내용을 추출한다.

### 유의사항
- 사용자는 주어진 코드에 관한 내용을 물어보는 경우가 일반적이므로, general한 함수나 클래스에 대해서는 중요도를 낮춰야 한다.
- 부연설명이나 이유 등의 내용은 중요도를 낮춰야 한다.
- 충분히 만족할 수 없는 내용인 경우, 무조건 "pass"만을 출력한다.
- 답변은 무조건 한국어로 출력한다.
"""

    answer_candidates = []

    for file in selected_files:
        with open(file, "r") as f:
            codes = f.read()

        query_code_user_input = f"""- 사용자 질의: {user_query}\n- 코드 내용: {codes}"""
        codeidf_llm_out = utils.call_llm(prompt=identify_code_prompt, user_input=query_code_user_input)["content"].strip()
        answer_candidates.append(codeidf_llm_out)
        
    # 최종 답변 생성
    final_ans_prompt = """### 할 일
- 사용자 질의와 주어진 파일 별 답변 생성 내용을 보고, 사용자 질의에 맞는 내용만을 선택해 적절한 답변을 출력한다.

### 유의사항
- 만약 모든 "파일 별 답변 생성 내용"이 "pass" 라면 "pass" 만을 그대로 출력한다. 이 경우 부연설명이나 이유 등의 내용은 중요도를 낮춰야 한다.
- 답변은 무조건 한국어로 출력한다.
"""
    final_ans_user_input = f"""- 사용자 질의: {user_query}\n- 파일 별 답변 생성 내용: {answer_candidates}"""
    
    final_answer = utils.call_llm(prompt=final_ans_prompt, user_input=final_ans_user_input)["content"]
    return answer_candidates, final_answer
    print(final_answer)

## 사용자 질의문 등 초기화

In [2]:
with open("config.json", "r") as f:
    configs = json.load(f)

user_query = configs["query"]
final_answer = "pass" # temporaral initialization
max_count = 5
count = 0

### Query decomposition

In [3]:
subquery_decomp_prompt = configs["prompts"]["subquery_prompt"]

user_input = f"### 사용자 질의문\n- {user_query}"

llm_out = utils.call_llm(prompt=subquery_decomp_prompt, user_input=user_input)
print(llm_out["content"])

atomic_queries = json.loads(llm_out["content"])

["orpo trainer의 코드 구현 방식은 무엇인가?", "dpo trainer의 코드 구현 방식은 무엇인가?", "orpo trainer와 dpo trainer의 코드 구현의 차이점은 무엇인가?"]


In [4]:
with open("config.json", "r") as f:
    configs = json.load(f)
    prompts = configs["prompts"]

answer_candidates = []
final_answer_per_queries = {}

with open("subquery_log_file.md", "a+") as f:
    f.write(f"# Original Query \n{user_query}\n")
    for i, q in enumerate(atomic_queries):
        answer_candidates = []
        selected_files = utils.select_file(
            prompts=prompts, 
            user_query=q, 
            filetree=filetree, 
            show_past_docs=False)
        
        answer_candidates, final_answer = utils.create_answer(
            prompts=prompts,
            user_query=q,
            selected_files=selected_files,
            answer_candidates=answer_candidates,
            handle_code_indiv=False
        )

        final_answer_per_queries[q] = final_answer
    
        f.write(f"## Sub Query {i}\n{q}\n\n")
        f.write(f"## Answer\n{final_answer}\n\n")

#     combine_prompt = """### 할 일
# - 사용자 질의, 그 질의를 최소 단위로 분리한 질의, 최소 단위 질의에 대한 답변 목록을 이용하여, 사용자 질의 의도에 맞도록 이를 조합하여 최종 답변을 출력한다.

# ### 출력 내용
# - 최종 답변: 사용자 질의 의도에 맞으면서, 최소 단위 질의에 대한 답변 내용을   
# """
#     user_input = f"### 사용자 질의\n- {user_query}\n### 답변 목록\n{str(final_answer)}\n"

#     llm_out = utils.call_llm(prompt=combine_prompt, user_input=user_input)["content"]
#     f.write(f"## Final Answer\n{llm_out}\n")
#     f.write(f"### Args \n- codebase_path: {codebase_path}\n- sub query decomp: True\n")

In [8]:
combine_prompt = """### 할 일
- 사용자 질의, 그 질의를 최소 단위로 분리한 질의, 최소 단위 질의에 대한 답변 목록을 이용하여, 사용자 질의 의도에 맞도록 최종 답변을 출력한다.

### 출력 내용
- 최종 답변: 최소 단위 질의에 대한 답변 내용을 기반으로 한 사용자 질의 의도에 맞는 답변
"""
user_input = f"### 사용자 질의\n- {user_query}\n### 답변 목록\n{str(final_answer_per_queries)}\n"

print(user_input)

llm_out = utils.call_llm(prompt=combine_prompt, user_input=user_input)["content"]
print(llm_out)

### 사용자 질의
- orpo trainer와 dpo trainer의 코드 구현이 어떤 차이가 있는지 구체적으로 설명해줘
### 답변 목록
{'orpo trainer의 코드 구현 방식은 무엇인가?': '### orpo trainer의 코드 구현 방식은 무엇인가?\n\norpo trainer는 `Trainer` 클래스를 상속받아 구현된 클래스입니다. orpo trainer는 주어진 모델과 데이터셋을 사용하여 orpo 알고리즘을 적용한 학습을 수행합니다. 주요 구성 요소는 다음과 같습니다.\n\n```python\nclass ORPOTrainer(Trainer):\n    def __init__(self, model, args, data_collator, train_dataset, eval_dataset, processing_class, ...):\n        # 초기화 코드\n        super().__init__(model, args, data_collator, train_dataset, eval_dataset, processing_class, ...)\n        \n    def tokenize_row(self, feature):\n        # 데이터 토큰화 코드\n        pass\n    \n    def concatenated_inputs(self, batch):\n        # 입력 배치 데이터 처리 코드\n        pass\n    \n    def odds_ratio_loss(self, policy_chosen_logps, policy_rejected_logps):\n        # orpo 손실 함수 계산 코드\n        pass\n    \n    def concatenated_forward(self, model, batch):\n        # 모델의 순전파 코드\n        pass\n    \n    def get_batch_loss_metrics(self, model, batch):\n  

### 그냥 한 번 돌려보고 싶을때

In [6]:
# First, find appropriate file from filetree with llm
selected_files = select_file(user_query=user_query, filetree=filetree)

# Then, input selected file paths and user's query to LLM again
answer_candidates, final_answer = create_answer(user_query=user_query, selected_files=selected_files)

print(final_answer)

To create a trainer code that uses REINFORCE loss, you can define a custom loss function and pass it to the `Trainer` class using the `compute_loss_func` argument.

```python
def reinforce_loss(outputs, labels, rewards):
    log_probs = outputs[0]
    loss = -torch.mean(rewards * log_probs)
    return loss

# Example usage
trainer = Trainer(
    model=model,
    args=training_args,
    compute_loss_func=lambda outputs, labels, num_items_in_batch: reinforce_loss(outputs, labels, rewards)
)
```

This code defines a custom `reinforce_loss` function that calculates the REINFORCE loss based on the log probabilities and rewards. The `Trainer` class is then initialized with this custom loss function using the `compute_loss_func` argument. Note that the exact implementation details may vary based on the specific requirements of your model and the structure of your outputs and labels.


In [16]:
print(f"Selected files: {selected_files}")
print(f"Answer candidates: {answer_candidates}")

Selected files: ['/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmp86pcatv7/transformers-main/src/transformers/trainer.py']
Answer candidates: ['### Thought Process and Reasoning\n\nTo address the user\'s query about overriding the `Trainer` class to create a custom trainer, we need to understand the structure and functionality of the `Trainer` class in the Hugging Face Transformers library.\n\n1. **Understanding the `Trainer` Class**: The `Trainer` class is a central component of the Hugging Face Transformers library, designed to simplify the training and evaluation of transformer-based models. It provides a lot of functionality out of the box, including support for distributed training, mixed precision, and various optimization algorithms.\n\n2. **Identifying the Need for Customization**: The user wants to create a custom trainer by overriding the `Trainer` class. This implies that they need to modify or extend the existing functionality of the `Trainer` to suit their specific req

## 만약 답변이 만족스럽지 못하다면?

### 방법 1) 파일 재검색

In [None]:
while final_answer == "pass" and count < max_count:
    # First, input them into LLM to select appropriate ones
    selected_files = select_file(user_query=user_query, filetree=filetree)

    # Then, input selected files' paths and user's query to LLM to get final answer
    final_answer = create_answer(user_query=user_query, selected_files=selected_files)

    # This is for loop limitation
    count += 1

print(final_answer)

### 방법 2) 파일 내 import 문에서 파일 타고 들어가기

#### Build with AST module

In [None]:
import ast
import importlib
from pathlib import Path

class Analyzer(ast.NodeVisitor):
    def __init__(self):
        self.stats = {"import": [], "from": [], "class": [], "function": []}

    def visit_ImportFrom(self, node):
        for alias in node:
            self.stats["from"].append(alias.name)
        
    def visit_Import(self, node):
        for alias in node.names:
            self.stats["import"].append(alias.name)
        
    def visit_ClassDef(self, node):
        self.stats["class"].append(node.name)
        self.generic_visit(node)
    
    def visit_FunctionDef(self, node):
        self.stats["function"].append(node.name)
        self.generic_visit(node)
    

def find_pckg(package_name: str, module_names: list, root_dir: str):
    """
    현재 package_name이 dir, module_names가 함수명/클래스명일 때 해당 파일을 추가하지 못하는 문제가 있음
    """
    connected_files = []
    for path, dirs, files in os.walk(root_dir):
        if package_name:
            if package_name in dirs: # if package name exists and is in directory: find module_name in dir
                connected_files.extend([os.path.join(path, package_name, f"{module}.py") for module in module_names if f"{module}.py" in files])
            elif f"{package_name}.py" in files: # if package name is file: add that file to connected_files
                connected_files.append(os.path.join(path, f"{package_name}.py"))

        else: # if not package_name: add all module_names to connected_files
            for module in module_names:
                if f"{module}.py" in files:
                    connected_files.append(os.path.join(path, f"{module}.py"))
    
    return connected_files


def get_linked_files(selected_files: list, filetree: dict):
    linked_files_dict = defaultdict(list)
    for file_path in selected_files:
        with open(file_path, "r") as f:
            tree = ast.parse(f.read())

        for node in ast.walk(tree):
            if isinstance(node, ast.ImportFrom):
                package_name = node.module
                module_names = [name.name for name in node.names]

            elif isinstance(node, ast.Import):
                package_name = None
                module_names = [name.name for name in node.names]

            else: continue
            
            connected_files = find_pckg(package_name=package_name, module_names=module_names, root_dir=codebase_path)
            linked_files_dict[file_path].extend(connected_files)
            for file in connected_files:
                filetree[file] = False

    return linked_files_dict


In [None]:
# First, find appropriate file from filetree with llm
selected_files = select_file(user_query=user_query, filetree=filetree)

# Then, input selected file paths and user's query to LLM again
answer_candidates, final_answer = create_answer(user_query=user_query, selected_files=selected_files)

print(final_answer)

while final_answer == "pass": # 제대로 된 대답이 나올 때 까지 반복
    selected_files = get_linked_files(selected_files=selected_files, filetree=filetree)
    answer_candidates, final_answer = create_answer(user_query=user_query, selected_files=selected_files)
    print(final_answer)

defaultdict(<class 'list'>,
            {'/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmpj7xia7yo/transformers-main/src/transformers/trainer.py': ['/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmpj7xia7yo/transformers-main/src/transformers/configuration_utils.py',
                                                                                                                            '/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmpj7xia7yo/transformers-main/src/transformers/generation/configuration_utils.py',
                                                                                                                            '/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmpj7xia7yo/transformers-main/src/transformers/debug_utils.py',
                                                                                                                            '/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmpj7xia7yo/transformers-main/src/transformers/feature_ex

In [None]:
# 구조 효율화 필요

def get_associated_files(codebase_path: str, selected_files: list):
    file_imports = {}
    for file_path in selected_files:
        # print(f"FILE PATH: {file_path}")
        with open(file_path, "r") as f:
            lines = f.readlines()

        packages = []
        for line in lines:
            if not "import" in line: continue # import 문이 없으면: 되돌아감

            keywords = line.strip().split()

            # Pattern of """import"""
            # 1. import A (as B)
            # 2. from A import B
            # """A""" can be "file name" or "directory name"
            package_name = keywords[1]
            if len(keywords) > 2 and keywords[2] == "import":
                module_name = keywords[3]

            for path, dirs, files in os.walk(codebase_path):
                if f"{package_name}.py" in files: # 1. if package_name is file name
                    packages.append(os.path.join(path, f"{package_name}.py"))
                elif package_name in dirs: # 2. if package_name is directory name
                    if module_name == "*": # 2-1. wildcard: get all files in directory
                        for file in files:
                            packages.append(os.path.join(path, package_name, f"{file}.py"))
                    elif f"{module_name}.py" in files: # 2-2. if module_name is file name
                        packages.append(os.path.join(path, package_name, f"{module_name}.py"))

        # print(packages)
        file_imports[file_path] = packages

    return file_imports

FILE PATH: /var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmphaprmlon/PlanGen-main/generator/generator.py
[]
FILE PATH: /var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmphaprmlon/PlanGen-main/content_planner/contentplanner.py
['/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmprgxt_1uc/PlanGen-main/content_planner/dynamic_crf_layer.py', '/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmprgxt_1uc/PlanGen-main/content_planner/utlis.py', '/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmprgxt_1uc/PlanGen-main/generator/utlis.py']
FILE PATH: /var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmphaprmlon/PlanGen-main/generator/utlis.py
[]
FILE PATH: /var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmphaprmlon/PlanGen-main/content_planner/utlis.py
[]
FILE PATH: /var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmphaprmlon/PlanGen-main/data/data_processing.py
['/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/tmprgxt_1uc/PlanGen-main/generator/data_processing_funcs.py', '/var/folders/g

In [None]:
file_imports = get_associated_files(codebase_path=codebase_path, selected_files=selected_files)
selected_files_asdf = []
for _, value in file_imports.items():
    selected_files_asdf.extend(value)

final_answer = create_answer(user_query=user_query, selected_files=selected_files_asdf)
print(final_answer)

  right=ast.Str(s=sentinel),
  return Constant(*args, **kwargs)


NameError: name 'get_associated_files' is not defined

In [None]:
while final_answer == "pass" and count < max_count:
    # Get unused files first
    filetree_unused = [file_path for file_path, is_unseen in filetree.items() if is_unseen]

    # Second, input them into LLM
    selected_files = select_file(user_query=user_query, filetree=filetree_unused)

    # Finally, input selected file paths and user's query to LLM again
    final_answer = create_answer(user_query=user_query, selected_files=selected_files)

    count += 1

print(final_answer)

In [51]:
import numpy

ImportError: Error importing numpy: you should not try to import numpy from
        its source directory; please exit the numpy source tree, and relaunch
        your python interpreter from there.

### python zipfile test

In [11]:
import zipfile
import traceback

try:
    with zipfile.ZipFile('code_snippets/trl.zip', 'r') as zip_ref:
        temp_dir = tempfile.mkdtemp()
        zip_ref.extractall(temp_dir)
except:
    final_str = traceback.format_exc()
print(final_str)

Traceback (most recent call last):
  File "/var/folders/g2/ss03t_z172d6366_2bfxnbpw0000gn/T/ipykernel_37690/1057143016.py", line 5, in <module>
    with zipfile.ZipFile('code_snippets/trl.zip', 'r') as zip_ref:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/seonghwan/miniconda3/lib/python3.12/zipfile/__init__.py", line 1331, in __init__
    self.fp = io.open(file, filemode)
              ^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: 'code_snippets/trl.zip'



In [13]:
import pprint
pprint.pprint("Traceback (most recent call last): File \"\", line 209, in run File \"/usr/local/lib/python3.10/zipfile.py\", line 1251, in init self.fp = io.open(file, filemode) FileNotFoundError: [Errno 2] No such file or directory: '/nfs-root/temp-document/TEMP_DOCUMENT_2029.zip'")

('Traceback (most recent call last): File "", line 209, in run File '
 '"/usr/local/lib/python3.10/zipfile.py", line 1251, in init self.fp = '
 'io.open(file, filemode) FileNotFoundError: [Errno 2] No such file or '
 "directory: '/nfs-root/temp-document/TEMP_DOCUMENT_2029.zip'")
