In [None]:
# Pipeline: from downloaded audio files to a complete Hugging Face dataset with metadata
#   - Normalisation of sound to [-1, 1]
#   - Construction of the full dataset with metadata
#   - Train/Test/Val splits

In [None]:
import os
import librosa
import soundfile as sf
import numpy as np
from utils.spectrogram_image_converter import SpectrogramImageConverter
from utils.spectrogram_params import SpectrogramParams
import pydub
import typing as T
from PIL import Image


class WavPreprocessor:
    """Preprocesses WAV files in place and renders them as spectrogram images.

    Provides single-file operations (``resample``, ``min_max_normalise``,
    ``wav_to_spec``) and ``*_folder`` variants that apply them to every
    ``.wav`` file directly inside a folder.
    """

    def __init__(self, spectrogram_params, device="cuda"):
        """Store the spectrogram parameters and build the image converter.

        Args:
            spectrogram_params: Parameter object handed to
                ``SpectrogramImageConverter``.
            device: Device string forwarded to the converter. Defaults to
                ``"cuda"``, the value that was previously hard-coded.
        """
        self._params = spectrogram_params
        # The parameters are passed positionally: the former ``parms=`` keyword
        # looks like a typo for ``params``, and a positional argument works with
        # either spelling of the converter's first parameter.
        self._converter = SpectrogramImageConverter(spectrogram_params, device=device)

    @staticmethod
    def _wav_paths(input_path):
        """Return the full paths of the ``.wav`` files directly inside ``input_path``."""
        return [
            os.path.join(input_path, name)
            for name in sorted(os.listdir(input_path))
            if name.endswith('.wav')
        ]

    def resample(self, audio, target_sr):
        """Load ``audio`` at its native sampling rate and resample it.

        Args:
            audio: Path of the audio file to load.
            target_sr: Desired sampling rate in Hz.

        Returns:
            Tuple ``(samples, target_sr)`` with the resampled signal.
        """
        y, sr = librosa.load(audio, sr=None)

        # Keyword arguments are required by recent librosa releases and are
        # also accepted by older ones.
        y_resampled = librosa.resample(y, orig_sr=sr, target_sr=target_sr)

        return y_resampled, target_sr

    def resample_folder(self, input_path, target_sr):
        """Resample every ``.wav`` file in ``input_path``, overwriting the originals."""
        for path in self._wav_paths(input_path):
            y_resampled, sr = self.resample(path, target_sr)
            sf.write(path, y_resampled, sr)  # Overwrite the original file

    def min_max_normalise(self, audio):
        """Scale the signal in ``audio`` so that its peak magnitude is 1.

        Empty or completely silent signals are returned unchanged, because
        dividing by a zero peak would fill the signal with NaNs.

        Args:
            audio: Path of the audio file to load.

        Returns:
            Tuple ``(samples, sampling_rate)`` with samples in ``[-1, 1]``.
        """
        y, sr = librosa.load(audio, sr=None)

        peak = np.max(np.abs(y)) if y.size else 0.0
        if peak == 0:
            return y, sr

        return y / peak, sr

    def min_max_normalise_folder(self, input_path):
        """Peak-normalise every ``.wav`` file in ``input_path``, overwriting the originals."""
        for path in self._wav_paths(input_path):
            norm_wav, sr = self.min_max_normalise(path)
            sf.write(path, norm_wav, sr)  # Overwrite the original file

    def wav_to_spec(self, wav_path, output_path):
        """Render one WAV file as a spectrogram PNG inside ``output_path``.

        The image is named after the WAV file (``clip.wav`` -> ``clip.png``).

        Args:
            wav_path: Path of the WAV file to convert.
            output_path: Folder that receives the PNG; created if missing.

        Returns:
            Path of the PNG that was written.
        """
        # Convert wav to a mono audio segment
        segment = pydub.AudioSegment.from_wav(wav_path).set_channels(1)

        image = self._converter.spectrogram_image_from_audio(segment)

        # Only the file name of the WAV is used, so the image lands directly in
        # output_path even when wav_path itself contains directories.
        stem = os.path.splitext(os.path.basename(os.fsdecode(wav_path)))[0]
        out_dir = os.fsdecode(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        image_out = os.path.join(out_dir, stem + ".png")

        # Save image to disk, keeping the EXIF data attached by the converter
        image.save(image_out, exif=image.getexif(), format="PNG")
        print(f"Saved {image_out}")

        return image_out

    def wav_to_spec_folder(self, input_path):
        """Render every ``.wav`` file in ``input_path`` to a PNG in the same folder."""
        for wav_path in self._wav_paths(input_path):
            self.wav_to_spec(wav_path, input_path)  # Output to the same folder

    # TODO: not implemented yet
    # def spec_to_wav(self, spec):
    # def spec_to_wav_folder(self, input_path, output_path):

: 

In [None]:
import os
import shutil
import csv
import random
import math

# Select chosen classes and copy across into data folder
class DatasetPipeline:
    """Builds a spectrogram/audio dataset with ``metadata.csv`` files.

    The stages, run in order by ``create_dataset``, are: create one folder per
    chosen class, copy the raw files across, preprocess them, write per-class
    metadata, merge all classes into ``<dataset_path>/dataset/unsplit`` and
    split that folder into ``train``, ``val`` and ``test``.
    """

    # Class-level defaults; every instance overwrites them in __init__.
    _classes = []
    _dataset_path = ""  # Where dataset will be constructed
    _class_path = ""  # Where raw data is saved
    _preprocessor = None  # Object providing the *_folder preprocessing methods

    _METADATA_FILE = 'metadata.csv'
    _METADATA_HEADER = ['file_name', 'text', 'audiofile']

    def __init__(self, dataset_path, class_path, preprocessor, *classes):
        """Configure the pipeline.

        Args:
            dataset_path: Folder in which the dataset is constructed.
            class_path: Folder holding one sub-folder of raw files per class.
            preprocessor: Object exposing ``resample_folder``,
                ``min_max_normalise_folder`` and ``wav_to_spec_folder``.
            *classes: Names of the classes to include. A single list or tuple
                of names is accepted as well.
        """
        # Accept DatasetPipeline(..., ["a", "b"]) as well as (..., "a", "b").
        if len(classes) == 1 and isinstance(classes[0], (list, tuple)):
            classes = tuple(classes[0])

        self._dataset_path = dataset_path
        self._class_path = class_path
        self._classes = list(classes)
        self._preprocessor = preprocessor

    def _existing_class_folders(self):
        """Return the chosen class names that have a folder inside the dataset path."""
        return [
            name for name in self._classes
            if os.path.isdir(os.path.join(self._dataset_path, name))
        ]

    # Creates corresponding folders for chosen classes
    def folder_setup(self):
        """Create a dataset folder for every chosen class that has raw data."""
        for folder in self._classes:
            if os.path.isdir(os.path.join(self._class_path, folder)):
                os.makedirs(os.path.join(self._dataset_path, folder), exist_ok=True)

    # Copies files excluding those tagged with REMOVE
    def copy_files(self):
        """Copy the raw files of the chosen classes into the dataset folders.

        Files whose name contains ``REMOVE`` are skipped. Each copy is
        prefixed with its class name (``<class>_<file>``) so that names stay
        unique once all classes are combined.
        """
        # Only the chosen classes are copied; the other raw folders have no
        # destination folder in the dataset.
        for folder in self._classes:
            src_folder = os.path.join(self._class_path, folder)
            if not os.path.isdir(src_folder):
                continue

            dest_folder = os.path.join(self._dataset_path, folder)
            os.makedirs(dest_folder, exist_ok=True)

            for file in os.listdir(src_folder):
                src_file = os.path.join(src_folder, file)
                if 'REMOVE' in file or not os.path.isfile(src_file):
                    continue

                # Add the folder name (class name) as prefix to each file
                dest_file = os.path.join(dest_folder, f"{folder}_{file}")
                shutil.copy(src_file, dest_file)

    # Generates a metadata.csv with headings [file_name, text, audiofile]
    def generate_metadata(self, prompt=None):
        """Write a ``metadata.csv`` into the dataset folder of every chosen class.

        One row is written per PNG in the class folder: the image file name,
        the prompt and the absolute path of the WAV file with the same stem.

        Args:
            prompt: Text used for every row. When ``None`` each class gets
                ``"A spectrogram of <class name in lower case>"``.
        """
        for folder in self._existing_class_folders():
            class_folder = os.path.join(self._dataset_path, folder)
            csv_path = os.path.join(class_folder, self._METADATA_FILE)

            # Resolved per class so a default prompt never leaks into the next class.
            text = prompt if prompt is not None else f"A spectrogram of {(folder.lower())}"

            # The spectrograms live next to the copied audio in the dataset folder.
            png_files = sorted(f for f in os.listdir(class_folder) if f.endswith('.png'))

            with open(csv_path, 'w', newline='') as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(self._METADATA_HEADER)

                for png_file in png_files:
                    stem = os.path.splitext(png_file)[0]
                    audio_file = os.path.join(class_folder, stem + '.wav')
                    writer.writerow([png_file, text, os.path.abspath(audio_file)])

    # Combines all classes into a single dataset folder, combining the metadata
    def combine_data(self):
        """Copy every class's WAV/PNG files and metadata rows into ``dataset/unsplit``."""
        combined_folder = os.path.join(self._dataset_path, "dataset", "unsplit")
        os.makedirs(combined_folder, exist_ok=True)

        combined_metadata_path = os.path.join(combined_folder, self._METADATA_FILE)
        with open(combined_metadata_path, 'w', newline='') as combined_metadata:
            writer = csv.writer(combined_metadata)
            writer.writerow(self._METADATA_HEADER)  # Write header to the combined metadata

            for folder in self._existing_class_folders():
                class_folder = os.path.join(self._dataset_path, folder)

                # Copy files
                for file in os.listdir(class_folder):
                    if file.endswith((".wav", ".png")):
                        shutil.copy(
                            os.path.join(class_folder, file),
                            os.path.join(combined_folder, file),
                        )

                # Append metadata (a class without metadata contributes no rows)
                metadata_path = os.path.join(class_folder, self._METADATA_FILE)
                if not os.path.isfile(metadata_path):
                    continue
                with open(metadata_path, 'r', newline='') as metadata:
                    reader = csv.reader(metadata)
                    next(reader, None)  # Skip header
                    writer.writerows(reader)

    # Splits data into train, val and test folders
    def split_data(self, train_ratio=0.8, val_ratio=0.1, seed=None):
        """Move the combined files into ``train``/``val``/``test`` folders.

        WAV/PNG pairs are kept together. Each split folder receives its own
        ``metadata.csv`` with one row per image; the prompt is taken from the
        combined metadata when available.

        Args:
            train_ratio: Fraction of the file pairs placed in ``train``.
            val_ratio: Fraction placed in ``val``; the remainder goes to ``test``.
            seed: Optional seed that makes the shuffle reproducible. When
                ``None`` the module-level random generator is used.
        """
        base_path = os.path.join(self._dataset_path, "dataset")
        unsplit_folder = os.path.join(base_path, "unsplit")
        train_folder = os.path.join(base_path, "train")
        val_folder = os.path.join(base_path, "val")
        test_folder = os.path.join(base_path, "test")

        for split_folder in (train_folder, val_folder, test_folder):
            os.makedirs(split_folder, exist_ok=True)

        # Prompts written by generate_metadata, keyed by file stem.
        prompts = {}
        unsplit_metadata = os.path.join(unsplit_folder, self._METADATA_FILE)
        if os.path.isfile(unsplit_metadata):
            with open(unsplit_metadata, 'r', newline='') as metadata:
                reader = csv.reader(metadata)
                next(reader, None)  # Skip header
                for row in reader:
                    if len(row) >= 2:
                        prompts[os.path.splitext(row[0])[0]] = row[1]

        # Collect all unique file stems; sorted so a seeded shuffle is reproducible.
        file_prefixes = sorted({
            os.path.splitext(filename)[0]
            for filename in os.listdir(unsplit_folder)
            if filename.endswith(('.wav', '.png'))
        })

        # Shuffle the list for randomness
        rng = random if seed is None else random.Random(seed)
        rng.shuffle(file_prefixes)

        # Calculate the indices to split at
        total_files = len(file_prefixes)
        train_split = math.floor(total_files * train_ratio)
        val_split = train_split + math.floor(total_files * val_ratio)

        splits = [
            (train_folder, file_prefixes[:train_split]),
            (val_folder, file_prefixes[train_split:val_split]),
            (test_folder, file_prefixes[val_split:]),
        ]

        for destination_folder, files in splits:
            metadata_path = os.path.join(destination_folder, self._METADATA_FILE)
            with open(metadata_path, 'w', newline='') as metadata_file:
                writer = csv.writer(metadata_file)
                writer.writerow(self._METADATA_HEADER)  # Write the header

                for file_prefix in files:
                    moved = {}
                    for extension in ('.wav', '.png'):
                        src_path = os.path.join(unsplit_folder, file_prefix + extension)
                        if not os.path.isfile(src_path):
                            continue  # Tolerate an incomplete wav/png pair
                        dest_path = os.path.join(destination_folder, file_prefix + extension)
                        shutil.move(src_path, dest_path)
                        moved[extension] = dest_path

                    # One row per image, matching the layout of generate_metadata.
                    if '.png' in moved:
                        text = prompts.get(
                            file_prefix, f"A spectrogram of {(file_prefix.lower())}"
                        )
                        audio = os.path.abspath(moved['.wav']) if '.wav' in moved else ""
                        writer.writerow([file_prefix + '.png', text, audio])

    def preprocess(self, target_sr):
        """Resample, normalise and render spectrograms for every chosen class folder.

        Args:
            target_sr: Sampling rate passed to the preprocessor's ``resample_folder``.
        """
        for folder in self._existing_class_folders():
            folder_path = os.path.join(self._dataset_path, folder)  # Full path to the folder
            self._preprocessor.resample_folder(folder_path, target_sr)
            self._preprocessor.min_max_normalise_folder(folder_path)
            self._preprocessor.wav_to_spec_folder(folder_path)

    def create_dataset(self, target_sr, train_split, var_split):
        """Run the complete pipeline.

        Args:
            target_sr: Sampling rate used when resampling the audio.
            train_split: Fraction of the data used for training.
            var_split: Fraction of the data used for validation; whatever
                remains becomes the test set.

        Raises:
            AssertionError: If ``train_split + var_split`` exceeds 1.
        """
        assert train_split + var_split <= 1, "Train and validation splits must sum to at most 1"

        # 1. Create folders for each class
        self.folder_setup()

        # 2. Copy files for each class
        self.copy_files()

        # 3. Apply preprocessing (resampling and min_max_norm) for each class and create spectrograms
        self.preprocess(target_sr)

        # 4. Generate metadata for each class
        self.generate_metadata()

        # 5. Combine all classes into a single dataset folder
        self.combine_data()

        # 6. Split data into train, test and validation sets
        self.split_data(train_ratio=train_split, val_ratio=var_split)


In [None]:
# Get list of target classes

# Create objects

# Create dataset