# AR, MA, ARMA, ARIMA: Pure NumPy/Pandas Implementations

This project provides a pure NumPy and Pandas implementation of classical time series models:
- **AR (AutoRegressive)**
- **MA (Moving Average)**
- **ARMA (AutoRegressive Moving Average)**
- **ARIMA (AutoRegressive Integrated Moving Average)**

The models are implemented from scratch to enhance understanding of time series dynamics and optimization without relying on external ML libraries.

Written by Hyeokgi Kim, 2025. Built for education.

In [None]:
# Hidden helper code
# Helper functions
import numpy as np
import pandas as pd
from numpy import ndarray
from pandas import Series
import matplotlib.pyplot as plt


def acf(series: ndarray, max_lag: int) -> np.ndarray:
    """Return biased autocorrelation sequence up to max_lag."""
    n = len(series)
    mean = series.mean()
    var = ((series - mean) ** 2).sum() / n
    res = np.empty(max_lag + 1)

    for k in range(max_lag + 1):
        num = np.dot(series[: n - k] - mean, series[k:] - mean) / n
        res[k] = num / var
    return res


def levinson_durbin(r: ndarray, p: int) -> np.ndarray:
    """Solve the Yule-Walker equations via Levinson-Durbin recursion."""
    phi = np.zeros(p)
    sig = r[0]

    for k in range(1, p + 1):
        acc = r[k] - sum(phi[j - 1] * r[k - j] for j in range(1, k))
        gamma = acc / sig

        phi_prev = phi.copy()
        phi[k - 1] = gamma
        for j in range(1, k):
            phi[j - 1] = phi_prev[j - 1] - gamma * phi_prev[k - j - 1]

        sig *= 1.0 - gamma**2
    return phi


def difference(x: ndarray, d: int = 1) -> np.ndarray:
    """Apply d-order differencing."""
    for _ in range(d):
        x = np.diff(x)
    return x


def optimize_gd(
    init: ndarray,
    loss_grad_fn,
    lr: float = 1e-3,
    epochs: int = 5000,
    grad_clip: float = 1000.0,
    tol: float = 1e-8,
):
    """
    Generic first-order optimizer.
    Uses gradient descent with clipping

    Parameters
    ----------
    init        : Initial parameter vector (copied internally).
    loss_grad_fn: Callable(params) -> (loss, grad).
    lr          : Learning rate.
    epochs      : Maximum iterations.
    grad_clip   : Gradient absolute-value clipping threshold.
    tol         : Convergence threshold on parameter update norm.
    """
    params = init.copy()

    for _ in range(epochs):
        loss, grad = loss_grad_fn(params)
        grad = np.clip(grad, -grad_clip, grad_clip)
        params -= lr * grad
        step_norm = np.sqrt(np.sum((lr * grad) ** 2))
        if _ % 1000 == 0:
            # print(loss, grad, params)
            continue
        if step_norm < tol and 0:
            # print(f"loss = {loss}, converged")
            break
    return params


# Base model interface
class TimeSeriesModel:
    def fit(self, series: Series):
        raise NotImplementedError

    def predict(self, series: pd.Series, steps: int = 1) -> np.ndarray:
        raise NotImplementedError


## AR Model

In [None]:
"""AR model implementation"""

import numpy as np
import pandas as pd
from numpy import ndarray
from pandas import Series
import matplotlib.pyplot as plt

from helper import TimeSeriesModel
from helper import acf
from helper import levinson_durbin
from helper import optimize_gd


# AR(p) model
class ARModel(TimeSeriesModel):
    def __init__(self, p: int):
        self.p = p
        self.coef = None

    def fit(self, series: Series):
        print("Fitting AR model...", end="")
        r = acf(series.to_numpy(dtype=float), self.p)
        self.coef = levinson_durbin(r, self.p)
        print("Done!")

    def predict(self, series: Series, steps: int = 1) -> np.ndarray:
        hist = series.to_numpy(dtype=float).tolist()
        out = []
        for _ in range(steps):
            # TODO: compute y_hat using the last p values.
            out.append(y_hat)
            hist.append(y_hat)
        return np.array(out)


if __name__ == "__main__":
    np.random.seed(0)

    # Simulate ARIMA(2, 1, 2) series
    n = 300
    ar_coef = np.array([0.6, -0.3])
    ma_coef = np.array([0.5, 0.4])
    noise = np.random.randn(n + 100)
    x = np.zeros(n + 100)

    for t in range(2, len(x)):
        ar_part = np.dot(ar_coef, x[t - 2 : t][::-1])
        ma_part = np.dot(ma_coef, noise[t - 2 : t][::-1])
        x[t] = ar_part + ma_part + noise[t]

    x = np.cumsum(x[100:])  # integration (d = 1)
    ts = pd.Series(x)

    # Fit model
    ar_model = ARModel(2)
    ar_model.fit(ts)

    # Forecast next five points
    print("AR: ", ar_model.predict(ts, 5))



### ✎ Exercise

Modify the `ARModel` class to log the loss every 1000 iterations during training.
- Hint: Look inside the `optimize_gd` function.
- Optional: Add visualization of loss over epochs.

```python
# Your code here

```


## MA Model

In [None]:
"""MA model implementation"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from helper import TimeSeriesModel
from helper import acf
from helper import levinson_durbin
from helper import optimize_gd


# MA(q) model
class MAModel(TimeSeriesModel):
    def __init__(self, q: int, lr: float = 1e-4, epochs: int = 10000):
        self.q, self.lr, self.epochs = q, lr, epochs
        self.mu = None
        self.std = None
        self.theta = None

    def fit(self, series: pd.Series):
        print("Fitting MA model...", end="")
        x_original = series.to_numpy(dtype=float)
        self.mu = x_original.mean()
        self.std = x_original.std()
        x = (x_original - self.mu) / self.std
        n, q = len(x), self.q

        def loss_grad(theta):
            eps = np.zeros(n)

            # Forward pass - compute residuals
            for t in range(n):
                eps[t] = x[t]
                for j in range(min(t, q)):
                    eps[t] -= theta[j] * eps[t - j - 1]

            # Compute loss
            loss = 0.5 * np.mean(eps**2)

            # Compute gradient more stably by using finite differences
            # for the most sensitive components
            grad = np.zeros(q)
            for i in range(q):
                h = 1e-6  # Small perturbation
                theta_plus = theta.copy()
                theta_plus[i] += h

                # Compute perturbed residuals
                eps_plus = np.zeros(n)
                for t in range(n):
                    eps_plus[t] = x[t]
                    for j in range(min(t, q)):
                        eps_plus[t] -= theta_plus[j] * eps_plus[t - j - 1]

                # Compute numerical gradient
                # TODO: write the function to calculate loss_plus and grad

            return loss, grad

        # Use a smaller learning rate and tighter gradient clipping
        self.theta = optimize_gd(
            np.zeros(q),  # Start from zeros instead of random
            loss_grad,
            lr=self.lr,
            epochs=self.epochs,
            grad_clip=1.0,  # Much tighter gradient clipping
        )
        print("Done!")

    def predict(self, series: pd.Series, steps: int = 1) -> np.ndarray:
        if self.theta is None:
            raise ValueError("Model not fitted yet. Call fit() first.")

        x_original = series.to_numpy(dtype=float)
        x = (x_original - self.mu) / self.std
        eps_hist = np.zeros(len(x))

        # Compute residuals on training data (used as history)
        for t in range(len(x)):
            eps_hist[t] = x[t]
            for j in range(min(t, self.q)):
                eps_hist[t] -= self.theta[j] * eps_hist[t - j - 1]

        preds = []
        for _ in range(steps):
            pred = self.mu
            for j in range(min(self.q, len(eps_hist))):
                pred += self.theta[j] * self.std * eps_hist[-(j + 1)]
            preds.append(pred)
            eps_hist = np.append(eps_hist, 0.0)

        return np.array(preds)


if __name__ == "__main__":
    np.random.seed(0)

    # Simulate ARIMA(2, 1, 2) series
    n = 300
    ar_coef = np.array([0.6, -0.3])
    ma_coef = np.array([0.5, 0.4])
    noise = np.random.randn(n + 100)
    x = np.zeros(n + 100)

    for t in range(2, len(x)):
        ar_part = np.dot(ar_coef, x[t - 2 : t][::-1])
        ma_part = np.dot(ma_coef, noise[t - 2 : t][::-1])
        x[t] = ar_part + ma_part + noise[t]

    x = np.cumsum(x[100:])  # integration (d = 1)
    ts = pd.Series(x)

    # Fit model
    ma_model = MAModel(2, lr=1e-3, epochs=10000)
    ma_model.fit(ts)

    # Forecast next five points
    print("MA    →", ma_model.predict(ts, 5))



### ✎ Exercise

Modify the `MAModel` class to log the loss every 1000 iterations during training.
- Hint: Look inside the `optimize_gd` function.
- Optional: Add visualization of loss over epochs.

```python
# Your code here

```


## ARMA Model

In [None]:
"""ARMA model implementations"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from helper import TimeSeriesModel
from helper import optimize_gd


class ARMAModel(TimeSeriesModel):
    def __init__(self, p: int, q: int, lr: float = 1e-4, epochs: int = 10000):
        self.p, self.q, self.lr, self.epochs = p, q, lr, epochs
        self.ar_coef = None
        self.ma_coef = None
        self.mu = None
        self.std = None

    def fit(self, series: pd.Series):
        print("Fitting ARMA model...", end="")
        x_original = series.to_numpy(dtype=float)
        self.mu = x_original.mean()
        self.std = x_original.std()
        x = (x_original - self.mu) / self.std
        n, p, q = len(x), self.p, self.q

        def loss_grad(params):
            eps = np.zeros(n)
            ar_params = params[:p]
            ma_params = params[p:]

            # Compute residuals
            for t in range(n):
                eps[t] = x[t]
                for j in range(min(t, p)):
                    eps[t] -= ar_params[j] * x[t - j - 1]
                for j in range(min(t, q)):
                    eps[t] -= ma_params[j] * eps[t - j - 1]

            # Compute loss
            loss = 0.5 * np.mean(eps**2)

            # Compute gradients - finite difference
            grad = np.zeros(p + q)
            # ar params
            for i in range(p):
                h = 1e-6
                params_plus = params.copy()
                params_plus[i] += h
                ar_plus = params_plus[:p]
                ma_plus = params_plus[p:]

                eps_plus = np.zeros(n)
                for t in range(n):
                    eps_plus[t] = x[t]
                    for j in range(min(t, p)):
                        eps_plus[t] -= ar_plus[j] * x[t - j - 1]
                    for j in range(min(t, q)):
                        eps_plus[t] -= ma_plus[j] * eps_plus[t - j - 1]

                loss_plus = 0.5 * np.mean(eps_plus**2)
                grad[i] = (loss_plus - loss) / h

            # ma params
            for i in range(q):
                h = 1e-6
                params_plus = params.copy()
                params_plus[p + i] += h
                ar_plus = params_plus[:p]
                ma_plus = params_plus[p:]

                eps_plus = np.zeros(n)
                for t in range(n):
                    eps_plus[t] = x[t]
                    for j in range(min(t, p)):
                        eps_plus[t] -= ar_plus[j] * x[t - j - 1]
                    for j in range(min(t, q)):
                        eps_plus[t] -= ma_plus[j] * eps_plus[t - j - 1]

                loss_plus = 0.5 * np.mean(eps_plus**2)
                grad[p + i] = (loss_plus - loss) / h
            return loss, grad

        params = optimize_gd(
            np.zeros(p + q),
            loss_grad,
            lr=self.lr,
            epochs=self.epochs,
            grad_clip=1.0,
        )
        self.ar_coef = params[:p]
        self.ma_coef = params[p:]
        print("Done!")

    def predict(self, series: pd.Series, steps: int = 1) -> np.ndarray:
        if self.ar_coef is None or self.ma_coef is None:
            raise ValueError("Model not fitted yet. Call fit() first.")

        x_original = series.to_numpy(dtype=float)
        x = (x_original - self.mu) / self.std
        n = len(x)
        eps = np.zeros(n)

        # Compute residuals
        for t in range(n):
            eps[t] = x[t]
            for j in range(min(t, self.p)):
                eps[t] -= self.ar_coef[j] * x[t - j - 1]
            for j in range(min(t, self.q)):
                eps[t] -= self.ma_coef[j] * eps[t - j - 1]

        preds = []
        forecast_x = np.append(x, np.zeros(steps))
        forecast_eps = np.append(eps, np.zeros(steps))

        for t in range(n, n + steps):
            forecast = self.mu
            for j in range(self.p):
                if t - j - 1 >= 0:
                    forecast += self.ar_coef[j] * self.std * forecast_x[t - j - 1]
            for j in range(self.q):
                if t - j - 1 >= 0:
                    forecast += self.ma_coef[j] * self.std * forecast_eps[t - j - 1]

            preds.append(forecast)
            forecast_x[t] = (forecast - self.mu) / self.std
        return np.array(preds)

if __name__ == "__main__":
    np.random.seed(0)

    # Simulate ARIMA(2, 1, 2) series
    n = 300
    ar_coef = np.array([0.6, -0.3])
    ma_coef = np.array([0.5, 0.4])
    noise = np.random.randn(n + 100)
    x = np.zeros(n + 100)

    for t in range(2, len(x)):
        ar_part = np.dot(ar_coef, x[t - 2 : t][::-1])
        ma_part = np.dot(ma_coef, noise[t - 2 : t][::-1])
        x[t] = ar_part + ma_part + noise[t]

    x = np.cumsum(x[100:])  # integration (d = 1)
    ts = pd.Series(x)

    # Fit models
    arma_model = ARMAModel(2, 2, lr=1e-2, epochs=5000)
    arma_model.fit(ts.diff().dropna())  # Manually difference for ARMA

    # Forecast next five points
    print("ARMA coeffs: ", arma_model.ar_coef, arma_model.ma_coef)
    print("ARMA predictions: ", arma_model.predict(ts.diff().dropna(), 5))


### ✎ Exercise

Modify the `ARMAModel` class to log the loss every 100 iterations during training.
And try to plot it.
- Hint: Look inside the `optimize_gd` function.
- Optional: Add visualization of loss over epochs.


In [None]:
# TODO: your code here!

## ARIMA Model

In [None]:
"""ARIMA model implementations"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from helper import TimeSeriesModel
from helper import optimize_gd
from arma import ARMAModel


class ARIMAModel(TimeSeriesModel):
    def __init__(self, p: int, d: int, q: int, lr: float = 1e-4, epochs: int = 10000):
        self.p = p  # AR order
        self.d = d  # Differencing order
        self.q = q  # MA order
        self.lr = lr
        self.epochs = epochs
        self.arma_model = ARMAModel(p, q, lr, epochs)
        self.original_series = None

    def fit(self, series: pd.Series):
        print(f"Fitting ARIMA({self.p},{self.d},{self.q}) model...", end="")
        self.original_series = series.copy()

        # Apply d-order differencing
        differenced_series = series.copy()
        # TODO: apply d-order differencing.
        # you may use helper.difference

        # Fit ARMA model on differenced series
        self.arma_model.fit(differenced_series)
        print("ARIMA model fitted!")

    def predict(self, series: pd.Series, steps: int = 1) -> np.ndarray:
        if self.arma_model.ar_coef is None:
            raise ValueError("Model not fitted yet. Call fit() first.")

        latest_series = series.copy()
        differenced_series = latest_series.copy()
        # TODO: apply d-order differencing.
        # just write the same code from above

        diff_forecasts = self.arma_model.predict(differenced_series, steps)

        # Undo differencing (integration)
        if self.d == 0:
            return diff_forecasts
        last_values = [latest_series.iloc[-i - 1] for i in range(self.d)]

        forecasts = np.zeros(steps)
        for i in range(steps):
            value = diff_forecasts[i]
            for j in range(self.d):
                value += last_values[j]
            forecasts[i] = value
            for j in range(self.d - 1, 0, -1):
                last_values[j] = last_values[j - 1]
            last_values[0] = forecasts[i]

        return forecasts


if __name__ == "__main__":
    np.random.seed(0)

    # Simulate ARIMA(2, 1, 2) series
    n = 300
    ar_coef = np.array([0.6, -0.3])
    ma_coef = np.array([0.5, 0.4])
    noise = np.random.randn(n + 100)
    x = np.zeros(n + 100)

    for t in range(2, len(x)):
        ar_part = np.dot(ar_coef, x[t - 2 : t][::-1])
        ma_part = np.dot(ma_coef, noise[t - 2 : t][::-1])
        x[t] = ar_part + ma_part + noise[t]

    x = np.cumsum(x[100:])  # integration (d = 1)
    ts = pd.Series(x)

    # Fit models
    arima_model = ARIMAModel(2, 1, 2, lr=1e-2, epochs=5000)
    arima_model.fit(ts)

    # Forecast next five points
    print("ARIMA coeffs: ", arima_model.ar_coef, arima_model.ma_coef)
    print("ARIMA predictions: ", arima_model.predict(ts, 5))

