In [None]:
import os
import pickle
import numpy as np
import pandas as pd
import geopandas as gpd

import matplotlib.pyplot as plt

import config

In [None]:
import torch
from torch.utils.data import Dataset

# Geração do dataset

In [None]:
# Frame geometries, read from the path configured as config.TR_FRAMES.
frames = gpd.read_file(
    config.TR_FRAMES,
)

# Per-frame index details; the first CSV column becomes the DataFrame index.
frames_idx = pd.read_csv(
    config.TR_FRAMES_IDX,
    index_col=0,
)

In [None]:
# create file coordinates limits description
def extract_coord_from_file(f):
    """Parse the two integer coordinates embedded in a file name.

    The text before the first "." is split on "_"; the second and third
    pieces are converted with ``int`` and returned as a pair.

    Args:
        f: File name, e.g. ``"<prefix>_<a>_<b>.<ext>"``.

    Returns:
        Tuple ``(a, b)`` of ints.

    Raises:
        IndexError: If the stem has fewer than three "_"-separated pieces.
        ValueError: If a coordinate piece is not a valid integer literal.
    """
    stem, _, _ = f.partition(".")
    pieces = stem.split("_")
    # Convert one piece at a time so a malformed name fails at the same
    # piece, with the same exception, regardless of what follows it.
    first = int(pieces[1])
    second = int(pieces[2])
    return first, second
# Map every entry listed in config.TR_DEFORESTATION to its parsed coordinates.
coord_files = dict(
    (name, extract_coord_from_file(name))
    for name in os.listdir(config.TR_DEFORESTATION)
)

In [None]:
# Bounds of the study window, in the units of frames_idx["x"] / frames_idx["y"].
xmin = 100
xmax = 130
ymin = 100
ymax = 130

# Rows of frames_idx whose coordinates fall in [xmin, xmax) x [ymin, ymax).
in_limit_idx = frames_idx[
    (frames_idx["x"] >= xmin) &
    (frames_idx["x"] < xmax) &
    (frames_idx["y"] >= ymin) &
    (frames_idx["y"] < ymax)
]
# Frame geometries whose frame_id appears in that index.
in_limit_frames = frames[frames["frame_id"].isin(in_limit_idx.index)]

# Files whose parsed coordinates fall in (xmin, xmax] x (ymin, ymax].
# NOTE(review): these intervals are open below / closed above, unlike the
# frame filter above; presumably file names carry a tile's upper limits —
# confirm against how the files were written.
in_limit_files = [
    f for f, (x, y) in coord_files.items()
    if xmin < x <= xmax and ymin < y <= ymax
]
print(in_limit_files)
if not in_limit_files:
    # Fail here with context instead of an IndexError on full_history[0] below.
    raise FileNotFoundError(
        f"no deforestation file in {config.TR_DEFORESTATION!r} has coordinates "
        f"inside x in ({xmin}, {xmax}], y in ({ymin}, {ymax}]"
    )

full_history = []
for filename in in_limit_files:
    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    with open(os.path.join(config.TR_DEFORESTATION, filename), "rb") as file:
        full_history.append(pickle.load(file))

# One zero-filled (xmax - xmin, ymax - ymin) grid per entry of the first file's
# history. NOTE(review): zip() below stops at the shortest history, so files
# with fewer entries silently leave the trailing grids at zero.
grid = np.zeros((xmax-xmin, ymax-ymin))
grid_history = np.array([grid.copy() for _ in full_history[0]])

# Loop invariants, built once instead of on every inner iteration.
frame_ids_in_limit = in_limit_frames["frame_id"].values
zero_area = pd.Series(0, index=in_limit_idx.index)

for t, regions in enumerate(zip(*full_history)):
    for region in regions:
        in_limit_region = region[region["frame_id"].isin(frame_ids_in_limit)]

        # Adding to the zero series aligns the areas on frame id; frames with
        # no entry in this region become NaN and are filled with 0. The
        # reshape relies on the aligned index order matching the grid layout.
        grid_history[t, :, :] += (
            zero_area + in_limit_region.set_index("frame_id")["area"]
        ).fillna(0).values.reshape(grid.shape)

In [None]:
# Overlay the selected frames (red) on the thin outline of every frame.
fig, ax = plt.subplots()
frames.boundary.plot(linewidth=0.1, ax=ax)
in_limit_frames.plot(color="red", ax=ax)
plt.show()

In [None]:
# Shape of the accumulated history: (time steps, xmax - xmin, ymax - ymin).
grid_history.shape

In [None]:
# Add a leading channel axis of size 1 in front of (time, x, y). The target
# shape is derived from grid_history itself rather than hard-coded, so the cell
# keeps working when the window bounds or the number of time steps change.
deforestation_history = grid_history.reshape((1, *grid_history.shape))
deforestation_history.shape

In [None]:
class CustomDataset(Dataset):
    """Next-step prediction pairs cut from a single history array.

    The array is indexed as ``X[:, t, :, :]``. Item ``t`` pairs the slice at
    position ``t`` (input) with the slice at position ``t + 1`` (label).
    """

    def __init__(self, X):
        super().__init__()
        # Inputs drop the last position along axis 1 and labels drop the
        # first, so labels are the inputs shifted forward by one position.
        self.X, self.Y = X[:, :-1, :, :], X[:, 1:, :, :]

    def __len__(self):
        """Return the number of (input, label) pairs."""
        return self.X.shape[1]

    def __getitem__(self, index):
        """Return the pair at ``index`` as two float32 tensors."""
        pair = (self.X[:, index, :, :], self.Y[:, index, :, :])
        data, labels = (torch.tensor(arr).float() for arr in pair)
        return data, labels

In [None]:
# Hold out the last 24 positions along axis 1 for testing; train on the rest.
n_test_steps = 24
train_steps = slice(None, -n_test_steps)
test_steps = slice(-n_test_steps, None)

train_data = deforestation_history[:, train_steps, :, :]
test_data = deforestation_history[:, test_steps, :, :]

In [None]:
# Compare the shapes of the two splits.
train_data.shape, test_data.shape

In [None]:
# Both loaders serve one sample per batch, in stored order (no shuffling).
loader_options = {"batch_size": 1, "shuffle": False}

trainloader = torch.utils.data.DataLoader(CustomDataset(train_data), **loader_options)

testloader = torch.utils.data.DataLoader(CustomDataset(test_data), **loader_options)

# Modelo baseline

In [None]:
# Persistence baseline: the prediction for the next step is the current input.
# NOTE(review): the score is sqrt(sum of squared errors) / number of batches,
# which is not a mean-based RMSE, so values from loaders of different lengths
# are not on the same scale — confirm this is intended.
baseline_scores = {}
for split, loader in (("train", trainloader), ("test", testloader)):
    squared_error_sum = 0
    for inputs, labels in loader:
        y_pred = inputs
        squared_error_sum += torch.square(y_pred - labels).float().sum()
    baseline_scores[split] = squared_error_sum**0.5 / len(loader)

base_train_err = baseline_scores["train"]
base_test_err = baseline_scores["test"]

print(f"Baseline: Train Error = {base_train_err:.6f} | Test Error = {base_test_err:.6f}")

O erro de teste do modelo baseline é maior que o erro de treino.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision

 
class BasicConvModel(nn.Module):
    """Three-layer convolutional network with an identity skip connection.

    Every convolution uses a 5x5 kernel with stride 1 and padding 2, so the
    spatial size of the input is preserved. The network maps 1 channel to 64,
    64 to 64, then 64 back to 1, and adds the result to the input.
    """

    def __init__(self):
        super().__init__()
        # Plain bookkeeping attributes (not parameters): an epoch counter and
        # a list of recorded errors, both starting empty.
        self.epoch = 0
        self.errs = []

        conv_kwargs = dict(kernel_size=(5, 5), stride=1, padding=2)
        self.conv1 = nn.Conv2d(1, 64, **conv_kwargs)
        self.act1 = nn.ReLU()
        self.conv2 = nn.Conv2d(64, 64, **conv_kwargs)
        self.act2 = nn.ReLU()
        self.conv3 = nn.Conv2d(64, 1, **conv_kwargs)

    def forward(self, x):
        """Return ``x`` plus the output of the three-convolution stack."""
        hidden = self.conv1(x)
        hidden = self.act1(hidden)
        hidden = self.conv2(hidden)
        hidden = self.act2(hidden)
        residual = self.conv3(hidden)
        return x + residual

In [None]:
model = BasicConvModel()
optimizer = optim.SGD(model.parameters(), lr=1e-1, momentum=0.9)

errs = []


def _epoch_error(loader):
    """Score the current ``model`` on ``loader``.

    Returns sqrt(sum of squared errors over all batches) / number of batches,
    the same formula used for the baseline. Runs under ``torch.no_grad()`` so
    no autograd graph is built for evaluation or kept alive by the result.
    """
    squared_error_sum = 0
    with torch.no_grad():
        for inputs, labels in loader:
            y_pred = model(inputs)
            squared_error_sum += torch.square(y_pred - labels).float().sum()
    return squared_error_sum**0.5 / len(loader)


def train(n_epochs):
    """Train the module-level ``model`` for ``n_epochs`` more epochs.

    Each epoch takes one optimizer step per batch of ``trainloader`` on the
    mean squared error, then appends ``[train_err, test_err]`` to
    ``model.errs`` and prints both. ``model.epoch`` is incremented once per
    epoch, so repeated calls continue the count.

    Args:
        n_epochs: Number of additional epochs to run.
    """
    for _ in range(n_epochs):
        model.epoch += 1
        for inputs, labels in trainloader:
            y_pred = model(inputs)
            loss = torch.square(y_pred - labels).mean()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Evaluation is gradient-free: previously these sums carried the
        # autograd graph of every forward pass and were stored in model.errs,
        # keeping all of that memory alive for each epoch.
        train_err = _epoch_error(trainloader)
        test_err = _epoch_error(testloader)
        model.errs.append([train_err, test_err])

        print(f"Epoch {model.epoch}: Train Error = {train_err:.6f} | Test Error = {test_err:.6f}")

train(10)

In [None]:
# Continue training the same model for 30 more epochs; model.epoch and
# model.errs carry on from the previous call.
train(30)

Ganhamos do baseline?

In [None]:
# Last recorded model error minus the baseline error, as (train, test); a
# negative entry means the model's error is lower than the baseline's.
tuple(
    float(model_err - baseline_err)
    for model_err, baseline_err in zip(model.errs[-1], (base_train_err, base_test_err))
)

In [None]:
import matplotlib.pyplot as plt

# Train and test error curves. train() appends one entry to model.errs per
# epoch, so entry i is plotted at epoch i + 1. The labels and legend make the
# two curves distinguishable.
epochs = range(1, len(model.errs) + 1)
plt.plot(epochs, [float(e[0]) for e in model.errs], label="Train Error")
plt.plot(epochs, [float(e[1]) for e in model.errs], label="Test Error")
plt.xlabel("Epoch")
plt.ylabel("Error")
plt.legend()
plt.show()