# Importing libraries

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, TensorDataset
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Fix the NumPy seed so the synthetic dataset is identical on every
# "Restart Kernel -> Run All".
np.random.seed(42)

# Three rows of 10000 floats drawn uniformly from [-1, 1); the rows become
# the 'red', 'nir' and 'swir' columns of the synthetic table.
random_floats = np.random.uniform(low=-1, high=1, size=(3, 10000))
# One integer class label in 0..8 per synthetic sample.
random = np.random.randint(0, 9, size=10000)

In [None]:
# Assemble the synthetic samples into a table: one column per spectral
# feature, with the integer class label as the last column.
df = pd.DataFrame(
    {
        'red': random_floats[0],
        'nir': random_floats[1],
        'swir': random_floats[2],
        'class': random,
    }
)

In [None]:
# Write the synthetic table to disk without the index column.
# NOTE(review): the "Dataset preparation" cell reads "df_filtered.csv", not
# this file — confirm which file is meant to feed training.
df.to_csv(path_or_buf='df_filtered_new.csv', index=False)

# Settings

In [None]:
# Crop catalogue: display label -> class index ('value') and the hex colour
# used for that class in the plots.
classe_info = {
    'Пшеница - 231 тонн': {'value': 0, 'color': '#CCFF66'},
    'Не найдено - 10 тонн': {'value': 1, 'color': '#000000'},
    'Подсолнечник - 0 тонн': {'value': 2, 'color': '#CC9933'},
    'Хлопок - 1 тонн': {'value': 3, 'color': '#CC0000'},
    'Табак - 0 тонн': {'value': 4, 'color': '#FF00FF'},
    'Сахарная свекла - 0 тонн': {'value': 5, 'color': '#9900FF'},
    'Капуста - 0 тонн': {'value': 6, 'color': '#6633CC'},
    'Свекла - 0 тонн': {'value': 7, 'color': '#3300FF'},
    'Моpковь - 0 тонн': {'value': 8, 'color': '#33FFCC'},
    'Лук - 0 тонн': {'value': 9, 'color': '#00FF00'},
    'Кукуруза - 0 тонн': {'value': 10, 'color': '#99CC00'},
    'Зелень - 0 тонн': {'value': 11, 'color': '#99CCFF'},
    'Перец - 0 тонн': {'value': 12, 'color': '#CCCCCC'},
    'Огурцы - 0 тонн': {'value': 13, 'color': '#CC99FF'},
    'Помидоры - 0 тонн': {'value': 14, 'color': '#CC9999'},
    'Чеснок - 0 тонн': {'value': 15, 'color': '#FF9900'},
    'Редька - 0 тонн': {'value': 16, 'color': '#CC3300'},
    'Ячмень - 350 тонн': {'value': 17, 'color': '#6600CC'},
    'Рис - 0 тонн': {'value': 18, 'color': '#FFFF00'},
    'Овес - 420 тонн': {'value': 19, 'color': '#66CC99'},
}

# Label -> class index lookup.
classes = {label: info['value'] for label, info in classe_info.items()}

# Hex colours in catalogue order (position i holds the colour of class i).
classes_colors = [info['color'] for info in classe_info.values()]

# Names of the spectral feature columns, and how many there are.
features = ['red', 'nir', 'swir']
n_features = len(features)

# The two settings below are not referenced elsewhere in the visible notebook.
sequence_size = 30

model_dir = './logs'

# Dataset preparation

In [None]:
# Dataset preparation
url = "df_filtered.csv"
data = pd.read_csv(url)
# data = data.drop(['date', 'id'], axis=1)

# Every column except the last is a feature; the last column is the class label.
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

# Split BEFORE scaling so the held-out rows never influence the scaler's
# min/max (fitting on all rows leaks test-set statistics into training).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

# Fit the scaler on the training rows only, then apply the same transform to
# the test rows.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

print('data shape = ',data.shape)
print('X_train, y_train shape = ', X_train.shape, y_train.shape)

##### Tuning hyperparameters

In [None]:
# Number of feature columns fed to the network.
input_dim = X_train.shape[1]
# CrossEntropyLoss requires every label to lie in [0, output_dim). Sizing the
# output layer from the largest label (instead of the count of distinct
# labels) keeps that true even when label values are non-contiguous or a
# class is missing from the training split. For labels 0..K-1 that are all
# present this gives the same K as before.
output_dim = int(y_train.max()) + 1
hidden_dim = 64       # width of the LSTM hidden state
num_layers = 1        # stacked LSTM layers in encoder and decoder
batch_size = 32
learning_rate = 0.001
num_epochs = 1

##### Select the compute device (pinned to CPU)

In [None]:
# Tensors and the model are placed on this device; it is pinned to the CPU.
device = torch.device("cpu")
print("Using {} for training.".format(device))

##### Move data to the selected device

In [None]:
# Features become float32 tensors and labels become int64 tensors (the target
# dtype CrossEntropyLoss works with); all of them are placed on `device`.
X_train, X_test = (
    torch.as_tensor(features_np, dtype=torch.float32).to(device)
    for features_np in (X_train, X_test)
)
y_train, y_test = (
    torch.as_tensor(labels_np, dtype=torch.long).to(device)
    for labels_np in (y_train, y_test)
)

##### Create Datasets and DataLoaders

In [None]:
# Pair every feature row with its label.
train_dataset, test_dataset = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)

# Training batches are reshuffled on every pass; test batches keep stored order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# LSTM model

In [None]:
class EncoderDecoderLSTM(nn.Module):
  """Per-row classifier: encoder LSTM -> decoder LSTM -> linear head -> ReLU.

  Every input row is treated as a sequence of length one. The encoder's final
  states initialise the decoder, and the decoder output is projected to
  `output_dim` class scores.

  Args:
    input_dim: number of features per input row.
    hidden_dim: size of the hidden state of both LSTMs.
    num_layers: number of stacked layers in both LSTMs.
    output_dim: number of class scores produced per row.
  """

  def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
    super().__init__()
    self.hidden_dim = hidden_dim
    self.num_layers = num_layers

    self.encoder_lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
    self.decoder_lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers, batch_first=True)
    self.fc = nn.Linear(hidden_dim, output_dim)
    self.relu = nn.ReLU()

  def forward(self, x):
    """Score a batch of rows.

    Args:
      x: tensor of shape (batch, input_dim).

    Returns:
      Tensor of shape (batch, output_dim) with non-negative scores (a ReLU is
      applied after the linear layer).
    """
    batch = x.size(0)
    # Zero initial hidden/cell states on the same device and dtype as the input.
    h0 = x.new_zeros(self.num_layers, batch, self.hidden_dim)
    c0 = x.new_zeros(self.num_layers, batch, self.hidden_dim)

    # Add a sequence-length dimension of 1: (batch, 1, input_dim).
    _, (hn, cn) = self.encoder_lstm(x.unsqueeze(1), (h0, c0))

    # Feed the decoder the TOP encoder layer's final hidden state as a
    # length-1 sequence. The previous `hn.transpose(0, 1)` turned the layer
    # axis into the sequence axis, so with num_layers > 1 the result was
    # (batch, num_layers, output_dim) instead of (batch, output_dim).
    # For num_layers == 1 both forms are identical.
    decoder_in = hn[-1].unsqueeze(1)
    out, _ = self.decoder_lstm(decoder_in, (hn, cn))

    # Take the last (only) time step: (batch, hidden_dim).
    out = self.fc(out[:, -1, :])
    # NOTE(review): CrossEntropyLoss expects raw logits, and this ReLU clamps
    # them at zero. It is kept to preserve the behaviour of models trained
    # with this class; consider removing it and retraining.
    out = self.relu(out)

    return out

## Initialize the model

In [None]:
# Build the network and place it on the CPU.
device = torch.device("cpu")
model = EncoderDecoderLSTM(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    num_layers=num_layers,
    output_dim=output_dim,
)
model = model.to(device)

### loss function and optimizer

In [None]:
# Multi-class cross-entropy objective, placed on the same device as the model.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

# Adam over all model parameters with the configured learning rate.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

# Train model

In [None]:
import time

# Per-epoch history: sample-weighted mean training loss and wall-clock
# duration in seconds.
train_loss = []
train_time = []

# Train the model
for epoch in range(num_epochs):
    # The evaluation cell puts the model in eval mode; switch back explicitly
    # so re-running this cell always trains in training mode.
    model.train()
    start_time = time.perf_counter()

    running_loss = 0.0  # sum of per-sample losses seen this epoch
    n_samples = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean, so weight it by the batch
        # size; the last batch may be smaller than the others.
        batch = labels.size(0)
        running_loss += loss.item() * batch
        n_samples += batch

    # Record the epoch mean rather than the (noisy) loss of the final batch.
    # An empty loader yields NaN instead of a NameError.
    epoch_loss = running_loss / n_samples if n_samples else float('nan')
    train_loss.append(epoch_loss)
    train_time.append(time.perf_counter() - start_time)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Time: {train_time[-1]:.2f}s')

# Test model

In [None]:
# Score the held-out split in inference mode without tracking gradients.
model.eval()
with torch.no_grad():
    correct = 0
    total = 0

    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)

        # Index of the highest class score per row. No `.data` is needed:
        # gradients are already disabled by the surrounding no_grad().
        _, predicted = torch.max(outputs, 1)

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Guard against an empty test split (would otherwise divide by zero).
    if total:
        print(f'Test Accuracy: {100 * correct / total:.2f}%')
    else:
        print('Test Accuracy: n/a (test set is empty)')

# Save the model checkpoint
# State-dict checkpoint: this is the file and key that the "Load model and
# predict" cell reads back.
torch.save({'model_state_dict': model.state_dict()}, 'model_checkpoint.pt')
# Whole pickled module, kept for existing consumers of 'cpu2.pt'.
torch.save(model,'cpu2.pt')

# Test model on custom photo

### Combine three TIFF bands (red, NIR, SWIR) into one array

In [None]:
import rasterio
import numpy as np
import pandas as pd


def _read_first_band(path):
    """Open the raster at `path` and return its band 1 as a NumPy array."""
    with rasterio.open(path) as src:
        return src.read(1)


# Red, NIR and SWIR bands, read in that order.
red, nir, swir = (
    _read_first_band(band_path)
    for band_path in ('./B04.tif', './B8A.tif', './B11.tif')
)

# Stack the three bands depth-wise; the last axis indexes red, nir, swir.
# NOTE(review): this assumes all three rasters have the same height and
# width — confirm the files are on a common grid.
data = np.dstack((red, nir, swir))
data[:, :, 1].shape

### Data preparation

In [None]:
# float32 copy of the stacked bands.
float_array = data.astype(np.float32)

# Wrap the array as a tensor (no conversion to another dtype).
input_tensor_test = torch.from_numpy(float_array)
print(input_tensor_test.shape)

# Flatten to one row per pixel with three columns (one per band).
tensor_2d = input_tensor_test.reshape(-1, 3)
print(tensor_2d)
tensor_2d.shape

##### transform to torch

In [None]:
# Min-max scale each band column using statistics of the image itself.
# NOTE(review): this rebinds `scaler` to a fresh instance fitted on the image,
# so the image is not scaled with the min/max used for the training data —
# confirm this is intended.
scaler = MinMaxScaler()
X2 = scaler.fit_transform(tensor_2d.numpy())

# float32 tensor on the selected device (CPU in this notebook).
X2 = torch.as_tensor(X2, dtype=torch.float32).to(device)
X2.shape

### Predict data

In [None]:
# Rows are scored independently (the model turns each row into its own
# length-1 sequence), so the image is processed in fixed-size slices. This
# bounds peak memory instead of pushing every pixel through the LSTMs at once.
PREDICT_CHUNK_ROWS = 65536

model.eval()
with torch.no_grad():
    # Call the module itself rather than `.forward` so registered hooks run.
    output_tensor = torch.cat(
        [model(chunk) for chunk in torch.split(X2, PREDICT_CHUNK_ROWS)],
        dim=0,
    )

## Preparing the predicted data

##### from torch tensor to numpy array

In [None]:
# Winning class index for every pixel row.
argmax_tensor = output_tensor.argmax(dim=1)
print('argmax_tensor shape = ', argmax_tensor.shape)

# Bring the indices to host memory and expose them as a NumPy array.
tensor_cpu = argmax_tensor.cpu()
argmax_array = tensor_cpu.numpy()

# Restore the image layout given by the first two axes of `data`.
array_2d = argmax_array.reshape(data.shape[:2])

print('array_2d shape = ', array_2d.shape)
print('array_2d', array_2d)


# Visualization

## Visualization on predicted data

In [None]:
from matplotlib import pyplot as plt
import matplotlib.animation as animation
from matplotlib.colors import ListedColormap
import matplotlib.patches as mpatches
from matplotlib import colors

keys = list(classe_info.keys())
plt.rcParams["figure.figsize"] = (10,10)
cmap = colors.ListedColormap(classes_colors)
n_classes = len(classes_colors)

fig, ax = plt.subplots()
# Pin the colour scale to the full class range so class index i always gets
# classes_colors[i]. Without vmin/vmax the scale is stretched over the values
# present in the image and the colours no longer match the catalogue.
# 'nearest' stops resampling from blending neighbouring class indices.
# NOTE(review): assumes predicted indices equal the catalogue 'value' fields.
ax.imshow(array_2d, cmap=cmap, vmin=-0.5, vmax=n_classes - 0.5, interpolation='nearest')

# imshow creates no labelled artists, so the legend needs explicit proxy
# patches: one colour swatch per catalogue entry.
legend_handles = [
    mpatches.Patch(color=color, label=label)
    for label, color in zip(keys, classes_colors)
]
ax.legend(
    handles=legend_handles,
    title="Наименование и урожайность культуры",
    bbox_to_anchor=(1, 1.01),
    loc='upper left',
)
ax.axis('off')
ax.set_title('Культура')
# The legend sits outside the axes; 'tight' keeps it inside the saved image.
fig.savefig('out.png', bbox_inches='tight')

## Get one culture

In [None]:
from scipy.stats import mode

# Work on an integer ndarray copy of the class map.
array_2d = np.asarray(array_2d).astype(int)

# Most frequent class index over the whole (flattened) map.
mode_value = mode(array_2d, axis=None).mode

# Same-shaped map in which every pixel carries that dominant class.
filled_array = np.full(array_2d.shape, mode_value)
filled_array

In [None]:
from matplotlib import pyplot as plt
import matplotlib.animation as animation
from matplotlib.colors import ListedColormap
import matplotlib.patches as mpatches
from matplotlib import colors

keys = list(classe_info.keys())
plt.rcParams["figure.figsize"] = (10,10)
cmap = colors.ListedColormap(classes_colors)
n_classes = len(classes_colors)

fig, ax = plt.subplots()
# Plot the single-culture map computed in the previous cell. The original
# cell replotted `array_2d`, repeating the earlier figure and leaving
# `filled_array` unused.
# vmin/vmax pin class index i to classes_colors[i]; this matters here because
# a constant image has no value range of its own to scale the colours by.
ax.imshow(filled_array, cmap=cmap, vmin=-0.5, vmax=n_classes - 0.5, interpolation='nearest')

# imshow creates no labelled artists, so the legend needs explicit proxy
# patches: one colour swatch per catalogue entry.
legend_handles = [
    mpatches.Patch(color=color, label=label)
    for label, color in zip(keys, classes_colors)
]
ax.legend(
    handles=legend_handles,
    title="Наименование и урожайность культуры",
    bbox_to_anchor=(1, 1.01),
    loc='upper left',
)
ax.axis('off')
ax.set_title('Культура')
# The legend sits outside the axes; 'tight' keeps it inside the saved image.
# NOTE(review): this writes to the same 'out.png' as the per-pixel plot.
fig.savefig('out.png', bbox_inches='tight')

# Load model and predict

In [None]:
# # Check if GPU is available
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# # Load the saved model checkpoint
# checkpoint = torch.load('model_checkpoint.pt', map_location=device)

# # Define the model architecture
# model = EncoderDecoderLSTM(input_dim, hidden_dim, num_layers, output_dim)

# # Load the saved model parameters
# model.load_state_dict(checkpoint['model_state_dict'])

# # Move the model to the GPU
# model.to(device)

In [None]:
# Inference device is pinned to the CPU (no GPU detection happens here).
device = torch.device('cpu')

# Read the checkpoint file, remapping any stored tensors onto `device`.
# It is expected to be a dict holding the weights under 'model_state_dict'.
checkpoint = torch.load('model_checkpoint.pt', map_location=device)
saved_weights = checkpoint['model_state_dict']

# Rebuild the architecture with the same hyperparameters, then restore the
# saved parameters into it.
model = EncoderDecoderLSTM(input_dim, hidden_dim, num_layers, output_dim)
model.load_state_dict(saved_weights)

# Keep the model and its parameters on the CPU.
model = model.to(device)

In [None]:
device = torch.device('cpu')

# Keep the inputs as a float32 tensor on `device`. The model's forward pass
# calls tensor methods such as `x.size(0)` and `x.unsqueeze(1)`, so passing a
# NumPy array (as the previous `.numpy()` conversion did) raises a TypeError.
X2 = torch.as_tensor(X2, dtype=torch.float32, device=device)

# Inference mode, no gradient tracking.
model.eval()
with torch.no_grad():
    output = model(X2)

In [None]:
# Pick the highest-scoring class per pixel row from `output`, the prediction
# of the reloaded model. Previously this cell reused the `argmax_tensor` left
# over from the first prediction pass, so `output` was never consulted.
argmax_tensor = torch.argmax(output, dim=1)

# Make sure the indices live in host memory before converting.
argmax_tensor_cpu = argmax_tensor.cpu()

# Convert the tensor to a numpy array
argmax_array = argmax_tensor_cpu.numpy()

# Reshape the flat per-pixel labels back to the image's 2-D layout
array_2d = argmax_array.reshape(data[:,:,1].shape)

print('array_2d shape = ', array_2d.shape)
print('array_2d', array_2d)
