This is a Jupyter notebook that generates audio for a specific Italian verb-conjugation Anki deck

and updates all Anki cards accordingly.

**The Anki app needs to stay OPEN during the whole process!**

The audio can be found in a .rar archive in this directory. Generating it took me about 30 minutes on my old Surface.

The Anki fields are updated one by one; each automated access costs about 5 seconds, so the whole update took about 4 hours in total.

The Italian Deck can be found [here](https://ankiweb.net/shared/info/1891639832) 

To communicate with Anki, use the [AnkiConnect](https://ankiweb.net/shared/info/2055492159) add-on.

In [5]:
import edge_tts

import genanki
from gtts import gTTS
import requests
import os
import json

# Name of the Anki deck whose notes are looked up and given audio.
DECK_NAME = "Italiano::Conjugation"
# URL that invoke() posts its AnkiConnect requests to.
ANKI_CONNECT_URL = "http://localhost:8765"

In [6]:
def invoke(action, **params):
    """Send one request to AnkiConnect and return its ``result`` value.

    Args:
        action: Name of the AnkiConnect action, e.g. ``"deckNames"``.
        **params: Keyword arguments forwarded as the action's ``params``.

    Returns:
        The value stored under ``result`` in the JSON response.

    Raises:
        Exception: If the response does not consist of exactly the two
            fields ``error`` and ``result``, or if AnkiConnect reports an
            error (its text becomes the exception message). Errors raised
            by ``requests`` itself propagate unchanged.
    """
    requestJson = json.dumps({"action": action, "version": 6, "params": params})
    response = requests.post(ANKI_CONNECT_URL, data=requestJson).json()
    if len(response) != 2:
        raise Exception("Response has an unexpected number of fields")
    if "error" not in response:
        raise Exception("Response is missing required error field")
    # Without this check a malformed reply only surfaced as a bare KeyError
    # on the final line.
    if "result" not in response:
        raise Exception("Response is missing required result field")
    if response["error"] is not None:
        raise Exception(response["error"])
    return response["result"]


# Smoke test: list the decks to confirm that AnkiConnect is reachable.
try:
    decks = invoke("deckNames")
except Exception as connect_error:
    print(f"Could not connect to Anki: {connect_error}")
else:
    print(f"Connected! Your decks: {decks}")

Connected! Your decks: ['Default', 'Italiano', 'Italiano::Conjugation', 'Ultimate Geography']


In [7]:
# Get Model
# The note type (model) is read from the first note of the deck.
# NOTE(review): assumes every note in the deck uses the same model — confirm.
print(f"Finding notes in deck: {DECK_NAME}...")
note_ids = invoke("findNotes", query=f'deck:"{DECK_NAME}"')
if not note_ids:
    # An empty deck (or a wrong DECK_NAME) used to fail with an opaque
    # IndexError on note_ids[0]; fail with an actionable message instead.
    raise ValueError(f"No notes found in deck '{DECK_NAME}'. Check DECK_NAME.")
first_note = invoke("notesInfo", notes=[note_ids[0]])[0]
model_name = first_note["modelName"]
print(f"Detected Model: {model_name}")

Finding notes in deck: Italiano::Conjugation...
Detected Model: > UID ; Front ; Back > ULTIMATE ITALIAN CONJUGATION SHARED


In [8]:
# Make sure the note type has a field that can hold the audio reference.
current_fields = invoke("modelFieldNames", modelName=model_name)
print(current_fields)
new_fields = ["Italian Audio"]
for field in new_fields:
    if field in current_fields:
        print(f"Field '{field}' already exists. Skipping.")
        continue
    print(f"Adding field: {field}")
    invoke("modelFieldAdd", modelName=model_name, fieldName=field)

['ID', 'Front', 'Back', 'Italian Audio']
Field 'Italian Audio' already exists. Skipping.


We hide the play button while still letting the audio play automatically.
This is done by wrapping the audio field in a hidden `div`:

```html
<div style="display:none;">
{{Italian Audio}}
</div>
```

In [9]:
# Fetched for inspection only; the templates sent below are hardcoded.
model_templates = invoke("modelTemplates", modelName=model_name)

# We just hardcode it
# The audio field sits in a hidden <div> so the play button is not rendered.
# The <div> is closed right after the field: left open, it also swallowed
# (and therefore hid) the tags section that follows.
new_back_template = """{{FrontSide}}

<hr id=answer>

{{Back}}

<br>
<div style="display:none;">
{{Italian Audio}}
</div>
<br>

<div class="back">
<br /><br />
{{#Tags}}
<span class='tag_SECTION'>
tags: <span class='tags'>{{Tags}}</span>
</span>
{{/Tags}}
</div>
<br><br>"""

# NOTE(review): assumes the model has a single card template named "Card 1"
# whose front side is exactly "{{Front}}" — confirm against model_templates.
current_front_template = "{{Front}}"
templates_payload = {
    "Card 1": {"Front": current_front_template, "Back": new_back_template}
}

result = invoke(
    "updateModelTemplates", model={"name": model_name, "templates": templates_payload}
)

In [None]:
import asyncio
import edge_tts
import os
from tqdm.asyncio import tqdm

# Voice name passed to edge_tts.Communicate for every clip.
VOICE = "it-IT-IsabellaNeural"
# Directory that receives the generated "{ID}.mp3" files.
OUTPUT_FOLDER = "anki_audio"

# Create the folder if it doesn't exist
os.makedirs(OUTPUT_FOLDER, exist_ok=True)


async def generate_one(text, note_id):
    """
    Generates a single audio file using the ID as the filename.

    Args:
        text: Text to be spoken. Empty or whitespace-only text is skipped.
        note_id: Note ID; the file is written as ``{note_id}.mp3`` inside
            ``OUTPUT_FOLDER``.

    Returns:
        The path of the written mp3, or ``None`` when the text was empty or
        the generation failed (the reason is printed).
    """
    if not text or not text.strip():
        print(f"Skipping ID {note_id}: Text is empty.")
        return None

    # Save inside the folder, naming the file {ID}.mp3
    filename = os.path.join(OUTPUT_FOLDER, f"{note_id}.mp3")
    # Stream into a scratch file first: a dropped connection must never leave
    # a truncated or empty {ID}.mp3 behind, because later steps treat every
    # existing mp3 as valid audio.
    partial = f"{filename}.part"

    try:
        communicate = edge_tts.Communicate(text, VOICE)
        await communicate.save(partial)
        # Promote the finished download in one step.
        os.replace(partial, filename)
        return filename
    except Exception as e:
        print(f"Error generating ID {note_id}: {e}")
        return None
    finally:
        # Runs on failure and on cancellation; after a successful replace the
        # scratch file no longer exists, so this is a no-op.
        if os.path.exists(partial):
            try:
                os.remove(partial)
            except OSError:
                pass


async def generate_batch(data_dict, max_concurrency=5):
    """
    Iterates over a dictionary {ID: Text}, processes concurrently.

    Args:
        data_dict: Mapping of note ID to the text that should be spoken.
        max_concurrency: Maximum number of ``generate_one`` calls running at
            the same time. Defaults to 5 to avoid timeouts/bans.

    Returns:
        List of the values returned by ``generate_one`` that are not ``None``
        (failed or empty texts are dropped), in the order of ``data_dict``.

    Raises:
        ValueError: If ``max_concurrency`` is smaller than 1 (a limit of 0
            would block every task forever).
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    # Limit parallel requests to avoid timeouts/bans
    sem = asyncio.Semaphore(max_concurrency)

    async def sem_task(note_id, text):
        async with sem:
            return await generate_one(text, note_id)

    # Create tasks by iterating over dict items (key=ID, value=Text)
    tasks = [sem_task(note_id, text) for note_id, text in data_dict.items()]

    # tqdm.gather shows a progress bar while the tasks complete.
    results = await tqdm.gather(*tasks, desc="Generating Audio", unit="files")

    # Filter out None results (failed or empty texts)
    return [r for r in results if r is not None]

In [None]:
# Fetch every note of the deck and map its ID to the raw "Back" field text,
# which is the text that will be spoken.
notes_info = invoke("notesInfo", notes=note_ids)
value_dict = {
    entry["noteId"]: entry["fields"]["Back"]["value"] for entry in notes_info
}

In [None]:
import re

# Patterns and lookup tables are built once instead of on every iteration.
br_pattern = re.compile(r"<br\s*/?>", flags=re.IGNORECASE)
html_pattern = re.compile(r"<[^>]+>")

# Annotations and separators that must not be read aloud.
to_remove = ["[rare]", "[most common]", "[less common]", "[lit]", "/"]
# Applied before the removals, so "lui/lei" becomes a single pronoun rather
# than "luilei" once "/" is stripped.
replacements = {
    "Lui/lei": "Lui",
    "lui/lei": "lui",
}  # I am not sure if capitalization affects pronunciation


def clean_for_tts(text):
    """Return ``text`` without HTML tags and annotations.

    Line breaks (``<br>``) become a space, every other tag is dropped, the
    ``replacements`` are applied, and finally every entry of ``to_remove``
    is deleted.
    """
    # filter out html tags so they dont get pronounced
    clean_text = html_pattern.sub("", br_pattern.sub(" ", text))

    for old, new in replacements.items():
        clean_text = clean_text.replace(old, new)

    for target in to_remove:
        clean_text = clean_text.replace(target, "")

    return clean_text


cleaned_dict = {key: clean_for_tts(text) for key, text in value_dict.items()}

In [None]:
import csv

# Some cards contain two alternative forms, e.g. "Voi morirete / morrete".
# Both are grammatically correct, but it is best to pronounce only the more
# common one. A CSV generated with AI provides the selected text per note ID,
# which replaces the automatically cleaned text.

csv_filename = "switch_cases.csv"

if os.path.exists(csv_filename):
    print("Loading CSV and updating dictionary...")

    # newline="" lets the csv module do its own newline handling, as its
    # documentation requires for file objects.
    with open(csv_filename, mode="r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        count = 0
        for row in reader:
            # the ID is a string in the CSV; the dictionary keys are ints
            note_id = int(row["ID"])
            selected_text = row["Selected_Text"]

            # only override notes that are actually part of this run
            if note_id in cleaned_dict:
                cleaned_dict[note_id] = selected_text
                count += 1

    print(f"Success! Updated {count} items in cleaned_dict.")
else:
    # Previously a missing file was skipped without any notice.
    print(f"'{csv_filename}' not found. Keeping the automatically cleaned text.")

In [None]:
# Generate the audio for every entry of cleaned_dict; the result is the list
# of file paths that generate_batch reports as written.
generated_files = await generate_batch(cleaned_dict)  # takes 30mins

  def __init__(self, lock=None):
Generating Audio:  20%|██        | 622/3043 [22:12<124:20:24, 184.89s/files]

Error generating ID 1704316540781: Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000001D285D25850> [getaddrinfo failed]


Generating Audio:  21%|██        | 625/3043 [22:16<65:59:24, 98.25s/files]  

Error generating ID 1704316541171: Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000001D285D25DD0> [getaddrinfo failed]
Error generating ID 1704316539144: Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000001D288F51D50> [getaddrinfo failed]
Error generating ID 1704316538676: Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000001D2843B41D0> [getaddrinfo failed]


Generating Audio: 100%|██████████| 3043/3043 [33:49<00:00,  1.50files/s]  


In [None]:
# Had an issue with my laptop losing its connection for a second, so the IDs
# that failed needed to be generated again. The one-off retry is kept below
# inside a string literal, i.e. it is not executed.
"""failed_IDs = [1704316540781, 1704316541171, 1704316539144, 1704316538676]
subset_dict = {}
for fid in failed_IDs:
    subset_dict[fid] = cleaned_dict[fid]
generate_f = await generate_batch(subset_dict)"""

Generating Audio: 100%|██████████| 4/4 [00:01<00:00,  3.16files/s]


In [None]:
import base64

# Update the Anki database; this is really slow, as each update takes about 5 seconds


# needed for anki format
def get_file_base64(file_path):
    """Return the content of ``file_path`` encoded as a base64 text string."""
    with open(file_path, mode="rb") as audio_file:
        raw_bytes = audio_file.read()
    encoded = base64.b64encode(raw_bytes)
    return str(encoded, "utf-8")


print("Updating Anki Cards with generated audio...")
AUDIO_FOLDER = OUTPUT_FOLDER
TARGET_FIELD = "Italian Audio"
for note in tqdm(notes_info):
    note_id = note["noteId"]

    # get file, which holds id in its name
    local_path = os.path.join(AUDIO_FOLDER, f"{note_id}.mp3")

    # Notes without audio are left untouched. A zero-byte file is the
    # leftover of a failed download and must not be uploaded as audio.
    if not os.path.isfile(local_path) or os.path.getsize(local_path) == 0:
        continue

    # make the name unique to our deck, to keep Anki's media folder tidy
    anki_filename = f"it_{note_id}.mp3"

    try:
        # the file content is sent base64-encoded
        invoke(
            "storeMediaFile",
            filename=anki_filename,
            data=get_file_base64(local_path),
        )

        # update the field with the sound name
        # Kept inside the try block: a single failing note must not abort a
        # loop that runs for hours.
        invoke(
            "updateNoteFields",
            note={
                "id": note_id,
                "fields": {TARGET_FIELD: f"[sound:{anki_filename}]"},
            },
        )
    except Exception as e:
        print(f"Error updating note {note_id}: {e}")

Updating Anki Cards with generated audio...


100%|██████████| 3043/3043 [3:33:34<00:00,  4.21s/it]  


In [12]:
# Package the deck so other people don't need to repeat this whole 4 hour
# process themselves.


output_filename = "Italiano_Conjugation_With_Audio.apkg"
# An absolute path is passed so the location does not depend on the working
# directory of the Anki process.
output_path = os.path.abspath(output_filename)

print(f"Exporting deck '{DECK_NAME}' to:\n{output_path}")


try:
    # includeSched=False: export without scheduling information
    exported = invoke(
        "exportPackage", deck=DECK_NAME, path=output_path, includeSched=False
    )
except Exception as e:
    print(f" Export failed: {e}")
else:
    # exportPackage answers with a boolean; this used to be ignored, so an
    # unsuccessful export went unnoticed.
    if not exported:
        print(" Export failed: AnkiConnect reported that no package was written.")

Exporting deck 'Italiano::Conjugation' to:
c:\Users\elyes\Desktop\Anki TTS\Italiano_Conjugation_With_Audio.apkg
