In [None]:
from pprint import pprint, pformat
import json
import os
import sys
import subprocess

from pathlib import Path

# Resolve the directory two levels above the notebook's working directory
# (presumably the package root — TODO confirm) so project modules can be imported.
parent_dir = str(Path().resolve().parents[1])

# Put it first on sys.path, but only once, so re-running the cell is idempotent.
if parent_dir not in sys.path:
  sys.path.insert(0, parent_dir)

pprint(sys.path)

# Build the fixture path from `parent_dir` itself rather than `sys.path[0]`:
# when `parent_dir` was already on sys.path (at any index) nothing is inserted,
# and `sys.path[0]` would then be an unrelated directory.
testdata_path = os.path.abspath(os.path.join(
  parent_dir, '..', 'tests', 'data'))
print(
  f'Test data {"exists" if os.path.exists(testdata_path) else "does not exist!"}: {testdata_path}')


In [None]:
# Timestamp fixtures: every JSON file below <testdata>/ssa, as sorted path strings.
path = Path(testdata_path, 'ssa')
ts_files = sorted(map(str, path.glob('**/*.json')))

# Summary sentences; test_ts() pairs them positionally with `ts_files` via zip().
summaries = [
  '这145平米的样板间采用10多种色彩, 创造了和谐、高级的空间！',
  '客餐厅采用无吊顶设计，左右清晰分区',
  '墙面、顶面和门框采用黑色金属线条，营造简约现代感。',
  '餐椅和抱枕的脏橘"色贯"穿客餐厅，避免了视觉上的割裂感:',
  '选择雅琪诺悦动风华系列的“窗帘”，营造出<温馨>的氛围?',
  '主卧和次卧都采用了不同的装饰《元素》, 呈现出不同的风格, 相应地丰富了整个空间',
  "恒洁通过“恒洁选品SHOW”和“设计圈”等工具，与装企合作，提升效率，享受合作共赢的喜悦。让你更轻松、更高效地完成家装。",
  "恒洁T9PRO花洒荣膺“红鼎技术TOP10”大奖，恒洁“技术包”让你的卫浴更加智能、舒适。精确控温技术、长效阻垢技术、水净技术……细节成就卓越。",
]


In [None]:
import re


def remove_punct(text):
  '''
  Return `text` with every non-word character removed.

  "Word character" is the regex class matched by the word-character escape:
  letters (including CJK), digits and the underscore are kept; punctuation,
  symbols and all whitespace are dropped.

  The earlier range-based implementation that followed the `return` was
  unreachable and has been removed.
  '''
  return re.sub(r'[^\w]', '', text)


In [2]:
for s in summaries:
  # Print each summary with every non-word character (punctuation, whitespace)
  # removed; this inlines the same regex that remove_punct() applies.
  print(re.sub(r'[^\w]', '', s))


NameError: name 'summaries' is not defined

In [None]:
def test_ts(ts_files, summaries):
  '''
  Align TTS word chunks with punctuation-stripped summaries and print the result.

  `ts_files` and `summaries` are paired positionally with zip(), so surplus
  items on either side are ignored. For each pair:

  1. Load `payload.subtitles` from the JSON file and glue items whose
     `begin_index` continues the previous `end_index` into one word.
  2. Walk the words over the summary (after remove_punct). A word that does
     not match at the cursor is recorded as a gap (None); following words are
     then searched for further right until one is found.
  3. Fill every gap with the text between its neighbouring matches and print
     the words, the aligned slices and the original summary.

  Returns None; all output goes to stdout.
  '''
  for f, s in zip(ts_files, summaries):
    # Close the file deterministically instead of leaking the handle.
    with open(f, 'r', encoding='utf-8') as fp:
      ts = json.load(fp)['payload']['subtitles']

    # words[0] starts empty; it is extended when the first item begins at index 0.
    words = ['']
    index = 0
    for t in ts:
      if t['begin_index'] == index:
        words[-1] += t['text']
      else:
        words.append(t['text'])
      index = t['end_index']

    summary = remove_punct(s)
    pointer = 0
    # ranges[0] is a placeholder so ranges[-1] is always defined.
    ranges = [(0, 0)]
    for w in words:
      print(f'Working on {w}')
      l = len(w)
      if ranges[-1] is not None:
        if w == summary[pointer:pointer + l]:
          ranges.append((pointer, pointer + l))
          pointer += l
        else:
          print(f'miss: {summary[pointer:]}')
          ranges.append(None)
      else:
        # str.find() returns -1 when nothing is found and 0 for a hit right at
        # the cursor, so the result must be compared with -1; relying on its
        # truthiness would accept failures and reject hits at offset 0.
        if (p := summary[pointer:].find(w)) > -1:
          print(f'found: {w} at {p+pointer}')
          print(f'from {summary[pointer:]}')
          pointer += p
          ranges.append((pointer, pointer + l))
          pointer += l

    # Replace each gap with the span between the surrounding matches.
    for i, r in enumerate(ranges):
      if r is None:
        start = ranges[i - 1][1] if i > 0 else 0
        end = ranges[i + 1][0] if i < len(ranges) - 1 else len(summary)
        ranges[i] = (start, end)

    print(words)
    print([summary[r[0]:r[1]] for r in ranges[1:] if r is not None])
    print(f'summary: {s}')


# Run the alignment check over every (timestamp file, summary) pair.
test_ts(ts_files, summaries)


In [None]:
# Gap between the previous word's end and the next word's begin above which
# gen_subtitle() always starts a new subtitle line
# (presumably milliseconds, like the TTS timestamps — TODO confirm).
ssa_break_ms = 200
# Target number of characters per displayed line; wrap() derives the number of
# lines from it and gen_subtitle() uses it to decide on early line breaks.
ssa_line_width = 12


def tok(timestamps, word_break_ms=50):
  '''
  Group per-character TTS timestamps into words.

  An item is appended to the current word when both hold:
  1. its `begin_index` equals the previous item's `end_index`, and
  2. it starts less than `word_break_ms` after the current word ends.
  Otherwise it opens a new word.

  Args:
    timestamps: iterable of dicts with the keys `text`, `begin_index`,
      `end_index`, `begin_time` and `end_time`.
    word_break_ms: pause that forces a word boundary. Defaults to 50, the
      value that used to be hard-coded
      (presumably milliseconds — TODO confirm against the TTS payload).

  Returns:
    A list of dicts `{'text', 'begin', 'end'}`. The list is seeded with an
    empty word starting at 0; it stays empty (and is still returned) when the
    first item does not begin at index 0 / time 0.
  '''
  # Seed word: the first item is merged into it when it starts at index 0.
  words = [{'text': '', 'begin': 0, 'end': 0}]
  index = 0
  for t in timestamps:
    # assumes the `begin_time` of the first item is 0
    contiguous = t['begin_index'] == index
    if contiguous and t['begin_time'] - words[-1]['end'] < word_break_ms:
      words[-1]['text'] += t['text']
      words[-1]['end'] = t['end_time']
    else:
      words.append({
        'text': t['text'],
        'begin': t['begin_time'],
        'end': t['end_time']
      })
    index = t['end_index']

  return words


def gen_subtitle(words):
  '''
  Group timed words into subtitle lines and print each line.

  A new line is started when the gap between the current line's end and the
  next word's begin exceeds `ssa_break_ms`, or when the gap exceeds 100 and the
  current line already holds more than `ssa_line_width` characters. Each line
  is printed after wrap() has inserted the two-character marker
  backslash + N between its display rows.

  Args:
    words: iterable of dicts with `text`, `begin` and `end`
      (the shape produced by tok()).

  Returns None; output goes to stdout. The first line is created up front, so
  an empty line is printed when the first word itself triggers a break.
  '''
  lines = []

  def new_line(begin):
    # Open a new subtitle line starting at `begin` and return it.
    lines.append({
      'texts': [],
      'start': begin,
      'end': 0,
    })
    return lines[-1]

  def wrap(texts):
    # Join `texts`, breaking into roughly equal rows once a row has grown
    # past the average row width.
    total_len = sum(len(t) for t in texts)
    n = round(total_len / ssa_line_width)
    if n < 2:
      return ''.join(texts)

    avg_line_width = round(total_len / n)
    length = 0
    text = ''
    for t in texts:
      if length > avg_line_width:
        text += '\\N'
        length = 0
      text += t
      length += len(t)
    return text

  new_line(0)
  for word in words:
    # Always inspect the most recent line. The previous code read a local
    # `line` in the second condition before it had ever been assigned, which
    # raised UnboundLocalError when the first word began 101..ssa_break_ms
    # after 0.
    line = lines[-1]
    gap = word['begin'] - line['end']
    if gap > ssa_break_ms:
      line = new_line(word['begin'])
    elif gap > 100 and sum(len(t) for t in line['texts']) > ssa_line_width:
      line = new_line(word['begin'])
    line['end'] = word['end']
    line['texts'].append(word['text'])

  for line in lines:
    print(wrap(line['texts']))


def mapping(words, sentence):
  '''
  Cut `sentence` into timed tokens by aligning it with the TTS `words`.

  Words are consumed left to right. While aligned, a word must equal the
  sentence text at the cursor; the first word that does not opens a gap.
  While in a gap, each following word is searched for to the right of the
  cursor: a hit closes the gap, a miss extends the gap's end time. Gaps are
  finally assigned the sentence text between their neighbours.

  Returns a list of dicts `{'text', 'begin', 'end'}`; progress is printed.
  '''
  print(f'sentence: {sentence}')
  print(f'words: {pformat(words)}')

  cursor = 0
  # spans[0] is a sentinel so spans[-1] always exists; it is dropped below.
  spans = [{'range': (0, 0), 'begin': 0, 'end': 0}]
  for word in words:
    text = word['text']
    size = len(text)

    if spans[-1]['range'] is None:
      # Currently inside a gap: look for the word anywhere after the cursor.
      offset = sentence[cursor:].find(text)
      if offset == -1:
        # Still unmatched: the open gap swallows this word's time span.
        spans[-1]['end'] = word['end']
        continue
      print(text)
      print(offset)
      cursor += offset
      matched = True
    else:
      # Currently aligned: the word has to sit exactly at the cursor.
      matched = sentence[cursor:cursor + size] == text

    if matched:
      span = (cursor, cursor + size)
      cursor += size
    else:
      # Open a gap; its text range is resolved after the loop.
      span = None
    spans.append({
      'range': span,
      'begin': word['begin'],
      'end': word['end'],
    })

  # Discard the sentinel.
  spans = spans[1:]
  print(f'ranges: {pformat(spans)}')

  # A gap covers everything between the previous span's end and the next
  # span's start (or the sentence boundaries at either edge).
  last = len(spans) - 1
  for i, span in enumerate(spans):
    if span['range'] is not None:
      continue
    start = spans[i - 1]['range'][1] if i > 0 else 0
    end = spans[i + 1]['range'][0] if i < last else len(sentence)
    span['range'] = (start, end)

  tokens = []
  for span in spans:
    lo, hi = span['range']
    tokens.append({
      'text': sentence[lo:hi],
      'begin': span['begin'],
      'end': span['end'],
    })

  print(f'tokens: {pformat(tokens)}')

  return tokens


# def match(words, sentence):
#   def lenw(words):
#     return sum([len(w) for w in words])

#   if len(words) == 0 or len(sentence) == 0:
#     return lenw(words) + len(sentence)


In [None]:
import json
from speechsynthesis.alitts import create_token, websocket_tts, tts_with_subtitle

# Sample sentences sent to the TTS service by the loop below, one request each.
# They deliberately mix CJK punctuation, ASCII ellipses, quotes and
# Latin/digit runs (e.g. "TOP10", "T9PRO").
sentences = [
  "闪耀中装协住宅产业年会，恒洁获2022红鼎奖最高奖",
  '恒洁的花洒和智能一体机都搭载了包括精准控温、长效阻垢、水净、镀层等在内的恒洁“技术包”，让您享受更加智能的卫浴体验。',
  'R11智能一体机、T9PRO花洒......恒洁卫浴荣获红鼎奖和红鼎技术TOP10大奖，展示恒洁作为智能卫浴专家的地位。',
  '恒洁通过“恒洁选品SHOW”和“设计圈”等工具，与装企合作，提升效率，享受合作共赢的喜悦。让你更轻松、更高效地完成家装。',
  '恒洁T9PRO花洒荣膺“红鼎技术TOP10”大奖，恒洁“技术包”让你的卫浴更加智能､舒适。精确控温技术､长效阻垢技术､水净技术……细节成就卓越。',
  '恒洁卫浴的创新能力备受肯定。恒洁R11智能一体机斩获红鼎奖最高荣誉，而T9PRO花洒则被评为红鼎技术TOP10。这些荣誉背后，是我们持续追求卓越的信念！',
  "恒洁致力于提供符合品质生活需求的智能卫浴产品和服务，与装企伙伴一同打造品质家装。",
  "恒洁将国潮红利转化为品牌价值，从消费者视角定义恒洁品质。选恒洁，选品质，就是选生活。",
  "恒洁以超低水压的Q9X、智感设计的R11等智能卫浴产品为支撑，满足品质生活需求，打造个性化颜值的全卫空间解决方案。",
  "恒洁拥有完善的综合服务体系和强大的服务保障能力，三千多个网点遍布全国四百多个城市，让全卫焕新仅需3小时，让您享受快速暖心的服务体验。",
  "恒洁T9PRO花洒荣膺“红鼎技术TOP10”大奖，凭借包括长效阻垢技术在内的恒洁“技术包”，满足消费者对卫浴产品的高品质需求。",
  "恒洁R11智能一体机凭借完美的美学特征和高端智能技术斩获最高奖红鼎奖，为住宅产业注入更多创新和设计思维，让品质人居变得更加美好。"
]

# create_token() returns an indexable result; only its first element is used
# as the token passed to every request below.
token = create_token()[0]
# One entry per sentence, in the same order as `sentences`.
metadata = []

for i, s in enumerate(sentences):
  # Top-level `await`: this cell only runs inside an IPython/Jupyter kernel.
  # The first element of the result is discarded; the second is kept
  # (presumably the timestamp payload that wording() consumes — TODO confirm).
  # NOTE(review): the output directory is a machine-specific absolute path.
  _, m = await websocket_tts(
    text=s,
    voice='kenny',
    wav_file=os.path.join('/Volumes/RamDisk/temp', f'{i}.wav'),
    token=token
  )
  metadata.append(m)


In [None]:
from itertools import groupby
import re
from difflib import SequenceMatcher

from dataclasses import dataclass

# Target number of characters per subtitle row; read by wrap() at call time.
# Rebinds the constant set in an earlier cell to the same value.
ssa_line_width = 12


@dataclass
class Token:
  '''
  One aligned unit of subtitle text together with its TTS timing.

  All fields have defaults, so `Token(text)` creates an untimed token.
  '''
  # Text of the token as it should appear in the subtitle.
  text: str = ''
  # Begin/end time taken from the TTS timestamps; -1 means "no timing", which
  # tokenize()/wording() use to recognise tokens that were never spoken.
  begin: int = -1
  end: int = -1
  # Set by tokenize() for text that exists in the sentence but has no
  # counterpart in the TTS output (SequenceMatcher 'insert' ops).
  omitted: bool = False
  # Set by wording() on untimed tokens that are sentence-break markers.
  tts_sent_break: bool = False
  # Index of the TTS word this token belongs to; wording() groups by it.
  tts_tok_idx: int = -1
  # Not referenced anywhere in this notebook (presumably reserved for an NLP
  # based segmentation — TODO confirm).
  nlp_sent: bool = False
  nlp_tok_idx: int = -1
  # Previously an un-annotated class attribute, which @dataclass ignores: it
  # was missing from __init__, __repr__ and __eq__. Annotating it makes it a
  # real per-instance field with the same default.
  punct: 'str | None' = None


def is_left_punct(str):
  '''
  Tell whether `str` starts (after optional whitespace) with an opening
  quote or opening bracket character.
  '''
  opening = r'^\s*[“‘（［｛〔《「『【〖〘〚〝﹙﹛﹝（｛［‘“].*'  # -—_…
  if re.match(opening, str):
    return True
  return False


def is_sent_sep(str):
  '''
  Tell whether `str` ends with a break marker, i.e. the two literal
  characters backslash + n or backslash + N, optionally followed by
  whitespace.
  '''
  pattern = '|'.join((
    r'.*\\n\s*$',
    r'.*\\N\s*$',
  ))
  return bool(re.match(pattern, str))


def split_sent_sep(str):
  '''
  Detach a sentence separator from the start or the end of `str`.

  Separators are the CJK full stop, exclamation mark and question mark, plus
  the literal two-character markers backslash + n and backslash + N.
  Whitespace between the separator and the string edge is dropped.

  Returns the non-empty pieces in order, the separator being its own item.
  '''
  alternatives = '|'.join(['。', '！', '？', '\\\\n', '\\\\N'])
  pattern = rf'({alternatives})\s*$|^\s*({alternatives})'
  pieces = re.split(pattern, str)
  # Unmatched groups come back as None and edges as '': keep real text only.
  return [piece for piece in pieces if piece]

# def split_sent_sep(str):
#   return [x for x in re.split(r'(\\n\s*$|\\N\s*$)', str) if x != '']


def sent_sep(text):
  '''
  Replace sentence punctuation in `text` with subtitle break markers.

  Hard separators (CJK full stop / exclamation / question mark, ASCII "?",
  runs of three or six dots, one or two ellipsis characters, and a sentence
  mark followed by a closing quote) become the literal two characters
  backslash + N. Soft separators (commas and the CJK semicolon) become the
  literal two characters backslash + n. The punctuation itself is removed.

  A separator directly followed by a closing quote is left alone unless the
  "mark + closing quote" rule applies.

  Returns the rewritten string.
  '''
  hard_sep = [
    r'([。！？\?](?=$|[^”’]))',
    r'(\.{6}(?=[^”’]))|(\.{3}(?=[^”’]))',
    r'(\…{1,2}(?=[^”’]))',
    # The character after the closing quote is only looked at, not consumed;
    # consuming it would delete the first character of the next sentence.
    r'([。！？\?][”’](?=[^，。！？\?]))',
  ]

  # The "mark + closing quote" rule is intentionally absent here: the hard
  # pass has already rewritten every occurrence, and applying it again could
  # swallow the backslash of a marker inserted by the hard pass.
  soft_sep = [
    r'([,，；](?=$|[^”’]))',
  ]

  text = re.sub('|'.join(hard_sep), r'\\N', text)
  text = re.sub('|'.join(soft_sep), r'\\n', text)
  return text
  # results = re.finditer('|'.join(sent_sep), s)
  # for ret in results:
  #   print(f"Punct '{ret.group()}' at index ({ret.start()}, {ret.end()})")


def tokenize(ts, sentence):
  '''
  Align TTS timestamp items with `sentence` and return a list of Token.

  The concatenated TTS text is diffed against `sentence` with
  difflib.SequenceMatcher and every opcode is turned into tokens:

  - equal / delete: one timed Token per TTS item (text from the TTS side).
  - replace: the sentence text becomes the token text and takes its timing
    from the TTS items it replaces; leading/trailing sentence separators are
    split off into their own, untimed tokens.
  - insert: one untimed Token flagged `omitted`.

  Args:
    ts: list of dicts with `text`, `begin_time`, `end_time`, `begin_index`
      and `end_index`.
    sentence: the text to align (wording() passes the output of sent_sep()).

  Returns:
    list of Token. Untimed tokens keep begin == end == -1.

  Raises:
    RuntimeError: on an opcode tag other than the four listed above.

  Prints the two strings and every opcode as a debugging aid.
  '''
  a = ''.join([t['text'] for t in ts])
  print(a)
  print(sentence)

  s = SequenceMatcher(None, a, sentence)

  ts_idx = 0    # next unconsumed item of `ts`
  char_idx = 0  # number of characters of `a` covered by consumed items
  tok_idx = 0   # running TTS word index, bumped at index discontinuities
  tokens = []
  for tag, i1, i2, j1, j2 in s.get_opcodes():
    print('{:7}   a[{}:{}] --> b[{}:{}] {!r:>8} --> {!r}'.format(
      tag, i1, i2, j1, j2, a[i1:i2], sentence[j1:j2]))

    # A 'delete' means the TTS engine produced text that is not part of the
    # original sentence. It cannot be dropped, so it is handled like 'equal'.
    if tag == 'equal' or tag == 'delete':
      while char_idx < i2:
        t = ts[ts_idx]
        # list.append() returns None, so its result is not kept.
        tokens.append(Token(
          text=t['text'],
          begin=t['begin_time'],
          end=t['end_time'],
          tts_tok_idx=tok_idx,
        ))
        char_idx += len(t['text'])
        ts_idx += 1

        # A jump between this item's end_index and the next item's
        # begin_index marks the start of a new TTS word.
        if ts_idx < len(ts) and t['end_index'] != ts[ts_idx]['begin_index']:
          tok_idx += 1
    elif tag == 'replace':
      # Merge the replaced stretch into one token.
      # The preceding 'equal' block may already have consumed TTS characters
      # beyond its own range, because a TTS item can be a whole word.
      # Example: TTS gives `top十佳`, the sentence is `toop10佳`; the diff is
      # { equal: 'to', replace: ['p十', 'op10'], equal: '佳' }. On the TTS side
      # `top` is one word and that is honoured:
      # 1. the 'equal' step emits `top` as a token although only 'to' is equal;
      # 2. the 'replace' step skips the `p` of 'p十' (already taken) and emits
      #    `op10` (sentence side) with the begin/end time of `十`.

      # HACK: split sentence breaks into separate tokens, if any.
      texts = split_sent_sep(sentence[j1:j2])
      for text in texts:
        token = Token(text)
        if is_sent_sep(text):
          tokens.append(token)
        else:
          while char_idx < i2:
            t = ts[ts_idx]
            if token.begin == -1:
              token.begin = t['begin_time']
            token.end = t['end_time']
            char_idx += len(t['text'])
            ts_idx += 1
          if token.text != '':
            tokens.append(token)
    elif tag == 'insert':
      tokens.append(Token(
        text=sentence[j1:j2],
        omitted=True,
      ))
    else:
      raise RuntimeError(f'Unknown tag: {tag}')

  return tokens


def wrap(texts, line_width=None):
  '''
  Join `texts` into one subtitle string, split into roughly equal rows.

  The number of rows is round(total length / line width). With fewer than two
  rows the pieces are simply concatenated. Otherwise a row break (the literal
  two characters backslash + N) is inserted before a piece whenever the
  current row has already grown past the average row width; pieces are never
  split.

  Args:
    texts: sequence of strings.
    line_width: target characters per row. Defaults to the module-level
      `ssa_line_width`, looked up at call time, which is what the function
      always used before this parameter existed.

  Returns the joined string.
  '''
  if line_width is None:
    line_width = ssa_line_width

  total_len = sum(len(t) for t in texts)
  n = round(total_len / line_width)
  if n < 2:
    return ''.join(texts)

  avg_line_width = round(total_len / n)
  length = 0
  text = ''
  for t in texts:
    if length > avg_line_width:
      text += '\\N'
      length = 0
    text += t
    length += len(t)
  return text


def wording(metadata, sentence):
  '''
  Turn one TTS result plus its source sentence into timed subtitle lines
  and print them.

  `metadata` must provide `metadata['payload']['subtitles']`; `sentence` is
  first rewritten by sent_sep() and then aligned by tokenize().
  Returns None; one line is printed per sentence segment.
  '''
  tokens = tokenize(metadata['payload']['subtitles'], sent_sep(sentence))

  # Untimed tokens (begin == -1) carry text the TTS did not speak. Each one is
  # either glued to a neighbour or marked as a sentence break:
  candidates = [(t, i) for i, t in enumerate(tokens) if t.begin == -1]
  for item, idx in candidates:
    if is_left_punct(item.text) and idx < len(tokens) - 1:
      # opening quote/bracket: prepend to the following token
      tokens[idx + 1].text = item.text + tokens[idx + 1].text
    elif is_sent_sep(item.text):
      # break marker: keep it as a segment delimiter
      item.tts_sent_break = True
    elif idx > 0:
      # anything else: append to the preceding token
      tokens[idx - 1].text = tokens[idx - 1].text + item.text
  # Drop the untimed tokens whose text has been merged away; keep the breaks.
  tokens = [t for t in tokens if t.begin != -1 or t.tts_sent_break]

  # Split the token stream at the break markers, then discard the runs that
  # consist of the markers themselves.
  stens = [list(g) for _, g in groupby(tokens, lambda x: x.tts_sent_break)]
  stens = [s for s in stens if not s[0].tts_sent_break]
  for sten in stens:
    # Consecutive tokens sharing a tts_tok_idx are merged into one word.
    groups = [list(g) for _, g in groupby(sten, lambda x: x.tts_tok_idx)]
    words = [Token(
      text=''.join([i.text for i in g]),
      begin=g[0].begin,
      end=g[-1].end,
    ) for g in groups]
    print(
      f'begin: {words[0].begin}, end: {words[-1].end}, text: {wrap([w.text for w in words])}')

# idx = 3
# wording(metadata[idx], sentences[idx])


In [None]:
# NOTE(review): machine-specific absolute path; the cell cannot run elsewhere
# without editing it.
data = '/Users/zhengt/Codes/VideoClipGen/data/20230411-140813/'

ts_path = Path(os.path.join(data, 'wav'))
ts_files = sorted([str(p) for p in ts_path.glob('**/*.json')])

# Context managers close the files deterministically; the explicit encoding
# keeps the CJK text readable regardless of the platform default.
with open(os.path.join(data, 'summaries.json'), 'r', encoding='utf-8') as fp:
  summaries = json.load(fp)
sentences = summaries['summaries']

# The idx-th sorted timestamp file is assumed to belong to the idx-th summary.
for idx, sentence in enumerate(sentences):
  with open(ts_files[idx], 'r', encoding='utf-8') as fp:
    ts = json.load(fp)
  wording(ts, sentence)


In [None]:
# From https://github.com/hankcs/HanLP/blob/master/hanlp/utils/rules.py

import re

# Placeholder that split_sent_sep() temporarily puts in place of the
# whitespace after an abbreviation, so that the abbreviation's dot is not
# followed by whitespace while RE_SENTENCE runs.
SEPARATOR = r'@'
# A sentence: either a run starting at a non-space character and lazily
# extending to the first '.', '!' or '?' that is followed by whitespace or the
# end of the text, or a run extending to the next newline / end of text.
RE_SENTENCE = re.compile(r'(\S.+?[.!?])(?=\s+|$)|(\S.+?)(?=[\n]|$)', re.UNICODE)
# Capital letter + 1-2 lowercase letters + '.', one whitespace, a word
# character (title-like abbreviations such as "Mr. X").
AB_SENIOR = re.compile(r'([A-Z][a-z]{1,2}\.)\s(\w)', re.UNICODE)
# '.', one ASCII letter, '.', one whitespace, a word character
# (the tail of a dotted acronym).
AB_ACRONYM = re.compile(r'(\.[a-zA-Z]\.)\s(\w)', re.UNICODE)
# Counterparts of the two patterns above with SEPARATOR where the whitespace
# was; used to put the whitespace back.
UNDO_AB_SENIOR = re.compile(
  r'([A-Z][a-z]{1,2}\.)' + SEPARATOR + r'(\w)', re.UNICODE)
UNDO_AB_ACRONYM = re.compile(
  r'(\.[a-zA-Z]\.)' + SEPARATOR + r'(\w)', re.UNICODE)


def replace_with_separator(text, separator, regexs):
  '''
  Apply each compiled pattern of `regexs` to `text` in turn, rewriting every
  match as "group 1 + separator + group 2", and return the final string.

  Each pattern is expected to define exactly those two groups.
  '''
  template = ''.join((r"\1", separator, r"\2"))
  for pattern in regexs:
    text = pattern.sub(template, text)
  return text


def split_sent_sep(text, best=True):
  r'''
  Yield the sentences contained in `text` (adapted from HanLP's rules.py).

  A line break is inserted after sentence-ending punctuation (CJK full stop /
  exclamation / question mark, ASCII "?", runs of six or three dots, one or
  two ellipsis characters), taking closing quotes into account. The text is
  then split at line breaks and blank chunks are skipped.

  Args:
    text: the text to split.
    best: when False every chunk is yielded as is. When True each chunk is
      additionally split at ". ", "! " and "? " by RE_SENTENCE, with
      title-like abbreviations and dotted acronyms protected from splitting.

  NOTE: running this cell rebinds `split_sent_sep`, which an earlier cell
  defines with a different contract (returning a list) and which tokenize()
  calls.
  '''
  text = re.sub(r'([。！？\?])([^”’])', r"\1\n\2", text)
  # Both dot-run lengths share group 1. With two separate alternatives the
  # three-dot case left groups 1 and 2 empty, so the replacement deleted the
  # dots together with the following character.
  text = re.sub(r'(\.{6}|\.{3})([^”’])', r"\1\n\2", text)
  text = re.sub(r'(\…{1,2})([^”’])', r"\1\n\2", text)
  text = re.sub(r'([。！？\?][”’])([^，。！？\?])', r'\1\n\2', text)
  for chunk in text.split("\n"):
    chunk = chunk.strip()
    if not chunk:
      continue
    if not best:
      yield chunk
      continue
    # Hide the whitespace after abbreviations so RE_SENTENCE does not split
    # there, then restore it in every sentence found.
    processed = replace_with_separator(
      chunk, SEPARATOR, [AB_SENIOR, AB_ACRONYM])
    for sentence in RE_SENTENCE.finditer(processed):
      sentence = replace_with_separator(
        sentence.group(), r" ", [UNDO_AB_SENIOR, UNDO_AB_ACRONYM])
      yield sentence


def find_punct(s):
  '''
  Print every sentence-separator match found in `s` with its start/end index.

  Debugging helper; returns None. The last pattern also consumes the
  character following a closing quote, so that character is part of the
  reported match.
  '''
  # Raw strings: sequences such as backslash + "?" are regex escapes, not
  # valid Python string escapes, and trigger a warning in non-raw literals.
  separators = [
    r'([。！？\?](?=$|[^”’]))',
    r'(\.{6}(?=[^”’]))|(\.{3}(?=[^”’]))',
    r'(\…{1,2}(?=[^”’]))',
    r'([。！？\?][”’][^，。！？\?])',
  ]

  for ret in re.finditer('|'.join(separators), s):
    print(f"Punct '{ret.group()}' at index ({ret.start()}, {ret.end()})")


In [None]:
# Pass 1: echo each sentence, then list the separators found in it.
for s in sentences:
  print(s)
  find_punct(s)

# Pass 2: show how each sentence is split, without the abbreviation handling.
for s in sentences:
  print(list(split_sent_sep(s, best=False)))


In [None]:
import re

text = 'Prof. White'
# Split at a title-like abbreviation (capital + 1-3 lowercase letters + '.' +
# whitespace). The pattern is a raw string because its backslash sequences are
# regex escapes, not Python string escapes; `text` is used instead of
# repeating the literal.
print(re.split(r'([A-Z][a-z]{1,3}\.\s)', text))

# punctuations = re.finditer(r"\.{3,6}", text)
# r"\.{3,6}"
# r"\.{6}|\.{3}"

# punctuations = re.finditer('([A-Z][a-z]{1,3}\.\s)|\.{3,6}', text)
# for p in punctuations:
#   print(f"Punctuation '{p.group()}' at index {p.start()}, {p.end()}")


In [None]:

# Regex alternatives for sentence separators: three CJK marks plus the literal
# markers backslash + n / backslash + N.
# NOTE: this rebinds `sent_sep`, which an earlier cell defines as a function
# that wording() calls; after this cell runs that call no longer works.
sent_sep = ['。', '！', '？', '\\\\n', '\\\\N']


def is_sent_sep(str):
  '''
  Tell whether `str` ends with one of the `sent_sep` alternatives, optionally
  followed by whitespace. Prints the assembled pattern as a debugging aid.
  '''
  alternatives = '|'.join(sent_sep)
  pattern = rf'.*({alternatives})\s*$'
  print(pattern)
  return bool(re.match(pattern, str))


def is_sent_sep(str):
  '''
  Tell whether `str` ends with a sentence separator (CJK full stop,
  exclamation or question mark, or the literal marker backslash + n /
  backslash + N), optionally followed by whitespace.
  '''
  found = re.match(r'.*(。|！|？|\\n|\\N)\s*$', str)
  return found is not None


# Spot checks; the expected values follow from the regexes of is_sent_sep()
# (defined just above) and is_left_punct() (defined in an earlier cell).
print(is_sent_sep(' \\n '))   # expected: True  (marker + trailing space)
print(is_sent_sep('aa\\n'))   # expected: True  (marker at the end)
print(is_sent_sep('\\naa'))   # expected: False (marker at the start only)
print(is_sent_sep('  \\N '))  # expected: True
print(is_sent_sep('aa\\N '))  # expected: True
print(is_sent_sep('\\Naa'))   # expected: False
print(is_left_punct(' “ '))   # expected: True  (leading whitespace allowed)
print(is_left_punct('“aa'))   # expected: True
print(is_left_punct('aa“'))   # expected: False (quote is not at the start)


In [None]:
def split_sent_sep(str):
  '''
  Split a trailing sentence separator (with its trailing whitespace) off
  `str` and return the non-empty pieces.

  Separators are the CJK full stop, exclamation and question mark, and the
  literal two-character markers backslash + n and backslash + N.
  '''
  # A non-capturing alternation is required here. Square brackets would form
  # a character class, so any text ending in a single "n", "N" or backslash
  # (e.g. 'action') would be split.
  pattern = r'((?:。|！|？|\\n|\\N)\s*$)'
  return [x for x in re.split(pattern, str) if x != '']


split_sent_sep('aa\\N')
split_sent_sep('aa')


In [None]:
def remove_punct(text):
  '''
  Strip everything that is not a word character from `text`.

  Letters (including CJK), digits and underscores survive; Chinese and
  English punctuation, symbols and all whitespace are removed.
  '''
  non_word = re.compile(r'[^\w]')
  return non_word.sub('', text)


def tok(timestamps, word_break_ms=50):
  '''
  Tokenize the timestamps into words.

  1. Items whose `begin_index` continues the previous `end_index` are
     combined into one word,
  2. unless the item starts `word_break_ms` or more after the word's end.

  Args:
    timestamps: iterable of dicts with `text`, `begin_index`, `end_index`,
      `begin_time` and `end_time`.
    word_break_ms: pause that forces a new word. The function used to read a
      global of this name that no cell of the notebook defines, so calling it
      on a fresh kernel raised NameError; it is now a parameter whose default
      is the 50 used by the earlier definition of tok().
      NOTE(review): if a global `word_break_ms` with another value was
      intended, pass it explicitly.

  Returns:
    A list of dicts `{'text', 'begin', 'end'}`, seeded with an empty word
    starting at 0.
  '''
  # assemble words based on `begin_index` and `end_index`
  words = [{'text': '', 'begin': 0, 'end': 0}]
  index = 0
  for t in timestamps:
    # assumes the `begin_time` of the first item is 0
    if t['begin_index'] == index \
            and t['begin_time'] - words[-1]['end'] < word_break_ms:
      words[-1]['text'] += t['text']
      words[-1]['end'] = t['end_time']
    else:
      words.append({
        'text': t['text'],
        'begin': t['begin_time'],
        'end': t['end_time']
      })
    index = t['end_index']

  return words


def mapping(words, sentence):
  '''
  Tokenize the sentence by aligning the summary sentence with the given words.

  Words are matched from the beginning of the sentence. A word that does not
  match at the current position opens a gap; later words are searched for
  further right, and words that still cannot be found extend the gap's end
  time. Each gap finally receives the sentence text between its neighbours.

  Args:
    words: iterable of dicts with `text`, `begin` and `end` (see tok()).
    sentence: the text to cut into tokens.

  Returns:
    list of dicts `{'text', 'begin', 'end'}`.

  Intermediate results are reported through logging.info().
  '''
  # No cell of the notebook imports `logging`; importing it here keeps the
  # function usable on a fresh kernel.
  import logging

  logging.info(f'sentence: {sentence}')
  logging.info(f'words: {pformat(words)}')

  # matching all words from the beginning of the sentence
  # skip the words that cannot be matched, fill the gap later
  pointer = 0
  ranges = [{'range': (0, 0), 'begin': 0, 'end': 0}]
  for w in words:
    l = len(w['text'])
    if ranges[-1]['range'] is not None:
      # sentence and words are currently matched before dealing with word w
      if w['text'] == sentence[pointer:pointer + l]:
        # w and sentence are matched at (pointer, pointer + l) of sentence
        ranges.append({
          'range': (pointer, pointer + l),
          'begin': w['begin'],
          'end': w['end'],
        })
        pointer += l
      else:
        # w and sentence are not matched, record the timestamp
        ranges.append({
          'range': None,
          'begin': w['begin'],
          'end': w['end'],
        })
    else:
      # sentence and words are NOT matched before dealing with word w
      if (p := sentence[pointer:].find(w['text'])) > -1:
        # w and sentence are matched, at position `p` from `pointer`
        pointer += p
        ranges.append({
          'range': (pointer, pointer + l),
          'begin': w['begin'],
          'end': w['end'],
        })
        pointer += l
      else:
        # still no match, extend the timestamp
        ranges[-1]['end'] = w['end']

  # drop the placeholder
  ranges = ranges[1:]
  logging.info(f'ranges: {pformat(ranges)}')

  # a gap spans from the previous range's end to the next range's start
  for i, r in enumerate(ranges):
    if r['range'] is None:
      start = ranges[i - 1]['range'][1] if i > 0 else 0
      end = ranges[i + 1]['range'][0] if i < len(ranges) - 1 else len(sentence)
      ranges[i]['range'] = (start, end)

  tokens = [{
    'text': sentence[r['range'][0]:r['range'][1]],
    'begin': r['begin'],
    'end': r['end'],
  } for r in ranges]

  logging.info(f'tokens: {pformat(tokens)}')

  return tokens


In [None]:
def split_sent_sep(str):
  '''
  Detach a sentence separator from the start or the end of `str`.

  Separators are the CJK full stop, exclamation mark and question mark, plus
  the literal two-character markers backslash + n and backslash + N.
  Whitespace between the separator and the string edge is dropped.

  Returns the non-empty pieces in order, the separator being its own item.
  '''
  alternatives = '|'.join(['。', '！', '？', '\\\\n', '\\\\N'])
  pattern = rf'({alternatives})\s*$|^\s*({alternatives})'
  pieces = re.split(pattern, str)
  # Unmatched groups come back as None and edges as '': keep real text only.
  return [piece for piece in pieces if piece]


print(split_sent_sep('aa\\N'))
print(split_sent_sep('\\N2017'))


In [None]:
def replace_sep_in_quote(text):
  '''
  Replace sentence punctuation in `text` with break markers, quote-aware.

  Hard separators (a sentence mark not followed by a closing quote, or a
  sentence mark plus closing quote that is followed by ordinary text) become
  the literal two characters backslash + N. Soft separators (commas and the
  CJK semicolon, not followed by a closing quote) become the literal two
  characters backslash + n. Returns the rewritten string.
  '''
  hard_pattern = '|'.join((
    r'([｡。！？\?](?=$|[^”’]))',
    r'([。！？?][”’](?=[^，。！？?]))',
  ))
  soft_pattern = r'([,，；](?=$|[^”’]))'

  # Hard separators first, then soft ones on the result.
  with_hard = re.sub(hard_pattern, r'\\N', text)
  return re.sub(soft_pattern, r'\\n', with_hard)


print(replace_sep_in_quote('a？b'))
print(replace_sep_in_quote('a？”b'))
print(replace_sep_in_quote('a？”.b'))


In [None]:
# Regex alternatives for sentence separators: three CJK marks plus the literal
# markers backslash + n / backslash + N.
sent_sep = [r'。', r'！', r'？', r'\\n', r'\\N']


def has_sent_sep(str):
  '''
  Tell whether `str` ends with one of the `sent_sep` separators, optionally
  followed by whitespace.
  '''
  # The alternatives must sit in a group. Inside square brackets the joined
  # list is a character class, so any text ending in "n", "N", "|" or a
  # backslash (e.g. 'plan') was reported as ending with a separator.
  pattern = r'.*(?:' + '|'.join(sent_sep) + r')\s*$'
  return re.match(pattern, str) is not None


print(has_sent_sep('。a'))


In [None]:
from speechsynthesis.subtitle import (
  tokenize,
  wording,
  subtitle,
)


def sub(data='/Users/zhengt/Codes/VideoClipGen/data/20230411-134744/'):
  '''
  Generate and print subtitles for every summary of one data directory.

  Expects `<data>/summaries.json` with a `summaries` list and one timestamp
  JSON per summary somewhere below `<data>/wav`; the idx-th file in sorted
  order is paired with the idx-th summary.

  Args:
    data: directory to process. The default is the previously hard-coded,
      machine-specific path, so `sub()` behaves as before.

  Returns None; the result of subtitle() is printed for each sentence.
  '''
  import json

  ts_path = Path(os.path.join(data, 'wav'))
  ts_files = sorted([str(p) for p in ts_path.glob('**/*.json')])

  # Context managers close the files deterministically; the explicit encoding
  # keeps the CJK text readable regardless of the platform default.
  with open(os.path.join(data, 'summaries.json'), 'r', encoding='utf-8') as fp:
    summaries = json.load(fp)
  sentences = summaries['summaries']

  for idx, sentence in enumerate(sentences):
    with open(ts_files[idx], 'r', encoding='utf-8') as fp:
      ts = json.load(fp)
    tokens = tokenize(
      ts=ts['payload']['subtitles'],
      sentence=sentence,
    )
    lines = wording(tokens)
    # (720, 1280) is passed through unchanged
    # (presumably the video frame size — TODO confirm).
    subtitles = subtitle(lines, (720, 1280))
    print(subtitles)


# Run the packaged subtitle pipeline on the sample data directory.
sub()
