In [3]:
from freespeech.lib import gdocs


# Load the transcript document; later cells read `transcript.events` and
# `transcript.lang` from the returned object.
transcript = gdocs.load("https://docs.google.com/document/d/1CXHCVOi4fARqSNH1SdsY4dLsqrxWXfP3Z437cQnmdmo/edit#")

In [4]:
import re
from itertools import zip_longest
from functools import reduce

from dataclasses import dataclass
from tempfile import TemporaryDirectory
from freespeech.types import Character, Event, Language
from freespeech.lib import text, speech, audio, media


@dataclass(frozen=True)
class Interval:
    """Frozen record describing one stretch of speech by a single character.

    The outline mixes text fragments (str) with pauses (int); the functions in
    this notebook treat every int in the outline as a pause in milliseconds.
    """
    # Duration of the spoken part in milliseconds (pauses are not included:
    # get_interval stores event.duration_ms minus the outline's silence here).
    speech_ms: int
    # Speech rate of the interval; get_interval derives it from
    # event.voice.speech_rate.
    rate: float
    # Character speaking; merge() refuses to combine different characters.
    character: Character
    # Ordered text fragments (str) and pauses (int).
    outline: list[str | int]


def normalize(outline: list[str | int], sentence_pause_ms: int) -> list[str | int]:
    def reducer(x: list[str | int], y: str | int) -> list[int | str]:
        if x and isinstance(x[-1], int) and isinstance(y, int):
            return x[:-1] + [x[-1] + y]
        elif x and isinstance(x[-1], str) and isinstance(y, str):
            return x[:-1] + [sentence_pause_ms] + [y]
        else:
            return x + [y]
    
    return reduce(reducer, outline, [])


def merge(a: Interval, b: Interval, sentence_pause_ms: int) -> Interval:
    """Combine two consecutive intervals of the same character into one.

    Args:
        a: Earlier interval; its outline comes first in the result.
        b: Later interval.
        sentence_pause_ms: Pause inserted by normalize() wherever the joined
            outline would otherwise contain two adjacent text fragments.

    Returns:
        A new Interval whose speech_ms is the sum of both inputs, whose rate is
        the mean of both rates weighted by speech_ms, and whose outline is the
        normalized concatenation of both outlines.

    Raises:
        ValueError: If a and b belong to different characters.
    """
    # Validate first so a character mismatch is always reported as ValueError,
    # never masked by an arithmetic error below.
    if a.character != b.character:
        raise ValueError("character in a and b should be the same")

    speech_ms = a.speech_ms + b.speech_ms

    if speech_ms != 0:
        rate = (a.rate * a.speech_ms + b.rate * b.speech_ms) / speech_ms
    else:
        # Neither interval carries any speech, so the weights are all zero;
        # use the plain mean instead of dividing by zero.
        rate = (a.rate + b.rate) / 2

    return Interval(
        speech_ms=speech_ms,
        rate=rate,
        outline=normalize(
            a.outline + b.outline,
            sentence_pause_ms=sentence_pause_ms
        ),
        character=a.character
    )


def adjust(interval: Interval, target_rate: float, min_pause_ms: int) -> Interval:
    """Trade pause time for speech time so the interval approaches target_rate.

    Every pause in the outline is first reset to min_pause_ms. The silence that
    remains once the speech has been given the time it needs at target_rate is
    then handed back to the pauses that were originally longer than the
    minimum, in proportion to how much longer they were.

    Args:
        interval: Interval to adjust; it is not modified.
        target_rate: Speech rate to aim for.
        min_pause_ms: Duration in milliseconds assigned to every pause before
            any spare silence is distributed.

    Returns:
        A new Interval for the same character with redistributed pauses. Its
        speech_ms and rate are derived from the silence budget that was gained
        or given up relative to the original outline.
    """
    old_pauses = [item for item in interval.outline if isinstance(item, int)]
    old_silence_ms = sum(old_pauses)
    pause_count = len(old_pauses)

    # Speech time needed to hit target_rate exactly, and the silence that would
    # be left over -- but never less than the minimum for every pause.
    wanted_speech_ms = round(interval.speech_ms * (interval.rate / target_rate))
    silence_ms = max(
        pause_count * min_pause_ms,
        old_silence_ms + interval.speech_ms - wanted_speech_ms,
    )

    floor_pauses = [min_pause_ms] * pause_count
    spare_ms = silence_ms - sum(floor_pauses)

    # How far each original pause exceeded the minimum (0 when it did not).
    excess = []
    for old, floor in zip(old_pauses, floor_pauses):
        excess.append(old - floor if old > floor else 0)

    total_excess = sum(excess)
    if total_excess != 0:
        share = spare_ms / total_excess
    else:
        share = 0

    # Truncate instead of rounding: rounding up could make the pauses exceed
    # the silence budget and yield clips longer than necessary.
    new_pauses = iter(
        [int(floor + extra * share) for floor, extra in zip(floor_pauses, excess)]
    )

    outline = []
    for item in interval.outline:
        outline.append(item if isinstance(item, str) else next(new_pauses))

    # Silence given up (or gained) goes to (or comes out of) the speech time.
    speech_ms = interval.speech_ms + (old_silence_ms - silence_ms)
    rate = interval.rate * (interval.speech_ms / speech_ms)

    return Interval(
        speech_ms=speech_ms,
        rate=rate,
        character=interval.character,
        outline=outline,
    )


def get_outline(s: str, sentence_pause_ms: int, lang: Language) -> list[str | int]:
    """Turn annotated text into an outline of sentences and pauses.

    Markers of the form ``#<number>#`` are read as explicit pauses: the number
    is multiplied by 1000 and rounded (presumably seconds to milliseconds).
    The text between markers is split into sentences with text.sentences() and
    consecutive sentences are separated by sentence_pause_ms.

    Args:
        s: Text, optionally containing ``#<number>#`` pause markers.
        sentence_pause_ms: Pause placed between consecutive sentences.
        lang: Language passed on to text.sentences().

    Returns:
        The normalized outline; empty strings and zero-length pauses are
        discarded before normalization.
    """
    # The pattern has two capture groups, so re.split yields triples of
    # (text, number, fractional part); only the first two are of interest.
    pieces = re.split(r"#(\d+(\.\d+)?)#", s)

    flat: list[str | int] = []
    for fragment, pause in zip_longest(pieces[0::3], pieces[1::3], fillvalue=""):
        if not (fragment or pause):
            continue

        # Interlace the sentences with pauses, dropping the leading pause.
        for position, sentence in enumerate(text.sentences(fragment.strip(), lang=lang)):
            if position > 0:
                flat.append(sentence_pause_ms)
            flat.append(sentence)

        # Explicit pause that follows this fragment (0 when there is none).
        flat.append(round((float(pause) if pause else 0.0) * 1000))

    meaningful = [item for item in flat if item]
    return normalize(meaningful, sentence_pause_ms=sentence_pause_ms)


async def get_interval(event: Event, sentence_pause_ms: int, lang: Language) -> Interval:
    """Build an adjusted Interval for a transcript event.

    The event's chunks are joined and converted into an outline. Every text
    fragment is synthesized into a temporary directory to measure how long it
    takes to speak; pauses and rate are then scaled so that speech plus silence
    fit event.duration_ms, and the result is passed through adjust().

    Args:
        event: Event providing chunks, voice and duration_ms.
        sentence_pause_ms: Pause between sentences; also used as the minimum
            pause for adjust().
        lang: Language used for sentence splitting and synthesis.

    Returns:
        The Interval returned by adjust() with the voice's speech rate as the
        target rate.
    """
    outline = get_outline(
        " ".join(event.chunks),
        sentence_pause_ms=sentence_pause_ms,
        lang=lang,
    )

    with TemporaryDirectory() as tmp_dir:
        # Synthesize text fragments one after another; pauses pass through.
        clips = []
        for chunk in outline:
            if not isinstance(chunk, str):
                clips.append(chunk)
                continue
            synthesized = await speech.synthesize_text(
                text=chunk,
                duration_ms=None,
                voice=event.voice,
                lang=lang,
                output_dir=tmp_dir,
            )
            clips.append(audio.strip(synthesized[0]))

        # Measure while the temporary files still exist.
        speech_ms = sum(media.audio_duration(clip) for clip in clips if not isinstance(clip, int))
        silence_ms = sum(clip for clip in clips if isinstance(clip, int))

    voice_rate = event.voice.speech_rate
    character = event.voice.character

    # Factor by which the synthesized timeline must be stretched or squeezed to
    # fill the event's duration.
    scale_factor = event.duration_ms / (speech_ms + silence_ms)

    scaled_outline = []
    for item in outline:
        scaled_outline.append(int(item * scale_factor) if isinstance(item, int) else item)
    scaled_silence_ms = sum(item for item in scaled_outline if isinstance(item, int))

    interval = Interval(
        speech_ms=event.duration_ms - scaled_silence_ms,
        rate=voice_rate / scale_factor,
        character=character,
        outline=scaled_outline,
    )

    return adjust(interval, target_rate=event.voice.speech_rate, min_pause_ms=sentence_pause_ms)

# Hand-made interval used to eyeball adjust(): one second of speech at rate 1.0
# with pauses of 0, 100 and 100 ms in its outline.
intervals = [
    Interval(speech_ms=1000, rate=1.0, character="Alan", outline=["Hello.", 0, "World!", 100, "How are you?", 100])
]

# Halve the target rate with a 50 ms minimum pause; the cell output shows the result.
[adjust(interval, target_rate=0.5, min_pause_ms=50) for interval in intervals]


[Interval(speech_ms=1050, rate=0.9523809523809523, character='Alan', outline=['Hello.', 50, 'World!', 50, 'How are you?', 50])]

In [5]:
# Build one adjusted Interval per transcript event. The awaits inside the
# comprehension run one after another, in event order. This rebinds `intervals`.
intervals = [await get_interval(event, sentence_pause_ms=50, lang=transcript.lang) for event in transcript.events]


(python:62260): GLib-CRITICAL **: 15:30:56.425: Failed to set scheduler settings: Operation not permitted

(python:62260): GLib-CRITICAL **: 15:30:57.023: Failed to set scheduler settings: Operation not permitted

(python:62260): GLib-CRITICAL **: 15:30:57.630: Failed to set scheduler settings: Operation not permitted

(python:62260): GLib-CRITICAL **: 15:30:58.190: Failed to set scheduler settings: Operation not permitted

(python:62260): GLib-CRITICAL **: 15:30:59.239: Failed to set scheduler settings: Operation not permitted

(python:62260): GLib-CRITICAL **: 15:30:59.973: Failed to set scheduler settings: Operation not permitted


In [12]:
def average_rate(intervals: list[Interval]) -> float:
    """Return the mean speech rate of the intervals, weighted by speech_ms."""
    weights = [interval.speech_ms for interval in intervals]
    weighted_rates = (interval.rate * interval.speech_ms for interval in intervals)
    return sum(weighted_rates) / sum(weights)


def patch(intervals: list[Interval], index: int, target_rate: float, min_pause_ms: int) -> list[Interval]:
    """Patch the interval sequence at the given index to achieve target speech rate.

    The interval at `index` is merged with one of its neighbours and the merged
    interval is adjusted towards target_rate. When both neighbours exist, the
    merge that yields the lower rate wins.

    Args:
        intervals: Input sequence of intervals. It is not modified.
        index: Index of the interval to patch around; must satisfy
            0 <= index < len(intervals).
        target_rate: Target speech rate.
        min_pause_ms: Minimum pause between the sentences. It is also used as
            the pause inserted between sentences when two outlines are joined.

    Returns:
        A new list that is one element shorter than the input, with the
        interval at `index` merged into the previous or next one. If the input
        has at most one interval it is returned as is.

    Raises:
        IndexError: If index is outside the sequence (only checked when there
            are at least two intervals).
        ValueError: If the intervals being merged belong to different
            characters (raised by merge()).
    """
    if len(intervals) <= 1:
        return intervals

    # Negative indices would silently wrap around and merge non-adjacent
    # intervals, so reject anything outside the sequence explicitly.
    if not 0 <= index < len(intervals):
        raise IndexError(f"index {index} is out of range for {len(intervals)} intervals")

    def merge_and_adjust(a: Interval, b: Interval) -> Interval:
        # merge() requires the sentence pause; the minimum pause is the value
        # used for it throughout (see how get_interval calls adjust()).
        return adjust(
            merge(a, b, sentence_pause_ms=min_pause_ms),
            target_rate=target_rate,
            min_pause_ms=min_pause_ms
        )

    # First interval: only a following neighbour exists.
    if index == 0:
        return [merge_and_adjust(intervals[index], intervals[index + 1])] + intervals[2:]

    # Last interval: only a preceding neighbour exists.
    if index == len(intervals) - 1:
        return intervals[:-2] + [merge_and_adjust(intervals[index - 1], intervals[index])]

    _next = merge_and_adjust(intervals[index], intervals[index + 1])
    _prev = merge_and_adjust(intervals[index - 1], intervals[index])

    # Keep whichever merge ends up with the lower speech rate.
    if _prev.rate < _next.rate:
        return intervals[:index - 1] + [_prev] + intervals[index + 1:]
    else:
        return intervals[:index] + [_next] + intervals[index + 2:]


# Try patch() on the first three intervals, using their own weighted average
# rate as the target. The displayed tuple is (average, input, patched output).
sample = intervals[:3]
average = average_rate(sample)

average, sample, patch(
    intervals=intervals[:3],
    index=0,
    target_rate=average,
    min_pause_ms=50,
)

(1.167945003662336,
 [Interval(speech_ms=2310, rate=1.3578106852497098, character='Alan', outline=['Привет всем!', 50, 'Как дела?', 50, 'Добро пожаловать на наш канал.', 50]),
  Interval(speech_ms=1220, rate=1.1795081967213115, character='Alan', outline=['Меня зовут Скотт Мосс.']),
  Interval(speech_ms=2694, rate=0.9999060949768186, character='Alan', outline=['В этом видео мы делаем кое-что слегка иначе.', 326])],
 [Interval(speech_ms=3530, rate=1.296187728874456, character='Alan', outline=['Привет всем!', 50, 'Как дела?', 50, 'Добро пожаловать на наш канал.', 50, 'Меня зовут Скотт Мосс.']),
  Interval(speech_ms=2694, rate=0.9999060949768186, character='Alan', outline=['В этом видео мы делаем кое-что слегка иначе.', 326])])

In [19]:
# Restrict the smoothing experiment below to the first 17 intervals.
sample = intervals[:17]

def smoothen(intervals: list[Interval], min_pause_ms: int, tolerance: float = 0.05) -> list[Interval]:
    """Even out speech rates by repeatedly merging the fastest interval.

    Every interval is first adjusted towards the weighted average rate of the
    input. Then, while the fastest interval exceeds that average by more than
    `tolerance` (as a fraction of the average), it is merged with a neighbour
    via patch().

    Args:
        intervals: Intervals to smoothen. The list is not modified.
        min_pause_ms: Minimum pause in milliseconds, forwarded to adjust() and
            patch().
        tolerance: Allowed relative excess of the fastest rate over the
            average before another merge is attempted. Defaults to 0.05 (5%).

    Returns:
        A new list of intervals. Merging stops as soon as the fastest interval
        is within tolerance or only one interval is left, so the result may
        still exceed the tolerance when nothing remains to merge with.
    """
    if not intervals:
        return []

    average = average_rate(intervals)
    smoothed = [
        adjust(interval, target_rate=average, min_pause_ms=min_pause_ms)
        for interval in intervals
    ]

    # patch() leaves a single interval untouched, so looping on a one-element
    # list that is still too fast would never terminate; each successful patch
    # shortens the list by one, which bounds the loop.
    while len(smoothed) > 1:
        # Ties on rate resolve to the highest index (tuple comparison).
        value, index = max((interval.rate, i) for i, interval in enumerate(smoothed))
        if (value - average) > average * tolerance:
            smoothed = patch(
                intervals=smoothed,
                index=index,
                target_rate=average,
                min_pause_ms=min_pause_ms,
            )
        else:
            break

    return smoothed


# Smoothen the sample with a 50 ms minimum pause and display the result.
smoothen(intervals=sample, min_pause_ms=50)

[Interval(speech_ms=59102, rate=1.1892388241710872, character='Alan', outline=['Привет всем!', 50, 'Как дела?', 50, 'Добро пожаловать на наш канал.', 50, 'Меня зовут Скотт Мосс.', 'В этом видео мы делаем кое-что слегка иначе.', 50, 'Я хотел немного кое-что поменять.', 'Я тут почувствовал небольшое вдохновение.', 'Пару дней назад у меня в гостях был мой друг, который работает инженером здесь, в Долине.', 'Он думал о том, чтобы открыть компанию и вот все это.', 50, 'Но у него нет никакого опыта, когда дело доходит до привлечения денег, общения с венчурными инвесторами и так далее.', 'Он сказал: "Йо, Скотт, когда ты создавал свою компанию и тебе пришлось обращаться к венчурным инвесторам, просить денег, делать все это и собирать деньги.', 50, 'На что был похож твой опыт?', 'Знаешь, какие были самые дикие истории, хорошие и плохие?', 50, 'Каждый раз, когда кто-то задает мне этот вопрос, я всегда вспоминаю один питч, который я сделал одному инвестору и была просто совсем странная атмосфера.

In [43]:
# NOTE(review): `adjusted` is not assigned in any cell visible in this notebook;
# this cell presumably relies on leftover kernel state and would raise
# NameError after Restart & Run All -- confirm and define it or drop the cell.
adjusted

[Interval(speech_ms=2260, rate=1.387373192315994, character='Alan', outline=[50, 'Привет всем!', 50, 'Как дела?', 50, 'Добро пожаловать на наш канал.', 50]),
 Interval(speech_ms=1170, rate=1.2309233571528653, character='Alan', outline=[50, 'Меня зовут Скотт Мосс.']),
 Interval(speech_ms=2300, rate=1.1615260581629716, character='Alan', outline=[52, 'В этом видео мы делаем кое-что слегка иначе.', 666]),
 Interval(speech_ms=1630, rate=1.239702015775635, character='Alan', outline=[50, 'Я хотел немного кое-что поменять.']),
 Interval(speech_ms=1390, rate=1.870728417266187, character='Alan', outline=[50, 'Я тут почувствовал небольшое вдохновение.']),
 Interval(speech_ms=3440, rate=1.4918635470113946, character='Alan', outline=[50, 'Пару дней назад у меня в гостях был мой друг, который работает инженером здесь, в Долине.']),
 Interval(speech_ms=2740, rate=1.161577923712331, character='Alan', outline=[50, 'Он думал о том, чтобы открыть компанию и вот все это.', 50]),
 Interval(speech_ms=4230, 

In [30]:
from itertools import permutations


def all_merges(intervals: list[Interval], n: int) -> list[tuple[bool, ...]]:
    """Enumerate merge decisions for the gaps between consecutive intervals.

    A decision is a tuple with one flag per gap (len(intervals) - 1 flags);
    True means "merge the interval after this gap into the one before it".

    Args:
        intervals: Intervals to be merged; only their count is used.
        n: Exclusive upper bound on the number of merges: decisions with
            0, 1, ..., n - 1 True flags are produced.

    Returns:
        All distinct decisions, grouped by increasing number of merges and in
        lexicographic order of the merged gap positions within each group.
        Merge counts larger than the number of gaps contribute nothing.
    """
    # Local import keeps this cell self-contained; the enclosing cell only
    # imports `permutations`.
    from itertools import combinations

    gap_count = max(len(intervals) - 1, 0)
    decisions: list[tuple[bool, ...]] = []

    for merge_count in range(n):
        # Choosing which gaps to merge yields each distinct arrangement exactly
        # once, instead of generating every permutation of the flags and
        # de-duplicating them afterwards (factorial work).
        for chosen in combinations(range(gap_count), merge_count):
            merged_gaps = set(chosen)
            decisions.append(tuple(gap in merged_gaps for gap in range(gap_count)))

    return decisions

In [31]:
def merge_intervals(
    intervals: list[Interval],
    flags: tuple[bool, ...],
    sentence_pause_ms: int = 50,
) -> list[Interval]:
    """Apply a tuple of merge decisions to a sequence of intervals.

    Args:
        intervals: Intervals in chronological order. The list is not modified.
        flags: One flag per gap between consecutive intervals; flags[i] tells
            whether intervals[i + 1] is merged into the interval before it.
            Pairing stops at the shorter of the two inputs, so intervals
            without a corresponding flag are left out of the result.
        sentence_pause_ms: Pause forwarded to merge() for joining outlines.
            Defaults to 50, the value the other cells of this notebook use.

    Returns:
        A new list of intervals with the requested merges applied; an empty
        list for empty input.

    Raises:
        ValueError: If two intervals flagged for merging belong to different
            characters (raised by merge()).
    """
    if not intervals:
        return []

    first, *rest = intervals
    acc = [first]

    for interval, should_merge in zip(rest, flags):
        if should_merge:
            # The accumulated interval is the earlier one, so it goes first:
            # merge() concatenates the outlines in argument order.
            acc[-1] = merge(acc[-1], interval, sentence_pause_ms=sentence_pause_ms)
        else:
            acc.append(interval)

    return acc

In [32]:
# Apply every merge decision to the intervals. With n = len(intervals),
# all_merges covers 0..n-1 merges, i.e. every subset of the n-1 gaps, so the
# number of solutions doubles with each additional interval.
solutions = [merge_intervals(intervals, flags) for flags in all_merges(intervals, len(intervals))]

In [33]:
solutions

[[Interval(speech_ms=10, pause_ms=0, speech_rate=1.0),
  Interval(speech_ms=10, pause_ms=0, speech_rate=2.0),
  Interval(speech_ms=20, pause_ms=10, speech_rate=1.0),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=20, pause_ms=0, speech_rate=1.5),
  Interval(speech_ms=20, pause_ms=10, speech_rate=1.0),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=10, pause_ms=0, speech_rate=1.0),
  Interval(speech_ms=10, pause_ms=0, speech_rate=2.0),
  Interval(speech_ms=40, pause_ms=10, speech_rate=1.5)],
 [Interval(speech_ms=10, pause_ms=0, speech_rate=1.0),
  Interval(speech_ms=30, pause_ms=10, speech_rate=1.3333333333333333),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=20, pause_ms=0, speech_rate=1.5),
  Interval(speech_ms=40, pause_ms=10, speech_rate=1.5)],
 [Interval(speech_ms=40, pause_ms=10, speech_rate=1.25),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=10, pause_ms=0, 

In [35]:
# NOTE(review): adjust() as defined in this notebook takes `target_rate` and
# `min_pause_ms` and has no `base_rate` parameter, so this call raises TypeError
# against that definition. The output shown for this cell presumably comes from
# an earlier version of adjust() -- confirm and update or drop the cell.
[[adjust(interval, base_rate=1.3) for interval in intervals] for intervals in solutions]

[[Interval(speech_ms=8, pause_ms=2, speech_rate=1.3),
  Interval(speech_ms=10, pause_ms=0, speech_rate=2.0),
  Interval(speech_ms=15, pause_ms=15, speech_rate=1.3),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=20, pause_ms=0, speech_rate=1.5),
  Interval(speech_ms=15, pause_ms=15, speech_rate=1.3),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=8, pause_ms=2, speech_rate=1.3),
  Interval(speech_ms=10, pause_ms=0, speech_rate=2.0),
  Interval(speech_ms=46, pause_ms=4, speech_rate=1.3000000000000003)],
 [Interval(speech_ms=8, pause_ms=2, speech_rate=1.3),
  Interval(speech_ms=31, pause_ms=9, speech_rate=1.3),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=20, pause_ms=0, speech_rate=1.5),
  Interval(speech_ms=46, pause_ms=4, speech_rate=1.3000000000000003)],
 [Interval(speech_ms=38, pause_ms=12, speech_rate=1.3),
  Interval(speech_ms=20, pause_ms=0, speech_rate=2.0)],
 [Interval(speech_ms=8, pause

In [24]:
# NOTE(review): Interval as defined in this notebook has four required fields
# (speech_ms, rate, character, outline) and merge() requires sentence_pause_ms,
# so this call does not match either definition. It presumably targets an
# earlier version of both -- confirm and update or drop the cell.
merge(
    Interval(10, 0, 1.0),
    Interval(10, 10, 2.0),
)

Interval(speech_ms=20, pause_ms=10, speech_rate=1.5)

In [18]:
# NOTE(review): `adjust_speech_rate` is not defined in any cell visible in this
# notebook, and Interval(5, 10, 2.0) does not match the four-field Interval
# defined above. Presumably a leftover from an earlier version -- confirm.
adjust_speech_rate(Interval(5, 10, 2.0), 1.0)

Interval(signal_ms=10, silence_ms=5, speech_rate=1.0)