In [1]:
# Ran on an Azure Standard_D4ds_v5 VM colocated with the storage container

In [2]:
import time

# Wall-clock reference point; the "Total runtime" cell subtracts this from its own reading.
start_time = time.time()

In [3]:
from azure.storage.blob import BlobServiceClient
import os

def download_azure_files(account_name, account_key, container_name, prefix,
                         local_dir='downloaded_files'):
    """
    Download every blob in an Azure Blob Storage container whose name starts with a prefix.

    Matching is a literal string prefix (no wildcards). Each blob is written under
    ``local_dir`` at its path relative to the last '/' in ``prefix``. As a result, blobs
    that share a base name in different virtual folders do not overwrite each other.
    For a prefix ending in '/', blobs directly under it land directly in ``local_dir``.

    Args:
        account_name (str): Azure Storage account name
        account_key (str): Azure Storage account key
        container_name (str): Container name
        prefix (str): Literal blob-name prefix to match, e.g. "folder/subfolder/"
        local_dir (str): Local directory to download into (created if missing)

    Returns:
        list[str]: Local paths of the downloaded files, in listing order.

    Raises:
        ValueError: If a blob name would resolve to a path outside ``local_dir``.
    """
    connect_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"

    os.makedirs(local_dir, exist_ok=True)
    local_root = os.path.abspath(local_dir)
    # Each local path keeps the part of the blob name after the prefix's last '/'.
    strip_len = prefix.rfind('/') + 1

    downloaded = []
    # The client owns an HTTP session; the with-block closes it when we are done.
    with BlobServiceClient.from_connection_string(connect_str) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)

        for blob in container_client.list_blobs(name_starts_with=prefix):
            # Names ending in '/' are folder placeholders with no file content;
            # writing one would target a directory path and fail.
            if blob.name.endswith('/'):
                continue

            print(f"Downloading: {blob.name}")

            relative_parts = [part for part in blob.name[strip_len:].split('/') if part]
            local_file_name = os.path.normpath(os.path.join(local_dir, *relative_parts))
            local_abs = os.path.abspath(local_file_name)
            # Keeping sub-paths means '..' segments could escape local_dir; refuse them.
            if os.path.commonpath([local_root, local_abs]) != local_root:
                raise ValueError(f"Blob name {blob.name!r} resolves outside {local_dir!r}")
            os.makedirs(os.path.dirname(local_abs), exist_ok=True)

            blob_client = container_client.get_blob_client(blob.name)
            # Start the download before creating the file, then stream it to disk
            # instead of buffering the whole blob in memory.
            downloader = blob_client.download_blob()
            try:
                with open(local_file_name, "wb") as file:
                    downloader.readinto(file)
            except BaseException:
                # Don't leave a truncated file that a later read would treat as complete.
                if os.path.isfile(local_file_name):
                    os.remove(local_file_name)
                raise

            print(f"Successfully downloaded to: {local_file_name}")
            downloaded.append(local_file_name)

    return downloaded

In [4]:
%%time
# Environment variables for Azure Storage
account_name = AZURE_STORAGE_ACCOUNT_NAME
account_key = AZURE_STORAGE_ACCOUNT_KEY
container_name = AZURE_STORAGE_CONTAINER_NAME

# Prefix pattern to match
prefix = "dagster/speculatives_substantiation/"

# Download the files
download_azure_files(account_name, account_key, container_name, prefix)

Downloading: dagster/speculatives_substantiation/cm0i27jdj0000aqpa73ghpcxf.snappy
Successfully downloaded to: downloaded_files/cm0i27jdj0000aqpa73ghpcxf.snappy
CPU times: user 323 ms, sys: 263 ms, total: 585 ms
Wall time: 2.19 s


In [5]:
%%time


import polars as pl
import numpy as np
import faiss
import random

# Cell 1: Load Data
# Load the Parquet file
file_path = "downloaded_files/cm0i27jdj0000aqpa73ghpcxf.snappy"  # Update with your file path
df = pl.read_parquet(file_path)
print(f"Loaded dataframe with shape: {df.shape}")

Loaded dataframe with shape: (5727, 21)
CPU times: user 689 ms, sys: 71.1 ms, total: 760 ms
Wall time: 747 ms


In [6]:
%%time
# Convert embeddings to numpy array
embeddings = np.stack(df['embedding'].to_numpy()).astype(np.float32)
print(f"Embedding shape: {embeddings.shape}")

dimension = embeddings.shape[1]

# Initialize FAISS index with Inner Product (for cosine similarity with normalized vectors)
index = faiss.IndexFlatIP(dimension)

# Add normalized vectors to the index
index.add(embeddings)
print(f"Built FAISS index with {index.ntotal} vectors")

Embedding shape: (5727, 4096)
Built FAISS index with 5727 vectors
CPU times: user 58 ms, sys: 66.7 ms, total: 125 ms
Wall time: 124 ms


In [10]:
%%time
# Select and normalize random query vector
random_idx = random.randint(0, len(embeddings) - 1)
query_vector = embeddings[random_idx].reshape(1, -1).copy()  # Make a copy to avoid modifying original
faiss.normalize_L2(query_vector)
print(f"Using random vector at index {random_idx} as query")

# Perform search
k = 30  # Number of nearest neighbors
similarities, indices = index.search(query_vector, k)

Using random vector at index 3700 as query
CPU times: user 7.94 ms, sys: 748 µs, total: 8.68 ms
Wall time: 7.32 ms


In [8]:
# Wall-clock seconds since start_time was recorded. When cells are run by hand, this
# also counts the time spent between cells, not just compute time.
end_time = time.time()
runtime = end_time - start_time
print("Total runtime: {:.2f} seconds".format(runtime))

Total runtime: 3.29 seconds
