In [None]:
# Colab-only setup: mount Google Drive at /content/drive so that files stored
# there (e.g. the helper notebook imported in the next cell) can be reached.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip3 install import_ipynb
import import_ipynb
import drive.MyDrive.harp.Common_functions as Common_functions # Make sure to adjust file paths

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting import_ipynb
  Downloading import_ipynb-0.1.4-py3-none-any.whl (4.1 kB)
Collecting jedi>=0.10
  Downloading jedi-0.18.2-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m22.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi, import_ipynb
Successfully installed import_ipynb-0.1.4 jedi-0.18.2
importing Jupyter notebook from /content/drive/MyDrive/tidierpp/Common_functions.ipynb



In [None]:
from keras.layers import Input, Embedding, Bidirectional, LSTM, GRU, Concatenate, TimeDistributed, Dense
from keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np
from typing import List
from collections import deque

In [None]:
def tokens_to_code(tokens, unk_dict = None, indent_info = None, throw_when_unk_failure=False):
    """Render a sequence of tokens as a source-code string.

    Args:
        tokens: Iterable of string tokens to render.
        unk_dict: Optional mapping that is reversed with
            ``Common_functions.reverse_dict`` (presumably original name ->
            placeholder; verify against the helper) and used to replace
            placeholder tokens by their original text. The tokens ``'comma'``
            and ``'dot'`` are additionally mapped to ``','`` and ``'.'``.
            When ``None`` the tokens are rendered verbatim.
        indent_info: Optional ``(level, unit)`` pair. Every output line is
            prefixed with ``level * unit`` and each ``'\\t'`` token is rendered
            as ``unit``. Defaults to ``(0, '    ')``.
        throw_when_unk_failure: When true, raise instead of returning ``''``
            if a numbered placeholder (a token that starts with ``'atom'``,
            ``'var'`` or ``'integer'`` but is longer than that prefix) has no
            entry in the reversed ``unk_dict``.

    Returns:
        The rendered code string, or ``''`` when a placeholder could not be
        restored and ``throw_when_unk_failure`` is false.

    Raises:
        ValueError: A placeholder could not be restored and
            ``throw_when_unk_failure`` is true.
    """
    placeholder_prefixes = ('atom', 'var', 'integer')

    if unk_dict is None:
        # Nothing to restore. Without this branch the default call
        # (unk_dict=None) failed later with an unbound local variable.
        tokens_with_original_vars = list(tokens)
    else:
        # Replace generic placeholders by the original identifiers/literals.
        rev_unk_dict = Common_functions.reverse_dict(unk_dict)
        rev_unk_dict['comma'] = ','
        rev_unk_dict['dot'] = '.'
        tokens_with_original_vars = []
        for token in tokens:
            try:
                restored = rev_unk_dict[token]
            except KeyError:
                # A bare 'atom'/'var'/'integer' is passed through unchanged;
                # only a prefixed-and-longer token counts as an unresolved
                # placeholder.
                if token.startswith(placeholder_prefixes) and token not in placeholder_prefixes:
                    if throw_when_unk_failure:
                        raise ValueError(
                            'Not all tokens could be converted back to identifier/variable/integer'
                        ) from None
                    return ''
                restored = token
            tokens_with_original_vars.append(restored)

    if indent_info is None:
        indent_info = (0,'    ')
    indent_level, indent_unit = indent_info[0], indent_info[1]

    # Convert the token sequence to a code string.
    # NOTE: both membership tests below are substring tests on these strings,
    # so a multi-character token that happens to be a substring matches too.
    # This is kept as-is because existing output may depend on it.
    del_space_before_tokens = ']())}.,:;'
    no_space_after_tokens= '[({.\n\t-'
    code_string = ''
    for token in tokens_with_original_vars:
        if token in del_space_before_tokens:
            # Remove a single trailing space only. Runs of two or more spaces
            # are left alone (presumably so indentation emitted for '\t'
            # tokens survives), as is a string shorter than two characters.
            if len(code_string) >= 2 and code_string[-1] == ' ' and code_string[-2] != ' ':
                code_string = code_string[:-1]
        code_string += indent_unit if token == '\t' else token
        if token not in no_space_after_tokens:
            code_string += ' '

    # Apply the base indentation to every line of the result.
    base_indent = indent_level * indent_unit
    return '\n'.join(base_indent + line for line in code_string.splitlines())

In [None]:
def generate_encoder_decoder_inputs_outputs(num_tokens, latent_dim, embedding_dim, MAX_LENGTH_NONIDIOMATIC, MAX_LENGTH_IDIOMATIC,
                                            encoder_rnn_num = 2, stacked_encoder = False, dropout_rate : float = 0.2, use_gru : bool = False):
    """Build the tensors and layers of the training-time encoder-decoder graph.

    The encoder embeds its input and feeds the same embedded sequence to
    ``encoder_rnn_num`` parallel bidirectional RNNs (LSTM, or GRU when
    ``use_gru`` is true), each with ``latent_dim // encoder_rnn_num`` units.
    With ``stacked_encoder`` every first-layer output sequence is passed
    through a second bidirectional RNN and the states are taken from that
    second layer. The per-RNN output sequences and final states are
    concatenated; the states initialise a single decoder RNN with
    ``latent_dim * 2`` units, whose outputs go through an attention layer and
    a per-timestep softmax over ``num_tokens``.

    Args:
        num_tokens: Vocabulary size; used for the embedding input dimension
            and the size of the final softmax.
        latent_dim: Base hidden size (see above for how it is split/doubled).
        embedding_dim: Output dimension of the shared embedding layer.
        MAX_LENGTH_NONIDIOMATIC: Length of the encoder input sequence.
        MAX_LENGTH_IDIOMATIC: Length of the decoder input sequence.
        encoder_rnn_num: Number of parallel bidirectional encoder RNNs.
        stacked_encoder: Whether to add a second bidirectional layer per RNN.
        dropout_rate: ``dropout`` argument of the encoder RNNs (the decoder
            RNN is created without dropout).
        use_gru: Use GRU layers instead of LSTM layers.

    Returns:
        Tuple ``(encoder_inputs, decoder_inputs, encoder_outputs,
        decoder_outputs, encoder_states, emb_layer, decoder_rnn,
        decoder_encoder_attention_layer, decoder_dense)``.

    NOTE(review): the concatenated encoder states only seem to match the
    decoder width (``latent_dim * 2``) when ``latent_dim`` is divisible by
    ``encoder_rnn_num`` - confirm and consider validating this.
    """
    encoder_inputs = Input(shape=(MAX_LENGTH_NONIDIOMATIC,), name='encoder_inputs')

    # Integer division: each parallel RNN gets an equal share of latent_dim.
    encoder_rnn_dim = latent_dim//encoder_rnn_num

    # One embedding layer, shared by the encoder and (further down) the decoder.
    emb_layer = Embedding(num_tokens, embedding_dim, mask_zero = True, name='enc_emb_layer')
    enc_emb = emb_layer(encoder_inputs)

    rnn_layer = GRU if use_gru else LSTM

    # First-layer encoder RNNs. The two alternatives differ only in
    # return_state: when stacking, the states are taken from the second layer.
    encoder_rnns = [
        (Bidirectional(rnn_layer(encoder_rnn_dim, return_sequences=True, return_state=True, dropout=dropout_rate, name=f'rnn{i+1}'), name=f'bidirectional{i+1}')
         if not stacked_encoder else
         Bidirectional(rnn_layer(encoder_rnn_dim, return_sequences=True, return_state=False, dropout=dropout_rate, name=f'rnn{i+1}'), name=f'bidirectional{i+1}'))
        for i in range(encoder_rnn_num)
    ]

    # Every RNN reads the same embedded input (parallel, not chained).
    encoder_outputs = [
        encoder_rnn(enc_emb)
        for encoder_rnn in encoder_rnns
    ]

    if stacked_encoder:
        # Second layer: the i-th RNN consumes the i-th first-layer output.
        second_layer_rnns = [
            Bidirectional(rnn_layer(encoder_rnn_dim, return_sequences=True, return_state=True, dropout=dropout_rate, name=f'stacked_rnn{i+1}'))
            for i in range(encoder_rnn_num)
        ]
        stacked_encoder_outputs = [
            encoder_rnn(encoder_outputs[i])
            for i, encoder_rnn in enumerate(second_layer_rnns)
        ]
        encoder_outputs = stacked_encoder_outputs

    # Transpose the per-RNN results: first the output sequences of all RNNs,
    # then one group per returned state, each holding that state for all RNNs.
    encoder_outputs, *states = zip(*encoder_outputs)
    encoder_outputs = Concatenate()(encoder_outputs)
    if use_gru:
        # Two state groups per bidirectional GRU, merged into a single state.
        states_a, states_b = states
        state = Concatenate()([*states_a, *states_b])
        encoder_states  = [state]
    else:
        # Four state groups per bidirectional LSTM; all h states and all c
        # states are concatenated separately.
        forward_h, forward_c, backward_h, backward_c = states
        state_h = Concatenate(name='concat_state_h')([*forward_h, *backward_h])
        state_c = Concatenate(name='concat_state_c')([*forward_c, *backward_c])
        encoder_states = [state_h, state_c]

    decoder_inputs = Input(shape=(MAX_LENGTH_IDIOMATIC, ), name='decoder_inputs')

    # The decoder reuses the encoder's embedding layer.
    dec_emb = emb_layer(decoder_inputs)

    decoder_rnn = rnn_layer(latent_dim*2, return_sequences=True, return_state=True, name='decoder_rnn')

    # Only the output sequence is needed here; the returned states are discarded.
    decoder_outputs, *_ = decoder_rnn(dec_emb, initial_state=encoder_states)

    decoder_encoder_attention_layer = Common_functions.AttentionLayer(name='decoder_encoder_attention')
    decoder_encoder_attention       = decoder_encoder_attention_layer([encoder_outputs,decoder_outputs])

    # Per-timestep softmax over the vocabulary, applied to the attention output.
    decoder_dense = TimeDistributed(Dense(num_tokens, activation='softmax'),name='final_dense')
    decoder_outputs = decoder_dense(decoder_encoder_attention)

    return (encoder_inputs, decoder_inputs, encoder_outputs, decoder_outputs, encoder_states, emb_layer, decoder_rnn, decoder_encoder_attention_layer, decoder_dense)

In [None]:
def generate_encoder_decoder_model(encoder_inputs, encoder_states, encoder_outputs,
                                   latent_dim, embedding_dim, emb_layer, decoder_inputs,
                                   decoder_rnn, decoder_encoder_attention_layer,
                                   decoder_dense, MAX_LENGTH_NONIDIOMATIC, use_gru = False):
    """Assemble separate encoder and decoder models from existing layers.

    The encoder model maps ``encoder_inputs`` to the encoder states followed
    by the encoder output sequence. The decoder model takes the encoder output
    sequence, ``decoder_inputs`` and the RNN state(s) as inputs and returns the
    softmax output followed by the updated RNN state(s). One state is used
    when ``use_gru`` is true, two (h and c) otherwise.

    Args:
        encoder_inputs: Input tensor of the encoder.
        encoder_states: List of encoder state tensors.
        encoder_outputs: Encoder output-sequence tensor.
        latent_dim: Base hidden size; state and encoder-output inputs are
            declared with ``latent_dim * 2`` features.
        embedding_dim: Not used in this function.
        emb_layer: Embedding layer applied to ``decoder_inputs``.
        decoder_inputs: Input tensor of the decoder.
        decoder_rnn: Decoder RNN layer; called with ``initial_state``.
        decoder_encoder_attention_layer: Layer called with
            ``[encoder output input, decoder RNN outputs]``.
        decoder_dense: Layer applied to the attention result.
        MAX_LENGTH_NONIDIOMATIC: Time dimension of the encoder-output input.
        use_gru: Whether ``decoder_rnn`` returns a single state.

    Returns:
        ``(encoder_model, decoder_model)``.
    """
    state_width = latent_dim*2

    encoder_model = Model([encoder_inputs], [*encoder_states, encoder_outputs])

    # Placeholder through which the encoder's output sequence is fed back in.
    encoder_output_input = Input(shape=(MAX_LENGTH_NONIDIOMATIC, state_width), name='encoder_output_input')

    # One state placeholder for a GRU, two (h and c) for an LSTM.
    state_input_names = ['decoder_state_input_h']
    if not use_gru:
        state_input_names.append('decoder_state_input_c')
    decoder_state_input = [
        Input(shape=(state_width,), name=state_input_name)
        for state_input_name in state_input_names
    ]

    embedded_targets = emb_layer(decoder_inputs)
    rnn_result = decoder_rnn(embedded_targets, initial_state=decoder_state_input)
    if use_gru:
        rnn_sequence, gru_state = rnn_result
        decoder_states2 = [gru_state]
    else:
        rnn_sequence, lstm_state_h, lstm_state_c = rnn_result
        decoder_states2 = [lstm_state_h, lstm_state_c]

    attended = decoder_encoder_attention_layer([encoder_output_input, rnn_sequence])
    token_distribution = decoder_dense(attended)

    decoder_model = Model(
        [encoder_output_input, decoder_inputs] + decoder_state_input,
        [token_distribution] + decoder_states2)

    return (encoder_model, decoder_model)

In [None]:
def decode_sequences(input_seqs : List,
                     encoder_model, decoder_model,
                     word2index_seq2seq, index2word_seq2seq,
                     MAX_LENGTH_IDIOMATIC,
                     batch_size : int = 1):
    """Greedily decode a batch of encoded input sequences.

    The encoder is run once; the decoder is then called one step at a time,
    starting from the ``'BOS'`` token and always feeding back the most
    probable token, until every row has produced ``'EOS'`` or collected more
    than ``MAX_LENGTH_IDIOMATIC`` tokens.

    Args:
        input_seqs: Batch passed unchanged to ``encoder_model.predict``.
        encoder_model: Object whose ``predict`` returns the state(s) followed
            by the encoder output.
        decoder_model: Object whose ``predict`` takes
            ``[encoder output, target tokens, *state]`` and returns the token
            scores followed by the new state(s).
        word2index_seq2seq: Token -> index mapping; must contain ``'BOS'``.
        index2word_seq2seq: Index -> token mapping.
        MAX_LENGTH_IDIOMATIC: Length limit used in the stop condition.
        batch_size: Number of rows that are decoded.

    Returns:
        A list with one list of tokens per row; a trailing ``'EOS'`` is
        removed.
    """
    def as_decoder_input(token_ids):
        # Lay the token ids out as a (batch_size, 1) float column.
        column = np.zeros((batch_size, 1))
        for row, token_id in enumerate(token_ids):
            column[row, 0] = token_id
        return column

    *state, enc_output = encoder_model.predict(input_seqs)

    target_seqs = as_decoder_input(word2index_seq2seq['BOS'] for _ in range(batch_size))
    decoded_sequences = [[] for _ in range(batch_size)]
    finished = [False] * batch_size

    while not all(finished):
        # The states returned by this step become the input states of the next.
        output_tokens, *state = decoder_model.predict([enc_output, target_seqs, *state])

        # Greedy choice: most probable token at the last timestep of each row.
        sampled_token_indexes = [
            np.argmax(output_tokens[row, -1, :])
            for row in range(batch_size)
        ]
        sampled_words = [index2word_seq2seq[token_index] for token_index in sampled_token_indexes]

        for row, word in enumerate(sampled_words):
            if finished[row]:
                # Finished rows still run through the decoder but no longer grow.
                continue
            decoded_sequences[row].append(word)
            if word == 'EOS' or len(decoded_sequences[row]) > MAX_LENGTH_IDIOMATIC:
                finished[row] = True

        target_seqs = as_decoder_input(sampled_token_indexes)

    # Strip the end marker from rows that terminated normally.
    trimmed_sequences = []
    for sequence in decoded_sequences:
        trimmed_sequences.append(sequence[:-1] if sequence[-1] == 'EOS' else sequence)
    return trimmed_sequences

In [None]:
def idiomatize(codes,
               encoder_model, decoder_model,
               word2index_seq2seq, index2word_seq2seq,
               MAX_LENGTH_IDIOMATIC, MAX_LENGTH_NONIDIOMATIC,
               throw_when_unk_failure=False,
               batch_size : int = 2**7,
               to_string : bool = True, pprint : bool = False):
    """Translate code snippets with the encoder/decoder models.

    Args:
        codes: Either a sequence of source strings (tokenized with
            ``Common_functions.tokenize_code`` and then passed through
            ``Common_functions.process_tokenized_code``) or a sequence of
            already processed items. Each processed item is unpacked as a
            ``(token sequence, unk_dict)`` pair. Only the first element is
            inspected to decide which case applies.
        encoder_model, decoder_model: Models forwarded to ``decode_sequences``.
        word2index_seq2seq: Token -> index mapping. Tokens that are not in
            this mapping are dropped from the input.
        index2word_seq2seq: Index -> token mapping.
        MAX_LENGTH_IDIOMATIC: Length limit forwarded to ``decode_sequences``.
        MAX_LENGTH_NONIDIOMATIC: Length the encoded inputs are padded to.
        throw_when_unk_failure: Forwarded to ``tokens_to_code``.
        batch_size: Maximum number of sequences decoded per call to
            ``decode_sequences``.
        to_string: When false, return the decoded token sequences instead of
            code strings.
        pprint: When true, run ``Common_functions.erl_pretty_printer`` on the
            results and keep a pretty-printed version only if it differs from
            the generated code in spaces, tabs and newlines alone.

    Returns:
        ``[]`` for empty ``codes``; a ``deque`` of token lists when
        ``to_string`` is false; otherwise a list of code strings, one per
        input.

    Raises:
        ValueError: ``batch_size`` is smaller than 1.
        TypeError: The first element of ``codes`` is neither a string nor a
            list/tuple.
    """
    if not codes:
        return []
    if batch_size < 1:
        raise ValueError(f'batch_size must be at least 1, got {batch_size!r}')

    first_item = codes[0]
    if isinstance(first_item, str):
        processed_items = Common_functions.tokenize_code(codes)
        processed_items = [
            Common_functions.process_tokenized_code(tokenized) for tokenized in processed_items
        ]
    elif isinstance(first_item, (list, tuple)):
        processed_items = codes
    else:
        # An explicit exception instead of `assert False`: assertions are
        # removed under `python -O`, which used to let execution fall through
        # to an undefined variable.
        raise TypeError(
            'codes must contain source strings or (tokens, unk_dict) pairs, '
            f'got {type(first_item).__name__}'
        )

    token_seqs, unk_dicts = zip(*processed_items)
    # Out-of-vocabulary tokens are silently skipped.
    encoded_seqs = [
        [word2index_seq2seq[token] for token in token_seq if token in word2index_seq2seq]
        for token_seq in token_seqs
    ]
    input_seqs = pad_sequences(encoded_seqs, maxlen=MAX_LENGTH_NONIDIOMATIC, padding='post', value=0., dtype='float32')

    decoded_sequences = deque()
    test_set_len = len(input_seqs)
    # Decode in chunks of at most batch_size rows; the last chunk may be shorter.
    for start in range(0, test_set_len, batch_size):
        batch = input_seqs[start:start + batch_size]
        decoded_sequences.extend(
            decode_sequences(batch,
                             encoder_model, decoder_model,
                             word2index_seq2seq, index2word_seq2seq,
                             MAX_LENGTH_IDIOMATIC, len(batch))
        )

    assert test_set_len == len(decoded_sequences)
    if not to_string:
        return decoded_sequences

    idiomatic_codes = [
        tokens_to_code(decoded_sequence, unk_dict, throw_when_unk_failure=throw_when_unk_failure)
        for decoded_sequence, unk_dict in zip(decoded_sequences, unk_dicts)
    ]
    assert len(idiomatic_codes) == test_set_len

    if pprint:
        def without_whitespace(text):
            return text.replace(' ','').replace('\t','').replace('\n','')

        pretty_idiomatic_codes = Common_functions.erl_pretty_printer(idiomatic_codes)
        # Check the length before pairing: zip() would silently truncate.
        assert len(pretty_idiomatic_codes) == test_set_len
        idiomatic_codes = [
            pretty_code
            if (code and without_whitespace(code) == without_whitespace(pretty_code))
            else code
            for code, pretty_code in zip(idiomatic_codes, pretty_idiomatic_codes)
        ]

    return idiomatic_codes