Brand predictors

In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.semi_supervised import LabelPropagation
from sklearn.metrics import make_scorer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import LabelEncoder
import cv2
import sys
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.utils import to_categorical

sys.path.append('../../utils')
import config_handling as conf
from database import Database
from file_io import path_handler
import matplotlib.image as mpimg
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

2025-01-10 12:50:23.846867: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1736513423.867374    5160 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1736513423.874067    5160 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-10 12:50:23.896058: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [None]:

# Ensure TensorFlow compatibility for CNN-based feature extraction and classifiers
import tensorflow as tf
tf.config.set_visible_devices([], 'GPU')
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

ModuleNotFoundError: No module named 'pytorch'

In [3]:
use_bbox = True    #run the model/training with bounding boxes when available? 

#set the learning device: gpu or cpu: 
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"       #force CPU

In [4]:

# Load configuration
config = conf.read_config('../../config/automotive.conf.ini')
config.read('config.ini')
connection_type = config['settings']['connection']
user = config[connection_type]['user']
pw = config[connection_type]['pw']
host = config[connection_type]['host']
db = config[connection_type]['db']
port = config[connection_type].getint('port')
db = Database(host,
              port,
              user,
              pw,
              db
              )
db.connect()


Connection established


In [5]:

# Image directory
basedir = config['settings']['image_directory']


In [6]:
distinct_brands = db.execute_query('SELECT distinct(brand) FROM listings')

In [7]:
brands = pd.DataFrame(distinct_brands)

In [8]:
# Define Brand tags 
BRANDS = brands['brand']


In [9]:

def bbox_to_ints(df):
    df = df.copy()
    for col in ['yolobox_top_left_x', 'yolobox_top_left_y', 'yolobox_bottom_right_x', 'yolobox_bottom_right_y']:
        df[col] = df[col].astype(int)
    return df

In [10]:
# Query labeled and unlabeled data
tags = db.execute_query("""SELECT
                            images.yolobox_top_left_x, 
                            images.yolobox_top_left_y, 
                            images.yolobox_bottom_right_x, 
                            images.yolobox_bottom_right_y, 
                            images.image_path, 
                            listings.brand
                        FROM images 
                        JOIN listings ON listings.id = images.listing_id
                        WHERE 
                            listings.countrycode = 'B' """)
labeled_data = pd.DataFrame(tags).fillna(-1)
labeled_data = bbox_to_ints(labeled_data)


In [11]:
labeled_data["image_path"] = labeled_data["image_path"].apply(lambda x: x.replace('\\', '/'))
labeled_data["abs_image_path"]= basedir + '/' + labeled_data["image_path"]

In [12]:
len(labeled_data)

1354804

In [13]:
brand_encoder = LabelEncoder()
labeled_data['brand'] = brand_encoder.fit_transform(labeled_data['brand'])  # Encode labels
num_classes = len(brand_encoder.classes_)
num_classes

30

In [14]:

# Data generator for online learning
class DataGenerator:
    def __init__(self, dataframe, batch_size, target_size, use_bbox):
        self.dataframe = dataframe
        self.batch_size = batch_size
        self.target_size = target_size
        self.usebbox = use_bbox

    def __len__(self):
        return int(np.ceil(len(self.dataframe) / self.batch_size))

    def __iter__(self):
        for start in range(0, len(self.dataframe), self.batch_size):
            end = min(start + self.batch_size, len(self.dataframe))
            batch_data = self.dataframe.iloc[start:end]
            
            images = []
            labels = []
            for _, row in batch_data.iterrows():
                img_path = row['abs_image_path']
                label = row['brand']
                img = Image.open(img_path).convert('RGB')

                # TODO add the bounding box here!
                x1, y1, x2, y2 = row['yolobox_top_left_x'], row['yolobox_top_left_y'], row['yolobox_bottom_right_x'], row['yolobox_bottom_right_y']
                if self.usebbox and x1 != -1:
                    #TODO implement bbox
                    img = img[y1:y2, x1:x2]

                # Load and preprocess the image
                img = img.resize(self.target_size)
                img = preprocess_input(np.array(img))  # Normalize for ResNet
                images.append(img)
                labels.append(label)
            
            images = np.array(images)
            labels = to_categorical(labels, num_classes=num_classes)
            yield images, labels


In [15]:

# Initialize the data generator
batch_size = 64
target_size = (224, 224)
data_gen = DataGenerator(labeled_data, batch_size, target_size, use_bbox)

# Define the model (transfer learning using ResNet50)
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
base_model.trainable = False  # Freeze the base model

model = Sequential([
    base_model,
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])


2025-01-10 12:56:38.290155: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 102760448 exceeds 10% of free system memory.
2025-01-10 12:56:38.331218: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 102760448 exceeds 10% of free system memory.
2025-01-10 12:56:38.349667: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 102760448 exceeds 10% of free system memory.


In [16]:

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])


In [17]:

# Train the model using online learning
steps_per_epoch = len(labeled_data) // batch_size
epochs = 5
model.fit(
    data_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs
)


ValueError: Unrecognized data type: x=<__main__.DataGenerator object at 0x70f096d9b8d0> (of type <class '__main__.DataGenerator'>)

In [None]:

# Save the model
model.save('online_model.h5')
print("Training complete and model saved.")

In [None]:
old code below!

In [None]:

# Preprocess labeled and unlabeled data
labeled_images = load_data(labeled_data, use_bounding_boxes=True)


In [20]:

X_train_binary, X_test_binary, y_train_binary, y_test_binary = train_test_split(labeled_images, binary_labels, test_size=0.2, random_state=42)


In [None]:

# Train binary classifier
binary_model.fit(X_train_binary, y_train_binary, epochs=10, batch_size=32, validation_split=0.2)


In [None]:

# Evaluate binary classifier
binary_loss, binary_accuracy = binary_model.evaluate(X_test_binary, y_test_binary)
print(f"Binary Classifier - Loss: {binary_loss}, Accuracy: {binary_accuracy}")


In [None]:
binary_cutoff_point = 0.5
# Predict non-crappy images
binary_predictions_unlabeled = binary_model.predict(unlabeled_images)
non_crappy_indices = np.where(binary_predictions_unlabeled < binary_cutoff_point)[0]
non_crappy_images = unlabeled_images[non_crappy_indices]


In [None]:
plt.scatter(
    x= range(len(binary_predictions_unlabeled)), 
    y = binary_predictions_unlabeled, 
    marker='.'
    )
plt.hlines(binary_cutoff_point, xmin=0, xmax=len(binary_predictions_unlabeled), color='red')

In [26]:

# Prepare angle labels
not_crappy_mask = labels != "crappy"
angle_labels = labels[not_crappy_mask]
angle_images = labeled_images[not_crappy_mask]


In [83]:

label_encoder = LabelEncoder()
angle_labels_encoded = label_encoder.fit_transform(angle_labels)


In [84]:

# Train angle classifier
X_train_angle, X_test_angle, y_train_angle, y_test_angle = train_test_split(angle_images, angle_labels_encoded, test_size=0.2, random_state=42)


In [None]:

angle_model.fit(X_train_angle, y_train_angle, epochs=10, batch_size=32, validation_split=0.2)


In [None]:

# Evaluate angle classifier
angle_loss, angle_accuracy = angle_model.evaluate(X_test_angle, y_test_angle)
print(f"Angle Classifier - Loss: {angle_loss}, Accuracy: {angle_accuracy}")


In [None]:
#visualize a confusion matrix: 
y_test_predictions = angle_model.predict(X_test_angle)
y_test_prediction_labels = np.argmax(y_test_predictions, axis=1)

In [None]:
labels_decoded = label_encoder.inverse_transform(range(8))
labels_decoded

In [None]:
confusion_on_y_test = pd.DataFrame(
    {
        'actuals': y_test_angle,
        'predicts': y_test_prediction_labels
    }
)

#confusion_on_y_test.sample(5)
cm = confusion_matrix(confusion_on_y_test['actuals'], confusion_on_y_test['predicts'], labels=range(8))

# Display confusion matrix using matplotlib
fig, ax = plt.subplots(figsize=(8, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels_decoded)
disp.plot(ax=ax, cmap='Blues', colorbar=True)

plt.title("Confusion Matrix")
plt.xlabel("Predicted Labels")
plt.ylabel("Actual Labels")
plt.show()

In [None]:

# Predict angles for non-crappy images
predicted_angles = angle_model.predict(non_crappy_images)
predicted_labels = label_encoder.inverse_transform(np.argmax(predicted_angles, axis=1))


In [32]:

random_unlabled_data = unlabeled_data.sample(200)
random_unlabled_data["predicted_label"] = "crappy"
random_unlabled_data.iloc[non_crappy_indices, random_unlabled_data.columns.get_loc("predicted_label")] = predicted_labels


In [None]:
random_unlabled_data.sample(10)