In [None]:
import numpy as np
import scipy as sp
import scipy.stats as sp_stats
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import LZH_Utilities as utl

In [None]:
def transpose_stack(arr):
    """Wrap ``arr`` in a new leading axis and return the transpose.

    A 1-D input of length ``n`` therefore becomes an ``(n, 1)`` column;
    in general every axis is reversed after the extra axis is added.
    """
    stacked = np.array([arr])
    return np.transpose(stacked)

def tsr(arr):
    """Shorthand for ``torch.tensor(arr)``: build a new tensor from ``arr``."""
    tensor = torch.tensor(arr)
    return tensor

def plot(x, y, x_label="", y_label="", legend="", title=""):
    """Draw one or more line series against a shared x axis and show the figure.

    Parameters
    ----------
    x : array-like
        X coordinates shared by every series.
    y : array-like or sequence of array-like
        A single series, or - when ``legend`` is a list/tuple - a sequence
        of series that are each plotted against ``x``.
    x_label, y_label, title : str
        Axis labels and figure title.
    legend : str or list/tuple of str
        A list or tuple switches to multi-series mode and supplies one
        label per series, in plotting order. A non-empty string labels the
        single series. The default ``""`` draws no legend.
    """
    plt.figure(figsize=[8, 6], dpi=300)

    # isinstance (rather than an exact type check) also accepts tuples and
    # list subclasses as a collection of labels.
    if isinstance(legend, (list, tuple)):
        for series in y:
            plt.plot(x, series)
        plt.legend(legend)
    elif isinstance(legend, str) and legend:
        # A single string label used to be ignored; attach it to the line.
        plt.plot(x, y, label=legend)
        plt.legend()
    else:
        plt.plot(x, y)

    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(title)
    plt.show()

In [None]:
def plot_diff_percentage(x, y, x_label="", y_label="", title=""):
    """Scatter ``y`` against ``x`` with a least-squares line and print the fit.

    Both axes are fixed to the range [0, 100]. After the figure is shown,
    the slope, intercept, correlation coefficient and its square are
    printed.

    Parameters
    ----------
    x, y : array-like
        Paired 1-D samples; ``y`` is regressed on ``x``.
    x_label, y_label, title : str
        Axis labels and figure title.
    """
    # One regression serves both the drawn line and the printed statistics.
    result = sp_stats.linregress(x, y)

    plt.figure(figsize=[8, 6], dpi=300)

    x_fit = np.linspace(0, 100, 1000)
    y_fit1 = result.slope * x_fit + result.intercept

    # Explicit labels: a positional legend list depends on the order in which
    # artists are collected and can swap "data" and "linear regression".
    plt.scatter(x, y, s=0.5, c='k', label="data")
    plt.plot(x_fit, y_fit1, "r", label="linear regression")

    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.legend()
    plt.ylim([0, 100])
    plt.xlim([0, 100])
    plt.title(title)
    plt.show()

    print("     slope: {0}".format(result.slope))
    print(" intercept: {0}".format(result.intercept))
    print("corr coeff: {0}".format(result.rvalue))
    # The value printed as "variance" is r squared (coefficient of determination).
    print("  variance: {0}".format(result.rvalue ** 2))


In [None]:
# Load the "full_rank_dataset_ERA5" time-series data through the project helper.
# NOTE(review): later cells index df by time step (df[0] .. df[8]) and read the
# 'ERA5' and 'TCC' columns - presumably a sequence of DataFrames; confirm
# against LZH_Utilities.read_time_series_data.
df = utl.read_time_series_data("full_rank_dataset_ERA5")

In [None]:
# Hold out a random 10% of the samples as the test set; the remaining indices
# form the training set. A fixed seed on a local generator keeps this split
# (and every plot derived from it) reproducible across kernel restarts.
SPLIT_SEED = 0
TEST_FRACTION = 0.1

n_samples = df[0].shape[0]
split_rng = np.random.default_rng(SPLIT_SEED)
idx_test_set = split_rng.choice(n_samples, size=int(TEST_FRACTION * n_samples), replace=False)
idx_training_set = np.delete(np.arange(n_samples), idx_test_set)

In [None]:
# Time steps T0..T8 used to index df.
time_arr = np.arange(9)

# For every time step keep the 'ERA5' values as inputs and the 'TCC' values as
# targets, each passed through np.c_ (presumably giving an (n, 1) column).
X_full = []
y_hat_full = []
for time in time_arr:
    frame = df[time]
    X_full.append(np.c_[frame['ERA5'].to_numpy()])
    y_hat_full.append(np.c_[frame['TCC'].to_numpy()])

# Select the training / test rows of each time step and stack them along a
# leading time axis.
X_train = np.array([column[idx_training_set] for column in X_full])
y_hat_train = np.array([column[idx_training_set] for column in y_hat_full])

X_test = np.array([column[idx_test_set] for column in X_full])
y_hat_test = np.array([column[idx_test_set] for column in y_hat_full])

In [None]:
# Take the last time step and drop the trailing length-1 axis: (n, 1) -> (n,)
plot_y_hat = y_hat_test[-1, :, 0]
plot_y_fit = X_test[-1, :, 0]

In [None]:
# True TCC on the x axis, ERA5 value on the y axis, for the last time step.
plot_diff_percentage(
    x=plot_y_hat,
    y=plot_y_fit,
    x_label="true",
    y_label="fit",
    title="ERA5 Prediction",
)

In [None]:
# One scatter / regression figure per time step, titled T0 .. T8.
for idx in np.arange(9):
    plot_y_hat = y_hat_test[idx, :, 0]
    plot_y_fit = X_test[idx, :, 0]
    plot_diff_percentage(
        plot_y_hat,
        plot_y_fit,
        x_label="true",
        y_label="fit",
        title="T{0}".format(idx),
    )

In [None]:
# Collect the ERA5 test values of the nine time steps into one array whose
# axes are (time step, sample, 1).
y_fit_series = np.array([X_test[idx] for idx in np.arange(9)])

# Plot the fitted and true time series for ten randomly drawn test samples
# (drawn with replacement, so a sample may appear more than once).
n_test_samples = y_hat_test.shape[1]
for i in np.arange(10):
    idx = np.random.randint(0, n_test_samples)
    fit_curve = y_fit_series[:, idx, 0]
    true_curve = y_hat_test[:, idx, 0]
    plot(
        np.arange(9),
        (fit_curve, true_curve),
        x_label="T",
        y_label="TCC %",
        legend=["fit", "true"],
        title="Time Series Prediction, Sample: {0}".format(idx),
    )

***

In [None]:
RECORD_FILE_NAME = "Output/ERA5_R_value_res.csv"
def test_and_record():
    """Evaluate ERA5 against TCC on a fresh random test split and log the fit.

    Draws a new random 10% test subset, regresses the last time step's
    'ERA5' values on its 'TCC' values, appends one row
    (``m``, ``b``, ``r``, ``loss``) to ``RECORD_FILE_NAME`` and prints the
    correlation coefficient. The ``loss`` column is always written as 0.

    Reads the notebook-level ``df``. Returns nothing.
    """
    import os  # local import: the notebook's import cell does not provide it

    n_samples = df[0].shape[0]
    idx_test_set = np.random.choice(np.arange(n_samples), [int(0.1 * n_samples)], False)

    # Only the last of the nine time steps is evaluated, so only that one is
    # extracted; the training split is not needed here.
    last_time = np.arange(9)[-1]

    # transform them from shape[n:1] into shape[n]
    plot_y_fit = np.c_[df[last_time]['ERA5'].to_numpy()][idx_test_set][:, 0]
    plot_y_hat = np.c_[df[last_time]['TCC'].to_numpy()][idx_test_set][:, 0]

    result = sp_stats.linregress(plot_y_hat, plot_y_fit)

    new_row = pd.DataFrame(
        {
            "m": [result.slope],
            "b": [result.intercept],
            "r": [result.rvalue],
            "loss": [0]
        }
    )

    # Append to the record file, creating it (and its directory) on first use
    # instead of failing when the file has not been initialised yet.
    if os.path.exists(RECORD_FILE_NAME):
        records = pd.concat(
            [pd.read_csv(RECORD_FILE_NAME), new_row],
            ignore_index=True
        )
    else:
        out_dir = os.path.dirname(RECORD_FILE_NAME)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        records = new_row

    records.to_csv(RECORD_FILE_NAME, index=False)

    # Print the correlation coefficient itself, not the whole result object.
    print("\tr value: {0}".format(result.rvalue))

In [None]:
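# Create the data file: uncomment and run once to (re)initialise RECORD_FILE_NAME
# with only the header row. WARNING: this overwrites any records already stored.
# pd.DataFrame(columns=["m", "b", "r", "loss"]).to_csv(RECORD_FILE_NAME, index=False)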

In [None]:
# Repeat the random-split evaluation; every run appends one row to the record file.
N_RUNS = 100
for i in range(N_RUNS):
    start_banner = "-------- {0} START --------".format(i)
    end_banner = "--------  {0} END  --------".format(i)
    print(start_banner)
    test_and_record()
    print(end_banner)