In [138]:
%pip install -r data-requirements.txt

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Defaulting to user installation because normal site-packages is not writeable
Collecting anyio==4.2.0
  Using cached anyio-4.2.0-py3-none-any.whl (85 kB)
Collecting arrow==1.3.0
  Using cached arrow-1.3.0-py3-none-any.whl (66 kB)
Collecting attrs==23.2.0
  Using cached attrs-23.2.0-py3-none-any.whl (60 kB)
Collecting Babel==2.14.0
  Using cached Babel-2.14.0-py3-none-any.whl (11.0 MB)
Collecting beautifulsoup4==4.12.3
  Using cached beautifulsoup4-4.12.3-py3-none-any.whl (147 kB)
Collecting bleach==6.1.0
  Using cached bleach-6.1.0-py3-none-any.whl (162 kB)
Collecting cffi==1.16.0
  Using cached cffi-1.16.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (443 kB)
Collecting comm==0.2.1
  Using cac

In [139]:
import json
import os
import typing as t

# data prep
import pandas as pd
import numpy as np
import pyodbc

# for creating image vector embeddings
import torch
from PIL import Image
from img2vec_pytorch import Img2Vec

# for creating semantic (text-based) vector embeddings
from sentence_transformers import SentenceTransformer

# for Redis
import redis
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField,
    VectorField,
)
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

from dotenv import load_dotenv

# Pull connection settings from the .env file one directory up, then read them
# from the process environment.
load_dotenv('../.env')

# SQL Server source database
DB_SERVER = os.getenv('DB_SERVER')
DB_NAME = os.getenv('DB_NAME')
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# DB_LIMIT must be set: int(None) raises TypeError when the variable is missing.
DB_LIMIT = int(os.getenv('DB_LIMIT'))

# Redis target
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = os.getenv('REDIS_PORT')
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
REDIS_KEY = os.getenv('REDIS_KEY')


In [140]:
# connect to database, load in data
connection_string = f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={DB_SERVER};DATABASE={DB_NAME};UID={DB_USERNAME};PWD={DB_PASSWORD}'

# DB_LIMIT is cast to int in the config cell, so interpolating it into TOP is safe.
product_query = f'SELECT TOP {DB_LIMIT} [id],[gender],[masterCategory],[subCategory],[articleType],[baseColour],[season],[year],[usage],[productDisplayName] FROM [aidemo].[styles]'

conn = pyodbc.connect(connection_string)
try:
    df = pd.read_sql_query(product_query, conn)
finally:
    # Close the connection even when the query fails, so a failed run does not
    # leave a database connection open for the lifetime of the kernel.
    conn.close()

# Display the DataFrame
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 10 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   id                  100 non-null    int64 
 1   gender              100 non-null    object
 2   masterCategory      100 non-null    object
 3   subCategory         100 non-null    object
 4   articleType         100 non-null    object
 5   baseColour          100 non-null    object
 6   season              100 non-null    object
 7   year                100 non-null    int64 
 8   usage               100 non-null    object
 9   productDisplayName  100 non-null    object
dtypes: int64(2), object(8)
memory usage: 7.9+ KB




In [141]:
def _product_text(row):
    """Return the lower-cased descriptive text for one product row."""
    text = (
        f"name {row['productDisplayName']} "
        f"category {row['masterCategory']} "
        f"subcategory {row['subCategory']} "
        f"color {row['baseColour']} "
        f"gender {row['gender']}"
    )
    return text.lower()


# One text per product; this is the input for the semantic (text) embeddings.
df["product_text"] = df.apply(_product_text, axis=1)
df.rename(columns={"id": "product_id"}, inplace=True)

df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 11 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   product_id          100 non-null    int64 
 1   gender              100 non-null    object
 2   masterCategory      100 non-null    object
 3   subCategory         100 non-null    object
 4   articleType         100 non-null    object
 5   baseColour          100 non-null    object
 6   season              100 non-null    object
 7   year                100 non-null    int64 
 8   usage               100 non-null    object
 9   productDisplayName  100 non-null    object
 10  product_text        100 non-null    object
dtypes: int64(2), object(9)
memory usage: 8.7+ KB


In [142]:
# Show the text of the first product (index label 0) that will be embedded.
df.loc[0, "product_text"]

'name nike sahara team india fanwear round neck jersey category apparel subcategory topwear color blue gender men'

In [143]:
# Resnet-18 to create image embeddings.
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
img2vec = Img2Vec(cuda=torch.cuda.is_available())

# bert variant to create text embeddings
model = SentenceTransformer('sentence-transformers/all-distilroberta-v1')



In [144]:
def get_batch(seq, size):
    """Return a generator over consecutive slices of ``seq`` of length ``size``.

    The last slice is shorter when ``len(seq)`` is not a multiple of ``size``.
    The start offsets are computed immediately, so an invalid ``size`` (e.g. 0)
    raises ``ValueError`` at call time rather than on first iteration.
    """
    starts = range(0, len(seq), size)
    return (seq[start:start + size] for start in starts)

def generate_image_vectors(products, image_base_path, batch_size=1000):
    """Create an image embedding for every product whose image can be loaded.

    Args:
        products: DataFrame with a ``product_id`` column; the image of a product
            is read from ``<image_base_path>/<product_id>.jpg``.
        image_base_path: Directory that contains the product images.
        batch_size: Number of products handed to the embedding model per call.

    Returns:
        dict mapping ``product_id`` to the vector returned by the module-level
        ``img2vec`` model. Products whose image is missing or unreadable are
        left out of the result.
    """
    output_dict = {}

    for batch in get_batch(products, batch_size):
        product_ids = batch['product_id'].values.tolist()
        images = []
        converted = []

        for _id in product_ids:
            img_path = image_base_path + "/" + str(_id) + ".jpg"
            try:
                # The context manager closes the file handle right away;
                # convert()/resize() return new, fully loaded images.
                with Image.open(img_path) as raw:
                    img = raw.convert('RGB').resize((224, 224))
            except Exception:
                # Best effort: skip images that are missing or cannot be decoded.
                # Unlike a bare ``except``, this lets KeyboardInterrupt through.
                continue
            images.append(img)
            converted.append(_id)

        # Generate vectors for all images in this batch. Skip the model call
        # when nothing could be loaded, since there is nothing to embed.
        if images:
            vec_list = img2vec.get_vec(images)
            output_dict.update(zip(converted, vec_list))

        # Report what was actually embedded, not the nominal batch size.
        skipped = len(product_ids) - len(converted)
        print(f"Processed {len(converted)} product images ({skipped} skipped)")

    return output_dict

def generate_text_vectors(products_df):
    """Encode each row's ``product_text`` with the module-level ``model``.

    Returns a dict that maps ``product_id`` to the encoded vector cast to
    ``np.float32``. Prints the number of encoded products when done.
    """
    vectors_by_id = {}
    for _, product in products_df.iterrows():
        embedding = model.encode(product["product_text"])
        vectors_by_id[product["product_id"]] = embedding.astype(np.float32)

    print(f"Processed {len(vectors_by_id)} product text fields")
    return vectors_by_id

# combine into a single json file
def combine_vector_dicts(txt_vectors, img_vectors, products):
    """Merge text and image vectors into one record per product.

    Walks ``products`` row by row and emits
    ``{"text_vector": [...], "img_vector": [...], "product_id": ...}`` for every
    product that has an entry in both ``txt_vectors`` and ``img_vectors``.
    Rows that raise ``KeyError`` (e.g. a vector is missing) are skipped.

    Returns:
        list of dicts in the row order of ``products``; vectors are plain lists.
    """
    combined = []
    for _, product in products.iterrows():
        try:
            product_id = product["product_id"]
            record = {
                "text_vector": txt_vectors[product_id].tolist(),
                "img_vector": img_vectors[product_id].tolist(),
                "product_id": product_id,
            }
        except KeyError:
            # No vector of one kind for this product: leave it out.
            continue
        combined.append(record)
    return combined

def write_product_vector_json(vector_dict):
    """Write ``vector_dict`` as JSON to ``./product_vectors.json``.

    The file is created in the current working directory and overwritten if it
    already exists.
    """
    # Serialise first: a serialisation error then never truncates the file.
    payload = json.dumps(vector_dict)
    with open("./product_vectors.json", "w") as out_file:
        out_file.write(payload)

def write_product_metadata_json(metadata):
    """Write ``metadata`` as JSON to ``./product_metadata.json``.

    The file is created in the current working directory and overwritten if it
    already exists.
    """
    # Serialise first: a serialisation error then never truncates the file.
    payload = json.dumps(metadata)
    with open("./product_metadata.json", "w") as out_file:
        out_file.write(payload)

def create_product_metadata(metadata_df):
    """Turn each row of ``metadata_df`` into a nested metadata record.

    Returns:
        list with one dict per row, in row order:
        ``{"product_id": ..., "product_metadata": {...}}`` where the inner dict
        holds the descriptive columns under snake_case keys.
    """
    # output key -> source column, in the order the keys appear in the record
    column_for_key = {
        "name": "productDisplayName",
        "gender": "gender",
        "master_category": "masterCategory",
        "sub_category": "subCategory",
        "article_type": "articleType",
        "base_color": "baseColour",
        "season": "season",
        "year": "year",
        "usage": "usage",
    }

    return [
        {
            "product_id": row["product_id"],
            "product_metadata": {key: row[column] for key, column in column_for_key.items()},
        }
        for _, row in metadata_df.iterrows()
    ]

def create_redis_index(redis_client, vector_dim, text_dim):
    """Create the search index ``idx:<REDIS_KEY>`` over the product JSON documents.

    The JSON paths follow the documents written by ``push_redis_data``:
    ``product_id`` and the two embedding arrays sit at the document root, all
    descriptive fields sit under ``product_metadata`` with snake_case keys.
    The field aliases (``as_name``) are unchanged, so existing queries keep
    using the same field names.

    Args:
        redis_client: Redis client used to issue the index creation.
        vector_dim: Dimension of the image embeddings (``$.image_embeddings``).
        text_dim: Dimension of the text embeddings (``$.text_embeddings``).

    Raises:
        redis.ResponseError: if index creation fails for any reason other than
            the index already existing. Connection errors propagate as well.
    """
    print("create index for product and vector data")
    schema = (
        # product_id is stored as a JSON number, so it has to be a numeric field.
        NumericField("$.product_id", as_name="product_id"),
        TextField("$.product_metadata.gender", no_stem=True, as_name="gender"),
        # Categories are strings (e.g. "Apparel"), not numbers.
        TagField("$.product_metadata.master_category", as_name="category"),
        TagField("$.product_metadata.sub_category", as_name="sub"),
        TextField("$.product_metadata.article_type", as_name="type"),
        TextField("$.product_metadata.base_color", as_name="color"),
        TextField("$.product_metadata.season", as_name="season"),
        NumericField("$.product_metadata.year", as_name="year"),
        TextField("$.product_metadata.usage", as_name="usage"),
        TextField("$.product_metadata.name", as_name="name"),
        VectorField(
            "$.image_embeddings",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": vector_dim,
                "DISTANCE_METRIC": "COSINE",
            },
            as_name="image_vectors",
        ),
        VectorField(
            "$.text_embeddings",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": text_dim,
                "DISTANCE_METRIC": "COSINE",
            },
            as_name="text_vectors",
        ),
    )
    # The braces around REDIS_KEY are literal and are part of the key prefix.
    definition = IndexDefinition(prefix=[f"{{{REDIS_KEY}}}:"], index_type=IndexType.JSON)

    try:
        redis_client.ft(f"idx:{REDIS_KEY}").create_index(
            fields=schema, definition=definition
        )
    except redis.ResponseError as err:
        # Only an existing index is tolerated; any other server error
        # (bad schema, missing search module, ...) must stay visible.
        if "index already exists" not in str(err).lower():
            raise
        # NOTE: an existing index keeps its old schema; drop it first
        # (FT.DROPINDEX) if the schema above has changed.
        print("index already exists")

def push_redis_data(redis_client, image_vectors, text_vectors, metadata):
    """Write one JSON document per product to Redis.

    Each document is the product's metadata record plus two extra root keys,
    ``image_embeddings`` and ``text_embeddings``. It is stored under the key
    ``{<REDIS_KEY>}:<product_id>``, with the id zero-padded to at least three
    digits. Commands are sent through a pipeline that is flushed every 50
    documents.

    Products without an image vector or a text vector are skipped instead of
    aborting the upload with a ``KeyError``: ``generate_image_vectors`` leaves
    out products whose image could not be loaded.

    Args:
        redis_client: Redis client used to create the pipeline.
        image_vectors: dict ``product_id -> vector`` (objects with ``.tolist()``).
        text_vectors: dict ``product_id -> vector`` (objects with ``.tolist()``).
        metadata: list of records that each contain a ``product_id`` key.
    """
    print("push JSON data to Redis")
    flush_every = 50
    pipeline = redis_client.pipeline()
    queued = 0
    skipped = 0

    for product in metadata:
        product_id = product['product_id']
        if product_id not in image_vectors or product_id not in text_vectors:
            skipped += 1
            continue

        redis_key = f"{{{REDIS_KEY}}}:{product_id:03}"
        # Build the full document up front and write it with a single JSON.SET,
        # so the document is never stored (and indexed) without its vectors.
        document = {
            **product,
            "image_embeddings": image_vectors[product_id].tolist(),
            "text_embeddings": text_vectors[product_id].tolist(),
        }
        pipeline.json().set(redis_key, "$", document)

        queued += 1
        if queued % flush_every == 0:
            pipeline.execute()

    # Send whatever is left from the last, partially filled batch.
    pipeline.execute()

    if skipped:
        print(f"Skipped {skipped} products without image or text vectors")


In [145]:
#process vector and metadata for products
data_path = "../app/vecsim_app/static/images"
image_vectors = generate_image_vectors(df[:DB_LIMIT], data_path, DB_LIMIT)
text_vectors = generate_text_vectors(df[:DB_LIMIT])
vector_dict = combine_vector_dicts(text_vectors, image_vectors, df)

# The index needs the embedding dimensions; fail with a clear message instead
# of an IndexError when nothing could be embedded.
if not image_vectors or not text_vectors:
    raise RuntimeError(
        f"No embeddings were generated (images: {len(image_vectors)}, "
        f"texts: {len(text_vectors)}); check {data_path} and the product query"
    )
image_dim = len(next(iter(image_vectors.values())))
text_dim = len(next(iter(text_vectors.values())))

metadata = create_product_metadata(df[:DB_LIMIT])
#optional write to file system
write_product_metadata_json(metadata)
write_product_vector_json(vector_dict)

#setup Redis for product cache and VSS
# vector_dict is a list of records, so the former len(vector_dict[1]) counted
# the keys of the second record (always 3) and failed with fewer than two
# products. The name is kept and now holds the image embedding dimension,
# which is what create_redis_index receives as its vector_dim argument.
vector_dim = image_dim
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True)
create_redis_index(redis_client, image_dim, text_dim)
push_redis_data(redis_client, image_vectors, text_vectors, metadata)

Processed 100 product images
Processed 100 product text fields
create index for product and vector data
push JSON data to Redis
[0.754553496837616, 0.057041652500629425, 0.07074171304702759, 3.784083604812622, 0.1362796425819397, 0.7385162711143494, 0.030697381123900414, 1.014151692390442, 1.8575726747512817, 0.3524724543094635, 0.3924204409122467, 0.13033971190452576, 1.762441635131836, 0.632201075553894, 0.02103620395064354, 0.09956343472003937, 0.41398030519485474, 0.0107567198574543, 0.0, 0.029510915279388428, 1.0963799953460693, 1.0885154008865356, 0.39776381850242615, 0.07661043852567673, 0.025821197777986526, 0.3557906448841095, 0.31822144985198975, 0.24479129910469055, 0.40759873390197754, 0.18144634366035461, 0.691885769367218, 1.6363680362701416, 1.2538657188415527, 0.21233877539634705, 4.239214897155762, 0.7709817290306091, 1.2878063917160034, 0.012281876988708973, 0.6171779632568359, 0.33751288056373596, 0.9071231484413147, 0.10093782097101212, 0.8229975700378418, 0.0184913