In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from scipy.stats import zscore
import matplotlib as plt

In [None]:
pd.set_option("display.max_columns", None)
df = pd.read_csv("df.csv")

# Tunable preprocessing constants (previously inlined as magic numbers).
SOURCE_ID = 2          # only rows whose "Source" equals this value are kept
MAX_TIME_GAP = 1       # rows whose gap to the previous row is >= this are dropped
                       # NOTE(review): unit follows `time_epoch` (presumably seconds) - confirm
MAX_ROWS = 80_000      # cap on the number of rows kept
KEEP_COLUMNS = ['time_epoch', 'RSSI', 'speed_kmh_source', 'speed_kmh_destination', 'distance']
SPEED_COLUMNS = ['speed_kmh_source', 'speed_kmh_destination']

# Order by time first, then keep a single source.
# .copy() detaches the filtered rows from `df`, so the column assignment below
# writes to an independent frame instead of triggering SettingWithCopyWarning.
fdf = df.sort_values(by='time_epoch', ascending=True)
fdf = fdf[fdf["Source"] == SOURCE_ID].copy()

# Replace the absolute timestamp with the gap to the previous row of the same
# source; the first row has no predecessor and gets 0.
fdf["time_epoch"] = fdf["time_epoch"].diff().fillna(0)

# Apply the gap threshold, keep only the modelling columns, cap the row count
# and renumber. reset_index (non-inplace) returns a fresh frame, so the
# assignments below are safe as well.
fdf = (
    fdf.loc[fdf["time_epoch"] < MAX_TIME_GAP, KEEP_COLUMNS]
    .head(MAX_ROWS)
    .reset_index(drop=True)
)

# Data Standardization: zero mean / unit variance for RSSI.
fdf['RSSI'] = zscore(fdf['RSSI'])

# Min-Max scaling for the two speed columns. A dedicated scaler keeps this fit
# available (e.g. for inverse_transform) instead of overwriting it below.
speed_scaler = MinMaxScaler()
fdf[SPEED_COLUMNS] = speed_scaler.fit_transform(fdf[SPEED_COLUMNS])

# Log transformation for time_epoch, log(1 + x) so zero gaps stay finite,
# then Min-Max scaling to keep values in [0, 1].
# `scaler` ends up fitted on time_epoch, exactly as before.
fdf['time_epoch'] = np.log1p(fdf['time_epoch'])
scaler = MinMaxScaler()
fdf['time_epoch'] = scaler.fit_transform(fdf[['time_epoch']]).ravel()

# Log-scaled copy of the target, log(1 + distance).
fdf['log_distance'] = np.log1p(fdf['distance'])
fdf.describe().T

In [None]:
# Feature matrix and regression target, pulled out of the preprocessed frame.
# NOTE(review): the target is the raw `distance` column.
feature_columns = ['time_epoch', 'RSSI', 'speed_kmh_source', 'speed_kmh_destination']
X = fdf[feature_columns].to_numpy()
y = fdf['distance'].to_numpy()

# 80/20 shuffled hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True
)

# NumPy arrays -> float32 tensors (train pair first, then test pair).
X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor = (
    torch.tensor(array, dtype=torch.float32)
    for array in (X_train, y_train, X_test, y_test)
)

# Pair features with targets so the loaders yield (X, y) mini-batches.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Training batches are reshuffled every epoch; test batches keep their order.
batch_size = 32
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

In [None]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by dropout and a linear regression head.

    Only the LSTM output at the last time step is passed on to the head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer's hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced per sample.
        dropout: Dropout probability, applied between LSTM layers (when there
            is more than one) and before the linear head.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # nn.LSTM only applies dropout *between* layers and warns when asked
        # for dropout with a single layer, so pass 0 in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=inter_layer_dropout)

        # Fully connected head mapping the last hidden output to the prediction.
        self.fc = nn.Linear(hidden_size, output_size)

        # Dropout applied to the last time step's features before the head.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Run the network on a batch of sequences.

        Args:
            x: Tensor of shape (batch, seq_len, input_size); the LSTM is built
                with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, output_size).
        """
        # With no initial state given, nn.LSTM starts from zero hidden and cell
        # states created on x's device and in x's dtype. This matches the
        # previous hand-built zeros but also works for double/half models.
        out, _ = self.lstm(x)

        # Keep only the last time step; dropout is active in training mode only.
        out = self.dropout(out[:, -1, :])
        out = self.fc(out)
        return out

# model initialize
input_size = 4      # features per time step; must match the width of the feature matrix
hidden_size = 64    # hidden state width of each LSTM layer
num_layers = 3      # stacked LSTM layers
output_size = 1     # single regression output
dropout = 0.5       # dropout probability inside the model

# Seed torch's global RNG before the weights are created so that weight
# initialisation (and later draws from the same RNG, e.g. dropout masks and
# DataLoader shuffling) are repeatable under Restart & Run All.
torch.manual_seed(42)

model = LSTMModel(input_size, hidden_size, num_layers, output_size, dropout=dropout)

In [None]:

# Loss function and optimizer
criterion = nn.MSELoss()  # Mean Squared Error for regression tasks
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer
num_epochs = 10  # Set the number of epochs

for epoch in range(num_epochs):
    model.train()  # enable dropout for training

    # Accumulate a sample-weighted loss so the printed value is the mean over
    # the whole epoch rather than just the last (noisy, possibly smaller)
    # mini-batch.
    running_loss = 0.0
    samples_seen = 0

    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()  # Zero gradients
        outputs = model(batch_X.unsqueeze(1))  # add a sequence axis of length 1
        loss = criterion(outputs, batch_y.unsqueeze(1))  # targets reshaped to (batch, 1)
        loss.backward()  # Backpropagate gradient
        optimizer.step()  # Update weights

        batch_count = batch_X.size(0)
        running_loss += loss.item() * batch_count
        samples_seen += batch_count

    # An empty loader yields no batches; report NaN instead of failing on an
    # undefined `loss`.
    epoch_loss = running_loss / samples_seen if samples_seen else float('nan')
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

In [None]:
# Inference mode: dropout layers become no-ops.
model.eval()

# Collect per-batch outputs and targets without tracking gradients.
pred_chunks, target_chunks = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        # Same length-1 sequence axis as used during training.
        outputs = model(batch_X.unsqueeze(1))
        pred_chunks.append(outputs.numpy())
        target_chunks.append(batch_y.numpy())

# Stitch the batches back together into single arrays.
predictions = np.concatenate(pred_chunks)
true_values = np.concatenate(target_chunks)

# Regression metrics on the held-out set.
mse = mean_squared_error(true_values, predictions)
rmse = np.sqrt(mse)
mae = mean_absolute_error(true_values, predictions)
r2 = r2_score(true_values, predictions)

for metric_name, metric_value in (("MSE", mse), ("RMSE", rmse), ("MAE", mae), ("R²", r2)):
    print(f"{metric_name}: {metric_value}")