In [None]:
import os
import re
import glob
import json
import shutil
import librosa
import librosa.display
from tqdm import tqdm
import pandas as pd
from pathlib import Path
from typing import Union, List
from tqdm import tqdm
from scipy.io.wavfile import write
from audiomentations import Compose
from audiomentations import TimeStretch
from audiomentations import PitchShift
from audiomentations import Shift
from audiomentations import Trim
from audiomentations import Gain
from audiomentations import PolarityInversion
from sklearn.model_selection import train_test_split

In [None]:
# Constants
# Root folder of the Coswara dataset, relative to this notebook.
DATA_DIR = "../../Coswara-Data/"
# Name of the sub-folder of the dataset root that is searched for recordings.
# NOTE(review): not referenced by the code visible in this notebook section.
EXTRACTED_DATA_DIR = "Extracted_data"
# Recording variant; selects files named cough-<SUFFIX>.wav.
SUFFIX = "shallow"


## Utility functions

In [None]:
def mkdir(path:Union[str, Path]):
    """Create directory ``path``, including any missing parents.

    Calling it on a directory that already exists is a no-op.

    Args:
        path: Directory to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok=True keeps the call idempotent without the check-then-create
    # race of testing os.path.exists() first.
    os.makedirs(path, exist_ok=True)

def move_recordings(ids:List[str],
                    id_path_map:dict[str, Union[str, Path]],
                    target_dir:Union[str, Path],
                    rec_format:str='.wav'):
    """Copy the recordings of the given ids into ``target_dir``.

    Despite the name, files are copied: the source recording stays in place.
    Each copy is named ``<id><rec_format>``. Ids without an entry in
    ``id_path_map`` are skipped silently.

    Args:
        ids: Recording ids to process.
        id_path_map: Maps a recording id to the path of its source file.
        target_dir: Existing directory that receives the copies.
        rec_format: File extension appended to the id to form the file name.
    """
    for rec_id in tqdm(ids):
        if rec_id in id_path_map:
            destination = os.path.join(target_dir, f'{rec_id}{rec_format}')
            shutil.copy(id_path_map[rec_id], destination)

def move_set(paths:List[str], target_dir:str):
    """Copy recordings into ``target_dir``, keeping their class sub-folder.

    A source ``.../<label>/<file>`` is copied to
    ``<target_dir>/<label>/<file>``. Despite the name, the source files are
    left in place.

    Args:
        paths: Paths of the recordings to copy.
        target_dir: Root directory of the split (e.g. the train or test folder).
    """
    for p in paths:
        # os.path honours the platform's separators, whereas splitting on a
        # literal '/' picks the wrong components for backslash-separated paths.
        label = os.path.basename(os.path.dirname(p))
        label_dir = os.path.join(target_dir, label)
        # Make sure the class folder exists so the copy cannot fail on it.
        os.makedirs(label_dir, exist_ok=True)
        shutil.copy(p, os.path.join(label_dir, os.path.basename(p)))


### Separate positive and negative recordings

In [None]:
def extract_data(data_dir, suffix='shallow'):
    """Sort cough recordings into negative ('n') and positive ('p') folders.

    Recordings are looked up with the glob pattern
    ``<data_dir>/Extracted_data/202*/*/cough-<suffix>.wav``; the name of the
    folder directly containing a recording is taken as its id. The ids are
    matched against the ``id`` / ``covid_status`` columns of
    ``<data_dir>/combined_data.csv`` and every matching recording is copied to
    ``<data_dir>/data/<suffix>/n/<id>.wav`` or ``.../p/<id>.wav``.

    The metadata rows of all negative/positive ids (whether or not a recording
    was found for them) are written to ``<data_dir>/data/<suffix>/meta_data.csv``.

    Args:
        data_dir: Root folder of the dataset.
        suffix: Recording variant, used in the file name ``cough-<suffix>.wav``
            and as the name of the output sub-folder.
    """
    # Get cough paths (the pattern is a glob, not a regular expression)
    extracted_data_dir = 'Extracted_data'
    recording_pattern = fr'202*/*/cough-{suffix}.wav'
    search_path = os.path.join(data_dir, extracted_data_dir, recording_pattern)
    paths = glob.glob(search_path)

    # Create folders for files
    n_dir = os.path.join(data_dir, 'data', suffix, 'n')
    p_dir = os.path.join(data_dir, 'data', suffix, 'p')
    mkdir(n_dir)
    mkdir(p_dir)

    # Read metadata
    meta_data_path = os.path.join(data_dir, 'combined_data.csv')
    meta_data = pd.read_csv(meta_data_path)

    # Separate IDs based on class
    n_classes = ['healthy',
                 'no_resp_illness_exposed',
                 'resp_illness_not_identified',
                 'recovered_full']

    p_classes = ['positive_mild',
                 'positive_moderate',
                 'positive_asymp']

    n_mask = meta_data.covid_status.isin(n_classes)
    p_mask = meta_data.covid_status.isin(p_classes)
    n_ids = meta_data[n_mask].id.to_list()
    p_ids = meta_data[p_mask].id.to_list()

    # Map id to path
    id_path_map = dict()
    for path in paths:
        # The id is the name of the folder holding the recording. os.path is
        # used instead of splitting on '/' so that paths containing the
        # platform's own separator (e.g. '\\' on Windows) are handled too.
        rec_id = os.path.basename(os.path.dirname(path))
        if rec_id in id_path_map:
            # Keep the first recording found for an id and report the rest
            print(f'Duplicate id :: {rec_id}')
            continue

        id_path_map[rec_id] = path

    # Separate recordings based on class
    move_recordings(n_ids, id_path_map, n_dir)
    move_recordings(p_ids, id_path_map, p_dir)

    # Get metadata for recordings
    n_id_mask = meta_data.id.isin(n_ids)
    p_id_mask = meta_data.id.isin(p_ids)
    np_meta_data = meta_data[p_id_mask | n_id_mask]
    np_meta_data_path = os.path.join(data_dir, 'data', suffix, 'meta_data.csv')
    np_meta_data.to_csv(np_meta_data_path, index=False)

In [None]:
# Sort the recordings of the selected variant into n/p folders and save their metadata
extract_data(data_dir=DATA_DIR, suffix=SUFFIX)

### Create data splits

In [None]:
def split_data(data_dir:str, suffix:str=SUFFIX, test_size:float=0.15):
    """Split the class-separated recordings into train and test folders.

    Recordings matching ``<data_dir>/data/<suffix>/*/*.wav`` are split with a
    stratified, seeded ``train_test_split`` and copied (originals stay in
    place) to ``<data_dir>/data/<suffix>/train/<label>/`` and
    ``.../test/<label>/``, where the label is the name of the folder that
    contains the recording.

    Args:
        data_dir: Root folder of the dataset.
        suffix: Recording variant sub-folder. Note that the default is bound
            to the value ``SUFFIX`` had when this function was defined.
        test_size: Fraction of the recordings assigned to the test set.
    """
    # Collect paths to recordings. glob returns files in arbitrary,
    # filesystem-dependent order; sorting makes the seeded split below
    # reproducible across machines and runs.
    search_pattern = fr'{suffix}/*/*.wav'
    search_path = os.path.join(data_dir, 'data', search_pattern)
    paths = sorted(glob.glob(search_path))

    # Extract labels from paths: the label is the parent folder name.
    # os.path is used instead of splitting on '/' so that paths containing
    # the platform's own separator are handled correctly.
    labels = [os.path.basename(os.path.dirname(p)) for p in paths]

    # Create train (includes valid) and test set split of 85:15
    train_paths, test_paths, *_ = train_test_split(paths,
                                                   labels,
                                                   test_size=test_size,
                                                   stratify=labels,
                                                   random_state=7)

    # Move split to different folders
    train_dir = os.path.join(data_dir, 'data', suffix, 'train')
    test_dir = os.path.join(data_dir, 'data', suffix, 'test')
    mkdir(os.path.join(train_dir, 'n'))
    mkdir(os.path.join(train_dir, 'p'))
    mkdir(os.path.join(test_dir, 'n'))
    mkdir(os.path.join(test_dir, 'p'))
    move_set(train_paths, train_dir)
    move_set(test_paths, test_dir)

In [None]:
# Pass the suffix explicitly: the function's default was captured when its
# definition cell ran and would be stale if SUFFIX was changed afterwards.
split_data(DATA_DIR, suffix=SUFFIX)

### Augment sets

In [None]:
def augment_set(set_dir:str, set_label:str='p', sr:int=22050, extension:str='.wav'):
    """Write augmented copies of the recordings of one class of a set.

    Every file matching ``<set_dir>/<set_label>/*<extension>`` is loaded at
    sample rate ``sr`` and two augmented versions are written back into the
    same folder as ``<j><extension>``, where ``j`` is a running counter
    starting at 0. Recordings that fail to load, augment or save are reported
    and skipped.

    Args:
        set_dir: Folder of the set (contains one sub-folder per class).
        set_label: Name of the class sub-folder to augment.
        sr: Sample rate used for loading and for the written files.
        extension: File extension of the input and output recordings.
    """
    # Original data augmentation configuration from the Brogrammer's git repo
    augment1 = Compose([
        TimeStretch(min_rate=0.7, max_rate=1.4, p=0.9),
        PitchShift(min_semitones=-2, max_semitones=4, p=1),
        Shift(min_fraction=-0.5, max_fraction=0.5, p=0.8),
        Trim(p=1),
        Gain(p=1),
        PolarityInversion(p=0.8)
        ])

    # Same augmentation configuration with TimeStretch parameters set to default
    augment2 = Compose([
        TimeStretch(min_rate=0.8, max_rate=1.25, p=0.5),
        PitchShift(min_semitones=-2, max_semitones=4, p=1),
        Shift(min_fraction=-0.5, max_fraction=0.5, p=0.8),
        Trim(p=1),
        Gain(p=1),
        PolarityInversion(p=0.8)
        ])

    label_dir = os.path.join(set_dir, set_label)
    # Sorted so the numbering of the output files does not depend on the
    # arbitrary order in which glob lists the folder.
    # NOTE(review): outputs are written into the folder being globbed, so
    # running this twice also augments the files produced by the first run
    # (and overwrites them) — confirm this is only ever run once per set.
    paths = sorted(glob.glob(os.path.join(label_dir, fr'*{extension}')))

    j = 0
    for p in tqdm(paths):
        try:
            data, _ = librosa.load(p, sr=sr)

            # First augmentation
            data = augment1(data, sr)
            write(os.path.join(label_dir, str(j) + extension), sr, data)
            j += 1

            # Second augmentation
            # NOTE(review): this is applied to the output of the first
            # augmentation, not to the original signal — confirm intended.
            data = augment2(data, sr)
            write(os.path.join(label_dir, str(j) + extension), sr, data)
            j += 1

        except Exception as exc:
            # Best effort: skip recordings that cannot be processed, but say
            # so. A bare `except:` would also swallow KeyboardInterrupt and
            # make the loop impossible to stop.
            tqdm.write(f'Skipping {p}: {exc!r}')
            continue


In [None]:
# Build the train/test folder paths, then augment each set on its own
train_dir, test_dir = (
    os.path.join(DATA_DIR, 'data', SUFFIX, split_name)
    for split_name in ('train', 'test')
)

augment_set(set_dir=train_dir)
augment_set(set_dir=test_dir)

### Extract features

In [None]:
def extract_features(set_dir:str,
                     duration:int=7,
                     sample_rate:int=22050):
    """Extract MFCC features for every recording of a set and save them as JSON.

    Each file matching ``<set_dir>/*/*.wav`` is loaded at ``sample_rate``,
    fixed to ``sample_rate * duration`` samples, and converted to 15 MFCCs
    (``n_fft=2048``, ``hop_length=512``). The transposed MFCC matrix and the
    name of the recording's parent folder (its label) are collected and
    written to ``<set_dir>/mfcc15_augdata.json`` under the keys ``'mfcc'`` and
    ``'label'``. Recordings that cannot be processed are reported and skipped.

    Args:
        set_dir: Folder of the set (contains one sub-folder per class).
        duration: Length, in seconds, every recording is fixed to.
        sample_rate: Sample rate used for loading and for the MFCC computation.
    """
    # Data collection parameters (glob pattern)
    recording_pattern = r'*/*.wav'
    search_path = os.path.join(set_dir, recording_pattern)

    # Collect paths to recordings to analyse; sorted so the order of the
    # saved features does not depend on glob's arbitrary listing order.
    paths = sorted(glob.glob(search_path))

    # Extract MFCCs
    data = {
        'mfcc': [],
        'label': []
        }

    for path in tqdm(paths):
        try:
            y, _ = librosa.load(path, sr=sample_rate)
            y = librosa.util.fix_length(y, size=sample_rate * duration)
            # sr must be passed explicitly: otherwise librosa assumes its own
            # default rate and the features are wrong for any other sample_rate.
            mfcc = librosa.feature.mfcc(y=y,
                                        sr=sample_rate,
                                        n_mfcc=15,
                                        n_fft=2048,
                                        hop_length=512)
            mfcc = mfcc.T

            # Label is the parent folder name; accept both path separators
            label = re.split(r'/|\\', path)[-2]

            data['mfcc'].append(mfcc.tolist())
            data['label'].append(label)

        except Exception as exc:
            # Best effort: skip unreadable recordings, but report them. A bare
            # `except:` would also swallow KeyboardInterrupt.
            tqdm.write(f'Skipping {path}: {exc!r}')
            continue

    # Save features in a JSON file
    json_path = os.path.join(set_dir, 'mfcc15_augdata.json')

    with open(json_path, 'w') as fp:
        json.dump(data, fp, indent=4)

In [None]:
# Compute and save the MFCC features of the train set, then of the test set
extract_features(set_dir=train_dir)
extract_features(set_dir=test_dir)

### Split training set into training and validation sets

In [None]:
# Load the features extracted from the training set
train_dir = os.path.join(DATA_DIR, 'data', SUFFIX, 'train')
train_path = os.path.join(train_dir, 'mfcc15_augdata.json')
with open(train_path, 'r') as f:
    data = json.load(f)

# Extract labels and MFCCs
X, y = data['mfcc'], data['label']

# Stratified, seeded hold-out of 0.15/0.85 of the training features for
# validation (presumably chosen to mirror the earlier 85:15 train/test split).
X_train, X_valid, y_train, y_valid = train_test_split(
    X,
    y,
    test_size=0.15/0.85,
    stratify=y,
    random_state=7,
)

# Assemble both sets and their destinations next to the source JSON
train_data = {'mfcc': X_train, 'label': y_train}
valid_data = {'mfcc': X_valid, 'label': y_valid}
train_save_path = os.path.join(train_dir, 'train.json')
valid_save_path = os.path.join(train_dir, 'valid.json')

# Save sets
with open(train_save_path, 'w') as fp:
    json.dump(train_data, fp, indent=4)

with open(valid_save_path, 'w') as fp:
    json.dump(valid_data, fp, indent=4)