In [5]:
import asyncio
import base64
import datetime
import os
import sys
from dotenv import load_dotenv
from hume.client import AsyncHumeClient
from hume.empathic_voice.chat.socket_client import ChatConnectOptions, ChatWebsocketConnection
from hume.empathic_voice.chat.types import SubscribeEvent
from hume.empathic_voice.types import UserInput
from hume.core.api_error import ApiError
from hume import MicrophoneInterface, Stream

In [6]:
# Need to manually define this
class WebSocketInterface:
    def __init__(self) -> None:
        "Construct the WebSocketInterface, initially assigning the socket to None and the byte stream to a new Stream object."
        self.socket = None
        self.byte_strs = Stream.new()

    def set_socket(self, socket):
        """Set the socket.
        
        This method assigns the provided asynchronous WebSocket connection
        to the instance variable `self.socket`. It is invoked after successfully
        establishing a connection using the client's connect method.

        Args:
            socket (ChatWebsocketConnection): EVI asynchronous WebSocket returned by the client's connect method.
        """
        self.socket = socket

    async def on_open(self):
        """Logic invoked when the WebSocket connection opens."""
        print("WebSocket connection opened.")

    async def on_message(self, message):
        """Callback function to handle a WebSocket message event.
        
        This asynchronous method decodes the message, determines its type, and 
        handles it accordingly. Depending on the type of message, it 
        might log metadata, handle user or assistant messages, process
        audio data, raise an error if the message type is "error", and more.

        This method interacts with the following message types to demonstrate logging output to the terminal:
        - [chat_metadata](https://dev.hume.ai/reference/empathic-voice-interface-evi/chat/chat#receive.Chat%20Metadata.type)
        - [user_message](https://dev.hume.ai/reference/empathic-voice-interface-evi/chat/chat#receive.User%20Message.type)
        - [assistant_message](https://dev.hume.ai/reference/empathic-voice-interface-evi/chat/chat#receive.Assistant%20Message.type)
        - [audio_output](https://dev.hume.ai/reference/empathic-voice-interface-evi/chat/chat#receive.Audio%20Output.type)

        Args:
            data (SubscribeEvent): This represents any type of message that is received through the EVI WebSocket, formatted in JSON. See the full list of messages in the API Reference [here](https://dev.hume.ai/reference/empathic-voice-interface-evi/chat/chat#receive).
        """
        scores = {} # Store expression inference scores

        if message.type == "chat_metadata":
            message_type = message.type.upper()
            chat_id = message.chat_id
            chat_group_id = message.chat_group_id
            text = f"<{message_type}> Chat ID: {chat_id} Chat Group ID: {chat_group_id}"
        elif message.type in ["user_message", "assistant_message"]:
            role = message.message.role.upper()
            message_text = message.message.content
            text = f"{role}: {message_text}"
        elif message.type == "audio_output":
            message_str: str = message.data
            message_bytes = base64.b64decode(message_str.encode("utf-8"))
            await self.byte_strs.put(message_bytes)
            return
        elif message.type == "error":
            error_message: str = message.message
            error_code: str = message.code
            raise ApiError(f"Error ({error_code}): {error_message}")
        else:
            message_type = message.type.upper()
            text = f"<{message_type}>"

        self._print_prompt(text) # Print the message to the terminal. This is different to print as it is a coroutine - it doesn't block the main event loop.

        # Extract and print the top 3 emotions inferred from user and assistant expressions
        if len(scores) > 0:
            top_3_emotions = self._extract_top_n_emotions(scores, 3)
            self._print_emotion_scores(top_3_emotions)
            print("")
        else:
            print("Less than 3 emotions")

    async def on_error(self, error):
        print("WebSocket error:", error)

    async def on_close(self):
        print("WebSocket connection closed.")


    def _print_prompt(self, text:str) -> None:
        """Print with timestamp"""
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        now_str = now.strftime("%H:%M:%S")
        print(f"[{now_str}] {text}")

    def _extract_top_n_emotions(self, emotion_scores: dict, n: int) -> list:
        """
        Extract the top N emotions based on confidence scores.

        Args:
            emotion_scores (dict): A dictionary of emotions and their corresponding confidence scores.
            n (int): The number of top emotions to extract.

        Returns:
            dict: A dictionary containing the top N emotions as keys and their raw scores as values.
        """
        # Convert the dictionary into a list of tuples and sort by score in descending order
        sorted_emotions = sorted(emotion_scores.items(), key=lambda item: item[1], reverse=True)

        top_n_emotions = {emotion: score for emotion, score in sorted_emotions[:n]}
        return top_n_emotions

    def _print_emotion_scores(self, emotion_scores: dict) -> None:
        """
        Print the emotions and their scores in a formatted, single-line manner.
        """
        formatted_emotions = ' | '.join(f"{emotion} ({score:.2f})" for emotion, score in emotion_scores.items())
        print(f"|{formatted_emotions}")

In [7]:
async def sending_handler(socket: ChatWebsocketConnection):
    """Handle sending a message over the socket.
    
    Waits 3 seconds and sends a UserInput message, which takes a 'text' parameter as input."""

    await asyncio.sleep(3)

    # Construct a user input message
    user_input_message = UserInput(text="HelloThere!")

    # Send the user input as text to the socket
    await socket.send(user_input_message)

In [8]:
# class MicrophoneInterface:
#     @staticmethod
#     async def start(socket, device=None, allow_user_interrupt=False, byte_stream=None):
#         # Audio parameters
#         samplerate = 16000
#         channels = 1
#         blocksize = 1024

#         # Create an asynchronous audio stream
#         async def audio_callback(indata, frames, time, status):
#             if status:
#                 print(f"Audio status: {status}", file=sys.stderr)
#             # Send the audio data over the socket
#             await socket.send(indata.tobytes())
#             if byte_stream is not None:
#                 byte_stream.append(indata.tobytes())

In [19]:
async def main() -> None:
    
    load_dotenv()

    # Retrieve the API key, Secret key, and EVI config id from the environment variables
    HUME_API_KEY = os.getenv("HUME_API_KEY")
    HUME_SECRET_KEY = os.getenv("HUME_SECRET_KEY")
    HUME_CONFIG_ID = os.getenv("HUME_CONFIG_ID")
   
    # Initialize the asynchronous client, authenticating with your API key
    client = AsyncHumeClient(api_key=HUME_API_KEY)
    
    # Define options for the WebSocket connection, such as an EVI config id and a secret key for token authentication
    options = ChatConnectOptions(config_id=HUME_CONFIG_ID, secret_key=HUME_SECRET_KEY)

    ########
    # INITIATE THE WEBSOCKETINTERFACE
    ########
    websocket_interface = WebSocketInterface()

    # Open the WebSocket connection with the configuration options and the interface's handlers
    async with client.empathic_voice.chat.connect_with_callbacks(
        options=options,
        on_open=websocket_interface.on_open,
        on_message=websocket_interface.on_message,
        on_close=websocket_interface.on_close,
        on_error=websocket_interface.on_error
    ) as socket:
        
        websocket_interface.set_socket(socket)

        microphone_task = asyncio.create_task(MicrophoneInterface.start(socket, allow_user_interrupt=True,byte_stream=websocket_interface.byte_strs))

        # # Create an asynchronous task to send messages
        # message_sending_task = asyncio.create_task(sending_handler(socket))

        # await asyncio.gather(microphone_task, message_sending_task)




        await microphone_task

        # Specify device [] in MicrophoneInterface
        device = 3
        MicrophoneInterface.start(socket, device=device, allow_user_interrupt=True, byte_stream=websocket_interface.byte_strs)


In [20]:
import nest_asyncio
nest_asyncio.apply()

asyncio.get_event_loop().run_until_complete(main())

# asyncio.run(main())

WebSocket connection opened.
Configuring socket with microphone settings...
Microphone connected. Say something!
[09:20:38] <CHAT_METADATA> Chat ID: 21f2cdee-9914-4b70-9ca5-df31858b5230 Chat Group ID: 94af3790-26b0-42d9-931a-d4c63e5a0964
Less than 3 emotions


KeyboardInterrupt: 

[09:21:16] USER: Hey.
Less than 3 emotions
[09:21:16] ASSISTANT: What?
Less than 3 emotions
[09:21:16] <ASSISTANT_END>
Less than 3 emotions
WebSocket error: sent 1011 (internal error) keepalive ping timeout; no close frame received
