**Phoneme-wise modification of audio**
---
This code modifies the pitch, duration and energy of an audio file, provided that a transcript with timestamps is available for the phoneme(s) of the audio file to be modified.<br><br>**Note:** The code assumes that the transcript file contains only the timestamped entries of the phoneme(s) to be modified, and nothing else.

The code is written in Python, and the audio modification functions are written using PyWorld, a Python wrapper for the WORLD vocoder (see [Python-Wrapper-for-World-Vocoder](https://github.com/JeremyCCHsu/Python-Wrapper-for-World-Vocoder)). The [source file](https://drive.google.com/file/d/15nDAprE0FkmMHzH4HQNa7YgIg5jWH2iN/view?usp=share_link) containing all the PyWorld functions must be in the current working directory so that the relevant functions can be imported and used in the code.

This cell is used for unzipping the zipped source file containing the PyWorld functions.

In [None]:
!unzip /content/drive/MyDrive/PythonWORLDmaster.zip

Now, the relevant modules and their functions need to be imported by running the following cell.

In [None]:
# Install the extra packages first, then import everything
!pip install pysoundfile
!pip install bitstring

from pathlib import Path
import contextlib
import wave

import IPython
import matplotlib.pyplot as plt
import numpy as np
from scipy import signal
from scipy.io import savemat
from scipy.io.wavfile import read as wavread
from scipy.io.wavfile import write as wavwrite

Once the source file has been unzipped, run the following cell to change the working directory to the extracted folder, so that its modules can be imported.

In [None]:
cd /content/Python-WORLD-master

Now, the **main** module needs to be imported from the **world** package.

In [None]:
from world import main

From **main**, the **World** class is instantiated.

In [None]:
vocoder = main.World()

The following cell contains a one-dimensional list of all the phonemes defined in the TIMIT corpus, in the same order as in the corpus's list of phonemes, irrespective of category.<br><br>**Note:** The code might give faulty results if the order of these phonemes is changed. Although the list has no explicit boundaries separating the categories, the phonemes are arranged in a fixed order so that the category of each one can be identified from its position at a later stage in the code.

In [None]:
# List of all phonemes
phone_list = ['p', 'k', 't', 'q', 'b', 'd', 'g', 'dx', 'f', 'th', 's', 'sh', 'v', 'dh', 'z', 'zh', 'ch', 'jh', 'm', 'n', 'ng', 'em', 'en', 'eng', 'nx', 'l', 'r', 'y', 'w', 'hh', 'hv', 'hl', 'iy', 'ey', 'aa', 'aw', 'ay', 'ao', 'oy', 'ow', 'uw', 'er', 'ax', 'ih', 'eh', 'ae', 'ah', 'uh', 'ux', 'ix', 'axr', 'ax-h']

Now, the various categories of phonemes are defined, with their constituent phonemes and the optimum exaggeration ratios for each of the pitch, duration and energy parameters. The optimum ratios used here are obtained from the supplementary materials of the [PTeacher](https://arxiv.org/abs/2105.05182) paper.<br>Each phoneme category is a Python dictionary with the following key-value pairs:
*   **Phoneme**: includes a single dimensional numpy array containing the phonemes under that category
*   **Pitch**: defines the optimum exaggeration ratio for pitch of the said category
*   **Duration**: defines the optimum exaggeration ratio for duration of the said category
*   **Energy**: defines the optimum exaggeration ratio for energy of the said category

In [None]:
# Categories of phonemes and their corresponding optimum exaggeration ratios
voiceless_stops = {"Phoneme" : np.array(['p', 'k', 't', 'q']), "Pitch" : 1.26, "Duration" : 1.13, "Energy" : 4.22}
voiced_stops = {"Phoneme" : np.array(['b', 'd', 'g', 'dx']), "Pitch" : 1.26, "Duration" : 1.49, "Energy" : 5.29}

voiceless_fricatives = {"Phoneme" : np.array(['f', 'th', 's', 'sh']), "Pitch" : 1.22, "Duration" : 2.76, "Energy" : 3.78}
voiced_fricatives = {"Phoneme" : np.array(['v', 'dh', 'z', 'zh']), "Pitch" : 1.51, "Duration" : 1.64, "Energy" : 4.63}

voiceless_affricates = {"Phoneme" : np.array(['ch']), "Pitch" : 1.19, "Duration" : 1.19, "Energy" : 3.58}
voiced_affricates = {"Phoneme" : np.array(['jh']), "Pitch" : 1.70, "Duration" : 1.87, "Energy" : 4.64}

nasals = {"Phoneme" : np.array(['m', 'n', 'ng', 'em', 'en', 'eng', 'nx']), "Pitch" : 1.32, "Duration" : 3.42, "Energy" : 1.60}

laterals = {"Phoneme" : np.array(['l', 'r']), "Pitch" : 1.12, "Duration" : 2.78, "Energy" : 3.45}

semi_vowels = {"Phoneme" : np.array(['y', 'w', 'hh', 'hv', 'hl']), "Pitch" : 1.24, "Duration" : 2.16, "Energy" : 2.57}
long_vowels = {"Phoneme" : np.array(['iy', 'ey', 'aa', 'aw', 'ay', 'ao', 'oy', 'ow', 'uw', 'er', 'ax']), "Pitch" : 1.14, "Duration" : 1.96, "Energy" : 2.82}
short_vowels = {"Phoneme" : np.array(['ih', 'eh', 'ae', 'ah', 'uh', 'ux', 'ix', 'axr', 'ax-h']), "Pitch" : 1.38, "Duration" : 2.15, "Energy" : 2.15}

Now, the path of the audio file, which is to be modified, and its corresponding transcript file, is taken as input using the variables **wavfile** and **transcript**, respectively.<br>Note that the **audio file needs to be in .wav format**.

In [None]:
# Input section
wavfile = input("Path of the .wav file: ")
transcript = input("Path of the transcript file: ")

In the following cell, the function **scipy.io.wavfile.read** reads the .wav audio file, and returns its sampling rate in samples/sec (fs) and the data in the form of a numpy array (x_int16).

In [None]:
# Obtains sampling frequency and array of frames
fs, x_int16 = wavread(wavfile)
x = x_int16 / (2 ** 15 - 1)
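As a minimal sketch of the scaling step above, the snippet below converts a synthetic 16-bit signal to floating point. The array `samples_int16` is a hypothetical stand-in for the data returned by `wavread`; it is not read from a real file.

```python
import numpy as np

# Hypothetical stand-in for the int16 data returned by scipy.io.wavfile.read
samples_int16 = np.array([0, 16384, -16384, 32767, -32768], dtype=np.int16)

# Same scaling as in the cell above: divide by the largest positive int16 value
samples_float = samples_int16 / (2 ** 15 - 1)

print(samples_float.max())  # 1.0
print(samples_float.min())  # slightly below -1.0, because |-32768| > 32767
```

Dividing by 32767 maps the largest positive sample exactly to 1.0, at the cost of the most negative sample landing marginally outside the [-1, 1] range.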

The following cell is used for encoding the audio file using parameters obtained from reading the file in the previous cell and using the f0_method **harvest**.

In [None]:
# Encodes the audio file
# dat is modified in the later cells; ori is kept as an unmodified reference
dat = vocoder.encode(fs, x, f0_method = 'harvest')
ori = vocoder.encode(fs, x, f0_method = 'harvest')

Now, a function get_position() is defined, which calculates the position/index of a certain timestamp in the temporal_positions array of the encoded audio file.<br>
<br>**Parameters**
* **time - float**: Timestamp whose position is to be determined

<br>**Returns** 
* **pos - int**: Position/index of the timestamp



In [None]:
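# Calculates the index of a timestamp from the temporal_positions array
def get_position(time):
  pos = 0
  # Snap the timestamp to a 0.005 s grid (assumed to match the frame spacing)
  time1 = round((5 * round((time / 5), 3)), 3)

  for i in range(0, len(ori['temporal_positions'])):
    # Compare with a tolerance, since exact float equality is unreliable
    if np.isclose(ori['temporal_positions'][i], time1, atol=1e-6):
      pos = i

  # Falls back to 0 if no frame matches the timestamp
  return pos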
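The lookup above scans the array for a frame whose timestamp matches the snapped input. A minimal alternative sketch is to pick the frame whose timestamp is closest to the input, which avoids depending on a particular grid. The function name `nearest_frame_index` and the synthetic 5 ms grid are assumptions made for illustration; they are not part of the original notebook.

```python
import numpy as np

def nearest_frame_index(time, temporal_positions):
    """Return the index of the frame whose timestamp is closest to `time`."""
    positions = np.asarray(temporal_positions, dtype=float)
    return int(np.argmin(np.abs(positions - time)))

# Synthetic 5 ms frame grid standing in for ori['temporal_positions']
frame_times = np.arange(0, 200) * 0.005

print(nearest_frame_index(0.123, frame_times))  # 25 (closest frame is at 0.125 s)
print(nearest_frame_index(0.0, frame_times))    # 0
```

A timestamp beyond the end of the audio maps to the last frame here, rather than silently falling back to index 0.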

The following cell reads the transcript file line by line, and creates a nested Python dictionary, where the primary key is **Phoneme 'x'** (x being the phoneme number), whose value is another dictionary with the following key-value pairs:
*   **Phoneme**: contains the phoneme string
*   **Start**: contains the index/position of the starting timestamp of the said phoneme
*   **End**: contains the index/position of the ending timestamp of the said phoneme



In [None]:
# Reads the transcript file
phoneme = {}
t = 0

with open(transcript, "r") as txtfile:
  for line in txtfile:
    c = 0

    for word in line.split():
      if c == 0:
        st = float(word)

      elif c == 1:
        et = float(word)

      else:
        ph = word

      c += 1

    key = "Phoneme " + str(t)
    phoneme[key] = {}

    phoneme[key]["Phoneme"] = ph
    phoneme[key]["Start"] = get_position(st)
    phoneme[key]["End"] = get_position(et)

    t += 1

len_phn = len(phoneme)
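The parsing above expects each transcript line to hold a start time, an end time and a phoneme label, separated by whitespace. The sketch below shows the same idea on an in-memory example. The helper `parse_transcript` and the sample contents are hypothetical, and unlike the cell above the sketch skips blank or incomplete lines and always takes the third field as the label.

```python
import io

def parse_transcript(lines):
    """Parse 'start end phoneme' lines into (start, end, phoneme) tuples."""
    entries = []
    for line in lines:
        fields = line.split()
        if len(fields) < 3:
            continue  # skip blank or incomplete lines
        start, end, label = float(fields[0]), float(fields[1]), fields[2]
        entries.append((start, end, label))
    return entries

# Hypothetical transcript contents: start time, end time, phoneme label
sample = io.StringIO("0.100 0.180 hh\n0.180 0.310 eh\n\n0.310 0.400 l\n")
print(parse_transcript(sample))
# [(0.1, 0.18, 'hh'), (0.18, 0.31, 'eh'), (0.31, 0.4, 'l')]
```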

Now, the start and end bounds of the part of the audio file to be exaggerated are determined.

In [None]:
# Determines the starting and ending temporal positions of the word which is to be exaggerated
st = phoneme["Phoneme 0"]["Start"]
k = "Phoneme " + str(len(phoneme) - 1)
et = phoneme[k]["End"]

This cell determines the difference between two consecutive timestamps in the 'temporal_positions' array (the frame spacing), and rounds it to three decimal places.

In [None]:
diff = ori['temporal_positions'][1] - ori['temporal_positions'][0]
diff = round(diff, 3)

A function pitch_modulations() is defined, which scales the pitch (f0) of the encoded audio data between two frame positions.<br>
<br>**Parameters**
* **pos1 - int**: Index of the starting timestamp
* **pos2 - int**: Index of the ending timestamp (not included in the modified range)
* **factor - float**: Optimum exaggeration ratio, or factor, by which the pitch is to be modified
* **dt - dict**: Encoded audio data, as returned by vocoder.encode()

<br>**Returns** 
* **dt - dict**: Encoded audio data with the pitch modified

In [None]:
# Modifies the pitch in an audio
def pitch_modulations(pos1, pos2, factor, dt):
  dt['f0'][pos1:pos2] *= factor
  return dt
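To make the effect of the slice concrete, the following sketch applies the same operation to a small synthetic `f0` track. The dictionary `encoded` and the function name `scale_f0` are hypothetical stand-ins; real encoded data contains further keys.

```python
import numpy as np

# Synthetic stand-in for the encoded audio: only the 'f0' track is needed here
encoded = {"f0": np.array([0.0, 100.0, 110.0, 120.0, 0.0])}

def scale_f0(pos1, pos2, factor, dt):
    # Same operation as pitch_modulations(): scale frames pos1..pos2-1 in place
    dt["f0"][pos1:pos2] *= factor
    return dt

scale_f0(1, 3, 1.5, encoded)
print(encoded["f0"])  # frames 1 and 2 are scaled; frame 3 (index pos2) is not
```

Two details are worth noting: the slice excludes the frame at `pos2`, and the array is modified in place, so the dictionary passed in is changed even if the return value is ignored.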

A function time_modulations() is defined, which stretches the timestamps of the encoded audio data between two frame positions.<br>
<br>**Parameters**
* **pos1 - int**: Index of the starting timestamp
* **pos2 - int**: Index of the ending timestamp (included in the modified range)
* **factor - float**: Optimum exaggeration ratio, or factor, by which the duration is to be modified
* **dt - dict**: Encoded audio data, as returned by vocoder.encode()
* **diff - float**: Spacing between two consecutive timestamps, used to keep the timestamps after the stretched segment in increasing order

<br>**Returns** 
* **dt - dict**: Encoded audio data with the duration modified

In [None]:
# Modifies the duration in an audio
def time_modulations(pos1, pos2, factor, dt, diff):
  temp = []
  c = 0

  for i in range(pos1, pos2 + 1):
    temp.append(dt['temporal_positions'][i] - dt['temporal_positions'][pos1])

  for i in range(len(temp)):
    temp[i] *= factor

  for i in range(pos1, pos2 + 1):
    dt['temporal_positions'][i] =  dt['temporal_positions'][pos1] + temp[c]
    c += 1

  for i in range(1, len(dt['temporal_positions'])):
    if dt['temporal_positions'][i] < dt['temporal_positions'][i - 1]:
      dt['temporal_positions'][i] = dt['temporal_positions'][i - 1] + diff
      
  return dt
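The stretching logic can be followed on a small synthetic grid. The sketch below is a compact re-statement of the steps above, operating on a bare array rather than on the encoded dictionary; the name `stretch_positions` and the six-frame grid are assumptions made for illustration.

```python
import numpy as np

def stretch_positions(positions, pos1, pos2, factor, step):
    """Sketch of the stretching logic in time_modulations() on a plain array."""
    out = np.array(positions, dtype=float)
    # Scale the offsets of frames pos1..pos2 relative to frame pos1
    out[pos1:pos2 + 1] = out[pos1] + (out[pos1:pos2 + 1] - out[pos1]) * factor
    # Push later frames forward so that timestamps never decrease
    for i in range(1, len(out)):
        if out[i] < out[i - 1]:
            out[i] = out[i - 1] + step
    return out

grid = np.arange(6) * 0.005          # 0.000, 0.005, ..., 0.025
stretched = stretch_positions(grid, 1, 3, 2.0, 0.005)
print(np.round(stretched, 3))
```

With a factor of 2, frames 1 to 3 end up 10 ms apart (0.005, 0.015, 0.025), and the frames after the segment resume the 5 ms spacing from the new end point (0.030, 0.035).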

A function energy_modulations() is defined, which scales the energy (spectrogram) of the encoded audio data between two frame positions.<br>
<br>**Parameters**
* **pos1 - int**: Index of the starting timestamp
* **pos2 - int**: Index of the ending timestamp (not included in the modified range)
* **factor - float**: Optimum exaggeration ratio, or factor, by which the energy is to be modified
* **dt - dict**: Encoded audio data, as returned by vocoder.encode()

<br>**Returns** 
* **dt - dict**: Encoded audio data with the energy modified

In [None]:
# Modifies the energy in an audio
def energy_modulations(pos1, pos2, factor, dt):
  dt['spectrogram'][pos1:pos2] *= factor
  return dt

A function search() is defined, which returns the position of a phoneme in a pre-defined list of phonemes, using which its category is determined.<br>
<br>**Parameters**
* **ar - list**: The pre-defined Python list phone_list, containing all the phonemes, defined above
* **x - str**: Phoneme whose category is to be determined

<br>**Returns** 
* **i - int**: 1-based position of the phoneme in the pre-defined list, or -1 if the phoneme is not found

In [None]:
# Determines the category of the current phoneme (using linear search)
def search(ar, x):
  for i in range(len(ar)):
    if ar[i] == x:
      return i + 1
  return -1

This cell is used to segregate the phonemes under different categories, from the **phoneme** dictionary, and accordingly, perform each of pitch, duration and energy exaggeration on them using the appropriate optimum ratios.

In [None]:
# Performs various modifications on the audio phoneme-wise
for i in phoneme:
    phone = phoneme[i]["Phoneme"] # Phoneme
    start = phoneme[i]["Start"] # Starting timestamp
    end = phoneme[i]["End"] # Ending timestamp

    pos = search(phone_list, phone)

    # Voiceless stops
    if pos <= 4 and pos != -1:
      pf = voiceless_stops["Pitch"]
      df = voiceless_stops["Duration"]
      ef = voiceless_stops["Energy"]

    # Voiced stops
    elif pos > 4 and pos <= 8:
      pf = voiced_stops["Pitch"]
      df = voiced_stops["Duration"]
      ef = voiced_stops["Energy"]

    # Voiceless fricatives
    elif pos > 8 and pos <= 12:
      pf = voiceless_fricatives["Pitch"]
      df = voiceless_fricatives["Duration"]
      ef = voiceless_fricatives["Energy"]

    # Voiced fricatives
    elif pos > 12 and pos <= 16:
      pf = voiced_fricatives["Pitch"]
      df = voiced_fricatives["Duration"]
      ef = voiced_fricatives["Energy"]

    # Voiceless affricates
    elif pos > 16 and pos <= 17:
      pf = voiceless_affricates["Pitch"]
      df = voiceless_affricates["Duration"]
      ef = voiceless_affricates["Energy"]

    # Voiced affricates
    elif pos > 17 and pos <= 18:
      pf = voiced_affricates["Pitch"]
      df = voiced_affricates["Duration"]
      ef = voiced_affricates["Energy"]

    # Nasals
    elif pos > 18 and pos <= 25:
      pf = nasals["Pitch"]
      df = nasals["Duration"]
      ef = nasals["Energy"]

    # Laterals
    elif pos > 25 and pos <= 27:
      pf = laterals["Pitch"]
      df = laterals["Duration"]
      ef = laterals["Energy"]

    # Semi-vowels
    elif pos > 27 and pos <= 32:
      pf = semi_vowels["Pitch"]
      df = semi_vowels["Duration"]
      ef = semi_vowels["Energy"]

    # Long vowels
    elif pos > 32 and pos <= 43:
      pf = long_vowels["Pitch"]
      df = long_vowels["Duration"]
      ef = long_vowels["Energy"]

    # Short vowels
    elif pos != -1:
      pf = short_vowels["Pitch"]
      df = short_vowels["Duration"]
      ef = short_vowels["Energy"]

    # Unknown phoneme (search() returned -1): skip it instead of reusing stale factors
    else:
      continue

    dat = pitch_modulations(start, end, pf, dat)
    dat = time_modulations(start, end, df, dat, diff)
    dat = energy_modulations(start, end, ef, dat)
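The index-range checks above depend on the exact order of `phone_list`. As a hedged alternative, the same mapping can be sketched as a lookup table built from the category dictionaries themselves. Only three of the categories are reproduced below for brevity, and the names `categories`, `factors_by_phoneme` and `lookup_factors` are hypothetical.

```python
# Illustrative subset of the categories defined earlier (values copied from above)
categories = {
    "voiceless_stops": {"Phoneme": ['p', 'k', 't', 'q'],
                        "Pitch": 1.26, "Duration": 1.13, "Energy": 4.22},
    "nasals": {"Phoneme": ['m', 'n', 'ng', 'em', 'en', 'eng', 'nx'],
               "Pitch": 1.32, "Duration": 3.42, "Energy": 1.60},
    "short_vowels": {"Phoneme": ['ih', 'eh', 'ae', 'ah', 'uh', 'ux', 'ix', 'axr', 'ax-h'],
                     "Pitch": 1.38, "Duration": 2.15, "Energy": 2.15},
}

# Invert the table: phoneme -> (pitch, duration, energy) factors
factors_by_phoneme = {
    ph: (cat["Pitch"], cat["Duration"], cat["Energy"])
    for cat in categories.values()
    for ph in cat["Phoneme"]
}

def lookup_factors(ph):
    """Return the (pitch, duration, energy) factors, or None for unknown phonemes."""
    return factors_by_phoneme.get(ph)

print(lookup_factors("t"))   # (1.26, 1.13, 4.22)
print(lookup_factors("xx"))  # None
```

With this layout, reordering the phonemes or adding a category does not require updating any numeric boundaries, and an unknown phoneme is reported explicitly as `None`.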

Once the modulations have been applied, the modified encoded data, **dat**, is decoded using the decode() function of the vocoder.

In [None]:
# Decodes the modified audio data (it is written to a new file in the next cell)
dat1 = vocoder.decode(dat)

This cell stores the modified audio file. The output path follows the format **(Low/High) Proficiency/(Word/Phoneme) Modifications/(Word/Phoneme)_(Number)**

In [None]:
if len_phn == 1:
  ph_num = transcript.rindex("_") + 1
  wrd_num = transcript.rindex("/") - 1

  if ph_num < 10:
    new_wavfile = wavfile[0:(len(wavfile) - 19)] + "/" + "Low Proficiency/Phoneme_Modifications/Phoneme_" + transcript[ph_num:(ph_num + 1)] + ".wav"

  elif ph_num >=10:
    # new_wavfile = wavfile[0:(len(wavfile) - 19)] + "/" + "Low Proficiency/Phoneme_Modifications/Phoneme_" + transcript[ph_num:(ph_num + 2)] + ".wav"
    new_wavfile = wavfile[0:(len(wavfile) - 19)] + "/" + "Low Proficiency/Phoneme_Modifications/Word_" + transcript[wrd_num:(wrd_num + 1)] + "/Phoneme_" + transcript[ph_num:(ph_num + 2)] + ".wav"

elif len_phn > 1:
  wrd_num = transcript.rindex("_") + 1

  if wrd_num < 10:
    new_wavfile = wavfile[0:(len(wavfile) - 19)] + "/" + "Low Proficiency/Word_Modifications/Word_" + transcript[wrd_num:(wrd_num + 1)] + ".wav"

  elif wrd_num >= 10:
    new_wavfile = wavfile[0:(len(wavfile) - 19)] + "/" + "Low Proficiency/Word_Modifications/Word_" + transcript[wrd_num:(wrd_num + 1)] + ".wav"

wavwrite(new_wavfile, fs, (dat1['out'] * (2 ** 15)).astype(np.int16))
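Because the energy factors are greater than 1, the decoded signal may exceed the [-1, 1] range, and casting out-of-range floats straight to `int16` can wrap around or give arbitrary values. The sketch below shows one possible way to clip before the cast. The helper `float_to_int16` is hypothetical and is not part of the original notebook.

```python
import numpy as np

def float_to_int16(signal):
    """Convert a float signal in roughly [-1, 1] to int16, clipping overshoots."""
    scaled = np.asarray(signal, dtype=float) * (2 ** 15)
    clipped = np.clip(scaled, -32768, 32767)
    return clipped.astype(np.int16)

print(float_to_int16([0.0, 0.5, 1.0, -1.0, 1.2]).tolist())
# [0, 16384, 32767, -32768, 32767]
```

Clipping trades wrap-around artefacts for flat-topped peaks; rescaling the whole signal by its maximum absolute value would be another option if the relative loudness of the output does not matter.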