<a href="https://colab.research.google.com/github/BasmalaAB/CNN/blob/main/Copy-TimeSeries.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing Libraries


In [None]:
! pip install yfinance
! pip install pandas
! pip install numpy
! pip install matplotlib
! pip install seaborn
! pip install statsmodels
! pip install ipywidgets
! pip install pmdarima

# Importing Libraries

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.stattools import adfuller, kpss, pacf, acf
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.api import ExponentialSmoothing, SimpleExpSmoothing, Holt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.vector_ar.var_model import VAR
from ipywidgets import interact, widgets
from sklearn.metrics import mean_squared_error
import statsmodels.api as sm
from pmdarima import auto_arima

# Data Extraction

In [None]:
def fetch_data(ticker, start_date, end_date):
    """Download market data with yfinance and return its ``'Close'`` entry.

    Args:
        ticker: Forwarded to ``yf.download`` as the first positional argument.
        start_date: Forwarded as ``start``.
        end_date: Forwarded as ``end``.

    Returns:
        The ``'Close'`` selection of the object returned by ``yf.download``.
    """
    return yf.download(ticker, start=start_date, end=end_date)['Close']

# Visualization

In [None]:
def plot_rolling_mean_std(ts, window=12, ylabel="Mean Temperature"):
    """Plot a series together with its rolling mean and rolling standard deviation.

    Args:
        ts: pandas Series to plot (its index is used for the x axis).
        window: Rolling window passed to ``ts.rolling``. Defaults to 12, the
            value that was previously hard-coded.
        ylabel: Label for the y axis. Defaults to the previous hard-coded
            label so existing calls render exactly as before.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # Build the rolling view once and derive both statistics from it.
    rolling = ts.rolling(window)
    rolling_mean = rolling.mean()
    rolling_std = rolling.std()

    plt.figure(figsize=(22, 10))
    plt.plot(ts, label='Actual Mean')
    plt.plot(rolling_mean, label='Rolling Mean')
    plt.plot(rolling_std, label='Rolling Std')
    plt.xlabel("Date")
    plt.ylabel(ylabel)
    plt.title('Rolling Mean & Rolling Standard Deviation')
    plt.legend()
    plt.show()

# Data Manipulation

In [None]:
def feature_engineering(data):
    """Return a copy of ``data`` extended with indicators derived from ``'Close'``.

    Added columns (in this order):
        MA20       - 20-row rolling mean of ``Close``
        MA100      - 100-row rolling mean of ``Close``
        Return     - ``Close.pct_change()``
        LogRet     - first difference of ``log(Close)``
        Volatility - 20-row rolling standard deviation of ``LogRet``

    Rows containing a NaN in any column are dropped from the result.
    The input frame is not modified.
    """
    close = data['Close']
    log_returns = np.log(close).diff()

    # ``assign`` works on a new frame, so the caller's data stays untouched.
    enriched = data.assign(
        MA20=close.rolling(window=20).mean(),
        MA100=close.rolling(window=100).mean(),
        Return=close.pct_change(),
        LogRet=log_returns,
        Volatility=log_returns.rolling(window=20).std(),
    )
    return enriched.dropna()

In [None]:
def train_test_split(series, test_size=0.2):
    """Split a time-ordered Series/DataFrame into leading train and trailing test parts.

    The split is positional (no shuffling): the first
    ``int(len(series) * (1 - test_size))`` rows form the training set and the
    remaining rows form the test set. The two parts do not overlap, so the
    first test row is the first observation after the training data.

    Args:
        series: pandas Series or DataFrame supporting ``.iloc`` slicing.
        test_size: Fraction of rows (between 0 and 1) reserved for testing.

    Returns:
        Tuple ``(train_data, test_data)``.

    Raises:
        ValueError: If ``test_size`` is outside ``[0, 1]``.
    """
    if not 0 <= test_size <= 1:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size!r}")

    split_index = int(len(series) * (1 - test_size))
    train_data = series.iloc[:split_index]
    # Start exactly at split_index. Starting one row earlier would copy the
    # last training observation into the test set, which shifts every
    # forecast-vs-actual comparison by one step (and, for split_index == 0,
    # would shrink the test set to only the final row).
    test_data = series.iloc[split_index:]
    return train_data, test_data

In [None]:
def resampling(df, frequency='M'):
    """Downsample ``df`` to ``frequency`` using the mean of each period.

    Periods that contain no observations produce NaN means; those are filled
    with the most recent preceding value (forward fill).

    Args:
        df: pandas Series/DataFrame with a datetime-like index.
        frequency: Resampling rule passed to ``df.resample`` (default ``'M'``).

    Returns:
        The resampled, forward-filled object.
    """
    # ``.ffill()`` replaces ``fillna(method='pad')``, whose ``method`` argument
    # is deprecated in recent pandas releases.
    sampled_df = df.resample(rule=frequency).mean().ffill()
    return sampled_df

# Stationarity


In [None]:
def test_stationarity(timeseries):
    """Plot rolling statistics of ``timeseries`` and print ADF and KPSS test results.

    The series is drawn together with its 12-period rolling mean and rolling
    standard deviation. Afterwards the outputs of ``adfuller`` (``autolag='AIC'``)
    and ``kpss`` (``regression='c'``) are printed as labelled pandas Series,
    each followed by its critical values.

    Returns:
        None.
    """
    def _print_report(result, stat_labels):
        # ``result`` holds len(stat_labels) leading statistics, immediately
        # followed by a dict of critical values.
        n_stats = len(stat_labels)
        report = pd.Series(result[0:n_stats], index=stat_labels)
        for level, threshold in result[n_stats].items():
            report['Critical Value (%s)' % level] = threshold
        print(report)

    # Rolling statistics
    windowed = timeseries.rolling(window=12)
    moving_mean = windowed.mean()
    moving_std = windowed.std()

    # Plot the series against its rolling statistics
    plt.figure(figsize=(12, 6))
    plt.plot(timeseries, label='Original')
    plt.plot(moving_mean, color='red', label='Rolling Mean')
    plt.plot(moving_std, color='black', label='Rolling Std')
    plt.legend(loc='best')
    plt.title('Rolling Mean & Standard Deviation')
    plt.show(block=False)

    # Dickey-Fuller test
    print('Results of Dickey-Fuller Test:')
    _print_report(
        adfuller(timeseries, autolag='AIC'),
        ['Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used'],
    )

    # KPSS test
    print('Results of KPSS Test:')
    _print_report(
        kpss(timeseries, regression='c'),
        ['Test Statistic', 'p-value', 'Lags Used'],
    )

In [None]:
def make_stationary(series, log_transform=False, difference=True, shift=1):
    """Optionally log-transform and/or difference a series.

    Args:
        series: pandas Series to transform.
        log_transform: When true, apply ``np.log`` first.
        difference: When true, take ``diff(periods=shift)`` and drop the
            resulting NaN rows.
        shift: Number of periods used for differencing.

    Returns:
        The transformed series; the input object itself when both options
        are disabled.
    """
    transformed = np.log(series) if log_transform else series
    if not difference:
        return transformed
    return transformed.diff(periods=shift).dropna()

# ACF/PACF

In [None]:
def plot_acf_pacf(series, lags=30):
    """Plot the ACF and PACF of ``series`` side by side.

    Args:
        series: Series whose autocorrelation structure is plotted.
        lags: Number of lags to show in both plots, or ``None`` to use the
            statsmodels default. An integer is reduced to
            ``len(series) // 2 - 1`` when it exceeds that, because statsmodels
            rejects PACF lags of half the sample size or more.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    if isinstance(lags, (int, np.integer)):
        max_lags = len(series) // 2 - 1
        if max_lags >= 1:
            lags = min(lags, max_lags)

    fig, axes = plt.subplots(1, 2, figsize=(16, 4))
    # Forward ``lags`` to both plots; previously the argument was accepted
    # but never used.
    sm.graphics.tsa.plot_acf(series, lags=lags, ax=axes[0])
    sm.graphics.tsa.plot_pacf(series, lags=lags, ax=axes[1])
    plt.show()

# ARIMA


In [None]:
def fit_and_forecast_arima(train_data, test_data, p=1, d=1, q=1, forecast_days=7, display_days=7):
    """Fit an ARIMA(p, d, q) model, plot its forecast against the test data and print the RMSE.

    Args:
        train_data: Series the model is fitted on.
        test_data: Series supplying the forecast index and the actual values.
        p, d, q: ARIMA order.
        forecast_days: Number of steps to forecast; limited to ``len(test_data)``.
        display_days: Number of test points (and trailing train points, plus
            one) shown in the plot.

    Returns:
        None. Prints the order, the model summary and the RMSE, and shows a plot.
    """
    print(p, d, q)

    # Fit on the training portion only
    fitted = ARIMA(train_data, order=(p, d, q)).fit()
    print(fitted.summary())

    # Never forecast beyond the available test observations
    horizon = min(forecast_days, len(test_data))
    predicted = fitted.forecast(steps=horizon)
    forecast_series = pd.Series(predicted.values, index=test_data.index[:horizon])

    # Forecast vs actual
    train_tail = train_data[-(display_days+1):]
    test_head = test_data[:display_days]
    plt.figure(figsize=(12, 6))
    plt.plot(train_tail, label='Training Data', color='orange')
    plt.plot(test_head, label='Testing Data', color='blue')
    plt.plot(forecast_series, label='Forecast', color='red')
    plt.legend(loc='best')
    plt.title(f'ARIMA Model Forecast vs Actual (p={p}, d={d}, q={q})')
    plt.show()

    # Root mean squared error over the forecast horizon
    actual = test_data[:horizon]
    rmse = np.sqrt(mean_squared_error(actual, forecast_series))
    print(f'Root Mean Squared Error: {rmse}')

In [None]:
def interactive_arima(train_data, test_data, p=1, d=1, q=1):
    """Show sliders that refit and re-plot an ARIMA forecast on every change.

    Args:
        train_data: Series passed to ``fit_and_forecast_arima`` for fitting.
        test_data: Series passed to ``fit_and_forecast_arima`` for evaluation.
        p, d, q: Initial positions of the ARIMA order sliders.

    Returns:
        None. The widgets are displayed through ``interact``.
    """
    def update(p=p, d=d, q=q, forecast_days=2, display_days=7):
        fit_and_forecast_arima(train_data, test_data, p, d, q, forecast_days, display_days)

    # Start the sliders at the requested order. They used to be fixed at 0,
    # which silently discarded the p/d/q arguments of this function.
    interact(update,
             p=widgets.IntSlider(min=0, max=15, step=1, value=p),
             d=widgets.IntSlider(min=0, max=2, step=1, value=d),
             q=widgets.IntSlider(min=0, max=15, step=1, value=q),
             forecast_days=widgets.IntSlider(min=1, max=7, step=1, value=2),
             display_days=widgets.IntSlider(min=3, max=14, step=1, value=7))

# AutoARIMA

In [None]:
def fit_and_forecast_auto_arima(train_data, test_data, forecast_days=2, display_days=7):
    """Fit a non-seasonal auto-ARIMA model, plot its forecast and print the RMSE.

    Args:
        train_data: Series the model is fitted on.
        test_data: Series supplying the forecast index and the actual values.
        forecast_days: Number of steps to forecast; limited to ``len(test_data)``.
        display_days: Number of test points (and trailing train points, plus
            one) shown in the plot.

    Returns:
        None. Prints the search trace, the model summary and the RMSE, and
        shows a plot.
    """
    # Fit the auto_arima model on the training data
    model = auto_arima(train_data, seasonal=False, trace=True, error_action='ignore', suppress_warnings=True)
    print(model.summary())

    # Clamp the horizon BEFORE predicting so the number of forecast values
    # always matches the number of index labels taken from test_data.
    forecast_days = min(forecast_days, len(test_data))
    forecast = model.predict(n_periods=forecast_days)
    forecast_index = test_data.index[:forecast_days]
    # np.asarray accepts both a pandas Series and a plain array, so this does
    # not rely on ``predict`` returning an object with ``.values``.
    forecast_series = pd.Series(np.asarray(forecast), index=forecast_index)

    # Plot forecast vs actual
    plt.figure(figsize=(12, 6))
    plt.plot(train_data[-(display_days+1):], label='Training Data', color='orange')
    plt.plot(test_data[:display_days], label='Testing Data', color='blue')
    plt.plot(forecast_series, label='Forecast', color='red')
    plt.legend(loc='best')
    plt.title('Auto ARIMA Model Forecast vs Actual')
    plt.show()

    # Calculate and print RMSE
    rmse = np.sqrt(mean_squared_error(test_data[:forecast_days], forecast_series))
    print(f'Root Mean Squared Error: {rmse}')

# VAR

In [None]:
def fit_and_forecast_var(df, train_data, test_data, lags=1, forecast_days=2, display_days=7):
    """Fit a VAR model on ``train_data`` and plot its forecast against ``test_data``.

    Args:
        df: Not used by this function; kept so existing calls keep working.
        train_data: DataFrame (datetime index) the model is fitted on.
        test_data: DataFrame with the same columns, plotted as actual values.
        lags: VAR lag order; also the number of trailing training rows used
            to seed the forecast.
        forecast_days: Number of steps to forecast.
        display_days: Number of test points (and trailing train points, plus
            one) shown in the plot.

    Returns:
        None. Prints the model summary and shows a plot.

    Raises:
        ValueError: If the training index carries no frequency and none can
            be inferred from it.
    """
    # Fit the VAR model on training data
    model = VAR(train_data)
    results = model.fit(lags)
    print(results.summary())

    # Forecast, seeded with the last ``lags`` training observations
    forecast = results.forecast(train_data.values[-lags:], steps=forecast_days)

    # The forecast dates continue the training index at its own frequency.
    # Without an explicit frequency pd.date_range would fall back to daily
    # steps, so infer one when the index does not carry it.
    freq = train_data.index.freq
    if freq is None:
        freq = pd.infer_freq(train_data.index)
    if freq is None:
        raise ValueError("Cannot determine the frequency of train_data.index for the forecast dates")
    # The first generated date is the last training date itself; drop it.
    forecast_index = pd.date_range(start=train_data.index[-1], periods=forecast_days + 1, freq=freq)[1:]
    forecast_df = pd.DataFrame(forecast, index=forecast_index, columns=train_data.columns)

    # Plot forecast vs actual
    plt.figure(figsize=(12, 6))
    for col in test_data.columns:
        plt.plot(train_data[col].iloc[-(display_days+1):], label=f'Training {col}', color='orange')
        plt.plot(test_data[col][:display_days], label=f'Testing {col}', color='blue')
        plt.plot(forecast_df.index, forecast_df[col], label=f'Forecast {col}', color='red', linestyle='--')
    plt.legend(loc='best')
    plt.title('VAR Model Forecast vs Actual')
    plt.show()

In [None]:
def interactive_var(train_data, test_data):
    """Show sliders that refit and re-plot a VAR forecast on every change.

    Args:
        train_data: DataFrame passed to ``fit_and_forecast_var`` for fitting.
        test_data: DataFrame passed to ``fit_and_forecast_var`` for comparison.

    Returns:
        None. The widgets are displayed through ``interact``.
    """
    def update(lags=1, forecast_days=2, display_days=7):
        # The first argument of fit_and_forecast_var is not used by it. The
        # previous code passed a global ``df`` that this notebook never
        # defines, which raised NameError on a fresh kernel.
        fit_and_forecast_var(None, train_data, test_data, lags, forecast_days, display_days)

    interact(update,
             lags=widgets.IntSlider(min=1, max=10, step=1, value=1),
             forecast_days=widgets.IntSlider(min=1, max=7, step=1, value=2),
             display_days=widgets.IntSlider(min=3, max=14, step=1, value=7))

# Main

In [None]:
# Location of the weather CSV (Colab upload path).
path = '/content/testset.csv'

# Read the file with the timestamp column as the index.
weather_df = pd.read_csv(path, parse_dates=['datetime_utc'], index_col='datetime_utc')

# Keep humidity and temperature only and give them readable names.
# The 'temprature' spelling is kept because later cells access the column by
# that name. The index is left alone here: converting it to strings only to
# parse it back again was a redundant round-trip.
weather_df = weather_df.loc[:, [' _hum', ' _tempm']].rename(
    columns={' _hum': 'humidity', ' _tempm': 'temprature'}
)

# Make sure the index is a DatetimeIndex even if read_csv left it as text.
weather_df.index = pd.to_datetime(weather_df.index)

# Fill missing readings with the previous observation.
weather_df = weather_df.ffill()

print(f'dataset shape (rows, columns) - {weather_df.shape}')
weather_df.head()

In [None]:
# Summary statistics of the loaded humidity/temperature columns.
weather_df.describe()

In [None]:
# Optional outlier filters, currently disabled. Uncommenting them would keep
# only rows with temprature < 50 and humidity <= 100.
# weather_df = weather_df[weather_df.temprature < 50]
# weather_df = weather_df[weather_df.humidity <= 100]

In [None]:
# Summary statistics again; these only differ from the earlier summary when
# the outlier filters in the preceding cell are enabled.
weather_df.describe()

In [None]:
# Preview the first rows of the prepared frame.
weather_df.head()

In [None]:
# Aggregate the raw readings to weekly ('W') and monthly ('M') means using
# the resampling() helper defined above.
weather_weekly = resampling(weather_df, frequency='W')
weather_monthly = resampling(weather_df, frequency='M')

In [None]:
# Restrict the monthly data to 2000-2017 (label-based slice on the datetime
# index) and split it chronologically into train and test parts.
monthly_2000_2017 = weather_monthly.loc['2000':'2017']
train_df, test_df = train_test_split(monthly_2000_2017)

In [None]:
# Visual check of the training temperature against its rolling mean and std.
plot_rolling_mean_std(train_df.temprature)

In [None]:
# Run the rolling-statistics plot plus ADF and KPSS tests on the untransformed
# training temperature series.
print("Testing Stationarity of the Original Series:")
test_stationarity(train_df.temprature)

In [None]:
# Feature Engineering to achieve stationarity
# Log-transform only (difference=False), then repeat the stationarity checks
# on the transformed series.
print("\nApplying Log Transform to Achieve Stationarity:")
stationary_series_log = make_stationary(train_df.temprature, log_transform=True, difference=False)
test_stationarity(stationary_series_log)

In [None]:
# Feature Engineering to achieve stationarity
# First-order differencing (shift=1) without a log transform, then repeat the
# stationarity checks on the differenced series.
print("\nApplying Differencing to Achieve Stationarity:")
stationary_series_diff = make_stationary(train_df.temprature, log_transform=False, difference=True, shift=1)
test_stationarity(stationary_series_diff)

In [None]:
# ACF/PACF of the untransformed training temperature series.
print("\nPlotting ACF and PACF:")
plot_acf_pacf(train_df.temprature)

In [None]:
# ACF/PACF of the differenced series, for comparison with the previous plot.
print("\nPlotting ACF and PACF:")
plot_acf_pacf(stationary_series_diff)

In [None]:
# Univariate Forecasting with ARIMA
# Opens sliders for p, d, q, forecast_days and display_days; every change
# refits the model on the training temperature and re-plots the forecast.
print("\n### Univariate Forecasting with ARIMA ###\n")
print("\nInteractive ARIMA Model:")
interactive_arima(train_df.temprature, test_df.temprature)

In [None]:
# Let auto_arima choose the order on the training temperature and forecast
# 7 steps against the test series.
fit_and_forecast_auto_arima(train_df.temprature, test_df.temprature, forecast_days=7)

In [None]:
# Multivariate Forecasting using VAR
# The model is fitted on all columns of train_df (humidity and temprature).
# No additional data is downloaded in this cell, so the former
# "Fetching additional data..." message was removed as misleading.
print("\n### Multivariate Forecasting ###\n")
print("\nInteractive VAR Model:")
interactive_var(train_df,test_df)