In [30]:
import colorsys
import itertools
import platform
from pprint import pprint
import shlex
import subprocess
import tempfile
import math
import os
import random
import shutil
import sqlite3
from tree_sitter import Language, Parser
import tree_sitter_python as tspython
# PY_LANGUAGE = Language(tspython.language())
import sys
sys.path.insert(0, os.path.abspath('.'))
import time
import warnings
from collections import Counter, defaultdict, namedtuple
from importlib import resources
from pathlib import Path

from diskcache import Cache
from grep_ast import TreeContext, filename_to_lang
from pygments.lexers import guess_lexer_for_filename
from pygments.token import Token
from tqdm import tqdm

from tree_sitter_languages import get_language, get_parser
warnings.simplefilter("ignore", category=FutureWarning)

Tag = namedtuple("Tag", "rel_fname fname line name kind".split())

ROOT_IMPORTANT_FILES = [
    # Version Control
    ".gitignore",
    ".gitattributes",
    # Documentation
    "README",
    "README.md",
    "README.txt",
    "README.rst",
    "CONTRIBUTING",
    "CONTRIBUTING.md",
    "CONTRIBUTING.txt",
    "CONTRIBUTING.rst",
    "LICENSE",
    "LICENSE.md",
    "LICENSE.txt",
    "CHANGELOG",
    "CHANGELOG.md",
    "CHANGELOG.txt",
    "CHANGELOG.rst",
    "SECURITY",
    "SECURITY.md",
    "SECURITY.txt",
    "CODEOWNERS",
    # Package Management and Dependencies
    "requirements.txt",
    "Pipfile",
    "Pipfile.lock",
    "pyproject.toml",
    "setup.py",
    "setup.cfg",
    "package.json",
    "package-lock.json",
    "yarn.lock",
    "npm-shrinkwrap.json",
    "Gemfile",
    "Gemfile.lock",
    "composer.json",
    "composer.lock",
    "pom.xml",
    "build.gradle",
    "build.sbt",
    "go.mod",
    "go.sum",
    "Cargo.toml",
    "Cargo.lock",
    "mix.exs",
    "rebar.config",
    "project.clj",
    "Podfile",
    "Cartfile",
    "dub.json",
    "dub.sdl",
    # Configuration and Settings
    ".env",
    ".env.example",
    ".editorconfig",
    "tsconfig.json",
    "jsconfig.json",
    ".babelrc",
    "babel.config.js",
    ".eslintrc",
    ".eslintignore",
    ".prettierrc",
    ".stylelintrc",
    "tslint.json",
    ".pylintrc",
    ".flake8",
    ".rubocop.yml",
    ".scalafmt.conf",
    ".dockerignore",
    ".gitpod.yml",
    "sonar-project.properties",
    "renovate.json",
    "dependabot.yml",
    ".pre-commit-config.yaml",
    "mypy.ini",
    "tox.ini",
    ".yamllint",
    "pyrightconfig.json",
    # Build and Compilation
    "webpack.config.js",
    "rollup.config.js",
    "parcel.config.js",
    "gulpfile.js",
    "Gruntfile.js",
    "build.xml",
    "build.boot",
    "project.json",
    "build.cake",
    "MANIFEST.in",
    # Testing
    "pytest.ini",
    "phpunit.xml",
    "karma.conf.js",
    "jest.config.js",
    "cypress.json",
    ".nycrc",
    ".nycrc.json",
    # CI/CD
    ".travis.yml",
    ".gitlab-ci.yml",
    "Jenkinsfile",
    "azure-pipelines.yml",
    "bitbucket-pipelines.yml",
    "appveyor.yml",
    "circle.yml",
    ".circleci/config.yml",
    ".github/dependabot.yml",
    "codecov.yml",
    ".coveragerc",
    # Docker and Containers
    "Dockerfile",
    "docker-compose.yml",
    "docker-compose.override.yml",
    # Cloud and Serverless
    "serverless.yml",
    "firebase.json",
    "now.json",
    "netlify.toml",
    "vercel.json",
    "app.yaml",
    "terraform.tf",
    "main.tf",
    "cloudformation.yaml",
    "cloudformation.json",
    "ansible.cfg",
    "kubernetes.yaml",
    "k8s.yaml",
    # Database
    "schema.sql",
    "liquibase.properties",
    "flyway.conf",
    # Framework-specific
    "next.config.js",
    "nuxt.config.js",
    "vue.config.js",
    "angular.json",
    "gatsby-config.js",
    "gridsome.config.js",
    # API Documentation
    "swagger.yaml",
    "swagger.json",
    "openapi.yaml",
    "openapi.json",
    # Development environment
    ".nvmrc",
    ".ruby-version",
    ".python-version",
    "Vagrantfile",
    # Quality and metrics
    ".codeclimate.yml",
    "codecov.yml",
    # Documentation
    "mkdocs.yml",
    "_config.yml",
    "book.toml",
    "readthedocs.yml",
    ".readthedocs.yaml",
    # Package registries
    ".npmrc",
    ".yarnrc",
    # Linting and formatting
    ".isort.cfg",
    ".markdownlint.json",
    ".markdownlint.yaml",
    # Security
    ".bandit",
    ".secrets.baseline",
    # Misc
    ".pypirc",
    ".gitkeep",
    ".npmignore",
]


# Normalize the lists once
NORMALIZED_ROOT_IMPORTANT_FILES = set(os.path.normpath(path) for path in ROOT_IMPORTANT_FILES)

class Spinner:
    unicode_spinner = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
    ascii_spinner = ["|", "/", "-", "\\"]

    def __init__(self, text):
        self.text = text
        self.start_time = time.time()
        self.last_update = 0
        self.visible = False
        self.is_tty = sys.stdout.isatty()
        self.tested = False

    def test_charset(self):
        if self.tested:
            return
        self.tested = True
        # Try unicode first, fall back to ascii if needed
        try:
            # Test if we can print unicode characters
            print(self.unicode_spinner[0], end="", flush=True)
            print("\r", end="", flush=True)
            self.spinner_chars = itertools.cycle(self.unicode_spinner)
        except UnicodeEncodeError:
            self.spinner_chars = itertools.cycle(self.ascii_spinner)

    def step(self):
        if not self.is_tty:
            return

        current_time = time.time()
        if not self.visible and current_time - self.start_time >= 0.5:
            self.visible = True
            self._step()
        elif self.visible and current_time - self.last_update >= 0.1:
            self._step()
        self.last_update = current_time

    def _step(self):
        if not self.visible:
            return

        self.test_charset()
        print(f"\r{self.text} {next(self.spinner_chars)}\r{self.text} ", end="", flush=True)

    def end(self):
        if self.visible and self.is_tty:
            print("\r" + " " * (len(self.text) + 3))

def is_important(file_path):
    file_name = os.path.basename(file_path)
    dir_name = os.path.normpath(os.path.dirname(file_path))
    normalized_path = os.path.normpath(file_path)

    # Check for GitHub Actions workflow files
    if dir_name == os.path.normpath(".github/workflows") and file_name.endswith(".yml"):
        return True

    return normalized_path in NORMALIZED_ROOT_IMPORTANT_FILES


def filter_important_files(file_paths):
    """
    Filter a list of file paths to return only those that are commonly important in codebases.

    :param file_paths: List of file paths to check
    :return: List of file paths that match important file patterns
    """
    return list(filter(is_important, file_paths))

def get_test(fname):
    lang = filename_to_lang(fname)
    print(lang)
    language = get_language(lang)
    print(language)
    parser = get_parser(lang)
    print(parser)
    
    
def get_scm_fname(lang):
    # Load the tags queries
    try:
        # base_path = Path(__file__).parent
        base_path = Path(os.getcwd())
        return base_path.joinpath("queries", f"tree-sitter-{lang}-tags.scm")
    except KeyError:
        return


def get_tags_raw(fname, rel_fname):
        lang = filename_to_lang(fname)
        if not lang:
            return

        try:
            language = get_language(lang)
            parser = get_parser(lang)
        except Exception as err:
            print(f"Skipping file {fname}: {err}")
            return

        query_scm = get_scm_fname(lang)
        if not query_scm.exists():
            return
        query_scm = query_scm.read_text()

        # Read source code
        with open(fname, 'r', encoding="utf-8") as f:
            code = f.read()
        if not code:
            return
        tree = parser.parse(bytes(code, "utf-8"))

        # Run the tags queries
        query = language.query(query_scm)
        captures = query.captures(tree.root_node)

        captures = list(captures)

        saw = set()
        for node, tag in captures:
            if tag.startswith("name.definition."):
                kind = "def"
            elif tag.startswith("name.reference."):
                kind = "ref"
            else:
                continue

            saw.add(kind)

            result = Tag(
                rel_fname=rel_fname,
                fname=fname,
                name=node.text.decode("utf-8"),
                kind=kind,
                line=node.start_point[0],
            )

            yield result

        if "ref" in saw:
            return
        if "def" not in saw:
            return

        # We saw defs, without any refs
        # Some tags files only provide defs (cpp, for example)
        # Use pygments to backfill refs

        try:
            lexer = guess_lexer_for_filename(fname, code)
        except Exception:  # On Windows, bad ref to time.clock which is deprecated?
            # self.io.tool_error(f"Error lexing {fname}")
            print("Error lexing {fname}")
            return

        tokens = list(lexer.get_tokens(code))
        tokens = [token[1] for token in tokens if token[0] in Token.Name]

        for token in tokens:
            yield Tag(
                rel_fname=rel_fname,
                fname=fname,
                name=token,
                kind="ref",
                line=-1,
            )
            
def get_rel_fname(fname, root="D:\Projects\llmpairprog\Agentless\playground\588436a5-43f1-45f4-80b4-0394f1d7b838\matplotlib"):
        try:
            return os.path.relpath(fname, root)
        except ValueError:
            # Issue #1288: ValueError: path is on mount 'C:', start on mount 'D:'
            # Just return the full fname.
            return fname
        
def get_tags(fname, rel_fname):
        """Get tags for a single file"""
        data = list(get_tags_raw(fname, rel_fname))

        return data
            
def get_ranked_tags(
    chat_fnames, other_fnames, mentioned_fnames, mentioned_idents, progress=None
):
    import networkx as nx

    defines = defaultdict(set)
    references = defaultdict(list)
    definitions = defaultdict(set)

    personalization = dict()

    fnames = set(chat_fnames).union(set(other_fnames))
    chat_rel_fnames = set()

    fnames = sorted(fnames)

    # Default personalization for unspecified files is 1/num_nodes
    # https://networkx.org/documentation/stable/_modules/networkx/algorithms/link_analysis/pagerank_alg.html#pagerank
    personalize = 100 / len(fnames)

    showing_bar = False

    for fname in fnames:
        # if self.verbose:
        #     self.io.tool_output(f"Processing {fname}")
        if progress and not showing_bar:
            progress()

        try:
            file_ok = Path(fname).is_file()
        except OSError:
            file_ok = False

        # if not file_ok:
        #     if fname not in self.warned_files:
        #         self.io.tool_warning(f"Repo-map can't include {fname}")
        #         self.io.tool_output(
        #             "Has it been deleted from the file system but not from git?"
        #         )
        #         self.warned_files.add(fname)
        #     continue

        # dump(fname)
        rel_fname = get_rel_fname(fname)

        if fname in chat_fnames:
            personalization[rel_fname] = personalize
            chat_rel_fnames.add(rel_fname)

        if rel_fname in mentioned_fnames:
            personalization[rel_fname] = personalize

        tags = list(get_tags(fname, rel_fname))
        if tags is None:
            continue

        for tag in tags:
            if tag.kind == "def":
                defines[tag.name].add(rel_fname)
                key = (rel_fname, tag.name)
                definitions[key].add(tag)

            elif tag.kind == "ref":
                references[tag.name].append(rel_fname)

    ##
    # dump(defines)
    # dump(references)
    # dump(personalization)

    if not references:
        references = dict((k, list(v)) for k, v in defines.items())

    idents = set(defines.keys()).intersection(set(references.keys()))

    G = nx.MultiDiGraph()

    for ident in idents:
        if progress:
            progress()

        definers = defines[ident]
        if ident in mentioned_idents:
            mul = 10
        elif ident.startswith("_"):
            mul = 0.1
        else:
            mul = 1

        for referencer, num_refs in Counter(references[ident]).items():
            for definer in definers:
                # dump(referencer, definer, num_refs, mul)
                # if referencer == definer:
                #    continue

                # scale down so high freq (low value) mentions don't dominate
                num_refs = math.sqrt(num_refs)

                G.add_edge(referencer, definer, weight=mul * num_refs, ident=ident)

    if not references:
        pass

    if personalization:
        pers_args = dict(personalization=personalization, dangling=personalization)
    else:
        pers_args = dict()

    try:
        ranked = nx.pagerank(G, weight="weight", **pers_args)
    except ZeroDivisionError:
        # Issue #1536
        try:
            ranked = nx.pagerank(G, weight="weight")
        except ZeroDivisionError:
            return []

    # distribute the rank from each source node, across all of its out edges
    ranked_definitions = defaultdict(float)
    for src in G.nodes:
        if progress:
            progress()

        src_rank = ranked[src]
        total_weight = sum(data["weight"] for _src, _dst, data in G.out_edges(src, data=True))
        # dump(src, src_rank, total_weight)
        for _src, dst, data in G.out_edges(src, data=True):
            data["rank"] = src_rank * data["weight"] / total_weight
            ident = data["ident"]
            ranked_definitions[(dst, ident)] += data["rank"]

    ranked_tags = []
    ranked_definitions = sorted(
        ranked_definitions.items(), reverse=True, key=lambda x: (x[1], x[0])
    )

    # dump(ranked_definitions)

    for (fname, ident), rank in ranked_definitions:
        # print(f"{rank:.03f} {fname} {ident}")
        if fname in chat_rel_fnames:
            continue
        ranked_tags += list(definitions.get((fname, ident), []))

    rel_other_fnames_without_tags = set(get_rel_fname(fname) for fname in other_fnames)

    fnames_already_included = set(rt[0] for rt in ranked_tags)

    top_rank = sorted([(rank, node) for (node, rank) in ranked.items()], reverse=True)
    for rank, fname in top_rank:
        if fname in rel_other_fnames_without_tags:
            rel_other_fnames_without_tags.remove(fname)
        if fname not in fnames_already_included:
            ranked_tags.append((fname,))

    for fname in rel_other_fnames_without_tags:
        ranked_tags.append((fname,))

    return ranked_tags

def token_count(text):
    return len(text.split(" "))

def get_ranked_tags_map_uncached(
        chat_fnames,
        other_fnames=None,
        max_map_tokens=None,
        mentioned_fnames=None,
        mentioned_idents=None,
    ):
        if not other_fnames:
            other_fnames = list()
        if not max_map_tokens:
            max_map_tokens = max_map_tokens
        if not mentioned_fnames:
            mentioned_fnames = set()
        if not mentioned_idents:
            mentioned_idents = set()

        spin = Spinner("Updating repo map")

        ranked_tags = get_ranked_tags(
            chat_fnames,
            other_fnames,
            mentioned_fnames,
            mentioned_idents,
            progress=spin.step,
        )

        other_rel_fnames = sorted(set(get_rel_fname(fname) for fname in other_fnames))
        special_fnames = filter_important_files(other_rel_fnames)
        ranked_tags_fnames = set(tag[0] for tag in ranked_tags)
        special_fnames = [fn for fn in special_fnames if fn not in ranked_tags_fnames]
        special_fnames = [(fn,) for fn in special_fnames]

        ranked_tags = special_fnames + ranked_tags

        spin.step()

        num_tags = len(ranked_tags)
        lower_bound = 0
        upper_bound = num_tags
        best_tree = None
        best_tree_tokens = 0

        chat_rel_fnames = set(get_rel_fname(fname) for fname in chat_fnames)
        tree = to_tree(ranked_tags[:num_tags], chat_rel_fnames)
        num_tokens = token_count(tree)

        if num_tokens > max_map_tokens:
            print(f"Warning: The generated tree exceeds the max_map_tokens limit by {num_tokens - max_map_tokens} tokens.")

        return tree
        # self.tree_cache = dict()

        # middle = min(max_map_tokens // 25, num_tags)
        # while lower_bound <= upper_bound:
        #     # dump(lower_bound, middle, upper_bound)

        #     spin.step()

        #     tree = to_tree(ranked_tags[:middle], chat_rel_fnames)
        #     num_tokens = token_count(tree)

        #     pct_err = abs(num_tokens - max_map_tokens) / max_map_tokens
        #     ok_err = 0.15
        #     if (num_tokens <= max_map_tokens and num_tokens > best_tree_tokens) or pct_err < ok_err:
        #         best_tree = tree
        #         best_tree_tokens = num_tokens

        #         if pct_err < ok_err:
        #             break

        #     if num_tokens < max_map_tokens:
        #         lower_bound = middle + 1
        #     else:
        #         upper_bound = middle - 1

        #     middle = (lower_bound + upper_bound) // 2

        # spin.end()
        # return best_tree
    
def get_mtime(fname):
    try:
        return os.path.getmtime(fname)
    except FileNotFoundError:
        # self.io.tool_warning(f"File not found error: {fname}")
        print(f"File not found error: {fname}")

def render_tree(abs_fname, rel_fname, lois):
    mtime = get_mtime(abs_fname)
    key = (rel_fname, tuple(sorted(lois)), mtime)


    # code = self.io.read_text(abs_fname) or ""
    with open(abs_fname, 'r', encoding="utf-8") as f:
        code = f.read()
    if not code.endswith("\n"):
        code += "\n"

    context = TreeContext(
        rel_fname,
        code,
        color=False,
        line_number=False,
        child_context=False,
        last_line=False,
        margin=0,
        mark_lois=False,
        loi_pad=0,
        # header_max=30,
        show_top_of_file_parent_scope=False,
    )
        # self.tree_context_cache[rel_fname] = {"context": context, "mtime": mtime}

    # context = self.tree_context_cache[rel_fname]["context"]
    context.lines_of_interest = set()
    context.add_lines_of_interest(lois)
    context.add_context()
    res = context.format()
    # self.tree_cache[key] = res
    return res

def to_tree(tags, chat_rel_fnames):
    # print("DEBUG: Starting to_tree")
    # print(f"DEBUG: Number of tags: {len(tags)}")
    if not tags:
        return ""

    cur_fname = None
    cur_abs_fname = None
    lois = None
    output = ""
    # add a bogus tag at the end so we trip the this_fname != cur_fname...
    # dummy_tag = (None,)
    dummy_tag = Tag(rel_fname="dummy", fname="dummy", line=999999, name="dummy", kind="dummy")
    # sorted_tags = sorted(tags) + [dummy_tag]
    # print(f"DEBUG: Sorted tags: {sorted_tags[:5]}")
    
    
    # print(f"DEBUG: Sorted tags: {sorted_tags[:5]}")  # Show first 5 tags
    for tag in sorted(tags) + [dummy_tag]:
        this_rel_fname = tag[0]
        # print(f"DEBUG: Current tag: {tag}")
        # print(f"DEBUG: this_rel_fname: {this_rel_fname}, cur_fname: {cur_fname}")
        # if this_rel_fname in chat_rel_fnames:
        #     continue

        # ... here ... to output the final real entry in the list
        if this_rel_fname != cur_fname:
            # print(f"DEBUG: File change detected: {this_rel_fname} != {cur_fname}")
            if lois is not None:
                output += "\n"
                output += cur_fname + ":\n"
                print("abs name", cur_abs_fname, "rel name: ", cur_fname)
                output += render_tree(cur_abs_fname, cur_fname, lois)
                lois = None
            elif cur_fname:
                output += "\n" + cur_fname + "\n"
            if type(tag) is Tag:
                lois = []
                cur_abs_fname = tag.fname
            cur_fname = this_rel_fname

        if lois is not None:
            lois.append(tag.line)

    # truncate long lines, in case we get minified js or something else crazy
    output = "\n".join([line[:100] for line in output.splitlines()]) + "\n"

    return output

def find_src_files(directory):
    if not os.path.isdir(directory):
        return [directory]

    src_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            src_files.append(os.path.join(root, file))
    return src_files
            

In [67]:
base_path = Path(os.getcwd())
lang = "python"
base_path = base_path.joinpath("queries", f"tree-sitter-{lang}-tags.scm")
base_path
with open(base_path, 'r') as f:
    print(f.read())

(class_definition
  name: (identifier) @name.definition.class) @definition.class

(function_definition
  name: (identifier) @name.definition.function) @definition.function

(call
  function: [
      (identifier) @name.reference.call
      (attribute
        attribute: (identifier) @name.reference.call)
  ]) @reference.call



In [37]:
root_path = "D:/Projects/aider"
file_paths = [r'D:\Projects\Agentless']

chat_fnames = []
other_fnames = []
for fname in file_paths:
    if Path(fname).is_dir():
        chat_fnames += find_src_files(fname)
    else:
        chat_fnames.append(fname)
# rm = RepoMap(root=".", io=io_handler, main_model=mock_model, map_tokens=1024, verbose=False)
repo_map = get_ranked_tags_map_uncached(chat_fnames, other_fnames, max_map_tokens=1024)
# for fname, tags in repo_map.items():
#     print(f"File: {fname}")
#     print("  Definitions:")
#     for tag in tags["defs"]:
#         print(f"    Line {tag.line}: {tag.name}")
#     print("  References:")
#     for tag in tags["refs"]:
#         print(f"    Line {tag.line}: {tag.name}")
# tags = list(get_tags_raw(file_path, get_rel_fname(file_path, root_path)))
# for tag in tags:
#     pprint(tag)
# ranked_tags = [
#     Tag(rel_fname="file1.py", fname="file1.py", line=10, name="func1", kind="def"),
#     Tag(rel_fname="file1.py", fname="file1.py", line=20, name="func2", kind="ref"),
#     Tag(rel_fname="file2.py", fname="file2.py", line=30, name="func3", kind="def"),
#     Tag(rel_fname="file2.py", fname="file2.py", line=40, name="func4", kind="ref"),
# ]
# chat_rel_fnames = {"file1.py", "file2.py"}

# tree_output = to_tree(ranked_tags, chat_rel_fnames)
repo_map_unique = set(repo_map)
# Test code
# test_tags = [
#     Tag(rel_fname="test1.py", fname="test1.py", line=10, name="func1", kind="def"),
#     Tag(rel_fname="test1.py", fname="test1.py", line=20, name="func2", kind="ref"),
#     Tag(rel_fname="test2.py", fname="test2.py", line=30, name="func3", kind="def")
# ]
# chat_rel_fnames = set()
# result = to_tree(test_tags, chat_rel_fnames)
# print("Result:", result)

In [38]:
len(chat_fnames)

63

In [41]:
elements = [line.strip().replace("..\\..\\", "") for line in repo_map.splitlines() if line.strip()]
print(len(elements))

24


In [49]:
import os
import tempfile
from pathlib import Path
import shutil

# 2. Create a temporary directory and replicate the structure
def create_temp_repo_structure_with_content(paths, base_path):
    # Create a temporary directory
    temp_dir = tempfile.mkdtemp()

    for file_path in paths:
        # Define the full path in the temporary directory
        temp_file_path = Path(temp_dir) / file_path
        # Define the original file path
        original_file_path = base_path / file_path
        # Create necessary directories
        temp_file_path.parent.mkdir(parents=True, exist_ok=True)
        # Copy content from the original file if it exists
        if original_file_path.exists():
            shutil.copyfile(original_file_path, temp_file_path)
        else:
            # Create an empty file if the original does not exist
            temp_file_path.touch()

    return temp_dir


In [50]:
# Create the temp directory with file contents
base_path = Path("D:\Projects\llmpairprog")
temp_dir = create_temp_repo_structure_with_content(elements, base_path)

print(f"Temporary repo structure created at: {temp_dir}")
print("Contents:")
for root, dirs, files in os.walk(temp_dir):
    for name in files:
        print(os.path.join(root, name))

Temporary repo structure created at: C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo
Contents:
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\fl\combine.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\fl\FL.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\fl\Index.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\fl\localize.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\fl\retrieve.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\repair\repair.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\repair\rerank.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\test\generate_reproduction_tests.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\test\run_regression_tests.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentless\agentless\test\run_reproduction_tests.py
C:\Users\shaya\AppData\Local\Temp\tmpbtp9q5lo\Agentl

In [74]:
import os
import json
from pprint import pprint
import ast
# from get_repo_structure.get_repo_structure import get_project_structure_from_scratch
# PROJECT_FILE_LOC = os.environ.get("PROJECT_FILE_LOC", None)
# print(PROJECT_FILE_LOC)

def get_docstring(node):
    """Extract docstring from AST node if it exists."""
    if (
        node.body 
        and isinstance(node.body[0], ast.Expr) 
        and isinstance(node.body[0].value, ast.Str)
    ):
        return node.body[0].value.s
    return None


def get_function_signature(node):
    """Extract function signature from AST node."""
    args_list = []
    
    # Get positional args
    for arg in node.args.posonlyargs:
        args_list.append(arg.arg)
        
    # Get regular args
    for arg in node.args.args:
        args_list.append(arg.arg)
        
    # Get args with defaults
    defaults = [None] * (len(node.args.args) - len(node.args.defaults)) + node.args.defaults
    for arg, default in zip(node.args.args, defaults):
        if default:
            try:
                default_value = ast.literal_eval(default)
                args_list.append(f"{arg.arg}={default_value}")
            except:
                args_list.append(f"{arg.arg}=...")

    # Get *args
    if node.args.vararg:
        args_list.append(f"*{node.args.vararg.arg}")

    # Get kwargs
    for kwarg in node.args.kwonlyargs:
        args_list.append(kwarg.arg)

    # Get **kwargs
    if node.args.kwarg:
        args_list.append(f"**{node.args.kwarg.arg}")
        
    docstring = get_docstring(node)
    signature = f"{node.name}({', '.join(args_list)})"
    
    if docstring:
        signature += f"\n    \"\"\"{docstring}\"\"\""
    
    return signature

def parse_python_file(file_path, file_content=None):
    """Parse a Python file to extract class and function definitions with their line numbers.
    :param file_path: Path to the Python file.
    :return: Class names, function names, and file contents
    """
    if file_content is None:
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                file_content = file.read()
                parsed_data = ast.parse(file_content)
        except Exception as e:  # Catch all types of exceptions
            print(f"Error in file {file_path}: {e}")
            return [], [], ""
    else:
        try:
            parsed_data = ast.parse(file_content)
        except Exception as e:  # Catch all types of exceptions
            print(f"Error in file {file_path}: {e}")
            return [], [], ""

    class_info = []
    function_names = []
    class_methods = set()

    for node in ast.walk(parsed_data):
        if isinstance(node, ast.ClassDef):
            methods = []
            for n in node.body:
                if isinstance(n, ast.FunctionDef):
                    methods.append(
                        {
                            "name": n.name,
                            "signature": "@classmethod\ndef " + get_function_signature(n),
                            "start_line": n.lineno,
                            "end_line": n.end_lineno
                            # "text": file_content.splitlines()[
                            #     n.lineno - 1 : n.end_lineno
                            # ],
                        }
                    )
                    class_methods.add(n.name)
            class_info.append(
                {
                    "name": node.name,
                    "start_line": node.lineno,
                    "end_line": node.end_lineno,
                    # "text": file_content.splitlines()[
                    #     node.lineno - 1 : node.end_lineno
                    # ],
                    "methods": methods
                }
            )
        elif isinstance(node, ast.FunctionDef) and not isinstance(
            node, ast.AsyncFunctionDef
        ):
            if node.name not in class_methods:
                function_names.append(
                    {
                        "name": node.name,
                        "signature": get_function_signature(node),
                        "start_line": node.lineno,
                        "end_line": node.end_lineno
                        # "text": file_content.splitlines()[
                        #     node.lineno - 1 : node.end_lineno
                        # ],
                    }
                )

    return class_info, function_names#, file_content.splitlines()

def create_structure(directory_path):
    """Create the structure of the repository directory by parsing Python files.
    :param directory_path: Path to the repository directory.
    :return: A dictionary representing the structure.
    """
    structure = {}

    for root, _, files in os.walk(directory_path):
        repo_name = os.path.basename(directory_path)
        # print("repo name", repo_name)
        relative_root = os.path.relpath(root, directory_path)
        if relative_root == ".":
            relative_root = repo_name
        curr_struct = structure
        for part in relative_root.split(os.sep):
            if part not in curr_struct:
                curr_struct[part] = {}
            curr_struct = curr_struct[part]
        for file_name in files:
            if file_name.endswith(".py"):
                file_path = os.path.join(root, file_name)
                class_info, function_names = parse_python_file(file_path)
                curr_struct[file_name] = {
                    "classes": class_info,
                    "functions": function_names
                    # "text": file_lines,
                }
            else:
                curr_struct[file_name] = {}

    return structure

# structure = create_structure(r"D:\Projects\llmpairprog\Agentless\playground\588436a5-43f1-45f4-80b4-0394f1d7b838")
# print(structure['matplotlib']['setup.py'])

In [75]:
structure = create_structure(temp_dir)

In [79]:
pprint(structure['Agentless']['agentless']['fl']['FL.py'])

{'classes': [{'end_line': 25,
              'methods': [{'end_line': 21,
                           'name': '__init__',
                           'signature': '@classmethod\n'
                                        'def __init__(self, instance_id, '
                                        'structure, problem_statement, '
                                        '**kwargs)',
                           'start_line': 18},
                          {'end_line': 25,
                           'name': 'localize',
                           'signature': '@classmethod\n'
                                        'def localize(self, top_n, mock, '
                                        'top_n=1, mock=False)',
                           'start_line': 24}],
              'name': 'FL',
              'start_line': 17},
             {'end_line': 792,
              'methods': [{'end_line': 240,
                           'name': '__init__',
                           'signature': '@classmethod\n'
   

In [62]:
elements[:10]

['Agentless\\agentless\\fl\\FL.py',
 'Agentless\\agentless\\fl\\Index.py',
 'Agentless\\agentless\\fl\\combine.py',
 'Agentless\\agentless\\fl\\localize.py',
 'Agentless\\agentless\\fl\\retrieve.py',
 'Agentless\\agentless\\repair\\repair.py',
 'Agentless\\agentless\\repair\\rerank.py',
 'Agentless\\agentless\\test\\generate_reproduction_tests.py',
 'Agentless\\agentless\\test\\run_regression_tests.py',
 'Agentless\\agentless\\test\\run_reproduction_tests.py']