In [None]:
import torch
from torch.utils.data import DataLoader, Dataset

from core.src.constants import IMAGES_PATH, TRAIN_DATA_CSV, TEST_DATA_CSV
from PIL import Image
import timm
from torch import nn
import pandas as pd

model = timm.create_model("fastvit_t8.apple_in1k", pretrained=True, num_classes=0)
# model.head = nn.Linear(model.num_features, 1)  # change head to regression
model.head = nn.Sequential(
    nn.AdaptiveAvgPool2d(1),
    nn.Flatten(),
    nn.Linear(model.num_features, 1),
)  # change head to regression

# get model specific transforms (normalization, resize)
data_config = timm.data.resolve_model_data_config(model)
transforms = timm.data.create_transform(**data_config, is_training=False)

model

In [None]:
class CustomDataset(Dataset):
    def __init__(self, image_paths, prices, transform=None):
        self.image_paths = image_paths
        self.prices = prices
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = Image.open(self.image_paths[idx])
        price = self.prices[idx]
        if self.transform:
            img = self.transform(img)
        return img, price


df_train = pd.read_csv(TRAIN_DATA_CSV, dtype={"unique_id": str})
df_test = pd.read_csv(TEST_DATA_CSV, dtype={"unique_id": str})
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_images = df_train["unique_id"].values
train_images = [IMAGES_PATH / path / "00.png" for path in train_images][:10000]

test_images = df_test["unique_id"].values
test_images = [IMAGES_PATH / path / "00.png" for path in test_images][:10000]

train_dataset = CustomDataset(
    image_paths=train_images,
    prices=df_train["price"].values,
    transform=transforms,
)

test_dataset = CustomDataset(
    image_paths=test_images,
    prices=df_test["price"].values,
    transform=transforms,
)

In [None]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
model.to(device)

criterion = nn.L1Loss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

model.train()
num_epochs = 10  # or more depending on convergence

for epoch in range(num_epochs):
    for images, prices in train_loader:
        images = images.to(device)
        prices = prices.to(device)

        # Forward pass
        predictions = model(images).squeeze()  # Squeeze is used to remove extra dim if any at output
        loss = criterion(predictions, prices.float())

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")

In [None]:
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    total_loss = 0
    for images, prices in test_loader:
        images = images.to(device)
        prices = prices.to(device)

        predictions = model(images).squeeze()
        loss = criterion(predictions, prices.float())
        total_loss += loss.item()
    print(f"Test Loss: {total_loss / len(test_loader):.4f}")