In [1]:
from dataclasses import dataclass
from re import template
from typing import List, Dict, Optional, Any, Union
from pydantic import BaseModel, Field
from typing import ClassVar
import os
from pydantic import BaseModel, Field
from typing import ClassVar
import re


class Card(BaseModel):
    """
    A Card is a representation of a flashcard, containing a question and an answer. The card is uniquely identified by the id.

    Properties:
      id (str): The id uniquely identifies the card. It is represented as "card_xxxx_xxxx", with x being hexadecimal digits.
          The id is the only way to identify a card.
      question (str): The question (frontside) of the card.
      answer (str): The answer (frontside) of the card.
      flag (str): The flag of the card. **Must** be one of:
          none, red, orange, green, blue, pink, turquoise, purple
      cardState (str): The state of the card in the flashcard system. **Must** be one of:
          new, learning, review, suspended, buried
    """
    id: Optional[int] = Field(default=None)
    deck: Optional["Deck"] = Field(default=None)
    question: str
    answer: str
    flag: str
    cardState: str

    cards_by_id: ClassVar[dict[int, "Card"]] = {}
    CARD_ID_REGEX: ClassVar[re.Pattern] = re.compile(r"^card_[0-9a-fA-F]{4}_[0-9a-fA-F]{4}$")

    def __init__(self, **data):
        super().__init__(**data)

        attempt = 0
        while True:
            attempt += 1
            random_bytes = os.urandom(4)
            random_int = int.from_bytes(random_bytes, byteorder="big")
            if random_int not in Card.cards_by_id:
                Card.cards_by_id[random_int] = self
                self.id = random_int
                break
            if attempt >= 100:
                raise RuntimeError(f"{attempt} attempts of generating a new, unique card id failed.")

    def __str__(self):
        hex_str = f"{self.id:08x}"  # pad to 8 hex digits
        hex_id = f"card_{hex_str[:4]}_{hex_str[4:]}"
        s = f"""Card {hex_id} from the deck {self.deck}
Question:
{self.question}

Answer:
{self.answer}

Flag: {self.flag}
Card State: {self.cardState}"""
        return s

    @staticmethod
    def get_card_by_id(card_id: int) -> "Card":
        res = Card.cards_by_id[card_id]
        if res is None:
            raise RuntimeError(f"{card_id} not found.")
        return res

    @staticmethod
    def get_card_by_id_string(card_id_string: str) -> "Card":
        if not isinstance(card_id_string, str):
            raise ValueError("Card id must be a string in the format 'card_xxxx_xxxx'.")
        if not Card.CARD_ID_REGEX.fullmatch(card_id_string):
            raise ValueError(
                f"Invalid card id format: '{card_id_string}'. Expected format: 'card_xxxx_xxxx' with 8 hex digits.")

        hex_part = card_id_string[5:].replace("_", "")
        return Card.get_card_by_id(int(hex_part, 16))


class Deck(BaseModel):
    """
    A Deck represents a collection of flashcards.

    Properties:
       id (str): The id uniquely identifies the deck. It is represented as "deck_xxxx_xxxx", with x being hexadecimal digits.
          The id is the only way to identify a card. It is assigned randomly, there is no way to guess it!
       name (str): The name of the deck. This is **not** the id, and is **not** sufficient to address decks.
       cards (List[Card]): The cards contained in the deck. The order has no meaning.
    """
    id: Optional[int] = Field(default=None)
    name: str
    cards: List[Card]

    decks_by_id: ClassVar[dict[int, "Deck"]] = {}
    DECK_ID_REGEX: ClassVar[re.Pattern] = re.compile(r"^deck_[0-9a-fA-F]{4}_[0-9a-fA-F]{4}$")

    def __init__(self, **data):
        super().__init__(**data)

        attempt = 0
        while True:
            attempt += 1
            random_bytes = os.urandom(4)
            random_int = int.from_bytes(random_bytes, byteorder="big")
            if random_int not in Deck.decks_by_id:
                Deck.decks_by_id[random_int] = self
                self.id = random_int
                break
            if attempt >= 100:
                raise RuntimeError(f"{attempt} attempts of generating a new, unique card id failed.")

    def __str__(self):
        hex_str = f"{self.id:08x}"  # pad to 8 hex digits
        hex_id = f"deck_{hex_str[:4]}_{hex_str[4:]}"
        s = f"""Deck '{self.name}' (id: {hex_id})"""
        return s

    @staticmethod
    def get_deck_by_id(deck_id: int) -> "Deck":
        res = Deck.decks_by_id[deck_id]
        if res is None:
            raise RuntimeError(f"{deck_id} not found.")
        return res

    @staticmethod
    def get_deck_by_id_string(deck_id_string: str) -> "Deck":
        if not isinstance(deck_id_string, str):
            raise ValueError("Deck id must be a string in the format 'deck_xxxx_xxxx'.")
        if not Deck.DECK_ID_REGEX.fullmatch(deck_id_string):
            raise ValueError(
                f"Invalid deck id format: '{deck_id_string}'. Expected format: 'deck_xxxx_xxxx' with 8 hex digits.")

        hex_part = deck_id_string[5:].replace("_", "")
        return Deck.get_deck_by_id(int(hex_part, 16))


class DummyEnvironment(BaseModel):
    decks: List[str]


class CardWithFuzzyMatch(Card):
    __fuzzymatch: Optional[List[str]] = None


class TestResultDeck(BaseModel):
    name: str
    cards: List[Union[CardWithFuzzyMatch, Card]]


class ExpectedResult(BaseModel):
    decks: List[Union[str, TestResultDeck]]


class TestItem(BaseModel):
    name: str
    description: Optional[str] = ""
    environment: str
    queries: List[List[str]]
    params: Optional[Dict[str, Any]] = None
    expected_result: ExpectedResult


class QuestionAnsweringItem(BaseModel):
    name: str
    description: str
    environment: str
    queries: List[List[str]]
    expected_answer: str


class TestData(BaseModel):
    test_decks: Dict[str, Deck]
    dummy_environments: Dict[str, DummyEnvironment]
    tests: List[TestItem]
    question_answering: Dict[str, List[QuestionAnsweringItem]]


In [2]:
import pathlib

json_path = pathlib.Path("../tests/data/tests.json")
data = TestData.model_validate_json(json_path.read_text(encoding="utf-8"))

for (deck_key, deck) in data.test_decks.items():
    for card in deck.cards:
        card.deck = deck

del deck_key, deck, card

In [3]:
tmp_idx = 10
expected = data.test_decks["java"].cards[10]
print(Card.cards_by_id[expected.id])
print(3 * "\n")
print(expected)

Card card_e6ff_68ec from the deck Deck 'Java Programming' (id: deck_f8c6_3611)
Question:
What is the difference between 'int' and 'Integer'?

Answer:
'int' is a primitive type; 'Integer' is an object wrapper.

Flag: None
Card State: New




Card card_e6ff_68ec from the deck Deck 'Java Programming' (id: deck_f8c6_3611)
Question:
What is the difference between 'int' and 'Integer'?

Answer:
'int' is a primitive type; 'Integer' is an object wrapper.

Flag: None
Card State: New


In [4]:
import inspect

# function_registry.py
llm_commands = {}


def llm_command(func):
    llm_commands[func.__name__] = func
    return func


def get_llm_commands():
    res = []
    for cmnd_name, llm_command in llm_commands.items():
        params = []
        sig = inspect.signature(llm_command)
        for name, param in sig.parameters.items():
            params += [f"{name}: {param.annotation.__name__}"]

        if sig.return_annotation is None:
            returnType = "None"
        else:
            returnType = sig.return_annotation.__name__

        signature = f"{cmnd_name}({", ".join(params)}) -> {returnType}"
        signature = signature.replace("_empty", "<unspecified>")
        docs = llm_command.__doc__.strip("\n")
        res += [f"{signature}\n{docs}"]

    return "\n\n".join(res)


In [5]:
from typing import TypeVar, Generic, List, Iterator

T = TypeVar("T")

CARD_STREAM_CHUNK_SIZE = 5


class ChunkedCards(Generic[T]):
    def __init__(self, items: List[T], chunk_size: int = CARD_STREAM_CHUNK_SIZE):
        self.items = items
        self.chunk_size = chunk_size

    def __iter__(self) -> Iterator[List[T]]:
        for i in range(0, len(self.items), self.chunk_size):
            yield self.items[i:i + self.chunk_size]

    def __len__(self) -> int:
        return (len(self.items) + self.chunk_size - 1) // self.chunk_size


# currently, there is no "Decks" or "FlashcardProvider" class, instead decks and cards are managed by the
# static method of deck and card -> terrible idea. Ooops.
@llm_command
def list_decks() -> list[Deck]:
    """
    List all available decks. Necessary to get the ids of the decks.
    """
    return list(Deck.decks_by_id.values())


@llm_command
def create_deck(name: str) -> Deck:
    """
    Create a new deck with the given name. The name must be a non-empty string.
    There may be no deck with the same name.
    The Deck containing the generated id is returned.
    """
    if not isinstance(name, str) or not name.strip():
        raise ValueError("Deck name must be a non-empty string.")
    if name in Deck.decks_by_id:
        raise ValueError(f"Deck with name '{name}' already exists.")
    deck = Deck(name=name, cards=[])
    return deck


@llm_command
def delete_deck(deck_id_str: str) -> None:
    """
    Delete a deck by its id. The id must be a string in the format 'deck_xxxx_xxxx'.
    """
    deck = Deck.get_deck_by_id_string(deck_id_str)
    # Remove all cards from the deck and from cards_by_id
    for card in list(deck.cards):
        Card.cards_by_id.pop(card.id, None)
    Deck.decks_by_id.pop(deck.id, None)


@llm_command
def list_cards(deck_id_str: str, search_substring: str = None) -> ChunkedCards[Card]:
    """
    List all cards in a deck, optionally filtering by a substring in the question.
    The id must be a string in the format 'deck_xxxx_xxxx'.
    The cards will be returned in batches of 5.
    """
    deck = Deck.get_deck_by_id_string(deck_id_str)
    cards = deck.cards
    if search_substring:
        if not isinstance(search_substring, str):
            raise ValueError("Search substring must be a string.")
        cards = [c for c in cards if search_substring.lower() in c.question.lower()]
    return ChunkedCards(cards)


@llm_command
def add_card(deck_id_str: str, question: str, answer: str, state: str, flag: str) -> None:
    """
    Create a new card in a deck. The deck id must be a string in the format 'deck_xxxx_xxxx'.
    The question, answer, state, and flag must all be non-empty strings.
    The state must be a valid CARD_STATE.
    The flag must be a valid CARD_FLAG.
    """
    if not all(isinstance(x, str) and x.strip() for x in [question, answer, state, flag]):
        raise ValueError("Question, answer, state, and flag must all be non-empty strings.")
    deck = Deck.get_deck_by_id_string(deck_id_str)
    card = Card(question=question, answer=answer, cardState=state, flag=flag, deck=deck)
    deck.cards.append(card)


@llm_command
def edit_card_question(card_id_str: str, new_question: str) -> None:
    """
    Edit the question of a card. The id must be a string in the format 'card_xxxx_xxxx'.
    """
    if not isinstance(new_question, str) or not new_question.strip():
        raise ValueError("New question must be a non-empty string.")
    card = Card.get_card_by_id_string(card_id_str)
    card.question = new_question


@llm_command
def edit_card_answer(card_id_str: str, new_answer: str) -> None:
    """
    Edit the answer of a card. The id must be a string in the format 'card_xxxx_xxxx'.
    """
    if not isinstance(new_answer, str) or not new_answer.strip():
        raise ValueError("New answer must be a non-empty string.")
    card = Card.get_card_by_id_string(card_id_str)
    card.answer = new_answer


@llm_command
def edit_card_flag(card_id_str: str, new_flag: str) -> None:
    """
    Edit the flag of a card. The id must be a string in the format 'card_xxxx_xxxx'.
    """
    if not isinstance(new_flag, str) or not new_flag.strip():
        raise ValueError("New flag must be a non-empty string.")
    card = Card.get_card_by_id_string(card_id_str)
    card.flag = new_flag


@llm_command
def edit_card_state(card_id_str: str, new_state: str) -> None:
    """
    Edit the state of a card. The id must be a string in the format 'card_xxxx_xxxx'.
    """
    if not isinstance(new_state, str) or not new_state.strip():
        raise ValueError("New state must be a non-empty string.")
    card = Card.get_card_by_id_string(card_id_str)
    card.cardState = new_state


@llm_command
def delete_card(card_id_str: str) -> None:
    """
    Delete a card by its id. The id must be a string in the format 'card_xxxx_xxxx'.
    """
    card = Card.get_card_by_id_string(card_id_str)
    if not card.deck:
        raise RuntimeError(f"Card '{card_id_str}' does not have an associated deck.")
    deck = card.deck
    if card not in deck.cards:
        raise RuntimeError(f"Card '{card_id_str}' is not present in its deck.")
    deck.cards.remove(card)
    Card.cards_by_id.pop(card.id, None)


_virtual_decks: dict[str, list] = {}
_active_virtual_deck: str = None


#@llm_command TODO later
def create_virtual_deck(virtual_deck_name: str):
    if not isinstance(virtual_deck_name, str) or not virtual_deck_name.strip():
        raise ValueError("Virtual deck name must be a non-empty string.")
    if virtual_deck_name in _virtual_decks:
        raise ValueError(f"Virtual deck '{virtual_deck_name}' already exists.")
    _virtual_decks[virtual_deck_name] = []


#@llm_command TODO later
def activate_virtual_deck(virtual_deck_name: str):
    global _active_virtual_deck
    if not isinstance(virtual_deck_name, str) or not virtual_deck_name.strip():
        raise ValueError("Virtual deck name must be a non-empty string.")
    if virtual_deck_name not in _virtual_decks:
        raise KeyError(f"Virtual deck '{virtual_deck_name}' does not exist.")
    _active_virtual_deck = virtual_deck_name


#@llm_command TODO later
def virtual_deck_add_card(card_id_str: str):
    if _active_virtual_deck is None:
        raise RuntimeError("No active virtual deck. Please activate a virtual deck first.")
    card = Card.get_card_by_id_string(card_id_str)
    if card in _virtual_decks[_active_virtual_deck]:
        raise ValueError(f"Card '{card_id_str}' is already in the active virtual deck.")
    _virtual_decks[_active_virtual_deck].append(card)


#@llm_command TODO later
def virtual_deck_remove_card(card_id_str: str):
    if _active_virtual_deck is None:
        raise RuntimeError("No active virtual deck. Please activate a virtual deck first.")
    card = Card.get_card_by_id_string(card_id_str)
    if card not in _virtual_decks[_active_virtual_deck]:
        raise ValueError(f"Card '{card_id_str}' is not in the active virtual deck.")
    _virtual_decks[_active_virtual_deck].remove(card)


#@llm_command TODO later
def virtual_deck_list():
    if _active_virtual_deck is None:
        raise RuntimeError("No active virtual deck. Please activate a virtual deck first.")
    return ChunkedCards(_virtual_decks[_active_virtual_deck])


#@llm_command TODO later
def deactivate_virtual_deck():
    global _active_virtual_deck
    if _active_virtual_deck is None:
        raise RuntimeError("No virtual deck is currently active.")
    _active_virtual_deck = None

In [6]:
import re
import ast
import openai


class LLMCommunicator:
    messages: list[dict[str, str]]
    model: str
    temperature: float
    max_tokens: Optional[int]

    def __init__(self, model: str, temperature: float, max_tokens: Optional[int] = None):
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.client = openai.OpenAI(
            api_key="lm-studio",
            base_url="http://localhost:1234/v1"
        )
        self.messages = []

    def send_message(self, message: str) -> str:
        request_message = {"role": "user", "content": message}
        self.messages.append(request_message)

        response = self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )
        msg = response.choices[0].message
        self.messages.append({"role": msg.role, "content": msg.content})
        return msg.content


class TaskExecutor:
    __first_message: str = None
    function_map = llm_commands

    @staticmethod
    def get_first_message(user_prompt: str):
        if TaskExecutor.__first_message is not None:
            return TaskExecutor.__first_message + user_prompt.strip()

        available_functions = get_llm_commands()

        template = f"""You are an assistant for a flashcard learning system. The system manages Cards, that are collected in Decks.

About Cards:
{Card.__doc__.strip("\n")}

About Decks:
{Deck.__doc__.strip("\n")}

You can interact with the system by calling specific Python functions, each of which performs an action. The available actions are:
{available_functions}

_________________________________

First, you **have to** think about what the user wants, which information you need, and make a rough plan of your actions. Almost always you will need further information (e.g., deck ids, card ids, or card content). In this case, you will request the information.
After reasoning, output the steps you want to execute **now** in the following format:

<execute>
* function_call_1(arguments)
* function_call_2(arguments)
...
</execute>

The system will then execute your commands, and return an python array of results:
["result_of_call_1", "result_of_call_2", ...]

If no further actions are needed, please return an empty execute block:

<execute>
</execute>

For example, if the user prompt was:
"Create a new deck with the name Astrology and add What is the largest planet in our solar system? and Jupiter to it. Flag it as Purple."
Your steps should be:
* Create a new deck with the name Astrology. Wait for the output to get the id of this new deck.
* Add a card with the given question, answer and flag to the deck.

So the first execution plan would be:
<execute>
* create_deck("Astrology")
</execute>

The system then would answer you the information of the newly created deck, e.g.:
["Deck 'Astrology' (id: deck_9874_2787)"]

Then, the next execution plan would be:
<execute>
* add_card(deck_id_str="deck_9874_2787", question="What is the largest planet in our solar system?", answer="Jupiter", state="new", flag="purple")
</execute>

The system then provides you with an empty response.
Then, you have achieved your task, and return:

<execute>
</execute>


The user prompt is:
"""

        TaskExecutor.__first_message = template
        return TaskExecutor.__first_message + user_prompt.strip()

    @staticmethod
    def execute_prompt(user_prompt: str):
        llm_communicator = LLMCommunicator("qwen2.5-14b-instruct", 0.8)
        error_count = 0

        message_to_send = TaskExecutor.get_first_message(user_prompt)

        while True:
            try:
                print("\nQuestion:")
                print(message_to_send)
                answer = llm_communicator.send_message(message_to_send)
                print("\nAnswer:")
                print(answer)
                evaluated = TaskExecutor.evaluate_llm_response(answer)
                if len(evaluated) == 0:
                    return
                else:
                    message_to_send = str([str(it) for it in evaluated])
            except Exception as e:
                print("\nException raised:")
                print(e)
                error_count += 1
                if error_count > 10:
                    raise RuntimeError("Too many errors. Abort execution.")
                message_to_send = f"""An error occured: {e}. Please try again!"""

    @staticmethod
    def evaluate_llm_response(response: str):
        # Extract the execution plan block
        match = re.search(r"<execute>(.*?)</execute>", response, re.DOTALL)
        if not match:
            raise ValueError("No <execute> block found in response.")
        plan = match.group(1)

        results = []
        for line in plan.splitlines():
            line = line.strip()
            if not line.strip():
                continue
            command = line[1:].strip()
            # Parse function name and arguments
            m = re.match(r"(\w+)\((.*)\)", command)
            if not m:
                raise ValueError(f"Invalid command syntax: {command}")
            func_name, arg_str = m.groups()
            if func_name not in TaskExecutor.function_map:
                raise ValueError(f"Unknown function: {func_name}")
            # Parse arguments safely
            args = []
            kwargs = {}
            if arg_str.strip():
                # Split by comma, handle both positional and keyword args
                for arg in re.split(r",(?![^\(\[]*[\]\)])", arg_str):
                    arg = arg.strip()
                    if "=" in arg:
                        k, v = arg.split("=", 1)
                        kwargs[k.strip()] = ast.literal_eval(v.strip())
                    else:
                        args.append(ast.literal_eval(arg))
            result = TaskExecutor.function_map[func_name](*args, **kwargs)
            results.append(str(result))
        return results

In [7]:
TaskExecutor.execute_prompt("Go make new deck name Geography and add a new card (flag: Turquoise) with question What is the capital of France? and answer Paris. The state should be 'New'.")


Question:
You are an assistant for a flashcard learning system. The system manages Cards, that are collected in Decks.

About Cards:
    A Card is a representation of a flashcard, containing a question and an answer. The card is uniquely identified by the id.

    Properties:
      id (str): The id uniquely identifies the card. It is represented as "card_xxxx_xxxx", with x being hexadecimal digits.
          The id is the only way to identify a card.
      question (str): The question (frontside) of the card.
      answer (str): The answer (frontside) of the card.
      flag (str): The flag of the card. **Must** be one of:
          none, red, orange, green, blue, pink, turquoise, purple
      cardState (str): The state of the card in the flashcard system. **Must** be one of:
          new, learning, review, suspended, buried
    

About Decks:
    A Deck represents a collection of flashcards.

    Properties:
       id (str): The id uniquely identifies the deck. It is represented as 

## Do the tests!

## First, we need to find out how to compare decks.

In [8]:

@dataclass(frozen=True)
class HashableCard:
    question: str
    answer: str
    flag: str
    state: str


import openai


def fuzzy_match(expected_card: CardWithFuzzyMatch, actual_card: HashableCard) -> bool:
    prompt = f"""Please evaluate the following two flashcards, and tell me, if they have the same content. It is fine if the spelling, the grammar, the length and the wording differs, as long as the cards contain roughly the same information. If these cards are quite similar, please just answer "true", else answer "false". The output is only read by a machine, and you can **only** answer with the string "true" or "false". There is no other option.

Card 1:
Question: {expected_card.question}
Answer: {expected_card.answer}

Card 2:
Question: {actual_card.question}
Answer: {actual_card.answer}
"""

    client = openai.OpenAI(
        api_key="lm-studio",
        base_url="http://localhost:1234/v1"
    )

    response = client.chat.completions.create(
        model="qwen2.5-14b-instruct",  # Replace with your LM Studio model name if needed
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        max_tokens=10,
    )
    result = response.choices[0].message.content.strip().lower()
    if result not in ("true", "false"):
        raise ValueError(f"Unexpected LM Studio response: {result!r}")
    return result == "true"


def compare_decks(expected: TestResultDeck, actual: Deck) -> List[str]:
    expected_fuzzy_cards: List[CardWithFuzzyMatch] = []
    expected_set = set()
    for ecard in expected.cards:
        if isinstance(ecard, CardWithFuzzyMatch):
            expected_fuzzy_cards.append(ecard)
        else:
            expected_set.add(HashableCard(ecard.question, ecard.answer, ecard.flag, ecard.state))

    actual_set = set()
    for acard in actual.cards:
        expected_set.add(HashableCard(acard.question, acard.answer, acard.flag, acard.state))

    # get all missing actual cards
    additional_expected = expected_set - actual_set
    additional_actual = actual_set - expected_set

    # Now the remaining fuzzy matches. Search for possible matches.
    additional_fuzzy_cards: List[CardWithFuzzyMatch] = []

    for expected_fuzzy in expected_fuzzy_cards:
        match = None
        for actual in additional_actual:
            if "question" not in expected_fuzzy.__fuzzymatch:
                if expected_fuzzy.question != actual.question:
                    continue
            if "answer" not in expected_fuzzy.__fuzzymatch:
                if expected_fuzzy.answer != actual.answer:
                    continue
            if fuzzy_match(expected_fuzzy, actual):
                match = actual
                break

        if match is None:
            additional_fuzzy_cards.append(expected_fuzzy)
        else:
            additional_actual.remove(match)

    # Now create the error messages
    errors = []
    for additional_fuzzy in additional_fuzzy_cards:
        errors += [f"The following (fuzzy-matching card) has not found a partner:\nf{additional_fuzzy}"]

    for additional_actual in additional_actual:
        errors += [f"The following provided card was not expected:\n{additional_actual}"]

    for additional_expected in additional_expected:
        errors += [f"The following expected card was not provided:\n{additional_expected}"]

    return errors


def evaluate_test_result(expected: ExpectedResult, test_data: TestData, actual_decks: list[Deck]):
    errors = []
    expected_decks: list[TestResultDeck] = []
    for expected_deck in expected.decks:
        if isinstance(expected_deck, str):
            deck = test_data.test_decks[expected_deck]
            if deck is None:
                errors.append(f"Expected deck '{expected_deck}' does not exist.")

            expected_decks.append(TestResultDeck(name=deck.name, cards=deck.cards))
        else:
            expected_decks.append(expected_deck)

    # now match expected decks to actual decks using name
    expected_decks_by_name = {deck.name: deck for deck in expected_decks}
    unmatched_actual_decks = []

    for (name, actual) in actual_decks.items():
        if name not in expected_decks_by_name:
            unmatched_actual_decks.append(name)
            continue

        expected = expected_decks_by_name.pop(name)
        errors += compare_decks(expected, actual)

    unmatched_expected_decks = expected_decks_by_name.values()

    # create unmatched error messages
    for unmatched_expected_deck in unmatched_expected_decks:
        errors += [f"The deck {unmatched_expected_deck.name} was expected, but was not in the actual result."]

    for unmatched_actual_deck in unmatched_actual_decks:
        errors += [f"The deck {unmatched_actual_deck.name} was in the actual result, but was unexpected."]


In [13]:
expected = CardWithFuzzyMatch(
    question="Was ist ein Integer?",
    answer="Eine ganze Zahl.",
    flag="None",
    cardState="New"
)

actual = HashableCard(
    question="Was ist ein Integer?",
    answer="Eine Zahl ohne Nachkommastellen",
    flag="None",
    state="New"
)

fuzzy_match(expected, actual)

True

## Audio

In [18]:
import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline


# Record audio from microphone
def record_audio(duration=5, fs=16000):
    print("Recording...")
    audio = sd.rec(int(duration * fs), samplerate=fs, channels=1, dtype='int16')
    sd.wait()
    print("Recording finished.")
    return audio.flatten(), fs


# Save audio to a temporary WAV file
def save_wav(audio, fs, filename="temp.wav"):
    wav.write(filename, fs, audio)
    return filename


def create_pipeline(only_cpu: bool = False) -> pipeline:
    device = "cuda:0" if not only_cpu and torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
    model_id = "openai/whisper-large-v3"

    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
    )
    model.to(device)
    processor = AutoProcessor.from_pretrained(model_id)
    return pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        torch_dtype=torch_dtype,
        device=device,
    )


pipe = create_pipeline(only_cpu=True)


# Load and transcribe audio
def transcribe(filename):
    result = pipe(filename)
    print("Transcription:", result)

Device set to use cpu


In [19]:
audio, fs = record_audio(duration=5, fs=16000)
filename = save_wav(audio, fs)
transcribe(filename)

Recording...
Recording finished.


Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.


Transcription: {'text': ' And they even have Swedish food there, Swedish fish, Swedish meatballs, whatever.'}


In [5]:
# import sounddevice as sd
# import numpy as np
#
# def audio_stream(chunk_duration=1, fs=16000):
#     chunk_samples = int(chunk_duration * fs)
#     with sd.InputStream(samplerate=fs, channels=1, dtype='int16') as stream:
#         print("Live transcription started. Speak into the microphone.")
#         while True:
#             audio_chunk, _ = stream.read(chunk_samples)
#             yield audio_chunk.flatten()
#
# for chunk in audio_stream():
#     audio_float = (chunk / 32768.0).astype(np.float32)
#     # Pass as dict with 'array' and 'sampling_rate'
#     result = pipe({"array": audio_float, "sampling_rate": 16000})
#     print("You said:", result["text"])