In [19]:
from dotenv import load_dotenv
import os, re
from openai import OpenAI
import xml.etree.ElementTree as ET
from pathlib import Path
import gradio as gr

In [20]:

load_dotenv(dotenv_path="ENV/env.txt")

True

In [21]:


# Chargement cl√© API
if not load_dotenv(dotenv_path="ENV/env.txt"):
    raise RuntimeError("Impossible de charger ENV/env.txt")
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Cl√© API introuvable")

# Init OpenAI
client = OpenAI(api_key=api_key)
ASSISTANT_ID = "asst_nyRF0FGHJVGlf0mUumJS4bBI"



In [22]:
# --- Fonctions utilitaires ---
def parse_orchestrator_block(text):
    """
    Extrait le bloc <orchestrator> XML et retourne les infos utiles.
    """
    match = re.search(r"(<orchestrator[\s\S]*?</orchestrator>)", text)
    if not match:
        return None

    xml_block = match.group(1)
    root = ET.fromstring(xml_block)
    action = root.find(".//action")
    if action is None:
        return None

    return {
        "action_name": action.attrib.get("name"),
        "path": action.findtext("path"),
        "content": action.findtext("content", ""),
        "make_parents": action.findtext("make_parents", "false").lower() == "true",
        "if_exists": action.findtext("if_exists", "skip").lower(),
        "encoding": action.findtext("encoding", "utf-8")
    }

def execute_create_folder(params):
    folder_path = Path(params["path"])
    if folder_path.exists():
        if params["if_exists"] == "skip":
            return f"‚è© Dossier d√©j√† pr√©sent : {folder_path}"
        elif params["if_exists"] == "error":
            return f"‚ùå Dossier existe d√©j√† : {folder_path}"
        elif params["if_exists"] == "overwrite":
            import shutil
            shutil.rmtree(folder_path)
    folder_path.mkdir(parents=params["make_parents"], exist_ok=True)
    return f"‚úÖ Dossier cr√©√© : {folder_path}"

def execute_create_file(params):
    file_path = Path(params["path"])
    if file_path.exists():
        if params["if_exists"] == "skip":
            return f"‚è© Fichier d√©j√† pr√©sent : {file_path}"
        elif params["if_exists"] == "error":
            return f"‚ùå Fichier existe d√©j√† : {file_path}"
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, "w", encoding=params.get("encoding", "utf-8")) as f:
        f.write(params.get("content", ""))
    return f"‚úÖ Fichier cr√©√©/√©crit : {file_path}"

def execute_action_from_orchestrator(assistant_output):
    """
    Ex√©cute l'action d√©tect√©e dans le bloc orchestrator.
    """
    params = parse_orchestrator_block(assistant_output)
    if not params:
        return "‚ö†Ô∏è Aucune action d√©tect√©e."
    
    action_name = params.get("action_name")
    if action_name == "create_folder":
        return execute_create_folder(params)
    elif action_name == "create_file":
        return execute_create_file(params)
    else:
        return f"‚ö†Ô∏è Action inconnue : {action_name}"

In [27]:
# --- Fonction Gradio ---
def envoyer_commande(user_input):
    # Cr√©er un thread et envoyer le message utilisateur
    thread = client.beta.threads.create()
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_input
    )

    # Lancer le run avec l'assistant
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=ASSISTANT_ID
    )

    # Polling jusqu'√† compl√©tion
    import time
    while True:
        status = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
        if status.status in ["completed", "failed", "cancelled"]:
            break
        time.sleep(0.5)

    # R√©cup√©rer la r√©ponse de l'assistant
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    assistant_output = messages.data[0].content[0].text.value

    # Ex√©cuter l'action locale
    action_result = execute_action_from_orchestrator(assistant_output)

    return assistant_output, action_result

In [28]:
# --- Interface Gradio ---
with gr.Blocks() as demo:
    gr.Markdown("## üñ•Ô∏è AgentIA Orchestrateur - Commandes locales")
    cmd_input = gr.Textbox(label="Commande utilisateur", placeholder="Ex: Cr√©e un dossier Test sur le Bureau")
    output_assistant = gr.Textbox(label="R√©ponse Assistant", lines=8)
    output_action = gr.Textbox(label="R√©sultat Action")

    submit_btn = gr.Button("Envoyer")
    submit_btn.click(envoyer_commande, inputs=cmd_input, outputs=[output_assistant, output_action])

In [30]:
demo.launch(share=True)

Rerunning server... use `close()` to stop if you need to change `launch()` parameters.
----
* Running on public URL: https://04387f57aa8ed60621.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [31]:
demo.close()

Closing server running on port: 7877
