In [None]:
# Colab-only setup: mount Google Drive so the dataset paths under
# /content/drive/MyDrive used by the cells below can be read and written.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
import numpy as np
from scipy.io import wavfile
from scipy.signal import welch
import matplotlib.pyplot as plt
import os
from scipy.signal import firwin, lfilter, hamming

In [None]:
# Default decision threshold compared against the per-window feature in estimation();
# it is overwritten further down by the value returned from train().
BEST_THRESHOLD = 50000

In [None]:
def linear_normalize(value, c=-1, d=1):
    """Linearly rescale ``value`` so its minimum maps to ``c`` and its maximum to ``d``.

    Parameters
    ----------
    value : array_like
        Non-empty numeric data.
    c, d : float
        Output value assigned to the minimum / maximum of ``value``.

    Returns
    -------
    numpy.ndarray
        Floating-point array with the same shape as ``value``.  A constant
        input has no spread to rescale, so every element is mapped to ``c``
        (the previous implementation divided 0 by 0 and returned NaN).

    Raises
    ------
    ValueError
        If ``value`` is empty (raised by ``np.min``).
    """
    value = np.asarray(value)
    a = np.min(value)
    b = np.max(value)
    if b == a:
        # Degenerate range: avoid the 0/0 division and return a well-defined result.
        return np.full(value.shape, c, dtype=float)
    return (value - a) * (d - c) / (b - a) + c

In [None]:
# high pass and low pass filters which return the data with respect to their frequencies
def apply_high_pass_filter(data, sample_rate, cutoff_frequency, num_taps=15):
    """Run ``data`` through a Hamming-windowed FIR high-pass filter.

    Parameters
    ----------
    data : array_like
        Samples to filter.
    sample_rate : float
        Sampling frequency of ``data``; passed to ``firwin`` as ``fs`` so the
        cutoff is expressed in the same unit.
    cutoff_frequency : float
        Cutoff frequency of the filter.
    num_taps : int, optional
        Number of FIR coefficients (default 15, the value that used to be
        hard-coded).  ``firwin`` requires an odd count for a high-pass design
        and raises ``ValueError`` otherwise.

    Returns
    -------
    numpy.ndarray
        Filtered samples, same length as ``data`` (causal filtering via
        ``lfilter``; no delay compensation is applied).
    """
    # pass_zero=False asks firwin for a design whose DC gain is zero, i.e. high-pass.
    taps = firwin(num_taps, cutoff_frequency, pass_zero=False, fs=sample_rate, window='hamming')
    # Denominator 1.0 -> pure FIR filtering.
    return lfilter(taps, 1.0, data)

def apply_low_pass_filter(data, sample_rate, cutoff_frequency, num_taps=15):
    """Run ``data`` through a Hamming-windowed FIR low-pass filter.

    Parameters
    ----------
    data : array_like
        Samples to filter.
    sample_rate : float
        Sampling frequency of ``data``; passed to ``firwin`` as ``fs`` so the
        cutoff is expressed in the same unit.
    cutoff_frequency : float
        Cutoff frequency of the filter.
    num_taps : int, optional
        Number of FIR coefficients (default 15, the value that used to be
        hard-coded).

    Returns
    -------
    numpy.ndarray
        Filtered samples, same length as ``data`` (causal filtering via
        ``lfilter``; no delay compensation is applied).
    """
    # firwin's default pass_zero=True yields a low-pass design for a single cutoff.
    taps = firwin(num_taps, cutoff_frequency, fs=sample_rate, window='hamming')
    # Denominator 1.0 -> pure FIR filtering.
    return lfilter(taps, 1.0, data)

In [None]:
# a simple abstract class
class simpleWave:
  """Minimal container that keeps a waveform together with its sample rate."""

  def __init__(self, wave, sample_rate):
    # Store both values untouched; no validation or copying is performed.
    self.wave, self.sample_rate = wave, sample_rate

# Windowing splits a signal into consecutive windows of about one second each (STFT-like framing)
# so that every window can be analysed separately
class Windowing():
  """Cut a mono waveform into equal-length, non-overlapping windows.

  The number of windows is the clip duration in whole seconds; the window
  length is ``len(wave) // number_of_windows``, so each window spans at least
  one second and trailing samples that do not fill a window are dropped.

  Attributes
  ----------
  wave : the waveform passed in (kept as given).
  sample_rate : samples per second of ``wave``.
  window : float64 array of shape (n_windows, window_len) holding the windows.
  windows : alias of ``window`` (the constructor historically declared this
      name while the rest of the code used ``window``).
  """
  def __init__(self, wave, sample_rate):
    self.wave = wave
    self.sample_rate = sample_rate
    self.window = None
    self.windows = None
    self.windowing()

  def windowing(self):
    """Build ``self.window`` from ``self.wave`` and return it.

    A clip shorter than one second is kept as a single window instead of
    raising ZeroDivisionError; an empty clip produces a (0, 0) array.

    Raises
    ------
    ValueError
        If ``wave`` is not one-dimensional (e.g. an unsplit stereo array).
    """
    n_samples = len(self.wave)
    time_len = int(n_samples / self.sample_rate)
    if time_len == 0 and n_samples > 0:
      # Less than one second of audio: analyse it as one (short) window.
      time_len = 1
    window_len = n_samples // time_len if time_len else 0

    # np.array always copies, so the windows never alias the caller's buffer.
    used = np.array(self.wave[:time_len * window_len], dtype=np.float64)
    if used.ndim != 1:
      raise ValueError('Windowing expects a mono (1-D) waveform')

    self.window = used.reshape(time_len, window_len)
    self.windows = self.window
    return self.window

In [None]:
# calculate the PSD
class PSD(Windowing):
  """Windowing variant that estimates a power spectral density for each window."""

  def detect(self):
    """Run Welch's method (segment length 1024) on every window.

    Returns a pair of lists ``(freqs, psds)``; entry ``i`` of each list is the
    frequency-bin array / PSD array that ``welch`` produced for window ``i``.
    """
    per_window = [welch(row, fs=self.sample_rate, nperseg=1024) for row in self.window]
    freqs = [bins for bins, _ in per_window]
    psds = [density for _, density in per_window]
    return freqs, psds

# filter the data with respect to frequency band and get the PSD ratio (low band / high band)
class PSD_ratio(PSD):
  """Compare the PSD of the low band against the PSD of the high band, per window."""

  def detect_(self):
    """Return, for every window, the minimum over frequency bins of low-band PSD / high-band PSD.

    Both bands are derived from the *unfiltered* windows using a 3000 Hz
    cutoff.  Previously the low-pass filter was applied to windows that had
    already been high-passed, and ``self.window`` was left overwritten, so
    the "low band" was wrong and repeated calls gave different answers.

    NOTE(review): bins where the high-band PSD is exactly zero produce
    inf/NaN in the ratio; this is unchanged from the original behaviour.
    """
    original_windows = self.window
    try:
      # self.detect() reads self.window, so swap in each filtered copy temporarily.
      self.window = np.array([apply_high_pass_filter(wave, self.sample_rate, 3000) for wave in original_windows])
      _, high_psd = self.detect()
      self.window = np.array([apply_low_pass_filter(wave, self.sample_rate, 3000) for wave in original_windows])
      _, low_psd = self.detect()
    finally:
      # Always restore the raw windows so the instance can be reused.
      self.window = original_windows
    return np.min(np.array(low_psd) / np.array(high_psd), axis=1)

# filter the data with respect to frequency band and get the energy ratio (low band / high band)
class energy_ratio(Windowing):
  """Compare low-band and high-band sample energy, per window."""

  def detect(self):
    """Return, for every window, the minimum over samples of low-band energy / high-band energy.

    "Energy" here is the squared filtered sample.  Both bands are computed
    from the *unfiltered* windows with a 3000 Hz cutoff; the earlier version
    low-passed the already high-passed data and overwrote ``self.window``.

    NOTE(review): samples whose high-band energy is exactly zero produce
    inf/NaN in the ratio; this is unchanged from the original behaviour.
    """
    source_windows = self.window
    high_band = np.array([apply_high_pass_filter(wave, self.sample_rate, 3000) for wave in source_windows])
    low_band = np.array([apply_low_pass_filter(wave, self.sample_rate, 3000) for wave in source_windows])
    high_energy = high_band ** 2
    low_energy = low_band ** 2
    return np.min(low_energy / high_energy, axis=1)


# filter the frequencies and get the ratio
class frequency_filter(Windowing):
  """Per-window spectral feature: how strong the bins above a cutoff are relative to those below it."""

  def detect(self, cut_off=2000, coe=3):
    """Compute one feature value per window.

    For each window the FFT magnitudes are split into bins with frequency
    below ``cut_off`` and bins above it, both groups are trimmed (from the
    front) to the same length, each is rescaled to the range [2, 1000], the
    above-cutoff group is raised to the power ``coe``, and the mean of the
    element-wise ratio above / below is taken.

    Gotcha: ``np.fft.fftfreq`` also returns negative frequencies, and those
    all satisfy ``f < cut_off``; they are therefore part of the "below"
    group.  This is kept as is because thresholds are tuned on this feature.

    Parameters
    ----------
    cut_off : float
        Frequency separating the two groups (bins exactly at the cutoff are
        in neither group).
    coe : float
        Exponent applied to the normalised above-cutoff magnitudes.

    Returns
    -------
    numpy.ndarray
        One value per window.

    Raises
    ------
    ValueError
        If a window has no bins on one side of ``cut_off``.
    """
    ratio = []

    for w in self.window:
      spectrum, freqs = self.fft_feature(w)
      magnitude = np.abs(spectrum)

      # Boolean masks keep the original bin order, like the old per-bin loops did.
      below_cut = magnitude[freqs < cut_off]
      above_cut = magnitude[freqs > cut_off]

      # Same effect as make_lists_equal_size: drop leading items of the longer group.
      common = min(len(below_cut), len(above_cut))
      if common == 0:
        raise ValueError('cut_off leaves no FFT bins on one side; cannot form a ratio')
      below_cut = below_cut[len(below_cut) - common:]
      above_cut = above_cut[len(above_cut) - common:]

      # Rescaling to [2, 1000] keeps the denominator away from zero.
      below_cut = linear_normalize(below_cut, c=2, d=1000)
      above_cut = linear_normalize(above_cut, c=2, d=1000) ** coe

      ratio.append(np.mean(above_cut / below_cut))

    return np.array(ratio)

  def make_lists_equal_size(self, list1, list2):
    """Shorten the longer of two lists in place, removing leading items, until the lengths match."""
    excess = len(list1) - len(list2)
    # A single slice deletion replaces the former one-item-at-a-time loop (quadratic on lists).
    if excess > 0:
      del list1[:excess]
    elif excess < 0:
      del list2[:-excess]

  def fft_feature(self ,data):
    """Return ``(fft_result, frequencies)`` for ``data``: the complex FFT and the matching bin frequencies."""
    fft_result = np.fft.fft(data)
    frequencies = np.fft.fftfreq(len(fft_result), d=1/self.sample_rate)
    return fft_result, frequencies



In [None]:
def _collect_window_features(directory, label, coe):
  """Compute frequency_filter features for every file in ``directory`` and label each window with ``label``."""
  features, labels = [], []
  for index, file in enumerate(os.listdir(directory)):
    path = os.path.join(directory, file)
    if os.path.isfile(path):
      print(index, end=',')
      sr, wv = wavfile.read(path)
      en = frequency_filter(wave=wv, sample_rate=sr).detect(coe=coe)
      features.append(en)
      labels.append(np.full(en.shape[0], label, dtype=float))
      print(en)
  return features, labels


def _threshold_accuracy(features, labels, threshold):
  """Fraction of windows classified correctly when values above ``threshold`` mean label 1 and values below mean label 0."""
  correct = ((features < threshold) & (labels == 0)) | ((features > threshold) & (labels == 1))
  return np.count_nonzero(correct) / features.size


def train(threshold=70000, coe=3,
          speech_dir='/content/drive/MyDrive/Dataset/voiceAndspeech/speech/',
          music_dir='/content/drive/MyDrive/Dataset/voiceAndspeech/music/',
          candidates=None):
  """Pick the feature threshold that best separates speech windows from music windows.

  Every file in ``speech_dir`` (label 1) and ``music_dir`` (label 0) is turned
  into per-window features with ``frequency_filter``; a grid search over
  ``candidates`` then keeps the first threshold with the highest accuracy.

  Fixes compared with the earlier version: accuracy is now computed separately
  for each candidate (the correct/total counters used to accumulate across
  candidates, so a running average was being compared), and ``coe`` is
  actually forwarded to the feature extractor.

  Parameters
  ----------
  threshold : unused; kept only so existing calls keep working (the old loop
      variable shadowed it as well).
  coe : exponent forwarded to ``frequency_filter.detect``.
  speech_dir, music_dir : folders containing the labelled recordings.
  candidates : iterable of thresholds to try; defaults to
      ``range(500000, 800000, 1000)``.

  Returns
  -------
  The best threshold (0 if no candidate scores above zero accuracy).

  Raises
  ------
  ValueError
      If no training windows were found.
  """
  speech_X, speech_y = _collect_window_features(speech_dir, 1, coe)
  print('---------------------------------------------------------------------------------------------')
  music_X, music_y = _collect_window_features(music_dir, 0, coe)
  print('')

  X = speech_X + music_X
  y = speech_y + music_y
  if not X:
    raise ValueError('no training files found in the speech/music folders')
  features = np.concatenate(X)
  labels = np.concatenate(y)
  if features.size == 0:
    raise ValueError('training files produced no analysis windows')

  if candidates is None:
    # the upper and bottom limit of the threshold
    candidates = range(500000, 800000, 1000)

  # Exhaustive (grid) search for the best classification threshold.
  best_score = 0
  best_param = 0
  for candidate in candidates:
    accuracy = _threshold_accuracy(features, labels, candidate)
    if accuracy > best_score:
      best_param = candidate
      best_score = accuracy
  print(best_param)
  print(best_score)
  return best_param

# Replace the default threshold with the one learned from the labelled dataset.
BEST_THRESHOLD=train()

0,[1632043.09990695 3015312.86092142  997150.27316963 2477232.47198919
  759101.95517678  906914.41499482  891176.87993135 1708205.32477306
 1618944.86490847 4615231.38470515]
1,[1582198.66455904 1182792.79125849 2013382.7660124  2530781.45825259
 1986983.37554181 1045343.03283923 1837302.04184301 1226546.0098789
 1402651.92643546 1843282.92384288 1675117.66375737]
2,[ 820447.18829594 1969795.24324661 1076412.22952998  729896.67041423
 1264651.71146788  772314.39663399 1609756.18128648 1677662.29980728
 1984979.35702017  962625.25817854 1077161.82377743  806896.10557213
 1133390.07700422 1336646.05945416 3130389.2666441   385745.06338598
 1276620.13817357 1576346.0964149   454136.16386055  656348.23529151
 1751485.25355092  888158.96417222  886995.13011198 1843401.93258872
 1596756.69066991]
3,[ 874544.04257378 1110054.70624742  946147.08988861 1339123.23439241
  803249.52012786  646437.85673515 2101213.55148964 1307584.7734599
  757035.03860807  490893.78329614  332714.77206793 105910

In [None]:
# THIS CELL IS CREATED ONLY TO MANAGE TWO-CHANNEL (STEREO) AUDIO

# TO CREATE A TWO-CHANNEL TEST FILE FROM z1.wav AND z2.wav:
  # add_zero_to_first('z1.wav', 'zm1.wav')
  # add_zero_to_end('z2.wav', 'zm2.wav')
  # sr, l = wavfile.read('zm1.wav'); _, r = wavfile.read('zm2.wav')
  # create_stereo_track(l, r, sr, 'zz.wav')

# TO SPLIT A TWO-CHANNEL FILE AND GET THE CHANNELS BACK AS ARRAYS:
  # left, right, sr = split_and_save_channels('input.wav')
# IF YOU ALSO WANT TO SAVE THE TWO CHANNELS AS FILES:
  # split_and_save_channels('input.wav', 'left_channel.wav', 'right_channel.wav')


# MIXER
def create_stereo_track(l, r, sr, output_path=None):
    """Combine two mono signals into one two-column (left, right) array.

    Parameters
    ----------
    l, r : array_like
        Left and right channel samples; the longer one is truncated to the
        length of the shorter one.
    sr : int
        Sample rate written to the file header when ``output_path`` is given.
    output_path : str or None
        If not None, the stereo array is also written there as a WAV file.

    Returns
    -------
    numpy.ndarray
        Array of shape (min(len(l), len(r)), 2).
    """
    min_length = min(len(l), len(r))
    stereo_audio = np.column_stack((l[:min_length], r[:min_length]))
    # Identity check is the idiomatic (and safe) way to test for None.
    if output_path is not None:
        wavfile.write(output_path, sr, stereo_audio)
    return stereo_audio

# SPLITER
def split_and_save_channels(input_file, output_left=None, output_right=None):
    """Read a stereo WAV file and return its first two channels.

    Parameters
    ----------
    input_file : str
        Path of the WAV file to read.
    output_left, output_right : str or None
        Optional paths; each channel whose path is given is also written out
        as a mono WAV file.  (Previously nothing was written unless *both*
        paths were supplied.)

    Returns
    -------
    (left_channel, right_channel, sr)
        The two channels (views into the array that was read) and the sample rate.

    Raises
    ------
    ValueError
        If the file does not contain at least two channels; a mono file used
        to yield two scalar samples instead of channels.
    """
    # Load the stereo audio file
    sr, y = wavfile.read(input_file)
    if y.ndim != 2 or y.shape[1] < 2:
        raise ValueError('split_and_save_channels expects a WAV file with at least two channels')

    # Column 0 is the left channel, column 1 the right channel.
    left_channel = y[:, 0]
    right_channel = y[:, 1]

    if output_left is not None:
        wavfile.write(output_left, sr, left_channel)
    if output_right is not None:
        wavfile.write(output_right, sr, right_channel)
    return left_channel, right_channel, sr
def disable_zero(signal):
    """Replace every sample equal to 0 with 1, in place, and return the same object.

    NOTE(review): presumably this keeps genuine audio free of exact zeros so
    that the zero padding added by add_zero_to_first / add_zero_to_end can be
    located later by searching for non-zero samples -- confirm.

    NumPy arrays (any number of dimensions) are handled with one vectorised
    assignment instead of a per-sample Python loop; other mutable sequences
    fall back to the element-wise loop.
    """
    if isinstance(signal, np.ndarray):
        signal[signal == 0] = 1
        return signal
    for index, val in enumerate(signal):
        if val == 0:
            signal[index] = 1
    return signal

def add_zero_to_first(input_file, output_file, duration=10):
    """Write a copy of ``input_file`` that starts with ``duration`` seconds of silence.

    Exact-zero samples of the original audio are first replaced through
    ``disable_zero`` so that only the added padding consists of zeros.

    Parameters
    ----------
    input_file : str
        WAV file to read.
    output_file : str
        WAV file to write.
    duration : float, optional
        Length of the leading silence in seconds (default 10, the value that
        used to be hard-coded).
    """
    # Load the original WAV file
    original_fs, original_audio = wavfile.read(input_file)
    original_audio = disable_zero(original_audio)

    # Number of zero samples to prepend
    silence_samples = int(duration * original_fs)

    # Match any trailing (channel) dimension of the audio so concatenation
    # along the time axis is shape-compatible.
    zero_samples = np.zeros((silence_samples,) + original_audio.shape[1:], dtype=original_audio.dtype)

    # Silence first, then the original audio
    modified_audio = np.concatenate((zero_samples, original_audio))

    # Write the modified audio to a new WAV file
    wavfile.write(output_file, original_fs, modified_audio)


def add_zero_to_end(input_file, output_file, duration=10):
    """Write a copy of ``input_file`` that ends with ``duration`` seconds of silence.

    Exact-zero samples of the original audio are first replaced through
    ``disable_zero`` so that only the added padding consists of zeros.

    Parameters
    ----------
    input_file : str
        WAV file to read.
    output_file : str
        WAV file to write.
    duration : float, optional
        Length of the trailing silence in seconds (default 10, the value that
        used to be hard-coded).
    """
    # Load the original WAV file
    original_fs, original_audio = wavfile.read(input_file)
    original_audio = disable_zero(original_audio)

    # Number of zero samples to append
    silence_samples = int(duration * original_fs)

    # Match any trailing (channel) dimension of the audio so concatenation
    # along the time axis is shape-compatible.
    zero_samples = np.zeros((silence_samples,) + original_audio.shape[1:], dtype=original_audio.dtype)

    # Original audio first, then the silence
    modified_audio = np.concatenate((original_audio, zero_samples))

    # Write the modified audio to a new WAV file
    wavfile.write(output_file, original_fs, modified_audio)


In [None]:
def delete_zeros(signal1, signal2):
  """Strip the zero padding from two signals.

  ``signal1`` is expected to carry its zeros at the start and ``signal2`` at
  the end.  Returns ``(signal1 without leading zeros, signal2 without
  trailing zeros)``; a signal that is zero everywhere is returned whole.
  """
  # argmax over a boolean array gives the position of the first True.
  leading_zeros = np.argmax(signal1 != 0)
  trailing_zeros = np.argmax(signal2[::-1] != 0)
  keep_until = len(signal2) - trailing_zeros
  return signal1[leading_zeros:], signal2[:keep_until]

# estimate the signal is either the voice or music
def estimation(signal, sr):
  """Label ``signal`` as 'voice' or 'music' by majority vote over its analysis windows.

  Each window's frequency_filter feature votes 'voice' when it exceeds
  BEST_THRESHOLD and 'music' otherwise; ties (including no windows) go to 'music'.
  """
  window_scores = frequency_filter(wave=signal, sample_rate=sr).detect()
  voice_votes = sum(1 for score in window_scores if score > BEST_THRESHOLD)
  music_votes = len(window_scores) - voice_votes
  return 'voice' if voice_votes > music_votes else 'music'

# trim the one that shows first when the second one get entered
def trim_last(pos0, pos1):
  """Silence ``pos0`` wherever ``pos1`` is active (non-zero).

  ``pos0`` is modified in place and both inputs are returned.  Only the first
  ``len(pos0)`` samples of ``pos1`` are consulted.

  Raises
  ------
  IndexError
      If ``pos1`` is shorter than ``pos0``.
  """
  n = len(pos0)
  if len(pos1) < n:
    raise IndexError('pos1 must be at least as long as pos0')

  # One vectorised comparison replaces the per-sample Python loop.
  active = np.asarray(pos1[:n]) != 0
  if isinstance(pos0, np.ndarray):
    pos0[active] = 0
  else:
    # Plain sequences do not support boolean-mask assignment.
    for index in np.flatnonzero(active):
      pos0[index] = 0
  return pos0, pos1

def fade_out_effect(pos0, pos0_sample_rate, duration, pos1):
  """Fade ``pos0`` out linearly starting where ``pos1`` first becomes non-zero.

  From that onset the gain ramps from 1 towards 0 over ``duration`` seconds;
  everything after the ramp is silenced.  If ``pos1`` is never non-zero within
  the length of ``pos0`` the fade starts at index 0 (as before).

  Fixes compared with the earlier version:
  * a fade that would run past the end of ``pos0`` is truncated (keeping the
    same slope) instead of raising a broadcasting error;
  * the result is cast back to the dtype of ``pos0`` rather than always to
    int16, so other sample formats are not corrupted (int16 input is unchanged);
  * the onset is located with a vectorised search.

  Parameters
  ----------
  pos0 : array_like
      Signal to fade out; it is not modified.
  pos0_sample_rate : float
      Samples per second of ``pos0``.
  duration : float
      Fade length in seconds.
  pos1 : array_like
      Signal whose first non-zero sample marks the start of the fade.

  Returns
  -------
  (faded_pos0, pos1)
  """
  pos0 = np.asarray(pos0)
  out_dtype = pos0.dtype

  active = np.flatnonzero(np.asarray(pos1[:len(pos0)]) != 0)
  fade_start_index = int(active[0]) if active.size else 0

  fade_len = int(duration * pos0_sample_rate)
  # Never let the fade window extend beyond the available samples.
  fade_end_index = min(fade_start_index + fade_len, len(pos0))

  faded = pos0.astype(np.float64)
  # Build the full-length ramp, then keep only the part that fits.
  fade_out = np.linspace(1, 0, fade_len)[:fade_end_index - fade_start_index]
  faded[fade_start_index:fade_end_index] *= fade_out
  faded[fade_end_index:] = 0
  return faded.astype(out_dtype), pos1

# find the start and the end time of every band
def start_end_time(pos0, pos1, sr):
  """Return ``(pos0_time, pos1_time)``, each a ``[start, end]`` pair in seconds.

  ``pos0`` is taken to start at 0 and end at its last non-zero sample;
  ``pos1`` starts at its first non-zero sample and ends at its full length.
  """
  # Index of the last non-zero sample of pos0, found by scanning it reversed.
  pos0_last_active = len(pos0) - np.argmax(pos0[::-1] != 0) - 1
  pos1_first_active = np.argmax(pos1 != 0)

  pos0_time = [0, pos0_last_active / sr]
  pos1_time = [pos1_first_active / sr, len(pos1) / sr]
  return pos0_time, pos1_time

def main(input_path='/content/drive/MyDrive/Dataset/voiceAndspeech/test/zz.wav',
         output_path='/content/drive/MyDrive/Dataset/voiceAndspeech/test/res1.wav',
         fade_duration=2):
  """Classify both channels of a stereo file, print their time spans and write a cross-faded copy.

  The channel whose very first sample is 0 is treated as the one that enters
  second.  The other channel is faded out over ``fade_duration`` seconds from
  the moment the second one starts, and the result is written to ``output_path``.

  NOTE(review): the "first sample is 0" test relies on the earlier channel
  containing no exact-zero first sample (the add_zero_* helpers ensure this
  via disable_zero) -- confirm for files produced elsewhere.

  Parameters
  ----------
  input_path : path of the stereo WAV file to analyse.
  output_path : path of the stereo WAV file to write.
  fade_duration : fade-out length in seconds.
  """
  # get left, right and sample rate of the two-channel audio
  left, right, sr = split_and_save_channels(input_path)

  left_enters_second = left[0] == 0
  if left_enters_second:
    first, second = right, left
  else:
    first, second = left, right

  # delete_zeros expects (zero-at-start, zero-at-end), i.e. (second, first)
  second_tr, first_tr = delete_zeros(second, first)
  # find what time every channel started and ended
  first_time, second_time = start_end_time(first, second, sr)
  # fade the first channel out once the second one has entered
  first_res, second_res = fade_out_effect(first, sr, fade_duration, second)

  if left_enters_second:
    left_tr, left_time, left_res = second_tr, second_time, second_res
    right_tr, right_time, right_res = first_tr, first_time, first_res
  else:
    left_tr, left_time, left_res = first_tr, first_time, first_res
    right_tr, right_time, right_res = second_tr, second_time, second_res

  print(estimation(left_tr, sr), end=': ') # estimate what is the left channel
  print(left_time)
  print(estimation(right_tr, sr), end=': ') # estimate what is the right channel
  print(right_time)

  create_stereo_track(left_res, right_res, sr, output_path) # output

# Run the demo on the test recording (reads test/zz.wav, writes test/res1.wav).
main()

music: [10.0, 24.5735]
voice: [0, 15.453479166666666]
