In [None]:
import os
import os.path

os.environ["CUDA_VISIBLE_DEVICES"]="2"  # specify which GPU(s) to be used

import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split

In [None]:
# Unpack the labelled archive into the current directory; -q suppresses unzip's per-file listing.
!unzip -q "labelled.zip" -d "./"

In [None]:
# Unpack the unlabelled archive into the current directory; -q suppresses unzip's per-file listing.
!unzip -q "unlabelled.zip" -d "./"

In [None]:
# Unpack the test archive into the current directory; -q suppresses unzip's per-file listing.
!unzip -q "test.zip" -d "./"

In [None]:
# Relative directory prefix (trailing slash included) that the file-loading loop
# prepends to "file_<i>.hdf5" when building labelled file paths.
labelled = "labelled_data/"

In [None]:
# Relative directory prefix for the unlabelled files.
# NOTE(review): no cell visible in this notebook reads this name — confirm it is still needed.
unlabelled = "unlabelled_data/"

In [None]:
# Relative directory prefix for the test files.
# NOTE(review): no cell visible in this notebook reads this name — confirm it is still needed.
test = "test/"

In [None]:
import h5py
import numpy as np
import os

# Function to load labeled dataset
def load_labeled_dataset(dataset_path, extensions=(".h5", ".hdf5")):
    """Load every HDF5 file in a folder and stack the samples along axis 0.

    Each matching file must contain the datasets ``H_Re``, ``H_Im``, ``SNR``
    and ``Pos``. Files are visited in sorted (lexicographic) name order so the
    sample order of the result is reproducible; ``os.listdir`` alone gives no
    ordering guarantee.

    Args:
        dataset_path: Directory containing the HDF5 files.
        extensions: File-name suffixes (case-insensitive) treated as HDF5.
            Defaults to both ``.h5`` and ``.hdf5`` because the data files used
            elsewhere in this notebook are named ``file_<i>.hdf5``.

    Returns:
        Tuple ``(H_Re, H_Im, SNR, Pos)`` of numpy arrays, each the
        concatenation over all files along the first axis.

    Raises:
        ValueError: If the folder contains no file with a matching suffix.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    file_names = sorted(
        name for name in os.listdir(dataset_path)
        if name.lower().endswith(suffixes)
    )
    if not file_names:
        # Fail with an actionable message instead of numpy's
        # "need at least one array to concatenate".
        raise ValueError(
            f"No HDF5 files with extensions {suffixes} found in {dataset_path!r}"
        )

    keys = ("H_Re", "H_Im", "SNR", "Pos")
    chunks = {key: [] for key in keys}

    for file_name in file_names:
        file_path = os.path.join(dataset_path, file_name)
        # The context manager closes the file even if a dataset is missing.
        with h5py.File(file_path, 'r') as file:
            for key in keys:
                chunks[key].append(np.array(file[key]))

    return tuple(np.concatenate(chunks[key], axis=0) for key in keys)

# Folder holding the extracted labelled files
labeled_dataset_path = '/content/labelled_data'

# Read every labelled file and keep the four stacked arrays
(H_Re_labeled,
 H_Im_labeled,
 SNR_labeled,
 Pos_labeled) = load_labeled_dataset(labeled_dataset_path)

# Report the shape of each stacked array
for _label, _array in (
    ("H_Re", H_Re_labeled),
    ("H_Im", H_Im_labeled),
    ("SNR", SNR_labeled),
    ("Pos", Pos_labeled),
):
    print(f"{_label} shape:", _array.shape)


In [None]:
def get_data(data_file):
    """Read the four datasets of one HDF5 file fully into memory.

    Args:
        data_file: Path of the HDF5 file to read.

    Returns:
        Tuple ``(H_Re, H_Im, SNR, Pos)`` as returned by slicing each dataset
        with ``[:]``.

    Raises:
        KeyError: If one of the four datasets is missing from the file.
    """
    # `with` guarantees the handle is released even when a read fails;
    # the previous open/close pair leaked the file on any exception.
    with h5py.File(data_file, 'r') as f:
        H_Re = f['H_Re'][:] #shape (sample size, 56, 924, 5)
        H_Im = f['H_Im'][:] #shape (sample size, 56, 924, 5)
        SNR = f['SNR'][:] #shape (sample size, 56, 5)
        Pos = f['Pos'][:] #shape(sample size, 3)

    return H_Re, H_Im, SNR, Pos

In [None]:
### Read file_1.hdf5 ... file_8.hdf5 from the labelled folder and report their shapes ###

# First pass: load each file and keep its (H_Re, H_Im, SNR, Pos) tuple
data_list = []
for i in range(1, 9):
    data_file = f"{labelled}file_{i}.hdf5"
    data_list.append(get_data(data_file))
    print(f"Data loaded from file_{i}.hdf5")

# Second pass: unpack each tuple and print the shape of every array
for i, data in enumerate(data_list, start=1):
    H_Re, H_Im, SNR, Pos = data
    print(f"Shapes from file_{i}.hdf5:")
    for _name, _array in zip(("H_Re", "H_Im", "SNR", "Pos"), data):
        print(f"{_name} is of shape", _array.shape)
    print("-------------------------------------")

H_Re is of shape (512, 56, 924, 5)
H_Im is of shape (512, 56, 924, 5)
SNR is of shape (512, 56, 5)
Pos is of shape (512, 3)


In [None]:
# Open the three .npz archives (labelled, unlabelled, test) from /content
labeled_data, unlabeled_data, test_data = (
    np.load(npz_path)
    for npz_path in (
        "/content/labelled_data.npz",
        "/content/unlabelled_data.npz",
        "/content/test.npz",
    )
)

In [None]:

# Fix the PyTorch RNG seeds (CPU and every CUDA device) for reproducibility.
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# Later cells call `.to(device)` on models, losses and batches, but no visible
# cell defined `device`; define it here so a fresh kernel can run top to bottom.
# Falls back to the CPU when no CUDA device is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
# Define your feature size based on the output size of the self-supervised encoder
# (the encoder's last layer is nn.Linear(64, 32)).
# NOTE(review): no visible cell reads this name; the combined model is built
# from a separate `input_size = 32` — consider using one constant for both.
your_feature_size = 32


In [None]:
class SelfSupervisedDataset(Dataset):
    """Pairs each input sample with its position target.

    Both inputs are copied into float32 tensors at construction time; indexing
    returns the ``(sample, position)`` pair found at the same index.
    """

    def __init__(self, labeled_data, positions):
        as_float = dict(dtype=torch.float32)
        self.data = torch.tensor(labeled_data, **as_float)
        self.pos = torch.tensor(positions, **as_float)

    def __len__(self):
        # Length is driven by the sample tensor only.
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.pos[idx]

# Fractions of the labelled data used for training / validation / testing.
train_ratio = 0.9
val_ratio = 0.05
test_ratio = 0.05  # informational: the test split receives whatever remains

# NOTE(review): `magnitude` and `labels` are not assigned by any cell visible
# in this notebook; they must be defined before this cell runs — confirm.
full_dataset = SelfSupervisedDataset(magnitude, labels)

# Derive the split sizes from the dataset itself so the three sizes always add
# up to exactly what random_split will be asked to divide.
total_samples = len(full_dataset)
train_samples = int(train_ratio * total_samples)
val_samples = int(val_ratio * total_samples)
test_samples = total_samples - train_samples - val_samples

# A dedicated, seeded generator makes the split identical on every execution
# of this cell, regardless of how much of the global RNG stream earlier cells
# have consumed. Without it, re-running cells silently reshuffles which
# samples land in the validation and test sets.
split_generator = torch.Generator().manual_seed(seed)
train_dataset, val_dataset, test_dataset = random_split(
    full_dataset,
    [train_samples, val_samples, test_samples],
    generator=split_generator,
)

# Only the training loader shuffles; evaluation order is kept fixed.
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64)
test_dataloader = DataLoader(test_dataset, batch_size=64)

In [None]:
class SelfSupervisedModel(nn.Module):
    """Fully connected autoencoder over flattened 56 * 924 inputs.

    The encoder narrows 56*924 -> 256 -> 128 -> 64 -> 32 and the decoder
    mirrors it back to 56*924, with a ReLU between consecutive linear layers
    and none after the last layer of either half.
    """

    def __init__(self):
        super().__init__()
        widths = [56 * 924, 256, 128, 64, 32]
        # Encoder is built first, then the decoder, as mirror images.
        self.encoder = self._stack(widths)
        self.decoder = self._stack(widths[::-1])

    @staticmethod
    def _stack(widths):
        """Return Linear/ReLU layers chained through `widths`, ending on a Linear."""
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        # Drop the trailing activation so the stack ends on a linear layer.
        return nn.Sequential(*layers[:-1])

    def forward(self, x):
        """Return the pair (latent code, reconstruction) for input `x`."""
        latent = self.encoder(x)
        return latent, self.decoder(latent)

# Instantiate the autoencoder on the target device.
self_supervised_encoder = SelfSupervisedModel().to(device)

# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU still loads on a CPU-only (or differently numbered GPU) machine.
# NOTE: torch.load unpickles the file — only load checkpoints you trust.
model_state_dict = torch.load(
    'results/unlabeled/mlp/best_self_supervised_model.pth',
    map_location=device,
)

# Load the entire model state_dict (encoder and decoder weights)
self_supervised_encoder.load_state_dict(model_state_dict)

# Access the pre-trained encoder half on its own
pretrained_encoder = self_supervised_encoder.encoder

In [None]:
# Regression head that turns encoder features into a 3-value position
class PositionEstimationModel(nn.Module):
    """Three-layer MLP: input_size -> 128 -> 64 -> 3 with ReLU after the first two layers."""

    def __init__(self, input_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 3)  # three outputs, one per position coordinate

    def forward(self, x):
        hidden = self.relu(self.fc1(x))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

# Define your combined model
class CombinedPositionEstimationModel(nn.Module):
    """Feature encoder followed by a position-regression head.

    Args:
        encoder: Module mapping a batch of inputs to features. It may return
            either a tensor or a tuple whose first element is the feature
            tensor (as an autoencoder returning ``(encoded, decoded)`` does).
        input_size: Feature dimension fed to the regression head.
    """

    def __init__(self, encoder, input_size):
        super(CombinedPositionEstimationModel, self).__init__()
        self.encoder = encoder

        head = PositionEstimationModel(input_size)
        # Put the head on the same device as the encoder instead of relying on
        # a notebook-level `device` global, so the class works wherever the
        # encoder already lives. An encoder without parameters leaves the head
        # on its default device.
        first_param = (
            next(encoder.parameters(), None)
            if isinstance(encoder, nn.Module) else None
        )
        if first_param is not None:
            head = head.to(first_param.device)
        self.position_estimation_model = head

    def forward(self, x):
        """Return predicted positions for the input batch `x`."""
        features = self.encoder(x)

        # An encoder that returns a tuple carries the features first.
        if isinstance(features, tuple):
            features = features[0]

        positions = self.position_estimation_model(features)
        return positions

# Width of the feature vector handed to the regression head; equals the
# encoder's final layer width (nn.Linear(64, 32)).
input_size = 32

# NOTE(review): the full autoencoder is passed here, not `pretrained_encoder`.
# Its forward returns (encoded, decoded) and the combined model keeps only the
# first element, so the decoder still runs on every forward pass and its
# weights are part of `combined_model` — confirm this is intended.
combined_model = CombinedPositionEstimationModel(
    encoder=self_supervised_encoder,
    input_size=input_size,
).to(device)

# Mean-squared-error objective, optimised with Adam over every parameter of
# the combined model
criterion = nn.MSELoss().to(device)
optimizer = optim.Adam(combined_model.parameters(), lr=1e-3)

# Running record of the lowest validation loss seen and the weights that produced it
best_val_loss = float('inf')
best_model_weights = None

In [None]:
# Per-epoch average training and validation losses
train_losses = []
val_losses = []

# Training loop
num_epochs = 100  # Adjust as needed
for epoch in range(num_epochs):
    combined_model.train()  # Set the model to training mode
    total_train_loss = 0.0
    # Batch variables get their own names: reusing `labels` here would
    # overwrite the notebook-level `labels` array that the dataset-split cell
    # reads, breaking any re-run of that cell.
    for batch_inputs, batch_targets in train_dataloader:
        optimizer.zero_grad()
        batch_inputs = batch_inputs.view(batch_inputs.size(0), -1).to(device)
        batch_targets = batch_targets.to(device)
        predictions = combined_model(batch_inputs)
        loss = criterion(predictions, batch_targets)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    # Calculate average training loss for this epoch (mean over batches)
    avg_train_loss = total_train_loss / len(train_dataloader)
    train_losses.append(avg_train_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_train_loss:.4f}')

    # Validation loop
    combined_model.eval()  # Set the model to evaluation mode
    total_val_loss = 0.0
    with torch.no_grad():
        for val_inputs, val_targets in val_dataloader:
            val_inputs = val_inputs.view(val_inputs.size(0), -1).to(device)
            val_targets = val_targets.to(device)
            val_predictions = combined_model(val_inputs)
            val_loss = criterion(val_predictions, val_targets)
            total_val_loss += val_loss.item()

    # Calculate average validation loss for this epoch (mean over batches)
    avg_val_loss = total_val_loss / len(val_dataloader)
    val_losses.append(avg_val_loss)

    # Check if this is the best validation loss so far
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # state_dict() returns references to the live parameters, which keep
        # changing as training continues; clone them to keep a true snapshot.
        best_model_weights = {
            name: tensor.detach().clone()
            for name, tensor in combined_model.state_dict().items()
        }
        print(f'better weights for model')

    print(f'Epoch [{epoch+1}/{num_epochs}] - Validation Loss: {avg_val_loss:.4f}')

# Save the weights with the lowest validation loss; fall back to the final
# weights if no epoch ever improved on the initial best (e.g. NaN losses).
weights_to_save = (
    best_model_weights if best_model_weights is not None
    else combined_model.state_dict()
)
torch.save(weights_to_save, 'results/unlabeled/mlp/combined_position_estimation_model.pth')

# Draw the per-epoch training and validation losses on one set of axes and
# write the figure to disk
epochs = range(1, num_epochs + 1)
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(epochs, train_losses, label='Training Loss')
ax.plot(epochs, val_losses, label='Validation Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Training and Validation Loss')
ax.legend()
ax.grid(True)
fig.savefig('results/unlabeled/mlp/loss_curves.png')

In [None]:
#TESTING
from sklearn.metrics import mean_squared_error, mean_absolute_error
import numpy as np

def calculate_mse(predictions, labels):
    """Return the mean squared error of `predictions` against `labels`."""
    mse = mean_squared_error(y_true=labels, y_pred=predictions)
    return mse

def calculate_mae(predictions, labels):
    """Return the mean absolute error of `predictions` against `labels`."""
    mae = mean_absolute_error(y_true=labels, y_pred=predictions)
    return mae

def calculate_rmse(predictions, labels):
    """Return the root of the mean squared error of `predictions` against `labels`."""
    squared_error = mean_squared_error(y_true=labels, y_pred=predictions)
    return np.sqrt(squared_error)


def calculate_mape(predictions, labels, eps=np.finfo(np.float64).eps):
    """Mean absolute percentage error, returned as a fraction (not multiplied by 100).

    Args:
        predictions: Array-like of predicted values.
        labels: Array-like of true values, broadcast-compatible with `predictions`.
        eps: Lower bound applied to ``|labels|`` in the denominator. A label
            equal to zero would otherwise produce ``inf``/``nan`` and a
            RuntimeWarning. Labels with magnitude >= eps give exactly the same
            result as the unclamped formula.

    Returns:
        Mean of ``|labels - predictions| / max(|labels|, eps)`` over all elements.
    """
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)
    denominator = np.maximum(np.abs(labels), eps)
    absolute_percentage_errors = np.abs(labels - predictions) / denominator
    return np.mean(absolute_percentage_errors)

def calculate_rmspe(predictions, labels, eps=np.finfo(np.float64).eps):
    """Root mean squared percentage error, returned as a fraction.

    Args:
        predictions: Array-like of predicted values.
        labels: Array-like of true values, broadcast-compatible with `predictions`.
        eps: Lower bound applied to ``|labels|`` in the denominator so that a
            zero label does not yield ``inf``/``nan``. Labels with magnitude
            >= eps give exactly the same result as the unclamped formula.

    Returns:
        ``sqrt(mean(((labels - predictions) / max(|labels|, eps)) ** 2))``.
    """
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)
    denominator = np.maximum(np.abs(labels), eps)
    percentage_errors = ((labels - predictions) / denominator) ** 2
    return np.sqrt(np.mean(percentage_errors))

# Reload the saved combined-model weights; map_location lets a checkpoint
# written on a GPU load on whatever `device` is in use now.
model_weights_path = 'results/unlabeled/mlp/combined_position_estimation_model.pth'
combined_model.load_state_dict(torch.load(model_weights_path, map_location=device))

# Collect predictions and targets for the whole test split first. Averaging
# per-batch metrics is not equivalent: the mean of per-batch RMSEs is not the
# RMSE of the split, and a smaller final batch would be over-weighted.
collected_predictions = []
collected_targets = []

combined_model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    # Batch variables are deliberately not called `test_data`, so the loaded
    # test archive of that name is not overwritten by the last batch.
    for batch_inputs, batch_targets in test_dataloader:
        batch_inputs = batch_inputs.view(batch_inputs.size(0), -1).to(device)
        batch_predictions = combined_model(batch_inputs)
        collected_predictions.append(batch_predictions.cpu())
        collected_targets.append(batch_targets.cpu())

all_predictions = torch.cat(collected_predictions)
all_targets = torch.cat(collected_targets)

# Loss over the full split, computed with the same criterion used in training
avg_test_loss = criterion(all_predictions, all_targets).item()

# The metric helpers operate on numpy arrays
test_predictions = all_predictions.numpy()
test_labels = all_targets.numpy()

avg_mse = calculate_mse(test_predictions, test_labels)
avg_mae = calculate_mae(test_predictions, test_labels)
avg_rmse = calculate_rmse(test_predictions, test_labels)
avg_norm_mae_values = calculate_mape(test_predictions, test_labels)
avg_norm_rmse_values = calculate_rmspe(test_predictions, test_labels)

print(f'Test Loss: {avg_test_loss:.4f}')
print(f'MSE: {avg_mse:.4f}')
print(f'MAE: {avg_mae:.4f}')
print(f'RMSE: {avg_rmse:.4f}')
print(f'MAPE: {avg_norm_mae_values:.4f}')
print(f'RMSPE: {avg_norm_rmse_values:.4f}')

In [None]:
# Create a DataFrame for the submission: one row per test sample with a
# 1-based `id` and the predicted x / y / z coordinates.
# NOTE(review): `x_pred`, `y_pred` and `z_pred` are not assigned by any cell
# visible in this notebook; they must be produced (presumably by running the
# model on the held-out test inputs) before this cell can run — confirm.
# NOTE(review): `test_data` must still refer to the loaded test set here; any
# earlier cell that rebinds the name (for example as a loop variable) would
# change the length of the `id` column — verify.
submission_df = pd.DataFrame({
    'id': range(1, len(test_data) + 1),
    'x': x_pred.cpu().numpy(),
    'y': y_pred.cpu().numpy(),
    'z': z_pred.cpu().numpy()
})

# Save the DataFrame to a CSV file (without the pandas index column)
submission_df.to_csv('submission.csv', index=False)