In [31]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import pandas as pd
import json
import os
from datetime import datetime

# Connect to Elasticsearch
es = Elasticsearch(
    "https://localhost:9200",
    basic_auth=("elastic", "T4spN*t4Vldl-hUb1HE-"),
    verify_certs=False,
    ssl_show_warn=False,
    request_timeout=30
)

index_name = "books"
log_index_name = "search_logs"
json_file_path = "books.json"
csv_file_path = "synonyms.csv"  # Make sure this file exists

# Function to load JSON data
def load_books_from_json(json_file_path):
    if not os.path.exists(json_file_path):
        raise FileNotFoundError(f"JSON file not found: {json_file_path}")
    
    with open(json_file_path, "r", encoding="utf-8") as file:
        data = json.load(file)
    
    if not isinstance(data, list):
        raise ValueError("JSON file must contain a list of book objects.")
    
    return data

# Function to load synonyms from CSV
def load_synonyms_from_csv(csv_file_path):
    if not os.path.exists(csv_file_path):
        raise FileNotFoundError(f"CSV file not found: {csv_file_path}")

    df = pd.read_csv(csv_file_path)
    df.columns = df.columns.str.lower()  # Normalize column names

    if "lemma" not in df.columns or "synonyms" not in df.columns:
        raise ValueError("CSV file must have 'lemma' and 'synonyms' columns.")

    return [f"{row['lemma']} => {row['synonyms']}" for _, row in df.iterrows()]

# Function to update synonym filter in Elasticsearch
def update_synonym_filter(csv_file_path):
    synonyms = load_synonyms_from_csv(csv_file_path)

    if es.indices.exists(index=index_name):
        es.indices.close(index=index_name)

        es.indices.put_settings(
            index=index_name,
            body={
                "settings": {
                    "analysis": {
                        "filter": {
                            "synonym_filter": {
                                "type": "synonym",
                                "synonyms": synonyms
                            }
                        }
                    }
                }
            }
        )
        es.indices.open(index=index_name)
        print("Synonym filter updated successfully.")
    else:
        print(f"Index {index_name} does not exist.")

# Ensure the books index exists
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
    print(f"Deleted existing index: {index_name}")

# Create the books index
es.indices.create(index=index_name, body={
    "mappings": {
        "properties": {
            "Title": {"type": "text"},
            "Author": {"type": "text"},
            "Description": {"type": "text"},
            "Language": {"type": "keyword"},
            "Category": {"type": "text"},
            "Link": {"type": "keyword"},
        }
    },
    "settings": {
        "analysis": {
            "filter": {
                "synonym_filter": {
                    "type": "synonym",
                    "synonyms": []
                }
            },
            "analyzer": {
                "synonym_analyzer": {
                    "tokenizer": "standard",
                    "filter": ["lowercase", "synonym_filter"]
                }
            }
        }
    }
})
print(f"Created index: {index_name}")

# Ensure the search_logs index exists
if es.indices.exists(index=log_index_name):
    es.indices.delete(index=log_index_name)
    print(f"Deleted existing index: {log_index_name}")

if not es.indices.exists(index=log_index_name):
    es.indices.create(index=log_index_name, body={
        "mappings": {
            "properties": {
                "query": {"type": "keyword"},
                "timestamp": {"type": "date"}
            }
        }
    })
    print(f"Created index: {log_index_name}")

# Function to Log Search Queries
def log_search(query):
    doc = {
        "query": query,
        "timestamp": datetime.utcnow().isoformat()
    }
    es.index(index=log_index_name, document=doc)

# Load book data from JSON file
books_data = load_books_from_json(json_file_path)

# Prepare bulk data for indexing
bulk_data = [
    {
        "_index": index_name,
        "_source": book
    }
    for book in books_data
]

# Load and update synonyms
update_synonym_filter(csv_file_path)

# Perform bulk indexing and handle errors
success, failed = bulk(es, bulk_data, raise_on_error=False)
print(f"Successfully indexed {success} documents, {len(failed)} failed.")

print("Setup completed successfully!")

Deleted existing index: books
Created index: books
Deleted existing index: search_logs
Created index: search_logs
Synonym filter updated successfully.
Successfully indexed 759 documents, 0 failed.
Setup completed successfully!


In [34]:
#Useful
def search_books_by_language(criteria):
    query = {
        "query": {
            "multi_match": {
                "query": criteria,
                "fields": ["Language"]
            }
        }
    }
    response = es.search(index=index_name, body=query)
    return [
        {
            "Title": hit["_source"].get("Title"),
            "Author": hit["_source"].get("Author"),
            "Description": hit["_source"].get("Description"),
            "Language": hit["_source"].get("Language"),
            "Category": hit["_source"].get("Category"),
            "Book": hit["_source"].get("Book")
        }
        for hit in response["hits"]["hits"]
    ]
            
# Query Functions
def search_books(criteria, fields=None):
    log_search(criteria)  # Log the search query
    if fields is None:
        fields = ["Title", "Author", "Description", "Category"]
    query = {
        "query": {
            "multi_match": {
                "query": criteria,
                "fields": fields
            }
        }
    }
    # Language and Category are not added. Currently
    response = es.search(index=index_name, body=query)
    return [
        {
            "Title": hit["_source"].get("Title"),
            "Author": hit["_source"].get("Author"),
            "Description": hit["_source"].get("Description"),
            "Book": hit["_source"].get("Book")
        }
        for hit in response["hits"]["hits"]
    ]

def fuzzy_search(term, language="standard"):
    log_search(term)  # Log the search query
    query = {
        "query": {
            "multi_match": {
                "query": term,
                "fields": ["Title", "Author", "Description", "Book"],
                "fuzziness": "AUTO",
                "analyzer": language
            }
        }
    }
    response = es.search(index=index_name, body=query)
    return [
        {
            "Title": hit["_source"].get("Title"),
            "Author": hit["_source"].get("Author"),
            "Description": hit["_source"].get("Description"),
            "Book": hit["_source"].get("Book")
        }
        for hit in response["hits"]["hits"]
    ]
    

def synonym_search(term):
    log_search(term)  # Log the search query
    query = {
        "query": {
            "multi_match": {
                "query": term,
                "fields": ["Title", "Author", "Description", "Book"],
                "analyzer": "synonym_analyzer"
            }
        }
    }
    response = es.search(index=index_name, body=query)
    return [
        {
            "Title": hit["_source"].get("Title"),
            "Author": hit["_source"].get("Author"),
            "Description": hit["_source"].get("Description"),
            "Book": hit["_source"].get("Book")
        }
        for hit in response["hits"]["hits"]
    ]

def natural_language_search(query_text):
    log_search(query_text)  # Log the search query
    query = {
        "query": {
            "multi_match": {
                "query": query_text,
                "fields": ["Title", "Author", "Description", "Book"],
                "type": "best_fields",
                "operator": "and"
            }
        }
    }
    response = es.search(index=index_name, body=query)
    
    # Fallback if no results are returned
    if not response["hits"]["hits"]:
        query["query"]["multi_match"]["operator"] = "or"
        response = es.search(index=index_name, body=query)
    return [
        {
            "Title": hit["_source"].get("Title"),
            "Author": hit["_source"].get("Author"),
            "Description": hit["_source"].get("Description"),
            "Book": hit["_source"].get("Book")
        }
        for hit in response["hits"]["hits"]
    ]
    
def user_recommendations(user_history):
    query = {
        "query": {
            "more_like_this": {
                "fields": ["Title", "Author", "Description", "Book"],
                "like": user_history,
                "min_term_freq": 1,
                "max_query_terms": 12
            }
        }
    }
    response = es.search(index=index_name, body=query)
    return [{
        "Title": hit["_source"].get("Title"),
        "Author": hit["_source"].get("Author"),
        "Description": hit["_source"].get("Description"),
        "Book": hit["_source"].get("Book")
    } for hit in response["hits"]["hits"]]

def popular_searches(hours_interval=None):
    # Define the time range filter if hours_interval is provided
    time_filter = {}
    if hours_interval is not None:
        start_time = (datetime.utcnow() - timedelta(hours=hours_interval)).isoformat()
        time_filter = {"range": {"timestamp": {"gte": start_time}}}
    else:
        time_filter = {"match_all": {}}

    # Build the query with optional time filter
    query = {
        "query": time_filter,
        "aggs": {
            "popular_terms": {
                "terms": {
                    "field": "query",  # Use the `keyword` field
                    "size": 10  # Number of terms to return
                }
            }
        }
    }
    #response = es.search(index=log_index_name, body={"query": {"match_all": {}}})
    #print("Search Logs Data:", response)
    
    response = es.search(index=log_index_name, body=query)
    #print(response)

    # Check if aggregation returned results
    if "aggregations" in response and "popular_terms" in response["aggregations"]:
        buckets = response["aggregations"]["popular_terms"]["buckets"]
        if buckets:
            return [bucket["key"] for bucket in buckets]

    # Fallback to fetching raw queries if aggregation is empty
    match_all_response = es.search(
        index=log_index_name,
        body={"query": {"match_all": {}}},
        size=10  # Limit the number of raw queries fetched
    )
    return [log["_source"]["query"] for log in match_all_response["hits"]["hits"]]

def advanced_search(query, operator="AND"):
    log_search(query)  # Log the search query

    terms = [term.strip() for term in query.split(f" {operator} ")]

    if operator == "AND":
        bool_query = {
            "must": [
                {
                    "multi_match": {
                        "query": term,
                        "fields": ["Title", "Author", "Description", "Book"]
                    }
                } for term in terms
            ]
        }
    elif operator == "OR":
        bool_query = {
            "should": [
                {
                    "multi_match": {
                        "query": term,
                        "fields": ["Title", "Author", "Description", "Book"]
                    }
                } for term in terms
            ],
            "minimum_should_match": 1  # At least one should match
        }
    else:
        raise ValueError(f"Unsupported operator: {operator}")

    query_body = {
        "query": {
            "bool": bool_query
        }
    }

    response = es.search(index=index_name, body=query_body)

    return [
        {
            "Title": hit["_source"].get("Title"),
            "Author": hit["_source"].get("Author"),
            "Description": hit["_source"].get("Description"),
            "Book": hit["_source"].get("Book")
        }
        for hit in response["hits"]["hits"]
    ]


In [35]:
analyze_query = {
    "analyzer": "synonym_analyzer",
    "text": "Book"
}
response = es.indices.analyze(index=index_name, body=analyze_query)
print("Analyze Response:", response)

search_lanugage_based = search_books_by_language("English")
print("Search Results for English Language:", search_lanugage_based)
print("\n")

search_result = search_books("Public School Education")
print("Search Results:", search_result)
print("\n")

field_specific_result = search_books("Michael", fields=["Author"])
print("Field Specific Search Results:", field_specific_result)
print("\n")

fuzzy_result = fuzzy_search("Publc Skool", "english")
print("Fuzzy Search Results:", fuzzy_result)
print("\n")

fuzzy_result_BG = fuzzy_search("образувание", "bulgarian")
print("Fuzzy search Results for BG:", fuzzy_result_BG)
print("\n")

synonym_result = synonym_search("debater")
print("Synonym Search Results:", synonym_result)
print("\n")

nl_search_result = natural_language_search("book on education")
print("Natural Language Search Results:", nl_search_result)
print("\n")

recommendation_result = user_recommendations("Education")
print("User Recommendations:", recommendation_result)
print("\n")

popular_search_result = popular_searches(hours_interval=1)
print("Popular Searches:", popular_search_result)
print("\n")

advanced_search_result = advanced_search("educational OR practices", "OR")
print("Advanced Search Results:", advanced_search_result)
print("\n")


Analyze Response: {'tokens': [{'token': 'volume', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 0}, {'token': 'reserve', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 0}, {'token': 'record', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 1}, {'token': 'hold', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 1}, {'token': 'record', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 2}, {'token': 'book', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 3}, {'token': 'script', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 4}, {'token': 'playscript', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 5}, {'token': 'ledger', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 6}, {'token': 'leger', 'start_offset': 0, 'end_offset': 4, 'type': 'SYNONYM', 'position': 7}, {'token': 'account', 'start_offset': 0, 'end_offset': 4, 'ty

  "timestamp": datetime.utcnow().isoformat()
  start_time = (datetime.utcnow() - timedelta(hours=hours_interval)).isoformat()
