<a href="https://colab.research.google.com/github/dmitriy-iliyov/data-science/blob/main/neural-network/rnn/notebook/rnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import time

import tensorflow as tf


import numpy as np
import datetime
import requests
import pandas as pd


def coin_parsing(currency, days=90, interval='hourly', timeout=30):
    """Download the USD price history of a coin from the CoinGecko API.

    A probe request to ``/coins/<currency>`` is sent first. Only when it
    answers with HTTP 200 is ``/coins/<currency>/market_chart`` queried.

    Args:
        currency: Coin id appended to the CoinGecko ``/coins/`` URL,
            e.g. ``'bitcoin'``.
        days: Value sent as the ``days`` query parameter.
        interval: Sent as the ``interval`` query parameter unless it equals
            ``'hourly'``, in which case the parameter is omitted.
            NOTE(review): presumably the API then picks the granularity on
            its own -- confirm against the CoinGecko documentation.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A ``pandas.DataFrame`` with a ``date`` column (local-time
        ``datetime`` objects) and a ``price`` column, or ``None`` when either
        request fails or the payload has no ``prices`` entry.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            ``timeout`` is exceeded.
    """
    test_url = 'https://api.coingecko.com/api/v3/coins/' + currency
    test_response = requests.get(test_url, timeout=timeout)

    if test_response.status_code != 200:
        print('Test request failed, status code ' + str(test_response.status_code))
        print(test_response.text)
        return None

    print('Test request successful')

    params = {
        'vs_currency': 'usd',
        'days': days,
    }
    if interval != 'hourly':
        params['interval'] = interval

    response = requests.get(test_url + '/market_chart', params=params, timeout=timeout)

    # The probe succeeding does not guarantee this call does (e.g. rate
    # limiting), so check it too instead of failing later with a KeyError.
    if response.status_code != 200:
        print('Market chart request failed, status code ' + str(response.status_code))
        print(response.text)
        return None

    payload = response.json()
    if 'prices' not in payload:
        print('Market chart response has no "prices" field')
        print(response.text)
        return None

    # Each entry is indexed as [timestamp in milliseconds, price].
    points = payload['prices']
    coin_prices_df = pd.DataFrame({
        'date': [datetime.datetime.fromtimestamp(point[0] / 1000) for point in points],
        'price': [point[1] for point in points],
    })

    print('Coin parsing successful')
    return coin_prices_df

from sklearn.preprocessing import MinMaxScaler


# Module-level scaler shared by the whole notebook: prepare_data() fits it on
# the downloaded prices, and the prediction code later reuses the same fitted
# instance for transform() / inverse_transform().
scaler = MinMaxScaler(feature_range=(0, 1))


def prepare_data(currency, days=90, time_step=24, train_ratio=0.8):
    """Download prices, scale them and build sliding-window train/test sets.

    The module-level ``scaler`` is (re)fitted on the full downloaded price
    series as a side effect, so it can later be used to transform new inputs
    and to invert predictions.
    NOTE(review): the scaler is fitted before the train/test split, so the
    test range influences the scaling -- confirm this is acceptable.

    Args:
        currency: Coin id passed to ``coin_parsing``.
        days: Number of days of history requested from ``coin_parsing``.
        time_step: Number of consecutive prices in each input window.
        train_ratio: Fraction of the windows (taken from the start of the
            series) used for training; the remainder is the test set.

    Returns:
        ``(train_data, train_answ, test_data, test_answ)`` as ``float32``
        arrays. Data arrays have shape ``(n, time_step, 1)`` and answer
        arrays have shape ``(n, 1)``; each answer is the scaled price that
        directly follows its window.

    Raises:
        RuntimeError: If ``coin_parsing`` returned no data.
        ValueError: If there are not enough prices for a single window.
    """
    df = coin_parsing(currency, days)
    if df is None:
        raise RuntimeError(f"Could not download price history for '{currency}'")

    prices_scaled = scaler.fit_transform(df['price'].values.reshape(-1, 1))

    print(f"Prices scaled shape: {prices_scaled.shape}")

    features = 1
    window_count = len(prices_scaled) - time_step
    if window_count <= 0:
        raise ValueError(
            f"Need more than {time_step} prices to build a window, got {len(prices_scaled)}")

    # Window i covers prices [i, i + time_step); its answer is price i + time_step.
    windows = np.array(
        [prices_scaled[start:start + time_step] for start in range(window_count)],
        dtype=np.float32)
    answers = np.array(prices_scaled[time_step:], dtype=np.float32)

    # Chronological split: no shuffling, the test set is the most recent part.
    split = int(window_count * train_ratio)
    train_data = windows[:split].reshape(-1, time_step, features)
    test_data = windows[split:].reshape(-1, time_step, features)
    train_answ, test_answ = answers[:split], answers[split:]

    return train_data, train_answ, test_data, test_answ


def one_plot_s(ax, x, y, y_max, label, color, title, x_label, y_label):
    """Draw one labelled curve on ``ax`` with a fixed, zero-based y-range.

    The y-axis runs from 0 to ``y_max`` plus one tenth of ``y_max``. A legend
    and a grid are enabled, and an mplcursors cursor is attached whose
    annotation shows the selected point's y-value with four decimals.

    NOTE(review): ``mplcursors`` is not imported in the visible part of this
    file -- confirm it is imported elsewhere.
    """
    def _annotate_with_y(selection):
        # selection.target holds the picked coordinates; index 1 is shown.
        selection.annotation.set_text(f"{selection.target[1]:.4f}")

    upper_limit = y_max + y_max / 10

    ax.plot(x, y, label=label, color=color)
    ax.set_ylim(0, upper_limit)
    ax.set_ylabel(y_label)
    ax.set_xlabel(x_label)
    ax.set_title(title)
    ax.grid(True)
    ax.legend()

    cursor = mplcursors.cursor(ax)
    cursor.connect("add", _annotate_with_y)


def two_plot_s(ax, x1, y1, label1, color1, x2, y2, label2, color2, title, x_label, y_label):
    """Draw two labelled curves on ``ax`` and fit the y-range to both.

    The y-limits are the smallest and largest value over both series, each
    pushed outwards by one tenth of its own magnitude. A legend and a grid
    are enabled, and an mplcursors cursor is attached whose annotation shows
    the selected point's y-value with four decimals.

    NOTE(review): ``mplcursors`` is not imported in the visible part of this
    file -- confirm it is imported elsewhere.

    Args:
        ax: Matplotlib axes to draw on.
        x1, y1, label1, color1: Data and styling of the first curve.
        x2, y2, label2, color2: Data and styling of the second curve.
        title, x_label, y_label: Axes title and axis labels.
    """
    ax.plot(x1, y1, label=label1, color=color1, marker='o', markersize=2)
    ax.plot(x2, y2, label=label2, color=color2, marker='o', markersize=2)
    ax.set_title(title)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)

    # Unpack into one list rather than using ``y1 + y2``: ``+`` concatenates
    # lists but adds NumPy arrays element-wise, which would give wrong limits.
    y = [*y1, *y2]
    min_y = min(y)
    max_y = max(y)
    # Pad by the magnitude so the limits always move outwards, even for
    # negative values (``min_y - min_y / 10`` would move a negative limit up).
    ax.set_ylim(min_y - abs(min_y) / 10, max_y + abs(max_y) / 10)
    ax.legend()
    ax.grid(True)

    mplcursors.cursor(ax).connect("add", lambda sel: sel.annotation.set_text(f"{sel.target[1]:.4f}"))


def one_fit_statistic(fit_data):
    """Show the accuracy and MSE curves of one training run side by side.

    NOTE(review): ``plt`` (matplotlib.pyplot) is not imported in the visible
    part of this file -- confirm it is imported elsewhere.

    Args:
        fit_data: Mapping with the keys ``'epochs'`` (int), ``'accuracy'``
            and ``'mse'`` (one value per epoch), ``'network'``,
            ``'hidden_neurons_count'``, ``'execution_time'`` (seconds) and
            ``'batch_size'``. ``'hidden_layer_count'`` is optional and
            defaults to 1.
    """
    fig = plt.figure(figsize=(14, 10))
    # One row, two columns. The grid needs at least one row for gs[0, 0] and
    # gs[0, 1] to exist, and the ratio lists must match the row/column counts.
    gs = fig.add_gridspec(1, 2, width_ratios=[1, 1], wspace=0.2, hspace=0.2)
    plt.subplots_adjust(left=0.05, right=0.98, bottom=0.1, top=0.95, wspace=0.2)
    epochs = range(fit_data['epochs'])

    # Left panel: accuracy, with the run configuration in a text box.
    ax1 = fig.add_subplot(gs[0, 0])
    one_plot_s(ax1, epochs, fit_data['accuracy'], 1, 'Accuracy', 'blue', 'Model Accuracy',
               'Epochs', 'Accuracy')
    hidden_layer_count = fit_data.get('hidden_layer_count', 1)
    test_1_info = (f'Network: {fit_data["network"]}\n'
                   f'Hidden layer count: {hidden_layer_count}\n'
                   f'Hidden Neurons: {fit_data["hidden_neurons_count"]}\n'
                   f'Execution Time: {fit_data["execution_time"]:.2f}s\n'
                   f'Batch Size: {fit_data["batch_size"]}')
    ax1.text(0.975, 0.05, test_1_info, fontsize=11, verticalalignment='bottom', horizontalalignment='right',
             transform=ax1.transAxes, bbox=dict(facecolor='white', alpha=0.5))

    # Right panel: MSE, with the last epoch's value in a text box.
    ax2 = fig.add_subplot(gs[0, 1])
    one_plot_s(ax2, epochs, fit_data['mse'], max(fit_data['mse']), 'MSE', 'red', 'Model MSE',
               'Epochs', 'MSE')
    mse_info = f"MSE: {fit_data['mse'][-1]:.4f}"
    ax2.text(0.975, 0.05, mse_info, fontsize=11, verticalalignment='bottom', horizontalalignment='right',
             transform=ax2.transAxes, bbox=dict(facecolor='white', alpha=0.5))
    plt.show()


def save_json(file_path, data):
    """Append ``data`` to ``file_path`` as one JSON document per line.

    Every non-string iterable value in ``data`` is replaced, in place, by a
    list of Python floats so that values such as NumPy scalars become JSON
    serializable. The parent directory is created when it does not exist.

    Args:
        file_path: Path of the text file to append to.
        data: Dictionary to serialize; its iterable values are converted in
            place as described above.
    """
    # Imported locally because the file's import section does not provide
    # these names.
    import json
    import os
    from collections.abc import Iterable

    # Snapshot the items so values can be reassigned while looping.
    for key, value in list(data.items()):
        if isinstance(value, Iterable) and not isinstance(value, str):
            data[key] = [float(item) for item in value]

    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(file_path, 'a', encoding='utf-8') as file:
        file.write(json.dumps(data) + '\n')


import time

import tensorflow as tf


from google.colab import drive
# Mount Google Drive so files can be read and written under /content/drive.
drive.mount('/content/drive')
# Base directory for everything this notebook persists (the statistics file
# and the model checkpoints are built from it). It has no trailing slash, so
# code appending to it must supply its own path separator.
home_dir = '/content/drive/MyDrive/main/languages/Python/neural_network/labs/rnn/model'


class RNN(tf.Module):
    """Stack of tanh layers with a linear output, trained with Adam on MSE.

    Every hidden layer owns an input weight matrix, a bias vector and a
    second ("context") weight matrix of the same shape as the input weights.

    NOTE(review): both matrices of a layer are multiplied with the same layer
    input, so no hidden state is carried from one time step to the next. The
    variable shapes and the forward formula are left unchanged here --
    confirm whether a true recurrence was intended.
    """

    def __init__(self, input_neuron_count, hidden_neuron_count, output_neuron_count, hidden_layer_count):
        """Create the variables, the optimizer and the checkpoint helpers.

        Args:
            input_neuron_count: Number of values in one flattened input sample.
            hidden_neuron_count: Sequence with the width of each hidden layer.
            output_neuron_count: Number of output values per sample.
            hidden_layer_count: Must equal ``len(hidden_neuron_count)``.

        Raises:
            ValueError: If ``hidden_layer_count`` does not match the length of
                ``hidden_neuron_count``.
        """
        super().__init__()

        if hidden_layer_count != len(hidden_neuron_count):
            raise ValueError("hidden_layer_count != len(hidden_neuron_count)")

        self._input_neuron_count = input_neuron_count
        self._hidden_layer_count = hidden_layer_count
        self._hidden_neurons_counts = hidden_neuron_count
        self._output_neuron_count = output_neuron_count

        self._hidden_w_list = []
        self._hidden_b_list = []
        self._context_hidden_w_list = []

        previous_neuron_count = input_neuron_count
        for neuron_count in hidden_neuron_count:
            layer_shape = [previous_neuron_count, neuron_count]
            self._hidden_w_list.append(
                tf.Variable(tf.random.uniform(layer_shape, -1, 1), dtype=tf.float32))
            self._hidden_b_list.append(
                tf.Variable(tf.zeros([neuron_count]), dtype=tf.float32))
            self._context_hidden_w_list.append(
                tf.Variable(tf.random.uniform(layer_shape, -1, 1), dtype=tf.float32))
            previous_neuron_count = neuron_count

        # previous_neuron_count is now the width of the last hidden layer
        # (or the input width when there are no hidden layers).
        self._output_w = tf.Variable(
            tf.random.uniform([previous_neuron_count, output_neuron_count], -1, 1), dtype=tf.float32)
        self._output_b = tf.Variable(tf.zeros([output_neuron_count]), dtype=tf.float32)

        # Absolute error below which a prediction counts as correct; set by fit().
        self.deviation = None

        self.optimizer = tf.optimizers.Adam()
        self.checkpoint = tf.train.Checkpoint(model=self, optimizer=self.optimizer)
        # home_dir has no trailing slash, so the separator is added explicitly.
        self.checkpoint_dir = home_dir.rstrip('/') + "/checkpoints/rnn_model/"
        self.checkpoint_manager = tf.train.CheckpointManager(self.checkpoint, self.checkpoint_dir, max_to_keep=3)

    def summary(self):
        """Print the weight-matrix shape of every layer."""
        layers = zip(self._hidden_w_list, self._context_hidden_w_list)
        for number, (weights, context_weights) in enumerate(layers, start=1):
            print(f"Recurrent hidden layer №{number}: {weights.shape}")
            print(f"    Previous hidden layer №{number}: {context_weights.shape}")
        print(f"Output layer: {self._output_w.shape}")

    def fit(self, train_data, train_answers, epochs=100, learning_rate=0.05, batch_size=16):
        """Train on consecutive mini-batches and report per-epoch statistics.

        A prediction counts as accurate when all of its outputs are within
        ``self.deviation`` (1% of the largest training answer) of the answer.
        The model is checkpointed when the mean accuracy of the last (up to)
        three epochs exceeds 0.9. The statistics are appended to the
        statistics file and plotted.

        Args:
            train_data: Array-like reshapeable to ``(n, input_neuron_count)``.
            train_answers: Array-like reshapeable to ``(n, output_neuron_count)``.
            epochs: Number of passes over the training data.
            learning_rate: Learning rate assigned to the Adam optimizer.
            batch_size: Number of samples per gradient step.

        Returns:
            Dict with the keys ``network``, ``accuracy``, ``mse``, ``epochs``,
            ``batch_size``, ``execution_time``, ``hidden_layer_count`` and
            ``hidden_neurons_count``.

        Raises:
            ValueError: If ``train_data`` is empty.
        """
        start_time = time.time()

        train_data = tf.cast(train_data, tf.float32)
        train_answers = tf.reshape(tf.cast(train_answers, tf.float32), [-1, self._output_neuron_count])
        sample_count = int(train_data.shape[0])
        if sample_count == 0:
            raise ValueError("train_data is empty")

        self.deviation = float(tf.reduce_max(train_answers).numpy()) / 100
        self.optimizer.learning_rate = learning_rate

        trainable_vars = (
                self._hidden_w_list +
                self._hidden_b_list +
                self._context_hidden_w_list +
                [self._output_w, self._output_b]
        )
        # Guard against a zero modulus when epochs is small.
        report_every = max(1, (epochs + 1) // 10)

        mse_list = []
        accuracy_list = []

        for epoch in range(epochs):
            epoch_mse = []
            epoch_accuracy_numerator = 0

            # Step by batch_size so each sample is used once per epoch.
            for start in range(0, sample_count, batch_size):
                batch_data = train_data[start:start + batch_size]
                batch_answers = train_answers[start:start + batch_size]

                with tf.GradientTape() as tape:
                    output = self._fit_passage(batch_data)
                    mse = self._compute_mse(output, batch_answers)

                gradients = tape.gradient(mse, trainable_vars)
                self.optimizer.apply_gradients(zip(gradients, trainable_vars))
                epoch_mse.append(float(mse.numpy()))

                within_deviation = tf.reduce_all(tf.abs(output - batch_answers) < self.deviation, axis=-1)
                epoch_accuracy_numerator += int(tf.math.count_nonzero(within_deviation).numpy())

            mean_mse = sum(epoch_mse) / len(epoch_mse)
            mse_list.append(mean_mse)
            epoch_accuracy = epoch_accuracy_numerator / sample_count
            accuracy_list.append(epoch_accuracy)

            if (epoch + 1) % report_every == 0:
                print(f"Epoch {epoch + 1}/{epochs}: MSE={mean_mse:.10f}, Accuracy={epoch_accuracy:.4f}")

        print(f"Fit ended in {time.time() - start_time:.2f} secs.")
        recent_accuracy = accuracy_list[-3:]
        if recent_accuracy and sum(recent_accuracy) / len(recent_accuracy) > 0.9:
            self.save_model()

        statistic = {
            'network': 'RNN',
            'accuracy': accuracy_list,
            'mse': mse_list,
            'epochs': epochs,
            'batch_size': batch_size,
            'execution_time': time.time() - start_time,
            'hidden_layer_count': self._hidden_layer_count,
            'hidden_neurons_count': str(self._hidden_neurons_counts)
        }
        save_json(home_dir + '/statistics/rnn_statistic.txt', statistic)
        self.fit_info(statistic)
        return statistic

    def _forward(self, batch):
        """Run the network on ``batch`` and return a ``(n, output)`` tensor.

        The input is cast to float32 and flattened to
        ``(n, input_neuron_count)``, so ``(n, steps, 1)`` batches, a single
        ``(steps, 1)`` sample and float64 arrays are all accepted.
        """
        output = tf.reshape(tf.cast(batch, tf.float32), [-1, self._input_neuron_count])
        layers = zip(self._hidden_w_list, self._context_hidden_w_list, self._hidden_b_list)
        for weights, context_weights, bias in layers:
            output = self._activation_tanh(
                tf.matmul(output, weights) +
                tf.matmul(output, context_weights) +
                bias)
        return tf.matmul(output, self._output_w) + self._output_b

    def _fit_passage(self, batch):
        """Forward pass used during training; see ``_forward``."""
        return self._forward(batch)

    def _default_passage(self, batch):
        """Forward pass used for inference; see ``_forward``."""
        return self._forward(batch)

    @staticmethod
    def _activation_tanh(x):
        """Apply the hyperbolic tangent element-wise."""
        return tf.tanh(x)

    @staticmethod
    def _compute_mse(output, y):
        """Return the mean squared difference between ``output`` and ``y``."""
        return tf.reduce_mean(tf.square(output - y))

    def save_model(self):
        """Write a checkpoint of the model and optimizer to ``checkpoint_dir``."""
        save_path = self.checkpoint.save(file_prefix=self.checkpoint_dir + 'rnn_model')
        print(f"\033[35mModel saved : {save_path}\033[0m\n")

    def load_model(self):
        """Restore the latest checkpoint from ``checkpoint_dir``, if any."""
        latest = tf.train.latest_checkpoint(self.checkpoint_dir)
        if latest is None:
            # Restoring None does nothing, so do not claim the load succeeded.
            print(f"No checkpoint found in {self.checkpoint_dir}")
            return
        self.checkpoint.restore(latest)
        print("\033[35mModel loaded!\033[0m\n")

    def evaluate(self, test_data, test_answers):
        """Return the percentage of test samples predicted within ``deviation``.

        Raises:
            RuntimeError: If ``fit`` has not been called, because
                ``deviation`` is only set there.
        """
        if self.deviation is None:
            raise RuntimeError("evaluate() requires fit() to be called first: deviation is not set")

        outputs = self._default_passage(test_data)
        sample_count = int(outputs.shape[0])
        if sample_count == 0:
            return 0.0

        answers = tf.reshape(tf.cast(test_answers, tf.float32), tf.shape(outputs))
        within_deviation = tf.reduce_all(tf.abs(outputs - answers) < self.deviation, axis=-1)
        return int(tf.math.count_nonzero(within_deviation).numpy()) / sample_count * 100

    def predict(self, sequence):
        """Return the network output for ``sequence`` as a ``(n, output)`` tensor."""
        return self._default_passage(sequence)

    @staticmethod
    def fit_info(statistic):
        """Plot the statistics dictionary produced by ``fit``."""
        one_fit_statistic(statistic)



# Train on 90 days of hourly bitcoin prices, then predict the price that
# follows the first 24 points of the most recent 2-day history.
with tf.device('/GPU:0'):
  train_data, train_answers, test_data, test_answers = prepare_data("bitcoin")
  rnn = RNN(24, [128, 128], 1, 2)
  rnn.summary()
  rnn.fit(train_data, train_answers)
  # Keep and report the result instead of discarding it.
  test_accuracy = rnn.evaluate(test_data, test_answers)
  print(f"Test accuracy: {test_accuracy:.2f}%")
  last_48_hour = coin_parsing('bitcoin', 2)
  sequence = last_48_hour['price'][:24]

  # Reuse the scaler fitted inside prepare_data() so the input is on the same
  # scale as the training data.
  sequence_scaled = scaler.transform(np.array(sequence).reshape(-1, 1)).reshape(1, 24, 1)

  prediction = rnn.predict(sequence_scaled)

  predicted_price = scaler.inverse_transform(prediction)
  print(f"Predicted price: {predicted_price[0][0]}")
  print(f"Actual price: {last_48_hour['price'][24]}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Test request successful
Coin parsing successful
Prices scaled shape: (2159, 1)
Recurrent hidden layer №1: (24, 128)
    Previous hidden layer №1: (24, 128)
Recurrent hidden layer №2: (128, 128)
    Previous hidden layer №2: (128, 128)
Output layer: (128, 1)
