
#### 点击上方菜单栏的“代码执行程序”，在“更改运行时类型”中选择硬件加速器为GPU
#### 避免Colab断开连接：按Ctrl+Shift+I等待弹窗然后点击控制台，输入以下内容并回车：


```
// Clicks Colab's "connect" button so that the runtime is not dropped for
// inactivity. The button lives inside the shadow DOM of the
// <colab-connect-button> element in the top toolbar.
function ConnectButton() {
    console.log("Connect pushed");
    const connectHost = document.querySelector("#top-toolbar > colab-connect-button");
    const connectControl = connectHost.shadowRoot.querySelector("#connect");
    connectControl.click();
}
// Repeat the click every 60000 ms (once per minute).
setInterval(ConnectButton, 60000);
```



### 01 安装需要的依赖和部署预训练模型

In [None]:
# Install the pinned paddlepaddle-gpu wheel from the PaddlePaddle wheel index.
!python -m pip install paddlepaddle-gpu==2.4.1.post112 -f https://www.paddlepaddle.org.cn/whl/linux/mkl/avx/stable.html
# Install the ppasr package and the paddlespeech-ctcdecoders package.
!python -m pip install ppasr
!python -m pip install paddlespeech-ctcdecoders
# pypinyin is imported later in this notebook to convert transcripts to pinyin.
!pip install pypinyin
# sox is used later in this notebook to resample the recordings.
# -y answers apt's confirmation prompt so the cell cannot stall waiting for input.
!sudo apt-get install -y sox
# Fetch the PPASR fork and make it the working directory; the following
# cells run its scripts (export_model.py, infer_wav.py) by relative path.
!git clone https://github.com/AlexandaJerry/PPASR.git
%cd /content/PPASR
!python setup.py install

In [None]:
# Download PPASR.zip from Google Drive. The inner wget stores the session
# cookies and sed extracts the "confirm" token from the response; the outer
# wget reuses both to fetch the file, then the cookie file is removed.
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1e7t3lOPj0PAgQHWVot0dRw8_NRoP27IN' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1e7t3lOPj0PAgQHWVot0dRw8_NRoP27IN" -O "PPASR.zip" && rm -rf /tmp/cookies.txt
# Unpack the archive next to the repository checkout.
!unzip /content/PPASR/PPASR.zip -d /content
# Copy the unpacked configs/, dataset/ and models/ directories into the
# checkout. -T treats the destination as the directory itself, so contents
# are merged into it instead of being nested one level deeper.
!cp -RT /content/PPASR_V2-conformer_online-fbank-超大数据集/configs/ /content/PPASR/configs/
!cp -RT /content/PPASR_V2-conformer_online-fbank-超大数据集/dataset/ /content/PPASR/dataset/
!cp -RT /content/PPASR_V2-conformer_online-fbank-超大数据集/models/ /content/PPASR/models/
# Download the .klm file with the same cookie/confirm-token technique
# (presumably the language model used for decoding, given the lm/ target -- confirm).
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1--Sy7fcquKBR8PNIVW-quDlSD1fCNcGk' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1--Sy7fcquKBR8PNIVW-quDlSD1fCNcGk" -O "zh_giga.no_cna_cmn.prune01244.klm" && rm -rf /tmp/cookies.txt
# NOTE(review): assumes /content/PPASR/lm/ already exists in the checkout -- confirm.
!cp -r /content/PPASR/zh_giga.no_cna_cmn.prune01244.klm /content/PPASR/lm/
# Run the repository's export script on the best checkpoint copied above.
!python export_model.py --resume_model=models/conformer_online_fbank/best_model/



---



---



### 02 导入谷歌云盘中的音频压缩文件
#### 谷歌云盘内音频压缩包的文件名填入wav_file

In [None]:
# Mount the user's Google Drive at /content/drive so that later cells can
# read the audio archive from it and copy the results back to it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
wav_file = 'samples.zip'#@param {type:"string"}
wav_path = "/content/drive/MyDrive/" + wav_file
!unzip -j $wav_path "*/*.wav" -d /content/PPASR/raw_wav

#### 降低音频采样率到16000赫兹 如果不想降低采样率可跳过不运行此框
#### 采样率高低主要影响后续音频切片和识别的速度 时间充裕可不降采样率

In [None]:
%%bash
### Lower the sampling rate of every recording to 16000 Hz.
### Skip this cell if you do not want to lower the sampling rate.
# Stop immediately if the directory is missing, so that no files in some
# other working directory are touched.
cd /content/PPASR/raw_wav || exit 1

for x in ./*.wav
do
  # When nothing matches, the pattern is left as a literal string; skip it.
  [ -e "$x" ] || continue
  b=${x##*/}
  # Replace the original only after sox succeeded, so a failed conversion
  # never deletes the source recording. Quoting keeps names with spaces intact.
  if sox "$x" -r 16000 "./tmp_$b" && mv -f "./tmp_$b" "$x"
  then
    echo "成功降低$b的采样率到16000Hz"
  else
    rm -f "./tmp_$b"
    echo "Failed to resample $b; the original file was kept" >&2
  fi
done



---



---



### 03 长音频自动切片为短音频
#### db_threshold：低于多少分贝视为静音（单位：分贝）
#### min_length: 每个短音频的最低切片时长（单位：毫秒）
#### win_l_ win_s_: 多长的静音才会被切分（单位：毫秒）
#### max_silence_kept：切分处保留的左右时长（单位：毫秒）

In [None]:
import glob
import os
import shutil
import wave

import librosa
import soundfile
import tqdm

def length(src: str):
    """Return the total audio duration, in hours, found at ``src``.

    A directory is walked recursively and the durations of its entries are
    added up. A regular file counts only when its name ends with ``.wav``;
    its duration is the frame count divided by the frame rate, converted
    from seconds to hours. Any other path contributes 0.
    """
    if os.path.isdir(src):
        hours = 0
        for entry in os.listdir(src):
            hours += length(os.path.join(src, entry))
        return hours
    if not (os.path.isfile(src) and src.endswith('.wav')):
        return 0
    with wave.open(src, 'r') as reader:
        frame_count = reader.getnframes()
        frame_rate = reader.getframerate()
    # seconds -> hours
    return frame_count / frame_rate / 3600


print('Environment initialized successfully.')

# Directory that holds the raw, unsliced recordings.
raw_path = '/content/PPASR/raw_wav'

########################################

assert os.path.exists(raw_path) and os.path.isdir(raw_path), 'The chosen path does not exist or is not a directory.'
print('Raw recording path:', raw_path)
print()
print('===== Recording List =====')
# The pattern contains no `**`, so only *.wav files directly inside raw_path
# are listed; length() on the other hand also descends into subdirectories.
raw_filelist = glob.glob(f'{raw_path}/*.wav', recursive=True)
raw_length = length(raw_path)
# Print at most five paths, followed by a count of the ones left out.
shown_recordings = raw_filelist[:5]
hidden_recordings = len(raw_filelist) - 5
if hidden_recordings > 0:
    shown_recordings = shown_recordings + [f'... ({hidden_recordings} more)']
print('\n'.join(shown_recordings))
print()
print(f'Found {len(raw_filelist)} valid recordings with total length of {round(raw_length, 2)} hours.')

sliced_path = '/content/PPASR/sliced_wav'  # Path to hold the sliced segments of your recordings

# Slicer arguments (editable Colab form fields, passed to Slicer in slice_one).
# Meanings below follow the notes in this notebook's section heading.
# Level in dB below which audio is treated as silence.
db_threshold_ = -40. #@param {type:"number"}
# Minimum duration of each sliced segment, in milliseconds.
min_length_ = 8000 #@param {type:"number"}
# win_l_ / win_s_: how long a silence must be before a cut is made, in
# milliseconds (their exact roles are defined by the Slicer class).
win_l_ = 800 #@param {type:"number"}
win_s_ = 20 #@param {type:"number"}
# Duration kept on each side of a cut, in milliseconds.
max_silence_kept_ = 500 #@param {type:"number"}

# Number of worker threads (based on your CPU cores)
num_workers = 5

########################################

# Guard against running this cell before raw_path has been defined.
assert 'raw_path' in locals().keys(), 'Raw path of your recordings has not been specified.'
# The output path may be absent (it is created below) but must not be a regular file.
assert not os.path.exists(sliced_path) or os.path.isdir(sliced_path), 'The chosen path is not a directory.'
os.makedirs(sliced_path, exist_ok=True)
print('Sliced recording path:', sliced_path)

from slicer import Slicer
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def slice_one(in_audio):
    """Cut one recording into segments and write them to ``sliced_path``.

    The audio is loaded at its native sampling rate, split by ``Slicer``
    using the module-level slicer settings, and each chunk is saved as
    ``<input name without extension>_slice_<4-digit index>.wav``.

    Args:
        in_audio: Path of the recording to slice.
    """
    # sr=None makes librosa keep the file's own sampling rate.
    samples, rate = librosa.load(in_audio, sr=None)
    splitter = Slicer(
        sr=rate,
        db_threshold=db_threshold_,
        min_length=min_length_,
        win_l=win_l_,
        win_s=win_s_,
        max_silence_kept=max_silence_kept_
    )
    # File name of the input with its last extension removed.
    stem = os.path.basename(in_audio).rsplit('.', maxsplit=1)[0]
    for index, segment in enumerate(splitter.slice(samples)):
        segment_name = '%s_slice_%04d.wav' % (stem, index)
        soundfile.write(os.path.join(sliced_path, segment_name), segment, rate)


print('Slicing your recordings may take several minutes. Please wait.')
# The `with` block shuts the pool down once all submitted jobs have finished.
with ThreadPoolExecutor(max_workers=num_workers) as thread_pool:
    tasks = [thread_pool.submit(slice_one, file) for file in raw_filelist]
    wait(tasks, return_when=ALL_COMPLETED)
# wait() only blocks; it never re-raises what a worker raised. Inspect every
# future so that a recording that could not be sliced is reported instead of
# silently missing from the segment list.
for file, task in zip(raw_filelist, tasks):
    error = task.exception()
    if error is not None:
        print(f'Failed to slice \'{file}\': {error!r}')
print()
print('===== Segment List =====')
sliced_filelist = glob.glob(f'{sliced_path}/*.wav', recursive=True)
sliced_length = length(sliced_path)
# Print at most five segment paths, followed by a count of the rest.
if len(sliced_filelist) > 5:
    print('\n'.join(sliced_filelist[:5] + [f'... ({len(sliced_filelist) - 5} more)']))
else:
    print('\n'.join(sliced_filelist))
print()
print(f'Sliced your recordings into {len(sliced_filelist)} segments with total length of {round(sliced_length, 2)} hours.')



---



---



### 04 切片后的短音频自动转写和转拼音

In [None]:
# Run the repository's infer_wav.py from the current directory (/content/PPASR).
# NOTE(review): presumably this writes one .txt transcript next to each sliced
# .wav, since the next cell reads .txt files from sliced_wav -- confirm.
!python infer_wav.py

In [None]:
import os
import sys
import numpy as np
from pypinyin import pinyin, lazy_pinyin, Style
import re

# Convert every .txt transcript under root_dir to space-separated pinyin
# (tone numbers appended, neutral tone written as 5), overwriting the file.
root_dir = "/content/PPASR/sliced_wav"
pattern = re.compile(r'(.*)\.txt$')
# Characters removed from a transcript before conversion.
# NOTE(review): inside this character class `+-=` is a range (from '+' to
# '='), so digits 0-9, ':' and ';' are removed as well -- confirm this is
# intended before relying on transcripts that contain numbers.
r="[_.!+-=——,$%^,。?、~@#￥%……&*《》<>「」{}【】()（）/''\n ]"

for root, _dirs, files in os.walk(root_dir):
    for filename in files:
        output = pattern.match(filename)
        if output is None:
            continue
        print(root, filename)
        # Read with an explicit encoding (the dictionary check reads these
        # files as UTF-8 too) and close the handle as soon as the text is read.
        with open(os.path.join(root, filename), encoding="utf-8") as text_file:
            line = text_file.read().strip()
        line = line.replace("，", "")
        line = re.sub(r, "", line)
        # Use a name that does not shadow the imported `pinyin` function.
        syllables = lazy_pinyin(line, style=Style.TONE3, errors='default', strict=False, v_to_u=False, neutral_tone_with_five=True, tone_sandhi=True)
        pinyinline = ' '.join(syllables)
        print(line)
        # The target name is rebuilt from the matched stem, i.e. it is the
        # same file that was just read: the text is replaced by its pinyin.
        target_path = os.path.join(root, output.group(1) + ".txt")
        with open(target_path, "w", encoding="utf-8") as target_text_file:
            target_text_file.write(pinyinline)

In [None]:
## Load the MFA Mandarin dictionary and report syllables that are not in it
import tqdm
dict_path = '/content/PPASR/mandarin_pinyin.txt'
# Each dictionary line is "<syllable>\t<space-separated phonemes>".
# Blank lines are skipped; they would otherwise fail on rule[1] below.
with open(dict_path, 'r', encoding='utf8') as f:
    rules = [ln.strip().split('\t') for ln in f.readlines() if ln.strip()]
dictionary = {}
phoneme_set = set()
for rule in rules:
    phonemes = rule[1].split()
    dictionary[rule[0]] = phonemes
    phoneme_set.update(phonemes)

# Run checks
check_failed = False
covered = set()
# Occurrence count of every dictionary phoneme across all annotations.
phoneme_map = {}
for ph in sorted(phoneme_set):
    phoneme_map[ph] = 0

segment_pairs = []

for file in tqdm.tqdm(sliced_filelist):
    filename = os.path.basename(file)
    name_without_ext = filename.rsplit('.', maxsplit=1)[0]
    annotation = os.path.join(sliced_path, f'{name_without_ext}.txt')
    if not os.path.exists(annotation):
        print(f'No annotation found for \'{filename}\'!')
        check_failed = True
        # Nothing to read for this segment; move on instead of failing on open().
        continue
    with open(annotation, 'r', encoding='utf8') as f:
        syllables = f.read().strip().split()
    if not syllables:
        print(f'Annotation file \'{annotation}\' is empty!')
        check_failed = True
    else:
        # Collect out-of-vocabulary syllables; count phonemes of known ones.
        oov = []
        for s in syllables:
            if s not in dictionary:
                oov.append(s)
            else:
                for ph in dictionary[s]:
                    phoneme_map[ph] += 1
                covered.update(dictionary[s])
        if oov:
            print(f'Syllable(s) {oov} not allowed in annotation file \'{annotation}\'')
            check_failed = True

### 05 保存短音频和对应转写到谷歌云盘
#### 注意保存文件名不要与已有文件名相同 否则可能提示覆盖谷歌云盘的文件
#### 这里的tar后缀是因为云服务器是Linux系统 但是不影响在Windows系统解压

In [None]:
import os, tarfile
import os
from google.colab import files
# File name of the tar archive to create (editable Colab form field).
save_as = 'mfa_prepare.tar'#@param {type:"string"}
def make_targz_one_by_one(output_filename, source_dir):
    """Pack every file found under ``source_dir`` into a tar archive.

    Despite the function's name, the archive is opened in plain ``"w"``
    mode, so it is an uncompressed tar. Files are added one at a time
    under the path produced by walking ``source_dir``.

    Args:
        output_filename: Path of the tar file to create (overwritten if present).
        source_dir: Directory whose files are added, including subdirectories.
    """
    # The context manager closes the archive even if adding a file fails.
    with tarfile.open(output_filename, "w") as tar:
        for root, _dir_names, files_list in os.walk(source_dir):
            for file in files_list:
                tar.add(os.path.join(root, file))
 
  # files.download(output_filename)

make_targz_one_by_one(save_as, '/content/PPASR/sliced_wav')
savename = "/content/PPASR/" + save_as
!cp -i $savename /content/drive/MyDrive