# In [1]:
# skin_fusion

import os, json, mimetypes
import cv2, numpy as np
from PIL import Image, ImageOps
from google import genai
from google.genai import types

# Default length, in pixels, that _resize_short gives the shorter image side.
TARGET_SHORT = 768
# Model name passed to generate_content in analyze_with_gemini.
MODEL = "gemini-2.0-flash"

# Korean-language instruction sent together with the image. It asks for a
# JSON-only reply containing, for each of acne, redness and melasma_darkspots,
# an object {score: number, reason: string} with score in the 0-100 range.
PROMPT_TEXT = (
    "전처리된 얼굴 피부 이미지를 분석하여 JSON만 반환하라.\n"
    "평가 항목: acne(여드름), redness(홍조), melasma_darkspots(잡티).\n"
    "각 항목은 다음 스키마로 제공하라:\n"
    "{acne:{score:number,reason:string}, redness:{score:number,reason:string}, "
    "melasma_darkspots:{score:number,reason:string}}\n"
    "score는 0~100 범위의 실수."
)

def _load_exif_bgr(path: str):
    """Load an image file as a BGR array with its EXIF orientation applied.

    Args:
        path: Path of the image file to read.

    Returns:
        The image as a NumPy array in BGR channel order (converted from an
        RGB PIL image).
    """
    # Open inside a context manager so the file handle is released even when
    # decoding or conversion raises; convert() yields an independent image
    # that stays valid after the source is closed.
    with Image.open(path) as pil:
        rgb = ImageOps.exif_transpose(pil).convert("RGB")
    return cv2.cvtColor(np.array(rgb), cv2.COLOR_RGB2BGR)

def _resize_short(bgr, short=TARGET_SHORT):
    """Resize an image so that its shorter side equals ``short`` pixels.

    The aspect ratio is kept (up to rounding of the new width and height).
    When the shorter side already equals ``short`` the input array itself is
    returned unchanged.
    """
    height, width = bgr.shape[:2]
    shorter = min(height, width)
    if shorter == short:
        return bgr

    factor = short / float(shorter)
    target_size = (int(round(width * factor)), int(round(height * factor)))

    # Area interpolation when shrinking, bicubic when enlarging.
    if factor < 1:
        method = cv2.INTER_AREA
    else:
        method = cv2.INTER_CUBIC
    return cv2.resize(bgr, target_size, interpolation=method)

def _wb_grayworld(bgr, strength=0.5):
    """Apply a softened gray-world white balance.

    Each channel is scaled by (mean of the channel means) / (its own mean).
    The gain is clipped to [0.8, 1.2] and then blended with the identity gain
    according to ``strength`` (0 leaves the image unchanged).

    Returns:
        A uint8 array of the same shape, with values clipped to 0-255.
    """
    pixels = bgr.astype(np.float32)
    # The small epsilon keeps the division finite for an all-zero channel.
    channel_means = pixels.reshape(-1, 3).mean(0) + 1e-6
    target = channel_means.mean()
    raw_gains = np.clip(target / channel_means, 0.8, 1.2)
    blended_gains = (1 - strength) + strength * raw_gains
    balanced = np.clip(pixels * blended_gains, 0, 255)
    return balanced.astype(np.uint8)

def _clahe_light(bgr, clip=1.8, tiles=8):
    """Apply CLAHE to the L channel of the image in LAB space.

    Args:
        bgr: Input image in BGR order.
        clip: CLAHE clip limit.
        tiles: Number of tiles per side of the CLAHE grid.

    Returns:
        The image converted back to BGR; the a and b channels are untouched.
    """
    lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)
    lightness, chroma_a, chroma_b = cv2.split(lab)
    equalizer = cv2.createCLAHE(clipLimit=clip, tileGridSize=(tiles, tiles))
    enhanced = equalizer.apply(lightness)
    merged = cv2.merge((enhanced, chroma_a, chroma_b))
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)

def _morph_kernel(bgr, base=768, ksize=5):
    """Build an elliptical structuring element sized relative to the image.

    ``ksize`` is scaled by (shorter image side / ``base``), floored at 3 and
    bumped to the next odd number when even.
    """
    rows, cols = bgr.shape[:2]
    ratio = min(rows, cols) / base
    side = max(3, int(round(ksize * ratio)))
    # Adds 1 only when the size is even, so the kernel side is always odd.
    side += 1 - side % 2
    return cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (side, side))

def _skin_mask(bgr):
    """Return a 0/255 uint8 mask from fixed YCrCb and HSV thresholds.

    A pixel is kept only when it passes both the YCrCb and the HSV range
    tests. The mask is then opened and closed with the kernel from
    _morph_kernel, and, if any foreground component exists, reduced to the
    single largest connected component.
    """
    Y, Cr, Cb = cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2YCrCb))
    in_ycrcb = (
        (Cr >= 135) & (Cr <= 180)
        & (Cb >= 85) & (Cb <= 135)
        & (Y >= 40) & (Y <= 240)
    )

    H, S, V = cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV))
    in_hsv = (H <= 25) & (S >= 30) & (S <= 180) & (V >= 60)

    mask = (in_ycrcb & in_hsv).astype(np.uint8) * 255

    # Opening followed by closing, both with the same kernel.
    kernel = _morph_kernel(bgr)
    for operation in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
        mask = cv2.morphologyEx(mask, operation, kernel)

    count, labels, stats, _centroids = cv2.connectedComponentsWithStats(mask)
    if count <= 1:
        # Only the background label exists; nothing to select.
        return mask

    # Row 0 of stats is the background, hence the slice and the +1 offset.
    areas = stats[1:, cv2.CC_STAT_AREA]
    biggest = 1 + np.argmax(areas)
    return (labels == biggest).astype(np.uint8) * 255

def preprocess_with_mask(bgr, bg_gray=220):
    """Normalize an image and replace everything outside the mask with gray.

    Pipeline: resize the shorter side, gray-world white balance, CLAHE, then
    compute the mask with _skin_mask on the processed image. Pixels where the
    mask is 255 keep their processed colour; all others become ``bg_gray``.
    """
    prepared = _clahe_light(_wb_grayworld(_resize_short(bgr)))
    skin = _skin_mask(prepared)
    backdrop = np.full_like(prepared, bg_gray, np.uint8)
    # Add a trailing axis so the 2-D mask broadcasts over the colour channels.
    keep = skin[..., None] == 255
    return np.where(keep, prepared, backdrop)

def analyze_with_gemini(image_path, api_key):
    """Send an image plus PROMPT_TEXT to the Gemini model and parse the reply.

    Args:
        image_path: Path of the image file to upload inline.
        api_key: API key handed to ``genai.Client``.

    Returns:
        The decoded JSON object from the response text, or ``{}`` when the
        text contains no ``{...}`` span or that span is not valid JSON.
    """
    client = genai.Client(api_key=api_key)
    # Fall back to JPEG when the extension gives no MIME type.
    mime = mimetypes.guess_type(image_path)[0] or "image/jpeg"
    with open(image_path, "rb") as f:
        data = f.read()
    content = types.Content(
        role="user",
        parts=[types.Part(text=PROMPT_TEXT),
               types.Part(inline_data=types.Blob(mime_type=mime, data=data))]
    )
    resp = client.models.generate_content(
        model=MODEL,
        contents=[content],
        config=types.GenerateContentConfig(
            response_mime_type="application/json",
            temperature=0.2,
            system_instruction="반드시 JSON만 반환"
        )
    )
    txt = (resp.text or "").strip()

    # Keep only the outermost {...} span so stray text around it is ignored.
    start, end = txt.find("{"), txt.rfind("}")
    if start == -1 or end <= start:
        return {}
    try:
        return json.loads(txt[start:end + 1])
    except json.JSONDecodeError:
        # Same fallback as "no object found": callers receive an empty result
        # rather than an exception for a malformed reply.
        return {}

# Survey answer lookup: for each question id (q1..q10) maps the Korean answer
# text to an ordinal score from 0 to 3. assess_skin_type matches answers
# exactly against these keys; an answer that is not listed scores 0.
MAP = {
    "q1":{"없어요":0,"T존 일부(이마 혹은 코)":1,"T존 전체(이마와 코)":2,"얼굴 전체":3},
    "q2":{"전혀 안 보여요":0,"지금은 없지만 가끔 보여요":1,"부분적으로 붉게 보여요":2,"전체적으로 붉게 보여요":3},
    "q3":{"없어요":0,"U존 일부(볼 혹은 턱)":1,"U존 전체(볼과 턱)":2,"얼굴 전체":3},
    "q4":{"전혀 생기지 않아요":0,"표정을 지을 때만 생겨요":1,"표정 짓지 않아도 약간 있어요":2,"표정 짓지 않아도 많이 있어요":3},
    "q5":{"주름이 없어요":0,"잔주름이에요":1,"깊은 주름이에요":2,"잔주름과 깊은 주름 다 있어요":3},
    "q6":{"전혀 생기지 않아요":0,"미소 지을 때만 약간 생겨요":1,"미소 지을 때 진하게 생겨요":2,"미소 짓지 않아도 생겨요":3},
    "q7":{"전혀 안 보여요":0,"거의 안 보여요":1,"약간 눈에 띄어요":2,"곳곳에 많이 보여요":3},
    "q8":{"주름이 없어요":0,"잔주름이에요":1,"깊은 주름이에요":2,"잔주름과 깊은 주름 다 있어요":3},
    "q9":{"외출 전보다 윤기가 없어요":0,"외출 전과 변함이 없어요":1,"약간 번들거리고 윤기가 있어요":2,"많이 번들거리고 기름져요":3},
    "q10":{"전혀 안 보여요":0,"가끔 붉어지면 보여요":1,"특정부위에 눈에 띄어요":2,"곳곳에 많이 보여요":3},
}

def _to_0_3(x):
    """Convert a 0-100 score to the 0-3 scale, rounded to two decimals.

    Args:
        x: A number or numeric string. Falsy values (None, 0, "") and values
            that cannot be parsed as a float, as well as NaN, count as 0.

    Returns:
        A float in [0, 3]; inputs outside 0-100 are clamped first.
    """
    try:
        v = float(x) if x else 0.0
    except (TypeError, ValueError):
        # Model output is free-form; an unparsable score must not abort the
        # whole fusion, so it is treated like a missing one.
        v = 0.0
    if v != v:
        # NaN compares unequal to itself; without this check min()/max()
        # would turn it into the maximum score.
        v = 0.0
    return round(max(0, min(100, v)) / 100 * 3, 2)

def _skin_type(oil, dry):
    """Classify skin type from oil and dryness indices (0-3 scale).

    Returns "지성" (oily) for oil >= 2 with dry <= 1, "건성" (dry) for
    dry >= 2 with oil <= 1, "복합성" (combination) when both are >= 2, and
    "중성" (normal) for every other combination.
    """
    oily = oil >= 2
    dehydrated = dry >= 2

    if oily and dehydrated:
        return "복합성"
    if oily and dry <= 1:
        return "지성"
    if dehydrated and oil <= 1:
        return "건성"
    return "중성"

def assess_skin_type(s):
    """Score the survey answers and derive skin indices and a skin type.

    Args:
        s: Mapping of question id ("q1".."q10") to the selected answer text.
            Questions that are absent, and answers not listed in MAP, score
            0; keys that are not question ids are ignored.

    Returns:
        A dict with "skin_type" (from _skin_type) and "indices", holding
        oil, dry, sensitivity, wrinkle and pigment values.
    """
    # Drive the lookup from MAP so that unexpected or missing survey keys
    # cannot raise KeyError.
    sc = {q: choices.get(s.get(q), 0) for q, choices in MAP.items()}

    oil = round(0.6 * sc["q1"] + 0.4 * sc["q9"], 2)
    dry = float(sc["q3"])
    sens = round(0.7 * sc["q2"] + 0.3 * sc["q10"], 2)
    # q5 and q8 are averaged before weighting; q6 does not feed any index.
    wrinkle = round(0.4 * sc["q4"] + 0.6 * ((sc["q5"] + sc["q8"]) / 2), 2)
    pigment = float(sc["q7"])
    return {
        "skin_type": _skin_type(oil, dry),
        "indices": {"oil": oil, "dry": dry, "sensitivity": sens,
                    "wrinkle": wrinkle, "pigment": pigment}
    }

def assess_with_gemini(survey, gemini):
    """Fuse survey-based indices with the vision scores.

    Args:
        survey: Dict whose "survey" entry holds the answers accepted by
            assess_skin_type.
        gemini: Result of analyze_with_gemini; expected to map "redness",
            "acne" and "melasma_darkspots" to objects with a "score" (0-100).
            A bare number in place of the object is accepted as the score.

    Returns:
        A dict with the re-derived "skin_type", the fused "indices" and the
        unmodified vision result under "vision_raw".

    An index whose vision score is absent keeps its survey-only value rather
    than being blended with an implicit 0, which would deflate it.
    """
    base = assess_skin_type(survey["survey"])
    idx = base["indices"]
    fused = idx.copy()

    vision = gemini if isinstance(gemini, dict) else {}

    # (index name, vision key, survey weight, vision weight)
    blends = (
        ("sensitivity", "redness", 0.4, 0.6),
        ("oil", "acne", 0.7, 0.3),
        ("pigment", "melasma_darkspots", 0.3, 0.7),
    )
    for name, key, w_survey, w_vision in blends:
        entry = vision.get(key)
        # Entries may be missing or null; only dicts are read for "score".
        score = entry.get("score") if isinstance(entry, dict) else entry
        if score is None:
            continue
        fused[name] = round(w_survey * idx[name] + w_vision * _to_0_3(score), 2)

    return {
        "skin_type": _skin_type(fused["oil"], fused["dry"]),
        "indices": fused,
        "vision_raw": gemini
    }

def run_fusion_from_request(image_path, survey_dict):
    """Preprocess an image, analyze it with Gemini and fuse with the survey.

    The preprocessed image is written next to the input as
    "<root>_Pre<ext>" (".jpg" when the input has no extension) and that file
    is what gets sent for analysis. The API key is read from the
    GEMINI_API_KEY environment variable.

    Args:
        image_path: Path of the source image.
        survey_dict: Dict passed on to assess_with_gemini.

    Returns:
        The result of assess_with_gemini.

    Raises:
        OSError: If the preprocessed image could not be written.
    """
    original = _load_exif_bgr(image_path)
    processed = preprocess_with_mask(original)

    root, ext = os.path.splitext(image_path)
    if not ext:
        ext = ".jpg"
    pre_path = f"{root}_Pre{ext}"
    # imwrite signals failure through its return value; without this check a
    # failed write would only surface later as a confusing open() error.
    if not cv2.imwrite(pre_path, processed):
        raise OSError(f"failed to write preprocessed image: {pre_path}")

    gemini = analyze_with_gemini(pre_path, os.getenv("GEMINI_API_KEY"))
    return assess_with_gemini(survey_dict, gemini)


# In [2]:
#es_ltr_online

import os, json, math
from typing import Dict, List
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import xgboost as xgb

# Elasticsearch client for the local node, with a 20-second request timeout.
es = Elasticsearch("http://localhost:9200", request_timeout=20)

# Sentence-embedding model used by search_candidates to encode the query text.
model = SentenceTransformer("jhgan/ko-sroberta-multitask")

# Product categories that are searched and featurized, in this order.
CATEGORIES = ["cream", "essence", "skintoner"]

# Ingredient names counted as positive hits, grouped by concern.
# _make_pos_neg_vocab selects the groups; featurize matches names exactly.
POS_ING = {
    "pigment":{"niacinamide","비타민c","arbutin","트라넥삼산","감초","코직"},
    "sensitivity":{"panthenol","판테놀","cica","병풀","알란토인","베타글루칸","알로에","세라마이드"},
    "dry":{"히알루론산","글리세린","스쿠알란","세라마이드","콜레스테롤","요소"},
    "acne":{"살리실산","바하","아젤라익","아연"},
}
# Ingredient names counted as negative hits, grouped by concern.
NEG_ING = {
    "sensitivity":{"향","향료","퍼퓸","알코올","에탄올","에센셜 오일","티트리 오일"},
    "acne":{"코코넛 오일","아이소프로필 미리스테이트","라놀린"}
}

def _make_pos_neg_vocab(fusion):
    """Collect the positive and negative ingredient sets for a fusion result.

    An ingredient group is included when its index in ``fusion["indices"]``
    is at least 2. The "oil" index selects the "acne" groups.

    Returns:
        A ``(pos, neg)`` tuple of newly built sets.
    """
    idx = fusion["indices"]
    pos, neg = set(), set()

    # (index name, POS_ING key, NEG_ING key or None when there is none)
    rules = (
        ("pigment", "pigment", None),
        ("sensitivity", "sensitivity", "sensitivity"),
        ("dry", "dry", None),
        ("oil", "acne", "acne"),
    )
    for index_name, pos_key, neg_key in rules:
        if idx[index_name] >= 2:
            pos.update(POS_ING[pos_key])
            if neg_key is not None:
                neg.update(NEG_ING[neg_key])
    return pos, neg

def _cosine(a, b):
    """Return the cosine similarity of two vectors as a Python float.

    A small constant is added to the denominator so that zero-length vectors
    give 0.0 instead of a division by zero.
    """
    dot = a @ b
    scale = np.linalg.norm(a) * np.linalg.norm(b) + 1e-6
    return float(dot / scale)

def search_candidates(es, fusion, per_cat=20):
    """Fetch candidate products per category, scored by vector similarity.

    The skin type string from ``fusion`` is embedded with the module-level
    ``model`` and compared to each document's ``review_vector`` through a
    script_score query (cosine similarity + 1.0), filtered by category.

    Args:
        es: Elasticsearch client used for the searches.
        fusion: Fusion result containing "skin_type".
        per_cat: Maximum number of hits requested per category.

    Returns:
        Dict mapping each category to a list of hit sources, each extended
        with the hit score under "score_es".
    """
    query_vector = model.encode(fusion["skin_type"]).tolist()
    source_fields = ["product_id", "productName", "brand", "salePrice",
                     "ingredients", "averageReviewScore", "totalReviewCount"]

    def build_body(category):
        # Same similarity clause for every category; only the filter differs.
        similarity = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.qvec, 'review_vector') + 1.0",
                    "params": {"qvec": query_vector},
                },
            }
        }
        return {
            "size": per_cat,
            "query": {
                "bool": {
                    "filter": [{"term": {"category": category}}],
                    "must": [similarity],
                }
            },
            "_source": source_fields,
        }

    results = {}
    for cat in CATEGORIES:
        response = es.search(index="cosmetics_demo", body=build_body(cat))
        results[cat] = [
            {**hit["_source"], "score_es": hit["_score"]}
            for hit in response["hits"]["hits"]
        ]
    return results

def featurize(results, fusion):
    """Turn search results into feature rows for the ranking model.

    Args:
        results: Dict of category -> list of candidate dicts, as returned by
            search_candidates.
        fusion: Fusion result used to pick the ingredient vocabularies.

    Returns:
        ``(X, group, info)`` where ``X`` holds one feature row per candidate
        ``[pos_hits, neg_hits, pos_hits - neg_hits, avg_review,
        log1p(review_count), log1p(price), score_es]``, ``group`` the number
        of candidates per category in CATEGORIES order, and ``info`` the
        candidate dicts in the same order as ``X``.
    """
    pos, neg = _make_pos_neg_vocab(fusion)
    X, group, info = [], [], []

    for cat in CATEGORIES:
        rows = results.get(cat, [])
        group.append(len(rows))
        for r in rows:
            # The field can be present but null; treat that as no ingredients.
            ings = r.get("ingredients") or []
            if isinstance(ings, str):
                # Iterating a string would compare single characters against
                # the vocabularies (e.g. every "향" would count as a hit), so
                # split a comma-separated list into names first.
                ings = [part.strip() for part in ings.split(",")]
            pos_hits = sum(1 for x in ings if x in pos)
            neg_hits = sum(1 for x in ings if x in neg)
            avg = r.get("averageReviewScore") or 0.0
            cnt = math.log1p(r.get("totalReviewCount") or 0.0)
            price = math.log1p(r.get("salePrice") or 0.0)
            X.append([pos_hits, neg_hits, pos_hits - neg_hits,
                      avg, cnt, price, r["score_es"]])
            info.append(r)
    return X, group, info

def recommend_for_request(fusion_json: dict, topk=3):
    """Return the ``topk`` candidates ranked by the learning-to-rank model.

    Candidates from all categories are retrieved (20 per category), scored by
    the module-level ``booster`` and sorted together in descending order of
    score.

    Args:
        fusion_json: Fusion result with "skin_type" and "indices".
        topk: Number of products to return across all categories.

    Returns:
        A list of at most ``topk`` candidate dicts, each a copy extended with
        "score_ltr". Empty when the search produced no candidates.
    """
    raw = search_candidates(es, fusion_json, per_cat=20)
    X, _groups, info = featurize(raw, fusion_json)

    # With no candidates there is nothing to rank, and an empty feature list
    # is not a valid 2-D input for DMatrix.
    if not X:
        return []

    preds = booster.predict(xgb.DMatrix(X))

    # Copy each candidate so the search results are not mutated.
    ranked = [{**r, "score_ltr": float(p)} for r, p in zip(info, preds)]
    ranked.sort(key=lambda x: x["score_ltr"], reverse=True)
    return ranked[:topk]

# Ranking model used by recommend_for_request, loaded at import time from
# "ltr_booster.json" (a relative path).
booster = xgb.Booster()
booster.load_model("ltr_booster.json")

# In [None]:
#offline_setup

import os, json, hashlib, re
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer
import xgboost as xgb

# Elasticsearch client for the local node, with a 20-second request timeout.
es = Elasticsearch("http://localhost:9200", request_timeout=20)
# Sentence-embedding model used by normalize to encode review text.
model = SentenceTransformer("jhgan/ko-sroberta-multitask")

def load_json(path):
    """Read a UTF-8 encoded JSON file and return the decoded value."""
    with open(path, "r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)

def normalize(doc, category):
    """Convert a raw product record into the document that gets indexed.

    The product id is the MD5 hex digest of "<productName>|<mallName>". The
    review texts are joined with spaces, cut to 5000 characters and embedded
    with the module-level ``model``.

    Args:
        doc: Raw product dict as loaded from the category JSON file.
        category: Category name stored on the document.

    Returns:
        A dict with product_id, productName, brand, category, ingredients,
        salePrice, averageReviewScore, totalReviewCount and review_vector.
    """
    name = doc.get("productName", "")
    brand = doc.get("mallName", "")
    pid = hashlib.md5(f"{name}|{brand}".encode()).hexdigest()

    reviews = doc.get("reviews") or []
    # A review may carry reviewContent: null; join() only accepts strings,
    # so null and missing content both become "".
    text = " ".join((r.get("reviewContent") or "") for r in reviews)[:5000]
    vec = model.encode(text).tolist()

    return {
        "product_id": pid,
        "productName": name,
        "brand": brand,
        "category": category,
        # Store an empty list rather than null so readers can iterate it.
        "ingredients": doc.get("ingredients") or [],
        "salePrice": doc.get("salePrice"),
        "averageReviewScore": doc.get("averageReviewScore"),
        "totalReviewCount": doc.get("totalReviewCount"),
        "review_vector": vec
    }

def index_all():
    """Normalize every product file that exists and bulk-index the result.

    Reads cream.json, essence.json and skintoner.json from the working
    directory (files that do not exist are skipped), normalizes all records
    first, then indexes them into "cosmetics_demo" keyed by product_id.
    """
    sources = (
        ("cream.json", "cream"),
        ("essence.json", "essence"),
        ("skintoner.json", "skintoner"),
    )

    docs = []
    for filename, category in sources:
        if not os.path.exists(filename):
            continue
        docs.extend(normalize(raw, category) for raw in load_json(filename))

    def to_action(document):
        # One bulk "index" action per document.
        return {
            "_op_type": "index",
            "_index": "cosmetics_demo",
            "_id": document["product_id"],
            "_source": document,
        }

    helpers.bulk(es, map(to_action, docs))

def train_ltr():
    """Placeholder for learning-to-rank training; currently does nothing."""
    pass

# When executed as a script, index all product files.
if __name__ == "__main__":
    index_all()


# In [None]:
import os, json, hashlib, re
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer
import xgboost as xgb

# Elasticsearch client for the local node, with a 20-second request timeout.
es = Elasticsearch("http://localhost:9200", request_timeout=20)
# Sentence-embedding model used by normalize to encode review text.
model = SentenceTransformer("jhgan/ko-sroberta-multitask")

def load_json(path):
    """Read a UTF-8 encoded JSON file and return the decoded value."""
    with open(path, "r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)

def normalize(doc, category):
    """Convert a raw product record into the document that gets indexed.

    The product id is the MD5 hex digest of "<productName>|<mallName>". The
    review texts are joined with spaces, cut to 5000 characters and embedded
    with the module-level ``model``.

    Args:
        doc: Raw product dict as loaded from the category JSON file.
        category: Category name stored on the document.

    Returns:
        A dict with product_id, productName, brand, category, ingredients,
        salePrice, averageReviewScore, totalReviewCount and review_vector.
    """
    name = doc.get("productName", "")
    brand = doc.get("mallName", "")
    pid = hashlib.md5(f"{name}|{brand}".encode()).hexdigest()

    reviews = doc.get("reviews") or []
    # A review may carry reviewContent: null; join() only accepts strings,
    # so null and missing content both become "".
    text = " ".join((r.get("reviewContent") or "") for r in reviews)[:5000]
    vec = model.encode(text).tolist()

    return {
        "product_id": pid,
        "productName": name,
        "brand": brand,
        "category": category,
        # Store an empty list rather than null so readers can iterate it.
        "ingredients": doc.get("ingredients") or [],
        "salePrice": doc.get("salePrice"),
        "averageReviewScore": doc.get("averageReviewScore"),
        "totalReviewCount": doc.get("totalReviewCount"),
        "review_vector": vec
    }

def index_all():
    """Normalize every product file that exists and bulk-index the result.

    Reads cream.json, essence.json and skintoner.json from the working
    directory (files that do not exist are skipped), normalizes all records
    first, then indexes them into "cosmetics_demo" keyed by product_id.
    """
    sources = (
        ("cream.json", "cream"),
        ("essence.json", "essence"),
        ("skintoner.json", "skintoner"),
    )

    docs = []
    for filename, category in sources:
        if not os.path.exists(filename):
            continue
        docs.extend(normalize(raw, category) for raw in load_json(filename))

    def to_action(document):
        # One bulk "index" action per document.
        return {
            "_op_type": "index",
            "_index": "cosmetics_demo",
            "_id": document["product_id"],
            "_source": document,
        }

    helpers.bulk(es, map(to_action, docs))

def train_ltr():
    """Placeholder for learning-to-rank training; currently does nothing."""
    pass  # (X, y, group -> XGBoost rank:ndcg training code goes here)

# When executed as a script, index all product files.
if __name__ == "__main__":
    index_all()
