## Import Libraries


In [10]:
import json
import os
import re
import warnings
from typing import List, Optional

from dotenv import load_dotenv
from groq import Groq
from haystack import Pipeline, component
from haystack.components.builders import PromptBuilder
from haystack.dataclasses import ChatMessage
from pymongo import MongoClient

# Ignore every warning of category UserWarning for the rest of the kernel session
# (presumably to keep library warnings out of the cell outputs below).
warnings.filterwarnings("ignore", category=UserWarning)

In [11]:
# Load variables from a .env file into the process environment. Later cells read
# GROQ_API_KEY and MONGO_CONNECTION_STRING from os.environ.
load_dotenv()

True

## Custom Groq Generator


In [12]:
@component
class GroqGenerator:
    """Haystack component that sends one user prompt to Groq's chat completions API.

    Args:
        model: Name of the Groq-hosted model to query.
        api_key: Groq API key. When omitted (or empty), the ``GROQ_API_KEY``
            environment variable is used instead.
        temperature: Sampling temperature forwarded to the API. The default is
            kept low for more consistent JSON output.
        max_tokens: Upper bound on generated tokens forwarded to the API.
    """

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: Optional[str] = None,
        temperature: float = 0.3,
        max_tokens: int = 500,
    ):
        self.client = Groq(api_key=api_key or os.environ.get("GROQ_API_KEY"))
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens

    @component.output_types(replies=List[str])
    def run(self, prompt: str):
        """Generate a completion for ``prompt``.

        Args:
            prompt: Fully rendered prompt text, sent as a single user message.

        Returns:
            ``{"replies": [text]}`` where ``text`` is the content of the first
            choice returned by the API (an empty string if that content is null).

        Raises:
            ValueError: If ``prompt`` is empty.
        """
        if not prompt:
            raise ValueError("The 'prompt' received by GroqGenerator is empty.")

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )

        # The message content may be null; normalise it so the declared
        # List[str] output type holds and downstream regex parsing does not crash.
        content = response.choices[0].message.content
        return {
            "replies": [content if content is not None else ""]
        }

## MongoDB Components


In [13]:
class MongoDBAtlas:
    """Accessor for the ``materials`` and ``categories`` collections of the
    ``smartshopper_store`` database.

    Args:
        mongo_connection_string: Connection string handed to ``MongoClient``.
    """

    def __init__(self, mongo_connection_string: str):
        self.client = MongoClient(mongo_connection_string)
        self.db = self.client.smartshopper_store
        self.material_collection = self.db.materials
        self.category_collection = self.db.categories

    @staticmethod
    def _names(collection):
        """Return the ``name`` field of every document in ``collection``.

        Only ``name`` is projected so the server does not ship whole documents
        when a single field is read. A document without ``name`` still raises
        ``KeyError``, as before.
        """
        return [doc['name'] for doc in collection.find({}, {"name": 1, "_id": 0})]

    def get_materials(self):
        """Return the names of all documents in the ``materials`` collection."""
        return self._names(self.material_collection)

    def get_categories(self):
        """Return the names of all documents in the ``categories`` collection."""
        return self._names(self.category_collection)

@component
class GetMaterials:
    """Haystack component that outputs the material names stored in MongoDB."""

    def __init__(self):
        # Subscripting os.environ raises KeyError when the variable is not set.
        connection_string = os.environ['MONGO_CONNECTION_STRING']
        self.db = MongoDBAtlas(connection_string)

    @component.output_types(materials=List[str])
    def run(self):
        """Return ``{"materials": names}`` as read from the database."""
        return {"materials": self.db.get_materials()}

@component
class GetCategories:
    """Haystack component that outputs the category names stored in MongoDB."""

    def __init__(self):
        # Subscripting os.environ raises KeyError when the variable is not set.
        connection_string = os.environ['MONGO_CONNECTION_STRING']
        self.db = MongoDBAtlas(connection_string)

    @component.output_types(categories=List[str])
    def run(self):
        """Return ``{"categories": names}`` as read from the database."""
        return {"categories": self.db.get_categories()}

## Filter Template


In [14]:
# Prompt template handed to PromptBuilder. The {{materials}} and {{categories}}
# placeholders are fed by the GetMaterials / GetCategories components, and
# {{input}} is the user's query supplied when the pipeline is run.
# NOTE(review): the example JSON below contains `//` comments, which json.loads
# rejects; if the model copies them into its answer, parsing the reply will fail.
METADATA_FILTER_TEMPLATE = """
You are an expert assistant that helps create metadata filters for product searches. 
Based on the user input, create a JSON filter object that can be used to filter products.

Available materials: {{materials}}
Available categories: {{categories}}

The filter should follow this structure:
```json
{
    "material": ["material1", "material2"],  // Only if user mentions specific materials
    "category": ["category1", "category2"],  // Only if user mentions specific categories  
    "price": {"$gte": min_price, "$lte": max_price}  // Only if user mentions price range
}
```

Rules:
1. Only include filters that are explicitly mentioned or strongly implied in the user input
2. For materials and categories, use exact matches from the available lists
3. For price, extract numerical values and create range filters
4. If no specific filters are mentioned, return an empty JSON object: {}
5. Always return valid JSON wrapped in ```json``` code blocks

User input: {{input}}

Filter:
"""

## Create Metadata Filter Pipeline


In [15]:
class MetaDataFilterPipeline:
    """Haystack pipeline that turns a free-text query into an LLM-written filter.

    The pipeline wires four components together: the material and category
    lookups feed the prompt builder, whose rendered prompt goes to the generator.

    Args:
        template: Prompt template using the ``input``, ``materials`` and
            ``categories`` variables.
    """

    def __init__(self, template):
        self.template = template
        self.pipeline = Pipeline()

        prompt_builder = PromptBuilder(
            template=self.template,
            required_variables=["input", "materials", "categories"],
        )

        # Register the components in the same order they are connected below.
        for name, instance in (
            ("materials", GetMaterials()),
            ("categories", GetCategories()),
            ("prompt_builder", prompt_builder),
            ("generator", GroqGenerator()),
        ):
            self.pipeline.add_component(name, instance)

        for sender, receiver in (
            ("materials.materials", "prompt_builder.materials"),
            ("categories.categories", "prompt_builder.categories"),
            ("prompt_builder", "generator"),
        ):
            self.pipeline.connect(sender, receiver)

    def run(self, query: str):
        """Run the pipeline for ``query`` and return the generator's first reply."""
        pipeline_inputs = {"prompt_builder": {"input": query}}
        outputs = self.pipeline.run(pipeline_inputs)
        replies = outputs["generator"]["replies"]
        return replies[0]

## Test Setup


In [16]:
# Instantiating the pipeline constructs GetMaterials, GetCategories and GroqGenerator,
# so MONGO_CONNECTION_STRING must already be present in the environment at this point.
print("Setting up MetaData Filter Pipeline...")
filter_pipeline = MetaDataFilterPipeline(METADATA_FILTER_TEMPLATE)


Setting up MetaData Filter Pipeline...


## Get available materials and categories


In [17]:
# Read the allowed values straight from MongoDB (outside the pipeline); the
# validation cells further down compare generated filters against these lists.
db = MongoDBAtlas(os.environ['MONGO_CONNECTION_STRING'])
materials, categories = db.get_materials(), db.get_categories()

print(f"Available materials: {materials}")
print(f"Available categories: {categories}")


Available materials: ['Glass', 'Synthetic', 'Leather', 'Cotton', 'Wood']
Available categories: ['Electronics', 'Shoes', 'Furniture', 'Clothing']


## Test Cases


In [18]:
# Sample shopper queries, grouped by the kind of filter each group exercises.
_TEST_QUERY_GROUPS = {
    "material": [
        "I want cotton shirts",
        "Show me leather products",
        "Looking for wooden furniture",
    ],
    "category": [
        "I need electronics",
        "Show me shoes",
        "Looking for clothing items",
    ],
    "price": [
        "Products under 5 million rupiah",
        "Items between 1 million and 10 million",
        "Cheap products under 500000",
    ],
    "combined": [
        "Cotton clothing under 1 million",
        "Leather shoes between 2 million and 5 million",
        "Electronics over 10 million rupiah",
    ],
    "no_filter": [
        "Show me good products",
        "What do you recommend?",
        "I'm just browsing",
    ],
}

# Flatten the groups in declaration order into the list the test loop iterates.
test_queries = [query for group in _TEST_QUERY_GROUPS.values() for query in group]

In [19]:
def extract_and_parse_json(response_text):
    """Extract the JSON filter object from an LLM reply.

    Candidates are tried in order:
      1. the contents of the first Markdown code fence (```json ... ``` or an
         untagged fence; tag case and surrounding whitespace are ignored);
      2. the span from the first ``{`` to the last ``}`` in the whole text.

    The first candidate that parses to a JSON *object* wins. If the fenced block
    is unparsable, the brace fallback is still attempted.

    Args:
        response_text: Raw model reply. ``None``, non-string and blank values
            are treated as "no filter".

    Returns:
        The parsed filter as a ``dict``. ``{}`` is returned when nothing usable
        is found; a parse failure is additionally reported on stdout.
    """
    if not isinstance(response_text, str) or not response_text.strip():
        return {}

    candidates = []

    fenced = re.search(
        r'```(?:json)?\s*(.*?)\s*```', response_text, re.DOTALL | re.IGNORECASE
    )
    if fenced and fenced.group(1):
        candidates.append(fenced.group(1))

    # Greedy on purpose: nested objects such as the price range must stay intact.
    bare = re.search(r'\{.*\}', response_text, re.DOTALL)
    if bare:
        candidates.append(bare.group(0))

    last_error = None
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError as e:
            last_error = e
            continue
        if isinstance(parsed, dict):
            return parsed
        # Valid JSON but not an object (list, number, ...): callers expect a dict.
        last_error = f"expected a JSON object, got {type(parsed).__name__}"

    if last_error is not None:
        print(f"JSON parsing error: {last_error}")
    return {}

## Run Tests


In [20]:
# Drive every sample query through the pipeline, show the raw reply and the
# parsed filter, then check that only the supported top-level keys are present.
valid_keys = {"material", "category", "price"}

print("\n=== METADATA FILTER GENERATION TESTS ===")
for i, query in enumerate(test_queries, start=1):
    print(f"\n{i}. Query: '{query}'")
    print("-" * 60)

    try:
        # Ask the LLM for a filter.
        result = filter_pipeline.run(query)
        print("Raw Response:", result, sep="\n")

        # Pull the JSON object out of the reply.
        filter_dict = extract_and_parse_json(result)
        print("\nExtracted Filter:")
        print(json.dumps(filter_dict, indent=2))

        # Anything outside the supported keys is reported as a warning.
        invalid_keys = set(filter_dict.keys()).difference(valid_keys)
        if not invalid_keys:
            print("✅ Filter structure is valid")
        else:
            print(f"⚠️  Warning: Invalid filter keys detected: {invalid_keys}")

    except Exception as e:
        # Keep going so one failing query does not abort the whole run.
        print(f"❌ Error processing query: {e}")


=== METADATA FILTER GENERATION TESTS ===

1. Query: 'I want cotton shirts'
------------------------------------------------------------
Raw Response:
```json
{
    "material": ["Cotton"],
    "category": ["Clothing"]
}
```

Extracted Filter:
{
  "material": [
    "Cotton"
  ],
  "category": [
    "Clothing"
  ]
}
✅ Filter structure is valid

2. Query: 'Show me leather products'
------------------------------------------------------------
Raw Response:
```json
{
    "material": ["Leather"]
}
```

Extracted Filter:
{
  "material": [
    "Leather"
  ]
}
✅ Filter structure is valid

3. Query: 'Looking for wooden furniture'
------------------------------------------------------------
Raw Response:
```json
{
    "material": ["Wood"],
    "category": ["Furniture"]
}
```

Extracted Filter:
{
  "material": [
    "Wood"
  ],
  "category": [
    "Furniture"
  ]
}
✅ Filter structure is valid

4. Query: 'I need electronics'
------------------------------------------------------------
Raw Response:

## Edge Cases Testing


In [21]:
# Unusual inputs used to probe how the filter generator copes.
edge_cases = [
    # Empty query
    "",
    # Random text
    "asdfgh",
    # High price
    "100 million dollar products",
    # Multiple materials
    "plastic wooden cotton items",
    # Multiple categories
    "I want electronics and shoes and furniture",
]

In [22]:
# Run each edge-case input through the pipeline and show only the parsed filter.
print("\n=== EDGE CASES TESTING ===")
for i, query in enumerate(edge_cases, start=1):
    print(f"\n{i}. Edge Case: '{query}'")
    print("-" * 40)

    try:
        result = filter_pipeline.run(query)
        filter_dict = extract_and_parse_json(result)
        print("Extracted Filter:")
        print(json.dumps(filter_dict, indent=2))
    except Exception as e:
        # Report the failure and move on to the next input.
        print(f"❌ Error: {e}")


=== EDGE CASES TESTING ===

1. Edge Case: ''
----------------------------------------
Extracted Filter:
{}

2. Edge Case: 'asdfgh'
----------------------------------------
Extracted Filter:
{}

3. Edge Case: '100 million dollar products'
----------------------------------------
Extracted Filter:
{
  "price": {
    "$gte": 100000000
  }
}

4. Edge Case: 'plastic wooden cotton items'
----------------------------------------
Extracted Filter:
{
  "material": [
    "Wood",
    "Cotton"
  ]
}

5. Edge Case: 'I want electronics and shoes and furniture'
----------------------------------------
Extracted Filter:
{
  "category": [
    "Electronics",
    "Shoes",
    "Furniture"
  ]
}


In [23]:
def _is_number(value):
    """Return True for int/float values, excluding bool (a subclass of int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _check_allowed_values(filter_dict, key, label, allowed):
    """Collect issues for one list-valued filter field (material or category)."""
    if key not in filter_dict:
        return []
    values = filter_dict[key]
    if not isinstance(values, list):
        return [f"{label} should be a list"]
    return [f"Unknown {label.lower()}: {value}" for value in values if value not in allowed]


def validate_filter(filter_dict, materials, categories):
    """Validate a generated metadata filter.

    Args:
        filter_dict: Filter to check; expected keys are ``material``,
            ``category`` and ``price``.
        materials: Allowed material names.
        categories: Allowed category names.

    Returns:
        A list of human-readable issue strings; an empty list means the filter
        is valid. Checks performed:
          * the filter itself is a dict;
          * ``material`` / ``category`` are lists of allowed names;
          * ``price`` is a dict using only ``$gte``/``$lte``/``$gt``/``$lt``/``$eq``
            with numeric bounds, and the lower bound does not exceed the upper;
          * no top-level keys other than the three supported ones are present.
    """
    if not isinstance(filter_dict, dict):
        return ["Filter should be a dictionary"]

    issues = []
    issues.extend(_check_allowed_values(filter_dict, "material", "Material", materials))
    issues.extend(_check_allowed_values(filter_dict, "category", "Category", categories))

    # Check price
    if "price" in filter_dict:
        price_filter = filter_dict["price"]
        if not isinstance(price_filter, dict):
            issues.append("Price should be a dictionary")
        else:
            valid_price_ops = {"$gte", "$lte", "$gt", "$lt", "$eq"}
            for op, bound in price_filter.items():
                if op not in valid_price_ops:
                    issues.append(f"Invalid price operator: {op}")
                elif not _is_number(bound):
                    issues.append(f"Price bound for {op} should be a number: {bound!r}")

            # An inverted range can never match a product, so flag it.
            lows = [price_filter[op] for op in ("$gte", "$gt")
                    if _is_number(price_filter.get(op))]
            highs = [price_filter[op] for op in ("$lte", "$lt")
                     if _is_number(price_filter.get(op))]
            if lows and highs and max(lows) > min(highs):
                issues.append(
                    f"Price range is empty: lower bound {max(lows)} "
                    f"exceeds upper bound {min(highs)}"
                )

    # Keys outside the supported schema would be silently ignored or misapplied.
    supported_keys = {"material", "category", "price"}
    for key in filter_dict:
        if key not in supported_keys:
            issues.append(f"Unknown filter key: {key}")

    return issues

In [24]:
# Hand-written filters for exercising validate_filter.
validation_tests = [
    {
        "material": ["Cotton"],
        "price": {"$lte": 1_000_000},
    },
    {
        "category": ["Electronics"],
        "price": {"$gte": 10_000_000},
    },
    # Should have issues
    {
        "material": ["Plastic"],
        "category": ["InvalidCategory"],
    },
    {},
]

In [25]:
# Validate each hand-written filter against the lists fetched from MongoDB.
print("\n=== VALIDATION TESTS ===")
for i, test_filter in enumerate(validation_tests, start=1):
    print(f"\n{i}. Testing filter: {test_filter}")
    issues = validate_filter(test_filter, materials, categories)
    # A non-empty issue list means the filter failed validation.
    verdict = f"❌ Issues found: {issues}" if issues else "✅ Filter is valid"
    print(verdict)

print("\nFilter generation testing completed!")


=== VALIDATION TESTS ===

1. Testing filter: {'material': ['Cotton'], 'price': {'$lte': 1000000}}
✅ Filter is valid

2. Testing filter: {'category': ['Electronics'], 'price': {'$gte': 10000000}}
✅ Filter is valid

3. Testing filter: {'material': ['Plastic'], 'category': ['InvalidCategory']}
❌ Issues found: ['Unknown material: Plastic', 'Unknown category: InvalidCategory']

4. Testing filter: {}
✅ Filter is valid

Filter generation testing completed!
