<a href="https://colab.research.google.com/github/ashikf378/About/blob/main/shipment_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [6]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import glob
import os

# Define data paths for real data
data_paths = {
    "stationary": {"path": r"/content/drive/MyDrive/smart_shipment/0 stationary", "target": 0},
    "moving":     {"path": r"/content/drive/MyDrive/smart_shipment/1 moving", "target": 1},
    "picked":     {"path": r"/content/drive/MyDrive/smart_shipment/2 picked", "target": 2},
    "wrong":      {"path": r"/content/drive/MyDrive/smart_shipment/3 wrong", "target": 3},
    "thrown":     {"path": r"/content/drive/MyDrive/smart_shipment/4 thrown", "target": 4}
}

# Parameters for windowing (from synthetic generation approach)
window_size = 50
sensor_cols = ['acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z']  # Correct sensor columns

# Collect features and labels from real data with window-based feature extraction
data = []
labels = []
all_data_dfs = []  # For optional combined CSV saving

for label_name, info in data_paths.items():
    files = glob.glob(os.path.join(info["path"], "*.csv"))
    for file in files:
        df = pd.read_csv(file)
        # Add target column for optional saving
        df["target"] = info["target"]
        all_data_dfs.append(df)

        # Select sensor data
        if not all(col in df.columns for col in sensor_cols):
            print(f"Skipping {file}: Missing sensor columns.")
            continue
        data_df = df[sensor_cols]

        # Convert to numeric and drop NaNs
        data_df = data_df.apply(pd.to_numeric, errors='coerce')
        data_df = data_df.dropna()

        # Extract windows
        num_windows = len(data_df) // window_size
        for i in range(num_windows):
            window = data_df.iloc[i * window_size : (i + 1) * window_size]
            acc = window[['acc_x', 'acc_y', 'acc_z']].values
            gyro = window[['gyro_x', 'gyro_y', 'gyro_z']].values
            features = np.concatenate([
                acc.mean(axis=0), acc.std(axis=0), gyro.mean(axis=0), gyro.std(axis=0)
            ])
            data.append(features)
            labels.append(info["target"])

# Convert to arrays
X = np.array(data)
y = np.array(labels)

# Optional: Save combined raw dataset (before windowing)
combined_df = pd.concat(all_data_dfs, ignore_index=True)
combined_df.to_csv("combined_dataset.csv", index=False)
print(f"Combined dataset saved. Shape: {combined_df.shape}")

# If no data was collected (e.g., insufficient windows), fallback to synthetic
if len(X) == 0:
    print("No real data windows found. Generating synthetic data.")
    def generate_synthetic_data(num_samples_per_class=500, num_classes=5, window_size=50):
        data = []
        labels = []

        for class_id in range(num_classes):
            for _ in range(num_samples_per_class // window_size):
                if class_id == 0:  # stationary: small noise around gravity
                    acc = np.random.normal([0, 0, 1], 0.01, size=(window_size, 3))
                    gyro = np.random.normal(0, 0.1, size=(window_size, 3))
                elif class_id == 1:  # moving: sinusoidal motion
                    t = np.linspace(0, 2 * np.pi, window_size)
                    acc = np.column_stack([np.sin(t)*0.5, np.cos(t)*0.3, np.ones(window_size) + np.sin(t)*0.1])
                    gyro = np.column_stack([np.cos(t)*5, np.sin(t)*3, np.random.normal(0, 1, window_size)])
                elif class_id == 2:  # picked: sudden upward acceleration
                    acc = np.column_stack([np.random.normal(0, 0.05, window_size),
                                           np.random.normal(0, 0.05, window_size),
                                           np.linspace(1, 1.5, window_size) + np.random.normal(0, 0.1, window_size)])
                    gyro = np.random.normal(0, 2, size=(window_size, 3))
                elif class_id == 3:  # wrong position: tilted
                    acc = np.random.normal([0.5, 0.5, 0.7], 0.05, size=(window_size, 3))
                    gyro = np.random.normal([10, 10, 0], 1, size=(window_size, 3))
                elif class_id == 4:  # thrown: high acceleration and rotation
                    acc = np.random.normal(0, 1, size=(window_size, 3)) + np.linspace(0, 2, window_size)[:, None]
                    gyro = np.random.normal(0, 20, size=(window_size, 3))

                # Flatten the window into features
                features = np.concatenate([
                    acc.mean(axis=0), acc.std(axis=0), gyro.mean(axis=0), gyro.std(axis=0)
                ])
                data.append(features)
                labels.append(class_id)

        return np.array(data), np.array(labels)

    X, y = generate_synthetic_data()

# Preprocess: Scale features
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Custom Dataset
class SensorDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# DataLoaders
train_dataset = SensorDataset(X_train, y_train)
test_dataset = SensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Simple MLP Model
class ShipmentClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(ShipmentClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, num_classes)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Initialize model
input_size = X.shape[1]  # 12 features (mean/std for 6 sensors)
num_classes = 5
model = ShipmentClassifier(input_size, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    for features, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate
model.eval()
y_pred = []
y_true = []
with torch.no_grad():
    for features, labels in test_loader:
        outputs = model(features)
        _, predicted = torch.max(outputs, 1)
        y_pred.extend(predicted.numpy())
        y_true.extend(labels.numpy())

accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy: {accuracy * 100:.2f}%')

# Save the model
torch.save(model.state_dict(), 'shipment_model.pkl')

from sklearn.metrics import classification_report

# Evaluate
model.eval()
y_pred = []
y_true = []
with torch.no_grad():
    for features, labels in test_loader:
        outputs = model(features)
        _, predicted = torch.max(outputs, 1)
        y_pred.extend(predicted.numpy())
        y_true.extend(labels.numpy())

print(classification_report(y_true, y_pred))

new_df = pd.read_csv("/content/drive/MyDrive/test_1/THROWN.csv")

# Preprocess the same way as training data
# Drop target column if it exists
if 'target' in new_df.columns:
    new_df = new_df.drop(columns=['target'], errors='ignore')

# Select only sensor columns and convert to numeric, dropping NaNs
new_df = new_df[sensor_cols].apply(pd.to_numeric, errors='coerce').dropna()

# Apply window-based feature extraction to new data
new_data = []
num_windows = len(new_df) // window_size

for i in range(num_windows):
    window = new_df.iloc[i * window_size : (i + 1) * window_size]
    acc = window[['acc_x', 'acc_y', 'acc_z']].values
    gyro = window[['gyro_x', 'gyro_y', 'gyro_z']].values
    features = np.concatenate([
        acc.mean(axis=0), acc.std(axis=0), gyro.mean(axis=0), gyro.std(axis=0)
    ])
    new_data.append(features)

# Convert to array
new_data_array = np.array(new_data)


# Scale the new data using the same scaler fitted on the training data
# This assumes the scaler object is still available from the previous cell
if 'scaler' in globals():
    new_data_scaled = scaler.transform(new_data_array)
    new_data_scaled_tensor = torch.tensor(new_data_scaled, dtype=torch.float32)
else:
    print("Scaler not found. Cannot scale new data. Using unscaled data.")
    new_data_scaled_tensor = torch.tensor(new_data_array, dtype=torch.float32)


# Set the model to evaluation mode
model.eval()

# Get predictions
with torch.no_grad():
    outputs = model(new_data_scaled_tensor)
    _, predicted_classes = torch.max(outputs, 1)

# The predicted_classes tensor contains the predicted class index for each window of the new data
print("Predicted classes:", predicted_classes.numpy())

Combined dataset saved. Shape: (16033, 7)
Epoch [10/50], Loss: 0.5948
Epoch [20/50], Loss: 0.4619
Epoch [30/50], Loss: 0.0355
Epoch [40/50], Loss: 0.0611
Epoch [50/50], Loss: 0.0255
Accuracy: 98.31%
              precision    recall  f1-score   support

           0       0.92      1.00      0.96        11
           1       1.00      1.00      1.00        13
           2       1.00      0.92      0.96        13
           3       1.00      1.00      1.00         9
           4       1.00      1.00      1.00        13

    accuracy                           0.98        59
   macro avg       0.98      0.98      0.98        59
weighted avg       0.98      0.98      0.98        59

Predicted classes: [4 4 4 4]


In [7]:
# Add this import at the top of your training script
import joblib

# In your training code, after this line:
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Add this line to save the scaler:
joblib.dump(scaler, 'scaler.pkl')
print("Scaler saved as scaler.pkl")

Scaler saved as scaler.pkl


In [8]:
# Add this after loading the model
test_features = np.random.randn(1, 12)  # Replace with actual features from your training data
test_features_scaled = scaler.transform(test_features)
test_tensor = torch.tensor(test_features_scaled, dtype=torch.float32)
with torch.no_grad():
    outputs = model(test_tensor)
    _, predicted = torch.max(outputs, 1)
print(f"Test prediction: {predicted.item()}")

Test prediction: 2
