In [95]:
from elasticsearch import Elasticsearch

# Create Elasticsearch client
es = Elasticsearch(
    "http://localhost:9200",  # Changed from https to http
    basic_auth=("elastic", "pass"),  # Use your actual password
)
# Update disk watermark thresholds
es.cluster.put_settings(
    body={
        "persistent": {
            "cluster.routing.allocation.disk.watermark.low": "99%",
            "cluster.routing.allocation.disk.watermark.high": "99%",
            "cluster.routing.allocation.disk.watermark.flood_stage": "99%",
        }
    }
)
# Test connection
try:
    if es.ping():
        print("Successfully connected to Elasticsearch")
        print(es.info())
    else:
        print("Could not connect to Elasticsearch")
except Exception as e:
    print(f"Connection failed: {e}")

Successfully connected to Elasticsearch
{'name': '66123632dc80', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'tFhb53A1Rw2c0dBvX5pkpA', 'version': {'number': '8.17.0', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '2b6a7fed44faa321997703718f07ee0420804b41', 'build_date': '2024-12-11T12:08:05.663969764Z', 'build_snapshot': False, 'lucene_version': '9.12.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


In [96]:
# Elasticsearch mappings for the models

USER_MAPPING = {
    "mappings": {
        "properties": {
            "email": {"type": "keyword"},
            "name": {"type": "text"},
            "password": {"type": "keyword"},
            "embedding": {
                "type": "dense_vector",
                "dims": 768,  # Adjust dimension based on your embedding size
            },
        }
    }
}

RECIPE_MAPPING = {
    "mappings": {
        "properties": {
            "id": {"type": "integer"},
            "title": {"type": "text"},
            "ingredients": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
            "instructions": {"type": "text"},
            "prep_time": {"type": "integer"},
            "cook_time": {"type": "integer"},
            "cuisine": {"type": "keyword"},
            "course": {"type": "keyword"},
            "diet": {"type": "keyword"},
            "image": {"type": "keyword", "index": False},
            "url": {"type": "keyword", "index": False},
            "embedding": {
                "type": "dense_vector",
                "dims": 768,  # Adjust dimension based on your embedding size
            },
        }
    }
}

FEEDBACK_MAPPING = {
    "mappings": {
        "properties": {
            "email": {"type": "keyword"},
            "input_description": {"type": "text"},
            "input_image": {"type": "text", "index": False},
            "recipe_id": {"type": "integer"},
            "rating": {"type": "integer"},
            "comment": {"type": "text"},
            "created_at": {"type": "date"},  # Added creation date
        }
    }
}

USER_REVIEW_MAPPING = {
    "mappings": {
        "properties": {
            "email": {"type": "keyword"},
            "reviews": {
                "type": "nested",
                "properties": {
                    "content": {
                        "type": "text"
                    },  # Changed from "text" to match the model
                    "created_at": {"type": "date"},  # Added creation date
                },
            },
        }
    }
}

RECIPE_ADD_MAPPING = {
    "mappings": {
        "properties": {
            "id": {"type": "integer"},
            "title": {"type": "text"},
            "ingredients": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
            "instructions": {"type": "text"},
            "prep_time": {"type": "integer"},
            "cook_time": {"type": "integer"},
            "cuisine": {"type": "keyword"},
            "course": {"type": "keyword"},
            "diet": {"type": "keyword"},
            "image": {"type": "keyword", "index": False},
            "url": {"type": "keyword", "index": False},
            "embedding": {
                "type": "dense_vector",
                "dims": 768,  # Adjust dimension based on your embedding size
            },
            "accepted": {"type": "boolean"},
        }
    }
}

In [99]:
# ... existing elasticsearch import and client setup ...
def create_indices(es_client):
    """Create all required indices if they don't exist"""
    for index_name, mapping in MAPPINGS.items():
        try:
            if not es_client.indices.exists(index=index_name):
                print(f"Creating index '{index_name}'...")
                es_client.indices.create(index=index_name, body=mapping)
                print(f"Successfully created index '{index_name}'")
            else:
                print(f"Index '{index_name}' already exists")
        except Exception as e:
            print(f"Error creating index '{index_name}': {e}")


# Define the mapping dictionary
MAPPINGS = {
    "users": USER_MAPPING,
    "recipes": RECIPE_MAPPING,
    "feedback": FEEDBACK_MAPPING,
    "user_reviews": USER_REVIEW_MAPPING,
    "recipe_additions": RECIPE_ADD_MAPPING,
}

# Create all indices
create_indices(es)

Creating index 'users'...
Successfully created index 'users'
Creating index 'recipes'...
Successfully created index 'recipes'
Creating index 'feedback'...
Successfully created index 'feedback'
Creating index 'user_reviews'...
Successfully created index 'user_reviews'
Creating index 'recipe_additions'...
Successfully created index 'recipe_additions'


In [100]:
def delete_indices(es_client):
    """Delete all indices defined in MAPPINGS"""
    for index_name in MAPPINGS.keys():
        try:
            if es_client.indices.exists(index=index_name):
                print(f"Deleting index '{index_name}'...")
                es_client.indices.delete(index=index_name)
                print(f"Successfully deleted index '{index_name}'")
            else:
                print(f"Index '{index_name}' does not exist")
        except Exception as e:
            print(f"Error deleting index '{index_name}': {e}")


# Delete all indices
# delete_indices(es)

In [101]:
def check_index_stats(es_client, index_name="recipes"):
    """
    Check if an index exists and get its document count

    Args:
        es_client: AsyncElasticsearch client
        index_name: Name of the index to check

    Returns:
        bool: True if index exists and has documents, False otherwise
    """
    try:
        # Check if index exists
        if not es_client.indices.exists(index=index_name):
            print(f"Index '{index_name}' does not exist!")
            return False

        # Get document count
        stats = es_client.count(index=index_name)
        doc_count = stats["count"]

        print(f"Index '{index_name}' contains {doc_count} documents")
        return doc_count > 0

    except Exception as e:
        print(f"Error checking index: {e}")
        return False


# Usage example:
has_documents = check_index_stats(es)
if not has_documents:
    print("Index is empty! You may need to index some documents first.")

Index 'recipes' contains 0 documents
Index is empty! You may need to index some documents first.


## read data and index it to elastic


In [102]:
import pandas as pd

df = pd.read_csv("./data.csv")
df.head()

Unnamed: 0,id,title,ingredients,instructions,prep_time,cook_time,cuisine,course,diet,image,url,embedding
0,4529,lavand-e-murgh recipe - afghani chicken in yog...,['fresh pomegranate fruit kernels few garnish'...,"['to begin making the lavand-e-murgh recipe, w...",15,25,Afghan,Dinner,High Protein Non Vegetarian,,https://www.archanaskitchen.com/lavand-e-murgh...,"[[-0.0026710997335612774, 0.003612738568335771..."
1,4640,afghani dhoog recipe - cucumber mint buttermil...,"['cumin powder jeera', 'curd dahi yogurt', 'sa...",['to begin making the afghani dhoog recipe - c...,10,0,Afghan,Snack,Vegetarian,,http://www.archanaskitchen.com/doogh-afghani-y...,"[[-0.014779524877667427, -0.008534302935004234..."
2,5978,malida recipe (healthy whole wheat afghan sweet),"['cardamom powder elaichi', 'dates pitted fine...","['to begin making the malida recipe, tear the ...",20,20,Afghan,Snack,Vegetarian,,https://www.archanaskitchen.com/malida-recipe-...,"[[-0.01772255077958107, -0.019701037555933, -0..."
3,7092,moroccan spiced millet and lentil salad recipe,"['tomato chopped', 'extra virgin olive oil', '...",['to begin making the moroccan spiced millet a...,10,20,African,Dinner,Vegetarian,,https://www.archanaskitchen.com/moroccan-spice...,"[[-0.06342744827270508, -0.01326711568981409, ..."
4,6684,chickpea & date tagine recipe,"['onion', 'cumin powder jeera', 'extra virgin ...",['to begin making the chickpea & date tagine r...,15,60,African,Dinner,High Protein Vegetarian,,https://www.archanaskitchen.com/chickpea-date-...,"[[-0.03216571733355522, 0.029672250151634216, ..."


In [103]:
# Add import at the top
import ast

# Convert string representation to actual array when reading the embedding
df["embedding"] = df["embedding"].apply(ast.literal_eval)

# Now when you check the embedding, it will be an actual array
print(type(df["embedding"].loc[0]))  # Should print: <class 'list'>

<class 'list'>


index one document through recipe model


In [105]:
from elasticsearch import Elasticsearch
from typing import Dict, Any
import sys
from models import Recipe

# ... existing code ...


def index_recipe_to_elastic(
    recipe: Recipe, es_client: Elasticsearch, index_name: str = "recipes"
) -> None:
    """
    Index a recipe to Elasticsearch

    Args:
        recipe: Recipe model instance
        es_client: Elasticsearch client instance
        index_name: Name of the Elasticsearch index (default: "recipes")
    """
    doc = {
        "id": recipe.id,
        "title": recipe.title,
        "ingredients": recipe.ingredients,
        "instructions": recipe.instructions,
        "prep_time": recipe.prep_time,
        "cook_time": recipe.cook_time,
        "cuisine": recipe.cuisine,
        "course": recipe.course,
        "diet": recipe.diet,
        "image": str(recipe.image) if recipe.image else None,
        "url": str(recipe.url) if recipe.url else None,
        "embedding": recipe.embedding,
    }

    es_client.index(index=index_name, id=str(recipe.id), document=doc)
    print(f"Indexed recipe {recipe.id} to Elasticsearch")

bulk index df recipes


In [106]:
# Add necessary imports
from models import Recipe, RecipeAdd, User
from ast import literal_eval
import numpy as np
from elasticsearch.helpers import bulk

In [107]:
# Add necessary imports
from models import Recipe
from ast import literal_eval
import numpy as np


def row_to_recipe(row):
    """Convert a DataFrame row to a Recipe object"""
    # Convert embedding to a flat list of floats
    embedding = np.array(row.embedding).flatten().tolist()
    try:
        return Recipe(
            id=row.id,
            title=row.title,
            ingredients=(
                literal_eval(row.ingredients)
                if isinstance(row.ingredients, str)
                else row.ingredients
            ),
            instructions=(
                literal_eval(row.instructions)
                if isinstance(row.instructions, str)
                else row.instructions
            ),
            prep_time=row.prep_time,
            cook_time=row.cook_time,
            cuisine=row.cuisine,
            course=row.course,
            diet=row.diet,
            image=row.image if pd.notna(row.image) else None,
            url=row.url if pd.notna(row.url) else None,
            embedding=embedding,  # Now it's a flat list of floats
        )
    except Exception as e:
        return None

In [108]:
def bulk_index_recipe_batch(df_batch, es_client, index_name="recipes"):
    """
    Convert a batch of DataFrame rows to Recipe objects and bulk index them

    Args:
        df_batch: Pandas DataFrame batch containing recipes
        es_client: Elasticsearch client instance
        index_name: Name of the Elasticsearch index
    """
    # Convert rows to Recipe objects and filter out None values
    recipes = [
        r
        for r in (row_to_recipe(row) for _, row in df_batch.iterrows())
        if r is not None
    ]

    if not recipes:
        print("No valid recipes in this batch")
        return
    # Prepare bulk indexing actions
    actions = []
    for recipe in recipes:
        # Build document with required fields
        doc = {
            "id": recipe.id,
            "title": recipe.title,
            "ingredients": recipe.ingredients,
            "instructions": recipe.instructions,
            "prep_time": recipe.prep_time,
            "cook_time": recipe.cook_time,
            "cuisine": recipe.cuisine,
            "course": recipe.course,
            "diet": recipe.diet,
        }

        # Only add optional fields if they're not None
        if recipe.image is not None:
            doc["image"] = recipe.image
        if recipe.url is not None:
            doc["url"] = recipe.url
        if recipe.embedding is not None:
            doc["embedding"] = recipe.embedding

        actions.append({"_index": index_name, "_id": str(recipe.id), "_source": doc})

    # Perform bulk indexing
    try:
        success, failed = bulk(es_client, actions, chunk_size=500, request_timeout=30)
        print(f"Successfully indexed {success} documents")
        if failed:
            print(f"Failed to index {len(failed)} documents")
    except Exception as e:
        print(f"Error during bulk indexing: {e}")


# Usage example - process in batches of 1000
batch_size = 2500
for start_idx in range(0, len(df), batch_size):
    batch = df.iloc[start_idx : start_idx + batch_size]
    bulk_index_recipe_batch(batch, es)

  success, failed = bulk(es_client, actions, chunk_size=500, request_timeout=30)


Successfully indexed 2500 documents
Successfully indexed 2500 documents
Successfully indexed 44 documents


In [109]:
def get_random_recipe(
    es_client: Elasticsearch, index_name: str = "recipes"
) -> tuple[Recipe, pd.DataFrame]:
    """
    Retrieve a random recipe from Elasticsearch, convert it to a Recipe model and DataFrame

    Args:
        es_client: Elasticsearch client instance
        index_name: Name of the Elasticsearch index

    Returns:
        tuple: (Recipe model instance, pandas DataFrame)
    """
    # Random query
    random_query = {
        "query": {"function_score": {"query": {"match_all": {}}, "random_score": {}}},
        "size": 1,
    }

    try:
        # Execute search
        result = es_client.search(index=index_name, body=random_query)

        if not result["hits"]["hits"]:
            raise ValueError("No documents found in the index")

        # Convert to Recipe model
        hit = result["hits"]["hits"][0]["_source"]
        recipe = Recipe(
            id=hit["id"],
            title=hit["title"],
            ingredients=hit["ingredients"],
            instructions=hit["instructions"],
            prep_time=hit["prep_time"],
            cook_time=hit["cook_time"],
            cuisine=hit["cuisine"],
            course=hit["course"],
            diet=hit["diet"],
            image=hit.get("image"),
            url=hit.get("url"),
            embedding=hit.get("embedding"),
        )

        # Convert to DataFrame
        recipe_df = pd.DataFrame(
            [
                {
                    "id": recipe.id,
                    "title": recipe.title,
                    "ingredients": recipe.ingredients,
                    "instructions": recipe.instructions,
                    "prep_time": recipe.prep_time,
                    "cook_time": recipe.cook_time,
                    "cuisine": recipe.cuisine,
                    "course": recipe.course,
                    "diet": recipe.diet,
                    "embedding": recipe.embedding,
                    "url": recipe.url,
                }
            ]
        )

        return recipe, recipe_df

    except Exception as e:
        print(f"Error retrieving random recipe: {e}")
        return None, None


# Usage example:
recipe, df = get_random_recipe(es)
if recipe:
    print("Recipe Model:")
    display(recipe)
    print("\nDataFrame:")
    display(df)

Recipe Model:


Recipe(id=2203, title='pepper murukku recipe - south indian style chakli', ingredients=['pinch asafoetida hing', 'urad dal flour husk black gram flour', 'rice flour', 'whole black peppercorns freshly powdered', 'sunflower oil frying', 'salt taste', 'sunflower oil from the oil used frying the murukku', 'sesame seeds til seeds', 'butter unsalted'], instructions=['to begin making the pepper murukku recipe, in a medium size bowl, add the sieved rice and the urad dal flour.add salt, asafoetida, sesame seeds, pepper powder and mix all the dry ingredients.now add butter and rub it into the flour get\xa0 sand like texture.\xa0 add hot oil and add water little by little to form a smooth, crack free dough.cover the bowl with a damp cloth and allow the dough to rest for 30 minutes.once the dough is rested, grease the murukku press with oil and fill it with the prepared dough.heat oil in a wok/kadhai\xa0on medium flame and when the oil is heated up, press the murukku press over the oil in circular


DataFrame:


Unnamed: 0,id,title,ingredients,instructions,prep_time,cook_time,cuisine,course,diet,embedding,url
0,2203,pepper murukku recipe - south indian style chakli,"[pinch asafoetida hing, urad dal flour husk bl...","[to begin making the pepper murukku recipe, in...",40,20,South Indian Recipes,Snack,Vegetarian,"[0.012523884885013103, 0.0002202177856815979, ...",https://www.archanaskitchen.com/pepper-murukku...


In [110]:
def index_pending_recipe(
    recipe: Recipe, es_client: Elasticsearch, index_name: str = "recipe_additions"
) -> None:
    """
    Convert Recipe to RecipeAdd and index it to Elasticsearch with accepted=False

    Args:
        recipe: Recipe model instance
        es_client: Elasticsearch client instance
        index_name: Name of the Elasticsearch index for pending recipes
    """
    # Convert to pending RecipeAdd
    recipe_dict = recipe.model_dump()
    recipe_dict["accepted"] = False

    # Create new RecipeAdd instance
    pending_recipe = RecipeAdd(**recipe_dict)
    # Prepare document
    doc = pending_recipe.model_dump()

    try:
        # Index the document
        es_client.index(index=index_name, id=str(recipe.id), document=doc)
        print(f"Successfully indexed pending recipe {recipe.id} to {index_name}")
    except Exception as e:
        print(f"Error indexing pending recipe: {e}")


# Example usage:
# First, create a sample recipe
sample_recipe = Recipe(
    id=2001,
    title="Homemade Pizza",
    ingredients=[
        "2 cups all-purpose flour",
        "1 cup warm water",
        "2 tbsp olive oil",
        "1 tsp yeast",
        "1 tsp salt",
        "Pizza toppings of choice",
    ],
    instructions=[
        "Mix flour, water, oil, yeast, and salt",
        "Knead dough for 10 minutes",
        "Let rise for 1 hour",
        "Roll out and add toppings",
        "Bake at 450°F for 15 minutes",
    ],
    prep_time=70,
    cook_time=15,
    cuisine="Italian",
    course="Main Dish",
    diet="Vegetarian",
    image="https://example.com/pizza.jpg",
    url="https://example.com/homemade-pizza",
    embedding=[0.1] * 768,  # Dummy embedding
)

# Index the sample recipe
index_pending_recipe(sample_recipe, es)

# Verify it was indexed
result = es.get(index="recipe_additions", id=str(sample_recipe.id))
display(pd.DataFrame([result["_source"]]))

Successfully indexed pending recipe 2001 to recipe_additions


Unnamed: 0,id,title,ingredients,instructions,prep_time,cook_time,cuisine,course,diet,image,url,embedding,accepted
0,2001,Homemade Pizza,"[2 cups all-purpose flour, 1 cup warm water, 2...","[Mix flour, water, oil, yeast, and salt, Knead...",70,15,Italian,Main Dish,Vegetarian,https://example.com/pizza.jpg,https://example.com/homemade-pizza,"[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, ...",False


In [55]:
# Define the query
query = {
    "size": 0,
    "aggs": {"unique_categories": {"terms": {"field": "cuisine"}}},
}

# Execute the search
response = es.search(index="recipes", body=query)

# Print the aggregation results
print("Aggregation Results:")
print(response["aggregations"]["unique_categories"]["buckets"])

Aggregation Results:
[{'key': 'Indian', 'doc_count': 942}, {'key': 'Continental', 'doc_count': 805}, {'key': 'North Indian Recipes', 'doc_count': 571}, {'key': 'South Indian Recipes', 'doc_count': 414}, {'key': 'Italian Recipes', 'doc_count': 209}, {'key': 'Bengali Recipes', 'doc_count': 127}, {'key': 'Kerala Recipes', 'doc_count': 115}, {'key': 'Maharashtrian Recipes', 'doc_count': 108}, {'key': 'Fusion', 'doc_count': 106}, {'key': 'Karnataka', 'doc_count': 100}]


In [76]:
def initialize_globals():
    """Initialize global variables used across the application"""
    global df, distinct_ingredients, cuisines, courses, diets

    try:
        # Simple aggregation query for all fields
        query = {
            "size": 0,
            "aggs": {
                "unique_cuisines": {"terms": {"field": "cuisine", "size": 10000}},
                "unique_courses": {"terms": {"field": "course", "size": 10000}},
                "unique_diets": {"terms": {"field": "diet", "size": 10000}},
                "unique_ingredients": {
                    "terms": {"field": "ingredients.keyword", "size": 10000}
                },
            },
        }

        # Execute the search
        response = es.search(index="recipes", body=query)

        # Extract values from buckets
        cuisines = sorted(
            [
                bucket["key"]
                for bucket in response["aggregations"]["unique_cuisines"]["buckets"]
            ]
        )
        courses = sorted(
            [
                bucket["key"]
                for bucket in response["aggregations"]["unique_courses"]["buckets"]
            ]
        )
        diets = sorted(
            [
                bucket["key"]
                for bucket in response["aggregations"]["unique_diets"]["buckets"]
            ]
        )
        distinct_ingredients = sorted(
            [
                bucket["key"]
                for bucket in response["aggregations"]["unique_ingredients"]["buckets"]
            ]
        )

        print(
            f"Found {len(cuisines)} cuisines, {len(courses)} courses, {len(diets)} diets, "
            f"and {len(distinct_ingredients)} ingredients"
        )

        return distinct_ingredients, cuisines, courses, diets

    except Exception as e:
        print(f"Error initializing globals from Elasticsearch: {e}")
        return [], [], [], []

In [77]:
distinct_ingredients, cuisines, courses, diets = initialize_globals()
print("g")

Found 82 cuisines, 20 courses, 10 diets, and 10000 ingredients
g


user signup and user loging


In [92]:
def index_user(user: User, es_client: Elasticsearch, index_name: str = "users") -> bool:
    """
    Index a User model instance into Elasticsearch if it doesn't already exist

    Args:
        user: User model instance
        es_client: Elasticsearch client instance
        index_name: Name of the Elasticsearch index for users (default: "users")

    Returns:
        bool: True if user was indexed successfully, False if user already exists or error occurs
    """
    try:
        # Check if user already exists
        if es_client.exists(index=index_name, id=user.email):
            print(f"User {user.email} already exists in {index_name}")
            return False

        # Convert User model to dictionary
        doc = user.model_dump()

        # Use email as document ID since it's unique
        es_client.index(index=index_name, id=user.email, document=doc)
        print(f"Successfully indexed user {user.email} to {index_name}")
        return True

    except Exception as e:
        print(f"Error indexing user: {e}")
        return False


# Example usage:
sample_user = User(
    email="test@exgample.com",
    name="Test User",
    password="hashed_password",  # In practice, this should be properly hashed
)

# Index the sample user
index_user(sample_user, es)

# Verify it was indexed (optional)
result = es.get(index="users", id=sample_user.email)
display(pd.DataFrame([result["_source"]]))

User test@exgample.com already exists in users


Unnamed: 0,email,name,password,embedding
0,test@exgample.com,Test User,hashed_password,


In [93]:
def login_user(
    email: str, password: str, es_client: Elasticsearch, index_name: str = "users"
) -> bool:
    """
    Verify user credentials against Elasticsearch

    Args:
        email: User's email
        password: User's password (should be hashed in production)
        es_client: Elasticsearch client instance
        index_name: Name of the Elasticsearch index for users (default: "users")

    Returns:
        bool: True if credentials are valid, False otherwise
    """
    try:
        # Check if user exists and get their data
        if not es_client.exists(index=index_name, id=email):
            print("User not found")
            return False

        # Get user data
        user_data = es_client.get(index=index_name, id=email)["_source"]

        # Check if password matches
        # NOTE: In production, you should use proper password hashing and verification
        if user_data["password"] == password:
            print("Login successful")
            return True
        else:
            print("Invalid password")
            return False

    except Exception as e:
        print(f"Error during login: {e}")
        return False


# Example usage:
success = login_user("test@example.com", "hashed_password", es)
print(f"Login successful: {success}")

Login successful
Login successful: True


In [94]:
# Test with correct credentials
success = login_user("test@example.com", "hashed_password", es)
print(f"Should succeed: {success}")

# Test with wrong password
success = login_user("test@example.com", "wrong_password", es)
print(f"Should fail: {success}")

# Test with non-existent user
success = login_user("nonexistent@example.com", "any_password", es)
print(f"Should fail: {success}")

Login successful
Should succeed: True
Invalid password
Should fail: False
User not found
Should fail: False
