In [4]:
# %pip install python-dotenv  # PyPI distribution that provides the `dotenv` module

In [6]:
# Elasticsearch SKU Lookup
# Load dependencies and environment variables

import os
import json
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

# Pull connection settings from a local .env file into the process environment.
load_dotenv()

# Fail fast when the endpoint settings are incomplete: os.getenv() returns None
# for an unset variable, which the f-string below would otherwise turn into the
# literal host "None:None".
_missing_es_settings = [
    name
    for name in ("ELASTICSEARCH_HOST", "ELASTICSEARCH_PORT")
    if not os.getenv(name)
]
if _missing_es_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_es_settings)
    )

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[f"{os.getenv('ELASTICSEARCH_HOST')}:{os.getenv('ELASTICSEARCH_PORT')}"],
    basic_auth=(os.getenv('ELASTICSEARCH_USERNAME'), os.getenv('ELASTICSEARCH_PASSWORD'))
)

# Test connection
# NOTE(review): the printed cluster metadata is stored with the notebook output;
# clear the output before sharing if that information is sensitive.
print(es.info())

{'name': 'instance-0000000053', 'cluster_name': '0b631774f12b4ae3ba94fe3210d761fd', 'cluster_uuid': 'BMdUG30kTUONYE77pJvW8w', 'version': {'number': '8.17.3', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'a091390de485bd4b127884f7e565c0cad59b10d2', 'build_date': '2025-02-28T10:07:26.089129809Z', 'build_snapshot': False, 'lucene_version': '9.12.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


In [7]:
# Elasticsearch SKU Lookup
# Load dependencies and environment variables

import os
import json
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

# NOTE(review): this cell repeats the connection setup of the preceding cell;
# running it again simply rebinds `es` to a fresh client.
load_dotenv()

# Fail fast when the endpoint settings are incomplete: os.getenv() returns None
# for an unset variable, which the f-string below would otherwise turn into the
# literal host "None:None".
_missing_es_settings = [
    name
    for name in ("ELASTICSEARCH_HOST", "ELASTICSEARCH_PORT")
    if not os.getenv(name)
]
if _missing_es_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_es_settings)
    )

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[f"{os.getenv('ELASTICSEARCH_HOST')}:{os.getenv('ELASTICSEARCH_PORT')}"],
    basic_auth=(os.getenv('ELASTICSEARCH_USERNAME'), os.getenv('ELASTICSEARCH_PASSWORD'))
)

# Test connection
print(es.info())

{'name': 'instance-0000000053', 'cluster_name': '0b631774f12b4ae3ba94fe3210d761fd', 'cluster_uuid': 'BMdUG30kTUONYE77pJvW8w', 'version': {'number': '8.17.3', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'a091390de485bd4b127884f7e565c0cad59b10d2', 'build_date': '2025-02-28T10:07:26.089129809Z', 'build_snapshot': False, 'lucene_version': '9.12.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


In [8]:
def get_sku(sku_id: str, index: str = "skus") -> dict:
    """
    Look up a single SKU document using the module-level ``es`` client.

    The search matches the identifier against several candidate fields
    (exact ``term`` matches on ``sku``, ``sk`` and ``sku_id`` plus a
    ``match_phrase`` on ``sk``); any one matching clause is enough, and
    only the first hit is requested.

    Args:
        sku_id: The SKU identifier (e.g., "1688_740898787_9897")
        index: The Elasticsearch index pattern to search (default: skus)

    Returns:
        dict: The hit's ``_source`` with two extra keys, ``_found_in_index``
        (the hit's ``_index``) and ``_doc_id`` (the hit's ``_id``).
        If nothing matches, or if any exception is raised while searching or
        reading the hit, a dict with a single ``"error"`` key is returned
        instead of raising.
    """
    # Any one of these clauses is sufficient for a document to match.
    candidate_clauses = [
        {"term": {"sku": sku_id}},
        {"term": {"sk": sku_id}},
        {"term": {"sku_id": sku_id}},
        {"match_phrase": {"sk": sku_id}},
    ]
    lookup_query = {
        "bool": {
            "should": candidate_clauses,
            "minimum_should_match": 1,
        }
    }

    try:
        response = es.search(index=index, query=lookup_query, size=1)
        if response['hits']['hits']:
            top_hit = response['hits']['hits'][0]
            document = top_hit['_source']
            # Record where the document came from alongside its own fields.
            document['_found_in_index'] = top_hit['_index']
            document['_doc_id'] = top_hit['_id']
            return document
    except Exception as exc:
        # Errors are reported in-band rather than propagated to the caller.
        return {"error": str(exc)}

    return {"error": f"SKU {sku_id} not found in index pattern {index}"}

In [14]:
# Example usage: fetch one SKU from the default index and pretty-print the
# result (either the document or an {"error": ...} dict) without escaping
# non-ASCII characters.
sku_data = get_sku(sku_id="1688_760286087_4507")
print(json.dumps(sku_data, ensure_ascii=False, indent=2))

{
  "error": "SKU 1688_760286087_4507 not found in index pattern skus"
}


In [None]:
import numpy as np
import pandas as pd
from elasticsearch.helpers import bulk

def calculate_trending_score(views: float, max_score: float = 100, factor: float = 25) -> float:
    """
    Map a view count to a trending score that falls off logarithmically.

    Fewer views give a higher score; more views give a lower score.

    Formula: score = max_score - log10(views + 1) * factor, then limited to
    the range [0, max_score].

    Args:
        views: Number of views (a single scalar value)
        max_score: Maximum possible score (default: 100)
        factor: Decay factor - higher = faster decay (default: 25)

    Returns:
        float: Score between 0 and max_score

    Note:
        The input is not validated. With a positive factor, negative or NaN
        ``views`` end up at ``max_score`` because of how the limits are applied.

    Approximate scores with default settings:
        0 views     → 100
        10 views    → 74
        100 views   → 50
        1000 views  → 25
        10000 views → 0
    """
    decay = np.log10(views + 1) * factor
    unclamped = max_score - decay

    # Upper limit: keep the raw value only when it is strictly below max_score.
    capped = unclamped if unclamped < max_score else max_score

    # Lower limit: anything that is not strictly positive becomes 0.
    return capped if capped > 0 else 0

In [None]:
def rerank_from_csv(
    csv_path: str,
    sku_column: str = "Sku",
    views_column: str = "Item Viewed",
    index: str = "skus_product_pool_v3",
    max_score: float = 100,
    factor: float = 25,
    dry_run: bool = True
) -> dict:
    """
    Rerank products based on views from a CSV file.
    Promotes low-view products (high score) and demotes high-view products (low score).

    Each usable row gets a score from ``calculate_trending_score`` and, unless
    ``dry_run`` is set, the score is written to the ``trending_score`` field of
    the document whose ``_id`` is the row's SKU, using a bulk partial update
    through the module-level ``es`` client.

    Rows with a missing SKU, or with a views value that is missing, negative,
    or not parseable as a number, are skipped (and counted) instead of being
    scored.

    Args:
        csv_path: Path to CSV file with SKU and views data
        sku_column: Column name for SKU (default: "Sku")
        views_column: Column name for views (default: "Item Viewed")
        index: Elasticsearch index to update (default: "skus_product_pool_v3")
        max_score: Maximum trending score (default: 100)
        factor: Decay factor for logarithmic formula (default: 25)
        dry_run: If True, only preview changes without updating ES (default: True)

    Returns:
        dict: Summary of the reranking operation. Always contains ``status``,
        ``total`` (rows read from the CSV) and ``skipped`` (rows left out);
        a completed run also contains ``success`` and ``failed`` counts.

    Raises:
        KeyError: If ``sku_column`` or ``views_column`` is not in the CSV.
    """
    # Read CSV
    df = pd.read_csv(csv_path, low_memory=False)

    absent_columns = [col for col in (sku_column, views_column) if col not in df.columns]
    if absent_columns:
        raise KeyError(f"Column(s) not found in {csv_path}: {absent_columns}")

    # Clean views column (remove thousands separators if present) and parse it;
    # anything that is not a number becomes NaN so it can be filtered out below.
    views_text = df[views_column].astype(str).str.replace(',', '', regex=False).str.strip()
    df['_views'] = pd.to_numeric(views_text, errors='coerce')

    # Keep only rows that can be scored and addressed. `.ge(0)` is False for NaN,
    # so missing views are excluded rather than silently receiving max_score.
    usable = df[sku_column].notna() & df['_views'].ge(0)
    skipped = int((~usable).sum())
    scored = df.loc[usable].copy()

    # Calculate new scores
    scored['_new_score'] = (
        scored['_views']
        .apply(lambda v: calculate_trending_score(v, max_score, factor))
        .astype(float)
    )

    print("=== Reranking Summary ===")
    print(f"CSV: {csv_path}")
    print(f"Total products: {len(df):,}")
    if skipped:
        print(f"Skipped (missing SKU or invalid views): {skipped:,}")
    print(f"Index: {index}")
    print(f"Formula: score = {max_score} - log10(views + 1) * {factor}")

    print("\n=== Score Distribution ===")
    print(f"Min score: {scored['_new_score'].min():.2f}")
    print(f"Max score: {scored['_new_score'].max():.2f}")
    print(f"Mean score: {scored['_new_score'].mean():.2f}")

    def _format_preview(rows):
        # Show the original views text, but rows are selected by the parsed value.
        table = rows[[sku_column, views_column, '_new_score']].copy()
        table.columns = ['SKU', 'Views', 'New Score']
        return table.to_string(index=False)

    # Rank on the numeric `_views` column: the raw column is object dtype when it
    # contains separators, and nsmallest/nlargest reject object columns.
    print("\n=== Preview ===")
    print("\nLowest views (promoted):")
    print(_format_preview(scored.nsmallest(5, '_views')))
    print("\nHighest views (demoted):")
    print(_format_preview(scored.nlargest(5, '_views')))

    if dry_run:
        print("\n⚠️  DRY RUN - No changes made. Set dry_run=False to apply.")
        return {"status": "dry_run", "total": len(df), "skipped": skipped}

    # Bulk update Elasticsearch
    print("\n=== Updating Elasticsearch ===")

    def generate_actions():
        # Iterating the Series directly yields plain Python scalars; the running
        # count (not the DataFrame index label) drives the progress message.
        pairs = zip(scored[sku_column], scored['_new_score'])
        for prepared, (sku, score) in enumerate(pairs, start=1):
            yield {
                "_op_type": "update",
                "_index": index,
                "_id": str(sku),
                "doc": {"trending_score": float(score)}
            }
            if prepared % 5000 == 0:
                print(f"Prepared {prepared:,}/{len(scored):,}...")

    # raise_on_error=False: per-document failures are collected and reported
    # instead of aborting the whole run.
    success, failed = bulk(es, generate_actions(), chunk_size=500, raise_on_error=False)
    failed_count = len(failed) if isinstance(failed, list) else failed

    print("\n=== Complete ===")
    print(f"✓ Updated: {success:,}")
    print(f"✗ Failed: {failed_count}")

    return {
        "status": "completed",
        "total": len(df),
        "skipped": skipped,
        "success": success,
        "failed": failed_count
    }

In [None]:
# Example: rerank products from a CSV export.
# Run with dry_run=True first to preview the scores; nothing is written to
# Elasticsearch until dry_run is switched to False.
rerank_from_csv(
    "product_metrics__filtered_by_pushed_status_is_completed_and_app_status_is_live__sorted_by_item_viewed_descending_2025-12-31T08_48_43.420166862Z.csv",
    dry_run=True,
)