In [87]:
import numpy as np
from faker import Faker
import random
from tqdm import tqdm
from babel.dates import format_date
from keras.utils import to_categorical
import keras.backend as K
import matplotlib.pyplot as plt
from keras.optimizers.legacy import Adam 
Faker.seed(12345)
random.seed(12345)

In [88]:
from keras.models import Model
from keras.layers import Input, Dense, RepeatVector, Concatenate, Dot, Activation

In [89]:
from keras.layers import LSTM, Bidirectional 

In [90]:

FORMATS = ['short',
           'medium',
           'long',
           'full',
           'full',
           'full',
           'full',
           'full',
           'full',
           'full',
           'full',
           'full',
           'full',
           'd MMM YYY', 
           'd MMMM YYY',
           'dd MMM YYY',
           'd MMM, YYY',
           'd MMMM, YYY',
           'dd, MMM YYY',
           'd MM YY',
           'd MMMM YYY',
           'MMMM d YYY',
           'MMMM d, YYY',
           'dd.MM.YY']

# change this if you want it to work with another language
LOCALES = ['en_US']

def load_date():

    dt = fake.date_object()

    try:
        human_readable = format_date(dt, format=random.choice(FORMATS),  locale='en_US') # locale=random.choice(LOCALES))
        human_readable = human_readable.lower()
        human_readable = human_readable.replace(',','')
        machine_readable = dt.isoformat()

    except AttributeError as e:
        return None, None, None

    return human_readable, machine_readable, dt

def load_dataset(m):
    """
        Loads a dataset with m examples and vocabularies
        :m: the number of examples to generate
    """

    human_vocab = set()
    machine_vocab = set()
    dataset = []
    Tx = 30

    for i in tqdm(range(m)):
        h, m, _ = load_date()
        if h is not None:
            dataset.append((h, m))
            human_vocab.update(tuple(h))
            machine_vocab.update(tuple(m))

    human = dict(zip(sorted(human_vocab) + ['<unk>', '<pad>'], 
                     list(range(len(human_vocab) + 2))))
    inv_machine = dict(enumerate(sorted(machine_vocab)))
    machine = {v:k for k,v in inv_machine.items()}

    return dataset, human, machine, inv_machine

def preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty):

    X, Y = zip(*dataset)

    X = np.array([string_to_int(i, Tx, human_vocab) for i in X])
    Y = [string_to_int(t, Ty, machine_vocab) for t in Y]

    Xoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(human_vocab)), X)))
    Yoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(machine_vocab)), Y)))

    return X, np.array(Y), Xoh, Yoh

def string_to_int(string, length, vocab):
    
    string = string.lower()
    string = string.replace(',','')

    if len(string) > length:
        string = string[:length]

    rep = list(map(lambda x: vocab.get(x, '<unk>'), string))

    if len(string) < length:
        rep += [vocab['<pad>']] * (length - len(string))

    return rep

EXAMPLES = ['3 May 1979', '5 Apr 09', '20th February 2016', 'Wed 10 Jul 2007']


def run_example(model, input_vocabulary, inv_output_vocabulary, text):
    encoded = string_to_int(text, TIME_STEPS, input_vocabulary)
    prediction = model.predict(np.array([encoded]))
    prediction = np.argmax(prediction[0], axis=-1)
    return int_to_string(prediction, inv_output_vocabulary)

def run_examples(model, input_vocabulary, inv_output_vocabulary, examples=EXAMPLES):
    predicted = []
    for example in examples:
        predicted.append(''.join(run_example(model, input_vocabulary, inv_output_vocabulary, example)))
        print('input:', example)
        print('output:', predicted[-1])
    return predicted


def softmax(x, axis=1):
    ndim = K.ndim(x)
    if ndim == 2:
        return K.softmax(x)
    elif ndim > 2:
        e = K.exp(x - K.max(x, axis=axis, keepdims=True))
        s = K.sum(e, axis=axis, keepdims=True)
        return e / s
    else:
        raise ValueError('Cannot apply softmax to a tensor that is 1D')
        

def plot_attention_map(model, input_vocabulary, inv_output_vocabulary, text, n_s = 128, num = 6, Tx = 30, Ty = 10):

    attention_map = np.zeros((10, 30))
    Ty, Tx = attention_map.shape
    
    s0 = np.zeros((1, n_s))
    c0 = np.zeros((1, n_s))
    layer = model.layers[num]

    encoded = np.array(string_to_int(text, Tx, input_vocabulary)).reshape((1, 30))
    encoded = np.array(list(map(lambda x: to_categorical(x, num_classes=len(input_vocabulary)), encoded)))

    f = K.function(model.inputs, [layer.get_output_at(t) for t in range(Ty)])
    r = f([encoded, s0, c0])
    
    for t in range(Ty):
        for t_prime in range(Tx):
            attention_map[t][t_prime] = r[t][0,t_prime,0]


    prediction = model.predict([encoded, s0, c0])
    
    predicted_text = []
    for i in range(len(prediction)):
        predicted_text.append(int(np.argmax(prediction[i], axis=1)))
        
    predicted_text = list(predicted_text)
    predicted_text = int_to_string(predicted_text, inv_output_vocabulary)
    text_ = list(text)
    
    input_length = len(text)
    output_length = Ty
    
    plt.clf()
    f = plt.figure(figsize=(8, 8.5))
    ax = f.add_subplot(1, 1, 1)

    i = ax.imshow(attention_map, interpolation='nearest', cmap='Blues')

    cbaxes = f.add_axes([0.2, 0, 0.6, 0.03])
    cbar = f.colorbar(i, cax=cbaxes, orientation='horizontal')
    cbar.ax.set_xlabel('Alpha value (Probability output of the "softmax")', labelpad=2)

    ax.set_yticks(range(output_length))
    ax.set_yticklabels(predicted_text[:output_length])

    ax.set_xticks(range(input_length))
    ax.set_xticklabels(text_[:input_length], rotation=45)

    ax.set_xlabel('Input Sequence')
    ax.set_ylabel('Output Sequence')

    ax.grid()
    
    return attention_map


In [94]:
class Seq2seq(object):
    
    
    def __init__(self, Tx=30, Ty=10, n_x = 32, n_y = 64):
        self.model_param = {
            "Tx": Tx, 
            "Ty": Ty,
            "n_x": n_x,
            "n_y": n_y
        }


    def load_data(self, m):
        dataset, x_vocab, y_vocab, _ = load_dataset(m)

        X, y, X_onehot, y_onehot = preprocess_data(dataset, x_vocab, y_vocab, self.model_param["Tx"], self.model_param["Ty"])

        print("the data set feature shape: ", X_onehot.shape)
        print("the data set target shape: ", y_onehot.shape)

        print("see the first data frame: feature: %s, target: %s" % (dataset[0][0], dataset[0][1]))
        print(X[0], y[0])
        print("one_got encode: ", X_onehot[0], y_onehot[0])

        self.model_param["x_vocab"] = x_vocab 
        self.model_param["y_vocab"] = y_vocab 

        self.model_param["x_vocab_size"] = len(x_vocab)
        self.model_param["y_vocab_size"] = len(y_vocab)
        return X_onehot, y_onehot 
    
    
    
    def get_encoder(self):
        
        #self.encoder = LSTM(self.model_param["n_x"], return_sequences=True, name="bidirectional_1", merge_mode="concat")
        self.encoder = Bidirectional(LSTM(self.model_param["n_x"], return_sequences=True, name='bidirectional_1'), merge_mode='concat')

        return None 
        
    def get_decoder(self):
        
        self.decoder = LSTM(self.model_param["n_y"], return_state=True)
        
        return None 
    
    
    
    def get_output_layer(self):
        
        self.output_layer = Dense(self.model_param["y_vocab_size"], activation=softmax)
        
        return None 
    
    def get_attention(self):
        
        repeator = RepeatVector(self.model_param["Tx"])
        
        concatenator = Concatenate(axis=-1)
        
        densor1 = Dense(10, activation="tanh", name="Dense1")
        
        densor2 = Dense(1, activation="relu", name="Dense2")
        
        activator = Activation(softmax, name="attention_weights")
        
        dotor = Dot(axes=1)
        
        self.attention = {
            "repeator": repeator,
            "concatenator": concatenator,
            "densor1": densor1,
            "densor2": densor2,
            "activator": activator,
            "dotor": dotor
        }
        
        return None 
        
    def init_seq2seq(self):
        
        self.get_encoder()
        
        self.get_decoder()
        
        self.get_attention()
        
        self.get_output_layer()
        
        return None 
    
    def computer_one_attention(self, a, s_prev):
        
        s_prev = self.attention["repeator"](s_prev)
        
        concat = self.attention["concatenator"]([a, s_prev])
        
        e = self.attention["densor1"](concat)
        
        energies = self.attention["densor2"](e)
        
        alphas = self.attention["activator"](energies)
        
        context = self.attention["dotor"]([alphas, a])
        
        return context
        
    
    def model(self):
        
        X = Input(shape=(self.model_param["Tx"], self.model_param["x_vocab_size"]), name="X")
        
        s0 = Input(shape=(self.model_param["n_y"],), name='s0')
        
        c0 = Input(shape=(self.model_param["n_y"],),name='c0')
        
        s = s0
        
        c = c0 
        
        outputs = []
        
        a = self.encoder(X)
        
        for t in range(self.model_param["Ty"]):
            
            context = self.computer_one_attention(a, s)
            
            s, _, c = self.decoder(context, initial_state = [s,c])
            
            out = self.output_layer(s)
            
            outputs.append(out)
            
        model = Model(inputs=(X, s0, c0), outputs=outputs)
        
        return model 
        
    def train(self, X_onehot, y_onehot):
        
        model = self.model()
        
        opt = Adam(lr= 0.005, beta_1 = 0.9, beta_2 = 0.999, epsilon=None, decay = 0.001)
        
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        
        s0 = np.zeros((10000, self.model_param["n_y"]))
        
        c0 = np.zeros((10000, self.model_param["n_y"]))
        
        outputs = list(y_onehot.swapaxes(0,1))
        
        model.fit([X_onehot, s0, c0], outputs, epochs = 1, batch_size=100)
        
        return None 
    
    
    def test(self):

        model = self.model()

        model.load_weights("./DL/model.h5")

        EXAMPLES = ['3 May 1979', '5 April 09', '21th of August 2016', 'Tue 10 Jul 2007', 'Saturday May 9 2018', 'March 3 2001', 'March 3rd 2001', '1 March 2001']
        
        for example in EXAMPLES:
            
            source = string_to_int(example, self.model_param["Tx"], self.model_param["x_vocab"])
            source = np.array(list(map(lambda x: to_categorical(x, num_classes=self.model_param["x_vocab_size"]), source)))
                        
            source = np.expand_dims(source, axis=0)
            #s0 = np.zeros((10000, self.model_param["n_y"]))
            #c0 = np.zeros((10000, self.model_param["n_y"]))
            s0 = np.zeros((source.shape[0], self.model_param["n_y"]))
            c0 = np.zeros((source.shape[0], self.model_param["n_y"]))
            
            prediction = model.predict([source, s0, c0])
            
            prediction = np.argmax(prediction, axis = -1)
                        

            
            output = [dict(zip(self.model_param["y_vocab"].values(), self.model_param["y_vocab"].keys()))[int(i)] for i in prediction]

            print("source:", example)
            print("output:", ''.join(output))

        return None
        
        
    

In [96]:
if __name__ == '__main__':
    s2s = Seq2seq()
    X_onehot, y_onehot = s2s.load_data(10000)
    s2s.init_seq2seq()
    s2s.train(X_onehot, y_onehot)
    s2s.test()

100%|██████████████████████████████████| 10000/10000 [00:00<00:00, 13726.72it/s]


the data set feature shape:  (10000, 30, 37)
the data set target shape:  (10000, 10, 11)
see the first data frame: feature: sunday january 20 2013, target: 2013-01-20
[29 31 25 16 13 34  0 22 13 25 31 13 28 34  0  5  3  0  5  3  4  6 36 36
 36 36 36 36 36 36] [3 1 2 4 0 1 2 0 3 1]
one_got encode:  [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]] [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
Train on 10000 samples


2024-01-23 21:38:45.621698: W tensorflow/c/c_api.cc:304] Operation '{name:'count_151/Assign' id:129079 op device:{requested: '', assigned: ''} def:{{{node count_151/Assign}} = AssignVariableOp[_has_manual_control_dependencies=true, dtype=DT_FLOAT, validate_shape=false](count_151, count_151/Initializer/zeros)}}' was changed by setting attribute after it was run by a session. This mutation will have no effect, and will trigger an error in the future. Either don't modify nodes after running them or create a new session.




2024-01-23 21:43:06.070821: W tensorflow/c/c_api.cc:304] Operation '{name:'dense_25_10/Softmax' id:133536 op device:{requested: '', assigned: ''} def:{{{node dense_25_10/Softmax}} = Softmax[T=DT_FLOAT, _has_manual_control_dependencies=true](dense_25_10/BiasAdd)}}' was changed by setting attribute after it was run by a session. This mutation will have no effect, and will trigger an error in the future. Either don't modify nodes after running them or create a new session.


source: 3 May 1979
output: 1979-05-33
source: 5 April 09
output: 2009-04-05
source: 21th of August 2016
output: 2016-08-20
source: Tue 10 Jul 2007
output: 2007-07-10
source: Saturday May 9 2018
output: 2018-05-09
source: March 3 2001
output: 2001-03-03
source: March 3rd 2001
output: 2001-03-03
source: 1 March 2001
output: 2001-03-01
