In [9]:
from mysql.connector import pooling
from tqdm import tqdm
from sklearn.metrics.pairwise import euclidean_distances
import spacy
import pandas as pd
import os
from collections import namedtuple
from dotenv import load_dotenv
load_dotenv()

True

In [10]:
ImageProperties = namedtuple('ImageProperties', ['name', 'hex_color', 'tags', 'make', 'orientation', 'width', 'height'])

In [11]:
preferences = {
    'Make': '',
    'ImageWidth': '',
    'ImageHeight': '',
    'Orientation': 1,
    'dominant_color': '#73AD3D',
    'tags': ['vase', 'toilet']
}

# Set SQL variables
sql_host = os.getenv("SQL_HOST")
sql_user = os.getenv("SQL_USER")
sql_password = os.getenv("SQL_PASSWORD")
sql_database = os.getenv("SQL_DATABASE")

# set the database config
config = {
    'user': sql_user,
    'password': sql_password,
    'host': sql_host,
    'port': '3306',
    'database': sql_database,
}

In [7]:
# Create a connection pool
connection_pool = pooling.MySQLConnectionPool(pool_name="mypool",
                                              pool_size=2,
                                              **config)

In [16]:
def get_metadata_from_mariadb_db():
    """
    Get the metadata from the MariaDB database

    :return: A pandas DataFrame with the metadata
    """
    # Open a connection to the database
    conn = connection_pool.get_connection()
    # Create a cursor
    c = conn.cursor()

    # Retrieve the metadata
    c.execute("""
        SELECT filename, GROUP_CONCAT(CONCAT(mkey, '\t', mvalue) SEPARATOR '\n') AS metadata
        FROM metadata
        GROUP BY filename;
    """)
    metadata = c.fetchall()

    # Close the connection
    conn.close()

    # Create an empty DataFrame with the desired columns
    columns = ['filename', 'Make', 'Software', 'ImageWidth', 'ImageHeight', 'Orientation', 'DateTimeOriginal',
               'dominant_color', 'tags']
    df = pd.DataFrame(columns=columns)

    # Fill the DataFrame with the metadata
    for image in tqdm(metadata, desc="Get metadata from database"):
        try:
            props = {'filename': image[0]}
            metadata_str = image[1].split('\n')
            for prop in metadata_str:
                if prop:
                    k, value = prop.split('\t')
                    if k in columns[1:]:
                        if k == 'dominant_color':
                            color_list = eval(value)
                            color_list = [c[0] for c in color_list]
                            props[k] = color_list
                        elif k == 'tags':
                            props[k] = eval(value)
                        else:
                            props[k] = value
            df = df.append(props, ignore_index=True)
        except Exception as e:
            print(e, image)

    return df

import ast

ImageProperties = namedtuple('ImageProperties', ['name', 'hex_color', 'tags', 'make', 'orientation', 'width', 'height'])

def get_metadata_from_mariadb_as_imageproperties():
    # Open a connection to the database
    conn = connection_pool.get_connection()
    # Create a cursor
    c = conn.cursor()

    # Retrieve the metadata
    c.execute("""
        SELECT DISTINCT filename, GROUP_CONCAT(CONCAT(mkey, '\t', mvalue) SEPARATOR '\n') AS metadata
        FROM metadata
        WHERE mkey IN ('Make', 'Orientation', 'ImageWidth', 'ImageHeight', 'tags', 'dominant_color')
        GROUP BY filename;
    """)
    metadata = c.fetchall()

    # Close the connection
    conn.close()

    # use the namedtuple ImageProperties to store the metadata
    images = []

    # Loop through the rows of metadata
    for row in metadata:
        filename, metadata_str = row
        metadata_items = metadata_str.split('\n')
        metadata_dict = {key: value for key, value in (item.split('\t') for item in metadata_items)}

        # Clean dominant colors: convert the string to a list of tuples and extract only the color hex codes
        dominant_colors = ast.literal_eval(metadata_dict.get('dominant_color', '[]'))
        hex_colors = [color[0] for color in dominant_colors]

        # Clean tags: convert the string to a list of strings
        tags = ast.literal_eval(metadata_dict.get('tags', '[]'))

        # Create an ImageProperties object for each row
        image = ImageProperties(
            name=filename,
            hex_color=hex_colors,
            tags=tags,
            make=metadata_dict.get('Make', None),
            orientation=metadata_dict.get('Orientation', None),
            width=metadata_dict.get('ImageWidth', None),
            height=metadata_dict.get('ImageHeight', None)
        )

        # Add the ImageProperties object to the list
        images.append(image)

    return images

ImageProperties(name='image_0.jpg', hex_color=['#15170e', '#4a6423', '#7b9652', '#2d3618'], tags=['person'], make='Canon', orientation='0', width='4272', height='2848')
ImageProperties(name='image_1.jpg', hex_color=['#274210', '#e0e2e3', '#657f4f', '#acb5a7'], tags=[], make='Panasonic', orientation='0', width='3000', height='4000')
ImageProperties(name='image_10.jpg', hex_color=['#d9d9d9', '#474747', '#bebebe', '#cecece'], tags=['bird'], make='SONY', orientation='1', width='6000', height='4000')
ImageProperties(name='image_100.jpg', hex_color=['#243a45', '#e4d3de', '#081f11', '#4c688d'], tags=[], make='SONY', orientation='0', width='7952', height='5304')
ImageProperties(name='image_101.jpg', hex_color=['#100e1d', '#2e3248', '#1d1e2f', '#5c6d8f'], tags=[], make='NIKON CORPORATION', orientation='8', width='6000', height='4000')
ImageProperties(name='image_102.jpg', hex_color=['#789cba', '#4779a1', '#a5b9cc', '#0c4b6b'], tags=['person', 'surfboard'], make='Canon', orientation='0', width='

In [None]:
def hex_to_rgb(color):
    try:
        # remove the # from the color
        color = color[1:]
        # convert the color to rgb values
        rgb = tuple(int(color[i:i + 2], 16) for i in (0, 2, 4))
        return rgb
    except:
        return 0, 0, 0


def get_clean_preferences(df_preferences):
    # remove the rows with nan in dominant_color
    df_preferences = df_preferences.dropna(subset=['dominant_color'])
    # split dominant color into 4 columns and remove the dominant_color column
    # convert the tags column to a list of strings
    # Replace all NaN values with empty strings with the fillna() method
    df_preferences = df_preferences.fillna(0)
    # convert colors to rgb values
    df_preferences['dominant_color'] = df_preferences['dominant_color'].apply(lambda x: hex_to_rgb(x))
    # replace all 0 values with empty strings
    df_preferences['dominant_color'] = df_preferences['dominant_color'].replace(0, '')

    return df_preferences


def get_clean_dataset():
    metadata = get_metadata_from_mariadb_db()
    df_metadata = pd.DataFrame(metadata)
    # remove the rows with nan in dominant_color
    df_metadata = df_metadata.dropna(subset=['dominant_color'])
    # split dominant color into 4 columns and remove the dominant_color column
    if 'dominant_color' in df_metadata.columns:
        df_metadata['color1'] = df_metadata['dominant_color'].apply(lambda x: x[0] if len(x) >= 1 else 0)
        df_metadata['color2'] = df_metadata['dominant_color'].apply(lambda x: x[1] if len(x) >= 2 else 0)
        df_metadata['color3'] = df_metadata['dominant_color'].apply(lambda x: x[2] if len(x) == 3 else 0)
        df_metadata['color4'] = df_metadata['dominant_color'].apply(lambda x: x[3] if len(x) == 4 else 0)
        # convert colors to rgb values
        df_metadata['color1'] = df_metadata['color1'].apply(lambda x: hex_to_rgb(x) if x else (0, 0, 0))
        df_metadata['color2'] = df_metadata['color2'].apply(lambda x: hex_to_rgb(x) if x else (0, 0, 0))
        df_metadata['color3'] = df_metadata['color3'].apply(lambda x: hex_to_rgb(x) if x else (0, 0, 0))
        df_metadata['color4'] = df_metadata['color4'].apply(lambda x: hex_to_rgb(x) if x else (0, 0, 0))
        df_metadata = df_metadata.drop('dominant_color', axis=1)
    else:
        df_metadata['color1'] = 0
        df_metadata['color2'] = 0
        df_metadata['color3'] = 0
        df_metadata['color4'] = 0

    # convert the tags column to a list of strings
    df_metadata = df_metadata.fillna(0)
    # remove all columns except filename, tags, color1, color2, color3, color4, Make, Width, Height
    df_metadata = df_metadata[
        ['filename', 'Make', 'ImageWidth', 'ImageHeight', 'Orientation', 'DateTimeOriginal', 'tags', 'color1', 'color2',
         'color3', 'color4']]
    # replace all 0 values with empty strings
    df_metadata['Make'] = df_metadata['Make'].replace(0, '')

    return df_metadata

In [None]:
df_pref = pd.DataFrame([preferences])
df_preferences = get_clean_preferences(df_pref)
df_preferences.head()

In [None]:
df_metadata = get_clean_dataset()

In [None]:
df_metadata.head()

# Color Similarity

In [None]:
def recommend_colors(df_metadata, df_preferences, n=0):
    # Load the dataset into a Pandas DataFrame
    data = df_metadata.copy()

    # Extract the individual r, g, and b values from tupbles in the color columns
    data[['r1', 'g1', 'b1']] = pd.DataFrame(data['color1'].tolist(), index=data.index)
    data[['r2', 'g2', 'b2']] = pd.DataFrame(data['color2'].tolist(), index=data.index)
    data[['r3', 'g3', 'b3']] = pd.DataFrame(data['color3'].tolist(), index=data.index)
    data[['r4', 'g4', 'b4']] = pd.DataFrame(data['color4'].tolist(), index=data.index)

    # Normalize the r, g, and b columns to be between 0 and 1
    data[['r1', 'g1', 'b1', 'r2', 'g2', 'b2', 'r3', 'g3', 'b3', 'r4', 'g4', 'b4']] = data[['r1', 'g1', 'b1', 'r2', 'g2',
                                                                                           'b2', 'r3', 'g3', 'b3', 'r4',
                                                                                           'g4', 'b4']] / 255

    # Normalize the input RGB color to be between 0 and 1
    r, g, b = df_preferences['dominant_color'][0]
    r_norm, g_norm, b_norm = r / 255, g / 255, b / 255

    # Compute the Euclidean distance between the input color and all the colors in the dataset
    data['similarity_dominant_color'] = euclidean_distances(
        [[r_norm, g_norm, b_norm, r_norm, g_norm, b_norm, r_norm, g_norm, b_norm, r_norm, g_norm, b_norm]],
        data[['r1', 'g1', 'b1', 'r2', 'g2', 'b2', 'r3', 'g3', 'b3', 'r4', 'g4', 'b4']])[0]

    # Sort the dataset by Euclidean distance in ascending order and return the top 10 closest matches
    if n == 0:
        closest_matches = data.sort_values('similarity_dominant_color', ascending=True)[
            ['filename', 'color1', 'color2', 'color3', 'color4', 'similarity_dominant_color']]
    else:
        closest_matches = data.sort_values('similarity_dominant_color', ascending=True).head(n)[
            ['filename', 'color1', 'color2', 'color3', 'color4', 'similarity_dominant_color']]

    return closest_matches

In [None]:
recommend_colors(df_metadata, df_preferences)  # OK

# Tag Similarity

In [None]:
def recommend_tags(df_metadata, df_preferences, n=0, nlp=None):
    # Load the spaCy model if it hasn't been loaded
    if not nlp:
        nlp = spacy.load("en_core_web_md")

    # Define the preferences list and the dataframe
    preferences = df_preferences['tags'][0]
    # Load dataset with words and drop duplicate rows
    df = df_metadata.copy()
    df = df.dropna(subset=["tags"]).reset_index(drop=True)
    # replace int with empty list
    df['tags'] = df['tags'].apply(lambda x: x if x else [])

    # Precompute the similarity between each tag word and each preference word
    similarity_dict = {}
    for tag_word in set([word for tags in df['tags'] for word in tags]):
        for pref_word in set(preferences):
            similarity_dict[(tag_word, pref_word)] = nlp(tag_word).similarity(nlp(pref_word))

    # Compute the average similarity for each row in the dataframe
    similarities = []
    for tags in df['tags']:
        sum_similarity = 0
        for tag_word in tags:
            for pref_word in preferences:
                sum_similarity += similarity_dict[(tag_word, pref_word)]
        avg_similarity = sum_similarity / (len(tags) * len(preferences)) if len(tags) > 0 else 0
        similarities.append(avg_similarity)

    # Add the similarity scores to a new column in the dataframe
    df['similarity_tags'] = similarities
    if n == 0:
        closest_matches = df.sort_values('similarity_tags', ascending=False)[
            ['filename', 'similarity_tags']]
    else:
        closest_matches = df.sort_values('similarity_tags', ascending=False).head(n)[
            ['filename', 'similarity_tags']]

    return closest_matches


In [None]:
recommend_tags(df_metadata, df_preferences)  # OK

# Make Similarity

In [None]:
def recommend_make(df_metadata, df_preferences, n=0):
    # Load the spaCy model
    nlp = spacy.load("en_core_web_md")

    # Define the preferences list and the dataframe
    make = df_preferences['Make'][0]
    # Load dataset with words and drop duplicate rows
    df = df_metadata.copy()
    df = df.dropna(subset=["Make"]).reset_index(drop=True)

    # Convert make and Make to document objects
    make_doc = nlp(make)
    df['Make'] = df['Make'].apply(nlp)

    # Compute the cosine similarity between the make preferences and all the makes in the dataset
    similarities = [make_doc.similarity(doc) for doc in df['Make']]

    # Add the similarity scores to a new column in the dataframe
    df['similarity_make'] = similarities
    if n == 0:
        closest_matches = df.sort_values('similarity_make', ascending=False)[
            ['filename', 'similarity_make']]
    else:
        closest_matches = df.sort_values('similarity_make', ascending=False).head(n)[
            ['filename', 'similarity_make']]

    return closest_matches


In [None]:
recommend_make(df_metadata, df_preferences)  # OK

# Orientation Similarity

In [None]:
def recommend_orientation(df_metadata, df_preferences, n=0):
    # Define the preferences list and the dataframe
    orientation = df_preferences['Orientation'][0]
    # Load dataset with words and drop duplicate rows
    df = df_metadata.dropna(subset=["Orientation"]).reset_index(drop=True)
    # if Orientation contain '' or '0' or '1' then replace with 0 or 1
    df['Orientation'] = df['Orientation'].apply(lambda x: 0 if x == '' or x == '0' else 1)

    # Convert the Orientation column to integer type
    df['Orientation'] = df['Orientation'].astype(int)

    # Orientation is 0 or 1, so we can just subtract the preference from the orientation
    df['similarity_orientation'] = df['Orientation'].apply(lambda x: abs(x - orientation))

    # sort by similarity
    if n > 0:
        closest_matches = df.sort_values('similarity_orientation', ascending=False).head(n)[
            ['filename', 'similarity_orientation']]
    else:
        closest_matches = df.sort_values('similarity_orientation', ascending=False)[
            ['filename', 'similarity_orientation']]

    return closest_matches


In [None]:
recommend_orientation(df_metadata, df_preferences)  # OK

# Size Similarity

In [None]:
def recommend_size(df_metadata, df_preferences, n=0):
    # Define the preferences list and the dataframe
    width = int(df_preferences['ImageWidth'][0])
    height = int(df_preferences['ImageHeight'][0])
    # Load dataset with words and drop duplicate rows
    df = df_metadata.dropna(subset=["ImageWidth", "ImageHeight"]).reset_index(drop=True)

    # Convert the ImageWidth and ImageHeight column to integer type
    df[['ImageWidth', 'ImageHeight']] = df[['ImageWidth', 'ImageHeight']].astype(int)

    # Compute the product of width and height outside the loop
    product = width * height

    # Use apply method to compute similarity score for each row
    df['similarity_size'] = df.apply(lambda x: 1 - abs(product - (x['ImageWidth'] * x['ImageHeight'])) / product, axis=1)

    if n == 0:
        closest_matches = df.sort_values('similarity_size', ascending=False)[
            ['filename', 'similarity_size']]
    else:
        closest_matches = df.sort_values('similarity_size', ascending=False).head(n)[
            ['filename', 'similarity_size']]

    return closest_matches


In [None]:
recommend_size(df_metadata, df_preferences)  # OK

In [None]:
def recommend(df_metadata, df_preferences, n=0):
    # Assign weights to properties based on user preferences
    weights = {
        'Make': float(5.0),
        'ImageWidth': float(1.0),
        'ImageHeight': float(1.0),
        'Orientation': float(2.0),
        'dominant_color': float(3.0),
        'tags': float(5.0)
    }

    # Create a dictionary with the preferences and the corresponding recommendation methods
    preference_methods = {
        'Make': recommend_make,
        'ImageWidth': recommend_size,
        'ImageHeight': recommend_size,
        'Orientation': recommend_orientation,
        'dominant_color': recommend_colors,
        'tags': recommend_tags
    }

    # Remove preferences with no values
    preferences = {k: v for k, v in df_preferences.squeeze().to_dict().items() if v != ''}

    # Calculate the sum of the weights
    weights_sum = 0
    for key in weights:
        weights_sum += weights[key]
    for key in weights:
        weights[key] = weights[key] / weights_sum

    # Calculate similarity score for each property
    df_metadata['similarity_score'] = 0.0
    for preference, value in preferences.items():
        method = preference_methods[preference]
        if preference == 'ImageWidth' or preference == 'ImageHeight':
            similarity = method(df_metadata, df_preferences, n)['similarity_size'].astype(float)
        else:
            similarity = method(df_metadata, df_preferences, n)[f'similarity_{preference.lower()}'].astype(float)
        df_metadata['similarity_score'] += similarity * (weights[preference] / weights_sum)

    # Replace NaN values in the 'similarity_score' column with 0
    df_metadata['similarity_score'].fillna(0, inplace=True)

    # Sort by similarity score
    if n == 0:
        closest_matches = df_metadata.sort_values('similarity_score', ascending=False)[
            ['filename', 'similarity_score']]
    else:
        closest_matches = df_metadata.sort_values('similarity_score', ascending=False).head(n)[
            ['filename', 'similarity_score']]

    return closest_matches


In [None]:
recommend(df_metadata, df_preferences)  # OK



# Test


In [39]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

data = get_metadata_from_mariadb_as_imageproperties()

preferences = {
    'Make': 'Canon',
    'ImageWidth': '',
    'ImageHeight': '',
    'Orientation': 1,
    'dominant_color': '#00000',
    'tags': ['cat']
}


def hex_to_rgb(hex_color):
    return tuple(int(hex_color[i:i+2], 16) for i in (1, 3, 5))


def preprocess_data(data):
    for image in data:
        image_tags = " ".join(image.tags)
        avg_rgb_color = np.mean([hex_to_rgb(color) for color in image.hex_color], axis=0)
        try:
            make_len = len(image.make)
        except TypeError:
            make_len = 0

        try:
            width = int(image.width)
        except TypeError:
            width = 0

        try:
            height = int(image.height)
        except TypeError:
            height = 0

        try:
            orientation = int(image.orientation)
        except:
            orientation = 0

        avg_rgb_color_list = avg_rgb_color.tolist() if hasattr(avg_rgb_color, 'tolist') else [avg_rgb_color]

        try:
            yield np.array([
                *avg_rgb_color_list,  # Ensure avg_rgb_color is an iterable before unpacking
                len(image_tags),
                make_len,
                orientation,
                width,
                height
            ])
        except:
            pass


def user_preferences_vector(preferences):
    dominant_color = hex_to_rgb(preferences['dominant_color'])
    tags = " ".join(preferences['tags'])
    make = preferences['Make']

    try:
        make_len = len(make)
    except TypeError:
        make_len = 0

    orientation = preferences['Orientation']

    try:
        width = int(preferences['ImageWidth'])
    except (TypeError, ValueError):
        width = 0

    try:
        height = int(preferences['ImageHeight'])
    except (TypeError, ValueError):
        height = 0

    return np.array([
        *dominant_color,
        len(tags),
        make_len,
        orientation,
        width,
        height
    ])


def recommend_images(preferences, data, top_n=10):
    preprocessed_data = np.array(list(preprocess_data(data)))
    user_vector = user_preferences_vector(preferences)
    similarity_matrix = cosine_similarity(np.vstack([user_vector, preprocessed_data]))
    most_similar_indices = np.argsort(-similarity_matrix[0])[1:top_n+1]
    return [data[i] for i in most_similar_indices]

# Test the recommender system
recommended_images = recommend_images(preferences, data)
for image in recommended_images:
    print(image.name)

ImageProperties(name='image_808.jpg', hex_color=[], tags=[], make='', orientation='1', width='3584', height='3584')
image_963.jpg
image_747.jpg
image_50.jpg
image_435.jpg
image_648.jpg
image_328.jpg
image_728.jpg
image_903.jpg
image_764.jpg
image_393.jpg


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
