## Import libraries

In [1]:
import os
import json

from elasticsearch import AsyncElasticsearch

## Indexing data

**Initialize client**

In [2]:
client = AsyncElasticsearch(
    hosts=os.getenv("ELASTICSEARCH_HOST"),
    api_key=os.getenv("ELASTICSEARCH_API_KEY")
)

**Load data**

In [3]:
with open("../data/mm_image.json", "r", encoding="utf-8") as f:
    data = json.load(f)
    
# transformed_data = []
# for item in data:
#     for product in item["products"]:
#         transformed_data.append(
#             {
#                 **product,
#                 "category": item["categories"]
#             }
#         )

In [4]:
import json
import re
from typing import Dict, Any, List, Union

def extract_combo_info(text: str) -> Dict[str, Any]:
    """
    Extract combo information from text string.
    Returns dict with combo details and calculated discount.
    """
    if not isinstance(text, str):
        return {"is_combo": False, "combo_text": None, "combo_price": None, "discount_percent": 0}
    
    # Pattern to match combo pricing like "Combo 2 giá 435000D" or "Combo 3 giá 500000D"
    combo_pattern = r'combo\s+(\d+)\s+giá\s+([\d,]+)d?'
    match = re.search(combo_pattern, text.lower())
    
    if match:
        combo_quantity = int(match.group(1))
        combo_price = int(match.group(2).replace(',', ''))
        
        return {
            "is_combo": True,
            "combo_text": text,
            "combo_quantity": combo_quantity,
            "combo_price": combo_price,
            "discount_percent": 0  # Will be calculated based on original price
        }
    
    # Try to extract percentage discount like "20%" or "15% off"
    percent_pattern = r'(\d+(?:\.\d+)?)%'
    percent_match = re.search(percent_pattern, text)
    
    if percent_match:
        discount_percent = float(percent_match.group(1))
        return {
            "is_combo": False,
            "combo_text": None,
            "combo_price": None,
            "discount_percent": discount_percent
        }
    
    return {"is_combo": False, "combo_text": None, "combo_price": None, "discount_percent": 0}

def calculate_discount_percent(original_price: float, final_price: float) -> float:
    """Calculate discount percentage from original and final prices."""
    if original_price <= 0:
        return 0
    return round(((original_price - final_price) / original_price) * 100, 2)

def fix_product_data(product: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fix product data by properly handling discount and combo information.
    """
    fixed_product = product.copy()
    
    # Extract combo/discount info from the discount field
    discount_field = product.get("discount", "")
    combo_info = extract_combo_info(discount_field)
    
    original_price = product.get("original_price", 0)
    
    if combo_info["is_combo"]:
        # Handle combo products
        combo_quantity = combo_info["combo_quantity"]
        combo_price = combo_info["combo_price"]
        
        # Calculate per-unit price in combo
        per_unit_combo_price = combo_price / combo_quantity if combo_quantity > 0 else combo_price
        
        # Calculate discount percentage
        if original_price > 0:
            discount_percent = calculate_discount_percent(original_price, per_unit_combo_price)
        else:
            discount_percent = 0
        
        # Update fields
        fixed_product["discount"] = discount_percent
        fixed_product["final_price"] = per_unit_combo_price
        fixed_product["product_combo"] = combo_info["combo_text"]
        
        print(f"Fixed combo product: {combo_quantity} items for {combo_price}D")
        print(f"Per unit: {original_price}D -> {per_unit_combo_price}D ({discount_percent}% off)")
        
    else:
        # Handle regular discount
        discount_percent = combo_info["discount_percent"]
        
        if discount_percent > 0:
            # Calculate final price from discount percentage
            final_price = original_price * (1 - discount_percent / 100)
            fixed_product["discount"] = discount_percent
            fixed_product["final_price"] = round(final_price, 2)
            fixed_product["product_combo"] = None
            
            print(f"Fixed discount product: {original_price}D -> {final_price}D ({discount_percent}% off)")
        else:
            # No valid discount found, check if final_price suggests a discount
            current_final_price = product.get("final_price", original_price)
            
            if current_final_price != original_price and original_price > 0:
                # Calculate discount from existing prices
                calculated_discount = calculate_discount_percent(original_price, current_final_price)
                fixed_product["discount"] = calculated_discount
                fixed_product["product_combo"] = None
                
                print(f"Calculated discount: {original_price}D -> {current_final_price}D ({calculated_discount}% off)")
            else:
                # No discount
                fixed_product["discount"] = 0
                fixed_product["final_price"] = original_price
                fixed_product["product_combo"] = None
                
                print("No discount applied")
    
    return fixed_product

def validate_fixed_data(original: Dict[str, Any], fixed: Dict[str, Any]) -> bool:
    """
    Validate that the fixed data makes sense.
    """
    issues = []
    
    # Check discount field type
    if not isinstance(fixed["discount"], (int, float)):
        issues.append(f"Discount should be numeric, got: {type(fixed['discount'])}")
    
    # Check final price is reasonable
    original_price = fixed.get("original_price", 0)
    final_price = fixed.get("final_price", 0)
    
    if final_price > original_price:
        issues.append(f"Final price ({final_price}) is higher than original price ({original_price})")
    
    if final_price <= 0:
        issues.append(f"Final price should be positive, got: {final_price}")
    
    # Check discount percentage is reasonable
    discount = fixed.get("discount", 0)
    if discount < 0 or discount > 100:
        issues.append(f"Discount percentage should be 0-100, got: {discount}")
    
    if issues:
        print("Validation issues:")
        for issue in issues:
            print(f"  - {issue}")
        return False
    
    print("Validation passed!")
    return True

def process_products(products: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Process a list of products and fix their data.
    """
    fixed_products = []
    
    for i, product in enumerate(products):
        print(f"\n--- Processing Product {i+1} ---")
        print(f"Name: {product.get('product_name', 'N/A')}")
        print(f"Original discount field: {product.get('discount', 'N/A')}")
        
        try:
            fixed_product = fix_product_data(product)
            fixed_product["related_products"] = None
            if validate_fixed_data(product, fixed_product):
                fixed_products.append(fixed_product)
                print("✅ Product fixed successfully")
            else:
                print("❌ Product validation failed")
                fixed_products.append(product)  # Keep original if fix failed
                
        except Exception as e:
            print(f"❌ Error processing product: {e}")
            fixed_products.append(product)  # Keep original if error occurred
    
    return fixed_products
    

In [5]:
processed_data = process_products(data)


--- Processing Product 1 ---
Name: Khổ qua dồn thịt
Original discount field: None
No discount applied
Validation passed!
✅ Product fixed successfully

--- Processing Product 2 ---
Name: Khoai mỡ thịt xay, 650-850g
Original discount field: None
No discount applied
Validation passed!
✅ Product fixed successfully

--- Processing Product 3 ---
Name: Đùi gà tươi tháo khớp khay CP
Original discount field: None
No discount applied
Validation passed!
✅ Product fixed successfully

--- Processing Product 4 ---
Name: Đùi tỏi gà công nghiệp CP VN, 1kg
Original discount field: None
No discount applied
Validation passed!
✅ Product fixed successfully

--- Processing Product 5 ---
Name: Đuôi heo đông lạnh nhập khẩu
Original discount field: None
No discount applied
Validation passed!
✅ Product fixed successfully

--- Processing Product 6 ---
Name: Đùi tỏi gà MM, 1kg
Original discount field: 9
Calculated discount: 93900D -> 85000D (9.48% off)
Validation passed!
✅ Product fixed successfully

--- Process

In [6]:
import asyncio
from asyncio import Semaphore

async def delete_index(client, index_name):
    await client.indices.delete(index=index_name, ignore=[400, 404])

async def index_products(client, data):
    await delete_index(client, "product-data-v2")
    semaphore = Semaphore(32)
    tasks = [
        index_with_semaphore(client, product, semaphore)
        for product in data
    ]
    await asyncio.gather(*tasks)

async def index_with_semaphore(client, product, semaphore):
    async with semaphore:
        await client.index(
            index="product-data-v2",
            id=product["product_id"],
            document=product
        )

await index_products(client, processed_data)

  await client.indices.delete(index=index_name, ignore=[400, 404])
