## Colab helper functions

In [None]:
""" Delete a folder """

import os
import shutil

folder_path = '/content/image_dataset'

# Attempt to delete individual files
for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    try:
        os.remove(file_path)  # Remove file
    except Exception as e:
        print(f"Error deleting {file_path}: {e}")

# Now delete the folder
shutil.rmtree(folder_path)


In [None]:
""" Download a folder with its contanees """

import shutil
from google.colab import files

# Specify the path to the folder you want to download
folder_path = '/content/image_dataset'
# Specify the name for the zip file (without extension)
zip_file_name = 'image_dataset'

# Create a zip file of the folder
shutil.make_archive(zip_file_name, 'zip', folder_path)

# Download the zip file
files.download(zip_file_name + '.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## Initialize

In [None]:
!pip install -q datasets weaviate-client transformers torch pillow open_clip_torch
print("Done!")

# After this, restart the session


Done!


## Download datasets

In [None]:
from datasets import load_dataset
from tqdm.notebook import tqdm
from PIL import Image
import soundfile as sf
import os
import json


In [None]:
# Image
image_dataset = load_dataset("Multimodal-Fatima/COCO_captions_test")

# Audio
# audio_dataset = load_dataset("mshah1/speech_robust_bench", "accented_cv")

# Text
# We use the text part of the COCO dataset for text_dataset (for relevant texts)
# text_dataset = []


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

data/test-00000-of-00002-e0f1e8b674a1af9(…):   0%|          | 0.00/411M [00:00<?, ?B/s]

data/test-00001-of-00002-dc43b445e1598d9(…):   0%|          | 0.00/413M [00:00<?, ?B/s]

Generating test split:   0%|          | 0/5000 [00:00<?, ? examples/s]

In [None]:
""" Save images locally """

def save_images(dataset, dataset_folder, num_images=1000):
    for i in tqdm(range(num_images), desc="Saving images"):
        image = dataset['test'][i]['image']
        image = image.resize((224, 224), Image.BILINEAR)
        image.save(os.path.join(dataset_folder, f'image_{i+1}.png'))


image_dataset_folder = '/content/image_dataset'
os.makedirs(image_dataset_folder, exist_ok=True)
save_images(image_dataset, image_dataset_folder, num_images=5000)

print(f"\nSaved the first 1000 images to {image_dataset_folder}")


Saving images:   0%|          | 0/5000 [00:00<?, ?it/s]


Saved the first 1000 images to /content/image_dataset


In [None]:
""" Save two longest sentences from dataset """

def save_longest_sentences(dataset, num_items=1000, top_k=2):
    longest_sentences = []
    for i in tqdm(range(num_items), desc="Extracting longest sentences"):
        sentences = dataset['test'][i]['sentences_raw']  # A list of strings
        if sentences:  # Check it's not empty
            # Sort by length (longest first)
            sorted_sentences = sorted(sentences, key=len, reverse=True)
            # Take top_k sentences (e.g., 2)
            top_sentences = sorted_sentences[:top_k]
            longest_sentences.extend(top_sentences)

    return longest_sentences


text_dataset = save_longest_sentences(image_dataset, num_items=5000, top_k=2)

print(f"\n✅ Extracted {len(text_dataset)} longest sentences successfully.")


Extracting longest sentences:   0%|          | 0/5000 [00:00<?, ?it/s]


✅ Extracted 10000 longest sentences successfully.


## Config weaviate database

In [None]:
import weaviate
import torch
import base64
from PIL import Image
import io
import requests
import json
from typing import List, Dict, Any
import numpy as np
import os
import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.config import Property, DataType, Configure


In [None]:
# Best practice: store your credentials in environment variables
# Find these on your cluster on weaviate
weaviate_url = "..."
weaviate_api_key = "..."

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

print(weaviate_client.is_ready())


True


In [None]:
# حذف کالکشن قبلی (اختیاری)
try:
    weaviate_client.collections.delete("MultimodalItem")
    print("Deleted existing collection ✅")
except:
    print("No existing collection to delete")

# ایجاد کالکشن جدید
weaviate_client.collections.create(
    name="MultimodalItem",
    description="Stores text, image, and audio items with embeddings",
    vectorizer_config=Configure.Vectorizer.none(),
    properties=[
        Property(name="contentId", data_type=DataType.TEXT),
        Property(name="modality", data_type=DataType.TEXT),
        Property(name="filePath", data_type=DataType.TEXT),
        Property(name="content", data_type=DataType.TEXT),
    ],
)

print("Collection created successfully ✅")

# گرفتن کالکشن
collection = weaviate_client.collections.get("MultimodalItem")


Deleted existing collection ✅
Collection created successfully ✅


In [None]:
count_result = collection.aggregate.over_all(total_count=True)
print("🔢 Total items stored:", count_result.total_count)


🔢 Total items stored: 0


## Initialize CLIP model

In [None]:
# embedder_openclip.py
from typing import Union
import numpy as np
from PIL import Image
import torch
import open_clip

# --- تنظیمات ---
MODEL_NAME = "ViT-B-32"       # یا ViT-B-16، RN50 و ... (بسته به موجود بودن در open_clip)
PRETRAINED = "openai"         # یا مدل‌های دیگر که open-clip پشتیبانی کند
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# -----------------

# لود مدل و توابع پردازش
clip_model, _, preprocess = open_clip.create_model_and_transforms(MODEL_NAME, pretrained=PRETRAINED)
tokenizer = open_clip.get_tokenizer(MODEL_NAME)
clip_model.to(DEVICE)
clip_model.eval()

def _to_numpy(t: torch.Tensor) -> np.ndarray:
    """convert tensor (1 x D) -> 1D numpy array"""
    return t.detach().cpu().numpy().reshape(-1)

def embed_text(text: str) -> np.ndarray:
    """
    دریافت یک رشته و بازگرداندن embedding نرمالایز شده (1D numpy array).
    """
    if not isinstance(text, str):
        raise ValueError("text باید یک رشته (str) باشد.")
    # توکنایز
    tokens = tokenizer([text]).to(DEVICE)
    with torch.no_grad():
        text_features = clip_model.encode_text(tokens)       # shape: (1, D)
        # نرمالیزه کن (unit vector)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)
    return _to_numpy(text_features)

def embed_image(image_path: str) -> np.ndarray:
    """
    دریافت مسیر تصویر، لود تصویر، پردازش و بازگرداندن embedding نرمالایز شده (1D numpy array).
    """
    if not isinstance(image_path, str):
        raise ValueError("image_path باید یک مسیر (str) باشد.")
    # load image
    img = Image.open(image_path).convert("RGB")
    x = preprocess(img).unsqueeze(0).to(DEVICE)   # shape: (1, C, H, W)
    with torch.no_grad():
        image_features = clip_model.encode_image(x)     # shape: (1, D)
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
    return _to_numpy(image_features)

def get_embedding(modality: str, input_data: Union[str, None]) -> np.ndarray:
    """
    تابع wrapper: modality in ["text","image"].
    برای تصویر ورودی باید مسیر فایل باشه، برای text باید رشته.
    """
    mod = modality.lower()
    if mod == "text":
        return embed_text(input_data)
    elif mod == "image":
        return embed_image(input_data)
    else:
        raise ValueError("modality باید 'text' یا 'image' باشد.")


open_clip_model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]



In [None]:
def store_text_item(item_id: str, text_data: str):
    """
    Store a text item with its embedding and metadata.
    """
    embedding = get_embedding("text", text_data)

    properties = {
        "contentId": item_id,
        "modality": "text",
        "filePath": "",
        "content": text_data,
    }

    collection.data.insert(properties=properties, vector=embedding.tolist())

def store_image_item(item_id: str, image_path: str):
    """
    Store an image item with its embedding and metadata.
    """
    embedding = get_embedding("image", image_path)

    properties = {
        "contentId": item_id,
        "modality": "image",
        "filePath": image_path,
        "content": "",
    }

    collection.data.insert(properties=properties, vector=embedding.tolist())

def store_audio_item(item_id: str, transcription_text: str, audio_path: str = ""):
    """
    Store an audio item with its text transcription embedding and metadata.
    """
    embedding = get_embedding("text", transcription_text)

    properties = {
        "contentId": item_id,
        "modality": "audio",
        "filePath": audio_path,
        "content": transcription_text,
    }

    collection.data.insert(properties=properties, vector=embedding.tolist())


## Add all the data to database

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm


def process_images(image_folder, max_workers=8):
    """Process and store image embeddings in parallel."""
    image_files = [
        os.path.join(image_folder, f)
        for f in os.listdir(image_folder)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for img_path in image_files:
            content_id = os.path.splitext(os.path.basename(img_path))[0]
            futures.append(
                executor.submit(
                    store_image_item, content_id, img_path
                )
            )

        for _ in tqdm(as_completed(futures), total=len(futures), desc="📸 Processing images in parallel"):
            pass

    print(f"✅ Processed {len(image_files)} images.")


def process_texts(text_list, max_workers=8):
    """Process and store text embeddings in parallel."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i, text_item in enumerate(text_list):
            content_id = f"text_{i+1}"
            futures.append(
                executor.submit(store_text_item, content_id, text_item)
            )

        for _ in tqdm(as_completed(futures), total=len(futures), desc="📝 Processing texts in parallel"):
            pass

    print(f"✅ Processed {len(text_list)} text items.")


def process_audio_json(json_path, max_workers=8, num_entries=None):
    """
    Process and store audio embeddings in parallel from JSON metadata file.
    Each entry in JSON should have: id, filepath, transcription

    Args:
        json_path (str): Path to JSON metadata file.
        max_workers (int): Number of threads for parallel processing.
        num_entries (int, optional): Limit to first N valid entries. Defaults to None (process all).
    """

    # === Load JSON ===
    with open(json_path, "r", encoding="utf-8") as f:
        metadata = json.load(f)

    # === Validate entries ===
    valid_items = [
        item for item in metadata
        if item.get("filepath") and item.get("transcription")
    ]

    # === Limit entries if requested ===
    if num_entries is not None:
        valid_items = valid_items[:num_entries]

    print(f"📂 Found {len(valid_items)} valid audio entries to process.")

    # === Process in parallel ===
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                store_audio_item,
                str(item["id"]),
                item["transcription"],
                item["filepath"]
            ): item for item in valid_items
        }

        # Track progress with tqdm
        for _ in tqdm(as_completed(futures), total=len(futures), desc="🎧 Storing audio embeddings"):
            pass

    print(f"✅ Successfully processed {len(valid_items)} audio items.")


In [None]:
image_folder = "/content/image_dataset"
process_images(image_folder)


📸 Processing images in parallel: 100%|██████████| 5000/5000 [02:34<00:00, 32.40it/s]

✅ Processed 5000 images.





In [None]:
json_path = "/content/drive/MyDrive/audio_metadata.json"
process_audio_json(json_path, num_entries=25000)


📂 Found 25000 valid audio entries to process.


🎧 Storing audio embeddings: 100%|██████████| 25000/25000 [12:19<00:00, 33.82it/s]

✅ Successfully processed 25000 audio items.





In [None]:
process_texts(text_dataset)


📝 Processing texts in parallel: 100%|██████████| 10000/10000 [04:55<00:00, 33.87it/s]

✅ Processed 10000 text items.





## Database check

In [19]:
print(weaviate_client.collections.list_all())
collection = weaviate_client.collections.get("MultimodalItem")
count_result = collection.aggregate.over_all(total_count=True)
print("🔢 Total items stored:", count_result.total_count)


{'MultimodalItem': _CollectionConfigSimple(name='MultimodalItem', description='Stores text, image, and audio items with embeddings', generative_config=None, properties=[_Property(name='contentId', description=None, data_type=<DataType.TEXT: 'text'>, index_filterable=True, index_range_filters=False, index_searchable=True, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=None, vectorizer='none', vectorizer_configs=None), _Property(name='modality', description=None, data_type=<DataType.TEXT: 'text'>, index_filterable=True, index_range_filters=False, index_searchable=True, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=None, vectorizer='none', vectorizer_configs=None), _Property(name='filePath', description=None, data_type=<DataType.TEXT: 'text'>, index_filterable=True, index_range_filters=False, index_searchable=True, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=None, vectorizer=

## Export data from weaviate cloud into csv

In [None]:
""" Download the json file """

from google.colab import files

collection_name = "MultimodalItem"
multimodal_collection = weaviate_client.collections.get(collection_name)

all_data = []
print(f"Starting to export data from collection '{collection_name}' with vectors...")

for item in multimodal_collection.iterator(include_vector=True):
    data_object = item.properties.copy()

    # ❗️ The vector is a dictionary under the 'vector' key.
    # Since you used a single vectorizer (not named vectors), the key is simply 'default'.
    # We retrieve the list of floats (the vector) and assign it to a new key in your output.
    vector = item.vector.get('default')
    data_object["vector"] = vector
    all_data.append(data_object)

print(f"Retrieved {len(all_data)} objects, including their vectors.")

output_filename = "data.json"
try:
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(all_data, f, ensure_ascii=False, indent=4)

    print(f"✅ Data (properties, and vectors) successfully exported to {output_filename}")
    files.download("/content/data.json")

except Exception as e:
    print(f"Error writing to JSON file: {e}")


Starting to export data from collection 'MultimodalItem' with vectors...
Retrieved 4000 objects, including their vectors.
✅ Data (properties, and vectors) successfully exported to data.json


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [20]:
""" Save the json file into drive """

import json
from google.colab import files

# ✅ Replace this path with wherever you want it saved in Drive
save_path = "/content/drive/MyDrive/data.json"

collection_name = "MultimodalItem"
multimodal_collection = weaviate_client.collections.get(collection_name)

all_data = []
print(f"Starting to export data from collection '{collection_name}' with vectors...")

for item in multimodal_collection.iterator(include_vector=True):
    data_object = item.properties.copy()
    vector = item.vector.get('default')
    data_object["vector"] = vector
    all_data.append(data_object)

print(f"Retrieved {len(all_data)} objects, including their vectors.")

# ✅ Save directly to Drive
try:
    with open(save_path, 'w', encoding='utf-8') as f:
        json.dump(all_data, f, ensure_ascii=False, indent=4)

    print(f"✅ Data successfully exported to Google Drive at: {save_path}")

except Exception as e:
    print(f"❌ Error writing to JSON file: {e}")


Starting to export data from collection 'MultimodalItem' with vectors...
Retrieved 40000 objects, including their vectors.
✅ Data successfully exported to Google Drive at: /content/drive/MyDrive/data.json
