# Data preprocessing

In [None]:
!wget -c https://drive.google.com/file/d/1TDPRZgJ6ysEcy26ZqyY0ds_8cFAOBbCZ/view?usp=sharing
!unzip /content/audio.zip #your audios here

In [None]:
import os
import re
import subprocess

import pandas as pd
from tqdm import tqdm

In [None]:
df = pd.read_csv('/content/asr_data.csv') #your dataset here

In [None]:
# Path of each row's source recording: ./audio_to_release/<lang>/<source>
lang_part = df["lang"].astype(str)
source_part = df["source"].astype(str)
df["fpath"] = './audio_to_release/' + lang_part + "/" + source_part #adding paths to file to dataset

# De-duplicated list of source recordings (many rows share one file).
all_paths = list(set(df['fpath']))

In [None]:
df['fpath'] = df['fpath'].apply(lambda x: x.replace(' ', '_')) #removing spaces in dataset's paths

In [None]:
# Expose the row number as an 'index' column; later cells use it as the clip id.
# Guarded so that re-running this cell does not insert a second index column,
# which would shift the positional column lookups done when cutting the audio.
if 'index' not in df.columns:
  df = df.reset_index() #adding indexes(id)
df

In [None]:
print(all_paths)

In [None]:
for i, old_path in enumerate(all_paths): #correcting filenames
  # Replace spaces with underscores on disk and in all_paths alike.
  renamed = old_path.replace(' ', '_')
  # Skip the rename only when it has evidently been done already (old name gone,
  # new name present), so the cell can be re-run; a file missing under both
  # names still raises, exactly as before.
  if os.path.exists(old_path) or not os.path.exists(renamed):
    os.rename(old_path, renamed)
  all_paths[i] = renamed

In [None]:
for src_path in tqdm(all_paths): #resampling and recoding to wav
    # Output name: the source path with every '.mp4' / '.WAV' / '.wav' removed, plus '.wav'.
    dst_path = src_path.replace('.mp4', '').replace('.WAV', '').replace('.wav', '') + '.wav'
    if dst_path == src_path:
        # ffmpeg cannot write onto its own input; files that are already *.wav
        # are left to the dedicated "resampling wavs" step.
        continue
    # Argument list instead of a shell string: paths containing quotes, brackets,
    # '&' etc. are passed to ffmpeg verbatim. '-y' overwrites a stale output so a
    # re-run does not stop on ffmpeg's overwrite prompt.
    result = subprocess.run(
        ['ffmpeg', '-y', '-i', src_path, '-acodec', 'pcm_s16le', '-ar', '16000', dst_path],
        capture_output=True, text=True, errors='replace')
    if result.returncode != 0:
        # Surface failures instead of silently ignoring the exit status.
        print('ffmpeg failed for {}:\n{}'.format(src_path, result.stderr[-500:]))

In [None]:
already_wavs = ['/content/audio_to_release/yrk/Katushka_VorontsovoNENETS_A_48-16.wav', '/content/audio_to_release/yrk/TOYa_audio_TNenets.wav']
for wav_path in already_wavs: #resampling wavs
  # ffmpeg refuses to use one file as both input and output, so resample into a
  # sibling temporary file and swap it in only when the conversion succeeded.
  tmp_path = wav_path + '.16k.tmp.wav'
  result = subprocess.run(
      ['ffmpeg', '-y', '-i', wav_path, '-ar', '16000', tmp_path],
      capture_output=True, text=True, errors='replace')
  if result.returncode == 0:
    os.replace(tmp_path, wav_path)
  else:
    print('ffmpeg failed for {}:\n{}'.format(wav_path, result.stderr[-500:]))
    # Do not leave a partial temporary file next to the original.
    if os.path.exists(tmp_path):
      os.remove(tmp_path)

In [None]:
dir_name = "/content/audio_to_release" #getting rid of mp4 files

def remover(directory):
  """Delete every entry of `directory` whose name ends in ".mp4" or ".WAV".

  The suffix match is case-sensitive, so "*.wav" and "*.MP4" entries are kept.
  Returns None.
  """
  doomed = [name for name in os.listdir(directory) if name.endswith((".mp4", ".WAV"))]
  for name in doomed:
    os.remove(os.path.join(directory, name))

In [None]:
# Full paths of the sub-directories of dir_name (one per language folder).
# Built from dir_name rather than a second hard-coded copy of the path, and
# restricted to real directories: a stray plain file here would make
# remover() fail when it tries to list it.
dirs = [os.path.join(dir_name, entry) for entry in os.listdir(dir_name)]
dirs = [path for path in dirs if os.path.isdir(path)]

In [None]:
# Purge the original .mp4 / .WAV files from every collected directory.
for subdir in dirs:
  remover(subdir)

In [None]:
def replacer(fpath): #new paths in dataframe's column 'fpath', because dragged files to MyDrive folder
  """Return `fpath` with every '.mp4' and every '.WAV' replaced by '.wav'."""
  for old_ext in ('.mp4', '.WAV'):
    fpath = fpath.replace(old_ext, '.wav')
  return fpath

In [None]:
df['fpath'] = df['fpath'].apply(replacer)

In [None]:
all_paths_new = list(set(df['fpath']))

In [None]:
print(all_paths_new)

In [None]:
new_dir = '/content/new_audio' #new directory for cut files
# Create the output folder once; on later runs just report that it is there.
if os.path.exists(new_dir):
  print('folder already exists')
else:
  os.mkdir(new_dir)

In [None]:
def cutter(fpath, start, stop, index): #cutting files according to timecodes
  """Cut the span from `start` to `stop` out of `fpath` with ffmpeg.

  Args:
    fpath: path of the source audio file.
    start: start position, passed to ffmpeg's -ss as str(start).
    stop: end position, passed to ffmpeg's -to as str(stop).
    index: clip id; the output is written to /content/new_audio/<index>.wav.

  Returns:
    None. A failing ffmpeg run is reported with print() and does not raise.
  """
  out_path = '/content/new_audio/' + str(index) + '.wav'
  # Argument list, not a shell string: paths with spaces, quotes or brackets are
  # passed through untouched. '-y' overwrites an existing clip so that re-running
  # the cutting loop does not stop on ffmpeg's overwrite prompt.
  cmd = ['ffmpeg', '-y', '-i', str(fpath), '-ss', str(start), '-to', str(stop), out_path]
  result = subprocess.run(cmd, capture_output=True, text=True, errors='replace')
  if result.returncode != 0:
    print('ffmpeg failed for clip {} ({}):\n{}'.format(index, fpath, result.stderr[-500:]))

In [None]:
# Build {value of 'index': [remaining row values in column order]} from df.
# NOTE(review): the cutting loop reads these lists by position (5, 0, 1), so the
# result depends on df's column order - presumably start, stop, ..., fpath; confirm.
dictionary = df.set_index('index').T.to_dict('list')
print(dictionary)

In [None]:
# Cut one clip per dictionary entry; list positions 5, 0 and 1 are handed to
# cutter as fpath, start and stop, and the key becomes the clip id.
for clip_id in tqdm(dictionary):
  row_values = dictionary[clip_id]
  cutter(row_values[5], row_values[0], row_values[1], clip_id)
  print(clip_id)

In [None]:
df['new_path'] = df['index'].apply(lambda x: '/content/new_audio/' + str(x) + '.wav') #making column for paths of cut files


# ASR


In [None]:
# Mount Google Drive at /content/drive (requires the google.colab package).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install transformers datasets phonemizer
!apt install espeak
!pip install pydub

from transformers import AutoModelForCTC, Wav2Vec2Processor

# One checkpoint id feeds both the CTC model and its processor.
CHECKPOINT = "facebook/wav2vec2-xlsr-53-espeak-cv-ft"
model = AutoModelForCTC.from_pretrained(CHECKPOINT)
processor = Wav2Vec2Processor.from_pretrained(CHECKPOINT)


In [None]:
import torch
import torchaudio
from tqdm import tqdm
def recognizer(fpath):
  """Transcribe one audio file with the module-level `model` and `processor`.

  Args:
    fpath: path of an audio file readable by torchaudio.load.

  Returns:
    The first decoded string of the model output, or the integer 0 when
    anything fails (kept for backward compatibility with the cleaning step,
    which converts the value with str()).
  """
  try:
    waveform, sample_rate = torchaudio.load(fpath)
    # NOTE(review): the waveform is fed to the model exactly as loaded;
    # sample_rate is not checked and the processor is not applied to the
    # input - confirm this matches what the checkpoint expects.
    with torch.no_grad():  # inference only: skip building the autograd graph
      logits = model(waveform).logits
    pred_ids = torch.argmax(logits, dim=-1)
    pred_str = processor.batch_decode(pred_ids)[0]
    return pred_str
  except Exception as err:
    # Report the failure instead of hiding it; Exception (not a bare except)
    # lets KeyboardInterrupt stop a long run.
    print('recognizer failed for {}: {!r}'.format(fpath, err))
    return 0

In [None]:
df['recognised'] = df['new_path'].apply(recognizer)

In [None]:
def _clean_transcription(text):
  """Normalise one reference transcription; missing values pass through unchanged.

  Steps, in order: strip leading/trailing '.', '«' and ','; drop '=' and
  spaces; turn 'Ø' into a space; remove every parenthesised span.
  """
  if pd.isna(text):
    # Later cells already guard for missing transcriptions; do the same here
    # instead of failing on a non-string value.
    return text
  text = text.strip('.«,').replace('=', '').replace(' ', '').replace('Ø', ' ')
  # Raw string: '\(' in a plain literal is an invalid escape sequence.
  return re.sub(r'\(.+?\)', '', text)

df['transcription'] = df['transcription'].apply(_clean_transcription) #clearing punctuation marks and spaces
df['recognised'] = df['recognised'].apply(lambda x: str(x).replace(' ',''))

In [None]:
def fill_empty(string): #filling empty strings
  """Return '-' for an empty string; any other value is passed through unchanged."""
  return '-' if string == '' else string

In [None]:
df['transcription'] = df['transcription'].apply(fill_empty)

# Evaluation
Levenshtein distance

In [None]:
!pip install  python-Levenshtein

In [None]:
from Levenshtein import distance

In [None]:
df["distance"] = df[["transcription", "recognised"]].apply(lambda row: distance(row["transcription"], row["recognised"]) if not pd.isna(row["transcription"]) else pd.NA, axis=1)

In [None]:
df["len"] = df.apply(lambda x: len(x["transcription"]) if not pd.isna(x["transcription"]) else pd.NA, axis=1)

In [None]:
df[["transcription", "recognised", "distance", "len"]]

In [None]:
!pip install jiwer

In [None]:
from jiwer import cer

In [None]:
df["cer"] = df[["transcription", "recognised"]].apply(lambda row: cer(row["transcription"], row["recognised"]) if not pd.isna(row["transcription"]) else pd.NA, axis=1)

In [None]:
df[["transcription", "recognised", "distance", "len", 'cer']].to_csv('acr_recognised_wmetrics.csv', sep=',', encoding='utf-8') #results