In [None]:
pip list

In [None]:
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from PyQt5.QtWidgets import (
    QApplication, QWidget, QVBoxLayout, QPushButton, QFileDialog,
    QLabel, QComboBox, QTextEdit, QSizePolicy, QScrollArea
)
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller, kpss
import matplotlib.dates as mdates
from statsmodels.tsa.arima.model import ARIMA
from scipy import stats
from scipy.stats import norm, levy_stable, linregress
from numpy.fft import fft, ifft
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings

warnings.filterwarnings("ignore")

class TimeSeriesApp(QWidget):
    def __init__(self):
        """Create the main window and assemble its widgets."""
        super().__init__()
        # No dataset is available until a CSV file has been loaded.
        self.df = None
        self.setWindowTitle("Time Series Analysis")
        self.initUI()

    def initUI(self):
        """Build the window contents.

        From top to bottom the window holds: the "Load CSV" button, the
        column selector with its label, the "Run Analysis and Forecast"
        button, a read-only results box with its label, and a scrollable
        area hosting the matplotlib canvas.
        """
        # --- Controls -------------------------------------------------
        self.loadButton = QPushButton("Load CSV")
        self.columnSelector = QComboBox()
        self.runButton = QPushButton("Run Analysis and Forecast")
        self.resultText = QTextEdit()
        self.resultText.setReadOnly(True)

        self.loadButton.clicked.connect(self.load_csv)
        self.runButton.clicked.connect(self.run_analysis)

        # --- Plot area --------------------------------------------------
        # The figure is much taller than the window, so the canvas lives in
        # a fixed-minimum-size container placed inside a scroll area.
        self.figure = Figure(figsize=(6, 18))
        self.canvas = FigureCanvas(self.figure)

        canvas_layout = QVBoxLayout()
        canvas_layout.setContentsMargins(0, 0, 0, 0)
        canvas_layout.addWidget(self.canvas)

        self.canvas_container = QWidget()
        self.canvas_container.setLayout(canvas_layout)
        self.canvas_container.setMinimumSize(800, 1500)

        plot_scroller = QScrollArea()
        plot_scroller.setWidgetResizable(True)
        plot_scroller.setWidget(self.canvas_container)

        # --- Assemble in display order ----------------------------------
        root_layout = QVBoxLayout()
        stacked_widgets = (
            self.loadButton,
            QLabel("Select Time Series Column:"),
            self.columnSelector,
            self.runButton,
            QLabel("Results:"),
            self.resultText,
            plot_scroller,
        )
        for widget in stacked_widgets:
            root_layout.addWidget(widget)

        self.setLayout(root_layout)



    def load_csv(self):
        """Ask the user for a CSV file and load it as a numeric, time-indexed frame.

        The file must contain a ``timestamp`` column. On success ``self.df``
        is replaced by the cleaned frame and the column selector is
        repopulated with its column names. On any failure (unreadable file,
        missing ``timestamp`` column, no usable numeric data) a message is
        written to the results box and the previously loaded data and
        selector entries are left untouched, so the two can never disagree.
        """
        file_path, _ = QFileDialog.getOpenFileName(self, "Open CSV File", "", "CSV Files (*.csv)")
        if not file_path:
            # Dialog was cancelled.
            return

        try:
            raw = pd.read_csv(file_path)
        except (OSError, ValueError) as exc:
            # pandas parser/empty-data/decoding errors all derive from ValueError.
            self.resultText.setPlainText(f"Could not read '{file_path}': {exc}")
            return

        if 'timestamp' not in raw.columns:
            self.resultText.setPlainText(
                "The selected file has no 'timestamp' column; nothing was loaded."
            )
            return

        frame = self._prepare_frame(raw)
        if frame.empty:
            self.resultText.setPlainText(
                "The selected file contains no complete numeric rows; nothing was loaded."
            )
            return

        self.df = frame
        self.columnSelector.clear()
        self.columnSelector.addItems([str(name) for name in frame.columns])

    @staticmethod
    def _prepare_frame(raw):
        """Return a copy of *raw* indexed by parsed timestamp with numeric columns only.

        Rows whose timestamp cannot be parsed are removed before the column
        becomes the index (``dropna`` does not look at index values, so they
        would otherwise survive as ``NaT`` labels). Columns that contain no
        numeric value at all are dropped instead of wiping out every row;
        remaining rows with any missing value are then discarded.
        """
        frame = raw.copy()
        frame['timestamp'] = pd.to_datetime(frame['timestamp'], errors='coerce')
        frame = frame.dropna(subset=['timestamp'])
        frame = frame.set_index('timestamp').sort_index()
        frame = frame.apply(pd.to_numeric, errors='coerce')
        frame = frame.dropna(axis=1, how='all')
        return frame.dropna()

    def hurst_exponent_rs(self, series, min_window=50, max_window=None, step=50, ax=None):
        """Estimate the Hurst exponent by rescaled-range (R/S) analysis.

        For every window size in ``range(min_window, max_window, step)`` the
        series is cut into consecutive non-overlapping segments; the mean
        R/S statistic over the segments is computed and the exponent is the
        slope of ``log(R/S)`` against ``log(window size)``.

        Args:
            series: One-dimensional sequence of observations.
            min_window: Smallest window size (inclusive).
            max_window: Upper bound for the window size (exclusive).
                Defaults to ``len(series) // 10`` when ``None``.
            step: Increment between successive window sizes.
            ax: Optional matplotlib axes on which the log-log points and the
                fitted regression line are drawn.

        Returns:
            The estimated Hurst exponent, or ``nan`` when fewer than two
            window sizes produce a usable R/S value (series too short or
            constant), in which case nothing is plotted.
        """
        ts = np.asarray(series, dtype=float)
        N = len(ts)
        if max_window is None:
            max_window = N // 10

        # Keep window sizes and R/S means paired so the regression inputs
        # always have the same length, even when a window is skipped.
        used_windows = []
        rs_values = []
        for window in range(min_window, max_window, step):
            n_segments = N // window
            if n_segments == 0:
                continue
            segments = ts[:n_segments * window].reshape(n_segments, window)
            deviations = segments - segments.mean(axis=1, keepdims=True)
            cum_dev = np.cumsum(deviations, axis=1)
            ranges = cum_dev.max(axis=1) - cum_dev.min(axis=1)
            spreads = segments.std(axis=1)
            usable = spreads != 0  # constant segments have no defined R/S
            if usable.any():
                used_windows.append(window)
                rs_values.append(np.mean(ranges[usable] / spreads[usable]))

        if len(used_windows) < 2:
            # A line cannot be fitted through fewer than two points.
            return float('nan')

        log_rs = np.log(rs_values)
        log_windows = np.log(used_windows)
        hurst, intercept = np.polyfit(log_windows, log_rs, 1)

        if ax is not None:
            ax.plot(log_windows, log_rs, 'o', label='log(R/S)')
            # Draw the actual regression line (slope and intercept).
            ax.plot(log_windows, hurst * log_windows + intercept, label=f'Fit line (H={hurst:.3f})')
            ax.set_xlabel('log(window size)')
            ax.set_ylabel('log(R/S)')
            ax.set_title('Hurst Exponent Estimation via R/S')
            ax.legend()
            ax.grid(True)

        return hurst

    def gph_estimate(self, series, m=None):
        """Estimate the fractional differencing order d by log-periodogram regression.

        The series is log-transformed (after adding 1e-5), its periodogram
        is evaluated at the first ``m`` Fourier frequencies and
        ``log I(lambda_j)`` is regressed on ``log(4 sin^2(lambda_j / 2))``.
        Because the spectral density of a fractionally integrated process
        behaves like ``(4 sin^2(lambda / 2)) ** (-d)`` near zero, the
        estimate is the negated regression slope.

        Args:
            series: One-dimensional sequence; ``series + 1e-5`` must be
                strictly positive and finite because the logarithm is taken.
            m: Number of low frequencies used in the regression. Defaults
                to ``int(sqrt(n))``.

        Returns:
            The estimated d.

        Raises:
            ValueError: If the input is not strictly positive/finite after
                the offset, if ``m`` is not in ``[2, n)``, or if the
                periodogram vanishes (e.g. a constant series).
        """
        shifted = np.asarray(series, dtype=float) + 1e-5
        if not (np.all(np.isfinite(shifted)) and np.all(shifted > 0)):
            raise ValueError("gph_estimate requires finite, strictly positive values")
        log_series = np.log(shifted)  # log-transform to stabilize variance

        n = len(log_series)
        if m is None:
            m = int(n ** 0.5)
        if not 2 <= m < n:
            raise ValueError(f"m must satisfy 2 <= m < n (got m={m}, n={n})")

        freqs = 2 * np.pi * np.arange(1, m + 1) / n
        spectrum = fft(log_series - np.mean(log_series))
        # Skip frequency zero; keep Fourier frequencies 1..m.
        periodogram = np.abs(spectrum[1:m + 1]) ** 2 / (2 * np.pi * n)
        if np.any(periodogram <= 0):
            raise ValueError("periodogram is zero at a regression frequency")

        y = np.log(periodogram)
        x = np.log(4 * (np.sin(freqs / 2) ** 2))
        slope = stats.linregress(x, y)[0]
        # log I = c - d * log(4 sin^2(lambda/2)) + e, hence d = -slope.
        return -slope

    def _ma_model(self, params, n_points, *, noise_std=1, noise_alpha=2):
        np.random.seed(42)  # Fix random seed for reproducibility
        ma_order = len(params)
        if noise_alpha == 2:
            noise = norm.rvs(scale=noise_std, size=(n_points + ma_order))
        else:
            noise = levy_stable.rvs(noise_alpha, 0, scale=noise_std, size=(n_points + ma_order))
        if ma_order == 0:
            return noise
        ma_coeffs = np.append([1], params)
        ma_series = np.zeros(n_points)
        for idx in range(ma_order, n_points + ma_order):
            take_idx = np.arange(idx, idx - ma_order - 1, -1).astype(int)
            ma_series[idx - ma_order] = np.dot(ma_coeffs, noise[take_idx])
        return ma_series[ma_order:]

    def _arma_model(self, params, noise):
        np.random.seed(42)  # Fix random seed for reproducibility
        ar_order = len(params)
        if ar_order == 0:
            return noise
        n_points = len(noise)
        arma_series = np.zeros(n_points + ar_order)
        for idx in np.arange(ar_order, len(arma_series)):
            take_idx = np.arange(idx - 1, idx - ar_order - 1, -1).astype(int)
            arma_series[idx] = np.dot(params, arma_series[take_idx]) + noise[idx - ar_order]
        return arma_series[ar_order:]

    def _frac_diff(self, x, d):
        def next_pow2(n):
            return (n - 1).bit_length()
        n_points = len(x)
        fft_len = 2 ** next_pow2(2 * n_points - 1)
        prod_ids = np.arange(1, n_points)
        frac_diff_coefs = np.append([1], np.cumprod((prod_ids - d - 1) / prod_ids))
        dx = ifft(fft(x, fft_len) * fft(frac_diff_coefs, fft_len))
        return np.real(dx[0:n_points])

    def arfima(self, ar_params, d, ma_params, n_points, *, noise_std=1, noise_alpha=2, warmup=0):
        np.random.seed(42)  # Fix random seed for reproducibility
        ma_series = self._ma_model(ma_params, n_points + warmup, noise_std=noise_std, noise_alpha=noise_alpha)
        frac_ma = self._frac_diff(ma_series, -d)
        series = self._arma_model(ar_params, frac_ma)
        return series[-n_points:]

    def frac_diff(self, series, d, thresh=1e-5):
        """Fractionally difference *series* with a fixed-width weight window.

        Weights of ``(1 - B)**d`` are generated until one falls below
        *thresh* in absolute value; each output sample is the weighted sum
        of the current observation and the preceding ones covered by the
        window.

        Args:
            series: One-dimensional sequence of observations.
            d: Differencing order; must be finite.
            thresh: Weights smaller than this (in absolute value) are
                dropped, which fixes the window width.

        Returns:
            1-D float64 array of length ``len(series) - width + 1`` where
            ``width`` is the number of retained weights. Element ``j`` is
            the fractional difference at position ``width - 1 + j`` of the
            input, so the final element uses the final observation. The
            array is empty when the window is wider than the series.

        Raises:
            ValueError: If *d* is NaN or infinite (the weight recursion
                would otherwise never terminate).
        """
        if not np.isfinite(d):
            raise ValueError("d must be a finite number")

        values = np.asarray(series, dtype='float64')
        n_obs = len(values)

        weights = [1.0]
        k = 1
        # Stop as soon as the window can no longer fit inside the series;
        # this also bounds the loop when thresh is too small to be reached.
        while len(weights) <= n_obs:
            next_weight = -weights[-1] * (d - k + 1) / k
            if abs(next_weight) < thresh:
                break
            weights.append(next_weight)
            k += 1

        if len(weights) > n_obs:
            return np.empty(0, dtype='float64')

        # 'valid' convolution pairs weights[k] with the observation k steps
        # in the past, including the current one (weights[0] == 1).
        return np.convolve(values, weights, mode='valid')

    def run_analysis(self):
        """Analyse the selected column and backtest an ARFIMA simulation.

        Steps, in order:

        1. ADF and KPSS stationarity tests (written to the results box).
        2. Additive seasonal decomposition with a 365-observation period,
           plotted on the first four axes (skipped when fewer than two full
           periods are available).
        3. Hurst exponent by R/S analysis, plotted on the fifth axes.
        4. Hold-out backtest on the sixth axes: the series is resampled to
           business days, the last 30 points are held out, and d and the
           ARMA order are estimated on the remaining training part only,
           so the reported errors are genuinely out of sample.

        Does nothing when no data has been loaded.
        """
        if self.df is None:
            return

        column = self.columnSelector.currentText()
        if not column or column not in self.df.columns:
            self.resultText.setPlainText("Select a time series column before running the analysis.")
            return

        series = self.df[column].dropna()
        if series.empty:
            self.resultText.setPlainText(f"Column '{column}' contains no data.")
            return

        report = self._stationarity_report(series)

        # Clear and resize figure (narrow and tall: six stacked panels).
        figure = self.canvas.figure
        figure.clear()
        figure.set_size_inches(6, 18)
        axes = [figure.add_subplot(6, 1, row) for row in range(1, 7)]

        self._plot_decomposition(series, axes[:4])

        # Hurst Exponent Plot
        try:
            hurst = self.hurst_exponent_rs(series, ax=axes[4])
        except (TypeError, ValueError, np.linalg.LinAlgError):
            # The log-log regression cannot be fitted (series too short).
            hurst = float('nan')
        report += f"\nHurst Exponent: {hurst:.4f}\n"

        self.resultText.setPlainText(report)
        figure.tight_layout(pad=3.0)
        self.canvas.draw()

        self.resultText.append(self._backtest_arfima(series, axes[5], forecast_horizon=30))

        # Refresh canvas to show the forecast panel.
        self.canvas.draw()

    def _stationarity_report(self, series):
        """Return the ADF and KPSS test results for *series* as display text."""
        adf_result = adfuller(series)
        adf_text = (
            "ADF Test Results:\n"
            f"Test Statistic: {adf_result[0]}\n"
            f"p-value: {adf_result[1]}\n"
            f"Critical Values: {adf_result[4]}\n"
            f"{'Stationary' if adf_result[1] <= 0.05 else 'Non-stationary'}\n"
        )

        kpss_stat, kpss_p, _, kpss_crit = kpss(series, regression='ct')
        kpss_text = (
            "\nKPSS Test Results:\n"
            f"Statistic: {kpss_stat}\n"
            f"p-value: {kpss_p}\n"
            f"Critical Values: {kpss_crit}\n"
            f"{'Non-stationary' if kpss_p < 0.05 else 'Stationary'}\n"
        )
        return adf_text + kpss_text

    def _plot_decomposition(self, series, axes, period=365):
        """Plot *series* and its additive trend/seasonal/residual parts on four axes."""
        axes[0].plot(series.index, series, label='Original', color='blue')
        axes[0].set_title('Original Time Series')
        axes[0].set_ylabel('Value')
        axes[0].legend()

        component_titles = ('Trend', 'Seasonality', 'Residuals')
        if len(series) < 2 * period:
            # The decomposition needs two complete cycles; label the empty
            # panels instead of failing.
            for ax, title in zip(axes[1:], component_titles):
                ax.set_title(f"{title} (needs at least {2 * period} observations)")
            return

        decomposition = seasonal_decompose(series, model='additive', period=period)
        components = (decomposition.trend, decomposition.seasonal, decomposition.resid)
        colors = ('green', 'orange', 'red')
        for ax, title, values, color in zip(axes[1:], component_titles, components, colors):
            ax.plot(series.index, values, label=title, color=color)
            ax.set_title(title)
            ax.set_ylabel('Value')
            ax.legend()

        axes[3].xaxis.set_major_locator(mdates.YearLocator())
        axes[3].xaxis.set_major_formatter(mdates.DateFormatter('%Y'))

    def _business_day_values(self, series):
        """Return *series* on a business-day grid (forward-filled) as a NumPy array."""
        cleaned = series[series.index.notna()]
        # asfreq() cannot reindex duplicate labels; keep the latest value.
        cleaned = cleaned[~cleaned.index.duplicated(keep='last')].sort_index()
        return cleaned.asfreq('B').ffill().dropna().to_numpy()

    def _select_arma_order(self, data, max_order=3):
        """Return ``(p, q, fitted_model)`` with the lowest AIC, or ``None`` if no fit succeeds."""
        best_key = None
        best_model = None
        for p in range(max_order + 1):
            for q in range(max_order + 1):
                try:
                    fitted = ARIMA(data, order=(p, 0, q)).fit()
                except Exception:
                    # Best-effort grid search: an order that cannot be
                    # estimated is simply not a candidate.
                    continue
                if not np.isfinite(fitted.aic):
                    continue
                key = (fitted.aic, p, q)  # ties resolve to the smaller order
                if best_key is None or key < best_key:
                    best_key, best_model = key, fitted
        if best_key is None:
            return None
        return best_key[1], best_key[2], best_model

    def _backtest_arfima(self, series, ax, forecast_horizon=30):
        """Backtest the ARFIMA simulation on the last *forecast_horizon* points.

        Draws training data, held-out values and the simulated path on *ax*
        and returns the text to append to the results box (metrics and the
        point-by-point comparison, or a message explaining why the backtest
        could not be run).

        NOTE(review): ``arfima()`` simulates a path from fixed-seed noise
        and shifts it by the last training value; it is not conditioned on
        the fitted model's residuals.
        """
        values = self._business_day_values(series)
        if len(values) <= forecast_horizon:
            return (
                f"\nARFIMA backtest skipped: more than {forecast_horizon} "
                "business-day observations are required."
            )

        # Train/test split; everything below is estimated on the training part.
        train_series = values[:-forecast_horizon]
        test_series = values[-forecast_horizon:]

        try:
            d_est = self.gph_estimate(train_series)
        except ValueError as exc:
            return f"\nARFIMA backtest skipped: could not estimate d ({exc})."
        if not np.isfinite(d_est):
            # The estimator takes logs, so non-positive data yields NaN.
            return "\nARFIMA backtest skipped: could not estimate d (non-positive or constant data)."

        fd_train = self.frac_diff(train_series, d_est)
        if len(fd_train) == 0:
            return "\nARFIMA backtest skipped: series too short for fractional differencing."

        selection = self._select_arma_order(fd_train)
        if selection is None:
            return "No suitable ARIMA model found."
        best_p, best_q, model = selection

        ar_params = model.arparams.tolist()
        ma_params = model.maparams.tolist()
        forecast = np.asarray(self.arfima(ar_params, d_est, ma_params, forecast_horizon))
        forecast_rescaled = forecast + train_series[-1]

        # The simulated path may be shorter than the horizon; compare the
        # overlapping part only.
        min_len = min(len(forecast_rescaled), len(test_series))
        if min_len == 0:
            return "\nARFIMA backtest skipped: the simulation returned no points."
        forecast_rescaled = forecast_rescaled[:min_len]
        actual = test_series[:min_len]

        rmse = np.sqrt(mean_squared_error(actual, forecast_rescaled))
        mae = mean_absolute_error(actual, forecast_rescaled)
        mean_actual = np.mean(actual)
        rmse_ratio = rmse / mean_actual
        mae_ratio = mae / mean_actual

        forecast_text = f"\nARFIMA Forecast vs Actual ({forecast_horizon} Days):\n" + "".join(
            f"Day {day}: Forecast = {predicted:.2f}, Actual = {observed:.2f}\n"
            for day, (predicted, observed) in enumerate(zip(forecast_rescaled, actual), start=1)
        )
        metrics_text = (
            f"\nEstimated d: {d_est:.4f}\n"
            f"Best AR order: {best_p}\n"
            f"Best MA order: {best_q}\n"
            f"RMSE: {rmse:.4f}, MAE: {mae:.4f}\n"
            f"RMSE Ratio: {rmse_ratio:.2%}, MAE Ratio: {mae_ratio:.2%}\n"
        )

        # Forecast plot
        n_train = len(train_series)
        x_test = range(n_train, n_train + min_len)
        ax.plot(range(n_train), train_series, label="Training Data", color='gray')
        ax.plot(x_test, actual, label="Actual", color='blue')
        ax.plot(x_test, forecast_rescaled, label="Forecast", color='red', linestyle='--')
        ax.set_title(f"ARFIMA Forecast vs Actual (Next {forecast_horizon} Days)")
        ax.legend()

        return metrics_text + forecast_text
        

def main():
    """Create the Qt application, show the main window and run the event loop.

    Returns:
        The exit status reported by the Qt event loop.
    """
    app = QApplication(sys.argv)
    window = TimeSeriesApp()
    window.resize(1000, 900)  # Initial window size
    window.show()
    return app.exec_()  # Blocks until the application quits


if __name__ == "__main__":
    sys.exit(main())
