In [1]:
import os
import re
import pandas as pd
import sqlite3
import configparser
import shlex
import subprocess
import json
import atexit
import gradio as gr
from langchain_mistralai import ChatMistralAI
from langgraph.graph import StateGraph, START, END
from langchain.tools import tool
from typing import TypedDict, Dict, Any

# Определяем тип состояния
class GraphState(TypedDict):
    input: str
    vtune_file_path: str
    module_names: list
    csv_file_path: str
    path_last_csv: str
    result: str

# Глобальные переменные
path_last_csv = None
path_last_db = None
gradio_interface_instance = None

# Чтение API-ключа
check_path = os.path.dirname(os.getcwd())
config_path = os.path.join(check_path, "GPTPROF", "config.ini")
config = configparser.ConfigParser()
config.read(config_path)

try:
    mistral_api_key = config["API_KEYS"]["MISTRAL_API_KEY"]
except KeyError:
    raise ValueError("API key not found in config.ini")

# Инициализация модели
mistral_model = "codestral-latest"
llm = ChatMistralAI(model=mistral_model, temperature=0, api_key=mistral_api_key)

In [2]:
@tool
def export_vtune_to_csv(vtune_file_path: str) -> str:
    """
    Exports data from a .vtune file to a CSV file using Intel VTune.
    Returns path to exported CSV or error message.
    """
    global path_last_csv

    if not vtune_file_path:
        return "Error: File path is empty"

    vtune_file_path = vtune_file_path.strip('"')

    if not vtune_file_path.endswith(".vtune"):
        return "Error: File must have .vtune extension"

    if not os.path.exists(vtune_file_path):
        return f"Error: File {vtune_file_path} not found"

    try:
        file_name = os.path.splitext(os.path.basename(vtune_file_path))[0]
        output_csv = f"{file_name}.csv"
        command = f'vtune -report hotspots -r {shlex.quote(vtune_file_path)} -format=csv -report-output {output_csv}'

        subprocess.run(command, shell=True, check=True)
        path_last_csv = output_csv
        return f"Data exported to {output_csv}"

    except subprocess.CalledProcessError as e:
        return f"Export failed: {e}"

# Адаптер для узла графа
def export_vtune_node(state: GraphState) -> Dict[str, Any]:
    """
    Node that processes VTune export.
    Exports data from a .vtune file to CSV and returns updated state.

    Args:
        state (GraphState): Current state of the graph.

    Returns:
        dict: Updated state with the result of the export.
    """
    vtune_path = state.get("vtune_file_path", "")
    tool_result = export_vtune_to_csv(vtune_path)

    return {
        **state,
        "input": "",
        "path_last_csv": path_last_csv if "Data exported to" in tool_result else None,
        "result": tool_result
    }

In [3]:
@tool
def filter_by_module(module_names: list, csv_file_path: str = None) -> str:
    """
    Filters data by specified modules and saves the result in path_last_csv.

    Args:
        module_names (list): List of module names to filter by.
        csv_file_path (str, optional): Path to the CSV file for filtering. 
                                       If not provided, uses path_last_csv.

    Returns:
        str: Message indicating the result of the operation.
    """
    global path_last_csv

    # Проверяем, что передан хотя бы один модуль для фильтрации
    if not module_names:
        return "Error: No modules specified for filtering."

    # Определяем путь к CSV-файлу
    if csv_file_path:
        if not os.path.exists(csv_file_path):
            return f"Error: File {csv_file_path} not found."
        file_to_filter = csv_file_path
    elif path_last_csv:
        file_to_filter = path_last_csv
    else:
        return "Error: No data available for filtering. Export data from .vtune first."

    try:
        # Читаем данные из CSV-файла
        df = pd.read_csv(file_to_filter, sep="\t")

        # Фильтруем данные по модулям и исходным файлам
        filtered_df = df[
            (df["Module"].isin(module_names)) &
            (df["Source File"].str.endswith(".cpp") | df["Source File"].str.endswith(".hpp"))
            ]

        # Если после фильтрации данных нет
        if filtered_df.empty:
            return "Error: No data found for the specified modules."

        # Формируем имя для отфильтрованного файла
        file_name = os.path.basename(file_to_filter)
        filtered_csv_path = os.path.join(os.path.dirname(file_to_filter), f"filtered_{file_name}")

        filtered_df.to_csv(filtered_csv_path, sep="\t", index=False)
        path_last_csv = filtered_csv_path

        return f"Data filtered by modules {module_names} and saved to {filtered_csv_path}."
    except Exception as e:
        return f"Error processing data: {e}"


# Адаптер для узла графа
def filter_by_module_node(state: GraphState) -> Dict[str, Any]:
    """
    Node adapter for the filter_by_module tool.
    Extracts module names and CSV file path from the state, calls the tool,
    and returns the updated state.

    Args:
        state (dict): Current state of the graph.

    Returns:
        dict: Updated state after filtering.
    """
    # Извлекаем параметры из состояния
    module_names = state.get("module_names", [])
    csv_file_path = state.get("csv_file_path", None)

    # Вызываем тул для фильтрации данных
    tool_result = filter_by_module(module_names, csv_file_path)

    # Возвращаем обновленное состояние
    return {
        **state,
        "input": "",
        "path_last_csv": path_last_csv,
        "result": tool_result
    }

In [None]:

workflow = StateGraph(GraphState)

workflow.add_node("export", export_vtune_node)
workflow.add_node("filter", filter_by_module_node)

workflow.add_edge(START, "export")
workflow.add_edge("export", "filter")
workflow.add_edge("filter", END)

compiled_graph = workflow.compile()

def parse_command(input_str: str) -> Dict[str, Any]:
    if not input_str:
        return {"command": "", "args": "", "cleaned_args": ""}

    parts = input_str.strip().split(maxsplit=1)
    command = parts[0].lower()
    args = parts[1] if len(parts) > 1 else ""

    if command == "filter":
        cleaned_args = [arg.strip() for arg in args.split(",") if arg.strip()]
    else:
        cleaned_args = args.strip('"\' ')

    return {
        "command": command,
        "args": args,
        "cleaned_args": cleaned_args
    }

# Обработчик запросов
def ask_codestral(user_input: str, session_state: dict = None):
    # Парсим команду
    parsed = parse_command(user_input)
    command = parsed["command"]
    args = parsed["args"]
    cleaned_args = parsed["cleaned_args"]

    # Инициализируем состояние
    state = session_state or {"input": user_input, "result": ""}

    # Обрабатываем команды
    if command == "export":
        state.update({
            "vtune_file_path": cleaned_args,  # Используем очищенный аргумент
            "next_node": "export"
        })
    elif command == "filter":
        state.update({
            "module_names": cleaned_args,  # Уже список модулей
            "csv_file_path": path_last_csv,
            "next_node": "filter"
        })
    elif command == "exit":
        if gradio_interface_instance is not None:
            gradio_interface_instance.close()
        os._exit(0)
    else:
        state["result"] = f"Unknown command: {command}"
        return state["result"], state

    try:
        result_state = compiled_graph.invoke(state)
        return result_state["result"], result_state
    except Exception as e:
        return f"Error: {str(e)}", state

# Gradio интерфейс
def gradio_interface(user_input: str, session_state: dict = None):
    return ask_codestral(user_input, session_state)

def launch_gradio():
    global gradio_interface_instance

    if gradio_interface_instance is not None:
        gradio_interface_instance.close()

    with gr.Blocks() as interface:
        with gr.Row():
            cmd_input = gr.Textbox(label="Command",
                                   placeholder="Enter your response",
                                   interactive=True)
            session_state = gr.State()
            output = gr.Textbox(label="Output",
                                interactive=False,
                                lines=10,
                                autoscroll=True)

        submit_btn = gr.Button("Submit", variant="primary")

        def process_command(cmd: str, state: dict):
            response, new_state = gradio_interface(cmd, state)
            return response, new_state

        # Обработка нажатия кнопки и Enter в текстовом поле
        submit_btn.click(
            process_command,
            inputs=[cmd_input, session_state],
            outputs=[output, session_state]
        )
        cmd_input.submit(
            process_command,
            inputs=[cmd_input, session_state],
            outputs=[output, session_state]
        )

        # Инструкции для пользователя
        gr.Markdown("""
        ### Available commands:
        - `export path/to/file.vtune` - Export VTune data to CSV (e.g. `export results.vtune`)
        - `filter module1,module2,...` - Filter data by specified modules
        - `exit` - Terminate the session
        """)

    gradio_interface_instance = interface

    # Настройки запуска
    interface.launch(
        server_port=7860,
        inbrowser=False,
        share=False,
        show_error=True
    )

# Запускаем интерфейс
launch_gradio()