In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, MultiHeadAttention, Dropout, LayerNormalization
import matplotlib.pyplot as plt
import pandas as pd
from keras.models import Sequential
from keras.layers import Dense,Activation,Flatten, LSTM
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from keras.layers import Bidirectional
from math import sqrt

import matplotlib.pylab as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = (16, 9)
plt.style.use('fast')

# Hyperparameters
sequence_length = 150
num_heads = 6
ff_dim =50

# Create the dataset
df = pd.read_csv('Name of the file', index_col='Date', parse_dates=['Date']) #Carga del fichero
#Chart parameters are defined, such as index labels, chart size, etc.
df.plot(figsize=(15,10))
plt.legend(fontsize=10)
plt.xlabel('Año', fontsize=25)
plt.ylabel('Precio',fontsize=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.show()
#The integer value of the size of the dataset is taken and the column of the data is taken
v=int(len(df))
values = df.iloc[:v,-1].values
#We make sure that all values ​​are of type "Float"
values = values.astype('float32')
values=values.reshape(-1, 1)
#We divide the data into training, validation and testing
n_train_days = int(len(df)*0.8)
n_val_days=int(len(df)*0.1)
n_test_days=int(len(df)*0.2)
train = values[:n_train_days-n_val_days, :]
validation= train[:n_val_days, :]
test = values[n_train_days:, :]
x_train, y_train = train[:, :-1], train[:, -1]
x_val, y_val = test[:, :-1], test[:, -1]
dataframe1=df.transpose()
train_rate = 0.8 #Training rate will determine the learning level of the network
seq_len = 50
pre_len = 5
num_nodes, time_len = dataframe1.shape
#Data is normalized between [0,1]
def train_test_split(data, train_portion):
    time_len = data.shape[1]
    train_size = int(time_len * train_portion)
    train_data = np.array(data.iloc[:1, :n_train_days-n_val_days])
    test_data = np.array(data.iloc[:1, n_train_days:])
    val_data =  np.array(data.iloc[:1, n_train_days-n_val_days:n_train_days])
    return train_data, test_data,val_data
train_data, test_data, val_data = train_test_split(dataframe1, train_rate)
def scale_data(train_data, test_data,val_data):
    max_speed = train_data.max()
    min_speed = train_data.min()
    train_scaled = (train_data - min_speed) / (max_speed - min_speed)
    test_scaled = (test_data - min_speed) / (max_speed - min_speed)
    val_scaled = (val_data - min_speed) / (max_speed - min_speed)
    return train_scaled, test_scaled,val_scaled
train_scaled, test_scaled, val_scaled = scale_data(train_data, test_data,val_data)
def sequence_data_preparation(seq_len, pre_len, train_data, test_data,val_data):
    trainX, trainY, testX, testY, valY, valX = [], [], [], [], [], []

    for i in range(train_data.shape[1] - int(seq_len + pre_len - 1)):
        a = train_data[:, i : i + seq_len + pre_len]
        trainX.append(a[:, :seq_len])
        trainY.append(a[:, -1])

    for i in range(test_data.shape[1] - int(seq_len + pre_len - 1)):
        b = test_data[:, i : i + seq_len + pre_len]
        testX.append(b[:, :seq_len])
        testY.append(b[:, -1])

    for i in range(val_data.shape[1] - int(seq_len + pre_len - 1)):
        c = val_data[:, i : i + seq_len + pre_len]
        valX.append(c[:, :seq_len])
        valY.append(c[:, -1])

    trainX = np.array(trainX)
    trainY = np.array(trainY)
    testX = np.array(testX)
    testY = np.array(testY)
    valX = np.array(valX)
    valY = np.array(valY)

    return trainX, trainY, testX, testY, valX, valY
trainX, trainY, testX, testY, valX, valY = sequence_data_preparation(
    seq_len, pre_len, train_scaled, test_scaled, val_scaled
)


# Transformer Model
inputs = Input(shape=(1,50))
attention_output = MultiHeadAttention(num_heads=num_heads, key_dim=sequence_length)(inputs, inputs)
attention_output = Dropout(0.1)(attention_output)
skip_connection = tf.keras.layers.Add()([inputs, attention_output])
layer_norm1 = LayerNormalization(epsilon=1e-6)(skip_connection)
ffn = Dense(ff_dim, activation='relu')(layer_norm1)
ffn = Dropout(0.1)(ffn)
transformer_output = tf.keras.layers.Add()([layer_norm1, ffn])
layer_norm2 = LayerNormalization(epsilon=1e-6)(transformer_output)

output = Dense(1, activation='tanh')(tf.keras.layers.Flatten()(layer_norm2))

model = Model(inputs=inputs, outputs=output)

model.compile(optimizer='adam', loss='mean_squared_error')
model.summary()

# Train the model with training data
model.fit(trainX, trainY, epochs=250, batch_size=250)

# Make predictions on the test set
predictions = model.predict(testX)


# Select a sample for display
sample_index = 25


#Data is entered into a dataset to be rescaled
df2 = pd.DataFrame(predictions)

max_speed = train_data.max()
min_speed = train_data.min()
train_rescpred = np.array((testY) * max_speed)
pred = np.array((predictions) * max_speed)
pcompara = pd.DataFrame(np.array([[y[0] for y in train_rescpred], [x[0] for x in pred]])).transpose()


pcompara.columns = ['real', 'prediccion']
pcompara['diferencia'] = pcompara['real'] - pcompara['prediccion']
pcompara.head()
pcompara['real'].plot()
pcompara['prediccion'].plot()
#Los resultados se guardan en un CSV
pcompara.to_csv('Name of the file')
compara = pd.DataFrame(np.array([y_val, [x[0] for x in predictions]])).transpose()
compara.columns = ['real', 'prediccion']
# View actual and predicted data
plt.plot(testX[sample_index, :, 0], label='Real')
plt.plot(predictions[sample_index], label='Predicción')
plt.title('Predicción de Serie Temporal con Red Transformer')
plt.xlabel('Tiempo')
plt.ylabel('Valor')
plt.legend()
plt.show()
