In [None]:
# | default_exp _testing.benchmark

In [None]:
# | export

from typing import *
import shutil
from tempfile import TemporaryDirectory
from pathlib import Path
from contextlib import contextmanager
import os
import subprocess # nosec: B404: Consider possible security implications associated with the subprocess module.

import typer
from yaspin import yaspin

import numpy as np

from faststream_gen._code_generator.constants import INCOMPLETE_APP_ERROR_MSG


In [None]:
from typer.testing import CliRunner

In [None]:
# | export

# Name of the sub-directory, created inside the fixtures directory, into which
# the generated projects are copied by the benchmark command.
BENCHMARK_RESULTS_DIR: str = "benchmark-results"

In [None]:
# | export

@contextmanager
def _set_cwd(cwd_path: Union[Path, str]) -> Generator:
    """Set the current working directory for the duration of the context manager.

    Args:
        cwd_path: The path to the new working directory.

    !!! note

        The above docstring is autogenerated by docstring-gen library (https://github.com/airtai/docstring-gen)
    """
    cwd_path = Path(cwd_path)
    original_cwd = os.getcwd()
    os.chdir(cwd_path)

    try:
        yield
    finally:
        os.chdir(original_cwd)

In [None]:
# | export

# Typer application object; the benchmark command is registered on it.
_APP_SHORT_HELP = "Run benchmark against pre-defined example app descriptions"
app = typer.Typer(short_help=_APP_SHORT_HELP)

In [None]:
# | export

@app.command(
    "run_benchmark",
    help="Run benchmark against pre-defined example app descriptions",
)
def benchmark(
    fixtures_path: str = typer.Argument(
        ...,
        help="The path to the pre-defined example app descriptions",
    ),
    no_repeat: int = typer.Option(
        1,
        "--repeat",
        "-r",
        help="Number of generation repetitions per app description",
    ),
) -> None:
    """Generate an app for every description file and report the success rate.

    Every ``*.txt`` file directly inside ``fixtures_path`` whose stem does not
    contain ``-log`` is treated as an app description. For each one the
    ``faststream_gen`` CLI is run inside a fresh temporary directory, and the
    resulting ``new-project`` directory is copied to
    ``<fixtures_path>/benchmark-results/<description stem>``.

    A run counts as failed when ``INCOMPLETE_APP_ERROR_MSG`` appears in the
    CLI's standard output.

    Args:
        fixtures_path: Directory holding the app description ``*.txt`` files.
        no_repeat: How many times each description is generated. Values below
            2 run every description exactly once. Repeated runs of the same
            description are copied into the same results directory, so files
            from a later run overwrite those of an earlier one.

    Raises:
        typer.Exit: With exit code 1 when no app description file is found.
        subprocess.CalledProcessError: When ``faststream_gen`` exits with a
            non-zero status (``check=True``).
    """
    fixtures_path_obj = Path(fixtures_path).resolve()
    benchmark_results_dir = fixtures_path_obj / BENCHMARK_RESULTS_DIR

    # Sorted so the processing order does not depend on the order in which the
    # filesystem happens to enumerate the files.
    app_descriptions = sorted(
        filename
        for filename in fixtures_path_obj.glob("*.txt")
        if "-log" not in filename.stem
    )

    if no_repeat > 1:
        # Repeat each description consecutively: [a, a, b, b, ...].
        app_descriptions = [
            description for description in app_descriptions for _ in range(no_repeat)
        ]

    no_of_description_files = len(app_descriptions)
    typer.secho(
        f"Total app description files: {no_of_description_files}", fg=typer.colors.CYAN
    )
    if no_of_description_files == 0:
        # Without this guard the success-rate division below would raise
        # ZeroDivisionError; report the actual problem instead.
        typer.secho(
            f"No app description files (*.txt) found in {fixtures_path_obj}",
            fg=typer.colors.RED,
        )
        raise typer.Exit(code=1)

    success_cnt = 0
    for i, app_description in enumerate(app_descriptions):
        with yaspin(
            text=f"{i+1}/{no_of_description_files} Generating app for: {app_description.name}",
            color="cyan",
            spinner="clock",
        ) as sp:
            with TemporaryDirectory() as d:
                generated_project = Path(d) / "new-project"
                with _set_cwd(d):
                    # Passed as an argument list (no shell), so paths that
                    # contain spaces or shell metacharacters are handed to the
                    # CLI verbatim.
                    cli_command = [
                        "faststream_gen",
                        "-i",
                        str(app_description),
                        "-o",
                        str(generated_project),
                        "--dev",
                    ]

                    result = subprocess.run(  # nosec: B603 subprocess call - check for execution of untrusted input.
                        cli_command,
                        check=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                    )
                    sp.text = ""
                    if INCOMPLETE_APP_ERROR_MSG in str(result.stdout):
                        sp.color = "red"
                        sp.ok(
                            f" ✘ Error: App generated failed for: {app_description.name}"
                        )

                    else:
                        sp.ok(
                            f" ✔ App successfully generated for: {app_description.name}"
                        )
                        success_cnt += 1

                    # Keep the generated project (successful or not) so it can
                    # be inspected after the temporary directory is removed.
                    benchmark_results_dir.mkdir(parents=True, exist_ok=True)
                    shutil.copytree(
                        str(generated_project),
                        benchmark_results_dir / Path(app_description).stem,
                        dirs_exist_ok=True,
                    )

    success_rate = success_cnt / no_of_description_files
    typer.secho(
        f"Success rate: {success_rate}",
        fg=typer.colors.CYAN,
    )
    if success_cnt != no_of_description_files:
        typer.secho(
            f"\nTo debug, go to {str(benchmark_results_dir)}, select one of the failed app description directories, and run 'pytest'.",
            fg=typer.colors.RED,
        )

In [None]:
runner = CliRunner()
# The app has a single registered command, which is invoked without a
# subcommand name; a leading "benchmark" token would be consumed as the
# `fixtures_path` argument instead of selecting a command.
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0, result.output

In [None]:
# | notest


fixture = """
Create a FastStream application using localhost broker for testing and use the default port number. 
It should consume messages from the "input_data" topic, where each message is a JSON encoded object containing a single attribute: 'data'. 
For each consumed message, create a new message object and increment the value of the data attribute by 1. Finally, send the modified message to the 'output_data' topic.
"""

with TemporaryDirectory() as d:
    # Single app description placed in an otherwise empty fixtures directory.
    app_description = Path(d) / "hello_world.txt"
    app_description.write_text(fixture, encoding="utf-8")

    result = runner.invoke(app, [d])
    # Fail fast, showing the CLI output, if the benchmark command itself
    # crashed; otherwise the failure only shows up as a missing directory.
    assert result.exit_code == 0, result.output

    benchmark_dir = Path(d) / BENCHMARK_RESULTS_DIR
    assert benchmark_dir.exists()

    files = list(benchmark_dir.rglob("*.py"))
    print(files)

    # The generated project must contain both the app and its test module.
    file_names = [py_file.stem for py_file in files]
    assert "test_application" in file_names
    assert "application" in file_names

[PosixPath('/tmp/tmp38j6f9uc/benchmark-results/hello_world/tests/__init__.py'), PosixPath('/tmp/tmp38j6f9uc/benchmark-results/hello_world/tests/test_application.py'), PosixPath('/tmp/tmp38j6f9uc/benchmark-results/hello_world/app/application.py')]
