In [None]:
list(filter(lambda x: 'wav' in x, os.listdir(common_voice_dir)))

In [9]:
# -------------------- Librosa load and SoundFile load ----------------------
import librosa
import soundfile as sf

fname = '/mnt/raid10/home/SPEECH/Diarization/Clustering/test_sounds/test1.wav'
SR = 8000

# 1
sample, _ = librosa.load(fname, sr=SR)
print(sample[10:20])
print()
data, _ = sf.read(fname)
print(data[10:20])

[-6.7138672e-04  1.2207031e-04 -1.2207031e-04  9.1552734e-05
  9.1552734e-05  3.6621094e-04  3.9672852e-04 -1.2207031e-04
  3.9672852e-04 -1.2207031e-04]

[-6.71386719e-04  1.22070312e-04 -1.22070312e-04  9.15527344e-05
  9.15527344e-05  3.66210938e-04  3.96728516e-04 -1.22070312e-04
  3.96728516e-04 -1.22070312e-04]


In [32]:
# ------------------------------ setattr ------------------------------------
# к экземпляру класса можно добавить аттрибут
# аттрибут - поле класса - переменная

class MyClass():
    def __init__(self):
        self.a = 3
        pass
    
test = MyClass()
print('Базовый аттрибут:', test.a)
setattr(test, 'b', 5)
print('Новый аттрибут:', test.b)

Базовый аттрибут: 3
Новый аттрибут: 5


In [28]:
# -------------------- Подбор всех возможных разметок -----------------------
from itertools import permutations
import numpy as np

label = np.array([[1, 0, 0],
                  [0, 0, 1]], dtype=np.int8)

label_perms = [label[..., list(p)] for p
               in permutations(range(label.shape[-1]))]

print('Кол-во вариантов=N_speakers! =', len(label_perms))
for var, label in enumerate(label_perms):
    print('Вариант', var+1)
    print(label)
    print()

Кол-во вариантов=N_speakers! = 6
Вариант 1
[[1 0 0]
 [0 0 1]]

Вариант 2
[[1 0 0]
 [0 1 0]]

Вариант 3
[[0 1 0]
 [0 0 1]]

Вариант 4
[[0 0 1]
 [0 1 0]]

Вариант 5
[[0 1 0]
 [1 0 0]]

Вариант 6
[[0 0 1]
 [1 0 0]]



In [50]:
# --------------------------- Multi Processing -------------------------------
from multiprocessing import Pool
 
def doubler(number):
    return number * 2
 

if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)      # создаем 3 рабочих процесса
    
    # map - ставит соответствие процесса каждому вызову функции
    res_ls = pool.map(doubler, numbers)
    
    print(res_ls)

[10, 20, 40]


In [32]:
# --------------------------- Multi Processing -------------------------------
import os
from multiprocessing import Process
 
def doubler(number):
    """
    Функция умножитель на два
    """
    result = number * 2
    proc_id = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc_id))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    
    for index, number in enumerate(numbers):
        # создаем несколько процессоров
        # каждый вызов функции имеет свой процесс
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        # запуск
        proc.start()
        
    # если процессы долгие
    # они работают, а мы попадаем
    # в эту строчку
    
    for proc in procs:
        proc.join()    # подождать, пока каждый процесс завершится
        
    # когда эти процессы завершатся
    # мы перейдем сюда
    
    # но мы можем прервать
    # procs[i].terminate()

5 doubled to 10 by process id: 38637
10 doubled to 20 by process id: 38640
15 doubled to 30 by process id: 38647
20 doubled to 40 by process id: 38650
25 doubled to 50 by process id: 38655


In [17]:
# -------------------------------- ZIP --------------------------------------
ls_1 = ['a', 'b', 'c', 'd']
ls_2 = [1, 2, 3]

for k in zip(ls_1, ls_2):
    # tuple of (ls_1[i], ls_2[i])
    print(k)
    
# Если кол-во не совпадает (ls_1>ls_2),
# берется минимальное

('a', 1)
('b', 2)
('c', 3)


In [10]:
# -------------------------- *args, **kwargs ---------------------------------
def func(*args, **kwargs):
    """
    *args - arguments
    tuple of args
    **kwargs - keyword arguments
    
    """
    print(kwargs)
    pass

# мы можем вызывать функцию со списком
# аргументов переменной длины

func(1, 2, 3)
# *args = (1, 2, 3)
# kwargs = {}

func(1, b=2, c=3)
# *args = (1,)
# kwargs = {'b': 2, 'c': 3}

{'b': 2, 'c': 3}


In [12]:
# --------------------------- yield ------------------------------------------
def func(n):
    for i in range(n):
        yield i   # ==> return [0, 1, 2, ..., n-1]
        
ls = func(5)
for elem in ls:
    print(elem)

0
1
2
3
4


In [4]:
# --------------------------- __str__ ----------------------------------------
class MyClass():
    def __init__(self, data):
        self.data = data
    
    def __str__(self):
        str_ = 'str is: {}'.format(self.data)
        return str_
    
test = MyClass(2)
print(str(test))
print('formating str: %s' % (test))

str is: 2
formating str: str is: 2


In [5]:
# ---------------------------- __len__ ----------------------------------------
class MyClass():
    def __init__(self, items):
        self.items = items
    
    def __len__(self):
        return len(self.items)

test = MyClass([1, 2, 3, 4])
len(test) # equal to list method

4

In [14]:
# ----------------------------- __iter__ --------------------------------------
class MyClass():
    def __init__(self, items):
        self.items = items
        
    def __iter__(self):
        return iter(self.items)
    
test = MyClass([1, 2, 3, 4])
iterator = iter(test)

# в отличие от getitem,
# next можно вызывать в любом месте
# а не только в цикле for
# ! только следить за выходом за пределом !
print(next(iterator))
print(next(iterator))        

1
2


In [4]:
# ---------------------------- __getitem__ ------------------------------------

class MyClass():
    def __init__(self, items):
        self.items = items
        pass
    
    def __getitem__(self, index):
        return self.items[index]
    
test = MyClass([1, 2, 3, 4])
for elem in test:
    print(elem)

1
2
3
4


In [10]:
# ------------------------------ DateTime ---------------------------
from datetime import datetime

now = datetime.now()

current_time = now.strftime("%d-%b-%Y %H:%M:%S")
# %Y - year
# %m - month (num)
# %b - month (str)
# %d - day

print(current_time)

09-Sep-2020 15:59:00


In [2]:
# --------------------------------- Time ----------------------------
from datetime import datetime
now = datetime.now()
# testing code
print('time:', datetime.now() - now)

time: 0:00:00.000056


In [None]:
# --------------------------------- Glob ----------------------------
import glob
template = '/mnt/raid10/home/SPEECH/Diarization/Clustering/test_sounds/test*.wav'
file_ls = glob.glob(template)
for file in file_ls:
    print(file)

In [None]:
# --------------------------------- Types ---------------------------
import numpy as np

a = [1, 2, 3]
if type(a) == list:
    print('a is list')
    
b = (1, 2, 3)
if type(b) == tuple:
    print('b is tuple')
    
c = np.array([1,2, 3])
if type(c) == np.ndarray:
    print('c is ndarray')

In [None]:
# переменная функции
def func1():
    print('1')
    
def func2():
    print('2')
    
func = func2
func()

In [None]:
# ---------------------------------- Как подключать ТФ -----------------------------------------
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"]="0"
os.environ['TF_CPP_MIN_LOG_LEVEL']="2"       # чтоб не выводилась инфа
import warnings
warnings.filterwarnings('ignore')        # IGNORE WARNINGS

try:
   # TF 2.
   import tensorflow.compat.v1 as tf
except ImportError:
   # TF 1.
   import tensorflow as tf

In [None]:
# ------------------------------ load model from .pb ------------------------------------
import tensorflow as tf
from tensorflow.python.platform import gfile

model_filename ='PATH_TO_PB.pb'

with tf.Session() as sess:
   
    # open .pb file
    with gfile.FastGFile(model_filename, 'rb') as f:
        graph_def = tf.GraphDef()              # define serialized graph 
                                               # to restore from .pb or SavedModel
        graph_def.ParseFromString(f.read())    # load graph from .pb   
        
        # sess.graph.as_default()  вроде как это лишнее но оставлю на всякий
    
        # присваиваем graph_def to sess
        tf.import_graph_def(graph_def)
        # теперь sess.graph - это graph.def 
        
        # еще можно сохранить в переменную сам граф
#         with tf.Graph().as_default() as my_graph:
#             tf.import_graph_def(graph_def, name='')
    
    # print tensors from graph
    for n in tf.get_default_graph().as_graph_def().node:
        print(n.name )
        
    # create summary with graph
    with tf.Graph().as_default() as g:
        file_writer = tf.summary.FileWriter(os.path.join(FLAGS.logdir, 'train'), graph=sess.graph)        

    # or as below
    writer = tf.summary.FileWriter('logdir/')
    writer.add_graph(sess.graph)

In [None]:
# subprocess - вызов команд из comline
from subprocess import Popen, PIPE
import shutil

in_wav = 'test.wav'
out_wav = 'preprocessed.wav'
command = 'ffmpeg'

# для начала проверим, 
# что команда существует в системе
if shutil.which(command) is None:
    raise Exception('{} is not found'.format(command))
    
# --- также нужно проверить существование файлов ---

args = [command, '-y', '-i', in_wav, '-ar', '16000', '-ac', '1', out_wav]  # кол-во каналов на выходе

# launch ffmpeg
p = Popen(args, stdout=PIPE, stderr=PIPE)
output, error = p.communicate()
# output: строку или кортеж данных вывода
# error: строку или кортеж ошибок
assert p.returncode == 0, error


In [None]:
# время работы программы 
from datetime import datetime
import time

start_time = datetime.now()

#Тут выполняются действия
time.sleep(5)

print(datetime.now() - start_time)

In [None]:
from librosa import load
import numpy as np

fname = '/mnt/raid10/home/SPEECH/Diarization/diarization/tests/test1.wav'
sample, _ = load(fname, sr=8000)
sample = sample[np.newaxis, :]
print(sample.shape)

In [None]:
str = 'abc/123/345/789.wav'
new_str = str.replace('.', '-0.')  # 'burmurian'
print(new_str)

In [None]:
# ------------------------------ Assert ---------------------------------
"""
используют тогда, когда ошибка может вылезти в другом месте
а мы не поймем, откуда взялась. Лучше сразу ее обработать
через assert
"""
assert 0 < 2 < 1, 'MessageError'
print('no print')

In [None]:
# ----------------------- try, except, finally --------------------------

try:
    2 / 0
except ZeroDivisionError:  # обработка именно этого исключения
    # можно в скобках через , указать несколько исключений
    # или не указывать ни одного - будет обрабатывать все исключения
    print('Деление на 0 - не надо так')
finally:
    # этот блок сработает в любом случае - 
    # что при try, что при except
    print('Пример исключения')

In [None]:
# ----------------------------- Exception -------------------------------

raise Exception('It is my error')
print('it will be not shown')

In [None]:
# ----------------------------- перенос -----------------------------------

str = "dsdadda"
"eba"
print(str)

In [None]:
# ------------------------------- Logging -----------------------------------

import logging

# DEBUG
# INFO
# WARNING
# ERROR
# CRITICAL

logging.basicConfig(filename=None, format="%(asctime)s %(message)s", datefmt='%H:%M:%S', level=logging.INFO)
# если filename=None, то вывод будет в консоль
# будут выводиться сообщения с приоритетом не ниже INFO
# filemode='a' by Default на добавление в конец. ('w' - запись)
# можем выбрать другой формат вывода: время, тип лога и др.

logging.info('info_message')
logging.debug('debug_message')     # это не покажется, т.к. приоритет DEBUG ниже, чем level=logging.INFO

# в процессе можно менять приоритет
# logging.setLevel(logging.DEBUG)

In [None]:
# ------------------------------- Work with .tsv ------------------------------------------

import pandas as pd
from os.path import join
from shutil import copyfile as cp

dir_ = '/home/SPEECH/voice_data/raw_voices/common_voice/ru'
file_name = 'validated.tsv'

table = pd.read_table(join(dir_, file_name))
print(len(table.path))