In [1]:
%pip install -q python-dotenv langchain-openai langchain-community pydub librosa numpy uuid chromadb langchain-chroma google-cloud-texttospeech

Note: you may need to restart the kernel to use updated packages.


## ChromaDB class

In [2]:
import os
import chromadb
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from dotenv import load_dotenv, find_dotenv
from uuid import uuid4

class ChromaManager:
    """
    Utility class for managing ChromaDB Cloud duties
    via langchain.
    """

    def __init__(self):
        """
        Initializes the ChromaDB Cloud Client.

        Environment variables:
            CHROMA_API_KEY (str): Your ChromaDB Cloud API key.
            CHROMA_TENANT (str): Your ChromaDB Cloud tenant ID.
            CHROMA_DATABASE (str): Your ChromaDB Cloud database name.
        """
        load_dotenv(find_dotenv())

        if not os.getenv("CHROMA_API_KEY") or not os.getenv("CHROMA_TENANT") or not os.getenv("CHROMA_DATABASE"):
            raise EnvironmentError("ChromaDB environment variables are not set. Please set CHROMA_API_KEY, CHROMA_TENANT, and CHROMA_DATABASE.")
        
        self.client = chromadb.CloudClient(
            api_key=os.getenv("CHROMA_API_KEY"),
            tenant=os.getenv("CHROMA_TENANT"),
            database=os.getenv("CHROMA_DATABASE")
        )
        self.embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")

    def get_collection(self, collection_name: str) -> Chroma:
        """
        Creates or retrieves and returns a particular ChromaDB collection.
        
        Args:
            collection_name (str): Name of the desired collection to create or retrieve.

        Returns:
            Chroma: LangChain-compatible Chroma collection.
        """
        if not collection_name:
            raise ValueError("Collection name must be provided.")
        
        return Chroma(
            client=self.client,
            embedding_function=self.embedding_function,
            collection_name=collection_name,
        )
    
    def save_documents(self, docs: list[Document], collection_name: str, tag: str):
        """
        Save documents to a ChromaDB collection.

        Args:
            docs (list[Document]): Documents to be stored.
            collection_name (str): Target collection name.
            tag (str): Metadata tag to associate with the documents.
        """
        if not isinstance(docs, list):
            raise TypeError("Expected a list of Document objects.")
        if not docs:
            raise ValueError("No documents provided.")
        if not collection_name:
            raise ValueError("Collection name must be provided.")
        if not tag:
            raise ValueError("Tag must be provided.")

        documents = []
        docIds = []  # To ensure unique IDs

        for doc in docs:
            unique_id = str(uuid4())

            # Re-traitement mais utile pour ajouter des infos dans la BDD
            documents.append(
                Document(
                    page_content=doc.page_content,
                    metadata={
                        "tag": tag,
                        "id": unique_id
                    }
                )
            )
            docIds.append(unique_id)
        
        collection = self.get_collection(collection_name=collection_name)
        collection.add_documents(documents=documents, ids=docIds)

    def retrieve_many(self, query: str, collection_name: str, nResults: int = 10) -> list[str]:
        """
        Retrieve multiple documents matching a query from a ChromaDB collection.

        Args:
            query (str): Query string.
            collection_name (str): Name of the collection to search.
            nResults (int): Maximum number of results to return.

        Returns:
            list[str]: List of matching document contents. Max 10 documents.
        """
        if not isinstance(query, str) or not query.strip():
            raise ValueError("Query must be a non-empty string.")
        if not collection_name:
            raise ValueError("Collection name must be provided.")
        if not isinstance(nResults, int) or nResults <= 0:
            raise ValueError("nResults must be a positive integer.")
        if nResults > 10:
            raise ValueError("nResults cannot exceed 10 for performance reasons.")
        
        collection = self.get_collection(collection_name=collection_name)
        retriever = collection.as_retriever(
            search_type="mmr", 
            search_kwargs={"k": nResults, "fetch_k": nResults * 10}
        )

        documents_found = retriever.invoke(input=query)

        results = []

        if documents_found:
            for doc in documents_found:
                if doc.page_content:
                    results.append(doc.page_content)

        return results
    
    def retrieve_one(self, query: str, collection_name: str) -> str | None:
        """
        Retrieve a single document matching a query from a ChromaDB collection.

        Args:
            query (str): Query string.
            collection_name (str): Name of the collection to search.

        Returns:
            str|None: Matching document content, or None if no match found.
        """
        if not isinstance(query, str) or not query.strip():
            raise ValueError("Query must be a non-empty string.")
        elif not collection_name:
            raise ValueError("Collection name must be provided.")

        collection = self.get_collection(collection_name=collection_name)
        retriever = collection.as_retriever(
            search_type="mmr",
            search_kwargs={"k": 1, "fetch_k": 10}
        )

        documents_found = retriever.invoke(input=query)

        if documents_found and isinstance(documents_found, list):
            for doc in documents_found:
                if doc.page_content:
                    return doc.page_content

        return None


In [3]:
db = ChromaManager()

db.retrieve_one("Quelle est ma nourriture préférée ?", "test_collection")

'Ma nourriture préférée est la pizza.'

In [4]:
db.retrieve_one("Who is Joachim ?", "test_collection")

"Joachim, forgeron d'ombre au regard d'ambre brûlant, façonne des armes vivantes à partir des peurs des hommes. Il erre entre les ruines d'un monde brisé, traqué par les créatures qu'il a lui-même libérées du néant.\n"

In [5]:
db.retrieve_many("Quelle est ma nourriture préférée ?", "test_collection", nResults=5)

['Ma nourriture préférée est la pizza.',
 "J'adore le chocolat !",
 "J'adore également les sushis.",
 'je déteste les huîtres\n',
 "Kaelys, ancien archiviste du royaume d'Aelion, manie les secrets oubliés comme d'autres manie l'acier. Sous ses robes élimées se cache un pacte interdit, scellé avec une entité qu'il nomme simplement la Voix.\n"]

## Speech-To-Text class

In [6]:
import os
from langchain_community.document_loaders.parsers.audio import OpenAIWhisperParser
from langchain_community.document_loaders.generic import GenericLoader
from dotenv import load_dotenv, find_dotenv

class SpeechToTextManager:
    """
    Utility class for transcribing audio files into text using OpenAI Whisper,
    via LangChain integration.
    """

    def __init__(self, lang="fr", model="whisper-1"):
        """
        Initialize the OpenAI Whisper parser with preset parameters for French audio,
        deterministic output, and plain text format.

        Environment variables:
            OPENAI_API_KEY (str): Your OpenAI API key.
        
        Args:
            lang (str): Language to use. Defaults to French (ici c'est Paris !)
            model (str): Name of the OpenAI model. Defaults to whisper-1.
        """
        load_dotenv(find_dotenv())

        if not os.getenv('OPENAI_API_KEY'):
            raise EnvironmentError("OPENAI_API_KEY environment variable is not set.")
        elif not lang:
            raise ValueError("Language must be specified.")
        elif not model:
            raise ValueError("Model must be specified.")
        
        self.parser = OpenAIWhisperParser(
            api_key=os.getenv('OPENAI_API_KEY'),
            chunk_duration_threshold=0.7,  # Disregard audio chunks shorter than 0.7 seconds
            language=lang,  # Specify the language of the audio
            response_format="text",
            temperature=0.0,  # Deterministic output
            model=model
        )

    def transcribe(self, file_path: str) -> str:
        """
        Transcribe an audio file into a list of LangChain Document objects,
        ready for downstream processing like summarization, retrieval, or indexing.
        
        Args:
            file_path (str): Path to the audio file to be transcribed.
        
        Returns:
            str: The transcribed text from the audio file.
        """

        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"The file {file_path} does not exist.")
        elif not file_path.lower().endswith(('.mp3', '.wav', '.flac', '.m4a', '.ogg')):
            raise ValueError("Unsupported audio file format. Supported formats: .mp3, .wav, .flac, .m4a, .ogg")
        
        loader = GenericLoader.from_filesystem(
            path=file_path,
            parser=self.parser
        )

        docs = loader.load()
        
        if not docs:
            raise ValueError("No documents were loaded from the audio file.")
        elif not isinstance(docs, list):
            raise TypeError("Expected a list of Document objects from the audio file.")
        
        return docs[0].page_content

In [7]:
stt = SpeechToTextManager()
transcribed_docs = stt.transcribe("../data/audio/test_whisper.m4a")
transcribed_docs

Transcribing part 1!


"Bonjour, je m'appelle Joachim Jasmin. Ceci est un enregistrement.\n"

In [8]:
stt = SpeechToTextManager()
transcribed_docs = stt.transcribe("../data/audio/test_whisper_clovis.m4a")
transcribed_docs

Transcribing part 1!


"Bonjour, j'aimerais savoir quelle est la situation géopolitique de Cuba actuellement le 15 juillet à 17h11.\n"

## Text-to-Speech class

In [9]:
import os
from google.cloud import texttospeech
from uuid import uuid4

class TextToSpeechManager:
    """
    Utility class for synthesizing speech from text using Google Text-to-Speech,
    via LangChain integration.
    """

    def __init__(self, credentials_path: str = "../secrets/google_tts.json"):
        """
        Initialize the Google Text-to-Speech client with the provided credentials.

        Args:
            credentials_path (str): Path to the Google Cloud service account JSON file.
        """
        if not os.path.isfile(credentials_path):
            raise FileNotFoundError(f"The credentials file {credentials_path} does not exist.")
        
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
        self.client = texttospeech.TextToSpeechClient()
        self.audio_path = None  # Placeholder for the audio file path

    def synthesize_speech(self, text: str, output_folder: str, lang_code="fr-FR", voice_name="fr-FR-Chirp3-HD-Charon") -> str:
        """
        Synthesize speech from the provided text using Google Text-to-Speech.

        Args:
            text (str): The text to be converted to speech.
            output_folder (str): The folder path where the synthesized audio will be saved.
            lang_code (str): Language code for the voice. Defaults to "fr-FR".
            voice_name (str): Name of the voice to use. Defaults to "fr-FR-Chirp3-HD-Charon".

        Returns:
            str: The file path where the audio content is saved.
        """
        if not text or not isinstance(text, str):
            raise ValueError("Text must be a non-empty string.")
        if not output_folder or not isinstance(output_folder, str):
            raise ValueError("Output folder must be a non-empty string.")
        if not lang_code or not isinstance(lang_code, str):
            raise ValueError("Language code must be a non-empty string.")
        if not voice_name or not isinstance(voice_name, str):
            raise ValueError("Voice name must be a non-empty string.")
        
        synthesis_input = texttospeech.SynthesisInput(text=text)
        voice = texttospeech.VoiceSelectionParams(
            language_code=lang_code,
            name=voice_name
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        )

        response = self.client.synthesize_speech(
            input=synthesis_input,
            voice=voice,
            audio_config=audio_config
        )

        if not response.audio_content:
            raise RuntimeError("Failed to synthesize speech. No audio content returned.")

        # Save the audio content to the specified output folder
        self.audio_path = os.path.join(output_folder, f"{uuid4()}.mp3")

        with open(self.audio_path, "wb") as out:
            out.write(response.audio_content)

        return self.audio_path
    
    def get_audio_path(self) -> str:
        """
        Get the path of the last synthesized audio file.

        Returns:
            str: The path of the last synthesized audio file.
        """
        if not self.audio_path:
            raise ValueError("No audio has been synthesized yet.")
        
        return self.audio_path

    def clear_audio_path(self):
        """
        Clear the stored audio file.
        """
        if self.audio_path and os.path.isfile(self.audio_path):
            os.remove(self.audio_path)
            self.audio_path = None

In [10]:
tts = TextToSpeechManager()
output_path = tts.synthesize_speech(
    text="Hello, this is a test in English.",
    output_folder="../tmp",
    lang_code="en-US",
    voice_name="en-US-Wavenet-D"
)
output_path

'../tmp/b392975b-7c55-4160-8b92-c48f71245480.mp3'

In [11]:
tts.clear_audio_path()

## Recorder class

In [25]:
import os
import sounddevice as sd
import numpy as np
import threading
import queue
from scipy.io.wavfile import write
from uuid import uuid4

class AudioRecorder:
    """
    Class to handle audio recording from the microphone.
    Uses sounddevice for real-time audio input.
    """

    def __init__(self, output_folder="../tmp"):
        """
        Initialize the audio recorder with optional STT manager.

        Args:
            output_folder (str): Folder where the recorded audio will be saved.
        """
        self.samplerate = 44100
        self.stream = None
        self.recording = []
        self.queue = queue.Queue()
        self.output_folder = output_folder
        self.audio_path = None

    def start(self, _=None):
        """
        Start recording audio from the microphone.

        This method initializes the audio stream and starts collecting audio chunks
        in a separate thread. The audio data is stored in a queue and appended to
        the recording list as it is received.
        """
        if self.audio_path:
            self.clear_audio_path()  # Clear any previous audio file
            
        self.recording = []
        self.queue = queue.Queue()

        self.stream = sd.InputStream(
            samplerate=self.samplerate, 
            channels=1, 
            callback=lambda indata, *_: self.queue.put(indata.copy())
        )
        self.stream.start()

        def collect_chunks():
            while self.stream and self.stream.active:
                try:
                    chunk = self.queue.get(timeout=0.1)
                    self.recording.append(chunk)
                except queue.Empty:
                    continue

        threading.Thread(target=collect_chunks, daemon=True).start()

    def stop(self, _=None):
        """
        Stop the audio recording and save it as a WAV file.

        This method stops the audio stream, concatenates the recorded audio chunks,
        and saves them to a WAV file in the specified output folder. The file is named
        with a unique UUID to avoid conflicts.
        """
        if not self.stream or not self.stream.active:
            raise RuntimeError("Audio stream is not active. Please start recording first.")

        if self.stream and self.stream.active:
            self.stream.stop()
            self.stream.close()

        audio_data = np.concatenate(self.recording, axis=0)
        os.makedirs(self.output_folder, exist_ok=True)
        self.audio_path = os.path.join(self.output_folder, f"{uuid4()}.wav")
        
        write(self.audio_path, self.samplerate, audio_data)

        return self.audio_path

    def get_audio_path(self) -> str | None:
        """
        Return the path to the last recorded audio file.

        Returns:
            str|None: The path of the last recorded audio file, or None if no audio
        """
        return self.audio_path
    
    def clear_audio_path(self, _=None):
        """
        Clear the stored audio file.
        """
        if self.audio_path and os.path.isfile(self.audio_path):
            os.remove(self.audio_path)
            self.audio_path = None
        else:
            raise ValueError("No audio file to clear. Please record audio first.")
        

## AudioRecorderUI class

In [26]:
import ipywidgets as widgets
from IPython.display import display, Audio, clear_output

class AudioRecorderUI:
    """
    Simple UI for recording audio using IPython widgets.
    """

    def __init__(self, recorder: AudioRecorder):
        """
        Initialize the UI with an AudioRecorder instance.

        Args:
            recorder (AudioRecorder): The audio recorder instance to use.
        """
        self.recorder = recorder
        self.record_button = widgets.Button(description="Start Recording")
        self.stop_button = widgets.Button(description="Stop Recording")
        self.clear_button = widgets.Button(description="Clear Audio")
        
        self.record_button.on_click(self.recorder.start)
        self.stop_button.on_click(self.recorder.stop)
        self.clear_button.on_click(self.recorder.clear_audio_path)

        display(self.record_button, self.stop_button, self.clear_button)

    def show_audio(self):
        """
        Display the recorded audio file if it exists.
        
        This method checks if an audio file has been recorded and displays it using
        IPython's Audio widget. If no audio file is found, it prints a message.
        """
        audio_path = self.recorder.get_audio_path()
        if audio_path and os.path.isfile(audio_path):
            clear_output(wait=True)
            display(self.record_button, self.stop_button, self.clear_button)
            display(Audio(audio_path, autoplay=True))
        else:
            raise ValueError("No audio file recorded yet. Please record audio first.")

    def clear_audio(self):
        """
        Clear the recorded audio file and reset the UI.
        
        This method removes the audio file from the filesystem and resets the
        audio path in the recorder. It also updates the UI to reflect that no
        audio is currently available.
        """
        self.recorder.clear_audio_path()
        clear_output(wait=True)
        display(self.record_button, self.stop_button, self.clear_button)

In [27]:
recorder = AudioRecorder(output_folder="../tmp")
recorderUI = AudioRecorderUI(recorder)

Button(description='Start Recording', style=ButtonStyle())

Button(description='Stop Recording', style=ButtonStyle())

Button(description='Clear Audio', style=ButtonStyle())

RuntimeError: Audio stream is not active. Please start recording first.

ValueError: No audio file to clear. Please record audio first.