In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
%matplotlib inline

In [2]:
file_list = ["garmin_edge_820/3993730634_ACTIVITY_data.csv",
             "garmin_edge_820/4557226804_ACTIVITY_data.csv",
             "garmin_edge_820/4593452980_ACTIVITY_data.csv",
             "garmin_edge_820/5191513011_ACTIVITY_data.csv",
]
combined_df = pd.concat([pd.read_csv(file, sep=";") for file in file_list], ignore_index=True)

In [3]:
def convert_brackets(string):
    return string.replace('[', '(').replace(']', ')')

combined_df.columns = [convert_brackets(col) for col in combined_df.columns]

In [4]:
hr_zones = [(0, 128), (129, 146), (147, 156), (157, 165),(166, 174), (175, 179), (180, float('inf'))]
power_zones = [(0, 157), (158, 186), (187, 200), (201, 218),(219, 247), (248, 287), (288, float('inf'))]

def get_zone(rate, zones):
    for zone, (lower, upper) in enumerate(zones, start=1):
        if lower <= rate <= upper:
            return zone
        
combined_df['hr_zone'] = combined_df['heart_rate(bpm)'].apply(get_zone, zones=hr_zones)
combined_df['pwr_zone'] = combined_df['power(watts)'].apply(get_zone, zones=power_zones)

In [5]:
window_size = 20 
combined_df['altitude_diff(m)'] = combined_df['altitude(m)'] - combined_df['altitude(m)'].shift(1)
combined_df['distance_diff(m)'] = combined_df['distance(m)'] - combined_df['distance(m)'].shift(1)
combined_df[['altitude_diff(m)', 'distance_diff(m)']] = combined_df[['altitude_diff(m)', 'distance_diff(m)']].fillna(0)
combined_df['slope_percent'] = np.where(combined_df['distance_diff(m)'] == 0, 0, combined_df['altitude_diff(m)'] / combined_df['distance_diff(m)'] * 100)
combined_df['avg_slope_percent'] = combined_df['slope_percent'].rolling(window=int(window_size), center=True).mean()
combined_df = combined_df.dropna(subset=['avg_slope_percent'])

In [6]:
window_size = 6
combined_df['avg_power(watts)'] = combined_df['power(watts)'].rolling(window=int(window_size), center=True).mean()
combined_df = combined_df.dropna(subset=['avg_power(watts)'])
combined_df['avg_power(watts)'] = combined_df['avg_power(watts)'].astype('int64')
combined_df = combined_df[combined_df['avg_power(watts)'] > 0]

In [7]:
combined_df['power_left(watts)'] = combined_df['left_right_balance'] - 128
combined_df['power_right(watts)'] = 100 - combined_df['power_left(watts)']

In [8]:
combined_df = combined_df[combined_df['speed(m/s)'] > 0]
combined_df = combined_df[(combined_df['power(watts)'] > 0) & (combined_df['power(watts)'] < 600)]
combined_df = combined_df[combined_df['cadence(rpm)'] > 0]
combined_df = combined_df[combined_df['heart_rate(bpm)'] > 80]
combined_df = combined_df.dropna(subset=['speed(m/s)'])

In [9]:
combined_df['timestamp(s)'] = combined_df['timestamp(s)'] + 631065600
combined_df['time'] = pd.to_datetime(combined_df.pop('timestamp(s)'), unit='s')
combined_df.set_index("time", inplace=True)

In [10]:
combined_df['time_since_start(s)'] = combined_df.groupby(pd.Grouper(freq='D')).cumcount() + 1

In [11]:
combined_df = combined_df.drop(['left_power_phase(degrees)',
                            'left_power_phase_peak(degrees)',
                            'right_power_phase(degrees)',
                            'right_power_phase_peak(degrees)',
                            'left_right_balance'], axis=1)

In [14]:
from sklearn.model_selection import train_test_split

In [15]:
X_hr = combined_df.drop(['heart_rate(bpm)','altitude(m)','avg_power(watts)','slope_percent','temperature(C)','hr_zone','pwr_zone','altitude_diff(m)','distance_diff(m)','left_pco(mm)','right_pco(mm)','power_left(watts)','power_right(watts)','accumulated_power(watts)'], axis=1)
y_hr = combined_df['heart_rate(bpm)']
X_train, X_val, y_train, y_val = train_test_split(X_hr, y_hr, test_size=1/3, random_state=42)

# Previsione battito (regressione)

In [16]:
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.preprocessing import StandardScaler

# Convert data to numpy arrays
X_np = X_hr.to_numpy(dtype=np.float32)
y_np = y_hr.to_numpy(dtype=np.float32).reshape(-1, 1)

# Normalize the data
scaler_X = StandardScaler()
scaler_y = StandardScaler()

X_np = scaler_X.fit_transform(X_np)
y_np = scaler_y.fit_transform(y_np)

# Convert data to PyTorch tensors
X_tensor = torch.tensor(X_np)
y_tensor = torch.tensor(y_np)

# Create a dataset from tensors
dataset = TensorDataset(X_tensor, y_tensor)

# Split dataset into training and validation sets
train_size = int(0.67 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

train_size, val_size


(36523, 17989)

In [25]:
import torch.nn as nn
import torch.nn.functional as F

class HeartRatePredictor(nn.Module):
    def __init__(self, input_dim):
        super(HeartRatePredictor, self).__init__()
        
        # Define the architecture
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 1)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.fc4(x)

# Create the model
input_dim = X_hr.shape[1]
model = HeartRatePredictor(input_dim)
model


HeartRatePredictor(
  (fc1): Linear(in_features=6, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=32, bias=True)
  (fc4): Linear(in_features=32, out_features=1, bias=True)
)

In [27]:
import torch.optim as optim
from tqdm import tqdm

# Define the loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 50
# Create a single tqdm object for the entire training process
pbar = tqdm(total=num_epochs * len(train_loader), position=0, leave=True)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for i, (inputs, labels) in enumerate(train_loader):
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs)
        
        # Calculate loss
        loss = criterion(outputs, labels)
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        # Update the progress bar
        desc = f"Epoch {epoch+1}/{num_epochs} - Loss: {running_loss/(i+1):.4f}"
        pbar.set_description(desc)
        pbar.update(1)
    
print("Finished Training")

# Reset the tqdm progress bar for validation
pbar = tqdm(total=len(val_loader), position=0, leave=True, desc="Validation")

# Evaluate the model on validation set
model.eval()
with torch.no_grad():
    val_loss = 0.0
    for inputs, labels in val_loader:
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        val_loss += loss.item()
        
        # Update the progress bar for validation
        pbar.update(1)
        
    print(f"Validation Loss: {val_loss/len(val_loader):.4f}")


Epoch 50/50 - Loss: 0.1259: 100%|█████████▉| 28549/28550 [02:00<00:00, 271.05it/s]

Finished Training


Epoch 50/50 - Loss: 0.1259: 100%|██████████| 28550/28550 [02:00<00:00, 237.28it/s]
Validation:  77%|███████▋  | 217/282 [00:00<00:00, 1082.25it/s]

Validation Loss: 0.1502


Validation: 100%|██████████| 282/282 [00:16<00:00, 1082.25it/s]

In [44]:
from sklearn.metrics import r2_score, mean_absolute_error

model.eval()  # Set the model to evaluation mode

y_preds = []
y_true = []

with torch.no_grad():
    for inputs, labels in val_loader:
        outputs = model(inputs)
        y_preds.append(outputs)
        y_true.append(labels)

# Concatenate the results
y_pred = torch.cat(y_preds, dim=0)
y_val = torch.cat(y_true, dim=0)

# Convert tensors to numpy arrays
y_val_np = y_val.numpy()
y_pred_np = y_pred.numpy()

# Calculate metrics
mae = mean_absolute_error(y_val_np, y_pred_np)
mse = np.mean((y_val_np - y_pred_np)**2)
rmse = np.sqrt(mse)
r2 = r2_score(y_val_np, y_pred_np)

print(f"MAE: {mae:.4f}")
print(f"MSE: {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"R^2: {r2:.4f}")


MAE: 0.2732
MSE: 0.1492
RMSE: 0.3862
R^2: 0.8498


# Classificazione zone frequenza cardiaca

In [46]:
# Define the classification neural network
class HeartRateZoneClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(HeartRateZoneClassifier, self).__init__()
        
        # Define the architecture
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, num_classes)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return F.softmax(self.fc4(x), dim=1)

# Create the classification model
num_classes = len(hr_zones)
model_classify = HeartRateZoneClassifier(input_dim, num_classes)

# Convert zones to tensor for training
y_zone_tensor = torch.tensor(combined_df['hr_zone'].to_numpy(), dtype=torch.long)

# Create a dataset for classification from tensors
dataset_classify = TensorDataset(X_tensor, y_zone_tensor)

# Split dataset into training and validation sets
train_size_classify = int(0.67 * len(dataset_classify))
val_size_classify = len(dataset_classify) - train_size_classify

train_dataset_classify, val_dataset_classify = random_split(dataset_classify, [train_size_classify, val_size_classify])

# Create data loaders for classification
train_loader_classify = DataLoader(train_dataset_classify, batch_size=64, shuffle=True)
val_loader_classify = DataLoader(val_dataset_classify, batch_size=64, shuffle=False)

model_classify


HeartRateZoneClassifier(
  (fc1): Linear(in_features=6, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=32, bias=True)
  (fc4): Linear(in_features=32, out_features=7, bias=True)
)