In [1]:
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import random
from representations.base import BaseImageRepresentation
from model.base import BaseModel
from utils.loss import ContrastiveLoss

from data.evaluate import EvaluateData
from data.training import TrainingData

from execution.evaluate import Evaluate
from execution.trainining import Training

In [2]:
# --- Reproducibility -------------------------------------------------------
seed = 42

# --- Hardware --------------------------------------------------------------
# Use the GPU when one is visible to torch, otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Optimisation ----------------------------------------------------------
learning_rate = 0.0001
epochs = 1
batch_size = 64
num_workers = 2

# --- Evaluation ------------------------------------------------------------
num_runs = 100
k_shot = 1

# --- Model / data selection ------------------------------------------------
feat_dim = 128
model_name = "Resnet18"
dataset_name = "ufop"
image_representation = "Skeleton-DML"

# Seed every random number generator this notebook relies on.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(seed)

  return torch._C._cuda_getDeviceCount() > 0


In [3]:
# Look up the image-representation class registered under the configured name.
image_method = BaseImageRepresentation.get_type(image_representation)

# Preprocessing applied to every rendered image, in order.
_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),  # convert to 3 channels
    transforms.ToTensor(),
]
transform = transforms.Compose(_preprocessing_steps)

In [4]:
import logging
import os
import numpy as np
import torch
from torch.utils.data import Dataset
import pandas as pd
from PIL import Image
import re
import matplotlib.pyplot as plt
from collections import Counter

from utils.logs import Logs


class Data(Dataset):
	"""Map-style dataset that turns per-video landmark sequences into images.

	The landmark CSV is read from ``datasets/<dataset_name>/``, reduced to one
	row per video (see :meth:`prepare_data`), and each sample is rendered to an
	image on access by ``image_method().transform(x, y)``.

	Args:
		dataset_name: Name used to build the CSV path (see :meth:`get_dataset`).
		image_method: Zero-argument callable (typically a class) whose result
			exposes ``transform(x, y)``. Required only when samples are accessed.
		transform: Optional callable applied to the rendered PIL image.

	Raises:
		FileNotFoundError: If the dataset CSV could not be loaded.
	"""

	def __init__(self, dataset_name='ufop', image_method=None, transform=None):
		self.dataset_name = dataset_name
		self.dataset = self.get_dataset()
		if self.dataset is None:
			# get_dataset() already logged the failure; stop here with a clear
			# error instead of failing later with an AttributeError on None.
			raise FileNotFoundError(f"Dataset '{dataset_name}' could not be loaded.")
		self.signs = self.get_signs(self.dataset)
		self.dataframe = self.prepare_data(self.dataset)
		self.categories = list(self.dataframe["category"].unique())
		self.persons = list(self.dataframe["person"].unique())
		self.image_method = image_method
		self.transform = transform
		self.image_size = self._infer_image_size(transform)


	@staticmethod
	def _infer_image_size(transform):
		"""Return the ``size`` of the first step of ``transform`` that has one, else None."""
		for step in getattr(transform, "transforms", None) or []:
			size = getattr(step, "size", None)
			if size is not None:
				return size
		return None


	def __getitem__(self, index):
		"""Return ``(image, category_index, person_index)`` for one video.

		The two indices are int64 tensors holding the position of the sample's
		category / person in ``self.categories`` / ``self.persons``.
		"""
		if self.image_method is None:
			raise TypeError("Data was created without an image_method; samples cannot be rendered.")

		row = self.dataframe.iloc[index]
		x, y, category, person = row["x"], row["y"], row["category"], row["person"]

		image = self.image_method().transform(x, y)
		# Presumably the representation yields values in [0, 1] -- TODO confirm.
		image = Image.fromarray(np.uint8(image * 255)).convert('RGB')

		if self.transform:
			image = self.transform(image)

		label = torch.tensor(self.categories.index(category), dtype=torch.int64)
		signer = torch.tensor(self.persons.index(person), dtype=torch.int64)
		return image, label, signer


	def __len__(self):
		"""Number of videos (one sample per video)."""
		return len(self.dataframe)


	def get_dataset(self):
		"""Read the landmark CSV for ``self.dataset_name``.

		Returns:
			The loaded DataFrame, or None (after logging an error) when the file
			does not exist.
		"""
		try:
			dataset_file = f"libras_{self.dataset_name}_openpose.csv"
			dataset_path = os.path.join(f"datasets/{self.dataset_name}", dataset_file)
			return pd.read_csv(dataset_path, low_memory=True)
		except FileNotFoundError as e:
			Logs(logging.ERROR, f"Dataset {self.dataset_name} not found. Error: {e}")
			return None


	def get_features(self):
		"""Materialise every sample as NumPy arrays.

		Returns:
			``(X, y, p)``: stacked images, category indices and person indices.
			Assumes ``self.transform`` yields tensors (``.cpu()`` is called on
			each image).
		"""
		images, labels, persons = [], [], []

		for index in range(len(self)):
			image, label, person = self[index]
			images.append(image.cpu().numpy())
			labels.append(label.cpu().numpy())
			persons.append(person.cpu().numpy())

		X = np.stack(images)
		y = np.stack(labels)
		p = np.stack(persons)
		print(X.shape, y.shape, p.shape)

		return X, y, p


	def get_signs(self, df):
		"""List the coordinate columns (``*_x``, ``*_y``, ``*_z``) to keep.

		Columns whose name starts with ``pose_<i>`` for an excluded body
		landmark index are dropped.
		"""
		excluded_body_landmarks = (10, 11, 13, 14, 19, 20, 21, 22, 23, 24)
		excluded_prefixes = tuple(f"pose_{i}" for i in excluded_body_landmarks)
		return [
			column for column in df.columns
			if column.endswith(("_x", "_y", "_z")) and not column.startswith(excluded_prefixes)
		]


	def prepare_data(self, df):
		"""Collapse the per-frame table into one record per video.

		Each record holds ``x`` and ``y`` arrays of shape (landmarks, frames),
		clipped to [0, 1], with frames ordered by the ``frame`` column, plus the
		video name, category and person taken from the video's first frame.

		For the "minds" dataset a missing ``person`` column is derived from the
		video file name and added to ``df`` in place.
		"""
		if self.dataset_name == "minds" and "person" not in df.columns:
			df["person"] = df["video_name"].apply(lambda i: int(re.findall(r".*Sinalizador(\d+)-.+.mp4", i)[0]))

		columns = ["category", "video_name", "person", "frame"] + self.signs
		df = df[columns]
		records = []

		# A single grouped pass replaces one full-table scan per video;
		# sort=False keeps videos in order of first appearance.
		for video, df_video in df.groupby("video_name", sort=False):
			df_video = df_video.sort_values("frame")
			first_row = df_video.iloc[0]

			x = self.normalize_axis(self.get_axis_df(df_video, "x").T.to_numpy())
			y = self.normalize_axis(self.get_axis_df(df_video, "y").T.to_numpy())

			records.append({
				"x": x,
				"y": y,
				"video_name": video,
				"category": first_row["category"],
				"person": first_row["person"]
			})

		# Explicit columns keep the schema intact even when there are no videos.
		return pd.DataFrame(records, columns=["x", "y", "video_name", "category", "person"])


	def get_axis_df(self, df, axis):
		"""Select the kept coordinate columns whose name ends with ``axis``."""
		return df[[c for c in self.signs if c.endswith(axis)]]


	def normalize_axis(self, axis):
		"""Clip coordinates to the [0, 1] range.

		Returns a new array and leaves the input untouched, so read-only inputs
		(e.g. views returned by ``DataFrame.to_numpy()`` under pandas
		Copy-on-Write) are handled as well.
		"""
		return np.clip(axis, 0, 1)



In [5]:
# Build the dataset from the configured name, representation and preprocessing.
data = Data(
    dataset_name=dataset_name,
    image_method=image_method,
    transform=transform,
)

# Render every sample up front: X = images, y = category indices, p = person indices.
X, y, p = data.get_features()

(3040, 3, 224, 224) (3040,) (3040,)


In [6]:
from torch import nn
import torchvision.models as models

class Resnet18Model(nn.Module):
    """ResNet-18 backbone with a replaced head, applied to pairs of inputs.

    The backbone's ``fc`` layer is swapped for
    BatchNorm1d -> Linear -> ReLU -> Dropout -> Linear, and ``forward`` runs the
    same network (shared weights) on both inputs.

    Args:
        num_classes: Size of the final linear layer's output.
        feat_dim: Width of the hidden linear layer in the head.
        dropout: Dropout probability used in the head.
        pretrained: Load ImageNet weights into the backbone when True.
    """

    def __init__(self, num_classes: int, feat_dim: int = 128, dropout: float = 0.5, pretrained: bool = True):
        super().__init__()

        self.model = self._build_backbone(pretrained)

        num_ftrs = self.model.fc.in_features

        self.model.fc = nn.Sequential(
            nn.BatchNorm1d(num_ftrs),
            nn.Linear(num_ftrs, feat_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(feat_dim, num_classes)
        )


    @staticmethod
    def _build_backbone(pretrained: bool):
        """Create the ResNet-18 backbone, preferring the ``weights=`` API."""
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is None:
            # Older torchvision releases only understand ``pretrained``.
            return models.resnet18(pretrained=pretrained)
        # IMAGENET1K_V1 is the checkpoint that ``pretrained=True`` used to load.
        return models.resnet18(weights=weights_enum.IMAGENET1K_V1 if pretrained else None)


    def forward_once(self, x):
        """Run a single batch through the backbone and the replaced head."""
        return self.model(x)


    def forward(self, input1, input2):
        """Return the network outputs for both inputs as a tuple."""
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2


In [7]:
# Number of distinct labels present in y; used as the model's output size.
num_features = np.unique(y).size
print(num_features)

# Build the network and move its parameters to the selected device.
model = Resnet18Model(num_classes=num_features).to(device)

# Loss and optimiser used by the training loop.
criterion = ContrastiveLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

56




In [8]:
import numpy as np
from PIL import Image
from torchvision import transforms

from sklearn.model_selection import train_test_split

# X and y are the NumPy arrays of rendered images and labels returned by
# data.get_features(); show their shapes before splitting.
print(f"Original X shape: {X.shape}, y shape: {y.shape}")

# NOTE: the preprocessing pipeline `transform` is deliberately not redefined
# here. The arrays were already preprocessed when they were rendered, and
# rebinding the global name to a pipeline without ToTensor would break any
# later re-run of the dataset cell.

# Split into training (60%), validation (20%) and test (20%) sets, reusing the
# notebook-wide seed so the split follows the configured value.
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4, random_state=seed)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=seed)

# Show the shapes after splitting.
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

Original X shape: (3040, 3, 224, 224), y shape: (3040,)
X_train shape: (1824, 3, 224, 224), y_train shape: (1824,)
X_val shape: (608, 3, 224, 224), y_val shape: (608,)
X_test shape: (608, 3, 224, 224), y_test shape: (608,)


In [9]:
# Wrap each split in its dataset class and a DataLoader; only the training
# loader shuffles.
train_dataset = TrainingData(X=X_train, y=y_train, transform=None)
train_dataloader = DataLoader(train_dataset, shuffle=True, num_workers=num_workers, batch_size=batch_size)
eval_dataset = EvaluateData(X=X_val, y=y_val, transform=None)
eval_dataloader = DataLoader(eval_dataset, shuffle=False, num_workers=num_workers, batch_size=batch_size)
test_dataset = EvaluateData(X=X_test, y=y_test, transform=None)
test_dataloader = DataLoader(test_dataset, shuffle=False, num_workers=num_workers, batch_size=batch_size)

# Evaluator passed to the training loop, built from the validation labels.
train_evaluator = Evaluate(labels=y_val, k_shot=k_shot, num_runs=num_runs, device=device)

best_epoch, best_accuracy, best_loss_history, loss_history, accuracy_history = Training.exec(
    model=model,
    train_dataloader=train_dataloader,
    val_dataloader=eval_dataloader,
    num_epochs=epochs,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    evaluator=train_evaluator,
    k_shot=k_shot,
)
# ".3f" prints three decimals; the previous "3f" only set a minimum width of 3.
print(f"Best Epoch {best_epoch} Best Accuracy {best_accuracy:.3f}")
Training.chart(epochs, loss_history, accuracy_history)

Epoch 0 Loss 5.3533 (Best Epoch -1 Best Accuracy -1.000000)


KeyboardInterrupt: 