**SFT CNN-LSTM Model Code**

* Items to add
    * Add database links
    * Add Eval Stage
    * Add Individual Inferences
* Items to check/review
    * Ensure preprocessing is correct before importing images
    * Check that the input size (height, width, channels) of the images matches what the model expects


In [None]:
import torch
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import cv2
import get_thermal_images as gt
import time
from datetime import datetime
from torch.utils.data import Dataset, DataLoader
import os
import torch.optim as optim

In [None]:
# Disabled prototype of a standalone CNN: the class is wrapped in a bare string
# literal, so it is never defined or executed.
# NOTE(review): if this is ever revived, forward() refers to self.conv1,
# self.relu, self.pool1, self.conv2 and self.pool2, but __init__ only creates
# self.conv; the second nn.Conv2d is also missing its kernel_size argument.
"""
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv=nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=3),
            nn.Conv2d(in_channels=32, out_channels=45),
            nn.MaxPool2d(kernel_size=2, stride=3),
        )
    def forward(self, x):
        x=self.conv1(x)
        x=self.relu(x)
        x=self.pool1(x)
        x=self.conv2(x)
        x=self.relu(x)
        x=self.pool2(x)
        return x
"""

In [None]:
class CNN_LSTM(nn.Module):
    """Per-frame CNN feature extractor followed by an LSTM and a linear head.

    Each frame of an image sequence is passed through a small convolutional
    stack, the resulting feature maps are flattened into one vector per frame,
    the LSTM consumes the sequence of vectors, and the hidden state of the last
    time step is projected to ``output_size`` values.

    Args:
        input_size: Number of features the LSTM receives per time step. This
            must equal the flattened size of the CNN output for one frame
            (45 * H' * W', where H' and W' are the spatial sizes after the two
            pooling layers).
        hidden_size: Number of features in the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced per sequence.
        dropout_rate: Dropout probability, used between stacked LSTM layers
            and on the last hidden state before the linear head.
        batch_first: If True (default) ``forward`` expects input shaped
            ``(batch, seq, channels, height, width)``; if False it expects
            ``(seq, batch, channels, height, width)``.
        in_channels: Number of channels in each input frame. Defaults to 3;
            pass 1 for single-channel (grayscale) frames.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout_rate,
                 batch_first=True, in_channels=3):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # nn.LSTM only applies dropout between stacked layers and warns when a
        # non-zero value is requested for a single-layer network.
        lstm_dropout = dropout_rate if num_layers > 1 else 0.0
        # forward() always feeds the LSTM a (batch, seq, features) tensor, so the
        # LSTM itself is batch-first regardless of the layout callers pass in.
        self.l1 = nn.LSTM(input_size, hidden_size, num_layers, dropout=lstm_dropout, batch_first=True)
        self.l2 = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout_rate)
        self.batch_first = batch_first
        self.cnn = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=3),
            # kernel_size is a required argument of nn.Conv2d; this mirrors the
            # first convolution (3x3, stride 1, padding 1).
            nn.Conv2d(in_channels=32, out_channels=45, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=3),
        )

    def forward(self, x):
        """Run the model on a batch of image sequences.

        Args:
            x: 5-D tensor laid out as described by ``batch_first``.

        Returns:
            Tensor of shape ``(batch, output_size)`` with non-negative values
            (a ReLU is applied to the linear output).

        Raises:
            ValueError: If ``x`` is not 5-D, or if the flattened CNN output per
                frame does not match the LSTM ``input_size``.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected a 5-D input (batch, seq, channels, height, width), got {x.dim()}-D"
            )
        if not self.batch_first:
            x = x.transpose(0, 1)

        batch_size, seq_len, c, h, w = x.shape
        # Fold time into the batch axis so the CNN sees ordinary 4-D image batches.
        # reshape (not view) because the transpose above may leave x non-contiguous.
        feats = self.cnn(x.reshape(batch_size * seq_len, c, h, w))
        # One flat feature vector per frame, restored to (batch, seq, features).
        feats = feats.reshape(batch_size, seq_len, -1)

        if feats.size(-1) != self.l1.input_size:
            raise ValueError(
                f"CNN produces {feats.size(-1)} features per frame but the LSTM was "
                f"built with input_size={self.l1.input_size}"
            )

        # nn.LSTM initialises h0/c0 to zeros when no state is passed.
        out, _ = self.l1(feats)
        out = self.dropout(out[:, -1, :])  # last time step only
        out = self.l2(out)
        return torch.relu(out)

In [None]:
def get_thermal_data():
    """Try to read one frame of thermal data from the capture module.

    Returns:
        A ``(ok, data)`` tuple: ``(True, data)`` with whatever
        ``gt.read_thermal_data()`` returned on success, or ``(False, None)``
        if the read raised an exception.
    """
    try:
        return True, gt.read_thermal_data()
    except Exception:
        # Best-effort read: report failure to the caller instead of raising.
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt / SystemExit propagate, so a polling loop built on
        # this function can still be stopped.
        return False, None

In [None]:
def save_images(output_dir="thermal_images", interval=5, max_images=None):
    """Periodically capture thermal frames and write them to disk as JPEGs.

    Args:
        output_dir: Directory the images are written to; created if missing.
        interval: Seconds to wait between capture attempts.
        max_images: Stop after this many images have been written
            successfully. ``None`` (default) keeps capturing forever.
    """
    # cv2.imwrite does not create missing directories; it just reports failure.
    os.makedirs(output_dir, exist_ok=True)

    saved = 0
    while max_images is None or saved < max_images:
        ret, frame = get_thermal_data()
        if ret:
            filename = f"{output_dir}/thermal_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
            # imwrite signals failure through its return value, not an exception.
            if cv2.imwrite(filename, frame):
                saved += 1
                print(f"Saved: {filename}")
            else:
                print(f"Failed to save: {filename}")

        time.sleep(interval)  # Wait before the next capture attempt


In [None]:
def preprocess_img(path):
    """Load an image as grayscale and scale its pixel values by 1/255.

    Args:
        path: Path of the image file to read.

    Returns:
        The image as a 2-D floating-point array (8-bit input maps to [0, 1]).

    Raises:
        ValueError: If the file is missing or cannot be decoded as an image.
    """
    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread returns None instead of raising when the file cannot be read;
    # fail here with the offending path rather than with a TypeError below.
    if img is None:
        raise ValueError(f"Could not read image (missing or not a decodable image): {path!r}")
    return img / 255.0

In [None]:
# Disabled earlier single-image dataset: the class is wrapped in a bare string
# literal, so it is never defined or executed.
# NOTE(review): it references IMG_SIZE, which is not defined anywhere in the
# visible part of this file; define it before re-enabling this class.
"""
class ThermalImageDataset(Dataset):
   def __init__(self, dataset_path, transform=None):
       self.dataset_path = dataset_path
       self.transform = transform
       self.images = []

       for filename in os.listdir(dataset_path):
           if filename.endswith('.jpg') or filename.endswith('.png'):
               img_path = os.path.join(dataset_path, filename)
               img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
               img = cv2.resize(img, IMG_SIZE)
               img = img / 255.0
               self.images.append(img)

       self.images = np.array(self.images).astype(np.float32)

   def __len__(self):
       return len(self.images)

   def __getitem__(self, idx):
       image = self.images[idx]
       return torch.tensor(image).unsqueeze(0)
"""

In [None]:
class ThermalDataset(Dataset):
    """Sliding-window dataset over the images in a folder.

    File names are sorted and every run of ``sequence_length`` consecutive
    images forms one sample: the first ``sequence_length - 1`` images are the
    input sequence and the last image is the prediction target.

    Args:
        image_folder: Directory containing the images. Every entry returned by
            ``os.listdir`` is passed to ``preprocess_img``.
        sequence_length: Images per window, including the target image.
            Must be at least 2.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 2.
    """

    def __init__(self, image_folder, sequence_length=5):
        if sequence_length < 2:
            raise ValueError("sequence_length must be at least 2 (one input frame plus the target)")

        self.image_paths = sorted(os.listdir(image_folder))
        self.sequence_length = sequence_length

        # Read every image exactly once; the windows below are list slices that
        # share these arrays instead of re-reading and duplicating each image.
        frames = [preprocess_img(os.path.join(image_folder, name)) for name in self.image_paths]

        # N images hold N - L + 1 windows of length L (none when N < L).
        self.data = [
            frames[start:start + sequence_length]
            for start in range(len(frames) - sequence_length + 1)
        ]

    def __len__(self):
        """Return the number of windows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(X, y)`` for window ``idx``.

        ``X`` is a float32 tensor of shape ``(sequence_length - 1, 1, H, W)``
        and ``y`` a float32 tensor of shape ``(1, H, W)``.
        """
        seq = np.array(self.data[idx])  # (sequence_length, H, W)
        X = seq[:-1]  # Input sequence
        y = seq[-1]  # Target image
        # Insert a channel axis of size 1 for the single-channel frames.
        return (
            torch.tensor(X, dtype=torch.float32).unsqueeze(1),
            torch.tensor(y, dtype=torch.float32).unsqueeze(0),
        )

In [None]:
# Load dataset
# ThermalDataset is built with its default sequence_length (5), so each sample
# is a run of 4 input frames plus 1 target frame read from "thermal_images/".
dataset = ThermalDataset("thermal_images/")
# Batches of 2 windows, drawn in random order each epoch.
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Initialize model, loss function, optimizer
# NOTE(review): CNN_LSTM.__init__ declares five required parameters
# (input_size, hidden_size, num_layers, output_size, dropout_rate); calling it
# with no arguments raises TypeError. These still need to be filled in.
# NOTE(review): the dataset yields single-channel frames while the model's
# first convolution is declared with in_channels=3 - confirm before training.
model = CNN_LSTM()#Set model init parameters
# Mean squared error between the prediction and the target image.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Training loop
# Runs a fixed number of epochs over `dataloader`, optimising `model` with
# `optimizer` against `criterion`, and prints the mean batch loss per epoch.

#From GPT
epochs = 10
model.train()  # make sure dropout layers are active while training
for epoch in range(epochs):
    running_loss = 0.0
    num_batches = 0
    for X, y in dataloader:
        optimizer.zero_grad()
        y_pred = model(X)
        # The model ends in a linear layer and returns (batch, output_size),
        # while the dataset target is an image (batch, 1, H, W); flatten the
        # target so both sides are (batch, H*W).
        # NOTE(review): assumes output_size == H * W - confirm when choosing
        # the model's init parameters.
        loss = criterion(y_pred, y.flatten(start_dim=1))
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # Without this guard the print below would fail on an undefined loss.
        raise RuntimeError("dataloader produced no batches; check that the dataset is not empty")

    # Report the average over the epoch rather than only the last batch's loss.
    print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / num_batches:.5f}")