In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd

# Load the data
train_df = pd.read_csv('data-v6/train/train.csv')
test_df = pd.read_csv('data-v6/test/test.csv')
valid_df = pd.read_csv('data-v6/valid/valid.csv')

# Combine train and valid datasets for training
train_df = pd.concat([train_df, valid_df])
train_df = train_df[train_df['pothole_area_mm2'] <= 100000000]
train_df = train_df.dropna()  
test_df = test_df.dropna()

total_train_df = pd.concat([train_df, test_df])

train_df.to_csv('train_features.csv', index=False)

X_train = train_df.drop(columns=['Bags used '])  # Replace 'target' with your target column name
y_train = train_df['Bags used ']
X_test = test_df.drop(columns=['Bags used '])  # Replace 'target' with your target column name
y_test = test_df['Bags used ']

class ContrastivePotholeDataset(Dataset):
    def __init__(self, dataframe):
        self.dataframe = dataframe
        self.features = dataframe.drop(columns=['Bags used ']).values
        self.labels = dataframe['Bags used '].values

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        anchor_feature = self.features[idx]
        anchor_label = self.labels[idx]

        # Find a positive pair (a sample with a similar label)
        positive_idx = torch.argmin(torch.abs(torch.tensor(self.labels) - anchor_label))
        positive_feature = self.features[positive_idx]
        positive_label = self.labels[positive_idx]

        # Find a negative pair (a sample with a dissimilar label)
        negative_idx = torch.argmax(torch.abs(torch.tensor(self.labels) - anchor_label))
        negative_feature = self.features[negative_idx]
        negative_label = self.labels[negative_idx]

        return anchor_feature, positive_feature, negative_feature, anchor_label, positive_label, negative_label

# Load the data into the dataset
train_dataset = ContrastivePotholeDataset(train_df)

# DataLoader for batching
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)


In [8]:
import torch.nn as nn
import torch.nn.functional as F

# Define the triplet margin loss function
class TripletMarginLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletMarginLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        distance_positive = F.pairwise_distance(anchor, positive, p=2)
        distance_negative = F.pairwise_distance(anchor, negative, p=2)
        loss = torch.clamp(distance_positive - distance_negative + self.margin, min=0.0)
        return loss.mean()

In [9]:
class ContrastiveNet(nn.Module):
    def __init__(self, input_dim):
        super(ContrastiveNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)  # Output embeddings
        return x


In [10]:
# Initialize the model, loss function, and optimizer
input_dim = X_train.shape[1]
model = ContrastiveNet(input_dim)
criterion = TripletMarginLoss(margin=1.0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, (anchor_feature, positive_feature, negative_feature, _, _, _) in enumerate(train_loader):
        anchor_feature = torch.tensor(anchor_feature, dtype=torch.float32)
        positive_feature = torch.tensor(positive_feature, dtype=torch.float32)
        negative_feature = torch.tensor(negative_feature, dtype=torch.float32)

        # Forward pass
        anchor_output = model(anchor_feature)
        positive_output = model(positive_feature)
        negative_output = model(negative_feature)

        # Calculate loss
        loss = criterion(anchor_output, positive_output, negative_output)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')


  from .autonotebook import tqdm as notebook_tqdm
  anchor_feature = torch.tensor(anchor_feature, dtype=torch.float32)
  positive_feature = torch.tensor(positive_feature, dtype=torch.float32)
  negative_feature = torch.tensor(negative_feature, dtype=torch.float32)


Epoch [1/20], Loss: 3057.9433
Epoch [2/20], Loss: 273.4253
Epoch [3/20], Loss: 268.8365
Epoch [4/20], Loss: 73.9060
Epoch [5/20], Loss: 1.1229
Epoch [6/20], Loss: 1.0637
Epoch [7/20], Loss: 18.4918
Epoch [8/20], Loss: 1.0174
Epoch [9/20], Loss: 1.0000
Epoch [10/20], Loss: 1.0000
Epoch [11/20], Loss: 1.0000
Epoch [12/20], Loss: 1.0000
Epoch [13/20], Loss: 1.0000
Epoch [14/20], Loss: 1.0000
Epoch [15/20], Loss: 1.0000
Epoch [16/20], Loss: 1.0000
Epoch [17/20], Loss: 1.0000
Epoch [18/20], Loss: 1.0000
Epoch [19/20], Loss: 1.0000
Epoch [20/20], Loss: 1.0000


In [11]:
# Extract embeddings for training data
train_embeddings = model(torch.tensor(X_train.values, dtype=torch.float32)).detach().numpy()

# Use these embeddings in a regression model (e.g., Bayesian Ridge, SVR, etc.)
from sklearn.linear_model import BayesianRidge

regression_model = BayesianRidge()
regression_model.fit(train_embeddings, y_train)

# Predict on the test set
test_embeddings = model(torch.tensor(X_test.values, dtype=torch.float32)).detach().numpy()
y_test_pred = regression_model.predict(test_embeddings)


In [12]:
from sklearn.metrics import mean_squared_error

mse = mean_squared_error(y_test, y_test_pred)
print(f'Mean Squared Error: {mse:.4f}')


Mean Squared Error: 8.8356
