In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import GroupShuffleSplit, GroupKFold
from sklearn.metrics import f1_score
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import optuna

from database.query import fetch_all, load_csv

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Pull every record from the project database and discard the surrogate "id" column.
df = fetch_all().drop("id", axis=1)
# Normalise column labels to plain strings so later string-based lookups are uniform.
df.columns = df.columns.map(str)

In [None]:
# Feature engineering
# diff() and rolling() work on row order inside each plant group, so the frame is
# put into chronological order per plant first. create_sequences() also orders by
# timestamp, which keeps the engineered features consistent with the windows.
df = df.sort_values(["plant_id", "timestamp"], kind="stable").reset_index(drop=True)

# Days elapsed since the first recorded timestamp of each plant.
first_seen = df.groupby("plant_id")["timestamp"].transform("min")
df["days_since_planting"] = (df["timestamp"] - first_seen).dt.days

# Step-to-step change; the first reading of each plant has no predecessor -> 0.
df["delta_soil_moisture"] = df.groupby("plant_id")["soil_moisture"].diff().fillna(0)
df["delta_chlorophyll"] = df.groupby("plant_id")["chlorophyll_content"].diff().fillna(0)

# Rolling statistics (window=3)
ROLLING_WINDOW = 3
for col in ['soil_moisture', 'chlorophyll_content', 'ambient_temperature']:
    df[f'{col}_rolling_mean_3'] = (
        df.groupby('plant_id')[col]
        .transform(lambda x: x.rolling(ROLLING_WINDOW, min_periods=1).mean())
        .fillna(0)
    )
    # The std of a single observation is NaN; it is reported as 0 here.
    df[f'{col}_rolling_std_3'] = (
        df.groupby('plant_id')[col]
        .transform(lambda x: x.rolling(ROLLING_WINDOW, min_periods=1).std())
        .fillna(0)
    )


In [None]:
# Display the column index of df (sanity check of which columns are present at this point).
df.columns

In [None]:
# Encode target: map the health-status labels to integer class ids.
le = LabelEncoder()
le.fit(df["plant_health_status"])
df["plant_health_status"] = le.transform(df["plant_health_status"])
num_classes = len(le.classes_)

# Every column except the time key, the group key and the target is a model input.
features = df.columns.drop(['timestamp', 'plant_id', 'plant_health_status']).tolist()

In [None]:
# Hold out 30% of the data by plant_id group, so no plant appears on both sides.
gss = GroupShuffleSplit(n_splits=1, test_size=0.3, random_state=2503)
split_iter = gss.split(df, df['plant_health_status'], groups=df['plant_id'])
train_idx, test_idx = next(split_iter)

train, test = df.iloc[train_idx].copy(), df.iloc[test_idx].copy()

# Standardisation statistics come from the training rows only and are then
# applied unchanged to the held-out rows.
scaler = StandardScaler().fit(train[features])
train[features] = scaler.transform(train[features])
test[features] = scaler.transform(test[features])

# Deep learning: temporal CNN on sliding windows

In [None]:
def create_sequences(df, features, target, window_size):
    """Build sliding-window samples per plant for next-step classification.

    For every ``plant_id`` group the rows are ordered by ``timestamp``; each
    sample is ``window_size`` consecutive rows of ``features`` and its label is
    the ``target`` value of the row immediately following the window.

    Args:
        df: DataFrame containing ``plant_id``, ``timestamp``, ``target`` and all
            ``features`` columns.
        features: list of column names used as model inputs.
        target: name of the label column.
        window_size: number of consecutive rows per sample (must be >= 1).

    Returns:
        Tuple ``(X, y, groups)`` of numpy arrays where ``X`` has shape
        ``(n_samples, window_size, len(features))``, ``y`` has shape
        ``(n_samples,)`` and ``groups`` holds the ``plant_id`` of each sample.
        When no plant has more than ``window_size`` rows, the arrays are empty
        but keep these shapes (``X`` is ``(0, window_size, len(features))``).

    Raises:
        ValueError: if ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    X, y, groups = [], [], []

    for plant_id, plant_df in df.groupby("plant_id"):
        plant_df = plant_df.sort_values("timestamp")
        values = plant_df[features].values
        labels = plant_df[target].values

        # A plant with n rows yields n - window_size samples (none if n <= window_size),
        # because the label is taken from the row right after each window.
        for i in range(len(plant_df) - window_size):
            X.append(values[i:i+window_size])
            y.append(labels[i+window_size])
            groups.append(plant_id)

    if not X:
        # np.array([]) would be 1-D with shape (0,), which downstream tensor code
        # cannot consume; return correctly shaped, correctly typed empties instead.
        return (
            df[features].to_numpy()[:0].reshape(0, window_size, len(features)),
            df[target].to_numpy()[:0],
            df["plant_id"].to_numpy()[:0],
        )

    return np.array(X), np.array(y), np.array(groups)

In [None]:
class TemporalCNN(nn.Module):
    """Two-layer 1-D convolutional classifier over fixed-length feature windows.

    Both convolutions are unpadded with stride 1, so each one shortens the time
    axis by ``kernel_size - 1`` steps. The flattened convolution output is passed
    through dropout and a single linear layer that produces class logits.

    Args:
        n_features: number of input features per time step (conv input channels).
        window_size: number of time steps in every input window.
        num_classes: number of output logits.
        n_filters: number of channels produced by each convolution.
        kernel_size: temporal width of both convolution kernels.
        dropout: dropout probability applied before the linear layer.

    Raises:
        ValueError: if the window is too short for two unpadded convolutions of
            the given kernel size (i.e. ``window_size - 2*(kernel_size-1) < 1``).
    """

    def __init__(self, n_features, window_size, num_classes, n_filters, kernel_size, dropout):
        super().__init__()
        # Time steps remaining after the two unpadded convolutions.
        conv_out_len = window_size - 2 * (kernel_size - 1)
        if conv_out_len < 1:
            # Without this check the failure surfaces later as an opaque torch
            # error (negative/zero Linear size or a kernel-larger-than-input error).
            raise ValueError(
                f"window_size={window_size} is too short for two convolutions with "
                f"kernel_size={kernel_size}; need window_size >= {2 * (kernel_size - 1) + 1}"
            )
        self.conv1 = nn.Conv1d(n_features, n_filters, kernel_size)
        self.conv2 = nn.Conv1d(n_filters, n_filters, kernel_size)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(conv_out_len * n_filters, num_classes)

    def forward(self, x):
        """Return class logits for ``x`` of shape (batch, window_size, n_features)."""
        # Conv1d expects channels first: (batch, n_features, window_size).
        x = x.permute(0, 2, 1)
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = x.flatten(1)
        x = self.dropout(x)
        return self.fc(x)


In [None]:
def objective(trial):
    """Optuna objective: grouped 5-fold CV of a TemporalCNN on the training split.

    Samples window size, filter count, kernel size, dropout and learning rate,
    builds sliding windows from ``train``, trains one model per GroupKFold fold
    (20 full-batch Adam steps) and scores it with macro F1 on the held-out fold.

    Args:
        trial: the Optuna trial used to sample hyperparameters.

    Returns:
        ``1 - mean macro F1`` over the folds (lower is better).

    Raises:
        optuna.TrialPruned: when the sampled window is too short for two
            unpadded convolutions with the sampled kernel size.
    """
    window_size = trial.suggest_int("window_size", 3, 10)
    n_filters = trial.suggest_int("n_filters", 16, 64)
    kernel_size = trial.suggest_int("kernel_size", 2, 4)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)

    # The two unpadded convolutions shorten the window by 2*(kernel_size-1) steps.
    # Combinations such as window_size=3 / kernel_size=3 leave nothing to classify
    # and would raise inside torch, aborting the entire study; prune them instead.
    if window_size - 2 * (kernel_size - 1) < 1:
        raise optuna.TrialPruned()

    X, y, groups = create_sequences(
        train, features, "plant_health_status", window_size
    )

    device = "cuda" if torch.cuda.is_available() else "cpu"
    gkf = GroupKFold(n_splits=5)
    f1_scores = []

    # Fold index names are distinct from the notebook-level train_idx / test_idx.
    for fold_train_idx, fold_val_idx in gkf.split(X, y, groups):
        model = TemporalCNN(
            n_features=len(features),
            window_size=window_size,
            num_classes=num_classes,
            n_filters=n_filters,
            kernel_size=kernel_size,
            dropout=dropout
        ).to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()

        X_train = torch.tensor(X[fold_train_idx], dtype=torch.float32).to(device)
        y_train = torch.tensor(y[fold_train_idx], dtype=torch.long).to(device)
        X_val = torch.tensor(X[fold_val_idx], dtype=torch.float32).to(device)

        model.train()
        for _ in range(20):
            optimizer.zero_grad()
            loss = criterion(model(X_train), y_train)
            loss.backward()
            optimizer.step()

        # Score with dropout disabled and without building an autograd graph;
        # otherwise the validation predictions are randomly perturbed.
        model.eval()
        with torch.no_grad():
            preds = torch.argmax(model(X_val), dim=1)

        f1_scores.append(
            f1_score(y[fold_val_idx], preds.cpu().numpy(), average="macro")
        )

    return float(1 - np.mean(f1_scores))


In [None]:
# Seed the sampler and torch's global RNG so a Restart & Run All repeats the same
# search (GPU kernels may still introduce small nondeterminism).
SEARCH_SEED = 2503  # same value as the random_state used for the train/test split
torch.manual_seed(SEARCH_SEED)

# TPESampler is Optuna's default sampler; it is spelled out only to pass the seed.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=SEARCH_SEED),
)
study.optimize(objective, n_trials=200)

In [None]:
# Report the hyperparameters of the best-scoring trial of the study.
best_trial = study.best_trial
best_hyperparameters = best_trial.params
print("Best Hyperparameters:", best_hyperparameters)

In [None]:
best_params = study.best_trial.params
seq_len = best_params['seq_len']
final_dataset = PlantDataset(train, features, 'plant_health_status', seq_len=seq_len)
final_loader = DataLoader(final_dataset, batch_size=best_params['batch_size'], shuffle=True)

final_model = NeuralNet(input_dim=len(features), hidden_dim=best_params['hidden_dim'], 
                      num_layers=best_params['num_layers'], num_classes=num_classes, seq_len=seq_len)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(final_model.parameters(), lr=best_params['lr'])

for epoch in range(50):
    final_model.train()
    for X_batch, y_batch in final_loader:
        optimizer.zero_grad()
        outputs = final_model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

In [None]:
test_dataset = PlantDataset(test, features, 'plant_health_status', seq_len=seq_len)
test_loader = DataLoader(test_dataset, batch_size=best_params['batch_size'])

final_model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = final_model(X_batch)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())


In [None]:
from sklearn.metrics import classification_report

# Pass the full encoded label set explicitly: without `labels`, the report raises
# when a class is absent from both the test labels and the predictions, because
# the number of target_names no longer matches the classes found in the data.
class_ids = np.arange(len(le.classes_))
print(
    classification_report(
        all_labels,
        all_preds,
        labels=class_ids,
        target_names=[str(name) for name in le.classes_],
        zero_division=0,  # classes with no predictions score 0 without a warning
    )
)