In [None]:
import torch
from PIL import Image
from transformers import AutoImageProcessor, AutoModel, AutoTokenizer
import numpy as np

# Single checkpoint id shared by the model, the image processor and the
# tokenizer, so the three artefacts cannot drift apart.
CLIP_CHECKPOINT = "openai/clip-vit-base-patch16"

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Text tokenizer and image pre-processor for the checkpoint.
tokenizer = AutoTokenizer.from_pretrained(CLIP_CHECKPOINT)
processor = AutoImageProcessor.from_pretrained(CLIP_CHECKPOINT)

# The model itself, moved to the selected device.
model = AutoModel.from_pretrained(CLIP_CHECKPOINT).to(device)

In [None]:
import pandas as pd

# Load the records from description.json into a DataFrame. Later cells read
# its "description" (text) and "new_name" (image path) columns.
dataset = pd.read_json("description.json")

# Show the image-path column as the cell output.
dataset.loc[:, "new_name"]

In [None]:
import cv2

def _embed_text(description):
    """Return the CLIP text embedding of ``description`` as a 1-D NumPy array.

    The text is tokenized (truncated to the tokenizer's maximum length), moved
    to ``device`` and passed through ``model.get_text_features``.
    """
    inputs = tokenizer([description], truncation=True, return_tensors="pt").to(device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        features = model.get_text_features(**inputs)
    # The batch holds a single item; return its vector on the CPU.
    return features[0].cpu().numpy()


def _embed_image(path):
    """Return the CLIP image embedding of the file at ``path`` as a 1-D NumPy array.

    The image is loaded with PIL and converted to RGB. ``cv2.imread`` is not
    used here because it yields BGR pixel data (and ``None`` for unreadable
    files), whereas the image processor expects RGB input; feeding it BGR
    silently produces embeddings of channel-swapped images.

    Raises whatever ``PIL.Image.open`` raises when the file is missing or is
    not a readable image.
    """
    with Image.open(path) as raw_image:
        # convert() returns a fully loaded copy, so the file can be closed.
        rgb_image = raw_image.convert("RGB")
    inputs = processor([rgb_image], return_tensors="pt").to(device)
    with torch.no_grad():
        features = model.get_image_features(**inputs)
    return features[0].cpu().numpy()


# One embedding per row; each cell of the new columns holds a NumPy vector.
dataset['text_embeddings'] = dataset["description"].apply(_embed_text)
dataset['image_embedding'] = dataset["new_name"].apply(_embed_image)
dataset

In [None]:
from PIL import Image

In [None]:
def _read_image(row):
    """Read the image file referenced by the row's "new_name" value with OpenCV."""
    return cv2.imread(row["new_name"])


# Apply the reader row by row and show the result as the cell output.
a = dataset.apply(_read_image, axis=1)

a

In [None]:
from elasticsearch import Elasticsearch

import os

# Connection settings. Each value can be overridden through an environment
# variable so that real credentials never have to be written into the
# notebook; the fallbacks reproduce the values previously hard-coded here.
# NOTE(review): set ES_USER / ES_PASSWORD in the environment rather than
# editing the fallbacks below.
es_host = os.environ.get("ES_HOST", "es-test.aws.primehub.io")
es_port = int(os.environ.get("ES_PORT", "9200"))
es_scheme = os.environ.get("ES_SCHEME", "http")
es_user = os.environ.get("ES_USER", "elastic")
es_password = os.environ.get("ES_PASSWORD", "==")

# Connect to Elasticsearch
es = Elasticsearch([{'host': es_host, 'port': es_port, 'scheme': es_scheme}],
                   http_auth=(es_user, es_password))

# Define the index settings and mappings
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            # Free-text field; the indexing cell stores the image path here.
            "content": {
                "type": "text",
                "analyzer": "smartcn"
            },
            # Embedding used for vector search.
            "content_vector": {
                "type": "dense_vector",
                "dims": 512  # Must equal the length of the embeddings that get indexed
            }
        }
    }
}
index_name = 'image-text-search-v1'

# Start from a clean slate: drop the index if it already exists, so re-running
# this cell always recreates it with the mapping above.
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
    print(f"Deleted existing index: {index_name}")
else:
    print(f"Index {index_name} does not exist.")


es.indices.create(index=index_name, body=index_settings)
print(f"Created index: {index_name}")

In [None]:
import os 

folder_path = './'  # Replace with your folder path

# Index one document per dataset row: the image path as text content and the
# image embedding as the vector field.
total_rows = len(dataset)
# Count progress with enumerate instead of the index label, which need not be
# an integer (or contiguous) and would make "index + 1" fail or mislead.
for position, (index, row) in enumerate(dataset.iterrows(), start=1):
    # Construct the document
    doc = {
        "content": row["new_name"],
        # Send a plain list of floats rather than relying on the client to
        # serialize a NumPy array.
        "content_vector": np.asarray(row["image_embedding"]).tolist()
    }

    # Index the document
    es.index(index=index_name, body=doc)

    print(f"Indexed row {position}/{total_rows}")

In [None]:
# Embed the text query with the same model that produced the indexed vectors.
# The tokenized inputs must live on the same device as the model; leaving them
# on the CPU fails with a device mismatch when the model is on the GPU.
query_inputs = tokenizer(["fish"], truncation=True, return_tensors="pt").to(device)
# Inference only: skip building the autograd graph.
with torch.no_grad():
    query_vector = model.get_text_features(**query_inputs).cpu().numpy()

search_query = {
    "query": {
        "knn": {
            "field": "content_vector",
            # query_vector has a leading batch axis of size one; the kNN
            # query needs the single embedding as a flat list of floats.
            "query_vector": query_vector[0].tolist(),
            # NOTE(review): only one candidate is considered, so at most one
            # hit comes back - raise this to retrieve more neighbours.
            "num_candidates": 1
        }
    }
}

# Run the search and print the stored source of every hit.
response = es.search(index=index_name, body=search_query)
for hit in response['hits']['hits']:
    print(hit['_source'])