In [78]:
%%capture
!pip install datasets
!pip install pydub

In [79]:
from datasets import load_dataset
from pydub import AudioSegment
import numpy as np
import os
import zipfile

In [80]:
# Mount Google Drive (Colab only) so that the /content/drive/MyDrive paths
# used by the cells below are reachable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [81]:
# Load the dataset to convert. Only one of the two load_dataset lines is
# active at a time; swap the comment marker to switch datasets.
# NOTE(review): the active dataset exposes only a "test" split (see the output
# of this cell), while other cells index raw_dataset["csa"]. Those cells
# presumably expect the commented-out dataset to be loaded instead -- confirm,
# and re-run this cell with the matching line enabled before running them.
raw_dataset = load_dataset("DynamicSuperb/RespiratorySoundClassification_ICBHI2017")
#raw_dataset = load_dataset("ericyxy98/pulmonary-disease-airway-lung-function-dataset")
raw_dataset

DatasetDict({
    test: Dataset({
        features: ['audio', 'file', 'instruction', 'label'],
        num_rows: 174
    })
})

In [None]:
# Spot-check a single record to see what the label field looks like.
# NOTE(review): this indexes a "csa" split, so it only works when the loaded
# dataset actually provides that split.
#raw_dataset["test"][0]
raw_dataset["csa"][13]["Participant:"]

'Asthma'

In [82]:
def audio_segment_from_array(audio_array, sr):
    """Build a mono, 16-bit pydub ``AudioSegment`` from floating-point samples.

    Args:
        audio_array: NumPy array (or plain sequence) of samples, expected to
            be normalized to the range [-1.0, 1.0].
        sr: Sampling rate of ``audio_array`` in Hz.

    Returns:
        An ``AudioSegment`` with 2-byte samples, a single channel and frame
        rate ``sr``.
    """
    # np.asarray makes plain Python sequences work as well: multiplying a
    # list by an int would repeat the list instead of scaling the samples.
    samples = np.asarray(audio_array)

    # Samples outside [-1, 1] would exceed the int16 range after scaling and
    # wrap around, which shows up as loud clicks. Clamp them instead.
    samples = np.clip(samples, -1.0, 1.0)

    # Scale to the signed 16-bit range; the cast truncates toward zero.
    audio_int16 = (samples * 32767).astype(np.int16)
    audio_bytes = audio_int16.tobytes()

    audio_segment = AudioSegment(
        data=audio_bytes,
        sample_width=2,  # bytes per sample -> 16-bit PCM
        frame_rate=sr,
        channels=1
    )

    return audio_segment

def add_to_dict(dictionary, key, folder_path):
  """Count one more occurrence of ``key`` and make sure its folder exists.

  The first time ``key`` is seen its counter is set to 1 and the directory
  ``folder_path/key`` is created when it does not exist yet. On every later
  call only the counter is incremented.

  Args:
    dictionary: Mutable mapping from key to occurrence count; updated in place.
    key: Label to count; also used as the sub-folder name.
    folder_path: Parent directory under which the ``key`` folder is created.
  """
  if key not in dictionary:
    dictionary[key] = 1
    # Folder creation is only attempted on first sight of the key.
    target_dir = os.path.join(folder_path, key)
    if not os.path.exists(target_dir):
      os.makedirs(target_dir)
    return

  dictionary[key] += 1

def unzip_in_place(zip_path: str):
    """Extract a zip archive into the directory that contains it.

    Args:
        zip_path: Path to the zip file to extract.

    Raises:
        ValueError: If ``zip_path`` is not recognized as a zip archive.
    """
    recognized = zipfile.is_zipfile(zip_path)
    if not recognized:
        raise ValueError("Provided file is not a valid zip archive")

    # The archive is unpacked next to itself, i.e. into its parent directory.
    extract_dir, _archive_name = os.path.split(zip_path)

    archive = zipfile.ZipFile(zip_path, 'r')
    with archive:
        archive.extractall(path=extract_dir)

    print(f"Unzipped '{zip_path}' to '{extract_dir}'")

def walk_folders(root_dir):
    """Map each immediate sub-folder of ``root_dir`` to the paths of its entries.

    Only one level is inspected: entries of ``root_dir`` that are not
    directories are ignored, and the contents of each sub-folder are listed
    without recursing further.

    Args:
        root_dir: Directory whose sub-folders should be listed.

    Returns:
        dict mapping sub-folder name -> list of ``root_dir/sub-folder/entry``
        paths. Both the folders and their entries are in sorted order.
    """
    folder_dict = {}

    # os.listdir returns names in arbitrary order; sorting keeps the result
    # (and any numbering derived from it) identical from run to run.
    for folder_name in sorted(os.listdir(root_dir)):
        folder_path = os.path.join(root_dir, folder_name)
        if os.path.isdir(folder_path):
            folder_dict[folder_name] = [
                os.path.join(folder_path, entry)
                for entry in sorted(os.listdir(folder_path))
            ]

    return folder_dict

In [None]:
# Unpack the aware_raw.zip archive on Drive; the files are extracted into the
# folder that contains the archive (raw_dataset/).
unzip_in_place("/content/drive/MyDrive/Pulmonary Prediction/raw_dataset/aware_raw.zip")

Unzipped '/content/drive/MyDrive/Pulmonary Prediction/raw_dataset/aware_raw.zip' to '/content/drive/MyDrive/Pulmonary Prediction/raw_dataset'


In [84]:
def process_and_save_a(raw_dataset, file_path, sr=8000, max_size=0, verbose=False):
  """Export every record of an in-memory audio dataset as a WAV file per label.

  Each record must provide ``record["audio"]["array"]``,
  ``record["audio"]["sampling_rate"]`` and ``record["label"]``. The audio is
  converted to ``sr`` Hz, 1 byte per sample, mono, and written to
  ``file_path/<label>/<label>_<n>.wav`` where ``<label>`` is the lower-cased
  label and ``<n>`` is a per-label running counter starting at 1.

  Args:
    raw_dataset: Iterable of records as described above.
    file_path: Root directory of the exported dataset.
    sr: Target frame rate in Hz.
    max_size: Maximum number of records to export; 0 means no limit.
    verbose: When True, print the path of every exported file.
  """
  labels = {}

  for i, pair in enumerate(raw_dataset):
    # i is zero-based, so `>=` stops after exactly max_size records
    # (`>` would export one record too many).
    if (max_size != 0) and (i >= max_size):
      break

    samples = pair["audio"]["array"]
    sample_rate = pair["audio"]["sampling_rate"]
    label = pair["label"].lower()

    # add_to_dict appends the label to the folder it is given, so it must
    # receive the dataset root; passing the label folder itself would create
    # a stray <label>/<label> directory.
    add_to_dict(labels, label, file_path)
    export_dir = os.path.join(file_path, label)

    audio_segment = audio_segment_from_array(samples, sample_rate)
    audio_segment = audio_segment.set_frame_rate(sr).set_sample_width(1).set_channels(1)

    file_name = label + "_" + str(labels[label])  + ".wav"
    export_path = os.path.join(export_dir, file_name)
    audio_segment.export(export_path, format="wav")

    if verbose:
      print("Exported to: " + export_path)

In [85]:
# Export all records of the "test" split (max_size=0 disables the limit) as
# 8 kHz WAV files, printing every exported path.
process_and_save_a(
    raw_dataset['test'],
    "/content/drive/MyDrive/Pulmonary Prediction/dataset",
    sr=8000,
    max_size=0,
    verbose=True,
)

Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/pneumonia/pneumonia_1.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/urti/urti_1.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/no potential disease detected/no potential disease detected_1.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/copd/copd_1.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/no potential disease detected/no potential disease detected_2.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/no potential disease detected/no potential disease detected_3.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/urti/urti_2.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/copd/copd_2.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/urti/urti_3.wav
Exported to: /content/drive/MyDrive/Pulmonary Prediction/dataset/bronchiectasis/bronchiectasis_1.wav
Exported t

In [None]:
def process_and_save_b(raw_dataset, input_path, file_path, sr=8000, max_size=0, verbose=False):
  """Export WAV recordings stored in per-participant folders, grouped by label.

  Each record must provide ``record['AWARE STUDY ID:']`` (the name of the
  participant's folder under ``input_path``) and ``record["Participant:"]``
  (the label). Every file in the participant's folder is read as WAV,
  converted to ``sr`` Hz, 1 byte per sample, mono, and written to
  ``file_path/<label>/<label>_<n>.wav`` where ``<label>`` is the lower-cased
  label and ``<n>`` is a per-label running counter starting at 1.

  Args:
    raw_dataset: Iterable of records as described above.
    input_path: Directory containing one sub-folder per study ID.
    file_path: Root directory of the exported dataset.
    sr: Target frame rate in Hz.
    max_size: Maximum number of records to consider; 0 means no limit.
    verbose: When True, print progress and the path of every exported file.
  """
  labels = {}
  paths = walk_folders(input_path)
  checked = set()

  for i, pair in enumerate(raw_dataset):
    # i is zero-based, so `>=` stops after exactly max_size records.
    if (max_size != 0) and (i >= max_size):
      break

    pair_id = str(pair['AWARE STUDY ID:'])

    # A study ID maps to a single folder; exporting it again for a repeated
    # ID would only duplicate the same recordings in the output.
    if pair_id in checked:
      continue
    checked.add(pair_id)

    audio_segment_path = paths.get(pair_id)
    if audio_segment_path is None:
      # Report and move on instead of aborting the whole export.
      print("No folder found for ID: " + pair_id + " (skipped)")
      continue

    if verbose:
      print("Processing ID: " + pair_id)

    # The label depends on the record only, so compute it once per record.
    label = pair["Participant:"].lower()
    export_dir = os.path.join(file_path, label)

    for path in audio_segment_path:
      audio_segment = AudioSegment.from_wav(path)

      # add_to_dict appends the label to the folder it is given, so it must
      # receive the dataset root; passing the label folder itself would
      # create a stray <label>/<label> directory.
      add_to_dict(labels, label, file_path)

      audio_segment = audio_segment.set_frame_rate(sr).set_sample_width(1).set_channels(1)

      file_name = label + "_" + str(labels[label])  + ".wav"
      export_path = os.path.join(export_dir, file_name)
      audio_segment.export(export_path, format="wav")

      # Reported per file: export_path only exists once a file was written.
      if verbose:
        print("Exported to: " + export_path)

In [None]:
    ## Target format: 8 kHz, 8-bit, mono --- frequencies up to 4 kHz are represented; one 40 Hz period spans 200 samples
    ## Filters needed - 2,  3,  6,  12, 25, 50, 100, 200 (+1 sample for last 3 layers)
    ##                -   (kernels)
    ##                -  (stride, test and adjust)

# Convert the extracted raw recordings for the "csa" split into the shared
# output dataset folder (max_size=0 disables the record limit).
process_and_save_b(
    raw_dataset["csa"],
    input_path="/content/drive/MyDrive/Pulmonary Prediction/raw_dataset/aware_raw",
    file_path="/content/drive/MyDrive/Pulmonary Prediction/dataset",
    sr=8000,
    max_size=0,
    verbose=True,
)

Output hidden; open in https://colab.research.google.com to view.