<a href="https://colab.research.google.com/github/ariakhademi/telematics/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install opendatasets

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl.metadata (9.2 kB)
Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [2]:
import opendatasets as od
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim
from sklearn.preprocessing import StandardScaler

In [55]:
# data loading
dataset_url = 'https://www.kaggle.com/datasets/yunlevin/levin-vehicle-telematics'
od.download(dataset_url)
file_path = 'levin-vehicle-telematics/allcars.csv'
df_orig = pd.read_csv(file_path)

Skipping, found downloaded files in "./levin-vehicle-telematics" (use force=True to force download)


  df_orig = pd.read_csv(file_path)


**Detect hard brakes with a threshold**

In [4]:
# constants
acceleration_threshold = 0.5

# data preprocessing
df['timeStamp'] = pd.to_datetime(df['timeStamp'])
delta_speed = df['speed'].diff()
delta_time = df['timeStamp'].diff().dt.total_seconds()
acceleration = delta_speed / delta_time
df['acceleration'] = acceleration

# detect hard brakes
df['hard_brakes'] = (df['acceleration'] < -acceleration_threshold).astype(int)
df['hard_brakes']

Unnamed: 0,hard_brakes
0,0
1,0
2,0
3,0
4,0
...,...
7214690,0
7214691,0
7214692,0
7214693,0


**Detect hard brakes with machine learning**

In [56]:
# data preprocessing
df = df_orig.copy()
df = df[:100000]
df['timeStamp'] = pd.to_datetime(df['timeStamp'])
delta_speed = df['speed'].diff()
delta_time = df['timeStamp'].diff().dt.total_seconds()
acceleration = delta_speed / delta_time
df['acceleration'] = acceleration
scaler = StandardScaler()
df[['speed', 'acceleration']] = scaler.fit_transform(df[['speed', 'acceleration']])

# create sequences
seq_length = 10
input_sequence = []
for i in range(len(df) - seq_length):
    input_sequence.append(df[['speed', 'acceleration']].iloc[i:i+seq_length].values)
# sequence_input, e.g. = [[[1,2],[3,4],...,[x,y]],[[6,7],[8,9],...,[a,b]],...]
input_sequence = np.array(input_sequence)
input_tensor = torch.tensor(input_sequence) # convert to tensor
dataset = torch.utils.data.TensorDataset(input_tensor.float(), input_tensor.float())  # for auto encoder
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

In [57]:
# hyperparams
num_features = 2
encoded_size = 8
lr = 0.01
num_epochs = 10
input_size = seq_length * num_features
hidden_size = 16
batch_size = 32

class AutoEncoder(nn.Module):
    def __init__(self,input_size,encoded_size,batch_size):
      super(AutoEncoder, self).__init__()

      # params
      self.batch_size = batch_size
      self.input_size = input_size
      self.encoded_size = encoded_size

      # encoder
      self.encoder = nn.Sequential(
          nn.Linear(input_size,hidden_size),
          nn.LeakyReLU(negative_slope=0.1),
          nn.Linear(hidden_size,encoded_size),
          nn.LeakyReLU(negative_slope=0.1)
      )

      # decoder
      self.decoder = nn.Sequential(
          nn.Linear(encoded_size,hidden_size),
          nn.LeakyReLU(negative_slope=0.1),
          nn.Linear(hidden_size,input_size),
      )

    def forward(self, x):
      x = x.view(x.size(0), -1)
      return self.decoder(self.encoder(x))

# model, loss, optimizer
model = AutoEncoder(input_size,encoded_size,batch_size)
loss_fn = nn.MSELoss()
optim = torch.optim.Adam(params=model.parameters(), lr=lr)

In [58]:
torch.manual_seed(42)

# training
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    # each batch
    for input,_ in dataloader:
      optim.zero_grad()
      output = model(input)
      loss = loss_fn(output,input.view(input.size(0),-1))
      loss.backward()
      nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
      optim.step()
      total_loss += loss.item()

    # epoch loss
    print(f'Epoch: {epoch}, Loss: {total_loss}')

Epoch: 0, Loss: nan
Epoch: 1, Loss: nan
Epoch: 2, Loss: nan
Epoch: 3, Loss: nan
Epoch: 4, Loss: nan
Epoch: 5, Loss: nan
Epoch: 6, Loss: nan
Epoch: 7, Loss: nan
Epoch: 8, Loss: nan
Epoch: 9, Loss: nan


In [None]:
# function for hard brakes
def hard_brakes(model, input):
    with torch.inference_mode():
        model.eval()
        output = model(input)
        err = torch.mean(output - input)
        anomaly = err > acceleration_threshold