In [6]:
# Test ImageBind functionality
from imagebind import data
import torch
from imagebind.models import imagebind_model
from imagebind.models.imagebind_model import ModalityType

# Three matching samples per modality (dog / car / bird).
text_list = ["A dog.", "A car", "A bird"]
image_paths = [
    "/ImageBind/.assets/dog_image.jpg",
    "/ImageBind/.assets/car_image.jpg",
    "/ImageBind/.assets/bird_image.jpg",
]
audio_paths = [
    "/ImageBind/.assets/dog_audio.wav",
    "/ImageBind/.assets/car_audio.wav",
    "/ImageBind/.assets/bird_audio.wav",
]

# Use the first CUDA device when one is available, otherwise the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Build the pretrained model, switch it to eval mode and move it to the device.
model = imagebind_model.imagebind_huge(pretrained=True)
model.eval()
model.to(device)

# Prepare one input batch per modality on the selected device.
inputs = {
    ModalityType.TEXT: data.load_and_transform_text(text_list, device),
    ModalityType.VISION: data.load_and_transform_vision_data(image_paths, device),
    ModalityType.AUDIO: data.load_and_transform_audio_data(audio_paths, device),
}

# Forward pass without gradient tracking.
with torch.no_grad():
    embeddings = model(inputs)

# For each modality pair, print the softmax (over the last dimension) of the
# dot products between the two sets of embeddings.
similarity_pairs = (
    ("Vision x Text: ", ModalityType.VISION, ModalityType.TEXT),
    ("Audio x Text: ", ModalityType.AUDIO, ModalityType.TEXT),
    ("Vision x Audio: ", ModalityType.VISION, ModalityType.AUDIO),
)
for label, row_modality, col_modality in similarity_pairs:
    scores = embeddings[row_modality] @ embeddings[col_modality].T
    print(label, torch.softmax(scores, dim=-1))


RuntimeError: Numpy is not available

In [4]:
# "RuntimeError: Numpy is not available" is commonly raised by PyTorch builds
# that were compiled against NumPy 1.x when NumPy 2.x is installed, so pin the
# 1.x series. %pip installs into the running kernel's environment; restart the
# kernel afterwards so the new version is picked up.
%pip install "numpy<2"


[0mNote: you may need to restart the kernel to use updated packages.


In [1]:
# List the contents of the current working directory.
ls

2                                           [0m[01;36mnvidia-examples[0m@
NVIDIA_Deep_Learning_Container_License.pdf  [01;34mproj[0m/
README.md                                   [01;34msolanity[0m/
[01;34mchroma_db[0m/                                  [01;34msolanity-with-suffix[0m/
[01;34mdocker-examples[0m/


In [1]:
import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
import hashlib
import re

def is_valid_content(content):
    """
    Decide whether scraped text is meaningful enough to index.

    Content is rejected when it is empty or whitespace-only, shorter than
    50 characters after stripping, made up of less than 30% letters and
    digits, or matches one of the "meaningless" patterns (e.g. it starts
    with a typical error-page phrase).

    Args:
        content: Raw page or chunk text. Non-string values are rejected.

    Returns:
        True if the content passes every heuristic, False otherwise.
    """
    if not content or not isinstance(content, str):
        return False

    # Remove surrounding whitespace; too-short text is unlikely to be meaningful
    # (this also covers the whitespace-only case).
    stripped = content.strip()
    if len(stripped) < 50:
        return False

    # Share of letters/digits in the text. str.isalnum() is Unicode-aware, so
    # pages written in non-Latin scripts are not mistaken for symbol noise; for
    # pure-ASCII text it counts exactly the characters [a-zA-Z0-9].
    alnum_count = sum(ch.isalnum() for ch in stripped)
    if alnum_count / len(stripped) < 0.3:  # Less than 30% actual text
        return False

    # Check for common meaningless patterns
    meaningless_patterns = [
        r'^\s*$',  # Only whitespace
        r'^[^\w]*$',  # Only special characters
        r'^(Loading|Error|404|Not Found|Access Denied)',  # Error pages
        r'^\s*(javascript|css|html)\s*$',  # Just tech terms
    ]

    return not any(
        re.match(pattern, stripped, re.IGNORECASE)
        for pattern in meaningless_patterns
    )

def check_webpage(url):
    """
    Check whether the given URL serves a webpage (HTML content).

    The URL is fetched with a 5 second timeout and the response body is
    searched, case-insensitively, for common HTML markers.

    Args:
        url: Address to probe.

    Returns:
        True if the request succeeded (non-error HTTP status) and the body
        contains an HTML marker; False otherwise, including on any
        request failure.
    """
    html_markers = ("<!doctype html", "<html", "<head", "<body", "<title", "<meta")
    try:
        resp = requests.get(url, timeout=5)
        # An HTML error page (404, 5xx, ...) is not the page we are looking for.
        if not resp.ok:
            return False
        # Lower-case the body once instead of once per marker.
        # NOTE(review): the whole body is downloaded just to sniff for markers;
        # consider a streamed, size-capped read if large payloads are common.
        lowered = resp.text.lower()
        return any(marker in lowered for marker in html_markers)
    except Exception:
        # Deliberate best-effort probe: any failure means "not a webpage".
        return False

def get_arweave_webpage_manifests(max_workers=16, batch_size=32):
    """
    Generator that yields Arweave manifest transactions which actually
    serve a webpage (HTML content).

    Manifest transactions are paged from the Arweave GraphQL endpoint
    (100 per request). Each transaction URL is probed with check_webpage
    in a thread pool, batch_size URLs at a time, and the URLs that pass
    are yielded in completion order. Paging stops when the endpoint
    reports no further page, returns no edges, a request or decode fails,
    the response is malformed, or the pagination cursor is missing.

    Args:
        max_workers: Number of threads used to probe URLs concurrently.
        batch_size: Number of URLs submitted to the pool per batch.

    Yields:
        tuple of (url, content_type)
    """
    query = """
    query GetManifestTxs($cursor: String) {
      transactions(
        after: $cursor
        first: 100
        tags: [
          { name: "Content-Type", values: ["application/x.arweave-manifest+json"] }
        ]
      ) {
        pageInfo {
          hasNextPage
        }
        edges {
          cursor
          node {
            id
            tags {
              name
              value
            }
          }
        }
      }
    }
    """

    url = "https://arweave.net/graphql"
    headers = { "Content-Type": "application/json" }

    cursor = None
    count = 0

    # One pool for the whole crawl instead of a new pool per batch.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            payload = {
                "query": query,
                "variables": { "cursor": cursor } if cursor else {}
            }

            try:
                response = requests.post(url, json=payload, headers=headers, timeout=10)
                response.raise_for_status()
                result = response.json()
            except Exception as e:
                print(f"Error fetching or decoding response: {e}")
                break

            try:
                edges = result["data"]["transactions"]["edges"]
                page_info = result["data"]["transactions"]["pageInfo"]
            except (KeyError, TypeError):
                print("Malformed response or missing data in GraphQL result.")
                break

            if not edges:
                break

            # Collect (txid, content_type) pairs, skipping edges that lack an
            # id or a Content-Type tag rather than aborting the whole crawl.
            txs = []
            for edge in edges:
                tx = edge.get("node") or {}
                txid = tx.get("id")
                if not txid:
                    continue
                content_type = next(
                    (
                        tag.get("value")
                        for tag in tx.get("tags") or []
                        if tag.get("name") == "Content-Type"
                    ),
                    None,
                )
                if content_type:
                    txs.append((txid, content_type))

            # Probe each batch concurrently; a batch is fully drained before
            # the next one is submitted.
            for i in range(0, len(txs), batch_size):
                batch = txs[i:i+batch_size]
                future_to_tx = {
                    executor.submit(check_webpage, f"http://arweave.net/{txid}"): (txid, content_type)
                    for txid, content_type in batch
                }
                for future in as_completed(future_to_tx):
                    txid, content_type = future_to_tx[future]
                    try:
                        is_webpage = future.result()
                    except Exception:
                        is_webpage = False
                    if is_webpage:
                        yield f"http://arweave.net/{txid}", content_type

            count += len(edges)
            print(f"✅ {count} manifest txs indexed...")

            if not (page_info or {}).get("hasNextPage"):
                break

            # Without a cursor the next request would restart from the first
            # page and loop forever, so stop instead.
            next_cursor = edges[-1].get("cursor")
            if not next_cursor:
                print("Missing pagination cursor in GraphQL result; stopping.")
                break

            cursor = next_cursor
            time.sleep(0.1)  # small pause between page requests

def load_webpage_content(url):
    """
    Fetch a page through LangChain's WebBaseLoader.

    Args:
        url: Address of the page to load.

    Returns:
        The first document produced by the loader, or None when the loader
        returns nothing or raises (the error is printed).
    """
    try:
        loaded = WebBaseLoader(url).load()
        first = loaded[0] if loaded else None
    except Exception as e:
        print(f"Error loading {url}: {e}")
        first = None
    return first

def create_chunks(doc, chunk_size=1000, chunk_overlap=200):
    """
    Break a document into pieces for embedding and annotate each piece.

    Args:
        doc: Document to split; its metadata "title" (default "Untitled")
            is used when labelling the pieces.
        chunk_size: Passed to RecursiveCharacterTextSplitter as chunk_size.
        chunk_overlap: Passed to RecursiveCharacterTextSplitter as chunk_overlap.

    Returns:
        The list of chunks produced by the splitter, each with these
        metadata keys added: chunk_id, total_chunks, chunk_size,
        content_hash (MD5 hex digest of the chunk text), chunk_description
        and chunk_title.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    pieces = splitter.split_documents([doc])
    total = len(pieces)

    # Title shared by every piece of this document.
    title = doc.metadata.get("title", "Untitled")

    for position, piece in enumerate(pieces):
        text = piece.page_content
        description = create_chunk_description(text, position, total, title)

        # Only multi-part documents get a "Part x/y" suffix.
        if total > 1:
            piece_title = f"{title} - Part {position + 1}/{total}"
        else:
            piece_title = title

        annotations = {
            "chunk_id": position,
            "total_chunks": total,
            "chunk_size": len(text),
            "content_hash": hashlib.md5(text.encode()).hexdigest(),
            "chunk_description": description,
            "chunk_title": piece_title,
        }
        piece.metadata.update(annotations)

    return pieces

def extract_metadata(doc):
    """
    Build the enhanced metadata dictionary for a loaded document.

    Args:
        doc: Object exposing `metadata` (dict) and `page_content` (str).
            The input metadata is only read, never modified.

    Returns:
        dict with the keys title, description, short_description (first
        200 characters of the content, followed by "..." when truncated),
        url (the input "source" value), content_length, language,
        content_type ("webpage"), source ("arweave") and extracted_at
        (time.time() at call time).
    """
    source_meta = doc.metadata or {}
    content = doc.page_content or ""

    # Missing, None or empty values fall back to the defaults so later
    # string operations on the title cannot hit None.
    title = source_meta.get("title") or "Untitled"
    description = source_meta.get("description") or ""

    # Create a short description from content
    short_desc = content[:200] + "..." if len(content) > 200 else content

    # Keep a language already present in the metadata instead of forcing "en".
    # Only values shaped like a language tag ("es", "pt-BR", ...) are accepted;
    # anything else falls back to "en".
    # NOTE(review): presumably the loader stores a free-text placeholder when
    # the page declares no language - confirm against the loader in use.
    detected = source_meta.get("language")
    language = "en"
    if isinstance(detected, str):
        candidate = detected.strip()
        if re.fullmatch(r"[A-Za-z]{2,3}(?:[-_][A-Za-z0-9]+)*", candidate):
            language = candidate

    return {
        "title": title,
        "description": description,
        "short_description": short_desc,
        "url": source_meta.get("source", ""),
        "content_length": len(content),
        "language": language,
        "content_type": "webpage",
        "source": "arweave",
        "extracted_at": time.time()
    }

def create_chunk_description(chunk_content, chunk_id, total_chunks, title):
    """
    Build a short, human-readable description for one chunk.

    The description is the stripped chunk text, cut to 150 characters plus
    "..." when longer. It is prefixed with "[Part i/n] " for multi-chunk
    documents and with "<title>: " when a real title (not "Untitled") is
    available.
    """
    body = chunk_content.strip()
    preview = body if len(body) <= 150 else body[:150] + "..."

    # Position marker only makes sense when the document was split.
    part_prefix = f"[Part {chunk_id + 1}/{total_chunks}] " if total_chunks > 1 else ""

    # The title goes first, in front of the part marker.
    has_title = bool(title) and title != "Untitled"
    title_prefix = f"{title}: " if has_title else ""

    return f"{title_prefix}{part_prefix}{preview}"

def optimized_arweave_indexer(max_pages=None, chunk_size=1000, chunk_overlap=200, max_workers=16, batch_size=32):
    """
    Generator that runs the whole indexing pipeline:
    - Discovers Arweave webpages
    - Validates content quality
    - Extracts and chunks content
    - Yields ready-to-embed documents

    Args:
        max_pages: Maximum number of pages to process. None means no limit;
            0 or a negative value processes nothing.
        chunk_size: Size of each chunk
        chunk_overlap: Overlap between chunks
        max_workers: Number of parallel workers for webpage checking
        batch_size: Batch size for parallel processing

    Yields:
        Document chunks ready for embedding with rich metadata
    """
    page_count = 0
    chunk_count = 0
    skipped_count = 0

    print("🚀 Starting optimized Arweave indexing pipeline...")

    def limit_reached():
        # Explicit None check so that max_pages=0 is not read as "unlimited".
        return max_pages is not None and page_count >= max_pages

    manifests = get_arweave_webpage_manifests(max_workers, batch_size)
    try:
        # The limit is tested before pulling from the discovery generator, so
        # no further URLs are probed once enough pages have been processed.
        if not limit_reached():
            for url, _content_type in manifests:
                print(f"📄 Processing webpage {page_count + 1}: {url}")

                # Load webpage content
                doc = load_webpage_content(url)
                if not doc:
                    skipped_count += 1
                    print(f"⚠️  Skipped {url} - failed to load")
                    continue

                # Validate content quality
                if not is_valid_content(doc.page_content):
                    skipped_count += 1
                    print(f"⚠️  Skipped {url} - invalid content")
                    continue

                # Extract rich metadata
                enhanced_metadata = extract_metadata(doc)
                doc.metadata.update(enhanced_metadata)

                # Create chunks
                chunks = create_chunks(doc, chunk_size, chunk_overlap)

                # Filter out invalid chunks and yield valid ones
                valid_chunks = 0
                for chunk in chunks:
                    if not is_valid_content(chunk.page_content):
                        continue

                    # Apply the page-level metadata, then restore the
                    # chunk-specific description and title it overwrote.
                    chunk.metadata.update(enhanced_metadata)
                    chunk.metadata["short_description"] = chunk.metadata.get("chunk_description", chunk.metadata.get("short_description", ""))
                    chunk.metadata["title"] = chunk.metadata.get("chunk_title", chunk.metadata.get("title", "Untitled"))

                    chunk_count += 1
                    valid_chunks += 1
                    yield chunk

                page_count += 1
                print(f"✅ Processed {page_count} webpages, created {valid_chunks} valid chunks (total chunks: {chunk_count}, skipped: {skipped_count})")

                if limit_reached():
                    break
    finally:
        # Close the discovery generator deterministically (also when the
        # consumer stops early) rather than waiting for garbage collection.
        close = getattr(manifests, "close", None)
        if close is not None:
            close()

    print(f"🎉 Indexing complete! Processed {page_count} webpages, created {chunk_count} chunks, skipped {skipped_count} items")

# Usage example: index a handful of pages and show what the first chunk looks like.
if __name__ == "__main__":
    print("🔍 Starting optimized Arweave webpage indexing...")

    # Process first 10 webpages as example. `chunks` and the loop variable
    # `chunk` stay in the notebook namespace after this cell has run.
    chunks = []
    for chunk in optimized_arweave_indexer(max_pages=10):
        chunks.append(chunk)
        print(f"📝 Chunk {len(chunks)}: {chunk.metadata['title'][:50]}...")

    print(f"🎉 Indexed {len(chunks)} valid chunks from Arweave webpages!")

    # Example: Show first chunk details (one line per field)
    if chunks:
        first_chunk = chunks[0]
        print(
            "\n📋 Sample chunk:",
            f"Title: {first_chunk.metadata['title']}",
            f"URL: {first_chunk.metadata['url']}",
            f"Content length: {len(first_chunk.page_content)} chars",
            f"Content preview: {first_chunk.page_content[:200]}...",
            sep="\n",
        )

USER_AGENT environment variable not set, consider setting it to identify your requests.


🔍 Starting optimized Arweave webpage indexing...
🚀 Starting optimized Arweave indexing pipeline...
📄 Processing webpage 1: http://arweave.net/3sAdgGV5efxdqxQOTtH_6hHtRmmY1iyywUb3aB0EHx8
📝 Chunk 1: Permaweb LLM Fuel...
✅ Processed 1 webpages, created 1 valid chunks (total chunks: 1, skipped: 0)
📄 Processing webpage 2: http://arweave.net/rRQZ8p4V83BweFtOMfXLJ6xMfc_81-AyKhqkbZ-cmu0
⚠️  Skipped http://arweave.net/rRQZ8p4V83BweFtOMfXLJ6xMfc_81-AyKhqkbZ-cmu0 - invalid content
📄 Processing webpage 2: http://arweave.net/JMEUEcnwNQCpt9Cd5z9Oy3xMQ9cst_hXkT-4-4YvQM0
⚠️  Skipped http://arweave.net/JMEUEcnwNQCpt9Cd5z9Oy3xMQ9cst_hXkT-4-4YvQM0 - invalid content
📄 Processing webpage 2: http://arweave.net/zFGZ14V1JYPmlAyn2p054tm2F8iWYIBqkmrecqmS4Xw
⚠️  Skipped http://arweave.net/zFGZ14V1JYPmlAyn2p054tm2F8iWYIBqkmrecqmS4Xw - invalid content
📄 Processing webpage 2: http://arweave.net/_rDg7_NmAmF4j-mvLap81RBLVCBAFLBGLh_o8yN8cFA
📝 Chunk 2: Permaweb Memes...
✅ Processed 2 webpages, created 1 valid chunks (t

In [2]:
# Display the last chunk bound to the `chunk` loop variable by the indexing loop.
chunk

Document(metadata={'source': 'arweave', 'title': 'pop upcOS hasta que terminemos', 'language': 'en', 'description': '', 'short_description': 'pop upcOS hasta que terminemos: pop upcOS hasta que terminemosYou need to enable JavaScript to run this app.', 'url': 'http://arweave.net/ZSalRc9ln_4tXlFW9ilbUU1RU3OOt32fdLDzHz3fe1Y', 'content_length': 76, 'content_type': 'webpage', 'extracted_at': 1754148966.0766518, 'chunk_id': 0, 'total_chunks': 1, 'chunk_size': 76, 'content_hash': '0d6297e570c2bd8703147112f9b28b25', 'chunk_description': 'pop upcOS hasta que terminemos: pop upcOS hasta que terminemosYou need to enable JavaScript to run this app.', 'chunk_title': 'pop upcOS hasta que terminemos'}, page_content='pop upcOS hasta que terminemosYou need to enable JavaScript to run this app.')