# In [None]:  (Jupyter cell marker left over from the notebook export)
# Ship as a Wave Buoy using Machine Learning
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import matplotlib.pyplot as plt

# Load data
# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which
# can execute arbitrary code. Only load this archive from a trusted source,
# and drop the flag if the stored arrays are plain numeric (TODO confirm).
data = np.load('Data_Project1.npz', allow_pickle=True)
X = data['X']
Hs = data['Hs']
Tp = data['Tp']
Dir = data['Dir']

# Split sample indices into training and testing sets *before* scaling so the
# scaler statistics are never computed from held-out samples. Splitting an
# index array with the same test_size/random_state selects the same samples
# that splitting the data arrays directly would.
train_idx, test_idx = train_test_split(
    np.arange(len(X)), test_size=0.2, random_state=42
)

# Data preprocessing
# Standardise every feature along the last axis of X. The scaler is fitted on
# the training rows only and then applied to the full array, so X_scaled keeps
# the shape of X while the test rows stay unseen during fitting.
n_features = X.shape[-1]
scaler = StandardScaler()
scaler.fit(X[train_idx].reshape(-1, n_features))
X_scaled = scaler.transform(X.reshape(-1, n_features)).reshape(X.shape)

# Materialise the per-split arrays used by the rest of the script
X_train, X_test = X_scaled[train_idx], X_scaled[test_idx]
Hs_train, Hs_test = Hs[train_idx], Hs[test_idx]
Tp_train, Tp_test = Tp[train_idx], Tp[test_idx]
Dir_train, Dir_test = Dir[train_idx], Dir[test_idx]

# Define a custom dataset
class ShipWaveDataset(Dataset):
    """Map-style dataset returning ``(inputs, Hs, Tp, Dir)`` float tensors.

    The four containers are stored as given and indexed in lockstep; the
    dataset length is ``len(inputs)``. Each target is wrapped in a
    one-element list before conversion, so a scalar entry becomes a tensor
    of shape ``(1,)``.
    """

    def __init__(self, inputs, outputs_Hs, outputs_Tp, outputs_Dir):
        self.inputs, self.outputs_Hs = inputs, outputs_Hs
        self.outputs_Tp, self.outputs_Dir = outputs_Tp, outputs_Dir

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        # Features are converted as-is; targets follow in Hs, Tp, Dir order.
        sample = torch.FloatTensor(self.inputs[idx])
        targets = [
            torch.FloatTensor([column[idx]])
            for column in (self.outputs_Hs, self.outputs_Tp, self.outputs_Dir)
        ]
        return (sample, *targets)

# Create datasets and data loaders
# Both splits use batches of 32; only the training loader shuffles, so the
# test batches come out in dataset order.
train_dataset = ShipWaveDataset(
    inputs=X_train, outputs_Hs=Hs_train, outputs_Tp=Tp_train, outputs_Dir=Dir_train
)
test_dataset = ShipWaveDataset(
    inputs=X_test, outputs_Hs=Hs_test, outputs_Tp=Tp_test, outputs_Dir=Dir_test
)

train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

# Define CNN model
class CNNModel(nn.Module):
    """1-D convolutional feature extractor.

    Applies conv (6 -> 64 channels, kernel 3), max-pool (2), conv
    (64 -> 128 channels, kernel 3), max-pool (2), then flattens everything
    after the batch axis. No activation function is applied between stages.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=6, out_channels=64, kernel_size=3)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3)
        self.flatten = nn.Flatten()

    def forward(self, x):
        # The same pooling module is reused after each convolution.
        stages = (self.conv1, self.pool, self.conv2, self.pool, self.flatten)
        for stage in stages:
            x = stage(x)
        return x

# Define MLP model
class MLPModel(nn.Module):
    """Shared hidden layer followed by three single-output regression heads.

    Args:
        in_features: Width of the flattened feature vector fed to ``fc1``.
            The default ``128 * 237`` presumably matches the flattened
            output of the CNN in this file (128 channels x 237 remaining
            time steps) for one specific input length; pass another value
            when the feature width differs.
        hidden_features: Width of the shared hidden layer (default 256).

    ``forward`` returns a tuple ``(Hs, Tp, Dir)``, one tensor per head, each
    with a trailing dimension of size 1.
    """

    def __init__(self, in_features=128 * 237, hidden_features=256):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.relu = nn.ReLU()
        # Independent linear heads that all read the same hidden representation
        self.fc2_Hs = nn.Linear(hidden_features, 1)
        self.fc2_Tp = nn.Linear(hidden_features, 1)
        self.fc2_Dir = nn.Linear(hidden_features, 1)

    def forward(self, x):
        hidden = self.relu(self.fc1(x))
        Hs = self.fc2_Hs(hidden)
        Tp = self.fc2_Tp(hidden)
        Dir = self.fc2_Dir(hidden)
        return Hs, Tp, Dir

# Instantiate the models and define the loss and optimizer
# A single Adam optimizer updates the parameters of both networks, CNN first,
# so the two are trained jointly.
cnn_model = CNNModel()
mlp_model = MLPModel()
criterion = nn.MSELoss()
optimizer = optim.Adam(
    [*cnn_model.parameters(), *mlp_model.parameters()],
    lr=0.001,
)

# Training loop for the hybrid model
# Every step pushes a batch through the CNN and then the MLP heads, and
# minimises the unweighted sum of the three per-target losses.
num_epochs = 10
for epoch in range(num_epochs):
    for inputs, target_Hs, target_Tp, target_Dir in train_loader:
        optimizer.zero_grad()

        # Swap the last two axes before the CNN; assumes batches arrive as
        # (batch, length, channels) while Conv1d wants channels second.
        cnn_output = cnn_model(inputs.transpose(1, 2))

        # The MLP heads consume the flattened CNN features
        mlp_output_Hs, mlp_output_Tp, mlp_output_Dir = mlp_model(cnn_output)

        loss_Hs = criterion(mlp_output_Hs, target_Hs)
        loss_Tp = criterion(mlp_output_Tp, target_Tp)
        loss_Dir = criterion(mlp_output_Dir, target_Dir)
        loss = loss_Hs + loss_Tp + loss_Dir

        loss.backward()
        optimizer.step()

# Evaluate the hybrid model
# Put both networks in evaluation mode and disable gradient tracking while
# collecting the test loss and the per-sample predictions.
cnn_model.eval()
mlp_model.eval()
with torch.no_grad():
    test_loss = 0.0
    predictions_Hs, predictions_Tp, predictions_Dir = [], [], []
    for inputs, target_Hs, target_Tp, target_Dir in test_loader:
        cnn_output = cnn_model(inputs.permute(0, 2, 1))
        mlp_output_Hs, mlp_output_Tp, mlp_output_Dir = mlp_model(cnn_output)

        batch_loss = (
            criterion(mlp_output_Hs, target_Hs) +
            criterion(mlp_output_Tp, target_Tp) +
            criterion(mlp_output_Dir, target_Dir)
        )
        # nn.MSELoss() averages over the batch by default, so weight each
        # batch loss by its size; dividing the total by the sample count
        # below then gives a true per-sample mean, even when the final
        # batch is smaller than the others.
        test_loss += batch_loss.item() * inputs.size(0)

        predictions_Hs.extend(mlp_output_Hs.numpy())
        predictions_Tp.extend(mlp_output_Tp.numpy())
        predictions_Dir.extend(mlp_output_Dir.numpy())

    # Guard against an empty test set instead of dividing by zero
    num_test_samples = len(test_loader.dataset)
    average_test_loss = (
        test_loss / num_test_samples if num_test_samples else float('nan')
    )
    print(f'Average Test Loss (Hybrid): {average_test_loss}')

# Plot predictions against true values
# One scatter panel per target, laid out side by side in a single row.
panels = (
    ('Hs', Hs_test, predictions_Hs),
    ('Tp', Tp_test, predictions_Tp),
    ('Dir', Dir_test, predictions_Dir),
)

plt.figure(figsize=(12, 6))
for position, (label, truth, predicted) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    plt.scatter(truth, predicted, alpha=0.5)
    plt.title(f'{label} Prediction')
    plt.xlabel(f'True {label}')
    plt.ylabel(f'Predicted {label}')

plt.tight_layout()
plt.show()
