In [None]:
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from utils import DataPreprocessing, SpectrogramDataset, plot_waveform, plot_spectrogram

# Project root, taken from the current working directory when this cell runs.
ROOT_PATH = os.getcwd()

# Input/output directories, all located under ROOT_PATH.
RAW_DATASET_PATH = os.path.join(ROOT_PATH, "dataset", "raw")
PRODUCTION_DATASET_PATH = os.path.join(ROOT_PATH, "dataset", "production")
LOGS_PATH = os.path.join(ROOT_PATH, "logs")
MODELS_PATH = os.path.join(ROOT_PATH, "models")

# Settings handed to DataPreprocessing; SUBSAMPLE_SIZE also tags the saved file names.
SUBSAMPLE_SIZE = 1000
# NOTE(review): presumably the target class fractions of the subsample — confirm in utils.DataPreprocessing.
NOISE_RATIO = 0.5
EARTHQUAKE_RATIO = 0.5

def preprocess_data():
	"""Build the subsampled dataset and persist it under PRODUCTION_DATASET_PATH.

	Writes four files, each tagged with SUBSAMPLE_SIZE: the raw signals, the
	metadata table, the waveform images and the spectrogram images. Finishes
	by printing the expected/actual sample counts and the class balance.
	"""
	# Compute preprocessed data
	data_processor = DataPreprocessing(
		subsample_size=SUBSAMPLE_SIZE,
		raw_dataset_path=RAW_DATASET_PATH,
		logs_path=LOGS_PATH,
		noise_ratio=NOISE_RATIO,
		earthquake_ratio=EARTHQUAKE_RATIO
	)

	# np.save / to_feather do not create missing parent directories, so make
	# sure the output folder exists before writing anything into it.
	os.makedirs(PRODUCTION_DATASET_PATH, exist_ok=True)

	# Save preprocessed data
	signals = data_processor.subsample_traces
	np.save(os.path.join(PRODUCTION_DATASET_PATH, f"signals_{SUBSAMPLE_SIZE}.npy"), np.array(list(signals.values())))

	metadata = data_processor.subsample_metadata
	metadata.to_feather(os.path.join(PRODUCTION_DATASET_PATH, f"metadata_{SUBSAMPLE_SIZE}.feather"))

	waveform_images = data_processor.create_waveform_images()
	np.save(os.path.join(PRODUCTION_DATASET_PATH, f"waveform_images_{SUBSAMPLE_SIZE}.npy"), waveform_images)

	spectrogram_images = data_processor.create_spectrogram_images()
	np.save(os.path.join(PRODUCTION_DATASET_PATH, f"spectrogram_images_{SUBSAMPLE_SIZE}.npy"), spectrogram_images)

	# Print summary of preprocessed data
	print(f"""
	1. Expected samples: {SUBSAMPLE_SIZE}
	2. Actual samples: {len(signals)}\n{metadata['category'].value_counts(normalize=True)}
	""")

preprocess_data()

In [None]:
def visualise_data(num_samples=1):
	"""Show `num_samples` randomly chosen waveform/spectrogram image pairs, one pair per row."""
	# Read the image stacks written by the preprocessing step
	waveform_path = os.path.join(PRODUCTION_DATASET_PATH, f"waveform_images_{SUBSAMPLE_SIZE}.npy")
	spectrogram_path = os.path.join(PRODUCTION_DATASET_PATH, f"spectrogram_images_{SUBSAMPLE_SIZE}.npy")
	waveform_images = np.load(waveform_path)
	spectrogram_images = np.load(spectrogram_path)

	# squeeze=False always returns a 2-D grid of axes, so a single row needs no special case
	fig, axes = plt.subplots(num_samples, 2, figsize=(6, 3 * num_samples), squeeze=False)

	# Draw distinct sample indices and fill one row per drawn sample
	picked = np.random.choice(len(waveform_images), num_samples, replace=False)
	for axes_row, sample_idx in zip(axes, picked):
		waveform_ax, spectrogram_ax = axes_row
		waveform_ax.imshow(waveform_images[sample_idx])
		waveform_ax.set_title(f"Waveform {sample_idx}")
		waveform_ax.axis("off")
		spectrogram_ax.imshow(spectrogram_images[sample_idx])
		spectrogram_ax.set_title(f"Spectrogram {sample_idx}")
		spectrogram_ax.axis("off")
	plt.tight_layout()
	plt.show()

visualise_data()

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SEED = 37

# Seed torch's global RNG so that weight initialisation, DataLoader shuffling
# and dropout are repeatable on a fresh "Restart & Run All". Without this,
# SEED only affected the train/dev/test split.
torch.manual_seed(SEED)

# Training hyperparameters
BATCH_SIZE = 32
LEARNING_RATE = 0.001
NUM_EPOCHS = 50
PATIENCE = 7  # Epochs without dev-loss improvement before training stops early

# Load production dataset and compute labels
spectrograms = np.load(os.path.join(PRODUCTION_DATASET_PATH, f"spectrogram_images_{SUBSAMPLE_SIZE}.npy"))
metadata = pd.read_feather(os.path.join(PRODUCTION_DATASET_PATH, f"metadata_{SUBSAMPLE_SIZE}.feather"))
labels = (metadata["category"] == "earthquake").astype(np.float32).values  # 0 (noise), 1 (earthquake)

# Normalize and convert spectrograms and labels to tensors.
# The permute moves the last axis to position 1 (channels-last -> channels-first);
# labels become a column vector so they line up with the models' (batch, 1) output.
spectrograms = torch.tensor(spectrograms.astype(np.float32) / 255.0).permute(0, 3, 1, 2)
labels = torch.tensor(labels).view(-1, 1)

def split_data(spectrograms, labels, test_size=0.2, dev_size=0.5):
	"""Split the data into stratified train, dev and test sets.

	Args:
		spectrograms: Samples to split; indexed along the first axis.
		labels: Class labels aligned with `spectrograms`; used for stratification.
		test_size: Fraction of all samples held out from training. The held-out
			part is then shared between the dev and test sets.
		dev_size: Fraction of the held-out part that goes to the dev set; the
			remainder becomes the test set.

	Returns:
		(train_i, train_labels, dev_i, dev_labels, test_i, test_labels)
	"""
	train_i, temp_i, train_labels, temp_labels = train_test_split(
		spectrograms, labels, test_size=test_size, stratify=labels, random_state=SEED
	)
	# dev_size is passed as train_size because the dev set is the FIRST output of
	# this second split. Passing it as test_size (as before) made it control the
	# test share instead. For the default 0.5 both forms give the same split.
	dev_i, test_i, dev_labels, test_labels = train_test_split(
		temp_i, temp_labels, train_size=dev_size, stratify=temp_labels, random_state=SEED
	)
	return train_i, train_labels, dev_i, dev_labels, test_i, test_labels

# Stratified train/dev/test partition of the production dataset
(
	train_i, train_labels,
	dev_i, dev_labels,
	test_i, test_labels,
) = split_data(spectrograms, labels)

# Wrap each partition in a DataLoader; only the training loader shuffles its samples
train_loader = DataLoader(
	SpectrogramDataset(train_i, train_labels),
	batch_size=BATCH_SIZE,
	shuffle=True,
)
dev_loader = DataLoader(
	SpectrogramDataset(dev_i, dev_labels),
	batch_size=BATCH_SIZE,
	shuffle=False,
)
test_loader = DataLoader(
	SpectrogramDataset(test_i, test_labels),
	batch_size=BATCH_SIZE,
	shuffle=False,
)

def _run_epoch(model, loader, criterion, optimizer=None):
	"""Run one pass over `loader` and return the mean loss per sample.

	The model is updated only when `optimizer` is given; otherwise the pass
	runs in eval mode with gradient tracking disabled.

	Raises:
		ValueError: If `loader` yields no samples.
	"""
	training = optimizer is not None
	model.train(training)  # model.train(False) is the same as model.eval()
	loss_sum = 0.0
	num_samples = 0
	with torch.set_grad_enabled(training):
		for images, labels in loader:
			images, labels = images.to(DEVICE), labels.to(DEVICE)
			if training:
				optimizer.zero_grad()
			loss = criterion(model(images), labels)
			if training:
				loss.backward()
				optimizer.step()
			# Weight each batch by its size so that a short final batch does not
			# count as much as a full one. Assumes `criterion` returns the batch
			# mean — TODO confirm if a non-default reduction is ever used.
			batch_size = labels.size(0)
			loss_sum += loss.item() * batch_size
			num_samples += batch_size
	if num_samples == 0:
		raise ValueError("Cannot compute a loss: the data loader yielded no samples")
	return loss_sum / num_samples


def train_model(model, train_loader, dev_loader, criterion, optimizer):
	"""Train `model` with early stopping on the dev loss.

	Everything is written to MODELS_PATH/<model.id>/:
	  - one checkpoint per epoch, named with the epoch number and dev loss,
	  - "best.pth", overwritten whenever the dev loss improves,
	  - "losses.json" with the per-epoch train and dev losses.

	Training runs for at most NUM_EPOCHS epochs and stops once the dev loss has
	failed to improve for PATIENCE consecutive epochs.

	Args:
		model: Network to train; must expose an `id` attribute and already be on DEVICE.
		train_loader: Loader yielding (images, labels) training batches.
		dev_loader: Loader yielding (images, labels) validation batches.
		criterion: Loss function called as criterion(outputs, labels).
		optimizer: Optimizer bound to the model's parameters.
	"""
	# Create model folder
	model_folder = os.path.join(MODELS_PATH, f"{model.id}")
	os.makedirs(model_folder, exist_ok=True)
	best_model_path = os.path.join(model_folder, "best.pth")

	best_dev_loss = float("inf")
	patience_counter = 0
	train_losses = []
	dev_losses = []

	for epoch in range(NUM_EPOCHS):
		# One optimisation pass over the train set, then a gradient-free pass over the dev set
		train_loss = _run_epoch(model, train_loader, criterion, optimizer)
		train_losses.append(train_loss)
		dev_loss = _run_epoch(model, dev_loader, criterion)
		dev_losses.append(dev_loss)

		print(f"Epoch {epoch+1}/{NUM_EPOCHS} - Train Loss: {train_loss:.4f} - Dev Loss: {dev_loss:.4f}")

		# Save model epoch checkpoint
		torch.save(model.state_dict(), os.path.join(model_folder, f"epoch={epoch+1}-dev_loss={dev_loss:.4f}.pth"))

		if dev_loss < best_dev_loss:
			best_dev_loss = dev_loss

			# Save best model
			torch.save(model.state_dict(), best_model_path)
			patience_counter = 0  # Reset patience counter
		else:
			patience_counter += 1
			# Stop early if needed
			if patience_counter >= PATIENCE:
				print("Stopping early")
				break

	# Save losses
	loss_data = {"train_losses": train_losses, "dev_losses": dev_losses}
	with open(os.path.join(model_folder, "losses.json"), "w") as f:
		json.dump(loss_data, f)

In [None]:
class CNN1(nn.Module):
	"""Three conv/ReLU/max-pool stages followed by a three-layer dense head ending in a sigmoid.

	Args:
		input_size: (height, width) of the input images. The three 2x2 max-pools
			shrink each dimension by a factor of 8 (rounding down), which fixes
			the width of the first dense layer. The default (200, 300) gives the
			64 * 25 * 37 features this model was originally hard-coded for.
			Each dimension should be at least 8.
	"""

	def __init__(self, input_size=(200, 300)):
		super().__init__()
		height, width = input_size
		self.id = "CNN1"
		self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
		self.pool = nn.MaxPool2d(2, 2)
		self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
		self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
		self.dropout = nn.Dropout(0.5)
		# 64 channels remain after conv3; three pools reduce H and W by 8x
		self.fc1 = nn.Linear(64 * (height // 8) * (width // 8), 128)
		self.fc2 = nn.Linear(128, 16)
		self.fc3 = nn.Linear(16, 1)

	def forward(self, x):
		"""Map a (batch, 3, height, width) tensor to (batch, 1) sigmoid outputs."""
		for conv in (self.conv1, self.conv2, self.conv3):
			x = self.pool(F.relu(conv(x)))
		x = torch.flatten(x, 1)
		x = F.relu(self.fc1(x))
		x = self.dropout(x)
		x = F.relu(self.fc2(x))
		return torch.sigmoid(self.fc3(x))

In [None]:
class FourierFeatureMapping(nn.Module):
	"""Fixed random Fourier features applied across the channel axis.

	Every pixel's channel vector is projected with a frozen random matrix, a
	frozen random phase is added, and the sine and cosine of the result are
	concatenated. The output therefore has 2 * out_channels channels.
	"""

	def __init__(self, in_channels, out_channels):
		super().__init__()
		# Stored as parameters so they are saved with the model, but never trained
		projection = torch.randn(in_channels, out_channels) * 10
		self.W = nn.Parameter(projection, requires_grad=False)
		phase = torch.rand(out_channels) * 2 * torch.pi
		self.b = nn.Parameter(phase, requires_grad=False)

	def forward(self, x):
		"""Map (batch, in_channels, H, W) to (batch, 2 * out_channels, H, W)."""
		projected = torch.einsum('bchw,cd->bdhw', x, self.W)
		shifted = projected + self.b.view(1, -1, 1, 1)
		return torch.cat((shifted.sin(), shifted.cos()), dim=1)

class CNNFFMe(nn.Module):
	"""CNN whose input is first expanded by a fixed Fourier feature mapping.

	After the mapping, the network has three conv/ReLU/max-pool stages and a
	three-layer dense head ending in a sigmoid.

	Args:
		input_size: (height, width) of the input images. The three 2x2 max-pools
			shrink each dimension by a factor of 8 (rounding down), which fixes
			the width of the first dense layer. The default (200, 300) gives the
			64 * 25 * 37 features this model was originally hard-coded for.
			Each dimension should be at least 8.
	"""

	def __init__(self, input_size=(200, 300)):
		super().__init__()
		height, width = input_size
		self.id = "CNNFFMe"
		# The mapping concatenates sin and cos of 6 projections, so it outputs
		# 2 * 6 = 12 channels; conv1 must accept 12, not 6.
		self.fourier = FourierFeatureMapping(3, 6)
		self.conv1 = nn.Conv2d(12, 16, kernel_size=3, padding=1)
		self.pool = nn.MaxPool2d(2, 2)
		self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
		self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
		self.dropout = nn.Dropout(0.5)
		# 64 channels remain after conv3; three pools reduce H and W by 8x
		self.fc1 = nn.Linear(64 * (height // 8) * (width // 8), 128)
		self.fc2 = nn.Linear(128, 16)
		self.fc3 = nn.Linear(16, 1)

	def forward(self, x):
		"""Map a (batch, 3, height, width) tensor to (batch, 1) sigmoid outputs."""
		x = self.fourier(x)  # Apply Fourier feature mapping
		for conv in (self.conv1, self.conv2, self.conv3):
			x = self.pool(F.relu(conv(x)))
		x = torch.flatten(x, 1)
		x = F.relu(self.fc1(x))
		x = self.dropout(x)
		x = F.relu(self.fc2(x))
		return torch.sigmoid(self.fc3(x))

In [None]:
class CNN_KolArn(nn.Module):
	"""CNN whose dense head interleaves extra expand-and-project blocks.

	Three conv/ReLU/max-pool stages feed a dense head in which each of the
	first two linear layers is followed by a block built by `kol_arn_layer`.
	The final layer is a single sigmoid unit.

	Args:
		input_size: (height, width) of the input images. The three 2x2 max-pools
			shrink each dimension by a factor of 8 (rounding down), which fixes
			the width of the first dense layer. The default (200, 300) gives the
			64 * 25 * 37 features this model was originally hard-coded for.
			Each dimension should be at least 8.
	"""

	def __init__(self, input_size=(200, 300)):
		super().__init__()
		height, width = input_size
		self.id = "CNN_KolArn"

		self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
		self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
		self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
		self.pool = nn.MaxPool2d(2, 2)
		self.dropout = nn.Dropout(0.5)
		# 64 channels remain after conv3; three pools reduce H and W by 8x
		self.fc1 = nn.Linear(64 * (height // 8) * (width // 8), 128)
		self.kolarn1 = self.kol_arn_layer(128, 128)
		self.fc2 = nn.Linear(128, 16)
		self.kolarn2 = self.kol_arn_layer(16, 16)
		self.fc3 = nn.Linear(16, 1)

	def kol_arn_layer(self, in_features, out_features):
		"""Build the block this model calls a Kolmogorov-Arnold layer.

		It is Linear -> ReLU -> Linear, with a hidden width of twice `in_features`.
		"""
		hidden = in_features * 2  # Expand feature space
		return nn.Sequential(
			nn.Linear(in_features, hidden),
			nn.ReLU(),
			nn.Linear(hidden, out_features)
		)

	def forward(self, x):
		"""Map a (batch, 3, height, width) tensor to (batch, 1) sigmoid outputs."""
		for conv in (self.conv1, self.conv2, self.conv3):
			x = self.pool(F.relu(conv(x)))
		x = torch.flatten(x, 1)

		x = F.relu(self.fc1(x))
		x = self.kolarn1(x)
		x = self.dropout(x)

		x = F.relu(self.fc2(x))
		x = self.kolarn2(x)

		return torch.sigmoid(self.fc3(x))

In [None]:
class CNN_KolArnSC(nn.Module):
	"""CNN with residual expand-and-project blocks after the first two conv stages.

	The first two conv/ReLU/max-pool stages each add the output of a 1x1-conv
	block to their own feature map (a skip connection). The dense head then
	interleaves fully connected expand-and-project blocks, as in `kol_arn_layer`,
	and ends in a single sigmoid unit.

	Args:
		input_size: (height, width) of the input images. The three 2x2 max-pools
			shrink each dimension by a factor of 8 (rounding down), which fixes
			the width of the first dense layer. The default (200, 300) gives the
			64 * 25 * 37 features this model was originally hard-coded for.
			Each dimension should be at least 8.
	"""

	def __init__(self, input_size=(200, 300)):
		super().__init__()
		height, width = input_size
		self.id = "CNN_KolArnSC"

		# Convolutional layers
		self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
		self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
		self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)

		# Pooling layer
		self.pool = nn.MaxPool2d(2, 2)

		# Skip-connection blocks, applied channel-wise through 1x1 convolutions
		self.kolarn_skip1 = self.kol_arn_layer(16, 16)
		self.kolarn_skip2 = self.kol_arn_layer(32, 32)

		# Dropout layer
		self.dropout = nn.Dropout(0.5)

		# Fully connected layers with expand-and-project blocks in between.
		# 64 channels remain after conv3; three pools reduce H and W by 8x.
		self.fc1 = nn.Linear(64 * (height // 8) * (width // 8), 128)
		self.kolarn1 = self.kol_arn_layer(128, 128, fc=True)
		self.fc2 = nn.Linear(128, 16)
		self.kolarn2 = self.kol_arn_layer(16, 16, fc=True)
		self.fc3 = nn.Linear(16, 1)

	def kol_arn_layer(self, in_channels, out_channels, fc=False):
		"""Build the block this model calls a Kolmogorov-Arnold layer.

		The block expands to twice `in_channels`, applies ReLU, then projects to
		`out_channels`. With `fc=True` it uses Linear layers (for flat feature
		vectors); otherwise it uses 1x1 convolutions (for feature maps).
		"""
		hidden = in_channels * 2
		if fc:
			return nn.Sequential(
				nn.Linear(in_channels, hidden),
				nn.ReLU(),
				nn.Linear(hidden, out_channels)
			)
		return nn.Sequential(
			nn.Conv2d(in_channels, hidden, kernel_size=1),  # Expand
			nn.ReLU(),
			nn.Conv2d(hidden, out_channels, kernel_size=1)  # Contract
		)

	def forward(self, x):
		"""Map a (batch, 3, height, width) tensor to (batch, 1) sigmoid outputs."""
		# First convolution + skip connection
		x1 = self.pool(F.relu(self.conv1(x)))
		x1 = x1 + self.kolarn_skip1(x1)

		# Second convolution + skip connection
		x2 = self.pool(F.relu(self.conv2(x1)))
		x2 = x2 + self.kolarn_skip2(x2)

		# Third convolution
		x3 = self.pool(F.relu(self.conv3(x2)))

		# Flatten for fully connected layers
		x3 = torch.flatten(x3, 1)

		# Dense head
		x3 = F.relu(self.fc1(x3))
		x3 = self.kolarn1(x3)
		x3 = self.dropout(x3)

		x3 = F.relu(self.fc2(x3))
		x3 = self.kolarn2(x3)

		return torch.sigmoid(self.fc3(x3))


In [None]:
class KolArnLayer(nn.Module):
	"""Two linear layers with a sigmoid in between: Linear -> Sigmoid -> Linear.

	The first layer maps `in_features` to `out_features`; the second keeps
	the width at `out_features`.
	"""

	def __init__(self, in_features, out_features):
		super().__init__()
		self.fc1 = nn.Linear(in_features, out_features)
		self.fc2 = nn.Linear(out_features, out_features)
		# Sigmoid is the current choice; Tanh, Swish, etc. are candidates to try
		self.activation = nn.Sigmoid()

	def forward(self, x):
		"""Apply fc1, the activation and fc2 in sequence."""
		return self.fc2(self.activation(self.fc1(x)))

class GatedAttentionUnit(nn.Module):
	"""Gated residual block built from three 1x1 convolutions.

	Computes `sigmoid(conv_f(x) * conv_g(x)) * conv_h(x) + x`, so the output
	has the same shape as the input.
	"""

	def __init__(self, in_channels):
		super().__init__()
		self.conv_f = nn.Conv2d(in_channels, in_channels, kernel_size=1)
		self.conv_g = nn.Conv2d(in_channels, in_channels, kernel_size=1)
		self.conv_h = nn.Conv2d(in_channels, in_channels, kernel_size=1)
		self.sigmoid = nn.Sigmoid()

	def forward(self, x):
		"""Return the gated features added back onto `x`."""
		# The gate is the sigmoid of the element-wise product of two projections
		gate = self.sigmoid(self.conv_f(x) * self.conv_g(x))
		gated = gate * self.conv_h(x)
		return gated + x  # Residual connection

class CNN_KolArn_GAU(nn.Module):
	"""CNN with a gated attention unit after every convolution and a KolArnLayer in the head.

	Each of the three stages is conv -> GatedAttentionUnit -> ReLU -> max-pool.
	The dense head is Linear -> KolArnLayer -> dropout -> Linear -> Linear,
	ending in a single sigmoid unit.

	Args:
		input_size: (height, width) of the input images. The three 2x2 max-pools
			shrink each dimension by a factor of 8 (rounding down), which fixes
			the width of the first dense layer. The default (200, 300) gives the
			64 * 25 * 37 features this model was originally hard-coded for.
			Each dimension should be at least 8.
	"""

	def __init__(self, input_size=(200, 300)):
		super().__init__()
		height, width = input_size
		self.id = "CNN_KolArn_GAU"
		self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
		self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
		self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
		self.pool = nn.MaxPool2d(2, 2)
		self.gau1 = GatedAttentionUnit(16)
		self.gau2 = GatedAttentionUnit(32)
		self.gau3 = GatedAttentionUnit(64)
		# 64 channels remain after conv3; three pools reduce H and W by 8x
		self.fc1 = nn.Linear(64 * (height // 8) * (width // 8), 128)
		self.kol_arn = KolArnLayer(128, 64)
		self.fc2 = nn.Linear(64, 16)
		self.fc3 = nn.Linear(16, 1)
		self.dropout = nn.Dropout(0.5)

	def forward(self, x):
		"""Map a (batch, 3, height, width) tensor to (batch, 1) sigmoid outputs."""
		stages = ((self.conv1, self.gau1), (self.conv2, self.gau2), (self.conv3, self.gau3))
		for conv, gau in stages:
			x = self.pool(F.relu(gau(conv(x))))

		x = torch.flatten(x, 1)
		x = F.relu(self.fc1(x))
		x = self.kol_arn(x)
		x = self.dropout(x)
		x = F.relu(self.fc2(x))
		return torch.sigmoid(self.fc3(x))

In [None]:
class KolArnLayer(nn.Module):
	"""Linear -> Sigmoid -> Linear block; the output width is `out_features`.

	NOTE(review): an identical KolArnLayer is defined in an earlier cell of this
	notebook; running this cell rebinds the name to this definition.
	"""

	def __init__(self, in_features, out_features):
		super().__init__()
		self.fc1 = nn.Linear(in_features, out_features)
		self.fc2 = nn.Linear(out_features, out_features)
		# Could be swapped for Tanh, Swish, etc.
		self.activation = nn.Sigmoid()

	def forward(self, x):
		"""Project with fc1, squash with the activation, then apply fc2."""
		hidden = self.activation(self.fc1(x))
		return self.fc2(hidden)

class CNN_KolArn_Cycle(nn.Module):
	"""CNN that feeds stage-2 features back into stage-1 features before the dense head.

	After three conv/ReLU/max-pool stages, the stage-2 feature map is projected
	to 16 channels, upsampled to the stage-1 resolution and added to the stage-1
	map. That sum is resized to the stage-3 resolution and concatenated with the
	stage-3 map (64 + 16 = 80 channels) as input to the dense head.

	Args:
		input_size: (height, width) of the input images. The three 2x2 max-pools
			shrink each dimension by a factor of 8 (rounding down), which fixes
			the width of the first dense layer. The default (200, 300) gives the
			80 * 25 * 37 features this model was originally hard-coded for.
			Each dimension should be at least 8.
	"""

	def __init__(self, input_size=(200, 300)):
		super().__init__()
		height, width = input_size
		self.id = "CNN_KolArn_Cycle"
		self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
		self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
		self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
		self.pool = nn.MaxPool2d(2, 2)
		# conv1_backflow is not used in forward(): its result never reached the
		# output. The layer is kept so existing checkpoints still load.
		# NOTE(review): confirm whether the input-level feedback was meant to be used.
		self.conv1_backflow = nn.Conv2d(16, 3, kernel_size=1)
		self.conv2_backflow = nn.Conv2d(32, 16, kernel_size=1)
		# 64 (stage 3) + 16 (refined stage 1) channels at 1/8 resolution
		self.fc1 = nn.Linear(80 * (height // 8) * (width // 8), 128)
		self.kol_arn = KolArnLayer(128, 64)
		self.fc2 = nn.Linear(64, 16)
		self.fc3 = nn.Linear(16, 1)
		self.dropout = nn.Dropout(0.5)

	def forward(self, x):
		"""Map a (batch, 3, height, width) tensor to (batch, 1) sigmoid outputs."""
		# Standard forward pass (shapes given for the default 200x300 input)
		x1 = self.pool(F.relu(self.conv1(x)))  # [batch, 16, 100, 150]
		x2 = self.pool(F.relu(self.conv2(x1)))  # [batch, 32, 50, 75]
		x3 = self.pool(F.relu(self.conv3(x2)))  # [batch, 64, 25, 37]

		# Backflow: project stage-2 features to 16 channels and upsample them to
		# the stage-1 resolution. Target sizes come from the tensors themselves,
		# so any input size works.
		x2_feedback = F.relu(self.conv2_backflow(x2))
		x2_feedback = F.interpolate(x2_feedback, size=x1.shape[-2:], mode="bilinear", align_corners=False)
		x2_refined = x1 + x2_feedback  # Inject x2_feedback back into x1

		# Bring the refined map down to the stage-3 resolution and stack the two
		x2_refined = F.interpolate(x2_refined, size=x3.shape[-2:], mode="bilinear", align_corners=False)
		x_concat = torch.cat([x3, x2_refined], dim=1)

		x = torch.flatten(x_concat, 1)
		x = F.relu(self.fc1(x))
		x = self.kol_arn(x)
		x = self.dropout(x)
		x = F.relu(self.fc2(x))
		return torch.sigmoid(self.fc3(x))

In [None]:
# Network to train; swap the class here to train another architecture from this notebook
model = CNN_KolArnSC().to(DEVICE)

# Binary cross-entropy on the models' sigmoid outputs, optimised with Adam
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

train_model(model, train_loader, dev_loader, criterion, optimizer)

In [None]:
def compute_metrics(y_true, y_pred):
	"""Compute binary classification metrics from labels and predicted probabilities.

	Args:
		y_true: Tensor of ground-truth labels (0 or 1).
		y_pred: Tensor of predicted probabilities; values above 0.5 count as class 1.

	Returns:
		Dict with "accuracy", "precision", "TPR" and "FPR" as Python floats
		rounded to six decimals. A rate whose denominator is zero is reported as 0.
	"""
	# Flatten to 1-D integer class labels so both arrays have the same shape and type
	y_true = y_true.cpu().numpy().ravel().astype(int)
	y_pred = (y_pred.cpu().numpy().ravel() > 0.5).astype(int)

	acc = accuracy_score(y_true, y_pred)
	precision = precision_score(y_true, y_pred, zero_division=0)
	# labels=[0, 1] forces a 2x2 matrix even when only one class occurs; without
	# it the matrix can be 1x1 and the four-way unpacking below fails.
	tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
	tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
	fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
	return {
		"accuracy": float(round(acc, 6)),
		"precision": float(round(precision, 6)),
		"TPR": float(round(tpr, 6)),
		"FPR": float(round(fpr, 6))
	}

def evaluate_model(model, test_loader):
	"""Run `model` over `test_loader` in eval mode and print its test metrics."""
	model.eval()
	targets = []
	predictions = []

	# Inference only: no gradients are needed
	with torch.no_grad():
		for batch_images, batch_labels in test_loader:
			batch_images = batch_images.to(DEVICE)
			batch_labels = batch_labels.to(DEVICE)
			predictions.append(model(batch_images))
			targets.append(batch_labels)

	# Join the per-batch tensors into one tensor each before scoring
	metrics = compute_metrics(torch.cat(targets, dim=0), torch.cat(predictions, dim=0))
	print(f"Test Metrics: {metrics}")

# Restore the weights with the lowest dev loss before scoring on the test set.
# map_location remaps the saved tensors onto DEVICE, so a checkpoint written
# on a GPU machine still loads on a CPU-only one.
best_checkpoint_path = os.path.join(MODELS_PATH, f"{model.id}", "best.pth")
model.load_state_dict(torch.load(best_checkpoint_path, map_location=DEVICE))
evaluate_model(model, test_loader)