In [1]:
import os

lst_images = os.listdir('../images')
lst_images

['image2.png',
 'image0.png',
 'image3.png',
 'image6.jpeg',
 'image5.jpg',
 'image4.jpg',
 'image1.jpg']

In [2]:
# use chromadb for the same
import chromadb

client = chromadb.PersistentClient('./db')
client.list_collections()

[Collection(name=clip_embeddings)]

In [29]:
from chromadb.utils.embedding_functions import OpenCLIPEmbeddingFunction
from chromadb.utils.data_loaders import ImageLoader
data_loader = ImageLoader()


In [None]:
embedding_function = OpenCLIPEmbeddingFunction('ViT-B-16-SigLIP', 'webli')

# device='cuda' for GPU


In [32]:
client.delete_collection('clip_embeddings')

In [33]:
collection = client.get_or_create_collection(name='clip_embeddings', embedding_function=embedding_function, data_loader=data_loader)

In [34]:
from PIL import Image
import numpy as np

collection.add(
    ids=lst_images,
    uris=[os.path.join('../images', img) for img in lst_images],
    metadatas=[{'image': img} for img in lst_images],
)

In [37]:
results = collection.query(
    query_texts=["a photo of staircase"],
    n_results=5,
)
results

{'ids': [['image0.png',
   'image1.jpg',
   'image5.jpg',
   'image4.jpg',
   'image6.jpeg']],
 'embeddings': None,
 'documents': [[None, None, None, None, None]],
 'uris': None,
 'data': None,
 'metadatas': [[{'image': 'image0.png'},
   {'image': 'image1.jpg'},
   {'image': 'image5.jpg'},
   {'image': 'image4.jpg'},
   {'image': 'image6.jpeg'}]],
 'distances': [[1.801679647785668,
   1.966377784601841,
   1.974831337251781,
   1.982125198556581,
   1.9870082452673323]],
 'included': [<IncludeEnum.distances: 'distances'>,
  <IncludeEnum.documents: 'documents'>,
  <IncludeEnum.metadatas: 'metadatas'>]}

In [41]:
results['ids'][0]
results['metadatas'][0]

[{'image': 'image0.png'},
 {'image': 'image1.jpg'},
 {'image': 'image5.jpg'},
 {'image': 'image4.jpg'},
 {'image': 'image6.jpeg'}]

In [None]:
# update old image with a new image

collection.update(
    ids=lst_images[0],
    uri=os.path.join('../images', 'staircase.jpg'),
    metadata={'image': 'staircase.jpg'}
)

In [None]:
# format
# {
#     '12_34_180': {
#         'image_path': '12_34_180.jpg',
#         'label': '12_34_180'
#     },
#     '12_34_270': {
#         'image_path': '12_34_270.jpg',
#         'label': '12_34_270'
#     }
# }

def store_images(data, collection_name='embeddings'):    
    client = chromadb.PersistentClient('./db')
    collection = client.get_or_create_collection(name=collection_name, embedding_function=embedding_function, data_loader=data_loader)
    collection.add(
        ids=data.keys(),        # list of x_y_yaw strings
        uris=[node['image_path'] for node in data],
        metadatas=[data[node] for node in data]
    )
    
def update_images(new_data, collection_name='embeddings'):
    client = chromadb.PersistentClient('./db')
    collection = client.get_or_create_collection(name=collection_name, embedding_function=embedding_function, data_loader=data_loader)
    for node in new_data:
        collection.update(
            ids=node,
            uri=new_data[node]['image_path'],
            metadata=new_data[node]
        )
        
def query_images(query_text, n_results=20, collection_name='embeddings'):
    client = chromadb.PersistentClient('./db')
    collection = client.get_or_create_collection(name=collection_name, embedding_function=embedding_function, data_loader=data_loader)
    results = collection.query(
        query_texts=[query_text],
        n_results=n_results,
    )
    
    return results

In [None]:
import asyncio
import aiohttp
import pandas as pd
import json
from tqdm.asyncio import tqdm_asyncio
import os
import gc

# Configuration
IMAGE_FOLDER = "images_test"
OUTPUT_FOLDER = "output/"

OPENAI_API_KEY = "EMPTY"
OPENAI_API_BASE = "http://0.0.0.0:8080/v1"
MODEL_NAME = "allenai/Molmo-7B-D-0924"

CONCURRENT_REQUESTS = 10  # Number of concurrent API requests

# Prompt Template
PROMPT = """
ONLY ONE IMAGE IS PROVIDED TO YOU. Extract textual features notably height, Depth, Width, Maximum Weight Recommendation, Item Weight, Voltage, Wattage, Item_volume whichever visible STRICTLY.
"""

async def fetch(session, semaphore, image_url, index, entity_name):
    """
    Asynchronously fetch the API response for a single image.
    """
    async with semaphore:
        try:
            payload = {
                "model": MODEL_NAME,
                "messages": [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": PROMPT},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }],
            }

            headers = {
                "Authorization": f"Bearer {OPENAI_API_KEY}",
                "Content-Type": "application/json",
            }

            # Set a specific timeout for the request
            request_timeout = aiohttp.ClientTimeout(total=5)  # 5-second timeout

            async with session.post(f"{OPENAI_API_BASE}/chat/completions", json=payload, headers=headers, timeout=request_timeout) as response:
                if response.status == 200:
                    data = await response.json()
                    # Adjust based on actual response structure
                    vlm_output = data['choices'][0]['message']['content']
                else:
                    vlm_output = f"Error: {response.status}"
        except asyncio.TimeoutError:
            vlm_output = "Timeout Error: Request took longer"
        except Exception as e:
            vlm_output = f"Exception: {str(e)}"

        return {
            'index': int(index),
            'entity_name': entity_name,
            'vlm_output': vlm_output
        }

async def process_images(session, semaphore, images, start_idx):
    """
    Process a list of images asynchronously.
    """
    tasks = []
    for i, image_url in enumerate(images):
        idx = start_idx + i
        entity_name = df_test.at[idx, 'entity_name']
        tasks.append(fetch(session, semaphore, image_url, df_test.at[idx, 'index'], entity_name))
    
    results = await asyncio.gather(*tasks)
    return results

async def main_async():
    """
    Main asynchronous function to process all images.
    """
    semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
    connector = aiohttp.TCPConnector(limit=CONCURRENT_REQUESTS)
    timeout = aiohttp.ClientTimeout(total=None)  # Adjust timeout as needed

    cumulative_results = []
    batch_counter = 1

    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        for start in tqdm_asyncio(range(0, len(image_paths), BATCH_SIZE), desc="Processing Batches"):
            end = min(start + BATCH_SIZE, len(image_paths))
            batch_images = image_paths[start:end]
            batch_results = await process_images(session, semaphore, batch_images, start)

            cumulative_results.extend(batch_results)

            # Write to file
            output_path = os.path.join(OUTPUT_FOLDER, f'batch_output_{batch_counter}.json')
            with open(output_path, 'w') as outfile:
                json.dump(batch_results, outfile, indent=4)
            
            print(f"----- Batch {batch_counter} saved with {len(batch_results)} results. -----")
            batch_counter += 1

            # Clear memory
            cumulative_results = []
            gc.collect()

    print("All batches processed successfully.")

if __name__ == "__main__":
    asyncio.run(main_async())
    