In [None]:
import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')
# Prevent memory hogging: enable memory growth on every visible GPU.
# Iterating an empty device list is a no-op, so no separate emptiness check
# is required.
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    # Report the problem and carry on with TensorFlow's default behaviour.
    print(e)
import keras
import numpy as np
import matplotlib.pyplot as plt
import glob
import pandas as pd
from PIL import Image
import os, sys
from tqdm import tqdm

# Preparation

In [None]:
# Data pipeline utils
# Paths of images that could not be loaded; appended to by load_image.
error_images = []
@tf.py_function(Tout=(tf.string, tf.float32))
def load_image(file_path):
    """Load one image as a 256x256 RGB float32 tensor scaled to [0, 1].

    Args:
        file_path: Scalar string tensor holding the path of the image file.

    Returns:
        A ``(path, image)`` pair. On success ``path`` is the input tensor and
        ``image`` is the decoded picture. On any failure the path is recorded
        in ``error_images`` and the pair ``('', zeros([256, 256, 3]))`` is
        returned so the element can be dropped by ``filter_invalid``.
    """
    try:
        path = file_path.numpy().decode()
        # The context manager releases the file handle even when decoding
        # fails part-way (e.g. truncated or corrupt files).
        with Image.open(path) as source:
            img = source.convert('RGB').resize((256, 256))
        img = tf.convert_to_tensor(np.array(img), dtype=tf.float32) / 255.0
        return file_path, img
    except Exception:
        # Return dummy tensor and mark with empty string path.
        # errors='replace' keeps the logging itself from raising on a path
        # that is not valid UTF-8.
        error_images.append(file_path.numpy().decode(errors='replace'))
        return tf.constant('', dtype=tf.string), tf.zeros([256, 256, 3], dtype=tf.float32)

def filter_invalid(file_path, image):
    """Dataset predicate: keep an element only if its path string is non-empty.

    ``load_image`` marks failed loads with an empty path, so this drops them.
    ``image`` is accepted to match the element structure but is not inspected.
    """
    path_length = tf.strings.length(file_path)
    return path_length > 0

def build_dataset(image_paths, batch_size=32):
    """Build the batched inference dataset from a list of image paths.

    Each path is mapped through ``load_image`` (4 parallel calls), elements
    rejected by ``filter_invalid`` are removed, and the remainder is grouped
    into batches of ``batch_size`` with a prefetch buffer of 4.

    Args:
        image_paths: Image file paths to load.
        batch_size: Number of images per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(paths, images)`` batches.
    """
    return (
        tf.data.Dataset.from_tensor_slices(image_paths)
        .map(load_image, num_parallel_calls=4)
        .filter(filter_invalid)
        .batch(batch_size)
        .prefetch(4)
    )

In [None]:
# Load model from checkpoint
def load_model(ckpt):
    """Load a Keras model from ``ckpt``, reporting the outcome on stdout.

    Args:
        ckpt: Path of the saved model to load.

    Returns:
        The loaded model, or ``None`` if anything went wrong (the error is
        printed rather than raised).
    """
    result = None
    try:
        candidate = keras.models.load_model(ckpt)
        print(f"Model loaded successfully from {ckpt}")
        result = candidate
    except Exception as err:
        print(f"Error loading model from {ckpt}: {err}")
    return result

def infer(model, dataset):
    """Classify every batch of ``dataset`` and collect per-image class indices.

    Args:
        model: Loaded model exposing ``predict_on_batch``. Its first output is
            read as the haze-colour scores and its second output as the
            haze-density scores.
        dataset: Iterable of ``(paths, imgs)`` batches, as produced by
            ``build_dataset``.

    Returns:
        A list with one dict per image, holding the keys ``'image_path'``,
        ``'haze_color'`` and ``'haze_density'``. The two labels are the argmax
        indices of the corresponding model outputs, stored as plain ints.

    Raises:
        ValueError: If ``model`` is ``None`` (``load_model`` returns ``None``
            when loading fails).
    """
    if model is None:
        raise ValueError("model is None; the checkpoint was not loaded, so inference cannot run")

    predictions = []
    for paths, imgs in tqdm(dataset):
        # The data is already batched, so run a single forward pass per batch.
        # Model.predict sets up its own batching pipeline on every call and is
        # not meant to be invoked inside a loop over batches.
        preds = model.predict_on_batch(imgs)
        color_scores, density_scores = preds[0], preds[1]
        for i in range(len(paths)):
            predictions.append({
                'image_path': paths[i].numpy().decode(),
                'haze_color': int(np.argmax(color_scores[i])),
                'haze_density': int(np.argmax(density_scores[i])),
            })
    return predictions

# Visualize color clusters

In [None]:
# Colour clusters keyed by cluster id. Every cluster holds three (start, end)
# pairs that viz_cluster_color_range reads as the red, green and blue channel
# ranges, in that order.
COLOR_CLUSTERS = {
    1: [(246.48, 254.53), (213.99, 251.55), (149.79, 239.74)],
    2: [(146.16, 217.47), (181.47, 230.52), (205.76, 241.87)],
    3: [(218.99, 255.0), (217.01, 255.0), (215.25, 255.0)],
}


# Initialize densities
# Keyed by density level; each value is a pair of bounds
# (presumably (min, max) for that level - TODO confirm units).
HAZE_DENSITIES = {
    0: (0.01, 0.075),  # Light
    1: (0.075, 0.25),  # Medium
    2: (0.25, 0.5),  # Heavy
}

def viz_cluster_color_range(cluster_id=1, step=5):
    """Draw a row of colour swatches spanning one cluster's RGB ranges.

    Each of the three channel ranges stored in ``COLOR_CLUSTERS[cluster_id]``
    is sampled at ``step`` evenly spaced values; the i-th swatch combines the
    i-th sample of the red, green and blue ranges (truncated to ints) and is
    titled with that RGB tuple. The figure is created but not shown.

    Args:
        cluster_id: Key into ``COLOR_CLUSTERS``. Defaults to 1; the previous
            default ``'0'`` was not a valid key and always raised.
        step: Number of swatches to draw (at least 1).

    Raises:
        KeyError: If ``cluster_id`` is not a key of ``COLOR_CLUSTERS``.
    """
    cluster = COLOR_CLUSTERS[cluster_id]  # Get cluster
    # Get value range for each colour channel (red, green, blue)
    channel_ranges = [np.linspace(low, high, step) for low, high in cluster[:3]]
    # Get color list: one (r, g, b) int tuple per sample position
    colors = [tuple(int(value) for value in rgb) for rgb in zip(*channel_ranges)]
    # squeeze=False always returns a 2-D axes array, so step=1 works too.
    _, axes = plt.subplots(1, step, figsize=(10, 10), tight_layout=True, squeeze=False)
    for ax, color in zip(axes[0], colors):
        color_arr = np.ones((100, 100, 3), dtype=np.uint8) * np.asarray(color)
        ax.imshow(color_arr)
        ax.axis('off')
        ax.set_title(str(color))

# Visualize color ranges for each cluster: print the raw ranges, then render
# the swatch row for that cluster.
for key, cluster_ranges in COLOR_CLUSTERS.items():
    print(f'Cluster {key}: {cluster_ranges}')
    viz_cluster_color_range(key, step=5)
    plt.show()

# Inference

In [None]:
# MAIN
# Change to your image directory.
# glob returns matches in arbitrary order; sorting keeps the order of the
# predictions (and anything derived from their position) reproducible.
image_paths = sorted(glob.glob('/home/duynd/haze_attrs/crawled_images/*/*.*'))
print(f"Found {len(image_paths)} images.")
# Load the pre-trained model
model = load_model('haze_classifier_tuned_v1.keras')
if model is None:
    # load_model reports failures by returning None; stop here instead of
    # failing later in the middle of inference.
    raise RuntimeError("Model could not be loaded; see the error printed above.")
# Load inference dataset
dataset = build_dataset(image_paths, batch_size=64)

In [None]:
# Define output file
output_file = 'haze_attributes.csv'  # Output CSV file for predictions
error_file = 'error_images.txt'  # Error log file for images that failed to load

# Images are loaded lazily while the dataset is iterated, so every failure is
# recorded again on each pass. Reset the log so that re-running this cell does
# not accumulate duplicate entries.
error_images.clear()
# Perform inference
predictions = infer(model, dataset)
# Save error images
if error_images:
    with open(error_file, 'w') as f:
        f.writelines(f"{img}\n" for img in error_images)
    print(f"Error images saved to {error_file}. Total: {len(error_images)}")
# Save predictions as csv
predictions_df = pd.DataFrame(predictions)
predictions_df.to_csv(output_file, index=False)

# OPTIONAL: Sort images by colors and densities

In [None]:
import shutil
import os

# OPTIONAL: Save results to attribute directories
save_dir = './results'
os.makedirs(save_dir, exist_ok=True)

new_predictions = []

# Create directories for each haze color and density
# NOTE(review): COLOR_CLUSTERS is keyed 1..3, while the predicted labels are
# argmax indices (which start at 0). The folders created here may therefore
# not line up with the labels used below - confirm the intended mapping.
for color in COLOR_CLUSTERS:
    os.makedirs(os.path.join(save_dir, 'color', f'color_{color}'), exist_ok=True)
for density in HAZE_DENSITIES:
    os.makedirs(os.path.join(save_dir, 'density', f'density_{density}'), exist_ok=True)

for index, prediction in enumerate(tqdm(predictions)):
    image_path = prediction['image_path']
    haze_color = prediction['haze_color']
    haze_density = prediction['haze_density']

    # The files are copied byte-for-byte, so keep the source extension rather
    # than labelling every copy as .jpg (fall back to .jpg if there is none).
    extension = os.path.splitext(image_path)[1] or '.jpg'
    file_name = f'{index:06d}{extension}'

    # Copy the image to the appropriate directory. The per-label makedirs
    # covers labels that have no pre-created folder.
    color_dest_path = os.path.join(save_dir, 'color', f'color_{haze_color}', file_name)
    os.makedirs(os.path.dirname(color_dest_path), exist_ok=True)
    shutil.copy(image_path, color_dest_path)

    density_dest_path = os.path.join(save_dir, 'density', f'density_{haze_density}', file_name)
    os.makedirs(os.path.dirname(density_dest_path), exist_ok=True)
    shutil.copy(image_path, density_dest_path)

    # Append to new predictions list
    new_predictions.append({
        'image_path': image_path,
        'haze_color': haze_color,
        'haze_density': haze_density,
        'color_dest_path': color_dest_path,
        'density_dest_path': density_dest_path
    })

# Save new predictions to CSV
new_predictions_df = pd.DataFrame(new_predictions)
new_predictions_df.to_csv(os.path.join(save_dir, 'haze_attributes_sorted.csv'), index=False)
print(f"Results saved to {save_dir} with sorted images by haze color and density.")