**Imports**

In [1]:
# imports
import os
import json as j
import numpy as np

# Import torch
import torch
import torch.nn as nn
import torch.optim as optim

**Sample data**

In [2]:
path_dir = os.path.dirname("")
with open(os.path.join(path_dir, 'timeseries_pheno_metrics.json')) as f:
    json = j.load(f)
    f.close()

X_train = json["timeseries_pheno_metrics"]
y_train = json["label_id"]

In [3]:
new_X_train = []
new_y_train = []

for i in range(len(y_train)):
    if int(y_train[i]) == 370:
        new_X_train.append(X_train[i])
        new_y_train.append(y_train[i])
    if int(y_train[i]) == 372:
        new_X_train.append(X_train[i])
        new_y_train.append(y_train[i])
    if int(y_train[i]) == 365:
        new_X_train.append(X_train[i])
        new_y_train.append(y_train[i])
    if int(y_train[i]) == 359:
        new_X_train.append(X_train[i])
        new_y_train.append(y_train[i])

**Convert data to PyTorch tensors**

In [4]:
X_train_tensor = torch.tensor(np.expand_dims(new_X_train, axis=1), dtype=torch.float32)
y_train_tensor = torch.tensor(new_y_train, dtype=torch.long)

print(X_train_tensor.shape)

torch.Size([494, 1, 41])


**LSTM classifier model**

In [5]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

**Define model parameters**

In [6]:
input_size = X_train_tensor.shape[2]
hidden_size = 32
num_layers = 2
output_size = len(set(y_train_tensor))

**Instantiate**

In [7]:
model = LSTMClassifier(input_size, hidden_size, num_layers, output_size)

**Loss function and optimizer**

In [8]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

**Train the model**

In [9]:
num_epochs = 500
for epoch in range(num_epochs):
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()
    #print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')

**Make a prediction**

In [10]:
from cshd import cube_query, get_timeseries, params_phenometrics, calc_phenometrics, cshd_array

In [11]:
def return_class_by_id(id):
    if 370: return "Soja"
    if 372: return "Arroz"
    if 359: return "Vegetação Florestal"
    if 365: return "Corpos d'agua"
    if 367: return "Superfícies Artificiais"
    if 363: return "Pastagem"
    if 361: return "Formação Campestre"

In [12]:
S2_cube = cube_query(
    collection="S2-16D-2",
    start_date="2023-01-01",
    end_date="2023-12-31",
    freq='16D',
    band="NDVI"
)

config = params_phenometrics(
    peak_metric='pos', 
    base_metric='vos', 
    method='seasonal_amplitude', 
    factor=0.2, 
    thresh_sides='two_sided', 
    abs_value=0.1
)

longitude = -52.97612606396396
latitude =  -29.325361867915753

ts = get_timeseries(
    cube=S2_cube, 
    geom=[dict(coordinates = [longitude, latitude])],
    cloud_filter = True
)
ndvi_array = cshd_array(
    timeserie=ts['values'],
    start_date='2023-01-01',
    freq='16D'
)
ds_phenos = calc_phenometrics(
    da=ndvi_array,
    engine='phenolopy',
    config=config,
    start_date='2023-01-01'
)
test_tl = ts['values'] + ds_phenos

Initialising calculation of phenometrics.

Beginning extraction of CRS metadata.
> Extracting CRS metadata.
> No CRS metadata found. Returning None.

Beginning calculation of phenometrics. This can take awhile - please wait.

Beginning calculation of peak of season (pos) values and times.
> Calculating peak of season (pos) values.
> Calculating peak of season (pos) times.
> Success!

Beginning calculation of valley of season (vos) values and times.
> Calculating valley of season (vos) values.
> Calculating valley of season (vos) times.
> Success!

Beginning calculation of middle of season (mos) values (times not possible).
> Calculating middle of season (mos) values.
> Success!

Beginning calculation of base (bse) values (times not possible).
> Calculating base (bse) values.
> Success!

Beginning calculation of amplitude of season (aos) values (times not possible).
> Calculating amplitude of season (aos) values.
> Success!

Beginning calculation of start of season (sos) values and time

In [13]:
X_test_tensor = torch.tensor(np.expand_dims([test_tl], axis=1), dtype=torch.float32)
with torch.no_grad():
    predictions = model(X_test_tensor)
    predicted_labels = torch.argmax(predictions, dim=1)
    print("Predicted Label:", predicted_labels[0], return_class_by_id(predicted_labels[0]))

Predicted Label: tensor(359) Soja


**Model Accuracy**

In [14]:
with open(os.path.join(path_dir, 'test_timeseries_pheno_metrics.json')) as ff:
    json_test = j.load(ff)
    ff.close()

X_test = json_test["timeseries_pheno_metrics"]
y_test = json_test["label_id"]

In [15]:
new_X_test = []
new_y_test = []

for i in range(len(y_test)):
    if int(y_test[i]) == 370:
        new_X_test.append(X_test[i])
        new_y_test.append(y_test[i])
    if int(y_test[i]) == 372:
        new_X_test.append(X_test[i])
        new_y_test.append(y_test[i])
    if int(y_test[i]) == 365:
        new_X_test.append(X_test[i])
        new_y_test.append(y_test[i])
    if int(y_test[i]) == 359:
        new_X_test.append(X_test[i])
        new_y_test.append(y_test[i])

In [16]:
X_test_tensor = torch.tensor(np.expand_dims(new_X_test, axis=1), dtype=torch.float32)
y_test_tensor = torch.tensor(new_y_test, dtype=torch.long)

print(X_test_tensor.shape)

torch.Size([200, 1, 41])


In [17]:
with torch.no_grad():
    predictions = model(X_test_tensor)
    predicted_labels = torch.argmax(predictions, dim=1)

correct_count = sum(a == b for a, b in zip(y_test_tensor, predicted_labels))
overlap = float(correct_count) / len(y_test_tensor) * 100

float(overlap)

62.5