In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import rasterio

def load_vegetation_indices(file_path):
    with rasterio.open(file_path) as src:
        bands = src.read()  
    vegetation_indices = torch.tensor(bands, dtype=torch.float32)  
    return vegetation_indices

def load_vegetation_indices_from_year_folder(year_folder_path):
    indices = []
    for month_folder in sorted(os.listdir(year_folder_path)):
        month_path = os.path.join(year_folder_path, month_folder)
        if not os.path.isdir(month_path):
            continue  

        for file_name in sorted(os.listdir(month_path)):
            file_path = os.path.join(month_path, file_name)
            if file_path.endswith(".png"):
                with Image.open(file_path) as img:
                    img_array = np.array(img.convert("L"))  
                    indices.append(torch.tensor(img_array, dtype=torch.float32))
    
    if not indices:
        raise ValueError(f"No valid vegetation index files found in {year_folder_path}")
    
    vegetation_indices = torch.stack(indices, dim=0)
    return vegetation_indices

def load_ground_truth_png(file_path):
    with Image.open(file_path) as img:
        ground_truth = np.array(img.convert("L"))  
    return torch.tensor(ground_truth, dtype=torch.long).view(-1) 

def match_tensor_shapes(tensor1, tensor2):
  
    size1 = tensor1.size(1)  
    size2 = tensor2.size(1)  

    if size1 < size2:
        tensor2 = tensor2[:, :size1]
    elif size2 < size1:
        tensor1 = tensor1[:, :size2]

    return tensor1, tensor2

years = ["Year_2023", "Year_2022", "Year_2021", "Year_2020", "Year_2019"]
vegetation_indices_folders = {
    "Year_2023": "C:/Users/ADMIN/Downloads/year 2023_indices/year 2023-2024",  
    "Year_2022": "C:/Users/ADMIN/Downloads/year_2022_Indices/Year_2022",
    "Year_2021": "C:/Users/ADMIN/Downloads/year_2021_Indices/Year_2021",
    "Year_2020": "C:/Users/ADMIN/Downloads/year_2020_Indices/Year_2020",
    "Year_2019": "C:/Users/ADMIN/Downloads/year_2019_Indices/Year_2019"
}
ground_truth_base_folder = "C:/Users/ADMIN/Downloads/GT" 

all_vegetation_data = []

for year in years:
    folder_path = vegetation_indices_folders.get(year, None)
    if folder_path is None:
        print(f"No folder specified for {year}. Skipping.")
        continue

    if not os.path.exists(folder_path):
        print(f"Error: The folder {folder_path} does not exist.")
        continue

    try:
        vegetation_data = load_vegetation_indices_from_year_folder(folder_path)
        num_indices, height, width = vegetation_data.shape
        vegetation_data = vegetation_data.view(num_indices, -1).transpose(0, 1)  
        all_vegetation_data.append(vegetation_data)
        print(f"{year} Vegetation data shape: {vegetation_data.shape}")
    except Exception as e:
        print(f"Error loading {year} vegetation indices: {e}")

if len(all_vegetation_data) > 0:
    min_shape = min(data.shape[1] for data in all_vegetation_data)
    all_vegetation_data = [data[:, :min_shape] for data in all_vegetation_data]  
    vegetation_data = torch.cat(all_vegetation_data, dim=0)
    print("Combined vegetation data shape:", vegetation_data.shape)
else:
    raise ValueError("Failed to load any vegetation indices.")

all_labels = []
all_ground_truth_data = []

for year in years:
    year_path = os.path.join(ground_truth_base_folder, year + "_GT")
    
    if not os.path.exists(year_path):
        print(f"Year folder {year}_GT does not exist. Skipping.")
        continue

    for month_folder in os.listdir(year_path):
        month_path = os.path.join(year_path, month_folder)
        
        if os.path.isdir(month_path):
            ground_truth_files = [f for f in os.listdir(month_path) if f.endswith('.png')]
            print(f"Processing {len(ground_truth_files)} ground truth files in {year}/{month_folder}")
            
            for ground_truth_file in ground_truth_files:
                ground_truth_path = os.path.join(month_path, ground_truth_file)

                if "Non_Vegetation" in ground_truth_file:
                    label = 0  
                elif "Vegetation" in ground_truth_file:
                    label = 1  
                else:
                    print(f"Skipping file {ground_truth_file} (unknown label)")
                    continue

                try:
                    ground_truth_labels = load_ground_truth_png(ground_truth_path)
                    all_ground_truth_data.append(ground_truth_labels)
                    all_labels.append(torch.full_like(ground_truth_labels, label))  
                except Exception as e:
                    print(f"Error loading ground truth file {ground_truth_file} in {year}/{month_folder}: {e}")

if len(all_labels) > 0 and len(all_ground_truth_data) > 0:
    all_ground_truth_data = torch.cat(all_ground_truth_data, dim=0)
    all_labels = torch.cat(all_labels, dim=0)
    print("Ground truth data and labels successfully loaded and concatenated.")
    print("Ground truth data shape:", all_ground_truth_data.shape)
    print("Ground truth labels shape:", all_labels.shape)
else:
    raise ValueError("Error: No ground truth data loaded. Please check file paths and ensure files are present.")

num_vegetation_samples = vegetation_data.size(0)
num_label_samples = all_labels.size(0)

if num_vegetation_samples > num_label_samples:
    vegetation_data = vegetation_data[:num_label_samples]
    print("Truncated vegetation data to match labels.")
elif num_label_samples > num_vegetation_samples:
    all_labels = all_labels[:num_vegetation_samples]
    all_ground_truth_data = all_ground_truth_data[:num_vegetation_samples]
    print("Truncated labels to match vegetation data.")

print("Adjusted vegetation data shape:", vegetation_data.shape)
print("Adjusted ground truth labels shape:", all_labels.shape)


class VegetationDataset(Dataset):
    def __init__(self, vegetation_data, labels):
        self.vegetation_data = vegetation_data
        self.labels = labels
        assert len(self.vegetation_data) == len(self.labels), "Data dimensions do not match"

    def __len__(self):
        return len(self.vegetation_data)

    def __getitem__(self, idx):
        return self.labels[idx], self.vegetation_data[idx]

dataset = VegetationDataset(vegetation_data, all_labels)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True) 


class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert (
            self.head_dim * heads == embed_size
        ), "Embedding size needs to be divisible by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask=None):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)

        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))

        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )
        out = self.fc_out(out)
        return out

class LSTMAttentionModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, embed_size, heads):
        super(LSTMAttentionModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.attention = MultiHeadSelfAttention(embed_size, heads)
        self.fc1 = nn.Linear(hidden_size * 2, 64)  
        
        self.fc2 = nn.Linear(64, output_size)

    def forward(self, x):
        
        x = x.unsqueeze(1) 
        lstm_out, _ = self.lstm(x)  
        lstm_out = lstm_out.squeeze(1) 

        attention_out = self.attention(lstm_out.unsqueeze(1), lstm_out.unsqueeze(1), lstm_out.unsqueeze(1))
        attention_out = attention_out.squeeze(1)  

        x = F.relu(self.fc1(lstm_out)) 
        x = self.fc2(x)  
        return x

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

input_size = 45  
hidden_size = 128
num_layers = 2
output_size = 2  
embed_size = 256
heads = 8

model = LSTMAttentionModel(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
    embed_size=embed_size,
    heads=heads
).to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0
    for batch_idx, (labels, vegetation_data) in enumerate(train_loader):
        labels, vegetation_data = labels.to(device), vegetation_data.to(device)

        outputs = model(vegetation_data)
        loss = loss_fn(outputs, labels.long())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    avg_loss = epoch_loss / len(train_loader)
    accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')




In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.transforms import functional as TF
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

years = ["Year_2023", "Year_2022", "Year_2021", "Year_2020", "Year_2019"]
vegetation_indices_folders = {
    "Year_2023": "C:/Users/ADMIN/Downloads/year 2023_indices/year 2023-2024", 
    "Year_2022": "C:/Users/ADMIN/Downloads/year_2022_Indices/Year_2022",
    "Year_2021": "C:/Users/ADMIN/Downloads/year_2021_Indices/Year_2021",
    "Year_2020": "C:/Users/ADMIN/Downloads/year_2020_Indices/Year_2020",
    "Year_2019": "C:/Users/ADMIN/Downloads/year_2019_Indices/Year_2019"
}

class VegetationDataset(Dataset):
    def __init__(self, folder_paths, resize=(128, 128)): 
        self.folder_paths = folder_paths  
        self.files = []
        self.resize = resize
        for folder in folder_paths:
            for root, _, files in os.walk(folder):
                for file in files:
                    if file.endswith(".png"):
                        self.files.append(os.path.join(root, file))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path = self.files[idx]
        with Image.open(file_path) as img:
            img_resized = TF.resize(img, self.resize)  
            img_array = np.array(img_resized, dtype=np.float32) 
        return torch.tensor(img_array).permute(2, 0, 1)  

folder_paths = [vegetation_indices_folders[year] for year in years]

dataset = VegetationDataset(folder_paths)
train_loader = DataLoader(dataset, batch_size=4, shuffle=True)  

class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert self.head_dim * heads == embed_size, "Embedding size must be divisible by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, values, keys, query, mask=None):
        N = query.shape[0]  
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)

        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys]) 

        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))

        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )

        out = self.fc_out(out)
        return out

class RCNNWithAttention(nn.Module):
    def __init__(self, input_channels, conv_filters, lstm_hidden_size, lstm_layers, embed_size, heads, latent_dim):
        super(RCNNWithAttention, self).__init__()

        self.conv1 = nn.Conv2d(input_channels, conv_filters, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(conv_filters, conv_filters * 2, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.lstm = nn.LSTM(conv_filters * 2, lstm_hidden_size, lstm_layers, batch_first=True, bidirectional=True)

        self.attention = MultiHeadSelfAttention(embed_size, heads)

        self.fc_latent = nn.Linear(embed_size, latent_dim)  

    def forward(self, x):
        batch_size, num_channels, height, width = x.size()

        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool(x)  
        x = x.flatten(start_dim=2)  
        x = x.permute(0, 2, 1)  

        lstm_out, _ = self.lstm(x)  

        attention_out = self.attention(lstm_out, lstm_out, lstm_out)  

        latent_rep = self.fc_latent(attention_out.mean(dim=1))  
        return latent_rep

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_channels = 4  
conv_filters = 64
lstm_hidden_size = 128
lstm_layers = 2
embed_size = 256
heads = 8
latent_dim = 64

model = RCNNWithAttention(input_channels, conv_filters, lstm_hidden_size, lstm_layers, embed_size, heads, latent_dim).to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
model.train()
for epoch in range(num_epochs):
    epoch_loss = 0
    for batch in train_loader:
        batch = batch.to(device)  
        latent_rep = model(batch) 
        loss = latent_rep.norm(2).mean()  
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

model.eval()
latent_reps = []
with torch.no_grad():
    for batch in train_loader:
        batch = batch.to(device)
        latent_rep = model(batch)
        latent_reps.append(latent_rep.cpu())
latent_reps = torch.cat(latent_reps, dim=0)

kmeans = KMeans(n_clusters=3, random_state=42)
clusters = kmeans.fit_predict(latent_reps.numpy())

pca = PCA(n_components=2)
reduced_latent_reps = pca.fit_transform(latent_reps.numpy())
plt.scatter(reduced_latent_reps[:, 0], reduced_latent_reps[:, 1], c=clusters, cmap='viridis')
plt.title('Clustering of Vegetation Indices')
plt.colorbar()
plt.show()

