In [1]:
import pandas as pd
import numpy as np
from pydub.playback import play
import matplotlib.pyplot as plt
from scipy.io import wavfile as wav
import statistics
import os
from IPython.display import display, HTML
import wave
import contextlib
from datetime import date
import librosa
from pcm_extraction import *
from dropbox_file_handling import *
from prev_failed_sku_handling import *
# from airtable_df_handling import *
import wavio

## Get the skus of the programs that have already successfully had PCM Data extracted

In [2]:
def get_date():
    today = date.today()

    d3 = today.strftime("%m_%d_%y")
    return d3

In [3]:
def get_completed_skus_from_csv(full_csv:str):
    """
        Given the path of a csv of a previous sessions export, retrieves all of the skus that were completed
        during that previous session. These will not have to be executed over again. 
        @Returns: the list of str of all skus that the program has executed over previously
    """
    
    # If there have been no previous csvs exported, then the initialized "./pcms/initialized_0_0_0.csv" is passed
    # if so, return an empty list of completed skus
    if full_csv == './pcms/initialized_0_0_0.csv':
        return []
    else:
        full_pcm_df = pd.read_csv(full_csv)
        return list(full_pcm_df['sku'].values)


# Formatting Frequency List
### Goal: normalize so that all are in range (-1,1), shorten down to 400 given averages.

In [4]:
def split_into_equal_parts(lst, n=400):
    """
        separate a list into n (mostly) equal parts. Last index may contain less. 
    """
    # decrease n size for indexing purposes
    n -= 1
    incr = int(len(lst)/n)
    div_list = []
    i = 0
    while i+incr < len(lst):
        div_list.append(lst[i: i+incr])
        i += incr
    div_list.append(lst[i:-1])
    return div_list

In [5]:
def get_mean_of_sublists(lst):
    """
        given list of with sublists containing numbers, creates a new array containing only the averages
        of each of those sublists
        Notably bypasses the error if the length of the sublist is 0
    """
    avg_list = []
    for sublist in lst:
        #if the sublists length is not zero, add the mean of the sublist
        if len(sublist) != 0:
            #with librosa the sublist will be float32 objects, must convert all to float before getting mean
            float_list = [float(num) for num in sublist]
            avg_list.append(statistics.mean(float_list))
        else:
            # append zero if there are no elements in the sublist
            avg_list.append(0)
    return avg_list

def get_avg_of_list(lst_of_lst):
    """
        given a list of lists that contain floats, get the average of each sublist and return it as a new list
    """
    abs_list = [[abs(num) for num in lst] for lst in lst_of_lst]
    new_list = get_mean_of_sublists(abs_list)
    return new_list

In [6]:
def zeroify_edges(lst):
    """
        takes in a list, then returns the same list but replacing the 1st and last elements with zeros
    """
    lst[0] = 0.0
    lst[-1] = 0.0
    return lst

def normalize_list(lst):
    """
        returns new list where each element of the list has been divided by the maximum
    """
    lst_max = max(lst)
    normal_list = [round(ele/lst_max, 2) for ele in lst]
    # for visualization purposes, make the first and last elements 0.0
    normal_list = zeroify_edges(normal_list)
    return normal_list


def alternate_amplitude(amp_list):
    """
        given list of amplitudes, returns the same values, but alternating between positive and negative
        destructively edits the list
    """
    for i in range(len(amp_list)):
        # if its an odd index, replace with the negative number. 
        if i%2 == 1 and amp_list[i] != 0:
            amp_list[i] = round(amp_list[i]*-1,2)
    return amp_list

In [7]:
def get_pcm_amp_list_from_data(data, size):
    """
        given data, which is a list of all the amplitude values, returns a list of 
        normalized, alternating, averaged amplitudes
    """
    split_data = split_into_equal_parts(data, size)
    avg_list = get_avg_of_list(split_data)
    norm_list = normalize_list(avg_list)
    pcm_amp_list = alternate_amplitude(norm_list)
    return pcm_amp_list

In [8]:
def average_lists(lst_1:list, lst_2:list):
    """
        Averages each element in list corresponding to index
        Intended to average together the L and R waveforms generated
    """
    combined_list = [lst_1, lst_2]
    avg_lst = [round((x+y)/2, 2) for x,y in zip(*combined_list)]
    return avg_lst
        

In [9]:
def get_string_of_list(lst):
    """
        represents the list of floats as a string. Used for visualizing without line breaks
    """
    str_list = [str(ele) for ele in lst]
    joined_str = ','.join(str_list)
    return '['+joined_str+']'

In [10]:
def turn_s_list_to_f_list(s:str):
    """
        Takes a string that can be converted into a list, and separates them along commas, then converts each
        Element in the list into a float and returns a list
        
        @Params: string with brackets and commas separating each element which can be converted to float
        @Returns list of floats
    """
    
    #remove the brackets which would be at the beginning and end
    s = s[1:-1]
    s_list = s.split(',')
    #convert all elements to float
    f_list = [float(s) for s in s_list]
    return f_list

# Executing the program over a directory

In [11]:
def format_duration(duration:float):
    """
        @Params duration in seconds as a float
        @Returns a representation of those seconds as mm:ss for formatting purposes
    """
    minutes = int(duration//60)
    seconds = int(duration%60)
    # if there are less than 10 seconds, then we have to append the 0 beforehand. We'll convert to str in here
    if seconds < 10:
        seconds = '0'+str(seconds)
    else:
        seconds = str(seconds)
    return str(minutes)+':'+seconds

def get_duration_from_wav_file(sku, wav_file:str):
    """
        given a string of the location of a wav file, returns the duration of that wav file in seconds (int)
    """
    try:
        with contextlib.closing(wave.open(wav_file,'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)
            return int(duration)
    except:
        failed_skus.append(sku)
        return 0

In [12]:


def get_sku_from_wav_file(wav_file:str):
    """
        given an abosolute path, will isolate just the SKU that is at the front of the filename
    """
    filename = wav_file.split('/')[-1]
    #isolate from .wav by splitting at the '.' and taking the first part
    title = filename.split('.')[0]
    # get only the SKU, as in nothing after the first '_' if there is one
    sku = title.split('_')[0]
    return sku

def get_pcm_data_from_wav_file_w_librosa(sku, wav_file:str, sample_rate=12800):
    """
        Identical function to "get_pcm_from_wav_file", but uses the librosa library instead, also changes sample
        rate to decrease the total number of values in the array. 
        
        @params: wav_file as a string of the absolute path of a wav file, sku of the file as a str, sample rate
        for getting wav amplitudes, defaults to 200 (very low resolution)
        @returns: list of floats rounded to 2 decimal places
    """
#     try:
    #wav amps is the list of amplitudes at 
    wav_amps, sr = librosa.load(wav_file, mono=True, sr=sample_rate)
    # 400 is the length of the array that is returned
    pcm_data = get_pcm_amp_list_from_data(wav_amps, 400)
    return pcm_data
#     except:
    print(sku+': Failed')
    failed_skus.append(sku)
    return []
    
    
def get_pcm_data_from_wav_file_test(sku, wav_file:str, stereo=False):
    #data here has both the R and L channels
    data = wavio.read(wav_file).data
    #get the waveform data from the right channel
    dataR = [datum[0] for datum in data]
    pcm_data_R = get_pcm_amp_list_from_data(dataR, 400)

    # if we want the stereo version, then we'll also take in the left track and average the two
    if stereo:
        dataL = [datum[1] for datum in data]
        pcm_data_L = get_pcm_amp_list_from_data(dataL, 400)
        pcm_data_full = average_lists(pcm_data_R, pcm_data_L)
        return pcm_data_full
    else:
        return pcm_data_R

def get_pcm_data_from_wav_file(sku, wav_file:str, stereo=True):
    """
        given an absolute path of a wav file, returns a list of 400 numbers between 0 and 1 that represent the
        waveform of the file. Normally only returns the waveform data for the Right channel, 
        but if stereo is set to true, it will average both left and right. 
        
        @params: wav_file as a string of the absolute path of a wav file, stereo, true if average of channel is sought
        @returns: list of floats rounded to 2 decimal places
    
    """
    try:
        wavio_file = wavio.read(wav_file)
        data = wavio_file.data
        dataR = [datum[0] for datum in data]
        pcm_data_R = get_pcm_amp_list_from_data(dataR, 400)

        # if we want the stereo version, then we'll also take in the left track and average the two
        if stereo:
            dataL = [datum[1] for datum in data]
            pcm_data_L = get_pcm_amp_list_from_data(dataL, 400)
            pcm_data_full = average_lists(pcm_data_R, pcm_data_L)
            return pcm_data_full
        else:
            return pcm_data_R
    except:
        try:
            #if there is an error with the wav.read, next try the librosa.read function
            print('File Extracted w Librosa')
            return get_pcm_data_from_wav_file_w_librosa(sku, wav_file)
        except:
            #if that also fails, return an empty array and append the value to the failed skus
            failed_skus.append(sku)
            return []

In [13]:
def get_dict_representation_of_row(sku, pcm, duration):
    """
        Given the sku, pcm and duration (ORDER MATTERS), will return a dictionary with the keys being the
        column names and the values being the input. 
        So that dict can be appended to dataframe
    """
    new_row = {}
    new_row['sku'] = sku
    new_row['meta:wlk_pcm_data'] = pcm
    new_row['meta:wlk_track_length'] = duration
    return new_row

def create_series_from_data(cols:list, data:list):
    """
        Given a list of cols and a list of data, which correspond to those cols based on index, 
        returns a pd.Series object that can be easily added to a dataframe
    """  
    row = pd.Series()
    #make sure that the two lists are of equal length, otherwise they cannot correspond
    if len(cols) != len(data):
        #if not, return the empty series
        return row
    for i in range(len(cols)):
        col = cols[i]
        col_datum = data[i]
        row[col] = col_datum
    return row

In [14]:
def create_df_w_pcm_data(wav_files:list, cols, index_col, completed_skus:list):
    """
        given a list where each element is the absolute path of a wav file file, get the pcm data for that wav
        file and save it to a dataframe. 
        Only executes over the skus that have not been completed during a previous run.
        @Params: completed_skus is a list that contains all the skus that the program has already completed. 
    """
    df = pd.DataFrame(columns=cols)
    for wav_file in wav_files:
        sku = get_sku_from_wav_file(wav_file)
        # only execute the rest of this if the sku hasn't already been completed before
        if sku not in completed_skus:
            pcm_data = get_pcm_data_from_wav_file_w_librosa(sku, wav_file)
            duration = get_duration_from_wav_file(sku, wav_file)
            # bundle all data together in a list so that it can be passed to create a series
            data = [sku, pcm_data, duration]
            new_row = create_series_from_data(cols, data)
            df = df.append(new_row, ignore_index=True)
            #add to the global variable of current completed skus
            curr_completed_skus.append(sku)
            
        
    #set the index col before returning 
    df.set_index(index_col, inplace=True)
    return df

def export_df_as_csv(df, title='pcm_data_'):
    """
        given dataframe, it exports it as a csv to the current directory with current date
    """
    subdirectory = './pcms/'
    file_designation = title
    date = get_date()
    filename = subdirectory+file_designation+date+'.csv'
    df.to_csv(filename)
    
def export_completed_skus_as_csv(curr_completed_skus, old_completed_skus):
    """
        Takes in the old list of completed skus and the new list of completed skus and exports 
        a csv in the current directory with the name "completed_skus.csv". Will overwrite the old version
    """
    all_completed_skus = curr_completed_skus+old_completed_skus
    # we will first convert to a pd dataframe before exporting as a csv
    completed_dict = {'sku':all_completed_skus}
    complete_skus_df = pd.DataFrame.from_dict(completed_dict)
    filename = 'completed_skus.csv'
    complete_skus_df.to_csv(filename)


def get_pcm_data(path, sku_range=0, after=True):
    """
        Main method
        Takes in the path of the directory and will execute through all wav files in that directory and export a
        CSV containing each tracks pcm data and track length with its corresponding SKU
    
    """
    all_wav_files = get_filenames_in_range(path, sku_range, after)
    cols = ['sku', 'meta:wlk_pcm_data', 'meta:wlk_track_length']
    
    # get the most recent export in the pcms folder so that we only have to execute program for new files
    prev_exports_dir = './pcms/'
    most_recent_export_path = prev_exports_dir+get_most_recent_file_in_dir(prev_exports_dir)
    completed_skus = get_completed_skus_from_csv(most_recent_export_path)
    
    # if there has never been a previous export, then skip reading the previous export as a df
    if not completed_skus:
        full_pcm_df = create_df_w_pcm_data(all_wav_files, cols, 'sku', completed_skus)
    else:
        #create new df with all the wav files that were executed over last time
        df_prev = pd.read_csv(most_recent_export_path)
        #reset the index to sku on the previous one for concatenating later
        df_prev.set_index('sku', inplace=True)  
        df_new = create_df_w_pcm_data(all_wav_files, cols, 'sku', completed_skus)
        full_pcm_df = pd.concat([df_new, df_prev])
    #sort by the skus
    full_pcm_df.sort_values('sku', inplace=True)
    export_df_as_csv(full_pcm_df)
    # if there were previous executions to pull from, return the df of the new ones added
    return df_new

In [15]:
def get_filename_from_dir_w_sku(directory, sku):
    """
        @Params the sku of the desired file
        @Returns the full path of the file including filename corresponding to that sku
    """
    all_files = get_all_filenames(directory)
    for file in all_files:
        if sku in file:
            return file

def get_all_filenames(directory):
    all_wavs = []
    for filename in os.listdir(directory):
        if filename.endswith(".wav"): 
             # print(os.path.join(directory, filename))
            all_wavs.append(directory+filename)
    return all_wavs
  
def get_sku_num_from_filename(filename:str):
    """
        Retrieves the sku number as an integer based on the filename in the standard format
        
        @Params: string of a filename where everything before the first '-' is an sku, as an int
    """
    sku_num = filename.split('-')[1]
    return int(sku_num)

def get_sku_from_filepath(filepath:str):
    """
        Retrieves the sku from the full filepath of a wav.
        @Returns: the sku in front of the filename
    """
    filename = filepath.split('/')[-1]
    sku_num = get_sku_num_from_filename(filename)
    return 'TR-'+str(sku_num)

def get_filenames_in_range(directory, sku_max, after=True):
    """
        
    """
    all_wavs = []
    for filename in os.listdir(directory):
        if filename.endswith(".wav"):
            #if after is true, then only get files with sku greater than the sku specified
            if after:
                if get_sku_num_from_filename(filename) >= sku_max:
                    all_wavs.append(directory+filename)
            else:
                if get_sku_num_from_filename(filename) <= sku_max:
                    all_wavs.append(directory+filename)
    return all_wavs

In [16]:
def get_date_from_file(filename:str):
    """
        Given a file as a string, returns just the date that is appended at the end, before the file classifier
        @Returns as a list of the 2 numbers with [[0]month,[1]day,[2]year]
    """
    #remove the .filetype at the end
    file = filename.split('.')[-2]
    #Get only the last 3 numbers which are the Month, Day, Year
    date = file.split('_')[-3:]
    date_as_ints = [int(day) for day in date]
    return date_as_ints

def compare_dates(date1:list, date2:list):
    """
        Date1 and Date2 are lists containing elements where [0] is month, [1] is day, [2] is year
        Returns the date that is most recent. 
    """
    if date1[2] > date2[2]:
        return date1
    elif date1[2] < date2[2]:
        return date2
    #if the years are the same
    else:
        #compare the months
        if date1[0] > date2[0]:
            return date1
        elif date1[0] < date2[0]:
            return date2
        else:
            #compare days
            if date1[1] > date2[1]:
                return date1
            #if they are equal, then it doesn't matter which one is returned
            else:
                return date2            

def compare_file_by_date(file1:str, file2:str):
    """
        Given two filenames, returns the file that was published at a later date
    """
    file1_date = get_date_from_file(file1)
    file2_date = get_date_from_file(file2)
    later_date = compare_dates(file1_date, file2_date)
    if later_date == file1_date:
        return file1
    else:
        return file2
    

def get_most_recent_file_in_dir(directory):
    """
        Given a directory with a bunch of files that have the date appended to the end of them (preceding
        the file type), returns the file in that directory with the latest date
    """
    # the initialized file has to be overwritten immediately. Must be in the format, but must be equivalent to 0
    initialized_date = 'initialized_0_0_0.csv'
    most_recent = initialized_date
    for filename in os.listdir(directory):
        # There is the .ds store file in this folder, it must be ignored. 
        if filename != '.DS_Store':
            most_recent = compare_file_by_date(filename, most_recent)
    return most_recent
    

# Fixing previously failed skus

In [17]:
def get_previously_failed_skus(prev_df):
    """
        Goes through the previously completed export of a dataframe and returns all skus where the
        pcm value was an empty list
        @Returns: list of previously failed skus as strings
    """
    #initialize an empty list of the previously failed skus
    missed_skus = []
    for index, row in prev_df.iterrows():
        if row['meta:wlk_pcm_data']=='[]':
            missed_skus.append(row['sku'])
    return missed_skus

def get_filepaths_of_prev_failed_skus(prev_failures:list):
    """
        Given a list of previously failed skus (strings), returns a list of the full filepath of 
        each of their corresponding files
    """
    failed_file_paths = []
    for failed_sku in prev_failures:
        failed_file_paths.append(get_filename_from_dir_w_sku(path, failed_sku))
    return failed_file_paths

def get_dict_of_retried_skus(prev_failed_skus:list):
    """
        Given the skus of the skus that failed in the previous export, retries to extract pcm data with the
        librosa method instead. 
        @Returns Dicionary where key is the sku of the file, and value is the list of PCM data
    """
    path = '/Users/dsaphra/Dropbox/_TuneReel/Library/WAV Retitled/'
    failed_filepaths = get_filepaths_of_prev_failed_skus(prev_failed_skus)
    retried_file_pcms = {}
    for failed_file_path in failed_file_paths:
        file_sku = get_sku_from_filepath(failed_file_path)
        print(failed_file_path)
        file_pcm = get_pcm_data_from_wav_file_w_librosa(file_sku, failed_file_path)
        retried_file_pcms[file_sku] = file_pcm
    return retried_file_pcms
 
def add_retried_file_pcms_to_prev_dataframe(prev_df, new_pcm_dict):
    """
        Easiest way to reincorporate the successful pcm extractions is to fill in the meta:wlk_pcm_data column in 
        the previous dataframe with the newly added pcm data
        @Returns: new dataframe, identical but with updated pcm cells for previously failed skus
    """
    new_pcm_skus = list(new_pcm_dict.keys())
    df = prev_df.set_index('sku')
    display(df.at['TR-3466-A','meta:wlk_pcm_data'])
    for sku in new_pcm_skus:
        # All the skus added at this point (might be temporary) had the suffix -A, so it needs to be added
        df_sku = sku+'-A'
        df.at[df_sku, 'meta:wlk_pcm_data'] = new_pcm_dict[sku]
    return df
        
    
def retry_previous_failures(prev_df_csv:str):
    """
        Go through the most previous export, and if there are any rows that ended up with empty values under
        pcm data, retry to extract the PCM data from those files
        @Params: Previous df as a csv file
    """
    prev_df = pd.read_csv(prev_df_csv)
    prev_failed_skus = get_previously_failed_skus(prev_df)
    retried_pcm_dict = get_dict_of_retried_skus(prev_failed_skus)
    updated_df = add_retried_file_pcms_to_prev_dataframe(prev_df, retried_pcm_dict)
    export_df_as_csv(updated_df)
    return updated_df
 

In [20]:
def main():
    path = '/Users/dsaphra/Dropbox/_TuneReel/Library/WAV Retitled/'
    new_df = get_pcm_data(path, 0)
    display(new_df)

In [21]:
failed_skus = []
#curr completed skus is the skus that this session of the program have been completed. Will be exported at the end
curr_completed_skus = []
main()

FileNotFoundError: [Errno 2] No such file or directory: '/Users/dsaphra/Dropbox/_TuneReel/Library/WAV Retitled/'

In [20]:
test = get_pcm_data_from_wav_file_w_librosa('TR-1002-A', 'TR-1007-A_-hi-tech.wav', 44100)

for ele in test:
    print(str(ele)+',', end='')

0.0,-0.54,0.26,-0.18,0.36,-0.37,0.13,-0.06,0.52,-0.29,0.2,-0.27,0.44,-0.2,0.1,-0.38,0.39,-0.23,0.21,-0.47,0.28,-0.14,0.25,-0.4,0.27,-0.23,0.53,-0.33,0.21,-0.13,0.51,-0.36,0.31,-0.43,0.58,-0.39,0.25,-0.6,0.47,-0.4,0.39,-0.62,0.48,-0.34,0.47,-0.65,0.49,-0.37,0.62,-0.41,0.41,-0.42,0.57,-0.52,0.5,-0.63,0.56,-0.66,0.57,-0.84,0.93,-0.67,0.75,-0.48,0.54,-0.37,0.49,-0.54,0.38,-0.3,0.47,-0.42,0.33,-0.39,0.44,-0.41,0.31,-0.39,0.47,-0.32,0.33,-0.51,0.54,-0.32,0.34,-0.46,0.19,-0.1,0.37,-0.42,0.39,-0.32,0.46,-0.44,0.36,-0.42,0.42,-0.43,0.36,-0.44,0.46,-0.35,0.36,-0.47,0.44,-0.4,0.36,-0.47,0.41,-0.37,0.39,-0.38,0.29,-0.3,0.38,-0.41,0.59,-0.75,0.83,-0.5,0.73,-0.61,0.45,-0.62,0.75,-0.59,0.92,-0.71,0.77,-0.73,0.71,-0.94,0.81,-0.67,0.66,-0.71,0.53,-0.57,0.51,-0.92,0.83,-0.77,0.71,-0.72,0.78,-0.84,0.81,-0.58,0.4,-0.58,0.4,-0.4,0.43,-0.62,0.68,-0.54,0.8,-0.54,0.52,-0.7,0.68,-0.7,0.4,-0.67,0.43,-0.37,0.49,-0.51,0.69,-0.63,0.53,-0.45,0.42,-0.52,0.73,-0.81,0.92,-0.47,0.95,-0.72,0.74,-0.47,0.95,-0.84,0.36,-0.

In [21]:
test = get_pcm_data_from_wav_file_test('TR-1002-A', 'TR-1007-A_-hi-tech.wav')

for ele in test:
    print(str(ele)+',', end='')

0.0,-0.59,0.27,-0.15,0.31,-0.34,0.12,-0.06,0.48,-0.29,0.19,-0.22,0.4,-0.2,0.1,-0.35,0.37,-0.22,0.16,-0.43,0.27,-0.13,0.23,-0.37,0.25,-0.19,0.47,-0.3,0.2,-0.12,0.48,-0.38,0.3,-0.37,0.53,-0.37,0.25,-0.55,0.47,-0.36,0.35,-0.58,0.47,-0.32,0.46,-0.63,0.5,-0.37,0.57,-0.4,0.42,-0.45,0.6,-0.57,0.52,-0.57,0.52,-0.64,0.68,-0.87,0.9,-0.66,0.75,-0.49,0.51,-0.38,0.45,-0.44,0.33,-0.28,0.4,-0.4,0.31,-0.4,0.44,-0.4,0.29,-0.36,0.45,-0.3,0.34,-0.44,0.48,-0.29,0.31,-0.39,0.21,-0.12,0.37,-0.41,0.38,-0.31,0.43,-0.45,0.35,-0.43,0.39,-0.4,0.33,-0.4,0.41,-0.33,0.35,-0.44,0.41,-0.41,0.33,-0.43,0.41,-0.38,0.43,-0.36,0.36,-0.36,0.41,-0.56,0.58,-0.78,0.76,-0.57,0.79,-0.64,0.5,-0.65,0.75,-0.73,0.91,-0.74,0.74,-0.8,0.69,-0.91,0.93,-0.68,0.68,-0.82,0.58,-0.55,0.53,-1.0,0.82,-0.71,0.81,-0.84,0.82,-0.87,0.79,-0.64,0.42,-0.63,0.44,-0.4,0.45,-0.66,0.71,-0.55,0.91,-0.57,0.65,-0.77,0.86,-0.71,0.43,-0.68,0.44,-0.36,0.44,-0.54,0.76,-0.63,0.56,-0.43,0.47,-0.56,0.74,-0.79,0.86,-0.44,0.89,-0.66,0.66,-0.42,0.86,-0.75,0.34,-0.67

In [58]:
wavy = wavio.read('TR-1002-A_-medieval.wav')

44100

In [None]:
prev_csv = get_most_recent_file_in_dir('./pcms/')

retry_previous_failures('./pcms/'+prev_csv)