## Imports

In [1]:
import glob
import logging as smlog
import os
import traceback

#from smdataset.abstime import calc_note_beats_and_abs_times
#from smdataset.parse import parse_sm_txt

_ATTR_REQUIRED = ['offset', 'bpms', 'notes']

import argparse
from collections import OrderedDict
import json

json.encoder.FLOAT_REPR = lambda f: ('%.6f' % f)


## Util

In [2]:
# FUNCTION THAT TAKES A DIRECTORY AND LISTS ALL OF ITS SUBDIRECTORIES
# SIMILAR TO WHAT "OS.WALK" DOES.
def get_subdirs(root, choose=False):
    subdir_names = sorted(filter(lambda x: os.path.isdir(os.path.join(root, x)), os.listdir(root)))
    if choose:
        for i, subdir_name in enumerate(subdir_names):
            print('{}: {}'.format(i, subdir_name))
        subdir_idxs = [int(x) for x in input('Which subdir(s)? ').split(',')]
        subdir_names = [subdir_names[i] for i in subdir_idxs]
    return subdir_names

get_subdirs("E:/Escritorio/Temporal")


# FUNCTION THAT REMOVES SPACES, LEAVES NUMBERS AND LETTERS
# AND REPLACES ANY OTHER SYMBOL WITH A "_"
def ez_name(x):
    x = ''.join(x.strip().split())
    x_clean = []
    for char in x:
        if char.isalnum():
            x_clean.append(char)
        else:
            x_clean.append('_')
    return ''.join(x_clean)


ez_name("ITG is the best! ")

'ITGisthebest_'

## Parse TXT

In [3]:
import logging
import re
import traceback

parlog = logging

VALID_PULSES = set([4, 8, 12, 16, 24, 32, 48, 64, 96, 192])


int_parser = lambda x: int(x.strip()) if x.strip() else None
bool_parser = lambda x: True if x.strip() == 'YES' else False
str_parser = lambda x: x.strip() if x.strip() else None
float_parser = lambda x: float(x.strip()) if x.strip() else None


def kv_parser(k_parser, v_parser):
    def parser(x):
        if not x:
            return (None, None)
        k, v = x.split('=', 1)
        return k_parser(k), v_parser(v)
    return parser
def list_parser(x_parser):
    def parser(l):
        l_strip = l.strip()
        if len(l_strip) == 0:
            return []
        else:
            return [x_parser(x) for x in l_strip.split(',')]
    return parser

def bpms_parser(x):
    bpms = list_parser(kv_parser(float_parser, float_parser))(x)

    if len(bpms) == 0:
        raise ValueError('No BPMs found in list')
    if bpms[0][0] != 0.0:
        raise ValueError('First beat in BPM list is {}'.format(bpms[0][0]))

    # make sure changes are nonnegative, take last for equivalent
    beat_last = -1.0
    bpms_cleaned = []
    for beat, bpm in bpms:
        if beat == None or bpm == None:
            raise ValueError('Empty BPM found')
        if bpm <= 0.0:
            raise ValueError('Non positive BPM found {}'.format(bpm))
        if beat == beat_last:
            bpms_cleaned[-1] = (beat, bpm)
            continue
        bpms_cleaned.append((beat, bpm))
        if beat <= beat_last:
            raise ValueError('Descending list of beats in BPM list')
        beat_last = beat
    if len(bpms) != len(bpms_cleaned):
        parlog.warning('One or more (beat, BPM) pairs begin on the same beat, using last listed')

    return bpms_cleaned
def stops_parser(x):
    stops = list_parser(kv_parser(float_parser, float_parser))(x)

    beat_last = -1.0
    for beat, stop_len in stops:
        if beat == None or stop_len == None:
            raise ValueError('Bad stop formatting')
        if beat < 0.0:
            raise ValueError('Bad beat in stop')
        if stop_len == 0.0:
            continue
        if beat <= beat_last:
            raise ValueError('Nonascending list of beats in stops')
        beat_last = beat
    return stops

def notes_parser(x):
    pattern = r'([^:]*):' * 5 + r'([^;:]*)'
    notes_split = re.findall(pattern, x)
    if len(notes_split) != 1:
        raise ValueError('Bad formatting of notes section')
    notes_split = notes_split[0]
    if (len(notes_split) != 6):
        raise ValueError('Bad formatting within notes section')

    # parse/clean measures
    measures = [measure.splitlines() for measure in notes_split[5].split(',')]
    measures_clean = []
    for measure in measures:
        measure_clean = list(filter(lambda pulse: not pulse.strip().startswith('//') and len(pulse.strip()) > 0, measure))
        measures_clean.append(measure_clean)
    if len(measures_clean) > 0 and len(measures_clean[-1]) == 0:
        measures_clean = measures_clean[:-1]

    # check measure lengths
    for measure in measures_clean:
        if len(measure) == 0:
            raise ValueError('Found measure with 0 notes')
        if not len(measure) in VALID_PULSES:
            parlog.warning('Nonstandard subdivision {} detected, allowing'.format(len(measure)))

    chart_type = str_parser(notes_split[0])
    if chart_type not in ['dance-single', 'dance-double', 'dance-couple', 'lights-cabinet']:
        raise ValueError('Nonstandard chart type {} detected'.format(chart_type))

    return (str_parser(notes_split[0]),
        str_parser(notes_split[1]),
        str_parser(notes_split[2]),
        int_parser(notes_split[3]),
        list_parser(float_parser)(notes_split[4]),
        measures_clean
    )

def unsupported_parser(attr_name):
    def parser(x):
        raise ValueError('Unsupported attribute: {} with value {}'.format(attr_name, x))
        return None
    return parser

ATTR_NAME_TO_PARSER = {
    'title': str_parser,
    'subtitle': str_parser,
    'artist': str_parser,
    'titletranslit': str_parser,
    'subtitletranslit': str_parser,
    'artisttranslit': str_parser,
    'genre': str_parser,
    'credit': str_parser,
    'banner': str_parser,
    'background': str_parser,
    'lyricspath': str_parser,
    'cdtitle': str_parser,
    'music': str_parser,
    'offset': float_parser,
    'bpms': bpms_parser,
    'stops': stops_parser,
    'samplestart': float_parser,
    'samplelength': float_parser,
    'displaybpm': str_parser,
    'selectable': bool_parser,
    'bgchanges': str_parser,
    'bgchanges2': str_parser,
    'fgchanges': str_parser,
    'keysounds': str_parser,
    'musiclength': float_parser,
    'musicbytes': int_parser,
    'attacks': str_parser,
    'timesignatures': list_parser(kv_parser(float_parser, kv_parser(int_parser, int_parser))),
    'warps': unsupported_parser('warps'),
    'notes': notes_parser
}

# LIST OF ATTRIBUTES
ATTR_MULTI = ['notes']

def parse_sm_txt(sm_txt):

    # CREATES A DICTIONARY WITH AN EMPTY LIST AS VALUE FOR EVERY
    # ATTRIBUTE IN "ATTR_MULTI"
    attrs = {attr_name: [] for attr_name in ATTR_MULTI}

    # FINDS ALL STRINGS WITH THE SHAPE "NAME:VALUE"
    # THE RESULTS ARE SPLIT INTO THE NAME OF THE ATTRIBUTE AND THE ATTRIBUTE VALUE
    for attr_name, attr_val in re.findall(r'#([^:]*):([^;]*);', sm_txt):

        # THE ATTRIBUTE NAME IS TURNED TO LOWERCASE
        attr_name = attr_name.lower()

        # CHECK IF THE ATTRIBUTE IS IN THE DICTIONARY ASSIGNING EACH
        # ATTRIBUTE TO A PARSER. IF AN ATTRIBUTE IS NOT SUPPORTED A MESSAGE
        # WILL APPEAR.
        if attr_name not in ATTR_NAME_TO_PARSER:
            parlog.warning('Found unexpected attribute {}:{}, ignoring'.format(attr_name, attr_val))
            continue
        
        # PROCESS THE ATTR WITH ITS CORRESPONDING PARSER
        attr_val_parsed = ATTR_NAME_TO_PARSER[attr_name](attr_val)

        # IF THE ATTRIBUTE IS IN THE LIST OF PREVIOUS ATTRIBUTES
        if attr_name in attrs:

            # IF THE NAME IS NOT IN THE DICTIONARY OF INITIAL ATTRIBUTES
            # (CHECK FOR DUPLICATES FOR EVERY ATTRIBUTE EXCEPT NOTES
            # AS THERE ARE MANY IN A SINGLE FILE)
            if attr_name not in ATTR_MULTI:

                # IF THE VALUE IN THE DICTIONARY OF ATTRIBUTES FOR THE CURRENT ATTRIBUTE
                # IS EQUAL TO THE PARSED VALUE, CONTINUE TO THE NEXT ATTRIBUTE 
                # (PREVENTS UNNECESARY OVERWRITES)
                if attr_val_parsed == attrs[attr_name]:
                    continue

                # IF TWO DIFFERENT VALUES ARE DETECTED FOR A SINGLE
                # ATTRIBUTE AN ERROR IS RAISED
                else:
                    raise ValueError('Attribute {} defined multiple times'.format(attr_name))

            # ADD NEW PARSED VALUE TO THE CURRENT VALUE STORED
            attrs[attr_name].append(attr_val_parsed)

        else:
            # IF THERE WERE NO VALUES FOR THE CURRENT ATTRIBUTE 
            # ADD A VALUE FOR THE KEY "ATTRIBUTE NAME"
            attrs[attr_name] = attr_val_parsed


    # CLEAN OR DELETE EMPTY ATTRIBUTES
    for attr_name, attr_val in list(attrs.items()):
        if attr_val == None or attr_val == []:
            del attrs[attr_name]

    return attrs


## Postprocessing of Notes

In [7]:
_EPSILON = 1e-6

def bpm_to_spb(bpm):
    return 60.0 / bpm

def calc_segment_lengths(bpms):

    # CHECK IF THE BPM IS NOT EMPTY
    assert len(bpms) > 0

    # EMPTY LIST FOR ALL LENGTHS
    segment_lengths = []

    # COUNTER FROM 0 TO THE TOTAL NUMBER OF BPMS - 2
    # (SECTIONS WITHOUT A CHANGING BPM)
    for i in range(len(bpms) - 1):

        # SECONDS PER BEAT
        spb = bpm_to_spb(bpms[i][1])

        # USES THE "BEAT" PART OF THE BPM ATTRIBUTE TO CALCULATE THE 
        # THE NUMBER OF SECONDS IN A SEGMENT WITHOUT CHANGES IN BPM
        segment_lengths.append(spb * (bpms[i + 1][0] - bpms[i][0]))


    return segment_lengths

def calc_abs_for_beat(offset, bpms, stops, segment_lengths, beat):
    # OFFSET: VALUE
    # BPMS: LIST OF TUPLES
    # STOPS: LIST OF TUPLES
    # SEGMENT_LENGTHS: LIST OF LENGTHS IN SECONDS FOR EACH CHANGE IN BPM
    # BEAT: CURRENT BEAT

    # BPM INITIAL INDEX
    bpm_idx = 0

    # FOR EVERY BEAT TIMESTAMP IN "BPMS" CHECK IF THE CURRENT BEAT
    # (PLUS A SMALL DELTA) IS BIGGER THAN THE TIMESTAMP. BASICALLY 
    # CHECK TO WHICH BPM SEGMENT EACH BEAT PERTAINS
    while bpm_idx < len(bpms) and beat + _EPSILON > bpms[bpm_idx][0]:
        bpm_idx += 1
    
    bpm_idx -= 1

    # CUMULATIVE STOP LENGTH
    stop_len_cumulative = 0.0
    #print("Stops:", stops)

    # FOR EACH STOP IN THE SONG
    for stop_beat, stop_len in stops:
        #print(f"Stop Beat: {stop_beat} | Stop Len: {stop_len} | Cumulative: {stop_len_cumulative}")

        # DISTANCE IN BEATS BETWEEN CURRENT BEAT AND BEAT IN WHICH A STOP OCCURS
        diff = beat - stop_beat

        # IF THE DIFFERENCE IS TOO LOW, WE ARE AT THE BEAT STOP
        # We are at this stop which should not count to its timing
        if abs(diff) < _EPSILON:
            break

        # We are before this stop
        elif diff < 0:
            break

        # We are AFTER this stop
        # WE ADD TO THE CUMULATIVE SUM OF STOPS
        else:
            stop_len_cumulative += stop_len
        

    # WE TAKE ALL THE LENGTHS BEFORE "BPM_IDX" AND WE SUM THEIR LENGTH IN SECONDS
    # TOTAL LENGTH OF ALL SEGMENTS
    full_segment_total = sum(segment_lengths[:bpm_idx])

    # GETS THE BPM TIMESTAMP 
    partial_segment_spb = bpm_to_spb(bpms[bpm_idx][1])
    partial_segment = partial_segment_spb * (beat - bpms[bpm_idx][0])

    #print("Cumulative BPM:", full_segment_total)
    #print("Cumulative Stop:", stop_len_cumulative)
    #print("Offset:", offset)
    #print("Partial Segment:", partial_segment)
    #print("Abs Time:", full_segment_total + partial_segment - offset + stop_len_cumulative)
    #print("========================")

    return full_segment_total + partial_segment - offset + stop_len_cumulative

def calc_note_beats_and_abs_times(offset, bpms, stops, note_data):

    # CALCULATE THE LENGTH OF EACH SEGMENT
    segment_lengths = calc_segment_lengths(bpms)

    # ====================

    # copy bpms
    bpms = bpms[:]
    inc = None
    inc_prev = None
    time = offset

    # beat loop
    # INITIAL LISTS
    note_beats_abs_times = []
    beat_times = []

    # FOR EACH MEASURE IN THE NOTE DATA
    for measure_num, measure in enumerate(note_data):

        # LINES IN A MEASURE
        # (PORTIONS PER MEASURE)
        ppm = len(measure)

        # FOR EVERY LINE IN A MEASURE
        for i, code in enumerate(measure):
            
            # BEAT = 4*(MEASURE NUMBER) + 4*(LINE NUMBER / LINES IN A MEASURE)
            # NOTE: THERE ARE 4 BEATS PER MEASURE
            beat = measure_num * 4.0 + 4.0 * (float(i) / ppm)

            # TODO: This could be much more efficient but is not the bottleneck for the moment.
            
            # BEAT IN ABSOLUTE TIME
            beat_abs = calc_abs_for_beat(offset, bpms, stops, segment_lengths, beat)


            note_beats_abs_times.append(((measure_num, ppm, i), beat, beat_abs, code))
            beat_times.append(beat_abs)

    # handle negative stops
    beat_time_prev = float('-inf')
    del_idxs = []
    for i, beat_time in enumerate(beat_times):
        if beat_time_prev > beat_time:
            del_idxs.append(i)
        else:
            beat_time_prev = beat_time
    for del_idx in sorted(del_idxs, reverse=True):
        del note_beats_abs_times[del_idx]
        del beat_times[del_idx]

    #TODO: remove when stable
    assert sorted(beat_times) == beat_times

    return note_beats_abs_times


## Command Line Argument Parsing

**DONT EXECUTE: PARSING FOR THE COMMAND LINE ONLY**

In [None]:
import argparse
from collections import OrderedDict
import json

json.encoder.FLOAT_REPR = lambda f: ('%.6f' % f)

parser = argparse.ArgumentParser()
parser.add_argument('packs_dir', type=str, help='Directory of packs (organized like Stepmania songs folder)')
parser.add_argument('json_dir', type=str, help='Output JSON directory')
parser.add_argument('--itg', dest='itg', action='store_true', help='If set, subtract 9ms from offset')
parser.add_argument('--choose', dest='choose', action='store_true', help='If set, choose from list of packs')

parser.set_defaults(
    itg=False,
    choose=False)

args = parser.parse_args()

pack_names = get_subdirs(args.packs_dir, args.choose)
pack_dirs = [os.path.join(args.packs_dir, pack_name) for pack_name in pack_names]
pack_sm_globs = [os.path.join(pack_dir, '*', '*.sm') for pack_dir in pack_dirs]

## Overwriting Inputs

In [5]:
pack_names = ["fraxtil", "itg", "kda"]
pack_dirs = ["data/raw/fraxtil", "data/raw/itg", "data/raw/kda"]
json_dir = "./data/json_raw"

# GENERIC PATHS FOR ANY SONG FILE WITH EXTENSION ".SM"
pack_sm_globs = [os.path.join(pack_dir, "*", "*", "*.sm") for pack_dir in pack_dirs]
pack_sm_globs

['data/raw/fraxtil\\*\\*\\*.sm',
 'data/raw/itg\\*\\*\\*.sm',
 'data/raw/kda\\*\\*\\*.sm']

## "Main Loop"

In [8]:
# IF THE JSON DIRECTORY DOESNT EXIST, CREATE IT
if not os.path.isdir(json_dir):
    os.mkdir(json_dir)
    print("JSON RAW CREATED")

# SET OF "EZNAMETS" FOR A PACK
pack_eznames = set()

# GLOBS AND NAMES ARE "ZIPPED" INTO A TUPLE. 
# ITERATE OVER EVERY PACK NAME, GLOB PAIR.
for pack_name, pack_sm_glob in zip(pack_names, pack_sm_globs):

    print(f"PACK NAME: {pack_name}, PACK GLOB: {pack_sm_glob}")

    # EXTRACT ALL FILES INSIDE THE PACK THAT END IN .SM
    pack_sm_fps = sorted(glob.glob(pack_sm_glob))
    
    # CLEAN THE NAME OF THE PACK
    pack_ezname = ez_name(pack_name)

    # IF PACK WAS ALREADY CHECKED, RAISE ERROR
    if pack_ezname in pack_eznames:
        raise ValueError('Pack name conflict: {}'.format(pack_ezname))

    # PACK IS ADDED TO LIST OF PACKS CHECKED
    pack_eznames.add(pack_ezname)

    # IF THERE ARE SONGS IN A PACK, CREATE A PATH FOR A DIRECTORY INSIDE "JSON RAW"
    if len(pack_sm_fps) > 0:
        pack_outdir = os.path.join(json_dir, pack_ezname)

    # IF THE DIRECTORY CORRESPONDING TO THE PATH CREATED DOESNT EXIST, CREATE IT
    if not os.path.isdir(pack_outdir):
        os.mkdir(pack_outdir)

    # SET FOR THE NAME OF EACH SONG
    sm_eznames = set()

    # ITERATE OVER EVERY ".SM" FILE PATH
    for sm_fp in pack_sm_fps:

        # EXTRACTS THE NAME OF THE SM FILE (WITHOUT EXTENSION)
        sm_name = os.path.split(os.path.split(sm_fp)[0])[1]

        #print("Song:", sm_name)

        # THE NAME OF THE SM FILE IS CLEANED
        sm_ezname = ez_name(sm_name)

        # IF THE SONG WAS PREVIOUSLY PROCESSED
        if sm_ezname in sm_eznames:
            raise ValueError('Song name conflict: {}'.format(sm_ezname))

        # IF NO ERROR WAS RAISED, THE SONG NAME IS ADDED
        sm_eznames.add(sm_ezname)

        # THE TEXT OF THE ".SM" FILE IS EXTRACTED
        with open(sm_fp, 'r') as sm_f:
            sm_txt = sm_f.read()

        # TRY TO PARSE THE FILE
        try:
            sm_attrs = parse_sm_txt(sm_txt)
        except ValueError as e:
            smlog.error('{} in\n{}'.format(e, sm_fp))
            continue
        except Exception as e:
            smlog.critical('Unhandled parse exception {}'.format(traceback.format_exc()))
            raise e

        print(sm_attrs["bpms"], sm_attrs["stops"])

        # CHECKS IF ALL REQUIRED ATTRIBUTES ARE PRESENT IN THE DICTIONARY
        try:
            for attr_name in _ATTR_REQUIRED:
                if attr_name not in sm_attrs:
                    raise ValueError('Missing required attribute {}'.format(attr_name))
        except ValueError as e:
            smlog.error('{}'.format(e))
            continue

        # HANDLE SONGS WITHOUT MUSIC
        root = os.path.abspath(os.path.join(sm_fp, '..'))
        music_fp = os.path.join(root, sm_attrs.get('music', ''))
        if 'music' not in sm_attrs or not os.path.exists(music_fp):
            music_names = []
            sm_prefix = os.path.splitext(sm_name)[0]

            # check directory files for reasonable substitutes
            for filename in os.listdir(root):
                prefix, ext = os.path.splitext(filename)
                if ext.lower()[1:] in ['mp3', 'ogg']:
                    music_names.append(filename)

            try:
                # handle errors
                if len(music_names) == 0:
                    raise ValueError('No music files found')
                elif len(music_names) == 1:
                    sm_attrs['music'] = music_names[0]
                else:
                    raise ValueError('Multiple music files {} found'.format(music_names))
            except ValueError as e:
                smlog.error('{}'.format(e))
                continue

            music_fp = os.path.join(root, sm_attrs['music'])

        # EXTRACTS THE VALUE OF THE STOPS, OFFSET AND STOPS
        bpms = sm_attrs['bpms']
        offset = sm_attrs['offset']
        itg = False

        if itg:
            # Many charters add 9ms of delay to their stepfiles to account for ITG r21/r23 global delay
            # see http://r21freak.com/phpbb3/viewtopic.php?f=38&t=12750
            offset -= 0.009

        stops = sm_attrs.get('stops', [])


        # DICTIONARY THAT STORES ALL THE INFO THAT WILL BE PUT INTO THE OUTPUT JSOON
        out_json_fp = os.path.join(pack_outdir, '{}_{}.json'.format(pack_ezname, sm_ezname))
        out_json = OrderedDict([
            ('sm_fp', os.path.abspath(sm_fp)),
            ('music_fp', os.path.abspath(music_fp)),
            ('pack', pack_name),
            ('title', sm_attrs.get('title')),
            ('artist', sm_attrs.get('artist')),
            ('offset', offset),
            ('bpms', bpms),
            ('stops', stops),
            ('charts', [])
        ])

        # FOR EVERY ELEMENT INSIDE THE "NOTES ATTRIBUTE"
        # (THIS INCLUDES THE DIFFICULTY, CHART TYPE, GROOVE METER AND CHART NOTES)
        for idx, sm_notes in enumerate(sm_attrs['notes']):

            # CHART NOTES ARE POST PROCESSED
            note_beats_and_abs_times = calc_note_beats_and_abs_times(offset, bpms, stops, sm_notes[5])


            # note_beats_abs_times.append(((measure_num, ppm, i), beat, beat_abs, code))
            notes = {
                'type': sm_notes[0],
                'desc_or_author': sm_notes[1],
                'difficulty_coarse': sm_notes[2],
                'difficulty_fine': sm_notes[3],
                'notes': note_beats_and_abs_times,
            }
            out_json['charts'].append(notes)

        with open(out_json_fp, 'w') as out_f:
            try:
                out_f.write(json.dumps(out_json))
            except UnicodeDecodeError:
                smlog.error('Unicode error in {}'.format(sm_fp))
                continue

        print('Parsed {} - {}: {} charts'.format(pack_name, sm_name, len(out_json['charts'])))
    
    break

PACK NAME: fraxtil, PACK GLOB: data/raw/fraxtil\*\*\*.sm
Song: Bad Ketchup
[(0.0, 180.0)] [(0.0, 180.0)]
Parsed fraxtil - Bad Ketchup: 9 charts
Song: Bitch Clap


KeyError: 'stops'