# 🔍 Advanced Search with Filters

Welcome to the Airweave advanced search tutorial! This notebook shows how to combine Airweave search with metadata filters to narrow results across your connected data sources.

## What You'll Learn

- How to perform basic searches across all your data
- Using filters to narrow results by source, date, and metadata
- Understanding response types (raw vs completion)
- Implementing pagination for large result sets
- Leveraging query expansion for better recall
- Common pitfalls and how to avoid them

## Prerequisites

Before starting, ensure you have:
1. An Airweave API key (get one at [app.airweave.ai](https://app.airweave.ai))
2. At least one collection with connected data sources
3. Python 3.8+ with the required packages installed

## Documentation Links

- [Search Concepts](https://docs.airweave.ai/search/concepts)
- [Using Filters](https://docs.airweave.ai/search/filters)
- [API Reference](https://docs.airweave.ai/api-reference/collections/search-collection-advanced)


## Setup and Configuration

First, let's install the required packages and set up our environment:


In [None]:
# Install required packages (uncomment if needed)
# !pip install airweave-sdk qdrant-client pandas matplotlib seaborn python-dotenv

# Import necessary libraries
import json
from datetime import datetime, timezone, timedelta
from typing import List, Dict

# Data manipulation and visualization
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, Markdown

# Airweave imports
from airweave import AirweaveSDK
from airweave.schemas.search import SearchRequest
from qdrant_client.http.models import (
    Filter, 
    FieldCondition, 
    MatchValue, 
    MatchAny,
    DatetimeRange,
    Range
)

# Configure display options
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 100)
sns.set_theme(style="whitegrid")

print("✅ All imports successful!")


In [None]:
# Configure API credentials
# Option 1: Set directly (not recommended for production)
API_KEY = "your-api-key-here"
COLLECTION_ID = "your-collection-id-here"

# Option 2: Load from environment variables (recommended)
# import os
# from dotenv import load_dotenv
# load_dotenv()
# API_KEY = os.getenv("AIRWEAVE_API_KEY")
# COLLECTION_ID = os.getenv("AIRWEAVE_COLLECTION_ID")

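# Initialize the SDK. The cells below `await` client methods, so an async
# client is needed; this assumes your SDK version exports AsyncAirweaveSDK.
from airweave import AsyncAirweaveSDK
client = AsyncAirweaveSDK(api_key=API_KEY)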

# Helper function to display results nicely
def display_results(results: List[Dict], title: str = "Search Results"):
    """Display search results in a formatted table."""
    if not results:
        display(Markdown(f"### {title}\n\n*No results found*"))
        return
    
    # Extract key fields for display
    data = []
    for r in results:
        payload = r.get('payload', {})
        # Fall back to 'name', then 'Untitled', and tolerate None values
        title = str(payload.get('title') or payload.get('name') or 'Untitled')
        data.append({
            'Score': f"{r.get('score', 0):.3f}",
            'Source': payload.get('source_name', 'Unknown'),
            # Add an ellipsis only when the title is actually truncated
            'Title': title[:80] + '...' if len(title) > 80 else title,
            'Created': payload.get('created_at', 'Unknown')[:10] if payload.get('created_at') else 'Unknown',
            'Type': payload.get('entity_type', 'Unknown')
        })
    
    df = pd.DataFrame(data)
    display(Markdown(f"### {title} ({len(results)} results)"))
    display(df)

print("✅ SDK initialized successfully!")


## Part 1: Basic Search

Let's start with the fundamentals: performing a simple search across all your connected data sources.


In [None]:
# Example 1: Simple text search
response = await client.collections.search_collection(
    readable_id=COLLECTION_ID,
    query="customer onboarding process",
    limit=5
)

# Display results
display_results(response.results, "Basic Search Results")

# Show the structure of a single result
if response.results:
    display(Markdown("### Structure of a Search Result"))
    print(json.dumps(response.results[0], indent=2))


### Understanding Response Types

Airweave provides two response types:
- **Raw**: Returns the actual search results as structured data
- **Completion**: Returns an AI-generated summary of the results
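
As a rough sketch of how downstream code might branch on the two response types, the helper below works on a plain dictionary. The `completion` and `results` keys mirror the attributes used in this notebook; `render_response` and the sample data are hypothetical and exist only for illustration.

```python
from typing import Any, Dict, List


def render_response(response: Dict[str, Any]) -> str:
    """Return printable text for either response type (hypothetical helper)."""
    completion = response.get("completion")
    if completion:
        # Completion responses carry an AI-generated summary
        return completion

    # Raw responses carry only the structured results
    results: List[Dict[str, Any]] = response.get("results") or []
    titles = [r.get("payload", {}).get("title", "Untitled") for r in results]
    return "\n".join(f"- {t}" for t in titles) if titles else "No results"


# Made-up sample data, not real Airweave output
raw = {"results": [{"score": 0.9, "payload": {"title": "Onboarding guide"}}]}
summary = {"completion": "Two policies apply.", "results": []}

raw_text = render_response(raw)
summary_text = render_response(summary)
print(raw_text)
print(summary_text)
```

Branching on the presence of a completion keeps the calling code the same regardless of which response type was requested.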


In [None]:
# Example 2: Search with AI completion
completion_response = await client.collections.search_collection(
    readable_id=COLLECTION_ID,
    query="What are our current security policies and procedures?",
    response_type="completion",
    limit=10
)

display(Markdown("### AI-Generated Summary"))
display(Markdown(completion_response.completion))

# Also show the number of sources used
display(Markdown(f"\n*Based on {len(completion_response.results)} search results*"))


## Part 2: Filtering Deep Dive

Now let's explore how filters can help you find exactly what you need. Filters allow you to narrow results based on metadata like source, date, priority, and more.
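
To build intuition for how `must`, `should`, and `must_not` clauses combine, here is a minimal pure-Python sketch that applies the same boolean logic to in-memory payloads. It is an illustration only, not how Airweave or Qdrant evaluates filters internally. `matches` and the sample payloads are hypothetical, and each condition is simplified to a flat field name plus a set of accepted values.

```python
from typing import Any, Dict, Iterable, Set, Tuple

Condition = Tuple[str, Set[Any]]  # (payload key, accepted values)


def _hit(payload: Dict[str, Any], cond: Condition) -> bool:
    """True when the payload value for the key is one of the accepted values."""
    key, accepted = cond
    return payload.get(key) in accepted


def matches(
    payload: Dict[str, Any],
    must: Iterable[Condition] = (),
    should: Iterable[Condition] = (),
    must_not: Iterable[Condition] = (),
) -> bool:
    """All `must` hit, at least one `should` hits (if any given), no `must_not` hits."""
    should = list(should)
    if not all(_hit(payload, c) for c in must):
        return False
    if should and not any(_hit(payload, c) for c in should):
        return False
    return not any(_hit(payload, c) for c in must_not)


# Made-up payloads for illustration
items = [
    {"source_name": "GitHub", "priority": "high", "status": "open"},
    {"source_name": "GitHub", "priority": "low", "status": "open"},
    {"source_name": "Jira", "priority": "high", "status": "closed"},
    {"source_name": "github", "priority": "high", "status": "open"},  # wrong case
]

kept = [
    p for p in items
    if matches(
        p,
        must=[("source_name", {"GitHub", "Jira"})],
        should=[("priority", {"high", "critical"})],
        must_not=[("status", {"closed"})],
    )
]
print(kept)
```

Only the first payload survives: the second fails `should`, the third hits `must_not`, and the fourth fails `must` because the comparison is exact and therefore case-sensitive.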


In [None]:
# Example 3: Filter by source (CASE-SENSITIVE!)
display(Markdown("### 🚨 Important: source_name is case-sensitive!"))

# This will work if you have GitHub data
github_request = SearchRequest(
    query="bug fixes and improvements",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchValue(value="GitHub")  # Must match exactly!
            )
        ]
    ),
    limit=5
)

github_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=github_request
)

display_results(github_response.results, "GitHub-only Results")

# Common mistake: wrong case
wrong_case_request = SearchRequest(
    query="bug fixes and improvements",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchValue(value="github")  # lowercase won't match "GitHub"!
            )
        ]
    ),
    limit=5
)

wrong_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=wrong_case_request
)

display(Markdown("### ❌ Wrong case example (github vs GitHub):"))
display_results(wrong_response.results, "Results with wrong case")


In [None]:
# Solution: Handle case variations with MatchAny
case_insensitive_request = SearchRequest(
    query="bug fixes and improvements",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchAny(any=["GitHub", "github", "GITHUB"])  # Cover common variations
            )
        ]
    ),
    limit=5
)

case_insensitive_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=case_insensitive_request
)

# MatchAny is still case-sensitive; it only matches the variants listed above
display(Markdown("### ✅ Covering case variants with MatchAny:"))
display_results(case_insensitive_response.results, "Results Across Case Variants")


In [None]:
# Example 4: Date range filtering
display(Markdown("### Date Range Filtering"))

# Find items from the last 30 days
thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)

recent_items_request = SearchRequest(
    query="updates and changes",
    filter=Filter(
        must=[
            FieldCondition(
                key="created_at",
                range=DatetimeRange(gte=thirty_days_ago)
            )
        ]
    ),
    limit=10
)

recent_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=recent_items_request
)

display_results(recent_response.results, "Items from Last 30 Days")

# Visualize the date distribution
if recent_response.results:
    dates = [r['payload'].get('created_at', '')[:10] for r in recent_response.results if r['payload'].get('created_at')]
    if dates:
        df_dates = pd.DataFrame({'date': pd.to_datetime(dates)})
        plt.figure(figsize=(10, 4))
        df_dates['date'].hist(bins=20)
        plt.title('Distribution of Results by Date')
        plt.xlabel('Date')
        plt.ylabel('Count')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()


In [None]:
# Example 5: Complex multi-condition filtering
display(Markdown("### Complex Multi-Condition Search"))

# Find high-priority items from multiple sources, excluding resolved ones
complex_request = SearchRequest(
    query="critical issues and blockers",
    filter=Filter(
        must=[
            # From specific project management tools
            FieldCondition(
                key="source_name",
                match=MatchAny(any=["Asana", "Jira", "Linear", "GitHub"])
            ),
            # Created in the last 90 days
            FieldCondition(
                key="created_at",
                range=DatetimeRange(
                    gte=datetime.now(timezone.utc) - timedelta(days=90)
                )
            )
        ],
        should=[
            # High priority indicators (at least one must match)
            FieldCondition(
                key="metadata.priority",
                match=MatchAny(any=["high", "critical", "urgent", "P0", "P1"])
            ),
            FieldCondition(
                key="metadata.labels",
                match=MatchAny(any=["blocker", "showstopper", "critical-bug"])
            )
        ],
        must_not=[
            # Exclude completed items
            FieldCondition(
                key="metadata.status",
                match=MatchAny(any=["resolved", "closed", "done", "completed"])
            )
        ]
    ),
    score_threshold=0.6,  # Only show relevant results
    limit=20
)

complex_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=complex_request
)

display_results(complex_response.results, "High-Priority Unresolved Items")

# Analyze results by source
if complex_response.results:
    sources = [r['payload'].get('source_name', 'Unknown') for r in complex_response.results]
    source_counts = pd.Series(sources).value_counts()
    
    plt.figure(figsize=(8, 5))
    source_counts.plot(kind='bar')
    plt.title('High-Priority Items by Source')
    plt.xlabel('Source')
    plt.ylabel('Count')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()


## Part 3: Advanced Features

Let's explore pagination, score thresholds, and query expansion strategies.
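
The offset-based pagination pattern can be sketched without the API by paging over an in-memory list. `fake_search` stands in for a real search call and `iter_pages` is a hypothetical helper. The `limit`, `offset`, and `score_threshold` names follow the `SearchRequest` fields used in this notebook, and the threshold behavior shown here (dropping results below the cutoff) is an assumption.

```python
from typing import Dict, Iterator, List, Optional

# Made-up corpus with descending scores: 1.0, 0.95, 0.9, ...
CORPUS = [{"id": i, "score": round(1.0 - i * 0.05, 2)} for i in range(12)]


def fake_search(limit: int, offset: int,
                score_threshold: Optional[float] = None) -> List[Dict]:
    """Stand-in for a search call: filter by score, then slice one page."""
    hits = [d for d in CORPUS
            if score_threshold is None or d["score"] >= score_threshold]
    return hits[offset:offset + limit]


def iter_pages(page_size: int, score_threshold: Optional[float] = None,
               max_pages: int = 10) -> Iterator[List[Dict]]:
    """Yield pages until one is empty or short, or max_pages is reached."""
    offset = 0
    for _ in range(max_pages):
        page = fake_search(limit=page_size, offset=offset,
                           score_threshold=score_threshold)
        if not page:
            break
        yield page
        if len(page) < page_size:
            break  # a short page means nothing is left
        offset += page_size


page_sizes = [len(p) for p in iter_pages(page_size=5)]
filtered = [d for page in iter_pages(page_size=5, score_threshold=0.78)
            for d in page]
print(page_sizes)
print([d["id"] for d in filtered])
```

Stopping on a short page saves one round trip compared with waiting for an empty page, and `max_pages` guards against unbounded loops on very large result sets.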


In [None]:
# Example 6: Pagination
display(Markdown("### Pagination Example"))

# Function to fetch results page by page (capped at max_pages for the demo)
async def fetch_all_pages(query: str, filter: Filter = None, page_size: int = 10, max_pages: int = 3):
    """Collect results across pages using limit/offset pagination."""
    all_results = []
    offset = 0
    page = 1
    
    while page <= max_pages:
        request = SearchRequest(
            query=query,
            filter=filter,
            limit=page_size,
            offset=offset
        )
        
        response = await client.collections.search_collection_advanced(
            readable_id=COLLECTION_ID,
            search_request=request
        )
        
        if not response.results:
            break
            
        all_results.extend(response.results)
        print(f"Page {page}: Retrieved {len(response.results)} results")
        
        # A short page means there are no more results to fetch
        if len(response.results) < page_size:
            break
            
        offset += page_size
        page += 1
    
    return all_results

# Fetch paginated results
all_results = await fetch_all_pages(
    "documentation and guides",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchAny(any=["Confluence", "Notion", "GitHub"])
            )
        ]
    ),
    page_size=5
)

display(Markdown(f"### Total results fetched: {len(all_results)}"))


In [None]:
# Example 7: Score threshold demonstration
display(Markdown("### Score Threshold Impact"))

# Compare different score thresholds
thresholds = [None, 0.5, 0.7, 0.9]
threshold_results = {}

for threshold in thresholds:
    request = SearchRequest(
        query="best practices and guidelines",
        score_threshold=threshold,
        limit=20
    )
    
    response = await client.collections.search_collection_advanced(
        readable_id=COLLECTION_ID,
        search_request=request
    )
    
    threshold_results[threshold or "None"] = {
        'count': len(response.results),
        'scores': [r['score'] for r in response.results] if response.results else []
    }

# Display comparison
display(Markdown("### Results by Score Threshold"))
for threshold, data in threshold_results.items():
    avg_score = sum(data['scores']) / len(data['scores']) if data['scores'] else 0
    print(f"Threshold {threshold}: {data['count']} results, avg score: {avg_score:.3f}")

# Visualize score distributions
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
axes = axes.ravel()

for i, (threshold, data) in enumerate(threshold_results.items()):
    if data['scores']:
        axes[i].hist(data['scores'], bins=20, alpha=0.7)
        axes[i].set_title(f'Score Distribution (threshold={threshold})')
        axes[i].set_xlabel('Score')
        axes[i].set_ylabel('Count')
        # Draw the cutoff line only when a threshold was actually set
        if threshold != "None":
            axes[i].axvline(x=float(threshold), color='red',
                            linestyle='--', label='Threshold')
            axes[i].legend()

plt.tight_layout()
plt.show()


In [None]:
# Example 8: Query expansion comparison
display(Markdown("### Query Expansion Strategies"))

expansion_strategies = ["no_expansion", "auto", "llm"]
expansion_results = {}

query = "authentication methods"

for strategy in expansion_strategies:
    request = SearchRequest(
        query=query,
        expansion_strategy=strategy,
        limit=10
    )
    
    response = await client.collections.search_collection_advanced(
        readable_id=COLLECTION_ID,
        search_request=request
    )
    
    expansion_results[strategy] = {
        'count': len(response.results),
        'sources': list(set([r['payload'].get('source_name', 'Unknown') 
                           for r in response.results])),
        'avg_score': sum(r['score'] for r in response.results) / len(response.results) 
                    if response.results else 0
    }

# Display comparison
display(Markdown("### Expansion Strategy Comparison"))
comparison_df = pd.DataFrame(expansion_results).T
comparison_df['sources'] = comparison_df['sources'].apply(lambda x: ', '.join(x))
display(comparison_df)

# Visualize results count
plt.figure(figsize=(8, 5))
strategies = list(expansion_results.keys())
counts = [expansion_results[s]['count'] for s in strategies]
plt.bar(strategies, counts)
plt.title(f'Results Count by Expansion Strategy\nQuery: "{query}"')
plt.xlabel('Strategy')
plt.ylabel('Number of Results')
plt.show()


## Part 4: Real-World Scenarios

Let's put it all together with practical examples you might use in production.


In [None]:
# Scenario 1: Find critical support tickets from the last 7 days
display(Markdown("### Scenario 1: Critical Support Tickets Dashboard"))

one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)

support_request = SearchRequest(
    query="customer issues problems errors",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchAny(any=["Zendesk", "Intercom", "Freshdesk", "HelpScout"])
            ),
            FieldCondition(
                key="created_at",
                range=DatetimeRange(gte=one_week_ago)
            )
        ],
        should=[
            FieldCondition(
                key="metadata.priority",
                match=MatchAny(any=["urgent", "high", "critical"])
            ),
            FieldCondition(
                key="metadata.tags",
                match=MatchAny(any=["vip", "enterprise", "paid"])
            )
        ]
    ),
    response_type="completion",
    limit=25
)

support_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=support_request
)

display(Markdown("### AI Summary of Critical Support Issues"))
display(Markdown(support_response.completion))

# Show top issues
if support_response.results:
    display(Markdown("\n### Top Critical Issues"))
    display_results(support_response.results[:5], "Most Relevant Issues")


In [None]:
# Scenario 2: Search payment issues across payment providers (Stripe, Square, PayPal, Braintree)
display(Markdown("### Scenario 2: Payment Processing Issues"))

payment_request = SearchRequest(
    query="payment failed declined error refund chargeback",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchAny(any=["Stripe", "Square", "PayPal", "Braintree"])
            )
        ],
        should=[
            FieldCondition(
                key="metadata.type",
                match=MatchAny(any=["error", "failure", "dispute"])
            ),
            FieldCondition(
                key="metadata.amount",
                range=Range(gte=100)  # Focus on larger transactions
            )
        ]
    ),
    score_threshold=0.65,
    limit=15
)

payment_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=payment_request
)

display_results(payment_response.results, "Payment Issues")

# Analyze by payment provider
if payment_response.results:
    providers = [r['payload'].get('source_name', 'Unknown') for r in payment_response.results]
    provider_counts = pd.Series(providers).value_counts()
    
    plt.figure(figsize=(8, 5))
    provider_counts.plot(kind='pie', autopct='%1.1f%%')
    plt.title('Payment Issues by Provider')
    plt.ylabel('')
    plt.show()


In [None]:
# Scenario 3: Get engineering docs from this month
# (the filter uses created_at, so later edits to older docs are not captured)
display(Markdown("### Scenario 3: Recent Engineering Documentation"))

this_month_start = datetime.now(timezone.utc).replace(day=1, hour=0, minute=0, second=0, microsecond=0)

docs_request = SearchRequest(
    query="API documentation guide tutorial implementation",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchAny(any=["Confluence", "Notion", "GitHub"])
            ),
            FieldCondition(
                key="created_at",
                range=DatetimeRange(gte=this_month_start)
            )
        ],
        should=[
            FieldCondition(
                key="metadata.type",
                match=MatchAny(any=["documentation", "guide", "readme", "wiki"])
            ),
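            FieldCondition(
                key="metadata.path",
                # MatchAny compares whole values (no prefix or substring matching),
                # so this only matches paths equal to one of these strings
                match=MatchAny(any=["docs/", "documentation/", "wiki/"])
            )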
        ]
    ),
    expansion_strategy="llm",  # Better for technical documentation
    limit=20
)

docs_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=docs_request
)

display_results(docs_response.results, "Recent Documentation Updates")

# Group by date
if docs_response.results:
    dates = []
    for r in docs_response.results:
        created = r['payload'].get('created_at', '')
        if created:
            dates.append(pd.to_datetime(created[:10]))
    
    if dates:
        date_counts = pd.Series(dates).dt.date.value_counts().sort_index()
        
        plt.figure(figsize=(12, 5))
        date_counts.plot(kind='bar')
        plt.title('Documentation Updates by Date')
        plt.xlabel('Date')
        plt.ylabel('Count')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()


## Part 5: Performance & Best Practices

Let's explore how to optimize your searches and avoid common pitfalls.
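
A single timed request is noisy, since network latency and caching can dominate the measurement. As a minimal sketch, the hypothetical `median_seconds` helper below repeats a call several times with `time.perf_counter` and reports the median. `slow_task` is a stand-in for a search call, not part of the Airweave SDK.

```python
import statistics
import time
from typing import Callable


def median_seconds(fn: Callable[[], object], repeats: int = 5) -> float:
    """Run fn `repeats` times and return the median wall-clock duration."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        fn()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


def slow_task() -> int:
    """Stand-in workload; replace with the call you want to measure."""
    return sum(range(10_000))


elapsed = median_seconds(slow_task, repeats=5)
print(f"Median over 5 runs: {elapsed:.6f}s")
```

The helper takes a synchronous callable. With an async client in a notebook, the same idea applies by awaiting the call inside the timing loop.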


In [None]:
# Performance comparison: filters vs no filters
import time

display(Markdown("### Performance Impact of Filters"))

# Test 1: Broad search without filters
start_time = time.time()
broad_response = await client.collections.search_collection(
    readable_id=COLLECTION_ID,
    query="error bug issue problem",
    limit=50
)
broad_time = time.time() - start_time

# Test 2: Filtered search
start_time = time.time()
filtered_request = SearchRequest(
    query="error bug issue problem",
    filter=Filter(
        must=[
            FieldCondition(
                key="source_name",
                match=MatchValue(value="GitHub")
            ),
            FieldCondition(
                key="created_at",
                range=DatetimeRange(
                    gte=datetime.now(timezone.utc) - timedelta(days=30)
                )
            )
        ]
    ),
    limit=50
)
filtered_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=filtered_request
)
filtered_time = time.time() - start_time

# Test 3: With score threshold
start_time = time.time()
threshold_request = SearchRequest(
    query="error bug issue problem",
    score_threshold=0.7,
    limit=50
)
threshold_response = await client.collections.search_collection_advanced(
    readable_id=COLLECTION_ID,
    search_request=threshold_request
)
threshold_time = time.time() - start_time

# Display results
performance_data = {
    'Search Type': ['No Filters', 'With Filters', 'Score Threshold'],
    'Time (seconds)': [broad_time, filtered_time, threshold_time],
    'Results Count': [len(broad_response.results), 
                     len(filtered_response.results), 
                     len(threshold_response.results)]
}

perf_df = pd.DataFrame(performance_data)
display(perf_df)

# Visualize
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

ax1.bar(perf_df['Search Type'], perf_df['Time (seconds)'])
ax1.set_title('Search Time Comparison')
ax1.set_ylabel('Time (seconds)')
ax1.set_xlabel('Search Type')

ax2.bar(perf_df['Search Type'], perf_df['Results Count'])
ax2.set_title('Results Count Comparison')
ax2.set_ylabel('Number of Results')
ax2.set_xlabel('Search Type')

plt.tight_layout()
plt.show()


## Common Mistakes to Avoid

1. **Case Sensitivity**: Always remember that `source_name` is case-sensitive
2. **Date Timezones**: Always use timezone-aware datetime objects
3. **Over-filtering**: Start broad and narrow down gradually
4. **Score Thresholds**: Don't set too high initially - you might miss relevant results
5. **Pagination Limits**: Maximum limit is 100 results per page
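
Two of these pitfalls lend themselves to small guard helpers. The sketch below uses only the standard library. `case_variants` and `ensure_utc` are hypothetical names, and treating naive datetimes as UTC is an assumption you should adjust to your data.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def case_variants(name: str) -> List[str]:
    """Return de-duplicated case variants of a source name, original first."""
    variants: List[str] = []
    for candidate in (name, name.lower(), name.upper(), name.title()):
        if candidate not in variants:
            variants.append(candidate)
    return variants


def ensure_utc(dt: datetime) -> datetime:
    """Return a timezone-aware UTC datetime (naive input is assumed to be UTC)."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


variants = case_variants("GitHub")
naive_fixed = ensure_utc(datetime(2024, 1, 1, 12, 0))
converted = ensure_utc(
    datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
)
print(variants)
print(naive_fixed.isoformat(), converted.isoformat())
```

The list from `case_variants` can be passed as the `any` argument of `MatchAny`, and `ensure_utc` can wrap any datetime before it goes into a `DatetimeRange`.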

## Key Takeaways

- **Filters are powerful**: Combine semantic search with metadata filtering for precise results
- **Case matters**: Matching is case-sensitive; use `MatchAny` to list the case variants you expect
- **Start simple**: Begin with basic filters and add complexity as needed
- **Monitor performance**: Filters shrink the candidate set and may speed up searches; measure on your own data
- **Use the right response type**: `raw` for data processing, `completion` for summaries

## Next Steps

1. Explore the [API documentation](https://docs.airweave.ai/api-reference) for more details
2. Try these examples with your own data
3. Experiment with different filter combinations
4. Join our [community](https://discord.gg/airweave) for support and best practices

Happy searching! 🔍✨
