In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_squared_error, accuracy_score, r2_score, f1_score
from sklearn.model_selection import train_test_split
from transformers import ViTFeatureExtractor, ViTModel
from tqdm import tqdm
from PIL import Image
from io import BytesIO
import requests

# Function to fetch image
def fetch_image(image_url, size=(224, 224), timeout=5):
    """Download an image and return it as a resized RGB PIL image.

    Parameters
    ----------
    image_url : str
        URL passed to ``requests.get``.
    size : tuple of int, optional
        Target ``(width, height)`` handed to ``Image.resize``. Defaults to
        ``(224, 224)``.
    timeout : float, optional
        Timeout in seconds passed to ``requests.get``. Defaults to 5.

    Returns
    -------
    PIL.Image.Image or None
        The resized RGB image, or ``None`` when the download or decoding
        fails. Failures are reported with ``print`` rather than raised so that
        a single bad URL does not abort a long extraction run.
    """
    try:
        response = requests.get(image_url, timeout=timeout)
        # Surface HTTP errors (404, 403, ...) as such instead of letting the
        # error page reach the image decoder and fail with a confusing message.
        response.raise_for_status()
        image = Image.open(BytesIO(response.content)).convert('RGB')
        return image.resize(size)
    except Exception as e:
        # Deliberately broad: this is a best-effort fetch used inside a Dataset.
        print(f"Error fetching {image_url}: {e}")
        return None

# Custom dataset
class CustomDataset(Dataset):
    """Dataset that downloads one image per DataFrame row and preprocesses it.

    Each item is a tuple ``(pixel_values, entity_value, entity_unit)`` where
    ``pixel_values`` is the feature extractor's ``pixel_values`` output with
    its leading dimension squeezed away, and the two labels are read from the
    ``entity_values`` and ``entity_value_units`` columns of the row.
    """

    def __init__(self, df, feature_extractor, transform=None):
        self.df = df
        self.feature_extractor = feature_extractor
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Positional lookup, so a filtered frame with a non-contiguous index works.
        row = self.df.iloc[idx]

        image = fetch_image(row['image_link'])
        if image is None:
            # A failed download is replaced by a blank image of the same size.
            image = Image.new('RGB', (224, 224))
        if self.transform:
            image = self.transform(image)

        encoded = self.feature_extractor(images=image, return_tensors="pt")
        pixel_values = encoded.pixel_values.squeeze(0)
        return pixel_values, row['entity_values'], row['entity_value_units']


# ViT model for feature extraction
class ViTFeatureExtractorModel(nn.Module):
    """Wraps a pretrained ViT and returns one embedding vector per image.

    The embedding is the hidden state at sequence position 0 (the CLS token)
    of the backbone's ``last_hidden_state``.
    """

    def __init__(self):
        super().__init__()
        backbone = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        self.vit = backbone
        # Width of the returned embedding, taken from the backbone's config.
        self.hidden_size = backbone.config.hidden_size

    def forward(self, pixel_values):
        token_states = self.vit(pixel_values=pixel_values).last_hidden_state
        # Keep only the first token of every sequence.
        return token_states[:, 0, :]

# Function to extract features from ViT model
def extract_features(model, dataloader, device=None):
    """Run ``model`` over every batch of ``dataloader`` and collect the results.

    Parameters
    ----------
    model : torch.nn.Module
        Called as ``model(pixel_values)``; put into eval mode by this function.
    dataloader : iterable
        Yields ``(pixel_values, values, units)`` batches.
    device : torch.device, optional
        Device the inputs are moved to. When omitted it is taken from the
        model's first parameter (CPU for a parameter-free model), so the
        function no longer depends on a module-level ``device`` variable.

    Returns
    -------
    tuple of numpy.ndarray
        ``(features, values, units)`` concatenated over all batches, in the
        order the dataloader produced them.

    Raises
    ------
    ValueError
        If the dataloader yields no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    features = []
    labels_values = []
    labels_units = []

    with torch.no_grad():
        for pixel_values, values, units in tqdm(dataloader, desc="Extracting Features"):
            feature_rep = model(pixel_values.to(device))
            features.append(feature_rep.cpu().numpy())
            labels_values.append(torch.as_tensor(values).cpu().numpy())
            labels_units.append(np.array(units))

    if not features:
        # np.concatenate([]) would raise a less helpful ValueError.
        raise ValueError("dataloader produced no batches; nothing to extract")

    return np.concatenate(features), np.concatenate(labels_values), np.concatenate(labels_units)

# Loading the dataset and preprocessing
# NOTE(review): absolute Colab-style path; consider a configurable data directory.
df = pd.read_csv('/content/processed_train_unit.csv')

# The log1p transform below is disabled, so the regression target is the
# standardized raw 'entity_value_nos' column (no log scaling).
# df['log_entity_values'] = np.log1p(df['entity_value_nos'])
scaler = StandardScaler()
# NOTE(review): the scaler is fitted before the entity_name filter below, so its
# mean/std come from every row in the file, not only the selected entity.
# Confirm that this is intended.
df['entity_values'] = scaler.fit_transform(df[['entity_value_nos']])

label_encoder = LabelEncoder()
# This 'units' column is not read again in this cell; the encoder is refitted
# on the extracted unit strings at the end of the cell.
df['units'] = label_encoder.fit_transform(df['entity_value_units'])

# Restrict the run to a single entity; entity_name is also used later when
# naming the saved checkpoints.
entity_name = 'depth'
df = df[df['entity_name'] == entity_name]

# No train/validation split: the whole filtered frame is used
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')
full_dataset = CustomDataset(df, feature_extractor)

# shuffle=True only changes the order in which rows are extracted; features and
# labels are collected batch by batch, so they stay aligned with each other.
full_loader = DataLoader(full_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vit_model = ViTFeatureExtractorModel().to(device)

# Extract features from the ViT model using the full dataset
# NOTE(review): every item triggers an HTTP download, and the recorded run below
# was interrupted at 6%; caching the extracted arrays to disk would make this
# cell re-runnable.
full_features, full_values, full_units = extract_features(vit_model, full_loader)

# Encoding units for classification
# Refits label_encoder on the unit strings that were actually extracted, which
# replaces the fit performed on the full frame above.
full_units = label_encoder.fit_transform(full_units)


Extracting Features:   6%|▌         | 84/1355 [01:51<28:05,  1.33s/it]


KeyboardInterrupt: 

In [None]:
# Define the CNN model for classification
class CNNClassifier(nn.Module):
    """Fully connected classification head over pre-extracted feature vectors.

    Despite the name, the network contains only ``nn.Linear`` layers:
    ``input_size -> 512 -> 256 -> num_classes`` with ReLU after the two hidden
    layers. ``forward`` returns raw logits.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()
        # Attribute names and creation order are kept stable so that saved
        # state_dicts and seeded initialisation remain compatible.
        self.fc1 = nn.Linear(input_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        # Output layer: no activation, logits are returned as-is.
        return self.fc3(hidden)

# Define the CNN model for regression
class CNNRegressor(nn.Module):
    """Fully connected regression head over pre-extracted feature vectors.

    Despite the name, the network contains only ``nn.Linear`` layers:
    ``input_size -> 1024 -> 512 -> 512 -> 1`` with ReLU after the three hidden
    layers. ``forward`` returns a tensor whose last dimension has size 1.
    """

    def __init__(self, input_size):
        super().__init__()
        # Attribute names and creation order are kept stable so that saved
        # state_dicts and seeded initialisation remain compatible.
        self.fc1 = nn.Linear(input_size, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 512)
        self.fc4 = nn.Linear(512, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        hidden = x
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = self.relu(layer(hidden))
        # Single linear output, no activation.
        return self.fc4(hidden)


In [None]:
# Define and prepare CNN models for classification and regression
# Depends on full_features, full_units and device from the feature-extraction
# cell; that cell must have run to completion first.
input_size = full_features.shape[1]  # width of one extracted feature vector

# One output class per distinct encoded unit present in the extracted data.
num_classes = len(np.unique(full_units))
cnn_classifier = CNNClassifier(input_size, num_classes).to(device)
cnn_regressor = CNNRegressor(input_size).to(device)

# Define loss functions and optimizers
# Each head gets its own criterion and its own Adam optimizer (lr=0.001).
classification_criterion = nn.CrossEntropyLoss()
regression_criterion = nn.MSELoss()
classification_optimizer = optim.Adam(cnn_classifier.parameters(), lr=0.001)
regression_optimizer = optim.Adam(cnn_regressor.parameters(), lr=0.001)

# Custom dataset for CNN
class CustomDatasetCNN(Dataset):
    """Pairs pre-extracted feature rows with their labels by position.

    ``features`` and ``labels`` are stored as given and indexed with the same
    ``idx``; the dataset length is ``len(features)``.
    """

    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target

# Prepare data loaders for CNN
# Both datasets wrap the same feature array; only the label array differs
# (encoded units for classification, scaled values for regression).
full_dataset_class = CustomDatasetCNN(full_features, full_units)  # Updated to use full_features and full_units
full_dataset_reg = CustomDatasetCNN(full_features, full_values)  # Updated to use full_features and full_values

# NOTE(review): the data is already in memory, so num_workers=4 may add
# overhead rather than speed — confirm it is needed.
full_loader_class = DataLoader(full_dataset_class, batch_size=32, shuffle=True, num_workers=4)
full_loader_reg = DataLoader(full_dataset_reg, batch_size=32, shuffle=True, num_workers=4)


In [None]:
# NOTE(review): `os` is already imported in the first cell; notebook imports are
# easier to audit when they are all kept there.
import os
import warnings
# Silences every warning for the rest of the kernel session.
# NOTE(review): this also hides warnings raised inside the training loop —
# consider a narrower filter.
warnings.filterwarnings("ignore")

# Create a directory to save the best models
# Relative to the current working directory; read by train_model when it
# writes checkpoints.
model_save_dir = "best_models"
os.makedirs(model_save_dir, exist_ok=True)

def train_model(model, criterion, optimizer, train_loader, num_epochs=100, is_classification=True, device=None):
    """Train ``model`` and checkpoint it whenever the training metric improves.

    After every epoch a metric is computed from the predictions accumulated
    *during* that epoch's training pass (weighted F1 for classification, R² for
    regression). When it exceeds the best value seen so far, the model's
    ``state_dict`` is written to
    ``<model_save_dir>/best_{classifier|regressor}_model_<entity_name>.pth``.
    ``model_save_dir`` and ``entity_name`` are read from module scope.

    Parameters
    ----------
    model : torch.nn.Module
    criterion : callable
        Loss called as ``criterion(outputs, labels)``. For regression the
        outputs and labels are flattened to 1-D first.
    optimizer : torch.optim.Optimizer
    train_loader : iterable
        Yields ``(features, labels)`` batches as tensors or array-likes.
    num_epochs : int, optional
    is_classification : bool, optional
        Selects label dtype (long vs float32) and the metric.
    device : torch.device, optional
        Device the batches are moved to. Defaults to the device of the model's
        first parameter (CPU for a parameter-free model).

    Raises
    ------
    ValueError
        If ``train_loader`` yields no batches in an epoch.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    label_dtype = torch.long if is_classification else torch.float32
    best_metric = float('-inf')  # any real score beats this
    best_model_path = None

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0
        all_labels = []
        all_preds = []

        for features, labels in train_loader:
            # as_tensor reuses an existing tensor instead of re-wrapping it,
            # which is what triggered the "copy construct from a tensor" warning.
            features = torch.as_tensor(features, dtype=torch.float32).to(device)
            labels = torch.as_tensor(labels, dtype=label_dtype).to(device)

            optimizer.zero_grad()
            outputs = model(features)

            if is_classification:
                loss = criterion(outputs, labels)
                predicted = outputs.detach().argmax(dim=1)
            else:
                # Flatten explicitly: a bare squeeze() turns a batch of one into
                # a 0-d tensor, which cannot be extended into the lists below.
                labels = labels.reshape(-1)
                outputs = outputs.reshape(-1)
                loss = criterion(outputs, labels)
                predicted = outputs.detach()

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader produced no batches")

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/num_batches:.4f}")

        # Calculate metrics on the training data
        if is_classification:
            f1 = f1_score(all_labels, all_preds, average='weighted')
            print(f"Training F1 Score: {f1:.4f}")
            if f1 > best_metric:
                best_metric = f1
                best_model_path = os.path.join(model_save_dir, f"best_classifier_model_{entity_name}.pth")
                torch.save(model.state_dict(), best_model_path)
                print(f"Saved best classifier model with F1 score: {best_metric:.4f}")
        else:
            all_preds = np.array(all_preds).flatten()

            r2 = r2_score(all_labels, all_preds)
            print(f"Training R² Score: {r2:.4f}")
            if r2 > best_metric:
                best_metric = r2
                best_model_path = os.path.join(model_save_dir, f"best_regressor_model_{entity_name}.pth")
                torch.save(model.state_dict(), best_model_path)
                print(f"Saved best regressor model with R² score: {best_metric:.4f}")

# Train CNN models using full dataset
# There is no held-out split here: the scores printed by train_model are
# training-set metrics, and "best" checkpoints are selected on them.
# NOTE(review): the recorded output starts at R² ≈ 0.998 in epoch 1, which
# suggests the regressor had already been trained in this kernel session —
# re-create the models before re-running to get a reproducible run.
print("Training CNN for Regression on full dataset...")
train_model(cnn_regressor, regression_criterion, regression_optimizer, full_loader_reg, num_epochs=100, is_classification=False)

print("Training CNN for Classification on full dataset...")
train_model(cnn_classifier, classification_criterion, classification_optimizer, full_loader_class, num_epochs=100, is_classification=True)


Training CNN for Regression on full dataset...
Epoch [1/100], Loss: 0.0026
Training R² Score: 0.9976
Saved best regressor model with R² score: 0.9976
Epoch [2/100], Loss: 0.0034
Training R² Score: 0.9969
Epoch [3/100], Loss: 0.0041
Training R² Score: 0.9963
Epoch [4/100], Loss: 0.0053
Training R² Score: 0.9953
Epoch [5/100], Loss: 0.0073
Training R² Score: 0.9934
Epoch [6/100], Loss: 0.0099
Training R² Score: 0.9912
Epoch [7/100], Loss: 0.0212
Training R² Score: 0.9811
Epoch [8/100], Loss: 0.0282
Training R² Score: 0.9747
Epoch [9/100], Loss: 0.0150
Training R² Score: 0.9865
Epoch [10/100], Loss: 0.0079
Training R² Score: 0.9929
Epoch [11/100], Loss: 0.0050
Training R² Score: 0.9955
Epoch [12/100], Loss: 0.0034
Training R² Score: 0.9969
Epoch [13/100], Loss: 0.0028
Training R² Score: 0.9974
Epoch [14/100], Loss: 0.0028
Training R² Score: 0.9975
Epoch [15/100], Loss: 0.0029
Training R² Score: 0.9974
Epoch [16/100], Loss: 0.0033
Training R² Score: 0.9970
Epoch [17/100], Loss: 0.0039
Trai

In [None]:
# Conversion factors grouped by physical dimension. Each factor is the size of
# one unit expressed in the base unit of its dimension (gram, millilitre,
# centimetre, volt, watt). Grouping them lets inverse_transform refuse
# conversions between incompatible dimensions.
_factors_by_dimension = {
    'mass': {
        'gram': 1, 'kilogram': 1000, 'milligram': 0.001, 'microgram': 1e-6, 'ton': 1e6,
        'ounce': 28.3495, 'pound': 453.592,
    },
    'volume': {
        'millilitre': 1, 'litre': 1000, 'cup': 240, 'fluid ounce': 29.5735, 'quart': 946.353,
        'gallon': 3785.41, 'cubic foot': 28316.8, 'cubic inch': 16.3871, 'decilitre': 100,
        'centilitre': 10, 'pint': 473.176,
    },
    'length': {
        'centimetre': 1, 'metre': 100, 'millimetre': 0.1, 'inch': 2.54, 'foot': 30.48,
    },
    'voltage': {'volt': 1},
    'power': {'watt': 1, 'kilowatt': 1000},
}

# Flat unit -> factor lookup (same contents and key order as before).
conversion_factors = {
    unit: factor
    for table in _factors_by_dimension.values()
    for unit, factor in table.items()
}

# unit -> dimension name, used to detect incompatible conversions.
unit_dimensions = {
    unit: dimension
    for dimension, table in _factors_by_dimension.items()
    for unit in table
}

# These are the max occurring units for each entity_name
standard_units = {
    'depth': 'centimetre',
    'height': 'centimetre',
    'item_volume': 'millilitre',
    'item_weight': 'gram',
    'maximum_weight_recommendation': 'kilogram',
    'voltage': 'volt',
    'wattage': 'watt',
    'width': 'centimetre'
}

def inverse_transform(predicted_value, predicted_unit, entity_name):
    """
    Convert a value expressed in the entity's standard unit into predicted_unit.

    predicted_value: The value predicted by the regression head in the standard unit.
    predicted_unit: The unit predicted by the classification head.
    entity_name: The type of entity (e.g., 'item_weight', 'height', etc.).

    Returns the converted value. The value is returned unchanged when
    predicted_unit is unknown or measures a different dimension than the
    entity's standard unit (e.g. 'cup' for 'depth'), because no meaningful
    conversion exists in that case.

    Raises KeyError if entity_name has no entry in standard_units.
    """
    standard_unit = standard_units[entity_name]  # e.g. 'centimetre' for 'depth'

    source_dimension = unit_dimensions.get(standard_unit)
    target_dimension = unit_dimensions.get(predicted_unit)
    if source_dimension is None or source_dimension != target_dimension:
        # Unknown unit or cross-dimension request: leave the value as-is.
        return predicted_value

    # Factor that rescales from the standard unit to the predicted unit.
    inverse_conversion_factor = conversion_factors[standard_unit] / conversion_factors[predicted_unit]
    return predicted_value * inverse_conversion_factor

# Assuming df now contains the 'predicted_value' (standardized) and 'predicted_unit' columns
# df['final_value'] = df.apply(lambda row: inverse_transform(row['predicted_value'],
#                                                           row['predicted_unit'],
#                                                           row['entity_name']), axis=1)

In [None]:
def predict_single_image(image_url, vit_model, regression_model, classification_model, feature_extractor, scaler, label_encoder, device, log_transformed=False):
    """Predict the numeric value and unit for a single image URL.

    Parameters
    ----------
    image_url : str
        Passed to ``fetch_image``.
    vit_model, regression_model, classification_model : torch.nn.Module
        Backbone and the two heads. All three are switched to eval mode.
    feature_extractor : callable
        Called as ``feature_extractor(images=image, return_tensors="pt")``;
        its ``pixel_values`` attribute is fed to ``vit_model``.
    scaler : object with ``inverse_transform``
        Undoes the scaling applied to the regression target.
    label_encoder : object with ``inverse_transform``
        Maps the predicted class index back to a unit label.
    device : torch.device
        Device the input tensors are moved to.
    log_transformed : bool, optional
        Set to True only if the regression target was ``log1p``-transformed
        before scaling; ``expm1`` is then applied after unscaling. Defaults to
        False because the ``log1p`` step in the preprocessing cell is
        commented out.

    Returns
    -------
    tuple or None
        ``(value, unit_label)``, or ``None`` when the image cannot be fetched.
    """
    # Fetch the image
    image = fetch_image(image_url)
    if image is None:
        print(f"Image at {image_url} could not be fetched.")
        return None

    # Convert the image into pixel values using the feature extractor
    pixel_values = feature_extractor(images=image, return_tensors="pt").pixel_values.to(device)

    # Use the models that were passed in, not the notebook-level globals.
    vit_model.eval()
    regression_model.eval()
    classification_model.eval()

    with torch.no_grad():
        # One feature row for the single image; stays on `device` throughout.
        features_tensor = vit_model(pixel_values).reshape(1, -1).to(dtype=torch.float32)
        predicted_value = regression_model(features_tensor).cpu().numpy().reshape(-1, 1)
        unit_logits = classification_model(features_tensor)
        predicted_class = unit_logits.argmax(dim=1).cpu().numpy()

    # Inverse transform to get the original scale of entity values
    original_value = scaler.inverse_transform(predicted_value)
    if log_transformed:
        original_value = np.expm1(original_value)
    print(f"Predicted numerical value (Original Scale): {original_value[0][0]}")

    # Convert the predicted class back to the original unit label
    predicted_unit_label = label_encoder.inverse_transform(predicted_class)
    print(f"Predicted unit (Classification): {predicted_unit_label[0]}")

    return original_value[0][0], predicted_unit_label[0]

# Example usage for prediction
image_url = 'https://m.media-amazon.com/images/I/71gSRbyXmoL.jpg'
prediction = predict_single_image(
    image_url, vit_model, cnn_regressor, cnn_classifier, feature_extractor, scaler, label_encoder, device
)

# predict_single_image returns None when the image cannot be fetched;
# unpacking that directly would raise a TypeError.
if prediction is None:
    print(f"No prediction available for {image_url}")
else:
    predicted_value, predicted_unit = prediction

    # Express the value in the predicted unit instead of the entity's standard unit.
    predicted_value = inverse_transform(predicted_value, predicted_unit, entity_name)

    print(f"Predicted Value: {predicted_value}")
    print(f"Predicted Unit: {predicted_unit}")


Predicted numerical value (Original Scale): 261.1028747558594
Predicted unit (Classification): cup
Predicted Value: 1.0879286448160808
Predicted Unit: cup
