#### Pre-Processing

In [None]:
import pandas as pd

# Bucket every record by calendar day.
df['day'] = df['date'].dt.date

# Rows: day, columns: sentiment label, values: number of records.
sentiment_counts = (
    df.groupby(['day', 'extracted_sentiment'])
    .size()
    .unstack(fill_value=0)
)
sentiment_counts['total'] = sentiment_counts.sum(axis=1)

# Turn counts into per-day shares, name the label columns, and drop the
# helper 'total' column (which is kept on sentiment_counts itself).
sentiment_ratios = (
    sentiment_counts
    .div(sentiment_counts['total'], axis=0)
    .rename(columns={0: 'negative_ratio', 1: 'neutral_ratio', 2: 'positive_ratio'})
    .drop(columns='total')
)
print(sentiment_ratios.head())

#### Negative_Ratio

In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import matplotlib.pyplot as plt

# Prepare data: daily negative-sentiment ratio, indexed by date.
# The *_ratio columns are created on sentiment_ratios; sentiment_counts only
# holds the raw per-label counts, so selecting 'negative_ratio' from it fails.
df = sentiment_ratios[['negative_ratio']].copy()
df.index = pd.to_datetime(df.index)
df = df.sort_index()

# Train on January-March, hold out April for testing.
train_df = df[df.index.month.isin([1, 2, 3])]
test_df = df[df.index.month == 4]

# Fit the scaler on the training window only, then reuse it for the test window.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_df)
test_scaled = scaler.transform(test_df)

def create_sequences(data, seq_len):
    """Slice ``data`` into overlapping windows and their next-step targets.

    Window ``k`` is ``data[k:k + seq_len]`` and its target is
    ``data[k + seq_len]``. Returns ``(windows, targets)`` as NumPy arrays;
    both are empty when ``data`` has no more than ``seq_len`` entries.
    """
    window_starts = range(len(data) - seq_len)
    windows = [data[start:start + seq_len] for start in window_starts]
    targets = [data[start + seq_len] for start in window_starts]
    return np.array(windows), np.array(targets)

# Number of past days fed to the model for each prediction.
SEQ_LEN = 5

X_train, y_train = create_sequences(train_scaled, SEQ_LEN)
X_test, y_test = create_sequences(test_scaled, SEQ_LEN)

# Convert all four arrays to float32 tensors in one pass.
X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor = (
    torch.tensor(arr, dtype=torch.float32)
    for arr in (X_train, y_train, X_test, y_test)
)

# Only the training split is batched and shuffled; the test tensors are used directly.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)

# Quantile (pinball) loss
class SmoothQuantileLoss(nn.Module):
    """Pinball loss summed over several quantile levels.

    For quantile ``q`` and error ``e = target - prediction`` the per-sample
    loss is ``max((q - 1) * e, q * e)``. Losses of all quantiles are summed per
    sample and averaged over the batch.

    Args:
        quantiles: Quantile levels; entry ``i`` is matched with column ``i``
            of the predictions.
        delta: Accepted and stored for backward compatibility. It does not
            influence the result: the loss actually optimised has always been
            the plain pinball loss.
    """

    def __init__(self, quantiles, delta=1.0):
        super().__init__()
        self.quantiles = quantiles
        self.delta = delta

    def forward(self, preds, target):
        """Return the scalar loss for ``preds`` (batch, n_quantiles) and ``target``.

        ``target`` may be shaped (batch, 1) or (batch,).
        """
        # Force the target into a column. Without this a 1-D target of shape
        # (batch,) broadcasts against the (batch, 1) prediction slice into a
        # (batch, batch) matrix and silently yields a wrong loss.
        target = target.reshape(-1, 1)

        per_quantile = []
        for i, q in enumerate(self.quantiles):
            errors = target - preds[:, i:i + 1]
            per_quantile.append(torch.max((q - 1) * errors, q * errors))
        return torch.mean(torch.sum(torch.cat(per_quantile, dim=1), dim=1))

# LSTM model
class LSTMQuantileWithSmoothLoss(nn.Module):
    """Stacked LSTM followed by a linear head with one output per quantile.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden units in every LSTM layer.
        quantiles: Quantile levels; only their count is used, to size the
            output layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the last hidden state before
            the linear head.
    """

    def __init__(self, input_size, hidden_size, quantiles=(0.025, 0.5, 0.975),
                 num_layers=5, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, len(quantiles))

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to (batch, n_quantiles)."""
        out, _ = self.lstm(x)
        # Only the last time step feeds the head, so dropout is applied to that
        # slice alone instead of to the whole output sequence.
        last_step = out[:, -1, :]
        return self.fc(self.dropout(last_step))

# Training and forecasting
def train_and_forecast(model, train_loader, X_test_tensor, y_test_tensor, scaler, quantiles, epochs=100, lr=0.001,
                       title="Quantile Forecast - April (Negative Ratio)",
                       ylim=(0, 0.5), yticks=(0.1, 0.2, 0.3, 0.4, 0.5)):
    """Train ``model`` with the quantile loss, then evaluate and plot test forecasts.

    The first three output columns of the model are read as lower bound,
    median and upper bound, in that order.

    Args:
        model: Network to train; called on batches from ``train_loader`` and
            on ``X_test_tensor``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        X_test_tensor: Test input windows; one forecast is made per window.
        y_test_tensor: Scaled true values aligned with ``X_test_tensor``.
        scaler: Fitted scaler; its ``inverse_transform`` maps predictions and
            targets back to the original units.
        quantiles: Quantile levels handed to ``SmoothQuantileLoss``.
        epochs: Number of passes over ``train_loader``.
        lr: Initial Adam learning rate; it is halved every 30 epochs.
        title: Title of the forecast plot.
        ylim: ``(low, high)`` y-axis limits of the forecast plot.
        yticks: y-axis tick positions of the forecast plot.

    Returns:
        dict with the forecast arrays ``'q05'`` (lower bound), ``'q50'``
        (median), ``'q90'`` (upper bound) and ``'true'``, plus the scalar
        metrics ``'rmse'``, ``'mape'``, ``'picp'`` and ``'mpiw'``. The
        ``'q05'``/``'q90'`` key names are kept for compatibility; they hold
        whatever quantiles the model's first and third columns were trained on.

    Raises:
        ValueError: If there is no test sample to forecast.
    """
    # Fail before training rather than after it when there is nothing to evaluate.
    n_steps = min(len(X_test_tensor), len(y_test_tensor))
    if n_steps == 0:
        raise ValueError("train_and_forecast needs at least one test sample")

    criterion = SmoothQuantileLoss(quantiles)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.5)
    losses = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            preds = model(xb)
            loss = criterion(preds, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()
        avg_loss = total_loss / len(train_loader)
        losses.append(avg_loss)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    # Plot training loss
    plt.figure(figsize=(6, 4))
    plt.plot(losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss over Epochs')
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Forecast: every test window is independent of the others, so all of
    # them are predicted in a single batched forward pass.
    model.eval()
    with torch.no_grad():
        scaled_preds = model(X_test_tensor[:n_steps]).numpy()
    # Keep only the first target value of each sample, as a column for the scaler.
    scaled_true = y_test_tensor[:n_steps].numpy().reshape(n_steps, -1)[:, 0:1]

    # Back to original units; the scaler expects a single-column 2-D array.
    lower = scaler.inverse_transform(scaled_preds[:, 0:1]).ravel()
    median = scaler.inverse_transform(scaled_preds[:, 1:2]).ravel()
    upper = scaler.inverse_transform(scaled_preds[:, 2:3]).ravel()
    true_vals = scaler.inverse_transform(scaled_true).ravel()

    # Evaluation: point accuracy of the median, then coverage (PICP) and
    # mean width (MPIW) of the [lower, upper] interval.
    rmse = np.sqrt(mean_squared_error(true_vals, median))
    mape = mean_absolute_percentage_error(true_vals, median)
    picp = np.mean((true_vals >= lower) & (true_vals <= upper))
    mpiw = np.mean(upper - lower)

    print("\n--- Rolling Forecast Evaluation ---")
    print(f"RMSE (Median): {rmse:.4f}")
    print(f"MAPE (Median): {mape*100:.2f}%")
    print(f"PICP (Coverage): {picp*100:.2f}%")
    print(f"MPIW (Interval Width): {mpiw:.4f}")

    # Plot forecast
    plt.figure(figsize=(8, 4))
    plt.plot(true_vals, label="True", color="blue")
    plt.plot(median, label="Predicted Median", color="red")
    plt.fill_between(range(len(median)), lower, upper, color='orange', alpha=0.4, label="Q025–Q975")
    plt.plot(lower, '--', color='orange', alpha=0.6)
    plt.plot(upper, '--', color='orange', alpha=0.6)

    # Set y-axis limits and ticks
    plt.ylim(*ylim)
    plt.yticks(list(yticks))

    plt.xlim(0, len(true_vals) - 1)
    plt.legend()
    plt.title(title)
    plt.xlabel("Time Steps")
    plt.ylabel("Ratio")
    plt.tight_layout()
    plt.show()

    return {
        'q05': lower,
        'q50': median,
        'q90': upper,
        'true': true_vals,
        'rmse': rmse,
        'mape': mape,
        'picp': picp,
        'mpiw': mpiw
    }

# Build the model for the chosen quantile levels, then train and evaluate it.
quantiles = [0.025, 0.5, 0.975]
model = LSTMQuantileWithSmoothLoss(
    input_size=1,
    hidden_size=256,
    quantiles=quantiles,
)

results = train_and_forecast(
    model=model, train_loader=train_loader,
    X_test_tensor=X_test_tensor, y_test_tensor=y_test_tensor,
    scaler=scaler, quantiles=quantiles,
    epochs=100, lr=0.001,
)


#### Positive_Ratio

In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import matplotlib.pyplot as plt

# Prepare data: daily positive-sentiment ratio, indexed by date.
# The *_ratio columns are created on sentiment_ratios; sentiment_counts only
# holds the raw per-label counts, so selecting 'positive_ratio' from it fails.
df = sentiment_ratios[['positive_ratio']].copy()
df.index = pd.to_datetime(df.index)
df = df.sort_index()

# Train on January-March, hold out April for testing.
train_df = df[df.index.month.isin([1, 2, 3])]
test_df = df[df.index.month == 4]

# Fit the scaler on the training window only, then reuse it for the test window.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_df)
test_scaled = scaler.transform(test_df)

def create_sequences(data, seq_len):
    """Build (window, next value) training pairs from a time-ordered array.

    Each window holds ``seq_len`` consecutive entries of ``data`` and is paired
    with the entry that immediately follows it. Returns the stacked windows
    and the stacked targets as two NumPy arrays.
    """
    windows, targets = [], []
    # ``end`` is the index of the target, i.e. one past the window's last entry.
    for end in range(seq_len, len(data)):
        windows.append(data[end - seq_len:end])
        targets.append(data[end])
    return np.array(windows), np.array(targets)

# Number of past days fed to the model for each prediction.
SEQ_LEN = 5

X_train, y_train = create_sequences(train_scaled, SEQ_LEN)
X_test, y_test = create_sequences(test_scaled, SEQ_LEN)

# Convert all four arrays to float32 tensors in one pass.
X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor = (
    torch.tensor(arr, dtype=torch.float32)
    for arr in (X_train, y_train, X_test, y_test)
)

# Only the training split is batched and shuffled; the test tensors are used directly.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)

# Quantile (pinball) loss
class SmoothQuantileLoss(nn.Module):
    """Pinball loss summed over several quantile levels.

    For quantile ``q`` and error ``e = target - prediction`` the per-sample
    loss is ``max((q - 1) * e, q * e)``. Losses of all quantiles are summed per
    sample and averaged over the batch.

    Args:
        quantiles: Quantile levels; entry ``i`` is matched with column ``i``
            of the predictions.
        delta: Accepted and stored for backward compatibility. It does not
            influence the result: the loss actually optimised has always been
            the plain pinball loss.
    """

    def __init__(self, quantiles, delta=1.0):
        super().__init__()
        self.quantiles = quantiles
        self.delta = delta

    def forward(self, preds, target):
        """Return the scalar loss for ``preds`` (batch, n_quantiles) and ``target``.

        ``target`` may be shaped (batch, 1) or (batch,).
        """
        # Force the target into a column. Without this a 1-D target of shape
        # (batch,) broadcasts against the (batch, 1) prediction slice into a
        # (batch, batch) matrix and silently yields a wrong loss.
        target = target.reshape(-1, 1)

        per_quantile = []
        for i, q in enumerate(self.quantiles):
            errors = target - preds[:, i:i + 1]
            per_quantile.append(torch.max((q - 1) * errors, q * errors))
        return torch.mean(torch.sum(torch.cat(per_quantile, dim=1), dim=1))

# LSTM model
class LSTMQuantileWithSmoothLoss(nn.Module):
    """Stacked LSTM followed by a linear head with one output per quantile.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden units in every LSTM layer.
        quantiles: Quantile levels; only their count is used, to size the
            output layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the last hidden state before
            the linear head.
    """

    def __init__(self, input_size, hidden_size, quantiles=(0.025, 0.5, 0.975),
                 num_layers=5, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, len(quantiles))

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to (batch, n_quantiles)."""
        out, _ = self.lstm(x)
        # Only the last time step feeds the head, so dropout is applied to that
        # slice alone instead of to the whole output sequence.
        last_step = out[:, -1, :]
        return self.fc(self.dropout(last_step))

# Training and forecasting
def train_and_forecast(model, train_loader, X_test_tensor, y_test_tensor, scaler, quantiles, epochs=100, lr=0.001,
                       title="Quantile Forecast - April (Positive Ratio)",
                       ylim=(0.5, 1.0), yticks=(0.5, 0.6, 0.7, 0.8, 0.9)):
    """Train ``model`` with the quantile loss, then evaluate and plot test forecasts.

    The first three output columns of the model are read as lower bound,
    median and upper bound, in that order.

    Args:
        model: Network to train; called on batches from ``train_loader`` and
            on ``X_test_tensor``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        X_test_tensor: Test input windows; one forecast is made per window.
        y_test_tensor: Scaled true values aligned with ``X_test_tensor``.
        scaler: Fitted scaler; its ``inverse_transform`` maps predictions and
            targets back to the original units.
        quantiles: Quantile levels handed to ``SmoothQuantileLoss``.
        epochs: Number of passes over ``train_loader``.
        lr: Initial Adam learning rate; it is halved every 30 epochs.
        title: Title of the forecast plot.
        ylim: ``(low, high)`` y-axis limits of the forecast plot.
        yticks: y-axis tick positions of the forecast plot.

    Returns:
        dict with the forecast arrays ``'q05'`` (lower bound), ``'q50'``
        (median), ``'q90'`` (upper bound) and ``'true'``, plus the scalar
        metrics ``'rmse'``, ``'mape'``, ``'picp'`` and ``'mpiw'``. The
        ``'q05'``/``'q90'`` key names are kept for compatibility; they hold
        whatever quantiles the model's first and third columns were trained on.

    Raises:
        ValueError: If there is no test sample to forecast.
    """
    # Fail before training rather than after it when there is nothing to evaluate.
    n_steps = min(len(X_test_tensor), len(y_test_tensor))
    if n_steps == 0:
        raise ValueError("train_and_forecast needs at least one test sample")

    criterion = SmoothQuantileLoss(quantiles)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.5)
    losses = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            preds = model(xb)
            loss = criterion(preds, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()
        avg_loss = total_loss / len(train_loader)
        losses.append(avg_loss)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    # Plot training loss
    plt.figure(figsize=(6, 4))
    plt.plot(losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss over Epochs')
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Forecast: every test window is independent of the others, so all of
    # them are predicted in a single batched forward pass.
    model.eval()
    with torch.no_grad():
        scaled_preds = model(X_test_tensor[:n_steps]).numpy()
    # Keep only the first target value of each sample, as a column for the scaler.
    scaled_true = y_test_tensor[:n_steps].numpy().reshape(n_steps, -1)[:, 0:1]

    # Back to original units; the scaler expects a single-column 2-D array.
    lower = scaler.inverse_transform(scaled_preds[:, 0:1]).ravel()
    median = scaler.inverse_transform(scaled_preds[:, 1:2]).ravel()
    upper = scaler.inverse_transform(scaled_preds[:, 2:3]).ravel()
    true_vals = scaler.inverse_transform(scaled_true).ravel()

    # Evaluation: point accuracy of the median, then coverage (PICP) and
    # mean width (MPIW) of the [lower, upper] interval.
    rmse = np.sqrt(mean_squared_error(true_vals, median))
    mape = mean_absolute_percentage_error(true_vals, median)
    picp = np.mean((true_vals >= lower) & (true_vals <= upper))
    mpiw = np.mean(upper - lower)

    print("\n--- Rolling Forecast Evaluation ---")
    print(f"RMSE (Median): {rmse:.4f}")
    print(f"MAPE (Median): {mape*100:.2f}%")
    print(f"PICP (Coverage): {picp*100:.2f}%")
    print(f"MPIW (Interval Width): {mpiw:.4f}")

    # Plot forecast
    plt.figure(figsize=(8, 4))
    plt.plot(true_vals, label="True", color="blue")
    plt.plot(median, label="Predicted Median", color="red")
    plt.fill_between(range(len(median)), lower, upper, color='orange', alpha=0.4, label="Q025–Q975")
    plt.plot(lower, '--', color='orange', alpha=0.6)
    plt.plot(upper, '--', color='orange', alpha=0.6)

    # Set y-axis limits and ticks
    plt.ylim(*ylim)
    plt.yticks(list(yticks))

    plt.xlim(0, len(true_vals) - 1)
    plt.legend()
    plt.title(title)
    plt.xlabel("Time Steps")
    plt.ylabel("Ratio")
    plt.tight_layout()
    plt.show()

    return {
        'q05': lower,
        'q50': median,
        'q90': upper,
        'true': true_vals,
        'rmse': rmse,
        'mape': mape,
        'picp': picp,
        'mpiw': mpiw
    }

# Build the model for the chosen quantile levels, then train and evaluate it.
quantiles = [0.025, 0.5, 0.975]
model = LSTMQuantileWithSmoothLoss(
    input_size=1,
    hidden_size=256,
    quantiles=quantiles,
)

results = train_and_forecast(
    model=model, train_loader=train_loader,
    X_test_tensor=X_test_tensor, y_test_tensor=y_test_tensor,
    scaler=scaler, quantiles=quantiles,
    epochs=100, lr=0.001,
)


#### Neutral_Ratio

In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import matplotlib.pyplot as plt

# Prepare data: daily neutral-sentiment ratio, indexed by date.
# The *_ratio columns are created on sentiment_ratios; sentiment_counts only
# holds the raw per-label counts, so selecting 'neutral_ratio' from it fails.
df = sentiment_ratios[['neutral_ratio']].copy()
df.index = pd.to_datetime(df.index)
df = df.sort_index()

# Train on January-March, hold out April for testing.
train_df = df[df.index.month.isin([1, 2, 3])]
test_df = df[df.index.month == 4]

# Fit the scaler on the training window only, then reuse it for the test window.
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_df)
test_scaled = scaler.transform(test_df)

def create_sequences(data, seq_len):
    """Turn a series into sliding input windows with one-step-ahead targets.

    For every valid offset the function collects ``seq_len`` consecutive
    entries of ``data`` as the input and the following entry as the target,
    and returns both collections as NumPy arrays.
    """
    offsets = range(len(data) - seq_len)
    pairs = [(data[k:k + seq_len], data[k + seq_len]) for k in offsets]
    inputs = [window for window, _ in pairs]
    targets = [target for _, target in pairs]
    return np.array(inputs), np.array(targets)

# Number of past days fed to the model for each prediction.
SEQ_LEN = 5

X_train, y_train = create_sequences(train_scaled, SEQ_LEN)
X_test, y_test = create_sequences(test_scaled, SEQ_LEN)

# Convert all four arrays to float32 tensors in one pass.
X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor = (
    torch.tensor(arr, dtype=torch.float32)
    for arr in (X_train, y_train, X_test, y_test)
)

# Only the training split is batched and shuffled; the test tensors are used directly.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)

# Quantile (pinball) loss
class SmoothQuantileLoss(nn.Module):
    """Pinball loss summed over several quantile levels.

    For quantile ``q`` and error ``e = target - prediction`` the per-sample
    loss is ``max((q - 1) * e, q * e)``. Losses of all quantiles are summed per
    sample and averaged over the batch.

    Args:
        quantiles: Quantile levels; entry ``i`` is matched with column ``i``
            of the predictions.
        delta: Accepted and stored for backward compatibility. It does not
            influence the result: the loss actually optimised has always been
            the plain pinball loss.
    """

    def __init__(self, quantiles, delta=1.0):
        super().__init__()
        self.quantiles = quantiles
        self.delta = delta

    def forward(self, preds, target):
        """Return the scalar loss for ``preds`` (batch, n_quantiles) and ``target``.

        ``target`` may be shaped (batch, 1) or (batch,).
        """
        # Force the target into a column. Without this a 1-D target of shape
        # (batch,) broadcasts against the (batch, 1) prediction slice into a
        # (batch, batch) matrix and silently yields a wrong loss.
        target = target.reshape(-1, 1)

        per_quantile = []
        for i, q in enumerate(self.quantiles):
            errors = target - preds[:, i:i + 1]
            per_quantile.append(torch.max((q - 1) * errors, q * errors))
        return torch.mean(torch.sum(torch.cat(per_quantile, dim=1), dim=1))

# LSTM model
class LSTMQuantileWithSmoothLoss(nn.Module):
    """Stacked LSTM followed by a linear head with one output per quantile.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden units in every LSTM layer.
        quantiles: Quantile levels; only their count is used, to size the
            output layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the last hidden state before
            the linear head.
    """

    def __init__(self, input_size, hidden_size, quantiles=(0.025, 0.5, 0.975),
                 num_layers=5, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, len(quantiles))

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to (batch, n_quantiles)."""
        out, _ = self.lstm(x)
        # Only the last time step feeds the head, so dropout is applied to that
        # slice alone instead of to the whole output sequence.
        last_step = out[:, -1, :]
        return self.fc(self.dropout(last_step))

# Training and forecasting
def train_and_forecast(model, train_loader, X_test_tensor, y_test_tensor, scaler, quantiles, epochs=100, lr=0.001,
                       title="Quantile Forecast - April (Neutral Ratio)",
                       ylim=(0.0, 0.5), yticks=(0.1, 0.2, 0.3, 0.4, 0.5)):
    """Train ``model`` with the quantile loss, then evaluate and plot test forecasts.

    The first three output columns of the model are read as lower bound,
    median and upper bound, in that order.

    Args:
        model: Network to train; called on batches from ``train_loader`` and
            on ``X_test_tensor``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        X_test_tensor: Test input windows; one forecast is made per window.
        y_test_tensor: Scaled true values aligned with ``X_test_tensor``.
        scaler: Fitted scaler; its ``inverse_transform`` maps predictions and
            targets back to the original units.
        quantiles: Quantile levels handed to ``SmoothQuantileLoss``.
        epochs: Number of passes over ``train_loader``.
        lr: Initial Adam learning rate; it is halved every 30 epochs.
        title: Title of the forecast plot.
        ylim: ``(low, high)`` y-axis limits of the forecast plot.
        yticks: y-axis tick positions of the forecast plot.

    Returns:
        dict with the forecast arrays ``'q05'`` (lower bound), ``'q50'``
        (median), ``'q90'`` (upper bound) and ``'true'``, plus the scalar
        metrics ``'rmse'``, ``'mape'``, ``'picp'`` and ``'mpiw'``. The
        ``'q05'``/``'q90'`` key names are kept for compatibility; they hold
        whatever quantiles the model's first and third columns were trained on.

    Raises:
        ValueError: If there is no test sample to forecast.
    """
    # Fail before training rather than after it when there is nothing to evaluate.
    n_steps = min(len(X_test_tensor), len(y_test_tensor))
    if n_steps == 0:
        raise ValueError("train_and_forecast needs at least one test sample")

    criterion = SmoothQuantileLoss(quantiles)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.5)
    losses = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            preds = model(xb)
            loss = criterion(preds, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()
        avg_loss = total_loss / len(train_loader)
        losses.append(avg_loss)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    # Plot training loss
    plt.figure(figsize=(6, 4))
    plt.plot(losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss over Epochs')
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Forecast: every test window is independent of the others, so all of
    # them are predicted in a single batched forward pass.
    model.eval()
    with torch.no_grad():
        scaled_preds = model(X_test_tensor[:n_steps]).numpy()
    # Keep only the first target value of each sample, as a column for the scaler.
    scaled_true = y_test_tensor[:n_steps].numpy().reshape(n_steps, -1)[:, 0:1]

    # Back to original units; the scaler expects a single-column 2-D array.
    lower = scaler.inverse_transform(scaled_preds[:, 0:1]).ravel()
    median = scaler.inverse_transform(scaled_preds[:, 1:2]).ravel()
    upper = scaler.inverse_transform(scaled_preds[:, 2:3]).ravel()
    true_vals = scaler.inverse_transform(scaled_true).ravel()

    # Evaluation: point accuracy of the median, then coverage (PICP) and
    # mean width (MPIW) of the [lower, upper] interval.
    rmse = np.sqrt(mean_squared_error(true_vals, median))
    mape = mean_absolute_percentage_error(true_vals, median)
    picp = np.mean((true_vals >= lower) & (true_vals <= upper))
    mpiw = np.mean(upper - lower)

    print("\n--- Rolling Forecast Evaluation ---")
    print(f"RMSE (Median): {rmse:.4f}")
    print(f"MAPE (Median): {mape*100:.2f}%")
    print(f"PICP (Coverage): {picp*100:.2f}%")
    print(f"MPIW (Interval Width): {mpiw:.4f}")

    # Plot forecast
    plt.figure(figsize=(8, 4))
    plt.plot(true_vals, label="True", color="blue")
    plt.plot(median, label="Predicted Median", color="red")
    plt.fill_between(range(len(median)), lower, upper, color='orange', alpha=0.4, label="Q025–Q975")
    plt.plot(lower, '--', color='orange', alpha=0.6)
    plt.plot(upper, '--', color='orange', alpha=0.6)

    # Set y-axis limits and ticks
    plt.ylim(*ylim)
    plt.yticks(list(yticks))

    plt.xlim(0, len(true_vals) - 1)
    plt.legend()
    plt.title(title)
    plt.xlabel("Time Steps")
    plt.ylabel("Ratio")
    plt.tight_layout()
    plt.show()

    return {
        'q05': lower,
        'q50': median,
        'q90': upper,
        'true': true_vals,
        'rmse': rmse,
        'mape': mape,
        'picp': picp,
        'mpiw': mpiw
    }

# Build the model for the chosen quantile levels, then train and evaluate it.
quantiles = [0.025, 0.5, 0.975]
model = LSTMQuantileWithSmoothLoss(
    input_size=1,
    hidden_size=256,
    quantiles=quantiles,
)

results = train_and_forecast(
    model=model, train_loader=train_loader,
    X_test_tensor=X_test_tensor, y_test_tensor=y_test_tensor,
    scaler=scaler, quantiles=quantiles,
    epochs=100, lr=0.001,
)