In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os

import torch
import torch.nn.functional as F
import torchvision
from torchvision import models, transforms

from PIL import Image

from tqdm import tqdm

import json
import requests

In [None]:
# Load the main dataframe from a local pickle file.
# NOTE(review): unpickling can execute arbitrary code, so only load a
# df_main.pkl that comes from a trusted source.
df_main = pd.read_pickle('df_main.pkl')

In [None]:
# Preview the first rows of df_main.
df_main.head()

In [None]:
# Download the ImageNet class index to label mapping
IMAGENET_CLASSES_URL = "https://storage.googleapis.com/download.tensorflow.org/data/imagenet_class_index.json"
# The timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() reports an HTTP error right here instead of letting it
# show up later as a confusing JSON decoding failure.
response = requests.get(IMAGENET_CLASSES_URL, timeout=30)
response.raise_for_status()
class_idx = response.json()

# Convert it to the idx_to_class dictionary: integer class index -> label
# (the label is element [1] of each JSON value).
idx_to_class = {int(k): v[1] for k, v in class_idx.items()}

In [None]:
def predict_image(image, model, transform, topk=5):
    """
    Predict the top-k categories for an image.

    Parameters
    ----------
    image : str, os.PathLike, PIL.Image.Image or array
        A path to an image file, an already opened PIL image, or a pixel
        array accepted by ``Image.fromarray``.
    model : torch.nn.Module
        Classifier whose output indices are looked up in ``idx_to_class``.
    transform : callable
        Preprocessing that maps a PIL image to a tensor without batch dimension.
    topk : int, default 5
        Number of classes to return (clamped to the number of model outputs).

    Returns
    -------
    labels : list of str
        The ``topk`` labels, most probable first.
    probabilities : numpy.ndarray
        Matching probabilities in percent. They are computed over *all*
        classes, so they generally do not add up to 100.
    """
    # Always hand the transform a 3-channel image: PNGs are often RGBA or
    # greyscale, which would not match a 3-channel Normalize step.
    if isinstance(image, (str, os.PathLike)):
        with Image.open(image) as img:
            image = img.convert('RGB')
    elif isinstance(image, Image.Image):
        image = image.convert('RGB')
    else:
        image = Image.fromarray(image).convert('RGB')

    # Put the input on whatever device the model weights live on, so the
    # forward pass cannot fail with a CPU/GPU mismatch.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    batch = transform(image).unsqueeze(0).to(target_device)  # Add batch dimension

    model.eval()
    with torch.no_grad():
        logits = model(batch)
        # Softmax over every class first, then pick the top-k. Applying softmax
        # to the top-k logits only would renormalise over k classes and
        # overstate the confidence.
        all_probs = F.softmax(logits, dim=1)
        k = min(topk, all_probs.shape[1])
        probs, indices = all_probs.topk(k, dim=1)

    labels = [idx_to_class[int(idx)] for idx in indices[0]]
    return labels, (probs[0] * 100).cpu().numpy()


In [None]:
def load_images_from_directory(base_path, limit=None):
    """
    Load the .png images of every assembly folder below ``base_path``.

    Each sub-directory of ``base_path`` is treated as one assembly. Inside it,
    ``assembly.png`` is the assembly image and every other ``.png`` file is a
    body image whose id is the file name without the extension.

    Parameters
    ----------
    base_path : str
        Directory that contains one sub-directory per assembly.
    limit : int, optional
        Load only the first ``limit`` assembly folders (in sorted order).
        ``None`` or ``0`` loads all of them.

    Returns
    -------
    dict
        ``{assembly_id: {'assembly': ndarray or None, 'bodies': {body_id: ndarray}}}``
    """
    all_images = {}
    # os.listdir returns entries in arbitrary order; sorting makes the subset
    # selected by `limit` reproducible between runs and machines.
    assembly_folders = sorted(
        d for d in os.listdir(base_path)
        if os.path.isdir(os.path.join(base_path, d))
    )

    if limit:
        assembly_folders = assembly_folders[:limit]

    for assembly_id in tqdm(assembly_folders, desc="Loading images"):
        assembly_path = os.path.join(base_path, assembly_id)
        entry = {
            'assembly': None,
            'bodies': {}
        }
        all_images[assembly_id] = entry

        for image_name in sorted(os.listdir(assembly_path)):
            if not image_name.endswith('.png'):  # Only process .png files
                continue

            image_path = os.path.join(assembly_path, image_name)
            with Image.open(image_path) as img:
                pixels = np.array(img)

            if image_name == 'assembly.png':
                entry['assembly'] = pixels
            else:
                # splitext removes exactly the extension. str.rstrip('.png')
                # strips any trailing '.', 'p', 'n', 'g' characters and would
                # turn e.g. 'pump.png' into 'pum'.
                body_id = os.path.splitext(image_name)[0]
                entry['bodies'][body_id] = pixels

    return all_images

In [None]:
def predict_first_n_images(all_images, model, transform, n=3):
    """
    Run ``predict_image`` on the first ``n`` assemblies of ``all_images``.

    For each selected assembly the result holds the prediction of the assembly
    image (or ``None`` when there is no such image) and one prediction per body
    image, each stored as a ``(labels, probabilities)`` tuple.
    """
    results = {}
    selected = list(all_images.items())[:n]

    for assembly_id, image_set in selected:
        entry = {'assembly': None, 'bodies': {}}
        results[assembly_id] = entry

        # Assembly-level image, if one was loaded
        whole = image_set['assembly']
        if whole is not None:
            top_labels, top_probs = predict_image(whole, model, transform)
            entry['assembly'] = (top_labels, top_probs)

        # One prediction per body image
        for body_id in image_set['bodies']:
            top_labels, top_probs = predict_image(image_set['bodies'][body_id], model, transform)
            entry['bodies'][body_id] = (top_labels, top_probs)
            print(f"Stored predictions for body {body_id} of assembly {assembly_id}.")

    return results


In [None]:
def plot_top_predictions(all_images, predictions, top_n=20):
    """
    Show the images behind the ``top_n`` most confident predictions.

    Parameters
    ----------
    all_images : dict
        ``{assembly_id: {'assembly': image or None, 'bodies': {body_id: image}}}``
    predictions : dict
        Same layout as ``all_images`` but holding ``(labels, probabilities)``
        tuples instead of images.
    top_n : int, default 20
        Maximum number of (image, label) pairs to plot, in a 4-column grid.

    Returns
    -------
    None
        The figure is displayed with ``plt.show()``. When there is nothing to
        plot, a short message is printed instead.
    """
    def _is_pair(candidate):
        # A usable prediction is a (labels, probabilities) tuple.
        return isinstance(candidate, tuple) and len(candidate) == 2

    def _rows(image_id, image_type, prediction, image):
        # One record per (label, probability) pair of a single image.
        labels, probabilities = prediction
        return [
            {
                'image_id': image_id,
                'image_type': image_type,
                'label': label,
                'probability': prob,
                'image': image,
            }
            for label, prob in zip(labels, probabilities)
        ]

    # Flatten the predictions and their associated image IDs and types
    flattened_predictions = []
    for image_id, prediction_data in predictions.items():
        for image_type, prediction in prediction_data.items():
            # Handle assembly predictions
            if image_type == 'assembly' and _is_pair(prediction):
                flattened_predictions.extend(
                    _rows(image_id, image_type, prediction, all_images[image_id][image_type])
                )
            # Handle body predictions. The bodies entry is a dict keyed by
            # body id, so iterate its items; iterating the dict itself yields
            # only the keys and no prediction tuple would ever be found.
            elif image_type == 'bodies':
                for body_id, body_prediction in prediction.items():
                    if _is_pair(body_prediction):
                        flattened_predictions.extend(
                            _rows(
                                image_id,
                                f"{image_type}_{body_id}",
                                body_prediction,
                                all_images[image_id][image_type][body_id],
                            )
                        )

    # Sort the predictions by probability in descending order and take top_n
    top_predictions = sorted(
        flattened_predictions, key=lambda x: x['probability'], reverse=True
    )[:max(top_n, 0)]

    if not top_predictions:
        print("No predictions to plot.")
        return

    # Size the grid from what is actually plotted, rounding the row count up so
    # a count that is not a multiple of 4 (or below 4) still gets enough axes.
    n_cols = 4
    n_rows = (len(top_predictions) + n_cols - 1) // n_cols
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows), squeeze=False)
    flat_axes = axs.ravel()

    for ax, pred in zip(flat_axes, top_predictions):
        ax.imshow(pred['image'])
        ax.set_title(f"{pred['label']} ({pred['probability']:.2f}%)")
        ax.axis('off')

    # Hide the unused cells of the last row
    for ax in flat_axes[len(top_predictions):]:
        ax.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Root folder of the training images (one sub-folder per assembly).
# The location can be overridden with the FUSION360_TRAIN_DIR environment
# variable so the notebook also runs on machines where the data lives
# elsewhere; without it the original local path is used.
base_path = os.environ.get(
    "FUSION360_TRAIN_DIR",
    "C:\\Users\\richt\\Documents\\ASME_data\\train\\Fusion360GalleryDataset_23hackathon_train",
)
all_images = load_images_from_directory(base_path, limit=50)

In [None]:
# Preprocessing applied to every image before it is passed to the model:
# resize, center-crop to 224, convert to a tensor, then normalize per channel.
# NOTE(review): the mean/std values look like the ones torchvision documents
# for its ImageNet-pretrained models — confirm they match the weights in use.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# Build a ResNet-50 with pretrained weights.
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in
# favor of the `weights=` argument — confirm against the installed version.
model = models.resnet50(pretrained=True)
model.eval() # Set the model to evaluation mode

In [None]:
# Check if CUDA is available and set device accordingly
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Move the model to the selected device as well. Choosing a device has no
# effect on its own, and a forward pass fails when the weights and the input
# tensor live on different devices.
model = model.to(device)

In [None]:
# Sanity check: list every index in 0..999 that has no entry in idx_to_class.
missing_indices = set(range(1000)).difference(idx_to_class)
print(f"Missing indices: {missing_indices}")


In [None]:
# Number of assemblies to run predictions on (the first n_sel entries of
# all_images). Switch to the commented-out line to process every assembly.
#n_sel = len(all_images)
n_sel = 20
selected_predictions = predict_first_n_images(all_images, model, transform, n=n_sel)

In [None]:
# Print the predictions of every selected assembly and its bodies, and collect
# the body labels in `body_data` along the way.
# The list is created here because no earlier cell defines it (appending to it
# would raise NameError on a fresh kernel), and starting empty keeps a re-run
# of this cell from piling up duplicate records.
body_data = []

for image_id, prediction_data in selected_predictions.items():
    print(f"\nPredictions for Image ID: {image_id}")
    
    for image_type, predictions in prediction_data.items():
        print("\nProcessing image type:", image_type)  # Added Diagnostic print statement

        print(f"\n  {image_type} Predictions:")
        
        if image_type == 'assembly':

            # An assembly prediction is a (labels, probabilities) tuple, or
            # None when the assembly had no image.
            if isinstance(predictions, tuple) and len(predictions) == 2:
                labels, probabilities = predictions
                for label, prob in zip(labels, probabilities):
                    print(f"    Label: {label}, Probability: {prob:.2f}%")
            else:
                print("    No predictions available for this image type.")

        elif image_type == 'bodies':
            print("Inside 'bodies' condition")  # Added Diagnostic print statement

            # Body predictions are stored as {body_id: (labels, probabilities)}
            for body_id, body_prediction in predictions.items():
                print(f"Processing body ID: {body_id}")  # Added Diagnostic print statement

                if isinstance(body_prediction, tuple) and len(body_prediction) == 2:
                    labels, _ = body_prediction
                    body_data.append({'body_id': body_id, 'body_label': ', '.join(labels)})
                    print(f"Added body_id {body_id} with labels: {', '.join(labels)}")
                else:
                    print(f"No tuple predictions for body ID: {body_id}")  # Added Diagnostic print statement




In [None]:
# Number of body label records currently held in body_data.
print(len(body_data))

In [None]:
# Plot the images behind the 20 highest-probability predictions of the
# selected assemblies.
plot_top_predictions(all_images, selected_predictions, top_n=20)

In [None]:
# Report how many body images were loaded for each assembly that has any.
# Note: the loop variables `assembly_id` and `images` stay defined after this
# cell (holding the last entry of all_images) and later cells read them.
for assembly_id, images in all_images.items():
    if images['bodies']:
        print(f"Assembly {assembly_id} has {len(images['bodies'])} body images.")


In [None]:
# Collect one record per assembly and per body with its predicted labels;
# the two lists are converted to dataframes afterwards.
assembly_data = []
body_data = []

for image_id, prediction_data in selected_predictions.items():
    for image_type, prediction in prediction_data.items():
        if image_type == 'assembly' and isinstance(prediction, tuple) and len(prediction) == 2:
            labels, _ = prediction
            assembly_data.append({'assembly_id': image_id, 'assembly_label': ', '.join(labels)})
        elif image_type == 'bodies':
            # `prediction` is a dict {body_id: (labels, probabilities)}.
            # Iterating the dict itself yields only its keys, so the tuple
            # check below could never succeed; go through .items() instead and
            # use the real body id rather than a made-up positional one.
            # NOTE(review): assumes df_main's body_id equals the image file
            # stem used as dict key — confirm before merging.
            # body_idx is not needed here; the enumerate is kept only because
            # a later cell still reads the leftover body_idx value.
            for body_idx, (body_id, body_prediction) in enumerate(prediction.items()):
                if isinstance(body_prediction, tuple) and len(body_prediction) == 2:
                    labels, _ = body_prediction
                    body_data.append({'body_id': body_id, 'body_label': ', '.join(labels)})
                    print(f"Added body_id {body_id} with labels: {', '.join(labels)}")


In [None]:
# Convert lists of dictionaries to dataframes.
# The columns are named explicitly so each frame keeps its key column even
# when its list is empty; a frame built from an empty list has no columns at
# all, and a merge on 'assembly_id' / 'body_id' would then fail.
df_assembly_predictions = pd.DataFrame(assembly_data, columns=['assembly_id', 'assembly_label'])
df_body_predictions = pd.DataFrame(body_data, columns=['body_id', 'body_label'])

In [None]:
# Preview the body-level prediction records.
df_body_predictions.head()

In [None]:
# Spot-check: predict the labels of a single body image.
# NOTE(review): this cell does not define body_id, assembly_id or body_image
# itself; it relies on values left over from earlier cells. No earlier
# top-level cell assigns body_image, so on a fresh Restart & Run All this
# raises NameError. Pick the sample explicitly or remove the cell.
print(f"Predicting for body {body_id} of assembly {assembly_id}...")
labels, probabilities = predict_image(body_image, model, transform)
print(f"Predictions: {labels}")

In [None]:
# Store the (labels, probabilities) pair under predictions[assembly_id]['bodies'][body_id],
# creating the intermediate dicts when they are missing.
# NOTE(review): `predictions` is not created as a results dict by any
# top-level cell; the only earlier top-level binding visible is a loop
# variable, so this writes into whatever object that loop left behind.
# Presumably `selected_predictions` was intended — confirm.
if assembly_id not in predictions:
    predictions[assembly_id] = {}
if 'bodies' not in predictions[assembly_id]:
    predictions[assembly_id]['bodies'] = {}
predictions[assembly_id]['bodies'][body_id] = (labels, probabilities)

In [None]:
# Merge the predicted labels into df_main (left joins keep every df_main row).
# Label columns left behind by a previous run are dropped first, so running
# this cell again replaces them instead of producing *_x / *_y duplicates.
df_main = df_main.drop(columns=['assembly_label', 'body_label'], errors='ignore')
df_main = df_main.merge(df_assembly_predictions, on='assembly_id', how='left')
df_main = df_main.merge(df_body_predictions, on='body_id', how='left')

In [None]:
# Predict for the body images of `images` and store each result under
# predictions[assembly_id]['bodies'].
# NOTE(review): `images`, `assembly_id` and `predictions` are not defined in
# this cell; they are whatever earlier cells left behind, so the result
# depends on execution order. The loop body matches the one inside
# predict_first_n_images — this looks like a leftover fragment; confirm
# whether it is still needed.
for body_id, body_image in images['bodies'].items():
    labels, probabilities = predict_image(body_image, model, transform)
    predictions[assembly_id]['bodies'][body_id] = (labels, probabilities)


In [None]:
# Spot-check: predict the labels of a single body image.
# NOTE(review): verbatim repeat of an earlier cell. body_id, assembly_id and
# body_image are not defined here; they are taken from whatever earlier cells
# left behind, so the image being checked depends on execution order.
print(f"Predicting for body {body_id} of assembly {assembly_id}...")
labels, probabilities = predict_image(body_image, model, transform)
print(f"Predictions: {labels}")


In [None]:
# NOTE(review): stray fragment. image_id and body_idx are not defined in this
# cell; both are loop variables left over from earlier cells, so the value
# assigned here depends on execution order.
body_id = f"{image_id}_body_{body_idx}"  # Constructing body_id from image_id and body_idx
