In [None]:
# Updated installation command for latest CPU-compatible PyTorch
!pip install torch==2.2.2+cpu torchvision==0.17.2+cpu torchaudio==2.2.2+cpu -f https://download.pytorch.org/whl/cpu/torch_stable.html


# Cyber Threat Forecasting Using TCN + ARIMA Hybrid Model


## By analyzing patterns in real-world cyberattack data, the model aims to predict future incident counts and help organizations prepare proactively. The project also includes thorough EDA, visualizations, and per-industry insights, making it suitable for both operational intelligence and academic demonstration.




### Importing required libraries and loading data

In [None]:
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from pmdarima.arima import auto_arima
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the incident records; the path is relative to the notebook's working directory.
INCIDENTS_CSV = "Incidents.csv"
data = pd.read_csv(INCIDENTS_CSV)

### EDA and Preprocessing


In [None]:

def _most_common(values):
    """Return the most frequent item of `values` (used as a groupby aggregator)."""
    return Counter(values).most_common(1)[0][0]


# --- Clean the raw records ---------------------------------------------------
# Unparseable dates are coerced to NaT and those rows are dropped.
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
data.dropna(subset=['Date'], inplace=True)
data['YearMonth'] = data['Date'].dt.to_period('M')
data['Sub-Type'] = data['Sub-Type'].fillna("Unknown")

# --- Aggregate to one row per month -------------------------------------------
# Each month keeps its dominant Type / Sub-Type and the number of incidents.
grouped_data = data.groupby('YearMonth').agg({
    'Type': _most_common,
    'Sub-Type': _most_common,
    'Date': 'count'
}).reset_index().rename(columns={'Date': 'Incident count'})

# Month start expressed as seconds since the Unix epoch.
# Computed with timestamp arithmetic instead of the deprecated `Series.view('int64')`,
# which also makes the result independent of the datetime storage resolution.
month_start = grouped_data['YearMonth'].dt.to_timestamp()
grouped_data['YearMonth_encoded'] = (month_start - pd.Timestamp("1970-01-01")) / pd.Timedelta(seconds=1)

# --- Scaling --------------------------------------------------------------------
# NOTE(review): the scaler is fit on the whole series, i.e. before the chronological
# train/test split performed further down; consider fitting on the training months only.
scaler = StandardScaler()
grouped_data['Incident count_scaled'] = scaler.fit_transform(grouped_data[['Incident count']])

# --- Lag features ---------------------------------------------------------------
# The first rows have no history; 0 is the mean of the standardized series.
grouped_data['Lag_1'] = grouped_data['Incident count_scaled'].shift(1).fillna(0)
grouped_data['Lag_2'] = grouped_data['Incident count_scaled'].shift(2).fillna(0)

# --- Quick textual EDA ------------------------------------------------------------
print("Dataset Preview:")
print(data.head())

print("\nMissing Values Summary:")
print(data.isnull().sum())

print(f"\n Date Range: {data['Date'].min()} to {data['Date'].max()}")

print(f"Unique Industries: {data['Industry'].nunique()}")
print(f"Unique Types: {data['Type'].nunique()}")
print(f"Unique Sub-Types: {data['Sub-Type'].nunique()}")
print(f"Unique Locations: {data['Location'].nunique()}")

# Top categories
for col in ['Industry', 'Type', 'Sub-Type', 'Location']:
    print(f"\nTop 5 in '{col}':")
    print(data[col].value_counts().head())

# Monthly distribution ('YearMonth' was already derived above, no need to rebuild it)
monthly_counts = data.groupby('YearMonth').size()
print("\nMonthly Incident Count Stats:")
print(monthly_counts.describe())


### Visualization 

In [None]:
sns.set(style='whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

# Month-end resampling rule given as an offset object rather than the 'M' string alias:
# pandas >= 2.2 deprecates 'M' (renamed 'ME'), while the offset object works on every version.
MONTH_END = pd.offsets.MonthEnd()


def _plot_monthly_trend(counts, title, color):
    """Draw a line chart of a month-indexed count series and show it."""
    plt.plot(counts.index, counts.values, marker='o', color=color)
    plt.title(title)
    plt.xlabel("Month")
    plt.ylabel("Incident Count")
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.show()


def _plot_top_categories(counts, title, xlabel, palette):
    """Draw a horizontal bar chart of a `value_counts()` result and show it."""
    sns.barplot(x=counts.values, y=counts.index, palette=palette)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.show()


# Monthly incident count (months without incidents appear as 0)
monthly_counts = data.resample(MONTH_END, on='Date').size()
_plot_monthly_trend(monthly_counts, "Monthly Incident Counts", color='teal')

# Top 5 attack types
attack_counts = data['Type'].value_counts().head(5)
_plot_top_categories(attack_counts, "Top 5 Attack Types", "Count", palette='muted')

# Top 10 industries
industry_counts = data['Industry'].value_counts().head(10)
_plot_top_categories(industry_counts, "Top 10 Affected Industries", "Incident Count", palette='coolwarm')

# Top 10 countries
location_counts = data['Location'].value_counts().head(10)
_plot_top_categories(location_counts, "Top 10 Countries Affected", "Incident Count", palette='viridis')

# Heatmap of Attack Type vs Industry (cell value = number of incidents)
pivot = data.pivot_table(index='Industry', columns='Type', aggfunc='size', fill_value=0)
plt.figure(figsize=(16, 10))
sns.heatmap(pivot, cmap='YlGnBu', linewidths=0.5)
plt.title("Heatmap: Attack Type vs Industry")
plt.xlabel("Attack Type")
plt.ylabel("Industry")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Monthly trend of the single most common attack type
top_attack_type = data['Type'].value_counts().idxmax()
monthly_top_attack = data[data['Type'] == top_attack_type].resample(MONTH_END, on='Date').size()
_plot_monthly_trend(monthly_top_attack, f"Trend of '{top_attack_type}' Incidents Over Time", color='crimson')


### Capping Outliers

In [None]:
# Cap outliers in the monthly 'Incident count' with the IQR rule: values outside
# [Q1 - k*IQR, Q3 + k*IQR] are clamped to the nearest bound. Capping (rather than
# dropping rows) keeps the monthly series continuous.
IQR_MULTIPLIER = 1.5

# Keep an untouched copy of the counts and always cap from it, so re-running this
# cell yields the same result instead of re-capping already capped values.
if 'Incident count_raw' not in grouped_data.columns:
    grouped_data['Incident count_raw'] = grouped_data['Incident count']
raw_counts = grouped_data['Incident count_raw'].astype(float)

Q1 = raw_counts.quantile(0.25)
Q3 = raw_counts.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - IQR_MULTIPLIER * IQR
upper_bound = Q3 + IQR_MULTIPLIER * IQR

grouped_data['Incident count'] = raw_counts.clip(lower=lower_bound, upper=upper_bound)

# The scaled target and the lag features were built from the uncapped counts earlier in
# the notebook. Rebuild them here so the model is trained on the same (capped) series
# that the evaluation later compares against.
grouped_data['Incident count_scaled'] = scaler.fit_transform(grouped_data[['Incident count']]).ravel()
grouped_data['Lag_1'] = grouped_data['Incident count_scaled'].shift(1).fillna(0)
grouped_data['Lag_2'] = grouped_data['Incident count_scaled'].shift(2).fillna(0)


### Training TCN + ARIMA Hybrid model

In [None]:
class TemporalBlock(nn.Module):
    """Causal dilated 1-D convolution block with a residual connection.

    The convolution is padded by ``(kernel_size - 1) * dilation`` on both sides and the
    surplus trailing steps are cut off in ``forward``, so the output at step ``t`` only
    depends on inputs at steps ``<= t`` and has the same length as the input.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the block.
        kernel_size: Width of the convolution kernel.
        dilation: Spacing between kernel taps.
    """

    def __init__(self, in_channels, out_channels, kernel_size, dilation):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
                               padding=(kernel_size - 1) * dilation, dilation=dilation)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.2)
        self.net = nn.Sequential(self.conv1, self.relu1, self.dropout1)
        # 1x1 convolution that matches the channel count of the skip path when needed.
        self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None

    def forward(self, x):
        """Apply the block to ``x`` of shape (batch, in_channels, length).

        Returns:
            Tensor of shape (batch, out_channels, length).
        """
        out = self.net(x)
        residual = x if self.downsample is None else self.downsample(x)
        # Keep the first `length` steps. Slicing by the target length (instead of by a
        # negative padding amount) stays correct when the padding is 0, e.g. kernel_size=1,
        # where a `:-0` style slice would return an empty tensor.
        return out[:, :, :residual.size(2)] + residual

class TCN(nn.Module):
    """Two stacked temporal blocks followed by a linear read-out of the last time step.

    Args:
        input_size: Number of input channels fed to the first temporal block.
        output_size: Number of values produced per sample by the linear head.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        wide_channels, narrow_channels = 64, 32
        first_block = TemporalBlock(input_size, wide_channels, kernel_size=2, dilation=1)
        second_block = TemporalBlock(wide_channels, narrow_channels, kernel_size=2, dilation=2)
        self.tcn = nn.Sequential(first_block, second_block)
        self.linear = nn.Linear(narrow_channels, output_size)

    def forward(self, x):
        """Run the temporal blocks on ``x`` and project the final step's features."""
        final_step_features = self.tcn(x)[..., -1]
        return self.linear(final_step_features)


In [None]:
# Training configuration
SEED = 42
N_EPOCHS = 200
LEARNING_RATE = 0.001
LOG_EVERY = 20

# Seed PyTorch so weight initialisation and dropout masks are reproducible under
# Restart & Run All.
torch.manual_seed(SEED)

# Prepare features: the two lagged (scaled) counts predict the current scaled count
features = grouped_data[['Lag_1', 'Lag_2']].values
labels = grouped_data['Incident count_scaled'].values

# Chronological train-test split (no shuffling, the last 20% of months are the test set)
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, shuffle=False)

# Convert to tensors; unsqueeze(1) adds the channel axis -> (samples, 1, 2)
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Model
model = TCN(input_size=1, output_size=1)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.MSELoss()

# Training (full-batch gradient descent)
for epoch in range(N_EPOCHS):
    model.train()
    optimizer.zero_grad()
    # squeeze(-1) drops only the output axis, so the prediction keeps shape (samples,)
    # even for a single-sample batch; a bare squeeze() would collapse it to a 0-d tensor.
    output = model(X_train_tensor).squeeze(-1)
    loss = criterion(output, y_train_tensor)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % LOG_EVERY == 0:
        print(f"Epoch {epoch + 1}/{N_EPOCHS}, Loss: {loss.item():.4f}")


In [None]:
# TCN prediction (standardized units) for the test months and the in-sample fit
model.eval()
with torch.no_grad():
    pred_tcn = model(X_test_tensor).squeeze(-1).numpy()
    train_fit = model(X_train_tensor).squeeze(-1).numpy()

# TCN-only forecast converted back to incident counts
tcn_pred_rescaled = scaler.inverse_transform(pred_tcn.reshape(-1, 1)).flatten()

# ARIMA on the TCN's training residuals (still in standardized units)
residuals = y_train - train_fit
arima_model = auto_arima(residuals, seasonal=False, trace=False)
# np.asarray: normalise the forecast to a plain array regardless of the container returned
arima_forecast = np.asarray(arima_model.predict(n_periods=len(y_test)))

# Hybrid Forecast = TCN + ARIMA.
# Both terms are standardized, so they are summed first and the sum is inverse-transformed
# once. Adding the standardized ARIMA correction directly to already rescaled counts would
# mix units and make the correction negligible.
hybrid_scaled = pred_tcn + arima_forecast
hybrid_forecast = scaler.inverse_transform(hybrid_scaled.reshape(-1, 1)).flatten()


### Evaluation 

In [None]:
# Evaluation: compare the hybrid forecast with the observed counts of the test months
n_test = len(y_test)
test_months = grouped_data['YearMonth'].iloc[-n_test:].astype(str)
actual_values = grouped_data['Incident count'].iloc[-n_test:].values

mae = mean_absolute_error(actual_values, hybrid_forecast)
mse = mean_squared_error(actual_values, hybrid_forecast)
mape = mean_absolute_percentage_error(actual_values, hybrid_forecast)
mape_pct = mape * 100

print(f"MAE: {mae:.2f}")
print(f"MSE: {mse:.2f}")
print(f"MAPE: {mape_pct:.2f}%")
print(f"Accuracy: {100 - mape_pct:.2f}%")

# Plot: actual vs forecast on a shared month axis
plt.figure(figsize=(12, 6))
plt.plot(test_months, actual_values, label='Actual', marker='o')
plt.plot(test_months, hybrid_forecast, label='Hybrid Forecast', linestyle='--', marker='x')
plt.title("TCN + ARIMA Forecast vs Actual")
plt.xlabel("Year-Month")
plt.ylabel("Incident Count")
plt.xticks(rotation=45)
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Table: month-by-month comparison (forecast rounded to whole incidents)
rounded_forecast = np.round(hybrid_forecast).astype(int)
pd.DataFrame({
    'YearMonth': test_months,
    'Actual': actual_values,
    'Forecast (TCN + ARIMA)': rounded_forecast
})
