<a href="https://colab.research.google.com/github/Naomie25/DI-Bootcamp/blob/main/Week10_Day3_Building_Conversational_Chatbots_With_Gradio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
# This script generates a utils.py file that defines determine_pause, so the
# rest of the notebook can import it like a regular module.

import numpy as np

# 1. Source code of utils.py, written to disk verbatim below.
utils_code = '''\
import numpy as np

def determine_pause(audio_stream, sampling_rate, state):
    """
    Détecte une pause dans un flux audio basé sur l'énergie du signal.
    Retourne True si une pause est détectée, False sinon.
    """
    if len(audio_stream) == 0:
        return True

    # Calcul de l'énergie moyenne du signal
    energy = np.sum(np.abs(audio_stream)) / len(audio_stream)

    # Seuil arbitraire pour détecter une pause
    return energy < 0.01
'''

# 2. Write utils.py. The payload contains non-ASCII characters, so the
#    encoding is pinned to UTF-8 (the encoding Python assumes when it later
#    imports the module) instead of relying on the platform default.
with open("utils.py", "w", encoding="utf-8") as f:
    f.write(utils_code)

print("✅ Le fichier 'utils.py' a été créé avec la fonction determine_pause.")

# -----------------------------------
# Ensuite, on importe utils et tout le reste

import gradio as gr
from dataclasses import dataclass, field
import io
import tempfile
from pydub import AudioSegment
from utils import determine_pause

@dataclass
class AppState:
    """Per-session state passed between the Gradio event handlers in this file."""

    # Audio samples accumulated so far; None until the first chunk arrives.
    stream: np.ndarray | None = None
    # Sampling rate taken from the first streamed chunk (0 until then).
    sampling_rate: int = 0
    # Result of the most recent determine_pause() call.
    pause_detected: bool = False
    # Set when the user clicks "Stop Conversation"; prevents recording from restarting.
    stopped: bool = False
    # Chat history as a list of {"role": ..., "content": ...} message dicts.
    conversation: list = field(default_factory=list)
    # NOTE(review): defaults to True and is never modified in this file, so any
    # check of the form "not started_talking" is always False — confirm intent.
    started_talking: bool = True

def process_audio(audio: tuple, state: AppState):
    """Accumulate one streamed microphone chunk and check for a pause.

    Args:
        audio: ``(sampling_rate, samples)`` pair for the newest chunk.
        state: Session state; its ``stream``, ``sampling_rate`` and
            ``pause_detected`` attributes are updated in place.

    Returns:
        A ``(component_update, state)`` pair. The update is
        ``gr.Audio(recording=False)`` when a pause is detected and
        ``state.started_talking`` is set; otherwise it is ``None``.
    """
    if state.stream is not None:
        # Later chunks are appended to what was already captured.
        state.stream = np.concatenate((state.stream, audio[1]))
    else:
        # First chunk: it becomes the buffer and fixes the sampling rate.
        state.stream = audio[1]
        state.sampling_rate = audio[0]

    print(f"🔊 Chunks audio cumulés : {len(state.stream)} échantillons")
    duration = len(state.stream) / state.sampling_rate
    print(f"⏱️ Durée totale : {duration:.2f} secondes")

    state.pause_detected = determine_pause(
        state.stream, state.sampling_rate, state
    )

    should_stop = state.pause_detected and state.started_talking
    if not should_stop:
        return None, state
    return gr.Audio(recording=False), state

def speaking(wav_bytes):
    """Placeholder reply generator: log a message and yield one empty chunk.

    ``wav_bytes`` is accepted but not used by this stub.
    """
    print("📢 Réponse simulée : silence (remplace par mini omni)")
    yield from (b"",)

def response(state: AppState):
    """Stream the assistant's audio reply for the user's recorded turn.

    This is a generator: every value handed to the caller must be *yielded*
    as an ``(output_audio_chunk, state)`` pair.

    Steps:
      1. Encode the accumulated samples in ``state.stream`` as WAV, save them
         to a temporary file and append a "user" message to the conversation.
      2. Forward each chunk produced by ``speaking()`` as it arrives.
      3. Save the concatenated reply to a temporary ``.mp3`` file, append an
         "assistant" message, and finally yield a fresh ``AppState`` that
         keeps only the conversation history.

    Args:
        state: Session state holding the recorded audio and the conversation.

    Yields:
        ``(chunk_or_None, state)`` tuples.
    """
    if not state.pause_detected and not state.started_talking:
        # A ``return value`` inside a generator is never delivered to the
        # consumer, so the reset has to be yielded explicitly.
        yield None, AppState()
        return

    if state.stream is None:
        # Recording stopped before any chunk was streamed: there is no audio
        # to encode, so leave the state untouched instead of crashing.
        yield None, state
        return

    samples = state.stream
    channel_count = 1 if len(samples.shape) == 1 else samples.shape[1]

    audio_buffer = io.BytesIO()
    segment = AudioSegment(
        samples.tobytes(),
        frame_rate=state.sampling_rate,
        sample_width=samples.dtype.itemsize,
        channels=channel_count,
    )
    segment.export(audio_buffer, format="wav")
    wav_bytes = audio_buffer.getvalue()

    # delete=False keeps the file on disk after the with-block, so the path
    # stored in the conversation remains valid.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        f.write(wav_bytes)
        user_audio_path = f.name

    state.conversation.append({
        "role": "user",
        "content": {"path": user_audio_path, "mime_type": "audio/wav"}
    })

    # Collect chunks in a list and join once, rather than growing a bytes
    # object with ``+=`` on every iteration.
    reply_chunks = []
    for mp3_bytes in speaking(wav_bytes):
        reply_chunks.append(mp3_bytes)
        yield mp3_bytes, state

    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        f.write(b"".join(reply_chunks))
        bot_audio_path = f.name

    state.conversation.append({
        "role": "assistant",
        "content": {"path": bot_audio_path, "mime_type": "audio/mp3"}
    })

    # Start the next turn with empty audio buffers but the same history.
    yield None, AppState(conversation=state.conversation)

def start_recording_user(state: AppState):
    """Return an update that restarts recording unless the session is stopped.

    Returns ``None`` when ``state.stopped`` is set; otherwise
    ``gr.Audio(recording=True)``.
    """
    if state.stopped:
        return None
    return gr.Audio(recording=True)

# Build the UI: an audio input on the left; the chat history and the streamed
# audio reply on the right.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            input_audio = gr.Audio(
                label="Input Audio", type="numpy"
            )
        with gr.Column():
            chatbot = gr.Chatbot(label="Conversation", type="messages")
            output_audio = gr.Audio(label="Output Audio", streaming=True, autoplay=True)

    # Session state shared by all the handlers below.
    state = gr.State(value=AppState())

    # While recording, feed audio chunks to process_audio; its first return
    # value is routed back to input_audio and its second to the state.
    stream = input_audio.stream(
        process_audio,
        inputs=[input_audio, state],
        outputs=[input_audio, state],
        stream_every=0.5,
        time_limit=30,
    )

    # When recording stops, run response() and route its yielded
    # (audio, state) pairs to the output player and the state.
    respond = input_audio.stop_recording(
        response,
        inputs=[state],
        outputs=[output_audio, state]
    )

    # After the reply has finished, show the conversation in the chatbot.
    respond.then(lambda s: s.conversation, inputs=[state], outputs=[chatbot])

    # On output_audio's stop event, start recording again (unless stopped).
    restart = output_audio.stop(
        start_recording_user,
        inputs=[state],
        outputs=[input_audio]
    )

    # Stop button: replace the state with one flagged as stopped, stop
    # recording, and cancel the respond/restart events.
    cancel = gr.Button("Stop Conversation", variant="stop")
    cancel.click(lambda: (AppState(stopped=True), gr.Audio(recording=False)),
                 inputs=None,
                 outputs=[state, input_audio],
                 cancels=[respond, restart])

if __name__ == "__main__":
    demo.launch()



✅ Le fichier 'utils.py' a été créé avec la fonction determine_pause.
It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://4f0db46b23a24b2c32.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
