In [12]:
import numpy as np
import matplotlib.pyplot as plt
import random as rand
import dataset_parameters as p
import morse_dictionary as morse
from importlib import reload
import csv
reload(p)
reload(morse)

# Build a random speed function as a sum of sinusoids; the returned callable
# expresses the speed as a multiplier around 1 (percentage variation / 100)
def generate_random_speed_function():
    """Create a randomly parameterised speed-multiplier function.

    Two groups of sinusoids are drawn: a sentence-level group configured by
    ``p.F_SIN_*`` and a local group configured by ``p.F_SIN_L_*``. For each
    group the periods are sampled without replacement from the configured
    list, the amplitudes come from a normal distribution and the phases are
    drawn uniformly between -pi and pi.

    Returns:
        A callable ``speed_function(t)`` returning
        ``1 + (sum of all sinusoids at t + noise) / 100``. The noise is a new
        normal draw (``p.NOISE_MEAN``, ``p.NOISE_DEV``) on every call, so two
        calls with the same ``t`` generally return different values.
    """

    def draw_components(period_pool, count, amp_mean, amp_dev):
        # Draw order matters for seeded runs: periods, amplitudes, phases
        wave_periods = rand.sample(period_pool, count)
        wave_amplitudes = np.random.normal(amp_mean, amp_dev, size=count)
        wave_phases = np.random.uniform(-np.pi, np.pi, size=count)
        return wave_amplitudes, wave_periods, wave_phases

    # Sentence-level speed variation
    sentence_waves = draw_components(
        p.F_SIN_PERIODS, p.F_SIN_N, p.F_SIN_AMP_MEAN, p.F_SIN_AMP_DEV
    )
    # Local speed variation
    local_waves = draw_components(
        p.F_SIN_L_PERIODS, p.F_SIN_L_N, p.F_SIN_L_AMP_MEAN, p.F_SIN_L_AMP_DEV
    )

    def wave_total(x, waves):
        # Sum of amplitude * sin(2*pi/period * x + phase) over one group
        wave_amplitudes, wave_periods, wave_phases = waves
        return np.sum([
            amplitude * np.sin((2 * np.pi / period) * x + phase)
            for amplitude, period, phase
            in zip(wave_amplitudes, wave_periods, wave_phases)
        ])

    def speed_function(t):
        variation = wave_total(t, sentence_waves) + wave_total(t, local_waves)
        jitter = np.random.normal(p.NOISE_MEAN, p.NOISE_DEV)
        return 1 + (variation + jitter) / 100

    return speed_function


def generate_random_morse_sample():
    """Generate one random Morse message in two parallel encodings.

    Characters are drawn at random until the lengths of their dictionary
    codes add up to exactly ``p.SAMPLE_LENGTH`` (separators are not counted).

    Separators used in the returned Morse string:
        ``*``  between the elements of one character,
        ``|``  before a character that continues the current word,
        ``/``  after the last character of a word (the next character then
               follows directly, without a ``|``).

    Returns:
        tuple: ``(morse_symbols, morse_text)`` - the separator-annotated
        Morse string and the plain text it encodes, with a space wherever
        the Morse string has a ``/``.

    Raises:
        ValueError: if ``p.SAMPLE_LENGTH`` is negative (the generation loop
            could never reach a remaining length of zero).
    """
    remaining = p.SAMPLE_LENGTH
    if remaining < 0:
        raise ValueError(f'SAMPLE_LENGTH must be >= 0, got {remaining}')

    # Chooses between the three dictionaries: digits when the draw falls
    # below MORSE_DIGITS_P, symbols when it falls in the top MORSE_SYMBOLS_P
    # of the unit interval, letters otherwise
    def choose_dict():
        roll = np.random.uniform(0, 1)
        if roll < p.MORSE_DIGITS_P:
            return morse.MORSE_CODE_DICT_DIGITS
        if roll >= 1 - p.MORSE_SYMBOLS_P:
            return morse.MORSE_CODE_DICT_SYMBOLS
        return morse.MORSE_CODE_DICT_LETTERS

    # Format to include the symbol for intra-character space,
    # which in standard morse text is not displayed
    def format_morse(symbol):
        return '*'.join(symbol)

    # Pieces are collected in lists and joined once at the end
    symbol_parts = []
    text_parts = []
    # True for the very first character and right after a word separator
    at_word_start = True

    # Generate the message (sample)
    while remaining > 0:
        # Rejection sampling: redraw until the code fits in what is left.
        # This only terminates if some reachable dictionary holds a code
        # that is not longer than the remaining length.
        while True:
            dictionary = choose_dict()
            character = rand.choice(sorted(dictionary))
            morse_symbol = dictionary[character]
            left_over = remaining - len(morse_symbol)
            if left_over >= 0:
                break

        # Decide whether to start a new word.
        # NOTE(review): the word break happens when the draw EXCEEDS
        # MORSE_NEW_WORD_P, i.e. with probability 1 - MORSE_NEW_WORD_P for a
        # value in [0, 1]; choose_dict uses `<` for its probabilities.
        # Confirm which meaning the parameter is supposed to have.
        start_new_word = np.random.uniform(0, 1) > p.MORSE_NEW_WORD_P

        text_parts.append(character)
        formatted_symbol = format_morse(morse_symbol)

        if at_word_start:
            # First character of a word: no separator, and no word break
            # is considered right after it
            symbol_parts.append(formatted_symbol)
            at_word_start = False
        else:
            symbol_parts.append(f'|{formatted_symbol}')
            # Never end the whole sample with a word separator
            if left_over > 0 and start_new_word:
                symbol_parts.append('/')
                text_parts.append(' ')
                at_word_start = True

        remaining = left_over

    return ''.join(symbol_parts), ''.join(text_parts)

# Generates the sequences from a sample message
def generate_sequences_from_sample():
    """Turn one random Morse sample into sliding-window training rows.

    A sample is drawn with ``generate_random_morse_sample()``. Every symbol
    of the sample gets a duration: a base time unit derived from a random
    words-per-minute value in [``p.SEQ_WPM_MIN``, ``p.SEQ_WPM_MAX``], scaled
    by the symbol's relative length and by a random speed function. The
    timing is redrawn until ``all_positive`` accepts the durations.

    A window of at most ``p.SEQ_MAX_LENGTH`` symbols is then slid over the
    sample. Unless ``p.ALWAYS_MAX_LENGTH`` is set, each window gets a random
    length between 3 and ``p.SEQ_MAX_LENGTH`` and is left-padded with zeros.

    Returns:
        list: one ``[input, output]`` pair per window. ``input`` is a string
        holding the comma-separated durations followed by the
        comma-separated signal levels (``2 * p.SEQ_MAX_LENGTH`` fields);
        ``output`` is the integer code of the last symbol in the window.

    Raises:
        ValueError: if the sample contains a symbol with no known timing.
    """
    morse_sample, _ = generate_random_morse_sample()
    result = []

    # symbol -> (relative duration, signal type, integer code)
    symbol_table = {
        '_': (0, 0, 0),
        '.': (1, 1, 1),
        '*': (1, 0, 2),
        '-': (3, 1, 3),
        '|': (3, 0, 4),
        '/': (7, 0, 5),
    }

    # Return the relative duration, the signal type, and the integer code
    def get_symbol_info(symbol):
        try:
            return symbol_table[symbol]
        except KeyError:
            raise ValueError(f'unknown morse symbol: {symbol!r}') from None

    # Check that the percentage variation did not generate negative or
    # null durations. Leading zeros are ignored; an empty list, or one made
    # only of zeros, has nothing left to check and is accepted.
    def all_positive(values):
        first = 0
        while first < len(values) and values[first] == 0:
            first += 1
        return all(value > 0 for value in values[first:])

    while True:
        wpm = np.random.uniform(p.SEQ_WPM_MIN, p.SEQ_WPM_MAX)
        # 60000 ms per minute divided over 50 time units per word
        time_unit = int(60000 / (50 * wpm))
        if p.DEBUG:
            print(wpm)
        f = generate_random_speed_function()

        durations = []
        signals = []
        codes = []
        for t, symbol in enumerate(morse_sample):
            multiplier, signal_type, code = get_symbol_info(symbol)
            durations.append(int(time_unit * f(t) * multiplier))
            signals.append(signal_type)
            codes.append(code)

        if all_positive(durations):
            break

    if p.DEBUG:
        plot_span = p.SEQ_MAX_LENGTH * 2

        # continuous function
        x_val = np.linspace(0, plot_span, 1000)
        y_val = [f(x) for x in x_val]

        # discrete function
        x1_val = [x for x in range(plot_span)]
        y1_val = [f(x) for x in range(plot_span)]

        # Plot the function
        plt.xlim(0, plot_span)
        plt.plot(x_val, y_val, color='C2', linewidth=2)
        plt.plot(x1_val, y1_val, color='C1', marker='o', linestyle='none', markersize=4)
        plt.show()

    # Sliding window to generate sequences from sample message.
    # The "+ 1" keeps the last full window, so the final symbol of the
    # sample can also appear as a label.
    max_length = p.SEQ_MAX_LENGTH
    for s in range(len(morse_sample) - max_length + 1):
        window = max_length
        if not p.ALWAYS_MAX_LENGTH:
            window = rand.randint(3, window)

        # Padding (on the left) for not fixed sequence length
        padding = [0] * (max_length - window)
        row_string_durations = ','.join(map(str, padding + durations[s:s + window]))
        row_string_signals = ','.join(map(str, padding + signals[s:s + window]))

        result.append([f'{row_string_durations},{row_string_signals}', codes[s + window - 1]])

    return result

# Function to call to generate the dataset
def generate_dataset():
    """Write ``p.SEQ_NUMBER`` sequences to the CSV file ``p.FILE_NAME``.

    The file gets a header row ``input,output`` followed by one row per
    sequence. Sequences come from ``generate_sequences_from_sample()``; a new
    batch is requested whenever the current one is used up. Progress
    messages are printed when ``p.DEBUG`` is set.

    Raises:
        RuntimeError: if a freshly generated batch contains no sequences.
    """
    # newline='' lets the csv module control the line endings itself; the
    # with-block closes the file even if generation fails part-way
    with open(p.FILE_NAME, "w", newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['input', 'output'])

        if p.DEBUG:
            print('generating sequences...')
        sequence_array = generate_sequences_from_sample()

        for i in range(p.SEQ_NUMBER):
            if len(sequence_array) == 0:
                if p.DEBUG:
                    print('generating sequences...')
                sequence_array = generate_sequences_from_sample()
                if len(sequence_array) == 0:
                    raise RuntimeError(
                        'generate_sequences_from_sample() returned no sequences; '
                        'check SAMPLE_LENGTH against SEQ_MAX_LENGTH'
                    )
            if p.DEBUG:
                print(f'{p.SEQ_NUMBER - i} sequences to do...')

            # Consume the first pending sequence of the current batch
            row_input, row_output = sequence_array.pop(0)
            writer.writerow([row_input, row_output])

    if p.DEBUG:
        print('file closed')
    return
 
        
    

In [None]:
# Run the generator: writes the CSV file named by p.FILE_NAME
generate_dataset()