# Functions

## Metadata Cleaning

In [5]:
def split_values(str_input, delimiter):
    """
    Purpose:
    Convert a delimited numeric string such as "25-50" to a single
    value that is the average of the numbers (37.5 in this case).

    Any unit or other text must be stripped by the caller first:
    every piece between delimiters has to parse as a number.

    Input:
        str_input = the string to convert
        delimiter = what to split the string by

    Output:
        mean_value = the mean of the parsed numbers

    Raises:
        ValueError if a piece cannot be parsed as a number.
    """
    def to_number(text):
        # Whole numbers stay `int` so previously valid inputs give the
        # same result; decimals (e.g. "2.5") fall back to `float`.
        try:
            return int(text)
        except ValueError:
            return float(text)

    # Split by the delimiter and convert each piece to a number
    sep_values = [to_number(val) for val in str_input.split(delimiter)]

    # Calculate the mean value
    mean_value = mean(sep_values)

    return mean_value

In [6]:
def clean_pd(value):
    """
    Purpose:
    - Clean numerical 'X_pd' columns.
    - To be used with `apply()`.

    Pipeline:
    - Values that already convert with `float()` are returned as floats.
    - Otherwise the value is converted to a string and cleaned:
        * 'N per week' / 'N for week'  -> N / 7, rounded to 2 dp
          ('a/b' ranges are averaged first).
        * 'N for mounth'               -> N / 30, rounded to 2 dp
          ('a-b' ranges are averaged first).
        * 'N / month'                  -> N / 30 (not rounded).
        * text containing 'g'          -> the text before the first 'g',
          averaged if it holds a '-' or '/' range, otherwise returned
          as the stripped string.
        * 'a/b' or 'a-b'               -> average of the numbers.
        * 'a,b'                        -> decimal comma converted to a float.
    - Anything else is returned as its string form.

    Raises:
        ValueError if a recognised pattern holds text that is not numeric.
    """

    # Values which do not need cleaning
    try:
        return float(value)
    except (TypeError, ValueError):
        # Not directly numeric: continue with the string clean-up below
        pass

    # Convert value to a string
    value = str(value)

    ### REMOVE STRINGS ###
    # Check if 'week' is in the value
    if ' week' in value:

        # Variation #1: per week
        if ' per week' in value:
            converted = value.replace(" per week", "").strip()

        # Variation #2: for week
        elif ' for week' in value:
            converted = value.replace(" for week", "").strip()

        # Unknown weekly wording (e.g. '2 weeks'): nothing safe to strip,
        # so hand the value back instead of failing on an unbound name
        else:
            return value

        # Check for a slash-separated range
        if '/' in converted:
            result = split_values(converted, "/")
        else:
            result = converted

        # Get the per day value; `float` keeps averaged or decimal
        # amounts intact (as the monthly branch does)
        return round(float(result) / 7, 2)

    # Check if ' for mounth' is in the value (spelling as found in the data)
    elif ' for mounth' in value:

        # Remove the ' for mounth'
        converted = value.replace(" for mounth", "")

        # Check for a dash-separated range
        if '-' in converted:
            result = split_values(converted, "-")
        else:
            result = converted

        # Get the per day value
        return round(float(result) / 30, 2)

    # Check if '/ month' is in the value
    elif '/ month' in value:

        # The amount is the first space-delimited token
        space_list = value.split(" ")

        # Get the per day value
        return float(space_list[0]) / 30

    # Check if 'g' is in the value:
    elif 'g' in value:

        # Keep only the text before the first 'g', without whitespace
        g_index = value.find('g')
        converted = value[:g_index].strip()

        # Check for delimiters
        if '-' in converted:
            return split_values(converted, "-")
        elif '/' in converted:
            return split_values(converted, "/")

        # No range: the stripped text is returned as a string
        return converted

    ### CALCULATE AVERAGES ###
    elif '/' in value:
        return split_values(value, "/")

    elif '-' in value:
        return split_values(value, "-")

    ### OTHER CLEANING ###
    elif ',' in value:
        return float(value.replace(",", "."))

    # Catchall
    else:
        return value

In [7]:
def imputation(df, cat_col, num_col):
    """
    Purpose:
    Impute missing values of a numerical column separately for each
    value of a categorical column, choosing between the mean and the
    median of that group.

    If the absolute skew score > 0.5, MEDIAN.
    Else, not skewed, MEAN.

    Rows whose category is 'never' are set to 0 before anything else.
    Groups with no missing values, or with no known values to learn
    from, are left untouched.

    Input:
    df = dataframe (modified in place)
    cat_col = categorical column
    num_col = numerical column

    Output:
        Nothing is returned. For every imputed group a histogram is
        shown and the skewness score and chosen value are printed.
    """

    # Get the unique values from the categorical column
    unique_values = df[cat_col].unique()
    print(unique_values)

    # 'never' always means zero; this does not depend on the loop
    # variable, so it is done once up front
    df.loc[df[cat_col] == 'never', num_col] = 0

    # Clean title shared by every plot
    title = cat_col.replace("_", " ").title()

    # Loop through each unique value
    for unique in unique_values:

        in_group = df[cat_col] == unique
        is_missing = df[num_col].isnull()

        # Rows of this group to fill, and rows to learn from
        null_condition = in_group & is_missing
        check_data = df.loc[in_group & ~is_missing, num_col]

        # Skip if nothing is missing or there is nothing to learn from
        if not null_condition.any() or check_data.empty:
            continue

        # Convert values for plotting
        check_data = check_data.astype(float)

        # Histogram
        check_data.hist()
        plt.title(f'{title} ({unique})')
        plt.xlabel(f'{title} per Day')
        plt.ylabel('Frequency')

        # Calculate the skewness
        skew_score = skew(check_data)

        # Display the results
        plt.show()

        # Check the skew score
        if abs(skew_score) > 0.5: # skewed = use median
            chosen_value = check_data.median()
            print(f'Skewness: {round(skew_score,2)}, use MEDIAN.')
            print(f'Median value for "{unique}": {round(chosen_value, 2)}')

        else:
            chosen_value = check_data.mean()
            print(f'Skewness: {round(skew_score,2)}, use MEAN.')
            print(f'Mean value for "{unique}": {round(chosen_value, 2)}')

        # Impute the missing values with the rounded choice
        df.loc[null_condition, num_col] = round(chosen_value, 2)

In [8]:
def fruit_gram(value):
    """
    Purpose:
    - Convert gram-value to number of fruits.
    - To be used with `apply()`.

    Rules:
    - Numeric values greater than 5 are treated as grams and divided
      by the standard serve (150), rounded to 2 dp.
    - Numeric values of 5 or less are returned unchanged, as floats.
    - Strings such as '300 gramme' are converted the same way as grams.
    - Anything else (including None) is returned unchanged.
    """

    # Define a standard serve
    std_serve = 150

    try:
        amount = float(value)

    except (TypeError, ValueError):
        # Only strings can carry the ' gramme' suffix; the type check
        # keeps None and other non-strings from raising on `in`
        if isinstance(value, str) and ' gramme' in value:
            converted = value.replace(" gramme", "").strip()

            try:
                # Convert to number of fruits
                return round(float(converted) / std_serve, 2)
            except ValueError:
                # Suffix present but the remainder is not a number
                return value

        # Catchall
        return value

    # Look for values greater than 5
    if amount > 5:
        # Convert to number of fruits without truncating the grams
        return round(amount / std_serve, 2)

    # Catchall
    return amount

## Feature Engineering

In [4]:
def resize_option(new_width, new_height, input_dir_path, output_dir_name):
    """
    PURPOSE: Resize a set of images

    Every '.png' file found directly in `input_dir_path` is resized to
    (`new_width`, `new_height`), drawn on a borderless figure and saved
    under '../resources/spectrograms/<output_dir_name>/' using the part
    of the file name before its first '.'.

    Input:
        new_width = target width passed to the image resize
        new_height = target height passed to the image resize
        input_dir_path = directory holding the source images
        output_dir_name = sub-directory name for the resized images
    """

    # Define the output location and make sure it exists
    output_dir = f'../resources/spectrograms/{output_dir_name}'
    os.makedirs(output_dir, exist_ok=True)

    # Define the input files
    input_files = os.listdir(input_dir_path)

    # Loop through each file
    for image_name in tqdm(input_files, desc=f"Resizing spectrograms ({output_dir_name})"):

        # Only read .png files
        if not image_name.endswith(".png"):
            continue

        # Open the image file; the context manager closes it again, and
        # joining the path works with or without a trailing separator
        with pil_Image.open(os.path.join(input_dir_path, image_name)) as img:
            resized = img.resize((new_width, new_height))

        # Define the filename
        filename = image_name.split(".")[0]

        # Create a new figure
        plt.figure(figsize=(new_width / 100, new_height / 100))

        try:
            # Plot the resized image
            plt.imshow(resized)

            # Remove labels and border
            plt.tight_layout()
            plt.axis('off')

            # Export image
            plt.savefig(
                f'{output_dir}/{filename}.png',
                bbox_inches = 'tight',
                pad_inches = 0
            )

        finally:
            # Close the figure to avoid runtime warning, even if the
            # export failed
            plt.close()

In [5]:
def spec_to_csv(original_path, output_name):
    """
    PURPOSE: Convert spectrograms to RGBA CSV files.

    For every '.png' file found directly in `original_path`:
    - its id (file name up to the first '.'), format, mode and size are
      collected and written to '../voice_app/assets/spec_metadata.json';
    - its four channels (indices 0-3 of the image array's last axis)
      are flattened to integers.

    One CSV per channel is written to
    '../resources/clean_data/spectrogram/<output_name><channel>val.csv',
    with one column per image and no index column.

    Input:
        original_path = directory holding the images
        output_name = prefix for the exported CSV file names
    """

    # Define the resized files
    resized_files = os.listdir(original_path)

    # Colour reference list, in channel order
    colours = ['r', 'g', 'b', 'a']

    # Initialise the containers
    spectro_list = []
    id_list = []
    channel_values = {colour: [] for colour in colours}

    ### PARSE TO LISTS ###
    # Loop through each image
    for resized_image in resized_files:

        # Only read .png files
        if not resized_image.endswith(".png"):
            continue

        image_id = resized_image.split(".")[0]

        # Open the image file; everything that needs the open image
        # happens inside the block so the file is closed afterwards
        with pil_Image.open(os.path.join(original_path, resized_image)) as img:

            # Convert image to array format
            img_array = img_to_array(img)

            # Add image attributes to the metadata list
            spectro_list.append({
                'id': image_id,
                'format': img.format,
                'mode': img.mode,
                'width_px': img.width,
                'height_px': img.height,
            })

        # Append the 'id' for use as an index later
        id_list.append(image_id)

        # Populate the RGBA lists
        for channel_idx, colour in enumerate(colours):
            channel_values[colour].append(
                img_array[:, :, channel_idx].flatten().astype(int)
            )

    ### SPECTROGRAM METADATA ###
    # Specify the file path within your repository
    file_path = '../voice_app/assets/spec_metadata.json'

    # Export JSON data to a file
    with open(file_path, 'w', encoding='utf-8') as json_file:
        json.dump(spectro_list, json_file, indent=2)

    ### EXPORT TO CSV ###
    # Loop through each channel; iterating the list directly lets the
    # progress bar know its total
    for colour in tqdm(colours, desc = f"Exporting as CSV ({output_name})"):

        # Create a dataframe for each channel
        df = pd.DataFrame(channel_values[colour])

        # Use the 'id' as the index
        df.index = id_list

        # Export to CSV
        df.transpose().to_csv(
            f'../resources/clean_data/spectrogram/{output_name}{colour}val.csv',
            encoding = 'utf8',
            index = False
        )

## Machine Learning

In [10]:
def limit_unique(df, max_value, columns_to_limit):
    """
    Purpose of the function is to limit the number of unique values

    For each column, the `max_value - 1` most frequent values set the
    cutoff; every value that occurs less often than the last of them
    is replaced by "other". Values tied with the cutoff are kept, so a
    column can end up with more than `max_value` unique values.

    When no top values can be retained (for example `max_value` of 1),
    every value is replaced by "other"; a column with no non-null
    values is left as it is.

    Input:
        df = dataframe (modified in place)
        max_value = target number of unique values, including "other"
        columns_to_limit = iterable of column names to bin

    Output:
        Nothing is returned. The value counts and the number of unique
        values are printed for each column.
    """

    # Loop through each column
    for col in columns_to_limit:
        # Get the value counts of the column (most frequent first)
        total_counts = df[col].value_counts()

        # Get the top values to retain, not including "Other"
        top_counts = total_counts.iloc[:max_value-1]

        if top_counts.empty:
            # Nothing to retain: there is no cutoff to read, so every
            # counted value is treated as rare
            replace_values = total_counts.index
        else:
            # Define the cutoff
            cutoff_value = top_counts.iloc[-1]

            # Create a list of values to replace
            replace_values = total_counts[total_counts < cutoff_value].index

        # Replace in dataframe, in a single pass over the column
        if len(replace_values) > 0:
            df[col] = df[col].replace(list(replace_values), "other")

        # Check to make sure binning was successful
        print(df[col].value_counts())
        print(f'Number of unique values: {df[col].nunique()}\n')

In [2]:
def encode_binary(value):
    """
    Purpose:
    - Binary label encoding.
    - 'healthy' becomes 0; every other value becomes 1.
    """

    return 0 if value == 'healthy' else 1

In [3]:
def encode_multi(value):
    """
    Purpose:
    - Multi-class label encoding by position:
        0 = 'healthy'
        1 = 'reflux laryngitis'
        2 = 'hypokinetic dysphonia'
        3 = 'hyperkinetic dysphonia'
    - Any other value gives None.
    """

    # The position of each label in this tuple is its code
    labels = (
        'healthy',
        'reflux laryngitis',
        'hypokinetic dysphonia',
        'hyperkinetic dysphonia',
    )

    for code, label in enumerate(labels):
        if value == label:
            return code

    return None

In [4]:
def create_model(hp):
    """
    Purpose:
    - Build and compile a dense network whose shape is chosen by the
      tuner (kerastuner, per the notebook's comments).

    Input:
        hp = hyperparameter object; only `hp.Int` and `hp.Choice`
             are used here.

    Output:
        The compiled model.

    Notes:
    - Search bounds, regulariser, input size, output layer and compile
      settings are read from module-level names defined elsewhere.
    - Hyperparameters are requested in a fixed order: activation then
      units for layer 0, the layer count, then units and activation
      for each further layer.
    """
    nn_model = Sequential()

    # First hidden layer: activation is picked before the unit count
    first_activation = hp.Choice('activation_layer_0', activation_functions)
    first_units = hp.Int(
        'units_layer_0',
        min_value = 1,
        max_value = max_num_neurons,
        step = step_count
    )
    first_layer = Dense(
        units = first_units,
        activation = first_activation,
        kernel_regularizer = reg_kernel,
        input_dim = number_input_features
    )
    nn_model.add(first_layer)

    # Let the tuner decide how many further hidden layers to add
    extra_layers = hp.Int('num_layers', 1, max_hidden_layers-1)

    for layer_no in range(1, extra_layers+1):
        # Units first, then activation, for each additional layer
        layer_units = hp.Int(
            f'units_layer_{layer_no}',
            min_value = 1,
            max_value = max_num_neurons,
            step = step_count
        )
        layer_activation = hp.Choice(f'activation_layer_{layer_no}', activation_functions)

        hidden_layer = Dense(
            units = layer_units,
            activation = layer_activation,
            kernel_regularizer = reg_kernel
        )
        nn_model.add(hidden_layer)

    # Output layer
    output_layer = Dense(
        units = output_layer_neurons,
        activation = output_layer_activation
    )
    nn_model.add(output_layer)

    # Compile the model
    nn_model.compile(
        loss = compile_loss,
        optimizer = compile_opt,
        metrics = ["accuracy"]
    )

    return nn_model