In [42]:
from pathlib import Path
from google.cloud import texttospeech_v1beta1 as tts
import bson, json
import dotenv, os
from dataclasses import dataclass
import datetime
from tqdm import tqdm
import asyncio

dotenv.load_dotenv()

True

### Audio generation

In [43]:
# Single-pass XML escaping table for the five predefined XML entities.
_SSML_ESCAPES = str.maketrans({
    "&": "&amp;",
    "<": "&lt;",
    ">": "&gt;",
    '"': "&quot;",
    "'": "&apos;",
})


def words_to_ssml(words: list[str]) -> str:
    """Render ``words`` as an SSML document with one numbered mark per word.

    Every word is XML-escaped and preceded by ``<mark name="i"/>``, where ``i``
    is the word's position in ``words``. Marked words are separated by single
    spaces and wrapped in a ``<speak>`` element.

    Args:
        words: Words to speak, in order.

    Returns:
        The SSML document as a string.
    """
    marked_words = "".join(
        f'<mark name="{position}"/>{token.translate(_SSML_ESCAPES)} '
        for position, token in enumerate(words)
    )
    # Strip the opening tag together with the body so the trailing separator
    # (and, for an empty word list, the newline after <speak>) is removed.
    return ("<speak>\n" + marked_words).strip() + "\n</speak>"

In [44]:
async def synthesize_speech(
    ssml,
    voice_name='en-US-Neural2-J',
    language_code="en-US",
    speaking_rate=1.0,
    client=None,
):
    """Synthesize an SSML document to MP3 and collect the SSML mark timings.

    Args:
        ssml: SSML document to synthesize.
        voice_name: Name of the voice to request.
        language_code: Language code sent together with ``voice_name``. The
            default matches the default voice; pass the matching code when
            requesting a voice for another locale.
        speaking_rate: Speaking rate forwarded to the audio config.
        client: Optional ``tts.TextToSpeechAsyncClient`` to reuse across calls.
            When omitted, a new client is created for this call using the
            ``GOOGLE_CLOUD_API_KEY`` environment variable as API key.

    Returns:
        A tuple ``(audio, marks)``: ``audio`` is the response's
        ``audio_content`` and ``marks`` is the list of ``time_seconds`` values
        of the response's timepoints, in the order the API returned them.
    """
    if client is None:
        # Fallback keeps the original per-call behaviour; callers issuing many
        # requests can pass a shared client to avoid rebuilding it every time.
        client = tts.TextToSpeechAsyncClient(
            client_options={"api_key": os.getenv('GOOGLE_CLOUD_API_KEY')}
        )

    voice = tts.VoiceSelectionParams(
        language_code=language_code,
        name=voice_name,
    )

    response = await client.synthesize_speech(
        request=tts.SynthesizeSpeechRequest(
            input=tts.SynthesisInput(ssml=ssml),
            voice=voice,
            audio_config=tts.AudioConfig(
                audio_encoding=tts.AudioEncoding.MP3,
                speaking_rate=speaking_rate
            ),
            # Ask the API to report a timestamp for every <mark> in the SSML.
            enable_time_pointing=[
                tts.SynthesizeSpeechRequest.TimepointType.SSML_MARK
            ]
        )
    )

    audio = response.audio_content
    # Only the timestamps are kept; the mark names are discarded, so callers
    # rely on the order of the returned timepoints.
    marks = [t.time_seconds for t in response.timepoints]

    return (audio, marks)

### Schema definition & DB generation

In [45]:
import ormar
import datetime
from typing import Optional, List
from databases import Database
from sqlalchemy import create_engine, MetaData

DATABASE_VERSION = "1.0"
# Backward-compatible alias for the original (misspelled) constant name.
DATEBASE_VERSION = DATABASE_VERSION
DATABASE_URL = f"sqlite:///out/quizzies_v{DATABASE_VERSION}.db"

# SQLite creates the database file on demand but not its parent directory, so
# make sure the relative "out" directory used in DATABASE_URL exists.
Path("out").mkdir(parents=True, exist_ok=True)

engine = create_engine(DATABASE_URL)
metadata = MetaData()
database = Database(DATABASE_URL)  # Change DATABASE_URL to target another database

# Hand the module-level metadata/database to ormar so that these names refer to
# the very objects the models are registered on, instead of unused duplicates.
base_ormar_config = ormar.OrmarConfig(
    metadata=metadata,
    database=database,
    engine=engine,
)

class QuizSet(ormar.Model):
    """ORM model for one row of the ``sets`` table.

    Rows are keyed by a string ``id`` and are referenced through the ``set``
    foreign keys of ``QuizPacket`` and ``Tossup``.
    """
    ormar_config = base_ormar_config.copy(tablename="sets")

    # String primary key (up to 512 characters).
    id: str = ormar.String(max_length=512, primary_key=True)
    name: str = ormar.Text()
    year: int = ormar.Integer()
    difficulty: int = ormar.Integer()
    standard: bool = ormar.Boolean()

class QuizPacket(ormar.Model):
    """ORM model for one row of the ``packets`` table.

    Each packet is keyed by a string ``id`` and points at its ``QuizSet``
    through the ``set`` foreign key.
    """
    ormar_config = base_ormar_config.copy(tablename="packets")

    # String primary key (up to 512 characters).
    id: str = ormar.String(max_length=512, primary_key=True)
    # Foreign key to the QuizSet row this packet is linked to.
    set: QuizSet = ormar.ForeignKey(QuizSet)
    name: str = ormar.Text()
    number: int = ormar.Integer()

class Tossup(ormar.Model):
    """ORM model for one row of the ``tossups`` table.

    Holds the tossup fields copied from the source record together with the
    synthesized audio, the per-word timing list and bookkeeping columns.
    """
    ormar_config = base_ormar_config.copy(tablename="tossups")

    # String primary key (up to 512 characters).
    id: str = ormar.String(max_length=512, primary_key=True)
    category: str = ormar.Text()
    subcategory: str = ormar.Text()
    difficulty: int = ormar.Integer()

    # Links to the QuizSet and QuizPacket rows this tossup is associated with.
    set: Optional[QuizSet] = ormar.ForeignKey(QuizSet)
    packet: Optional[QuizPacket] = ormar.ForeignKey(QuizPacket)
    number: int = ormar.Integer()

    question: List[str] = ormar.JSON()  # Question words, stored as a JSON array
    word_timing: List[float] = ormar.JSON()  # Timestamps from synthesize_speech, stored as a JSON array
    answer: str = ormar.Text()
    power_mark: int = ormar.Integer()  # Word index derived in process_tossup from the "(*)" marker

    audio: bytes = ormar.LargeBinary(max_length=(5*(10**6)))  # Synthesized audio, stored as BLOB (max_length 5,000,000)

    # Copied from the source record's createdAt / updatedAt values.
    qbr_created: datetime.datetime = ormar.DateTime()
    qbr_updated: datetime.datetime = ormar.DateTime()
    rerun_marker: int = ormar.Integer()  # process_tossup skips rows whose value equals RERUN_MARKER

# WARNING: dropping the tables discards every stored row, including the
# synthesized audio. Set RESET_DATABASE to False to keep existing rows when this
# cell is re-run (create_all only creates tables that do not exist yet); True
# reproduces the original wipe-and-recreate behaviour.
RESET_DATABASE = True

if RESET_DATABASE:
    base_ormar_config.metadata.drop_all(engine)
base_ormar_config.metadata.create_all(engine)

### Helper functions

In [46]:
RERUN_MARKER = 1
POWER_MARKER = "(*)"


def _strip_power_marker(words: list[str]) -> tuple[list[str], int]:
    """Drop the first word containing the power marker and locate the power boundary.

    Returns ``(spoken_words, power_mark)`` where ``power_mark`` is the index of
    the last word before the marker, or ``-1`` when the marker is absent or is
    the very first word.
    """
    marker_index = next(
        (i for i, word in enumerate(words) if POWER_MARKER in word), None
    )
    if marker_index is None:
        # No marker: keep every word (the old code dropped the first one here).
        return list(words), -1
    return words[:marker_index] + words[marker_index + 1:], marker_index - 1


async def process_tossup(tossup: dict) -> None:
    """Synthesize audio for one source tossup record and store it as a ``Tossup`` row.

    A row whose ``rerun_marker`` already equals ``RERUN_MARKER`` is left
    untouched. Otherwise the sanitized question is split into words, the word
    carrying the ``(*)`` power marker is removed, the remaining words are
    synthesized, and the result is stored. A stale row with another
    ``rerun_marker`` is replaced.

    Args:
        tossup: Source record. The keys read here are ``_id``, ``packet``,
            ``set``, ``question_sanitized``, ``category``, ``subcategory``,
            ``difficulty``, ``number``, ``answer_sanitized``, ``createdAt`` and
            ``updatedAt``.
    """
    tossup_id = str(tossup['_id'])

    existing_tossup = await Tossup.objects.get_or_none(id=tossup_id)
    if existing_tossup is not None and existing_tossup.rerun_marker == RERUN_MARKER:
        return

    qpacket = await QuizPacket.objects.get_or_none(id=str(tossup['packet']['_id']))
    qset = await QuizSet.objects.get_or_none(id=str(tossup['set']['_id']))

    # The marker word is never spoken; power_mark is the index (in the spoken
    # word list) of the last word that precedes it.
    words, power_mark = _strip_power_marker(tossup['question_sanitized'].split())

    ssml: str = words_to_ssml(words)
    audio, timestamps = await synthesize_speech(ssml)

    m_tossup = Tossup(
        id=tossup_id,
        category=tossup['category'],
        subcategory=tossup['subcategory'],
        difficulty=tossup['difficulty'],
        set=qset,
        packet=qpacket,
        number=tossup['number'],
        question=words,
        word_timing=timestamps,
        answer=tossup['answer_sanitized'],
        power_mark=power_mark,
        audio=audio,
        qbr_created=tossup['createdAt'],
        qbr_updated=tossup['updatedAt'],
        rerun_marker=RERUN_MARKER
    )

    if existing_tossup is not None:
        # save() always INSERTs, so the stale row must go first or the insert
        # would fail on the duplicate primary key.
        await existing_tossup.delete()

    await m_tossup.save()

In [47]:
async def process_batch(batch: list[dict]):
    """Run ``process_tossup`` concurrently for every tossup in ``batch``.

    One coroutine is created per tossup and all of them are awaited together
    through ``asyncio.gather``; the gathered results are discarded.
    """
    pending = map(process_tossup, batch)
    await asyncio.gather(*pending)

### Runner

In [48]:
# Directory holding the qbreader BSON dump. Defaults to the original location;
# set the QBREADER_DUMP_DIR environment variable (e.g. in .env) to override it.
QBREADER_DUMP_DIR = Path(os.getenv(
    "QBREADER_DUMP_DIR",
    "D:\\Seafile\\Personal\\projects\\quizzies\\qbreader_db\\2024-11-18_22_56_52",
))


def load_bson_collection(name: str) -> list:
    """Decode every document stored in ``<QBREADER_DUMP_DIR>/<name>.bson``.

    Args:
        name: Collection name, i.e. the dump file name without ``.bson``.

    Returns:
        The list produced by ``bson.decode_all`` for the file's contents.
    """
    return bson.decode_all((QBREADER_DUMP_DIR / f"{name}.bson").read_bytes())


tossups = load_bson_collection("tossups")
bonuses = load_bson_collection("bonuses")
sets = load_bson_collection("sets")
packets = load_bson_collection("packets")


In [49]:
for mset in tqdm(sets, desc="Sets"):
    qset = await QuizSet.objects.get_or_none(id=str(mset['_id']))
    if qset is None:
        qset = QuizSet(
            id=str(mset['_id']),
            name=mset['name'],
            difficulty=mset['difficulty'],
            year=mset['year'],
            standard=mset['standard']
        )
        await qset.save()


for packet in tqdm(packets, desc="Packets"):
    qpacket = await QuizPacket.objects.get_or_none(id=str(packet['_id']))
    if qpacket is None:
        qset = await QuizSet.objects.get_or_none(id=str(packet['set']['_id']))
        qpacket = QuizPacket(
            id=str(packet['_id']),
            set=qset,
            name=packet['name'],
            number=packet['number']
        )
        await qpacket.save()

Sets: 100%|██████████| 605/605 [00:02<00:00, 202.35it/s]
Packets: 100%|██████████| 7752/7752 [00:55<00:00, 138.73it/s]


In [50]:
# I am broke! Only a filtered subset of the tossups is synthesized, and the
# cost of doing so is estimated from the SSML length before any API call.
def _in_synthesis_scope(tossup: dict) -> bool:
    """Tell whether ``tossup`` passes the difficulty / year / standard filter."""
    if not (tossup['difficulty'] <= 5):
        return False
    if not (tossup['set']['year'] >= 2022):
        return False
    return tossup['set']['standard'] == True


include_tossups = list(filter(_in_synthesis_scope, tossups))

# Total number of SSML characters that would be sent for the selected tossups.
char_ct = sum(
    len(words_to_ssml(candidate['question_sanitized'].split()))
    for candidate in include_tossups
)

print(f"With {char_ct:,} characters, estimated API cost is ${char_ct/1e6*16:.2f} for {len(include_tossups):,} tossups.")

With 18,658,161 characters, estimated API cost is $298.53 for 7,753 tossups.


In [51]:
BATCH_SIZE = 50  # can be adjusted based on performance
RATE_LIMIT = 900/60  # requests per second limit

START_AT = 0
END_AT = len(include_tossups)

batches = [ include_tossups[i:i+BATCH_SIZE] for i in range(START_AT, END_AT, BATCH_SIZE) ]

for batch in tqdm(batches):
    await process_batch(batch)
    await asyncio.sleep(len(batch) / RATE_LIMIT)

100%|██████████| 156/156 [20:36<00:00,  7.93s/it]
