In [48]:
"""Util functions"""

import re
import json
import os
import shutil

def write_json_to_file(file_path, data):
    """Write ``data`` to ``file_path`` as human-readable JSON.

    The file is opened in text write mode (existing content is replaced);
    keys are sorted and nesting is indented by four spaces.
    """
    with open(file_path, 'w') as out_file:
        json.dump(data, out_file, sort_keys=True, indent=4)
        
def load_json_from_file(file_path):
    """Read ``file_path`` and return the JSON document it contains, parsed."""
    with open(file_path) as source:
        parsed = json.load(source)
    return parsed

def first_matching(iterable, predicate):
    """Return the first element of ``iterable`` for which ``predicate`` is truthy.

    Raises ``StopIteration`` when no element matches.
    """
    matches = filter(predicate, iterable)
    return next(matches)

def get_file_ext(file_name):
    """Return the text after the last ``.`` in ``file_name``.

    When there is no ``.`` the whole name is returned unchanged.
    """
    _stem, _dot, extension = file_name.rpartition('.')
    return extension

def get_file_name(file_path):
    """Return the text after the last ``/`` in ``file_path``.

    A path without any ``/`` is returned unchanged; a path ending in ``/``
    yields an empty string.
    """
    _directory, _slash, name = file_path.rpartition('/')
    return name

def filter_dict_keys(orig_dict, keys_to_keep):
    """Build a new dict holding only ``keys_to_keep`` taken from ``orig_dict``.

    Raises ``KeyError`` if a requested key is absent from ``orig_dict``.
    """
    kept = {}
    for key in keys_to_keep:
        kept[key] = orig_dict[key]
    return kept

def formatted_ms(total_ms, format_str='{min}:{sec}:{ms}'):
    """Render a millisecond count through ``format_str``.

    ``format_str`` may reference ``{min}`` (at least two digits), ``{sec}``
    (two digits) and ``{ms}`` (three digits). Minutes are not wrapped into
    hours. Returns ``None`` unless ``total_ms`` is exactly of type ``int``
    (so ``bool``, ``float`` and ``None`` are rejected).
    """
    if type(total_ms) is not int:
        return None
    whole_seconds, ms_part = divmod(total_ms, 1000)
    whole_minutes, sec_part = divmod(whole_seconds, 60)
    fields = {
        'min': '{0:02d}'.format(whole_minutes),
        'sec': '{0:02d}'.format(sec_part),
        'ms': '{0:03d}'.format(ms_part),
    }
    return format_str.format(**fields)

def dot_formatted_ms(total_ms):
    """Format ``total_ms`` as ``min.sec.ms`` via ``formatted_ms``."""
    dot_template = '{min}.{sec}.{ms}'
    return formatted_ms(total_ms, dot_template)

def colon_formatted_ms(total_ms):
    """Format ``total_ms`` as ``min:sec:ms`` via ``formatted_ms``."""
    colon_template = '{min}:{sec}:{ms}'
    return formatted_ms(total_ms, colon_template)

def multi_replacer(*key_values):
    """Build a function that applies several literal replacements in one pass.

    Args:
        *key_values: ``(search, replacement)`` pairs. Search strings are
            matched literally (they are regex-escaped).

    Returns:
        A callable taking a string and returning it with every occurrence of
        any search string replaced by its paired replacement. With no pairs
        the callable returns its input unchanged.
    """
    replace_dict = dict(key_values)
    if not replace_dict:
        # An empty alternation would match the empty string at every position
        # and the lookup below would then fail with KeyError('').
        return lambda string: string

    pattern = re.compile("|".join(re.escape(k) for k, _ in key_values), re.M)

    def replace_all(string):
        return pattern.sub(lambda match: replace_dict[match.group(0)], string)

    return replace_all

def multi_replace(string, *key_values):
    """Apply every ``(search, replacement)`` pair in ``key_values`` to ``string``."""
    replacer = multi_replacer(*key_values)
    return replacer(string)

def create_file_path(*file_path_parts):
    """Join path fragments with single ``/`` separators.

    Leading and trailing slashes are stripped from every fragment before
    joining. The result is absolute (starts with ``/``) only when the first
    non-empty fragment starts with ``/``.

    Leading empty fragments are ignored and calling with no usable fragment
    returns ``''``; both cases previously raised ``IndexError``.
    """
    parts = list(file_path_parts)
    # Drop empty leading fragments so the absolute/relative decision is made
    # on the first fragment that actually carries text.
    while parts and parts[0] == '':
        parts.pop(0)
    if not parts:
        return ''
    prefix = '/' if parts[0].startswith('/') else ''
    return prefix + '/'.join(part.strip('/') for part in parts)

def path_is_subpath(path, comparison_path):
    """Return True when ``path`` is ``comparison_path`` or lies beneath it.

    Both paths are resolved with ``os.path.realpath`` first. The comparison
    is done on whole path components, so ``/a/foobar`` is not treated as
    being inside ``/a/foo``.
    """
    full_path = os.path.realpath(path)
    full_comparison_path = os.path.realpath(comparison_path)
    try:
        common = os.path.commonpath([full_path, full_comparison_path])
    except ValueError:
        # commonpath raises when the paths cannot share a prefix at all
        # (for example different drives on Windows).
        return False
    return common == full_comparison_path

def construct_dir_path(dir_path):
    """Create ``dir_path`` and any missing parents; an existing directory is fine.

    Raises:
        FileExistsError: if ``dir_path`` already exists but is not a
            directory. The previous implementation swallowed this case too,
            which only deferred the failure to the first write.
    """
    os.makedirs(dir_path, exist_ok=True)
    
def copy_file_to_dir(src_file_path, dest_path):
    """Copy ``src_file_path`` to ``dest_path`` with ``shutil.copy``.

    Returns whatever ``shutil.copy`` returns (the path of the new file).
    """
    copied_path = shutil.copy(src_file_path, dest_path)
    return copied_path
   
# replacements = ('[', '[[]'), (']', '[]]')
# multi_replace('Runaway [Explicit]', *replacements)
# SHELL_ESCAPE_CHARS = ['[', ']', '?', '!']
# multi_replace('Runaway [Explicit]', *tuple((ch, '[{}]'.format(ch)) for ch in SHELL_ESCAPE_CHARS))

In [2]:
"""SQLite helper functions"""

import sqlite3

def dict_factory(cursor, row):
    """sqlite3 row factory that maps each column name to its value in ``row``.

    Column names are taken from the first item of every entry in
    ``cursor.description``.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }

def get_cursor(db_file):
    """Open the SQLite database ``db_file`` and return a cursor on it.

    Rows fetched through the cursor are dicts because the connection's
    ``row_factory`` is set to ``dict_factory``.
    """
    connection = sqlite3.connect(db_file)
    connection.row_factory = dict_factory
    cursor = connection.cursor()
    return cursor

def get_itunes_cursor():
    """Return a dict-row cursor on ``data/itunes.db``."""
    itunes_db_file = 'data/itunes.db'
    return get_cursor(itunes_db_file)

def get_mb_cursor():
    """Return a dict-row cursor on ``data/musicbee.db``."""
    musicbee_db_file = 'data/musicbee.db'
    return get_cursor(musicbee_db_file)

def db_info(db_file):
    """List the tables of ``db_file`` as rows of the form ``{'name': ...}``."""
    table_query = "SELECT name FROM sqlite_master WHERE type='table';"
    return get_cursor(db_file).execute(table_query).fetchall()

def column_names(cursor, table_name):
    """Return the sorted column names of ``table_name``.

    The names come from ``cursor.description`` after a zero-row select, so
    this works for empty tables and with any row factory (the earlier
    version needed at least one row and dict-shaped rows).

    ``table_name`` is interpolated into the SQL text because identifiers
    cannot be bound as parameters; only pass trusted names.
    """
    cursor.execute("SELECT * FROM {} LIMIT 0".format(table_name))
    return sorted(column[0] for column in cursor.description)

def escape_query_value(value):
    """Double every single quote in ``value`` for use inside a SQL string literal."""
    pieces = value.split("'")
    return "''".join(pieces)


In [15]:
"""
Utils to work with example tracks
"""

# Example tracks looked up by get_example_tracks(): each key is an artist
# name (matched exactly) and each value is a fragment of a track name
# (matched with LIKE '%fragment%'). One artist maps to exactly one fragment.
track_search_map = {
    'Beck': 'Debra',
    'Kanye West': 'Runaway',
    'Outkast': 'The Way You Move',
    'Chromeo': 'Opening Up (- Ce soir on danse',
}

def get_example_tracks():
    """Fetch the iTunes rows for the tracks listed in ``track_search_map``.

    Each map entry contributes one ``artist = ? AND name LIKE ?`` condition.
    Values are bound as query parameters rather than pasted into the SQL
    text, so names containing quotes cannot break the statement.

    Returns:
        The matching rows from the ``tracks`` table.

    Raises:
        Exception: if the number of rows found differs from the number of
            entries in ``track_search_map``.
    """
    cursor = get_itunes_cursor()
    clause = " OR ".join("(artist=? AND name LIKE ?)" for _ in track_search_map)
    params = []
    for artist, track_name in track_search_map.items():
        params.extend((artist, '%{}%'.format(track_name)))
    query = "SELECT * FROM tracks WHERE {}".format(clause)
    tracks = cursor.execute(query, params).fetchall()
    if len(tracks) != len(track_search_map):
        raise Exception('Found too many or too few tracks')
    return tracks

# [(track['artist'], track['name'], ms_to_min_sec_ms(track['stop_time']))
#     for track in tracks]
# track_debra = first_matching(tracks, lambda t: t['artist'] == 'Beck')
# (ms_to_min_sec_ms(track_debra['start_time']), ms_to_min_sec_ms(track_debra['stop_time']))

In [3]:
"""File searching utility functions"""

import os
from fnmatch import fnmatch
import subprocess

# Directory whose entries find_matching_file_name() scans for a track file.
# NOTE(review): absolute, machine-specific path.
ORIG_FILE_PATH = '/home/gavin/Music/chop'
# Sub-directory name that chop_example_tracks() joins onto ORIG_FILE_PATH for its output files.
CHOPPED_REL_FILE_PATH = 'chopped/'
# Characters regarded as special in shell-style (fnmatch) patterns.
# NOTE(review): ']' and '!' are only special inside a [...] class - confirm this list.
SHELL_ESCAPE_CHARS = ['[', ']', '?', '!']

def escape_shell_string(string):
    """Escape ``string`` so ``fnmatch`` treats every character literally.

    Only ``*``, ``?`` and ``[`` are special outside a character class, and
    each is neutralised by wrapping it in brackets (``[`` becomes ``[[]``).
    ``]`` and ``!`` are left alone: wrapping ``!`` as ``[!]`` does not give
    a literal ``!`` - fnmatch reads ``[!`` as the start of a negated class -
    so names containing ``!`` previously failed to match.
    """
    return re.sub(r'([*?\[])', r'[\1]', string)

def find_matching_file_name(partial_track_name):
    """Return the first entry of ``ORIG_FILE_PATH`` whose name contains the track name.

    The track name is escaped with ``escape_shell_string`` and matched as
    ``*name*`` using ``fnmatch``. Entries are tried in ``os.listdir`` order.

    Raises:
        Exception: when no directory entry matches.
    """
    escaped_track_name = escape_shell_string(partial_track_name)
    glob_pattern = '*{}*'.format(escaped_track_name)
    candidates = (
        entry for entry in os.listdir(ORIG_FILE_PATH)
        if fnmatch(entry, glob_pattern)
    )
    for candidate in candidates:
        return candidate
    raise Exception("'{}' not found in {}".format(escaped_track_name, ORIG_FILE_PATH))


In [34]:
"""
ffmpeg splitter working example
"""

import subprocess

def ffmpeg_formatted_ms(total_ms):
    """Format ``total_ms`` as ``00:min:sec.ms`` (the hours field is always ``00``)."""
    ffmpeg_template = '00:{min}:{sec}.{ms}'
    return formatted_ms(total_ms, ffmpeg_template)

def ffmpeg_output_file_name(file_name):
    """Insert ``' chopped'`` before the extension of ``file_name``.

    ``'song.mp3'`` becomes ``'song chopped.mp3'``. A name without any ``.``
    simply gets ``' chopped'`` appended (this used to raise ``IndexError``).
    """
    stem, dot, extension = file_name.rpartition('.')
    if not dot:
        return file_name + ' chopped'
    return ''.join([stem, ' chopped', dot, extension])

def ffmpeg_time_params(start_ms=None, stop_ms=None):
    """Turn start/stop offsets in milliseconds into ffmpeg time strings.

    Returns:
        ``(formatted_start, formatted_duration)``. A falsy ``start_ms`` is
        treated as 0. ``formatted_duration`` covers ``stop_ms - start_ms``
        and is ``None`` when ``stop_ms`` is falsy.
    """
    if not start_ms:
        start_ms = 0
    formatted_start = ffmpeg_formatted_ms(start_ms)
    formatted_duration = ffmpeg_formatted_ms(stop_ms - start_ms) if stop_ms else None
    return formatted_start, formatted_duration

def ffmpeg_param_list(in_file, out_file, formatted_start, formatted_duration=None, overwrite=False):
    """Assemble the ffmpeg argument list for copying a slice of an audio file.

    Args:
        in_file: path passed after ``-i``.
        out_file: output path, always the final argument.
        formatted_start: value passed to ``-ss``.
        formatted_duration: value passed to ``-t``; the option is omitted
            when this is falsy.
        overwrite: selects ``-y`` when truthy, ``-n`` otherwise.

    Returns:
        A list of strings suitable for ``subprocess.run``.
    """
    overwrite_flag = '-n'
    if overwrite:
        overwrite_flag = '-y'
    command = ['ffmpeg', '-i', in_file, overwrite_flag, '-vn', '-acodec', 'copy', '-ss', formatted_start]
    if formatted_duration:
        command.extend(['-t', formatted_duration])
    command.append(out_file)
    return command

def chop_example_tracks():
    """Run ffmpeg over every example track and print a report of each run.

    For each row from ``get_example_tracks()`` the matching file in
    ``ORIG_FILE_PATH`` is located and ffmpeg is invoked to write the cut
    into the ``CHOPPED_REL_FILE_PATH`` sub-directory. Afterwards the
    attribute names of the first result are printed, followed by the command
    line, return code and captured stdout of every run.
    """
    def run_chop(track):
        # One ffmpeg invocation for a single track row.
        start, duration = ffmpeg_time_params(track['start_time'], track['stop_time'])
        source_name = find_matching_file_name(track['name'])
        source_path = '{}/{}'.format(ORIG_FILE_PATH, source_name)
        target_path = create_file_path(
            ORIG_FILE_PATH, CHOPPED_REL_FILE_PATH, ffmpeg_output_file_name(source_name))
        command = ffmpeg_param_list(source_path, target_path, start, duration)
        return subprocess.run(command, stdout=subprocess.PIPE)

    process_results = [run_chop(track) for track in get_example_tracks()]

    print(dir(process_results[0]))
    print('')
    for completed in process_results:
        print(' '.join(completed.args))
        print('    returncode: {}'.format(completed.returncode))
        print('    stdout: {}'.format(completed.stdout))

# chop_example_tracks()

# ffmpeg -i '.mp3' -acodec copy -ss 00:00:08 -t 00:04:04 '.mp3'

In [5]:
"""
Serialize tracks with start/stop times to JSON
"""

# JSON file written by serialize_start_stop_tracks().
PRELIM_SERIALIZED_TRACK_FILE_PATH = './data/prelim_start_stop_track_info.json'
# Track fields that serialize_orig_track() keeps; every key must exist on the row.
SERIALIZE_KEYS = ['artist', 'album_artist', 'album', 'name', 'start_time', 'stop_time', 'total_time']

def serialize_orig_track(track):
    """Reduce ``track`` to the fields named in ``SERIALIZE_KEYS``."""
    wanted_keys = SERIALIZE_KEYS
    return filter_dict_keys(track, wanted_keys)

def serialize_start_stop_tracks():
    """Dump every iTunes track with a non-NULL ``start_time`` to JSON.

    The rows are trimmed with ``serialize_orig_track`` and written to
    ``PRELIM_SERIALIZED_TRACK_FILE_PATH``.
    """
    cursor = get_itunes_cursor()
    cursor.execute("SELECT * FROM tracks WHERE start_time IS NOT NULL;")
    serialized = list(map(serialize_orig_track, cursor.fetchall()))
    write_json_to_file(PRELIM_SERIALIZED_TRACK_FILE_PATH, serialized)

# Executed whenever this cell runs: reads the iTunes database and rewrites
# the preliminary JSON file.
serialize_start_stop_tracks()

In [7]:
"""
Find tracks in MusicBee based on serialized start/stop track data
"""

# JSON file that receives the output of find_tracks_and_serialize_for_cutting().
SERIALIZED_TRACK_FILE_PATH = './data/start_stop_track_info.json'
# Minimum shorter/longer total_time ratio for find_missing_track_by_duration()
# to accept two tracks as the same recording.
TRACK_LENGTH_COMPARE_PRECISION = 0.98

def track_query(track):
    """Build the SQL that looks ``track`` up by artist and (case-insensitive) name.

    The artist is compared against both ``artist`` and ``album_artist``.
    Both values are passed through ``escape_query_value`` before being
    placed in the statement text.
    """
    query_template = """
        SELECT * FROM tracks WHERE (artist='{artist}' OR album_artist='{artist}')
        AND name='{name}' COLLATE NOCASE;
        """
    return query_template.format(
        artist=escape_query_value(track['artist']),
        name=escape_query_value(track['name']),
    )

def artist_all_track_query(track):
    """Build the SQL selecting every row whose artist or album artist is ``track['artist']``."""
    escaped_artist = escape_query_value(track['artist'])
    query_template = "SELECT * FROM tracks WHERE (artist='{artist}' OR album_artist='{artist}')"
    return query_template.format(artist=escaped_artist)

def find_missing_track_by_duration(cursor, track_to_find):
    """Find a track by the same artist whose length nearly equals the target's.

    Args:
        cursor: cursor used to run ``artist_all_track_query``; rows must
            support ``row['total_time']``.
        track_to_find: mapping with ``artist`` and ``total_time`` keys.

    Returns:
        The first artist row whose shorter/longer ``total_time`` ratio is at
        least ``TRACK_LENGTH_COMPARE_PRECISION``, or ``None``. Rows (or a
        target) with a missing or zero ``total_time`` can never match; they
        previously raised ``TypeError`` / ``ZeroDivisionError``.
    """
    artist_tracks = cursor.execute(artist_all_track_query(track_to_find)).fetchall()
    for artist_track in artist_tracks:
        candidate_length = artist_track['total_time']
        target_length = track_to_find['total_time']
        if not candidate_length or not target_length:
            # No usable duration on one side, so the ratio is undefined.
            continue
        shorter, longer = sorted((candidate_length, target_length))
        if shorter / longer >= TRACK_LENGTH_COMPARE_PRECISION:
            return artist_track
    return None

def serialize_target_track(orig_track, found_track):
    """Combine a found track's identity with the original track's cut times.

    Returns a dict with ``artist``, ``name``, ``album`` and ``location``
    from ``found_track`` plus ``start`` and, only when a duration could be
    computed, ``duration`` - both derived from ``orig_track``.
    """
    start, duration = ffmpeg_time_params(orig_track['start_time'], orig_track['stop_time'])
    result = filter_dict_keys(found_track, ['artist', 'name', 'album', 'location'])
    result['start'] = start
    if not duration:
        return result
    result['duration'] = duration
    return result

def _match_start_stop_tracks(cursor, start_stop_tracks):
    """Match each track against ``cursor``; return ``(found, unresolved)``.

    ``found`` holds serialized exact artist/name matches first, followed by
    tracks recovered through ``find_missing_track_by_duration``.
    ``unresolved`` holds the input tracks that neither lookup could place.

    Raises:
        Exception: if an artist/name lookup returns more than one row.
    """
    found_tracks = []
    missing_tracks = []
    for track in start_stop_tracks:
        matching_tracks = cursor.execute(track_query(track)).fetchall()
        if len(matching_tracks) > 1:
            raise Exception("Track [{} - {}] was found {} times.".format(track['artist'], track['name'], len(matching_tracks)))
        if matching_tracks:
            found_tracks.append(serialize_target_track(track, matching_tracks[0]))
        else:
            missing_tracks.append(track)

    unresolved_tracks = []
    for missing_track in missing_tracks:
        found_track = find_missing_track_by_duration(cursor, missing_track)
        if found_track:
            found_tracks.append(serialize_target_track(missing_track, found_track))
        else:
            # The earlier code filtered the list with found_track.__eq__,
            # which kept the wrong entries; track leftovers explicitly.
            unresolved_tracks.append(missing_track)
    return found_tracks, unresolved_tracks


def find_tracks_and_serialize_for_cutting():
    """Locate the preliminary start/stop tracks in MusicBee and serialize them.

    Loads ``PRELIM_SERIALIZED_TRACK_FILE_PATH``, looks each track up by
    artist and name, and falls back to a duration comparison for tracks
    without an exact match.

    Returns:
        The list of serialized tracks that were found. Tracks that could
        not be matched at all are left out.

    Raises:
        Exception: if an artist/name lookup returns more than one row.
    """
    cursor = get_mb_cursor()
    start_stop_tracks = load_json_from_file(PRELIM_SERIALIZED_TRACK_FILE_PATH)
    found_tracks, _unresolved_tracks = _match_start_stop_tracks(cursor, start_stop_tracks)
    return found_tracks

# Executed whenever this cell runs: matches the tracks against MusicBee and
# rewrites SERIALIZED_TRACK_FILE_PATH with the result.
write_json_to_file(SERIALIZED_TRACK_FILE_PATH, find_tracks_and_serialize_for_cutting())

In [49]:
import os
import subprocess
import urllib

# JSON track list that chop_file_generator() loads.
EXAMPLE_TRACK_INFO_PATH = './data/start_stop_track_info.json'
# Root that get_file_dir_paths() joins the folder names below onto; when
# falsy the folder names are used as given.
# NOTE(review): absolute, machine-specific path.
BASE_FOLDER = '/media/gavin/HDD11/Users/Gavin/Music' # '/Users/gavin/dev/miniature-fiesta/'
# Optional sub-folder holding the source files; None means BASE_FOLDER itself.
SOURCE_FOLDER = None # 'example_files'
# Folder that receives the chopped files.
TARGET_FOLDER = 'chopped'
# Folder that receives copies of the untouched originals; falsy disables the copy.
SAVE_ORIG_FOLDER = 'chopped_orig'
# When False, chop_file_generator() prefixes each track path with the source folder.
FULL_TRACK_PATHS = True
# Passed to ffmpeg_param_list() as `overwrite` (selects -y instead of -n).
AUTO_OVERWRITE = False
# ubuntu/win specific
# (old, new) pair applied with str.replace() by parse_track_file_path().
TRACK_PATH_REPLACE = ('file://localhost/D:', '/media/gavin/HDD11')

# TODO: option to overwrite chopped folder files
# TODO: general file path handling cleanup (base folder == cwd; anything different?)
# TODO: more options for source/target dirs? not worrying about base folder?
# TODO: handle other time formats?

def is_relative_path(file_path):
    """Return True unless ``file_path`` starts with ``/``.

    An empty string counts as relative (it used to raise ``IndexError``).
    """
    return not file_path.startswith('/')

def relative_track_dir_path(file_path, source_folder_name):
    """Return the directory of ``file_path`` relative to ``source_folder_name``.

    Everything up to and including the last occurrence of
    ``source_folder_name`` is dropped, then the file name, then any leading
    ``/`` or ``./`` segments.

    Differences from the earlier ``lstrip('./')`` version, which stripped
    the characters ``.`` and ``/`` rather than the ``./`` prefix:
      * directory names that begin with dots (``...And You``) stay intact;
      * a leading ``..`` segment is kept instead of being silently removed;
      * a path without any ``/`` yields ``''`` instead of the file name;
      * an empty ``source_folder_name`` is ignored instead of raising
        ``ValueError``.
    """
    # TODO: is this needed? just take the path as specified in json?
    if source_folder_name:
        file_path = file_path.split(source_folder_name)[-1]
    dir_path, separator, _file_name = file_path.rpartition('/')
    if not separator:
        # No directory component at all.
        return ''
    while dir_path.startswith(('/', './')):
        dir_path = dir_path[1:] if dir_path.startswith('/') else dir_path[2:]
    return '' if dir_path == '.' else dir_path

def get_file_dir_paths():
    """Resolve the source, target and save-original directories from the config.

    With ``BASE_FOLDER`` set, each folder name is joined onto it (the source
    is ``BASE_FOLDER`` itself when ``SOURCE_FOLDER`` is falsy); otherwise
    the folder names are used as given. The save-original path is ``''``
    when ``SAVE_ORIG_FOLDER`` is falsy.

    Returns:
        ``(source_dir_path, target_dir_path, save_orig_dir_path)``.

    Raises:
        Exception: if the source directory does not exist.
    """
    save_orig_dir_path = ''
    if not BASE_FOLDER:
        source_dir_path, target_dir_path = SOURCE_FOLDER, TARGET_FOLDER
        if SAVE_ORIG_FOLDER:
            save_orig_dir_path = SAVE_ORIG_FOLDER
    else:
        if SOURCE_FOLDER:
            source_dir_path = create_file_path(BASE_FOLDER, SOURCE_FOLDER)
        else:
            source_dir_path = BASE_FOLDER
        target_dir_path = create_file_path(BASE_FOLDER, TARGET_FOLDER)
        if SAVE_ORIG_FOLDER:
            save_orig_dir_path = create_file_path(BASE_FOLDER, SAVE_ORIG_FOLDER)
    if os.path.isdir(source_dir_path):
        return source_dir_path, target_dir_path, save_orig_dir_path
    raise Exception('Source folder [{}] does not exist.'.format(source_dir_path))

def parse_track_file_path(track_file_path):
    """Convert a stored track location URL into a local file path.

    The first element of ``TRACK_PATH_REPLACE`` is replaced by the second
    and the result is percent-decoded.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import unquote
    #file://localhost/D:/Users/Gavin/Music/Clip%20MicroSD/Wu-Tang%20Clan/Enter%20the%20Wu-Tang%20-%2036%20Chambers/03%20Clan%20In%20Da%20Front.mp3
    source_prefix, local_prefix = TRACK_PATH_REPLACE[0], TRACK_PATH_REPLACE[1]
    partial_path = track_file_path.replace(source_prefix, local_prefix)
    return unquote(partial_path)
    
def chop_file_generator():
    """Chop every track listed in ``EXAMPLE_TRACK_INFO_PATH`` with ffmpeg.

    This is a generator: nothing runs until it is iterated, and each step
    processes one track. For every track it resolves the source file,
    creates the target directory, optionally copies the untouched original
    into the save-original directory, runs ffmpeg and yields the result of
    ``subprocess.run`` (stdout captured).

    Raises:
        Exception: if the source folder or a track's source file is missing.
    """
    source_dir_path, target_dir_path, save_orig_dir_path = get_file_dir_paths()
    start_stop_tracks = load_json_from_file(EXAMPLE_TRACK_INFO_PATH)
    for track in start_stop_tracks:
        # TODO: test in other dirs
        file_path = parse_track_file_path(track['location'])
        if not FULL_TRACK_PATHS:
            # Stored locations are relative to the source folder in this mode.
            file_path = create_file_path(source_dir_path, file_path)
        if not os.path.isfile(file_path):
            raise Exception('File not found [{}]'.format(file_path))
        # Mirror the track's directory layout (relative to the source folder)
        # underneath the target folder.
        rel_track_dir_path = relative_track_dir_path(file_path, source_dir_path)
        chopped_track_dir_path = create_file_path(target_dir_path, rel_track_dir_path)
        chopped_track_file_path = create_file_path(chopped_track_dir_path, get_file_name(file_path))
        construct_dir_path(chopped_track_dir_path)
        if save_orig_dir_path:
            # Keep a copy of the original under the same relative layout.
            orig_track_dir_path = create_file_path(save_orig_dir_path, rel_track_dir_path)
            construct_dir_path(orig_track_dir_path)
            copy_file_to_dir(file_path, orig_track_dir_path)
        # Build params for ffmpeg and run
        param_list = ffmpeg_param_list(
            file_path, 
            chopped_track_file_path, 
            track['start'], 
            track.get('duration', None),
            overwrite=AUTO_OVERWRITE
        )
        # The return code is not checked here; callers inspect the yielded result.
        process_result = subprocess.run(param_list, stdout=subprocess.PIPE)
        yield process_result

# Executed whenever this cell runs: drives the generator to completion,
# i.e. runs ffmpeg for every track, and keeps each subprocess result.
chop_file_gen = chop_file_generator()
# To process only the first few tracks instead:
# results = [next(chop_file_gen) for i in range(5)]
results = []
for result in chop_file_gen:
    results.append(result)
# results

# print('[TRACK: {} {}]'.format(track['artist'], track['name']))
# print('    file_path:\n    > {}'.format(file_path))
# print('    rel_track_dir_path:\n    > {}'.format(rel_track_dir_path))
# print('    chopped_track_dir_path:\n    > {}'.format(chopped_track_dir_path))
# print('    chopped_track_file_path:\n    > {}'.format(chopped_track_file_path))

In [None]:
##################################

In [22]:
# c = get_mb_cursor()
# column_names(c, 'tracks')
#c.execute("SELECT * FROM tracks WHERE (artist='Dead Or Alive' OR album_artist='Dead Or Alive') COLLATE NOCASE;").fetchall()

# c = get_itunes_cursor()
# column_names(c, 'tracks')

# c = get_mb_cursor()
# # column_names(c, 'tracks')
# c.execute("SELECT * FROM tracks WHERE name LIKE '%The Way You Move%';").fetchall()

# start_time_tracks = c.execute("SELECT * FROM tracks WHERE start_time IS NOT NULL;")\
#         .fetchall()
# [(track['artist'], track['name'], track['start_time'])
#     for track in start_stop_tracks[10:20]]

['album',
 'album_artist',
 'artist',
 'artist1',
 'artist2',
 'bit_rate',
 'bpm',
 'comments',
 'composer',
 'conductor',
 'date_added',
 'date_modified',
 'disc_count',
 'disc_number',
 'encoder',
 'episode_date',
 'episode_description',
 'genre',
 'genre1',
 'genre2',
 'grouping',
 'itunes_compilation',
 'keywords',
 'kind',
 'location',
 'lyricist',
 'mood',
 'name',
 'origin',
 'persistent_id',
 'play_count',
 'play_date_utc',
 'publisher',
 'rating',
 'relative_volume_adjustment',
 'rememberplaybackposition',
 'sample_rate',
 'size',
 'sort_album_artist',
 'sort_artist',
 'tempo',
 'total_time',
 'track_count',
 'track_id',
 'track_number',
 'track_type',
 'year']

In [40]:
### url decode
# import urllib
# urllib.parse.unquote('07%20Opening%20Up%20(-%20Ce%20soir%20on%20danse%20In.m4a') 

# time=332000
# ms = time % 1000
# time -= ms
# time /= 1000
# seconds = time % 60
# time -= seconds
# time /= 60
# minutes = time
# (minutes, seconds, ms)

# import os, fnmatch
# for file in os.listdir(ORIG_FILE_PATH):
#     #print(file)
#     if fnmatch.fnmatch(file, '*Runaway [[]Ex*'):
#         print(file)

# SHELL_ESCAPE_CHARS = ['[', ']', '?', '!']
# def escape_shell_string(string):
#     for c in SHELL_ESCAPE_CHARS:
#         string = string.replace(c, '\{}'.format(c))
#     return string
# escape_shell_string('Runaway [Explicit]')

# import re
# def multi_replacer(*key_values):
#     replace_dict = dict(key_values)
#     replacement_function = lambda match: replace_dict[match.group(0)]
#     pattern = re.compile("|".join([re.escape(k) for k, v in key_values]), re.M)
#     return lambda string: pattern.sub(replacement_function, string)
# def multi_replace(string, *key_values):
#     return multi_replacer(*key_values)(string)
# replacements = ('[', '[[]'), (']', '[]]')
# multi_replace('Runaway [Explicit]', *replacements)

# import os
# os.makedirs('./path/to/artist')
# os.path.isfile('./example_files/example.mp3')
# os.getcwd()
# os.path.isdir('/Users/gavin')

# './path'.lstrip('./')
# '/path'.lstrip('./')

# from shutil import copyfile
# copyfile('./itunes_musicbee.ipynb', './bkp/itunes_musicbee.bak')

FileNotFoundError: [Errno 2] No such file or directory: './bkp/itunes_musicbee.bak'

In [39]:
"""
mp3splt working example
Fails on m4a
"""

# import subprocess

# MP3SPLT_DEFAULT_START = '0.0.0'
# MP3SPLT_DEFAULT_STOP = 'EOF'

# def mp3splt_output_file_name(file_name):
#     file_name_parts = file_name.rsplit('.', 1)
#     return ''.join([file_name_parts[0], ' chopped'])

# tracks = get_example_tracks()
# process_results = []
# for track in tracks:
#     start_arg, stop_arg = \
#         dot_formatted_ms(track['start_time']), dot_formatted_ms(track['stop_time'])
#     start_arg = start_arg or MP3SPLT_DEFAULT_START
#     stop_arg = stop_arg or MP3SPLT_DEFAULT_STOP
#     file_name = find_matching_file_name(track['name'])
#     full_file_path = '{}/{}'.format(ORIG_FILE_PATH, file_name)
#     mp3splt_param_list = [
#         'mp3splt',
#         full_file_path,
#         start_arg,
#         stop_arg,
#         '-o',
#         CHOPPED_REL_FILE_PATH + mp3splt_output_file_name(file_name)
#     ]
#     process_results.append(subprocess.run(mp3splt_param_list, stdout=subprocess.PIPE))

# process_results

'\nmp3splt working example\nFails on m4a\n'