### Filtering Flickr30k Dataset
- Perform zero-shot classification with bart-large-mnli model
- Define labels for both landscape and non-landscape categories
- Set threshold for each label under landscape category
- Classify the filtered dataset into the primary five classes

You may download the Flickr30k dataset here:
https://www.kaggle.com/datasets/adityajn105/flickr30k

In [1]:
#Imports
import pandas as pd
from transformers import pipeline
from tqdm import tqdm
import os
import shutil

In [None]:
def zero_shot_classification(input_csv:str, output_csv:str, labels:list):
    """
    Performs zero-shot classification on captions from an input CSV and saves the results to an output CSV.

    Every caption is passed to the facebook/bart-large-mnli zero-shot pipeline together with
    the candidate labels. The first label/score pair returned for each caption is stored in
    two new columns, 'label' and 'score', and the whole table is written to output_csv.

    Args:
        input_csv (str): Path to the input CSV file. Must contain a 'caption' column.
        output_csv (str): Path where the output CSV file will be saved.
        labels (list): Candidate labels for classification.
    """
    df = pd.read_csv(input_csv)
    captions = df['caption'].to_list()

    top_labels = []
    top_scores = []

    #only load the model when there is at least one caption to classify
    if captions:
        classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

        for caption in tqdm(captions):
            #run zero shot classification and keep the first (label, score) pair of the result
            result = classifier(caption, labels, multi_label=False)
            top_labels.append(result['labels'][0])
            top_scores.append(result['scores'][0])

    #two separate lists avoid unpacking zip(*results), which fails when there are no captions
    df['label'] = top_labels
    df['score'] = top_scores

    df.to_csv(output_csv, index=False)

    print(f"Classification completed and saved to {output_csv}.")

In [None]:
#candidate labels for the zero-shot classifier (landscape and non-landscape categories)
labels = [
    "beach", "lake", "glacier", "mountains", "snow", "desert",
    "sand", "forest", "sea", "park", "ice", "city",
    "indoor", "stadium", "urban", "grass", "pool", "garden",
]

#classify every caption and store the top label and score next to it
zero_shot_classification(
    input_csv='../../input/Flickr30k/captions.csv',
    output_csv='../../input/Flickr30k/flick30k_all_result.csv',
    labels=labels,
)

In [None]:
def filter_flickr(input_csv:str, output_csv:str, thresholds:dict):
    """
    Filters captions based on predefined thresholds for each label and saves the filtered results to a CSV.

    A row is kept when its 'label' is a key of thresholds and its 'score' is at least the
    threshold of that label. Rows whose label has no threshold are dropped.

    Args:
        input_csv (str): Path to the CSV file containing captions and their classification scores.
            Must contain the columns 'label' and 'score'.
        output_csv (str): Path where the filtered results will be saved.
        thresholds (dict): Dictionary where keys are labels and values are the minimum score thresholds for those labels.
    """
    df = pd.read_csv(input_csv)

    #look up the required score of every row in one pass instead of calling a Python function per row;
    #labels without a threshold map to NaN, and any comparison against NaN is False
    required_score = df['label'].map(thresholds)

    #filters result
    filtered_result_df = df[df['score'] >= required_score]

    filtered_result_df.to_csv(output_csv, index=False)

    print(f"Total number of qualifying captions: {len(filtered_result_df)}")

In [None]:
#minimum score a caption needs for each landscape label; labels not listed here are discarded
thresholds = dict(
    sea=0.7,
    beach=0.5,
    desert=0.8,
    forest=0.7,
    glacier=0.2,
    mountains=0.3,
    snow=0.5,
    sand=0.4,
    lake=0.4,
)

filter_flickr(
    input_csv='../../input/Flickr30k/flick30k_all_result.csv',
    output_csv='../../input/Flickr30k/flick30k_filtered_result.csv',
    thresholds=thresholds,
)

In [None]:
def extract_images(csv_file:str, directories:list, output_folder:str):
    """
    Extract images listed in a CSV file from multiple directories and save them to an output folder.

    Each distinct value of the 'image_filename' column is looked up in the given directories,
    in order. The first match is copied (with metadata) to output_folder.

    Args:
        csv_file (str): Path to the CSV file containing image filenames in an 'image_filename' column.
        directories (list): List of directories to search for the images.
        output_folder (str): Path to the output folder where the images will be saved.

    Returns:
        not_found_images (list): A list of filenames of images that were not found in any of the provided directories.
    """

    #create directory if does not exist; exist_ok avoids the gap between checking and creating
    os.makedirs(output_folder, exist_ok=True)

    df = pd.read_csv(csv_file)
    image_filenames = df['image_filename'].unique() #retrieve all images names

    not_found_images = []

    for filename in image_filenames:
        for directory in directories:
            #check all directory to retrieve image
            source_path = os.path.join(directory, filename)
            if os.path.exists(source_path):
                #copy the image over to the proper folder
                shutil.copy2(source_path, os.path.join(output_folder, filename))
                print(f"Copied {filename} to {output_folder}")
                break
        else:
            #no directory contained the image
            not_found_images.append(filename)

    if not_found_images:
        print("Images not found in any of the provided directories:")
        for img in not_found_images:
            print(img)

    return not_found_images

In [None]:
#the filtered captions are written under '../../input/Flickr30k/' by the filter_flickr call,
#so they have to be read from that same folder (the previous 'Flicker30k' spelling pointed elsewhere)
csv_file = '../../input/Flickr30k/flick30k_filtered_result.csv'
flicker_image_files = ['../../input/Flickr30k/flickr30k_images/']

#folder that receives a copy of every image that survived the caption filter
flicker_filtered_image = '../../input/Flickr30k/flicker30k_output_images/'
extract_images(csv_file, flicker_image_files, flicker_filtered_image)

In [None]:
def classify_flickr(input_csv:str, output_csv:str):
    """
    Load CSV, map each caption label to its class as defined in label_map and
    assign one class per image. Saves the result as a CSV in output_csv.

    Two columns are added:
        new_label: class of the individual caption (empty when the label is not in label_map).
        classified_label: class chosen for the image, repeated on every row of that image.

    The image class is the most frequent caption class. When the two most frequent
    classes are tied, the class of the highest-scoring caption is used. Captions
    without a mapped class are ignored; an image with no mapped caption gets no class.

    Args:
        input_csv (str): Path to the input CSV file containing labeled data.
            Must contain the columns 'image_filename', 'label' and 'score'.
        output_csv (str): Path where the categorized and classified results will be saved.
    """
    df = pd.read_csv(input_csv)

    label_map = {
        'beach': 'coast',
        'lake': 'coast',
        'glacier': 'glacier',
        'mountain': 'mountain',
        #the zero-shot label list uses the plural form, so it has to be mapped as well
        'mountains': 'mountain',
        'snow': 'glacier',
        'desert': 'desert',
        'sand': 'desert',
        'forest': 'forest',
        'sea': 'coast'
    }

    df['new_label'] = df['label'].map(label_map)

    def determine_label(group):
        #captions whose label is not part of label_map carry no vote
        labelled = group.dropna(subset=['new_label'])
        if labelled.empty:
            return None

        #value_counts sorts by frequency, most frequent first
        label_counts = labelled['new_label'].value_counts()
        if len(label_counts) == 1 or label_counts.iloc[0] > label_counts.iloc[1]:
            return label_counts.index[0]

        #tie between the top classes: fall back to the highest-scoring caption
        return labelled.sort_values('score', ascending=False)['new_label'].iloc[0]

    #one class per image, keyed by filename
    image_labels = {
        filename: determine_label(group)
        for filename, group in df.groupby('image_filename')
    }

    #map through the filename; assigning the per-image result directly would align on the
    #row index of df and leave the column empty
    df['classified_label'] = df['image_filename'].map(image_labels)

    df.to_csv(output_csv, index=False)

    print(f"File saved to {output_csv}")


In [None]:
#read the filtered captions from the folder they were written to ('Flickr30k', not 'Flicker30k')
classify_flickr('../../input/Flickr30k/flick30k_filtered_result.csv', 'classified_images.csv')

#### Other Experimental Results

The following approaches were also tested but did not yield results as good:
1. Cosine similarity check with landscape data
2. Zero-shot classification of images with OpenAI CLIP model.

In [None]:
# Imports
import pandas as pd

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from PIL import Image
from transformers import CLIPProcessor, CLIPModel

In [None]:
# Another way of filtering the Flickr dataset
# Look through all captions of each image and keep the images with at least 3 captions that meet the threshold of the same label

def filter_flickr(input_csv:str, output_csv:str, thresholds:dict, min_label_count:int=3):
    """
    Filters images based on the specified thresholds for each label.
    An image qualifies when at least min_label_count of its captions carry the same label
    and each of them reaches the threshold of that label. All rows of a qualifying image are kept.

    Args:
        input_csv (str): Path to the input CSV file containing labeled data. Must contain the
            columns 'image_filename' and 'label', plus the score in a 'score' column
            ('confidence' is used when there is no 'score' column).
        output_csv (str): Path where the filtered results will be saved.
        thresholds (dict): Dictionary where keys are labels and values are the minimum confidence thresholds for those labels.
        min_label_count (int, optional): Range from [0,5]. Minimum number of times a label must reach its threshold to qualify an image. Defaults to 3.

    Raises:
        ValueError: If min_label_count is outside [0, 5].
    """
    #the previous check (0 < min_label_count or ...) rejected every positive value, including the default
    if not 0 <= min_label_count <= 5:
        raise ValueError("min_label_count must be within 0 to 5")

    df = pd.read_csv(input_csv)

    #zero_shot_classification writes the score to a 'score' column; 'confidence' is kept as a fallback
    score_column = 'score' if 'score' in df.columns else 'confidence'

    #labels without a threshold map to NaN, and a comparison against NaN is False
    meets_threshold = df[score_column] >= df['label'].map(thresholds)

    if not thresholds:
        #without any label to count, no image can qualify
        qualify_images = []
    elif min_label_count == 0:
        #a count of zero is always reached, so every image qualifies
        qualify_images = df['image_filename'].dropna().unique().tolist()
    else:
        #number of passing captions per (image, label) pair
        hits = df[meets_threshold].groupby(['image_filename', 'label']).size()
        enough_hits = hits[hits >= min_label_count]
        qualify_images = enough_hits.index.get_level_values('image_filename').unique().tolist()

    filtered_result_df = df[df['image_filename'].isin(qualify_images)]
    filtered_result_df.to_csv(output_csv, index=False)

    print(f"Total number of qualifying images: {len(qualify_images)}")

In [None]:
#minimum score per landscape label; the keys have to match the labels given to the classifier,
#which uses the plural 'mountains'
thresholds = {
    'sea': 0.7,
    'beach': 0.5,
    'desert': 0.8,
    'forest': 0.7,
    'glacier': 0.2,
    'mountains': 0.5,
    'snow': 0.5,
    'sand': 0.5,
    'lake': 0.5
}

filter_flickr('flick30k_all_result.csv', 'flick30k_filtered_result.csv', thresholds)

In [None]:
# Caption similarity check against the Kosmos and BLIP captions

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def compute_tfidf(ref_file1:str, ref_file2:str, target_file:str):
    """
    Build TF-IDF vectors for the reference captions and the target captions.

    The vectorizer is fitted once on all captions together (both reference files
    followed by the target file), then the resulting matrix is split back into
    its reference part and its target part.

    Args:
        ref_file1 (str): Path to the first reference CSV file; captions are read from 'image_caption'.
        ref_file2 (str): Path to the second reference CSV file; captions are read from 'image_caption'.
        target_file (str): Path to the CSV file with the captions to compare; captions are read from 'caption'.

    Returns:
        ref_vectors: TF-IDF vectors for reference captions
        target_vectors: TF-IDF vectors for the target captions
        target_captions_df: DataFrame loaded from target_file
    """
    #collect the reference captions of both files, first file first
    ref_captions = []
    for ref_path in (ref_file1, ref_file2):
        ref_captions.extend(pd.read_csv(ref_path)['image_caption'].tolist())

    target_captions_df = pd.read_csv(target_file)
    corpus = ref_captions + target_captions_df['caption'].tolist()

    #fit on the combined corpus so that both parts share one vocabulary
    tfidf_matrix = TfidfVectorizer().fit_transform(corpus)

    #the first rows belong to the reference captions, the rest to the target captions
    ref_count = len(ref_captions)
    return tfidf_matrix[:ref_count], tfidf_matrix[ref_count:], target_captions_df


def filter_flicker(ref_vectors, new_vectors, target_captions_df:pd.DataFrame, threshold:float, output_file:str, min_qualifying_captions:int=2):
    """
    Filter new captions based on similarity to reference captions using cosine similarity.

    Every new caption gets the highest cosine similarity it reaches against any reference
    caption. An image qualifies when at least min_qualifying_captions of its captions reach
    the threshold; all rows of qualifying images are written to output_file.

    Note: a 'similarity_score' column is added to target_captions_df in place.

    Args:
        ref_vectors (array-like): TF-IDF vectors of reference captions.
        new_vectors (array-like): TF-IDF vectors of new captions, one per row of target_captions_df.
        target_captions_df (DataFrame): DataFrame containing new captions and an 'image_filename' column.
        threshold (float): Similarity threshold for filtering.
        output_file (str): Path to save the filtered captions.
        min_qualifying_captions (int, optional): Number of captions of an image that must reach
            the threshold. Defaults to 2.
    """
    similarities = cosine_similarity(new_vectors, ref_vectors)
    #best match against any reference caption, one value per new caption
    target_captions_df['similarity_score'] = np.max(similarities, axis=1)

    #count the captions above the threshold per image in a single grouped pass
    #instead of re-scanning the whole table once for every filename
    is_similar = target_captions_df['similarity_score'] >= threshold
    similar_per_image = is_similar.groupby(target_captions_df['image_filename']).sum()
    qualifying_images = set(similar_per_image.index[similar_per_image >= min_qualifying_captions])

    filtered_df = target_captions_df[target_captions_df['image_filename'].isin(qualifying_images)]

    filtered_df.to_csv(output_file, index=False)
    print(f"Filtered captions saved to {output_file}. Total qualifying images: {len(qualifying_images)}")

In [None]:
#reference captions of the landscape dataset
blip_label = '../../input/Landscape/Label/Blip_Label.csv'
kosomos_label = '../../input/Landscape/Label/Kosmos_Label.csv'
#captions to compare against the references
#NOTE(review): this path has no 'Flickr30k' folder, unlike the path the filtered CSV was written to - confirm
filtered_flicker30k = '../../input/flick30k_filtered_result.csv'

ref_vectors, new_vectors, target_captions_df = compute_tfidf(
    ref_file1=blip_label,
    ref_file2=kosomos_label,
    target_file=filtered_flicker30k,
)

filter_flicker(
    ref_vectors=ref_vectors,
    new_vectors=new_vectors,
    target_captions_df=target_captions_df,
    threshold=0.4,
    output_file='../../input/flicker30k_output_file.csv',
)

In [None]:
# Using the CLIP model to filter the landscape images

def is_landscape(model:CLIPModel, processor:CLIPProcessor, image_path:str, positive_categories:list, negative_categories:list):
    """
    ZeroShot with clip model for positive_categories relating to landscape and negative_categories not relating to landscape.

    The image is scored against all categories at once (softmax over the combined list),
    then the best positive and the best negative probability are reported.

    Args:
        model: CLIP model
        processor: CLIP embedding model
        image_path (str): Path to the image file.
        positive_categories (list): List of positive categories related to landscapes.
        negative_categories (list): List of negative categories not related to landscapes.

    Returns:
        maximum positive probability and maximum negative probability,
        or (None, None) when the file cannot be read as an image.
    """
    import torch  #local import: torch is not imported at the top of this notebook

    try:
        #Image.open only reads the header; converting inside the try forces the full decode,
        #so corrupt files are reported here, and the with-block releases the file handle
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")
    except Exception:
        #best effort: any unreadable file is skipped by the caller
        print(f"File is not an image or cannot be opened: {image_path}")
        return None, None

    inputs = processor(text=positive_categories + negative_categories, images=image, return_tensors="pt", padding=True)
    #inference only, no gradients needed
    with torch.no_grad():
        outputs = model(**inputs)
    #probabilities of the single image over all categories
    probs = outputs.logits_per_image.softmax(dim=1)[0]

    #the positive categories come first in the combined text list
    positive_count = len(positive_categories)
    max_positive_prob = probs[:positive_count].max().item()
    max_negative_prob = probs[positive_count:].max().item()

    print(f"Image: {image_path}, Positive Probability: {max_positive_prob}, Negative Probability: {max_negative_prob}")

    return max_positive_prob, max_negative_prob

def classify_images(model:CLIPModel, processor:CLIPProcessor, folder_path, output_folder, positive_categories, negative_categories, threshold=0.5, similar_margin=0.1):
    """
    Classify images in a folder into 'similar' or 'landscape' categories based on positive and negative categories.

    For every file in folder_path:
        - skipped when it cannot be read or when the best negative probability is higher than the best positive one;
        - copied to '<output_folder>/similar' when both probabilities are within similar_margin of each other
          (hard to classify, manual classification required);
        - copied to '<output_folder>/landscape' when the positive probability is above threshold;
        - otherwise left where it is.

    Args:
        model: CLIP model
        processor: CLIP embedding model
        folder_path (str): Path to the folder containing images.
        output_folder (str): Path to the output folder.
        positive_categories (list): List of positive categories related to landscapes.
        negative_categories (list): List of negative categories not related to landscapes.
        threshold (float, optional): Threshold for classification probability, defaults to 0.5.
        similar_margin (float, optional): Largest gap between positive and negative probability
            for an image to count as 'similar', defaults to 0.1.
    """
    #create folders; exist_ok avoids the gap between checking and creating
    similar_folder = os.path.join(output_folder, 'similar')
    landscape_folder = os.path.join(output_folder, 'landscape')
    os.makedirs(similar_folder, exist_ok=True)
    os.makedirs(landscape_folder, exist_ok=True)

    image_files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
    total_images = len(image_files)
    similar_count, landscape_count = 0, 0

    for image_file in image_files:
        image_path = os.path.join(folder_path, image_file)
        positive_prob, negative_prob = is_landscape(model, processor, image_path, positive_categories, negative_categories)

        #unreadable file
        if positive_prob is None or negative_prob is None:
            continue

        #a non-landscape category is the better match
        if negative_prob > positive_prob:
            continue

        if abs(positive_prob - negative_prob) <= similar_margin:
            #similar images (images that are hard to be classified, manual classification required)
            destination_folder = similar_folder
            similar_count += 1
        elif positive_prob > threshold:
            #images that are strongly landscape
            destination_folder = landscape_folder
            landscape_count += 1
        else:
            #neither ambiguous nor confident enough: not copied anywhere
            continue

        shutil.copy2(image_path, os.path.join(destination_folder, image_file))

    print(f"Total number of images: {total_images}")
    print(f"Number of images in 'similar' folder: {similar_count}")
    print(f"Number of images in 'landscape' folder: {landscape_count}")

In [None]:
#load the CLIP checkpoint and its matching processor
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)

#categories that count as landscape
positive_categories = [
    "mountain", "desert", "snow",
    "sea", "glacier", "beach",
]
#categories that count as non-landscape
negative_categories = [
    "water", "city", "indoor", "parks",
    "grass", "urban", "pool", "stadium",
    "lake", "building", "street", "transport",
    "house", "shop", "garden", "traffic",
]

#classify the images that were extracted after the caption filter
folder_path = flicker_filtered_image
#NOTE(review): the output goes under a 'Flicker8k' folder although the input images are Flickr30k - confirm
output_folder = '../../input/Flicker8k/Output/Zero_Shot'
classify_images(
    model=model,
    processor=processor,
    folder_path=folder_path,
    output_folder=output_folder,
    positive_categories=positive_categories,
    negative_categories=negative_categories,
)

In [None]:
# Classify each image into the 5 Landscape class using CLIP
def classify_with_clip(model:CLIPModel, processor:CLIPProcessor, image_filename:str, image_dir:str, labels:dict):
    """
    Classify an image using CLIP model with ZeroShot.

    The image is compared with the description of every class; the class whose
    description gets the highest probability is returned.

    Args:
        model: CLIP model
        processor: CLIP embedding model
        image_filename (str): Filename of the image
        image_dir (str): Directory where image is in
        labels (dict): Maps each class name to the short description used as zero-shot text

    Returns:
        str: The determined image class (a key of labels).

    Raises:
        FileNotFoundError: If the image does not exist in image_dir.
    """
    import torch  #local import: torch is not imported at the top of this notebook

    image_path = os.path.join(image_dir, image_filename)
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"No such file: {image_path}")

    #class names and their descriptions, in the same order
    class_names = list(labels.keys())
    texts = list(labels.values())

    #Read image; the with-block releases the file handle, convert() returns an independent copy
    with Image.open(image_path) as image_file:
        image = image_file.convert("RGB")

    #Embed image and run zeroshot (inference only, no gradients needed)
    inputs = processor(text=texts, images=image, return_tensors="pt", padding=True)
    with torch.no_grad():
        outputs = model(**inputs)
    probs = outputs.logits_per_image.softmax(dim=1)

    #single image, so the flat argmax is the index of the best description
    return class_names[probs.argmax().item()]


def determine_image_class(model:CLIPModel, processor:CLIPProcessor, image_df:pd.DataFrame, image_dir:str, labels:dict):
    """
    Determines the class of one image from its caption labels, falling back to CLIP ZeroShot.

    When all captions of the image carry the same label and that label is one of the
    classes in labels, it is returned directly. Otherwise the image itself is classified
    with the CLIP model.

    Args:
        model: CLIP model
        processor: CLIP embedding model
        image_df (DataFrame): Rows of a single image, with a 'label' column. Its `name`
            attribute is used as the image filename (presumably set by
            groupby('image_filename').apply - confirm against the caller).
        image_dir (str): Directory where image is in
        labels (dict): Labels for zero shot with a short description

    Returns:
        str: The determined image class.
    """
    #kept under its own name: reusing `labels` here replaced the class dictionary with a list,
    #which broke both the key lookup below and the call to classify_with_clip
    caption_labels = image_df['label'].tolist()

    if len(set(caption_labels)) == 1 and caption_labels[0] in labels:
        return caption_labels[0]

    return classify_with_clip(model, processor, image_df.name, image_dir, labels)

In [None]:
#use the 'Flickr30k' folder throughout: that is where the filtered CSV and the extracted
#images were written (the 'flicker30k' / 'Flicker30k' spellings pointed to other folders)
image_directory = '../../input/Flickr30k/flicker30k_output_images'
df = pd.read_csv('../../input/Flickr30k/flick30k_filtered_result.csv')

model_name = "openai/clip-vit-base-patch32"
processor = CLIPProcessor.from_pretrained(model_name)
model = CLIPModel.from_pretrained(model_name)

target_classes = ['glacier', 'coast', 'forest', 'mountains', 'desert']

#short description per class, used as the zero-shot text for CLIP
descriptive_texts = {
    'glacier': 'A vast body of ice',
    'coast': 'Where the land meets the sea',
    'forest': 'A dense collection of trees and plants',
    'mountains': 'Tall and rocky natural elevations',
    'desert': 'A hot, sandy, and arid region'
}


#one class per image, indexed by filename
image_classes = df.groupby('image_filename').apply(determine_image_class, model=model, processor=processor, image_dir=image_directory, labels=descriptive_texts)

#copy the class of the image onto each of its caption rows
df['image_class'] = df['image_filename'].map(image_classes)
df.to_csv('../../input/Flickr30k/flick30k_filtered_result_updated.csv', index=False)
