In [None]:
# | default_exp _testing.benchmark

In [None]:
# | export

from typing import *
import shutil
from tempfile import TemporaryDirectory
from pathlib import Path
from contextlib import contextmanager
import os
import subprocess # nosec: B404: Consider possible security implications associated with the subprocess module.

import typer
from yaspin import yaspin


In [None]:
from typer.testing import CliRunner

In [None]:
# | export

@contextmanager
def _set_cwd(cwd_path: Union[Path, str]) -> Generator:
    """Set the current working directory for the duration of the context manager.

    Args:
        cwd_path: The path to the new working directory.

    !!! note

        The above docstring is autogenerated by docstring-gen library (https://github.com/airtai/docstring-gen)
    """
    cwd_path = Path(cwd_path)
    original_cwd = os.getcwd()
    os.chdir(cwd_path)

    try:
        yield
    finally:
        os.chdir(original_cwd)

In [None]:
# # | export

# def get_fixtures_path() -> Path:
#     return Path(__file__).parent.parent.parent / "fixtures"

In [None]:
# | export

# Typer application object for the benchmark CLI; commands are attached to it
# with the `@app.command` decorator.
app = typer.Typer(
    short_help="Run benchmark against pre-defined example app descriptions",
)

In [None]:
# | export


@app.command(
    "run_benchmark",
    help="Run benchmark against pre-defined example app descriptions",
)
def benchmark(
    fixtures_path: str = typer.Argument(
        ...,
        help="The path to the pre-defined example app descriptions",
    )
) -> None:
    """Run ``faststream_gen`` on every app description and report the success rate.

    Every ``*.txt`` file in ``fixtures_path`` whose stem does not end in
    ``-log`` is treated as an app description. For each one, ``faststream_gen``
    is executed inside a fresh temporary directory. If a ``faststream-log.txt``
    file exists in that directory afterwards, it is copied next to the
    description as ``<description stem>-log.txt``.

    Args:
        fixtures_path: Directory containing the app description files.

    Raises:
        typer.Exit: With exit code 1 when no app description files are found.
    """
    fixtures_path_obj = Path(fixtures_path).resolve()

    # Log files written by earlier runs are named "<stem>-log.txt"; skip only
    # those. Sorting keeps the processing order stable between runs.
    app_descriptions = sorted(
        path
        for path in fixtures_path_obj.glob("*.txt")
        if not path.stem.endswith("-log")
    )
    no_of_description_files = len(app_descriptions)
    typer.secho(
        f"Total app description files: {no_of_description_files}", fg=typer.colors.CYAN
    )
    if no_of_description_files == 0:
        # Without this guard the success-rate computation divides by zero.
        typer.secho(
            f"No app description files (*.txt) found in: {fixtures_path_obj}",
            fg=typer.colors.RED,
            err=True,
        )
        raise typer.Exit(code=1)

    success_cnt = 0
    for i, app_description in enumerate(app_descriptions):
        with yaspin(
            text=f"{i+1}/{no_of_description_files} Generating app for: {app_description.name}",
            color="cyan",
            spinner="clock",
        ) as sp:
            with TemporaryDirectory() as d:
                with _set_cwd(d):
                    # Argument list (no shell): paths containing spaces or shell
                    # metacharacters are passed to the program verbatim.
                    cli_command = [
                        "faststream_gen",
                        "-i",
                        str(app_description),
                        "-o",
                        str(Path(d) / "new-project"),
                        "-v",
                    ]
                    try:
                        subprocess.run(  # nosec: B603 subprocess call - check for execution of untrusted input.
                            cli_command,
                            check=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            text=True,
                        )
                        sp.text = ""
                        sp.ok(
                            f" ✔ App successfully generated for: {app_description.name}"
                        )
                        success_cnt += 1
                    except Exception:
                        # Best effort: one failing description must not stop the
                        # benchmark; it simply does not count as a success.
                        sp.text = ""
                        sp.color = "red"
                        sp.fail(
                            f" ✘ Error: App generated failed for: {app_description.name}"
                        )
                    finally:
                        # The log is looked up in the temporary working directory
                        # (presumably written there by faststream_gen - confirm).
                        # It may be missing, e.g. when the command could not be
                        # started at all, so only copy it when it exists.
                        log_file = Path(d) / "faststream-log.txt"
                        if log_file.exists():
                            shutil.copy(
                                log_file,
                                fixtures_path_obj / f"{app_description.stem}-log.txt",
                            )
                        else:
                            typer.secho(
                                f"No log file was produced for: {app_description.name}",
                                fg=typer.colors.YELLOW,
                            )
    typer.secho(
        f"Success rate: {(success_cnt/no_of_description_files) * 100} %",
        fg=typer.colors.CYAN,
    )

In [None]:
# Smoke test: the CLI must be able to render its help text.
# The app registers a single command, so no sub-command name is passed on the
# command line (the cell that invokes the app with only a directory relies on
# the same behaviour).
runner = CliRunner()
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0, result.output

In [None]:
# | notest


fixture = """
Create a FastStream application using localhost broker for testing and use the default port number. 
It should consume messages from the "input_data" topic, where each message is a JSON encoded object containing a single attribute: 'data'. 
For each consumed message, create a new message object and increment the value of the data attribute by 1. Finally, send the modified message to the 'output_data' topic.
"""

with TemporaryDirectory() as d:
    # Seed the fixtures directory with a single app description.
    app_description = Path(d) / "hello_world.txt"
    app_description.write_text(fixture, encoding="utf-8")

    result = runner.invoke(app, [d])

    # After the run the directory should hold the description and its log.
    files = list(Path(d).glob("*.txt"))
    print(files)
    assert len(files) == 2
    assert app_description.exists()
    log_file = Path(d) / "hello_world-log.txt"
    assert log_file.exists()

    contents = log_file.read_text(encoding="utf-8")

    print(contents)

[PosixPath('/tmp/tmp_da15059/hello_world.txt'), PosixPath('/tmp/tmp_da15059/hello_world-log.txt')]
23-09-21 23:57:35.628 [INFO] faststream_gen.cli: Project generation started.
23-09-21 23:57:35.628 [INFO] faststream_gen.cli: Reading application description from '/tmp/tmp_da15059/hello_world.txt'.
23-09-21 23:57:35.628 [INFO] faststream_gen._code_generator.app_description_validator: ==== App description validation ====
23-09-21 23:57:35.630 [INFO] faiss.loader: Loading faiss with AVX2 support.
23-09-21 23:57:35.641 [INFO] faiss.loader: Successfully loaded faiss with AVX2 support.
23-09-21 23:57:36.240 [INFO] faststream_gen._code_generator.helper: ************************************************************************************************************************
23-09-21 23:57:36.241 [INFO] faststream_gen._code_generator.helper: 

Prompt to the model: 

===Role:system===

Message:

You are an expert Python developer, tasked to generate executable Python code as a part of your work wi