<a href="https://colab.research.google.com/github/Kittipong-Dev/secondhand-pricing-model/blob/main/secondhand_pricing_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# download file
!wget https://github.com/NextGen-AI-Camp/curriculum-2025/releases/download/Workshop-Week2-Dataset/dataset.zip
!unzip dataset.zip

--2025-06-23 14:50:07--  https://github.com/NextGen-AI-Camp/curriculum-2025/releases/download/Workshop-Week2-Dataset/dataset.zip
Resolving github.com (github.com)... 140.82.116.3
Connecting to github.com (github.com)|140.82.116.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/974978898/68dc38c4-3c9a-4094-9b09-7d5a94633577?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250623%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250623T145007Z&X-Amz-Expires=1800&X-Amz-Signature=03c2bd949b019878b4a3fef5244b790a1acfa762634f3c6cedb9e41c74fcb39f&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Ddataset.zip&response-content-type=application%2Foctet-stream [following]
--2025-06-23 14:50:07--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/974978898/68dc38c4-3c9a-4094-9b09-7d5a94633577?X-Amz-Algorithm=AWS4

In [None]:
# Attach the user's Google Drive to the Colab filesystem (Colab-only helper).
from google.colab import drive

drive.mount(mountpoint='/content/drive')

In [None]:
# import all dependencies
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader,SubsetRandomSampler
from tqdm import tqdm
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF

import seaborn as sns

import os
import random

import pandas as pd

import numpy as np
import shutil
import cv2
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score

from glob import glob
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from PIL import Image

import librosa
import IPython.display as display


In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut branch.

    Args:
        in_planes: number of input channels.
        planes: number of channels produced by both 3x3 convolutions.
        stride: stride of the first convolution (and of the projection
            shortcut when one is needed).

    The shortcut is an empty ``nn.Sequential`` (identity) unless the stride or
    the channel count changes, in which case it is a 1x1 conv + batch-norm
    projection so both branches have matching shapes before they are added.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # Shortcut branch: identity when shapes already match, projection otherwise.
        shapes_match = stride == 1 and in_planes == out_planes
        if shapes_match:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride,
                          bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return relu(main_branch(x) + shortcut(x))."""
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))
        summed = main + self.shortcut(x)
        return F.relu(summed)


In [None]:
class SecondhandAudioEvaluatingModel(nn.Module):
    """Single-channel CNN with one linear head per task.

    Layout: 7x7 stem convolution + max-pool, eight ``BasicBlock``s arranged in
    four stages (64, 128, 256, 512 channels), global average pooling, then
    three linear heads. ``forward`` returns a dict of logits keyed by
    ``"smell"`` (2 outputs), ``"pilling"`` (5) and ``"condition"`` (5).
    """

    def __init__(self):
        super().__init__()

        # Task names and the matching number of outputs, in the same order.
        self.labels = ["smell", "pilling", "condition"]
        self.out_dims = [2, 5, 5]

        # Stem: expects a 1-channel input.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Stage 1: 64 channels.
        self.layer1_block1 = BasicBlock(64, 64, stride=1)
        self.layer1_block2 = BasicBlock(64, 64, stride=1)
        # Stage 2: 128 channels, downsampled by the first block.
        self.layer2_block1 = BasicBlock(64, 128, stride=2)
        self.layer2_block2 = BasicBlock(128, 128, stride=1)
        # Stage 3: 256 channels.
        self.layer3_block1 = BasicBlock(128, 256, stride=2)
        self.layer3_block2 = BasicBlock(256, 256, stride=1)
        # Stage 4: 512 channels.
        self.layer4_block1 = BasicBlock(256, 512, stride=2)
        self.layer4_block2 = BasicBlock(512, 512, stride=1)

        # One explicitly named head per task.
        self.head_smell = nn.Linear(512, 2)
        self.head_pilling = nn.Linear(512, 5)
        self.head_condition = nn.Linear(512, 5)

    def forward(self, x):
        """Return ``{"smell", "pilling", "condition"}`` logits for batch ``x``."""
        feats = F.relu(self.bn1(self.conv1(x)))
        feats = self.maxpool(feats)

        backbone = (
            self.layer1_block1, self.layer1_block2,
            self.layer2_block1, self.layer2_block2,
            self.layer3_block1, self.layer3_block2,
            self.layer4_block1, self.layer4_block2,
        )
        for block in backbone:
            feats = block(feats)

        # Global average pool, then flatten to [B, 512].
        pooled = F.adaptive_avg_pool2d(feats, (1, 1))
        pooled = pooled.view(feats.size(0), -1)

        smell_logits = self.head_smell(pooled)
        pilling_logits = self.head_pilling(pooled)
        condition_logits = self.head_condition(pooled)
        return {
            "smell": smell_logits,
            "pilling": pilling_logits,
            "condition": condition_logits
        }

In [None]:
class SecondhandImageEvaluatingModel(nn.Module):
  """RGB image CNN with two classification heads (garment type and color).

  Layout: 7x7 stem convolution + max-pool, eight ``BasicBlock``s arranged in
  four stages (64, 128, 256, 512 channels), global average pooling, then two
  linear heads.

  ``forward`` returns a ``(type_logits, color_logits)`` tuple whose order
  matches ``self.labels``.
  """

  def __init__(self):
    super().__init__()
    # Task names and the matching number of classes, in the same order.
    self.labels = ["type", "color"]
    self.out_dims = [2, 11]  # type has 2 classes, color has 11

    self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    self.layer1_block1 = BasicBlock(64, 64, stride=1)
    self.layer1_block2 = BasicBlock(64, 64, stride=1)
    self.layer2_block1 = BasicBlock(64, 128, stride=2)
    self.layer2_block2 = BasicBlock(128, 128, stride=1)
    self.layer3_block1 = BasicBlock(128, 256, stride=2)
    self.layer3_block2 = BasicBlock(256, 256, stride=1)
    self.layer4_block1 = BasicBlock(256, 512, stride=2)
    self.layer4_block2 = BasicBlock(512, 512, stride=1)

    # Head sizes come from self.out_dims. The previous code read the
    # module-level names type_num_classes / color_num_classes, which are not
    # defined in this notebook and raised NameError on construction.
    feature_dim = 512 * BasicBlock.expansion
    type_num_classes, color_num_classes = self.out_dims
    self.type_out = nn.Linear(feature_dim, type_num_classes)
    self.color_out = nn.Linear(feature_dim, color_num_classes)

  def forward(self, x):
    """Return ``(type_logits, color_logits)`` for a batch of 3-channel images."""
    out = self.maxpool(F.relu(self.bn1(self.conv1(x))))

    out = self.layer1_block1(out) #64
    out = self.layer1_block2(out)

    out = self.layer2_block1(out) #128
    out = self.layer2_block2(out)

    out = self.layer3_block1(out) #256
    out = self.layer3_block2(out)

    out = self.layer4_block1(out) #512
    out = self.layer4_block2(out)

    # Functional pooling avoids building a new module on every forward pass.
    out = F.adaptive_avg_pool2d(out, (1, 1)) # [B, 512, H, W] --> [B, 512, 1, 1]
    out = out.view(out.size(0), -1) # [B, 512]

    type_out, color_out = self.type_out(out), self.color_out(out)
    return type_out, color_out

In [None]:
class SecondhandEvaluatingModel(nn.Module):
  """Wraps two sub-models and merges their per-task outputs into one dict.

  Args:
    modelA: model fed with ``xA``; must expose ``labels`` and ``out_dims``.
      Defaults to a fresh ``SecondhandAudioEvaluatingModel``.
    modelB: model fed with ``xB``; must expose ``labels`` and ``out_dims``.
      Defaults to a fresh ``SecondhandImageEvaluatingModel``.
    task_keysA: tasks to take from ``modelA``
      (default ``["smell", "pilling", "condition"]``).
    task_keysB: tasks to take from ``modelB`` (default ``["type", "color"]``).

  Attributes:
    out_dims: output sizes of the selected tasks, A's tasks first, then B's.
  """

  def __init__(self, modelA=None, modelB=None, task_keysA=None, task_keysB=None):
    super().__init__()
    # Defaults are created here rather than in the signature: default argument
    # values are evaluated once at definition time, which would share a single
    # sub-model (and a single mutable list) across every instance.
    self.modelA = SecondhandAudioEvaluatingModel() if modelA is None else modelA
    self.modelB = SecondhandImageEvaluatingModel() if modelB is None else modelB
    self.task_keysA = ["smell", "pilling", "condition"] if task_keysA is None else list(task_keysA)
    self.task_keysB = ["type", "color"] if task_keysB is None else list(task_keysB)
    self.out_dims = [self.modelA.out_dims[self.modelA.labels.index(k)] for k in self.task_keysA] + \
                    [self.modelB.out_dims[self.modelB.labels.index(k)] for k in self.task_keysB]

  @staticmethod
  def _as_task_dict(model, raw):
    """Return ``raw`` as a dict keyed by task name.

    A sub-model may return a dict already, or a tuple/list ordered like
    ``model.labels``; the latter is zipped with the labels.
    """
    if isinstance(raw, dict):
      return raw
    return dict(zip(model.labels, raw))

  def forward(self, xA, xB):
    """Run both sub-models and return ``{task: logits}`` for the selected tasks."""
    outA = self._as_task_dict(self.modelA, self.modelA(xA))
    outB = self._as_task_dict(self.modelB, self.modelB(xB))
    out = {}
    for k in self.task_keysA:
        out[k] = outA[k]
    for k in self.task_keysB:
        out[k] = outB[k]
    return out

In [None]:
class NoLabelImageDataset(Dataset):
    """Dataset of unlabeled images listed in a DataFrame.

    Args:
        df: table with an ``image_name`` column holding file names.
        image_folder: directory the file names are relative to.
        transform: optional callable applied to each loaded RGB image.

    Each item is an ``(image, file_name)`` pair.
    """

    def __init__(self, df, image_folder, transform=None):
        self.df, self.image_folder, self.transform = df, image_folder, transform

    def __len__(self):
        # One sample per row of the table.
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        fname = row['image_name']
        image = Image.open(os.path.join(self.image_folder, fname)).convert('RGB')
        if not self.transform:
            return image, fname
        return self.transform(image), fname

In [None]:
# Preprocessing applied to every image before it reaches the model:
# resize to 64x64, convert to a tensor, then normalize each RGB channel
# with the fixed per-channel mean/std below.
transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.6554, 0.6678, 0.6761],
        std=[0.1793, 0.1804, 0.1822],
    ),
])

In [None]:
df = pd.read_csv(csv_path)

In [None]:
dataset = NoLabelImageDataset(df, image_folder, transform=transform)
loader = DataLoader(dataset, batch_size=16, shuffle=False)

In [None]:
csv_path = ""
output_path = ""
pth_path = ""

In [None]:
# Build the combined model and restore its trained weights.
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
model = SecondhandEvaluatingModel()
model.load_state_dict(torch.load(pth_path, map_location=device))
model.to(device)
model.eval()

df = pd.read_csv(csv_path)

# Class names for each task, indexed by the predicted class id.
idx_to = {
    "type": ['bottom', 'top'],
    "color": ['Black', 'Blue', 'Brown', 'Gray', 'Green', 'Orange', 'Pink', 'Purple', 'Red', 'White', 'Yellow'],
    "smell": ["FALSE",  "TRUE"],
    "pilling": [str(i) for i in range(1, 6)],
    "condition": [str(i) for i in range(1, 6)]
}
labels = ["type", "color", "smell", "pilling", "condition"]
updates = []

with torch.no_grad():
  for images, filenames in tqdm(loader):
    images = images.to(device)
    # NOTE(review): the original call was `model(, images)` (a syntax error);
    # the first argument was missing. modelA's stem takes 1-channel input and
    # the loader only yields RGB images, so the channel mean of each image is
    # passed here as an assumption. Confirm against the preprocessing used
    # when modelA was trained and replace if it differs.
    single_channel = images.mean(dim=1, keepdim=True)
    out = model(single_channel, images)

    batch_preds = {
      label: out[label].argmax(dim=1).cpu().numpy()
      for label in labels
    }

    for i, fname in enumerate(filenames):
      record = {"image_name": fname}
      for label in labels:
        class_idx = batch_preds[label][i]
        record[label] = idx_to[label][class_idx]
      updates.append(record)

# Explicit columns keep set_index working when the loader produced no batches.
updates_df = pd.DataFrame(updates, columns=["image_name"] + labels).set_index("image_name")
# A file listed more than once yields identical predictions; keep a single row
# so the index-aligned assignment below does not fail on duplicate labels.
updates_df = updates_df[~updates_df.index.duplicated(keep="first")]

df = df.set_index("image_name")
for col in labels:
    df[col] = updates_df[col]

df = df.reset_index()

df.to_csv(output_path, index=False)
print(f"Saved csv to {output_path}")