In [None]:
import os
from dotenv import load_dotenv

# Resolve the .env file located two directories above the current working
# directory and load its variables into the process environment.
dotenv_path = os.path.join(os.getcwd(), os.pardir, os.pardir, ".env")
load_dotenv(dotenv_path)

# Sanity check: show which database host was picked up.
print("DB_HOST =", os.getenv("DB_HOST"))


Starter

In [None]:
import random
import copy
import os
from dotenv import load_dotenv
import mysql.connector

# Load environment variables from the .env file two directories above the
# current working directory.
dotenv_path = os.path.join(os.getcwd(), os.pardir, os.pardir, ".env")
load_dotenv(dotenv_path)

# =========================================
# 0. เชื่อมต่อฐานข้อมูล
# =========================================
def connect_db():
    """Open a MySQL connection configured from environment variables.

    Reads DB_HOST, DB_PRIMARY_PORT, DB_USER, DB_PASSWORD and DB_NAME.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
    """
    settings = {
        "host": os.getenv("DB_HOST"),
        # int() receives None (and raises TypeError) when the variable is unset.
        "port": int(os.getenv("DB_PRIMARY_PORT")),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        "database": os.getenv("DB_NAME"),
    }
    return mysql.connector.connect(**settings)

# =========================================
# 1. Parameter Setting
# =========================================
# Number of chromosomes created for the initial population.
POPULATION_SIZE = 50
# Number of iterations of the main GA loop.
MAX_GENERATIONS = 100
# Probability that a pair of parents is recombined in crossover().
CROSSOVER_RATE = 0.8
# Per-gene probability that mutation() tries to replace an item.
MUTATION_RATE = 0.05
# When True, replacement() copies the best old individual over the worst new one.
ELITISM = True

# Gold budget per game phase; the fitness function compares them with the
# cumulative cost of the first 2, first 4 and all items respectively.
EARLY_BUDGET = 2700
MID_BUDGET = 7500
LATE_BUDGET = 14000

CHROMOSOME_LENGTH = 6  # a build consists of 6 items

# =========================================
# 2. Load ข้อมูลพื้นฐานจาก DB
# =========================================

def load_item_data():
    """Fetch every row of the ``items`` table, keyed by ``ItemID``.

    The rest of this file reads the columns ItemType, Cost, PhysicalAttack,
    MagicPower, Defense, HP, CDR, CritChance and MoveSpeed from these rows
    (presumably matching the real schema; verify against the database).

    Returns:
        dict mapping each ItemID to its full row (a dict of column -> value).
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM items")
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always hand the connection back, otherwise a failed query leaks it.
        conn.close()

    # Index the rows by ItemID, keeping the whole row as the value.
    return {row["ItemID"]: row for row in rows}

def load_hero_data(hero_id):
    """Fetch the ``heroes`` row for one hero.

    Args:
        hero_id: value matched against the ``HeroID`` column.

    Returns:
        The row as a dict (the fitness function reads its ``First_Class``
        key), or None when no row matches.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM heroes WHERE HeroID = %s", (hero_id,))
            return cursor.fetchone()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()

def load_hero_skills(hero_id):
    """Fetch all ``heroskills`` rows belonging to one hero.

    Args:
        hero_id: value matched against the ``HeroID`` column.

    Returns:
        A list of row dicts; the fitness function reads the
        ``RecommendItemType`` key of each row.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM heroskills WHERE HeroID = %s", (hero_id,))
            return cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()

# =========================================
# 3. Representation / Population Initialization
# =========================================

def create_random_chromosome(item_pool, force_items, ban_items, length=None):
    """Build one random chromosome (an ordered list of distinct item IDs).

    Forced items that are not banned are placed first; the remaining slots
    are filled with distinct, non-banned items drawn uniformly from
    ``item_pool``.

    Args:
        item_pool: sequence of candidate item IDs.
        force_items: iterable of item IDs that must be included.
        ban_items: container of item IDs that must never be included.
        length: number of genes; defaults to ``CHROMOSOME_LENGTH``.

    Returns:
        A list of at most ``length`` item IDs. It is shorter only when the
        pool does not contain enough eligible items.
    """
    if length is None:
        length = CHROMOSOME_LENGTH

    # 1) Forced items first (skipping banned ones and duplicates).
    chromosome = []
    for fi in force_items:
        if fi not in chromosome and fi not in ban_items:
            chromosome.append(fi)
    chromosome = chromosome[:length]

    # 2) Fill the remaining slots. Sampling without replacement from the
    #    eligible candidates always terminates, unlike retry-until-full
    #    random picking, which never ends when the pool is too small.
    open_slots = length - len(chromosome)
    if open_slots > 0:
        taken = set(chromosome)
        candidates = [
            item for item in dict.fromkeys(item_pool)
            if item not in ban_items and item not in taken
        ]
        chromosome.extend(random.sample(candidates, min(open_slots, len(candidates))))
    return chromosome

def initialize_population(size, item_pool, force_items, ban_items):
    """Create ``size`` independent random chromosomes.

    Each chromosome comes from ``create_random_chromosome`` with the same
    pool, forced items and banned items.
    """
    return [
        create_random_chromosome(item_pool, force_items, ban_items)
        for _ in range(size)
    ]

# =========================================
# 4. Fitness Function (หลัก) 
# =========================================

def calculate_fitness(chromosome, hero_id, lane, item_data, hero_info, hero_skills, budgets=None):
    """Score one build (chromosome); higher is better.

    The score is the sum of four parts:
      1. item stats weighted by the hero's ``First_Class``;
      2. a lane rule: for "Jungle"/"Support", +10 when at least one item has
         that ItemType, otherwise -20;
      3. +5 for every (skill, item) pair where the skill's
         ``RecommendItemType`` equals the item's ``ItemType``;
      4. a budget score in [0, 20] based on the cumulative cost of the first
         2, first 4 and all items against the early/mid/late budgets.

    Items missing from ``item_data`` contribute nothing to any part.

    Args:
        chromosome: ordered list of item IDs.
        hero_id: unused here; kept for interface compatibility.
        lane: lane name, e.g. "Jungle" or "Support".
        item_data: dict ItemID -> row dict.
        hero_info: hero row dict; must contain ``First_Class``.
        hero_skills: list of skill row dicts.
        budgets: optional ``(early, mid, late)`` tuple; defaults to
            ``(EARLY_BUDGET, MID_BUDGET, LATE_BUDGET)``.

    Returns:
        The fitness as a number.
    """
    if budgets is None:
        budgets = (EARLY_BUDGET, MID_BUDGET, LATE_BUDGET)
    early_budget, mid_budget, late_budget = budgets

    # Example weights per hero class; tune them to the real design.
    weight_table = {
        "Fighter":  {"PATK":0.3,"AP":0.0,"DEF":0.2,"HP":0.2,"CDR":0.2,"CRIT":0.1,"MS":0.0},
        "Tank":     {"PATK":0.1,"AP":0.0,"DEF":0.4,"HP":0.4,"CDR":0.0,"CRIT":0.0,"MS":0.1},
        "Assassin": {"PATK":0.4,"AP":0.1,"DEF":0.1,"HP":0.1,"CDR":0.3,"CRIT":0.1,"MS":0.0},
        "Mage":     {"PATK":0.0,"AP":0.5,"DEF":0.1,"HP":0.1,"CDR":0.2,"CRIT":0.0,"MS":0.1},
        "Carry":    {"PATK":0.4,"AP":0.0,"DEF":0.1,"HP":0.1,"CDR":0.1,"CRIT":0.3,"MS":0.0},
        "Support":  {"PATK":0.1,"AP":0.3,"DEF":0.2,"HP":0.3,"CDR":0.3,"CRIT":0.0,"MS":0.1}
    }
    hero_class = hero_info["First_Class"]
    if hero_class not in weight_table:
        hero_class = "Fighter"  # fallback for unknown classes
    wtable = weight_table[hero_class]

    # Known items only, paired with their slot. Using this list everywhere
    # keeps unknown IDs from raising KeyError in the budget computation.
    known = [
        (slot, item_data[item_id])
        for slot, item_id in enumerate(chromosome)
        if item_id in item_data
    ]

    # ---- stat totals and phase costs ----
    total_patk = total_ap = total_def = total_hp = 0
    total_cdr = total_crit = total_ms = 0
    total_cost = cost_early = cost_mid = 0
    for slot, it in known:
        cost = it.get("Cost", 0)
        total_cost += cost
        if slot < 2:   # early phase: first two slots
            cost_early += cost
        if slot < 4:   # mid phase: first four slots
            cost_mid += cost
        total_patk += it.get("PhysicalAttack", 0)
        total_ap   += it.get("MagicPower", 0)
        total_def  += it.get("Defense", 0)
        total_hp   += it.get("HP", 0)
        total_cdr  += it.get("CDR", 0)
        total_crit += it.get("CritChance", 0)
        total_ms   += it.get("MoveSpeed", 0)
    cost_late = total_cost

    score_stats = (
        wtable["PATK"] * total_patk +
        wtable["AP"]   * total_ap +
        wtable["DEF"]  * total_def +
        wtable["HP"]   * total_hp +
        wtable["CDR"]  * total_cdr +
        wtable["CRIT"] * total_crit +
        wtable["MS"]   * total_ms
    )

    # ---- lane rule ----
    item_types = [it.get("ItemType", "") for _, it in known]
    lane_score = 0
    if lane in ("Jungle", "Support"):
        lane_score = 10 if lane in item_types else -20  # heavy penalty when absent

    # ---- skill recommendations: +5 per matching item, per skill ----
    skill_score = 0
    for sk in hero_skills:
        rec_type = sk.get("RecommendItemType", "")
        if rec_type:
            skill_score += 5 * item_types.count(rec_type)

    # ---- budget per phase ----
    def phase_score(cost_val, budget):
        if cost_val <= budget:
            return 1.0
        if budget <= 0:
            return 0  # over a non-positive budget: no credit, no division by zero
        over = cost_val - budget
        return max(0, 1 - (over / budget))  # linear fall-off, floored at 0

    early_factor = phase_score(cost_early, early_budget)
    mid_factor   = phase_score(cost_mid, mid_budget)
    late_factor  = phase_score(cost_late, late_budget)

    w_e, w_m, w_l = 0.2, 0.3, 0.5
    # * 20 scales the budget part to the same magnitude as the other parts.
    budget_score = (w_e*early_factor + w_m*mid_factor + w_l*late_factor) * 20

    # ---- total ----
    fitness = 0.0
    fitness += score_stats
    fitness += lane_score
    fitness += skill_score
    fitness += budget_score
    return fitness

# =========================================
# 5. Evaluate Population
# =========================================
def evaluate_population(population, hero_id, lane, item_data):
    """Compute the fitness of every chromosome in ``population``.

    The hero row and the hero's skills are loaded once per call and shared
    by all chromosomes.

    Returns:
        A list of fitness values in the same order as ``population``.
    """
    hero_info = load_hero_data(hero_id)
    hero_skills = load_hero_skills(hero_id)
    return [
        calculate_fitness(
            chromosome=individual,
            hero_id=hero_id,
            lane=lane,
            item_data=item_data,
            hero_info=hero_info,
            hero_skills=hero_skills,
        )
        for individual in population
    ]

# =========================================
# 6. Selection
# =========================================
def selection(population, fitness_values):
    """Binary tournament selection.

    For every slot of the new pool, two distinct individuals are drawn at
    random. The one with the strictly higher fitness wins; on a tie the
    second one drawn wins. Winners are deep-copied into the mating pool.
    """
    contenders = range(len(population))
    mating_pool = []
    for _ in contenders:
        first, second = random.sample(contenders, 2)
        champion = first if fitness_values[first] > fitness_values[second] else second
        mating_pool.append(copy.deepcopy(population[champion]))
    return mating_pool

# =========================================
# 7. Crossover
# =========================================
def crossover(parent1, parent2):
    """One-point crossover applied with probability ``CROSSOVER_RATE``.

    When no crossover happens, deep copies of both parents are returned.
    Children may contain duplicate items; nothing here repairs them.
    """
    if random.random() >= CROSSOVER_RATE:
        return copy.deepcopy(parent1), copy.deepcopy(parent2)

    cut = random.randint(1, CHROMOSOME_LENGTH - 1)
    head1, tail1 = parent1[:cut], parent1[cut:]
    head2, tail2 = parent2[:cut], parent2[cut:]
    return head1 + tail2, head2 + tail1

# =========================================
# 8. Mutation
# =========================================
def mutation(chromosome, item_pool, ban_items):
    """Mutate ``chromosome`` in place and return it.

    Each gene is considered with probability ``MUTATION_RATE``. A randomly
    chosen pool item replaces the gene only when it is neither banned nor
    already present in the chromosome.
    """
    for slot in range(len(chromosome)):
        if random.random() >= MUTATION_RATE:
            continue
        new_item = random.choice(item_pool)
        if new_item in ban_items or new_item in chromosome:
            continue  # rejected: banned or duplicate
        chromosome[slot] = new_item
    return chromosome

# =========================================
# 9. Replacement
# =========================================
def replacement(old_population, old_fitness, new_population, new_fitness):
    """Return the next generation, optionally applying elitism.

    With ``ELITISM`` enabled, the best individual of the old generation
    (deep-copied) overwrites the worst individual of the new generation,
    together with its fitness. Both new lists are modified in place.
    """
    if not ELITISM:
        return new_population, new_fitness

    elite_pos = max(range(len(old_population)), key=old_fitness.__getitem__)
    weakest_pos = min(range(len(new_population)), key=new_fitness.__getitem__)

    new_population[weakest_pos] = copy.deepcopy(old_population[elite_pos])
    new_fitness[weakest_pos] = old_fitness[elite_pos]
    return new_population, new_fitness

# =========================================
# 10. Main GA Loop
# =========================================
def run_genetic_algorithm(hero_id, lane, force_items, ban_items):
    """Run the GA and return ``(best_build, best_fitness)``.

    Args:
        hero_id: the selected hero.
        lane: the hero's lane, e.g. "Jungle" or "Support".
        force_items: set or list of item IDs that must be in the build.
        ban_items: set or list of item IDs that must not be in the build.
    """
    # Load all items; the pool is every item that is not banned.
    item_data = load_item_data()
    item_pool = [item_id for item_id in item_data if item_id not in ban_items]

    # Initial population and its fitness.
    population = initialize_population(POPULATION_SIZE, item_pool, force_items, ban_items)
    fitness_values = evaluate_population(population, hero_id, lane, item_data)

    best_solution, best_fitness = None, float("-inf")

    for _ in range(MAX_GENERATIONS):
        # Selection
        mating_pool = selection(population, fitness_values)

        # Crossover on consecutive pairs; an odd leftover is paired with the first.
        offspring = []
        for left in range(0, len(mating_pool), 2):
            right = left + 1 if left + 1 < len(mating_pool) else 0
            offspring.extend(crossover(mating_pool[left], mating_pool[right]))

        # Mutation
        offspring = [mutation(child, item_pool, ban_items) for child in offspring]

        # Evaluate the offspring, then build the next generation.
        offspring_fitness = evaluate_population(offspring, hero_id, lane, item_data)
        population, fitness_values = replacement(population, fitness_values,
                                                 offspring, offspring_fitness)

        # Track the best build seen so far.
        leader = max(range(len(population)), key=fitness_values.__getitem__)
        if fitness_values[leader] > best_fitness:
            best_fitness = fitness_values[leader]
            best_solution = copy.deepcopy(population[leader])

    return best_solution, best_fitness


# =========================================
# ตัวอย่างการทดสอบเรียกใช้งาน (main)
# =========================================
if __name__ == "__main__":
    # Example: the user picks hero "H001" and the "Jungle" lane
    hero_id_example = "H001"
    lane_example = "Jungle"

    # Forced / banned items (illustrative values)
    force_list = {"I010", "I022"}  # two forced items
    ban_list   = {"I999"}         # one banned item

    # Run the GA and print the best build with its fitness.
    best_build, best_fit = run_genetic_algorithm(
        hero_id=hero_id_example,
        lane=lane_example,
        force_items=force_list,
        ban_items=ban_list
    )

    print("Best Build:", best_build)
    print("Best Fitness:", best_fit)


0.0.1

In [None]:
import random
import copy
import os
from dotenv import load_dotenv
import mysql.connector
import sys

# Load environment variables from the .env file two directories above the
# current working directory.
dotenv_path = os.path.join(os.getcwd(), os.pardir, os.pardir, ".env")
load_dotenv(dotenv_path)

# =========================================
# 0. เชื่อมต่อฐานข้อมูล (ใช้ try-except)
# =========================================
def connect_db():
    """Open a MySQL connection configured from environment variables.

    Reads DB_HOST, DB_PRIMARY_PORT, DB_USER, DB_PASSWORD and DB_NAME. When
    the connector raises ``mysql.connector.Error``, the error is printed and
    the process exits with status 1.
    """
    try:
        settings = dict(
            host=os.getenv("DB_HOST"),
            port=int(os.getenv("DB_PRIMARY_PORT")),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            database=os.getenv("DB_NAME"),
        )
        connection = mysql.connector.connect(**settings)
    except mysql.connector.Error as exc:
        print(f"Database connection failed: {exc}")
        sys.exit(1)
    return connection

# =========================================
# 1. Parameter Setting
# =========================================
# Number of chromosomes created for the initial population.
# NOTE(review): every generation deep-copies and re-scores the whole
# population, so this value times MAX_GENERATIONS is likely very slow - confirm.
POPULATION_SIZE = 100000
# Number of iterations of the main GA loop.
MAX_GENERATIONS = 500
# Probability that a pair of parents is recombined in crossover().
CROSSOVER_RATE = 0.8
BASE_MUTATION_RATE = 0.05   # starting rate; the adaptive scheme resets to it on improvement
# When True, replacement() copies the best old individual over the worst new one.
ELITISM = True

# State for the adaptive mutation rate (reset at the start of each GA run)
no_improve_count = 0
BEST_FITNESS_HISTORY = []

# Gold budget per game phase (first 2 items / first 4 items / all items)
EARLY_BUDGET = 2700
MID_BUDGET   = 7500
LATE_BUDGET  = 14000

CHROMOSOME_LENGTH = 6  # a build consists of 6 items

# Stat caps used by score_stats(): each stat total is divided by its cap
# and clipped at 1.0
MAX_PATK = 20000
MAX_AP   = 400
MAX_DEF  = 20000
MAX_HP   = 20000
MAX_CDR  = 0.40
MAX_CRIT = 1.0
MAX_MS   = 120

# =========================================
# 2. Load ข้อมูลพื้นฐานจาก DB
# =========================================
def load_item_data():
    """Fetch every row of the ``items`` table, keyed by ``ItemID``.

    Returns:
        dict mapping each ItemID to its full row (a dict of column -> value).
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM items")
            items = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()
    return {it["ItemID"]: it for it in items}

def load_hero_data(hero_id):
    """Fetch the ``heroes`` row whose ``HeroID`` equals ``hero_id``.

    Returns:
        The row as a dict, or None when no row matches.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM heroes WHERE HeroID = %s", (hero_id,))
            return cursor.fetchone()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()

def load_hero_skills(hero_id):
    """Fetch all ``heroskills`` rows whose ``HeroID`` equals ``hero_id``.

    Returns:
        A list of row dicts (empty when the hero has no skill rows).
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM heroskills WHERE HeroID = %s", (hero_id,))
            return cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()

def load_hero_stats(hero_id, level=15):
    """Fetch the ``herostats`` row for a hero at a given level.

    Args:
        hero_id: value matched against the ``HeroID`` column.
        level: value matched against the ``Level`` column (default 15).

    Returns:
        The row as a dict, or None when no row matches.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                "SELECT * FROM herostats WHERE HeroID = %s AND Level = %s",
                (hero_id, level),
            )
            return cursor.fetchone()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()

def load_item_composition():
    """Load the ``itemcomposition`` table grouped by base item.

    Returns:
        dict mapping each ``BaseItemID`` to the list of ``Composite_ItemID``
        values found for it, in the order the rows were returned.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM itemcomposition")
            comps = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        # Always close the connection so an error cannot leak it.
        conn.close()

    comp_dict = {}
    for comp in comps:
        comp_dict.setdefault(comp["BaseItemID"], []).append(comp["Composite_ItemID"])
    return comp_dict

# =========================================
# 3. Repair Operator (สำหรับ Support/Jungle)
# =========================================
def repair_chromosome(chromosome, item_pool, ban_items, force_items, lane, item_data, length=None):
    """Return a repaired copy of ``chromosome`` that satisfies the build rules.

    Steps, in order (gene order is preserved wherever possible):
      1. drop banned items and duplicates;
      2. insert missing forced items - appended while there is room,
         otherwise written over a randomly chosen non-forced gene, so one
         forced item can never evict another;
      3. for the "Support" and "Jungle" lanes keep at most one item whose
         ItemType equals the lane, preferring a forced one;
      4. fill the remaining slots with random eligible pool items.

    Unlike a retry-until-full loop, step 4 always terminates: when the pool
    has too few eligible items, the result is simply shorter than ``length``.

    Args:
        chromosome: list of item IDs to repair (not modified).
        item_pool: sequence of candidate item IDs.
        ban_items: container of banned item IDs.
        force_items: iterable of item IDs that must be present; only those
            contained in ``item_pool`` and not banned are enforced.
        lane: lane name.
        item_data: dict ItemID -> row dict (its ``ItemType`` key is read).
        length: number of genes; defaults to ``CHROMOSOME_LENGTH``.

    Returns:
        A new list of at most ``length`` distinct item IDs.
    """
    if length is None:
        length = CHROMOSOME_LENGTH
    lane_limited = lane in ("Support", "Jungle")

    def is_lane_item(item):
        # Items missing from item_data are never treated as lane items.
        return (lane_limited and item in item_data
                and item_data[item].get("ItemType", "") == lane)

    pool = list(dict.fromkeys(item_pool))
    pool_members = set(pool)
    forced = [fi for fi in dict.fromkeys(force_items)
              if fi in pool_members and fi not in ban_items]
    forced_members = set(forced)

    # 1) Remove banned items and duplicates, keeping the original order.
    genes = []
    for item in chromosome:
        if item not in ban_items and item not in genes:
            genes.append(item)
    genes = genes[:length]

    # 2) Make sure every forced item is present.
    for fi in forced:
        if fi in genes:
            continue
        if len(genes) < length:
            genes.append(fi)
            continue
        replaceable = [pos for pos, gene in enumerate(genes)
                       if gene not in forced_members]
        if not replaceable:
            break  # more forced items than slots
        genes[random.choice(replaceable)] = fi

    # 3) At most one lane-specific item for Support / Jungle.
    lane_genes = [gene for gene in genes if is_lane_item(gene)]
    if len(lane_genes) > 1:
        keeper = next((g for g in lane_genes if g in forced_members), lane_genes[0])
        genes = [g for g in genes if g == keeper or not is_lane_item(g)]
    has_lane_item = bool(lane_genes)

    # 4) Fill up with random eligible items (each candidate is tried once).
    if len(genes) < length:
        present = set(genes)
        candidates = [c for c in pool if c not in ban_items and c not in present]
        random.shuffle(candidates)
        for candidate in candidates:
            if len(genes) >= length:
                break
            if is_lane_item(candidate):
                if has_lane_item:
                    continue  # never add a second lane item
                has_lane_item = True
            genes.append(candidate)
    return genes

# =========================================
# 4. Population Initialization
# =========================================
def create_random_chromosome(item_pool, force_items, ban_items, length=None):
    """Build one random chromosome (an ordered list of distinct item IDs).

    Forced items that are in the pool and not banned are placed first; the
    remaining slots are filled with distinct, non-banned items drawn
    uniformly from ``item_pool``.

    Args:
        item_pool: sequence of candidate item IDs.
        force_items: iterable of item IDs that must be included.
        ban_items: container of item IDs that must never be included.
        length: number of genes; defaults to ``CHROMOSOME_LENGTH``.

    Returns:
        A list of at most ``length`` item IDs. It is shorter only when the
        pool does not contain enough eligible items.
    """
    if length is None:
        length = CHROMOSOME_LENGTH
    pool = list(dict.fromkeys(item_pool))

    # Forced items first.
    chromosome = []
    for fi in force_items:
        if len(chromosome) >= length:
            break
        if fi not in ban_items and fi in pool and fi not in chromosome:
            chromosome.append(fi)

    # Sampling without replacement always terminates, unlike retrying
    # random picks until the chromosome is full.
    candidates = [c for c in pool if c not in ban_items and c not in chromosome]
    open_slots = length - len(chromosome)
    chromosome.extend(random.sample(candidates, min(open_slots, len(candidates))))
    return chromosome

def initialize_population(size, item_pool, force_items, ban_items, lane, item_data):
    """Create ``size`` random chromosomes, each passed through the repair step."""
    return [
        repair_chromosome(
            create_random_chromosome(item_pool, force_items, ban_items),
            item_pool, ban_items, force_items, lane, item_data,
        )
        for _ in range(size)
    ]

# =========================================
# 5. Fitness Function Components
# =========================================
def score_stats(chromosome, item_data, hero_class):
    """Weighted sum of the build's normalized stat totals.

    Each stat is summed over the known items, divided by its module-level
    cap and clipped at 1.0, then multiplied by the weight for ``hero_class``.
    Unknown classes use the "Fighter" weights.
    """
    stat_specs = (
        ("PATK", "PhysicalAttack", MAX_PATK),
        ("AP",   "MagicPower",     MAX_AP),
        ("DEF",  "Defense",        MAX_DEF),
        ("HP",   "HP",             MAX_HP),
        ("CDR",  "CDR",            MAX_CDR),
        ("CRIT", "CritChance",     MAX_CRIT),
        ("MS",   "MoveSpeed",      MAX_MS),
    )
    rows = [item_data[item_id] for item_id in chromosome if item_id in item_data]

    normalized = {}
    for key, column, cap in stat_specs:
        total = 0
        for row in rows:
            total += row.get(column, 0)
        normalized[key] = min(total / cap, 1.0)

    class_weights = {
        "Fighter":  {"PATK": 0.3, "AP": 0.0, "DEF": 0.2, "HP": 0.2, "CDR": 0.2, "CRIT": 0.1, "MS": 0.0},
        "Tank":     {"PATK": 0.1, "AP": 0.0, "DEF": 0.4, "HP": 0.4, "CDR": 0.0, "CRIT": 0.0, "MS": 0.1},
        "Assassin": {"PATK": 0.4, "AP": 0.1, "DEF": 0.1, "HP": 0.1, "CDR": 0.3, "CRIT": 0.1, "MS": 0.0},
        "Mage":     {"PATK": 0.0, "AP": 0.5, "DEF": 0.1, "HP": 0.1, "CDR": 0.2, "CRIT": 0.0, "MS": 0.1},
        "Carry":    {"PATK": 0.4, "AP": 0.0, "DEF": 0.1, "HP": 0.1, "CDR": 0.1, "CRIT": 0.3, "MS": 0.0},
        "Support":  {"PATK": 0.1, "AP": 0.3, "DEF": 0.2, "HP": 0.3, "CDR": 0.3, "CRIT": 0.0, "MS": 0.1}
    }
    weights = class_weights.get(hero_class, class_weights["Fighter"])

    # Accumulate left to right in the fixed stat order.
    score = None
    for key, _column, _cap in stat_specs:
        term = weights[key] * normalized[key]
        score = term if score is None else score + term
    return score

def score_lane(chromosome, item_data, lane):
    """Lane rule: reward builds that contain an item made for the lane.

    For "Jungle" and "Support", returns 10 when at least one known item has
    an ItemType equal to the lane, otherwise -20. Any other lane scores 0.
    """
    if lane not in ("Jungle", "Support"):
        return 0
    for gene in chromosome:
        if gene in item_data and item_data[gene].get("ItemType", "") == lane:
            return 10
    return -20

def score_skill(chromosome, item_data, hero_skills):
    """Award 5 points for every (skill, item) pair with matching types.

    A pair matches when the skill's non-empty ``RecommendItemType`` equals
    the ``ItemType`` of a known item in the chromosome.
    """
    item_types = [
        item_data[gene].get("ItemType", "")
        for gene in chromosome
        if gene in item_data
    ]
    total = 0
    for skill in hero_skills:
        wanted = skill.get("RecommendItemType", "")
        if wanted:
            total += 5 * item_types.count(wanted)
    return total

def phase_score_exp(cost_val, budget):
    """Budget factor in [0, 1] for one game phase.

    Returns 1.0 while the cost stays within the budget. Beyond it the factor
    drops with the square of the relative overspend and is floored at 0.
    """
    if cost_val <= budget:
        return 1.0
    overspend_ratio = (cost_val - budget) / budget
    return max(0, 1 - (overspend_ratio ** 2))

def score_budget(chromosome, item_data):
    """Score (0 to 20) for how well the build respects the phase budgets.

    The cumulative cost of the first 2, first 4 and all slots is compared
    with the early, mid and late budgets. Items missing from ``item_data``
    cost nothing but still occupy their slot.
    """
    priced = [
        (slot, item_data[gene].get("Cost", 0))
        for slot, gene in enumerate(chromosome)
        if gene in item_data
    ]
    spent_early = sum(cost for slot, cost in priced if slot < 2)
    spent_mid = sum(cost for slot, cost in priced if slot < 4)
    spent_late = sum(cost for _, cost in priced)

    early_factor = phase_score_exp(spent_early, EARLY_BUDGET)
    mid_factor = phase_score_exp(spent_mid, MID_BUDGET)
    late_factor = phase_score_exp(spent_late, LATE_BUDGET)

    # Phase weights; the factor 20 scales the result.
    w_e, w_m, w_l = 0.2, 0.3, 0.5
    return (w_e * early_factor + w_m * mid_factor + w_l * late_factor) * 20

def score_improvement(chromosome, item_data, hero_stats):
    """Score how much the items add relative to the hero's base stats.

    For each of seven stats, the bonus summed over the known items is
    divided by the hero's base value (default 1, floored at 0.01). The score
    is ``max(0, mean_ratio - 1) * 20``.
    """
    owned = [item_data[gene] for gene in chromosome if gene in item_data]

    ratios = []
    for column in ("PhysicalAttack", "MagicPower", "Defense", "HP",
                   "CDR", "CritChance", "MoveSpeed"):
        gained = sum(row.get(column, 0) for row in owned)
        # Floor the base value to avoid dividing by zero.
        baseline = max(hero_stats.get(column, 1), 0.01)
        ratios.append(gained / baseline)

    mean_ratio = sum(ratios) / len(ratios)
    return max(0, mean_ratio - 1) * 20

def score_composition(chromosome, comp_data):
    """Award 5 points per gene that appears as a composite item in ``comp_data``.

    ``comp_data`` maps a base item to a list of composite item IDs; a gene
    that occurs several times in the chromosome is counted each time.
    """
    composites = {
        composite
        for group in comp_data.values()
        for composite in group
    }
    return 5 * sum(1 for gene in chromosome if gene in composites)

def penalty(chromosome, force_items, ban_items):
    """Penalty factor for rule violations.

    Returns ``float("inf")`` when the chromosome contains any banned item
    (the caller turns that into a fitness of 0). Otherwise returns 0.3 for
    every forced item that is missing.
    """
    if any(gene in ban_items for gene in chromosome):
        return float("inf")
    absent = sum(1 for fi in force_items if fi not in chromosome)
    return absent * 0.3  # 30% deduction per missing forced item

def calculate_fitness(chromosome, hero_id, lane, item_data, hero_info, hero_skills, hero_stats, comp_data, force_items, ban_items):
    """Total fitness of one build, including rule penalties.

    The base fitness is the sum of the stat, lane, skill, budget,
    improvement and composition scores. A build containing a banned item
    scores 0. Each missing forced item moves the score 30% of its magnitude
    in the worse direction.

    Args:
        chromosome: ordered list of item IDs.
        hero_id: unused here; kept for interface compatibility.
        lane: lane name passed to ``score_lane``.
        item_data: dict ItemID -> row dict.
        hero_info: hero row dict; ``First_Class`` defaults to "Fighter".
        hero_skills: list of skill row dicts.
        hero_stats: base-stat row dict for the hero.
        comp_data: dict base item -> list of composite item IDs.
        force_items: item IDs that must be present.
        ban_items: item IDs that must not be present.

    Returns:
        The fitness as a number.
    """
    pen = penalty(chromosome, force_items, ban_items)
    if pen == float("inf"):
        # NOTE(review): 0 can still beat a valid build whose base fitness is
        # negative; the repair step normally keeps banned items out.
        return 0

    stat_score = score_stats(chromosome, item_data, hero_info.get("First_Class", "Fighter"))
    lane_score = score_lane(chromosome, item_data, lane)
    skill_score = score_skill(chromosome, item_data, hero_skills)
    budget_score = score_budget(chromosome, item_data)
    improvement_score = score_improvement(chromosome, item_data, hero_stats)
    composition_score = score_composition(chromosome, comp_data)
    base_fitness = stat_score + lane_score + skill_score + budget_score + improvement_score + composition_score

    # Multiplying a negative base by (1 - pen) would raise the score and so
    # reward missing forced items; scale negative values the other way.
    if base_fitness >= 0:
        return base_fitness * (1 - pen)
    return base_fitness * (1 + pen)

# =========================================
# 6. Evaluate Population
# =========================================
def evaluate_population(population, hero_id, lane, item_data, comp_data, force_items, ban_items):
    """Compute the fitness of every chromosome in ``population``.

    The hero row, the hero's skills and the level-15 base stats are loaded
    once per call and shared by all chromosomes.

    Returns:
        A list of fitness values in the same order as ``population``.
    """
    hero_info = load_hero_data(hero_id)
    hero_skills = load_hero_skills(hero_id)
    hero_stats = load_hero_stats(hero_id, level=15)
    return [
        calculate_fitness(individual, hero_id, lane, item_data, hero_info,
                          hero_skills, hero_stats, comp_data, force_items, ban_items)
        for individual in population
    ]

# =========================================
# 7. Selection (Tournament)
# =========================================
def selection(population, fitness_values):
    """Binary tournament selection.

    For every slot of the new pool, two distinct individuals are drawn at
    random. The one with the strictly higher fitness wins; on a tie the
    second one drawn wins. Winners are deep-copied into the mating pool.
    """
    contenders = range(len(population))
    mating_pool = []
    for _ in contenders:
        first, second = random.sample(contenders, 2)
        if fitness_values[first] > fitness_values[second]:
            champion = first
        else:
            champion = second
        mating_pool.append(copy.deepcopy(population[champion]))
    return mating_pool

# =========================================
# 8. Crossover (One-Point)
# =========================================
def crossover(parent1, parent2):
    """One-point crossover applied with probability ``CROSSOVER_RATE``.

    When no crossover happens, deep copies of both parents are returned.
    """
    if random.random() >= CROSSOVER_RATE:
        return copy.deepcopy(parent1), copy.deepcopy(parent2)

    cut = random.randint(1, CHROMOSOME_LENGTH - 1)
    head1, tail1 = parent1[:cut], parent1[cut:]
    head2, tail2 = parent2[:cut], parent2[cut:]
    return head1 + tail2, head2 + tail1

# =========================================
# 9. Mutation
# =========================================
def mutation(chromosome, item_pool, ban_items, mutation_rate):
    """Mutate ``chromosome`` in place and return it.

    Each gene is considered with probability ``mutation_rate``. A randomly
    chosen pool item replaces the gene only when it is neither banned nor
    already present in the chromosome.
    """
    for slot in range(len(chromosome)):
        if random.random() >= mutation_rate:
            continue
        new_item = random.choice(item_pool)
        if new_item in ban_items or new_item in chromosome:
            continue  # rejected: banned or duplicate
        chromosome[slot] = new_item
    return chromosome

# =========================================
# 10. Replacement (Elitism)
# =========================================
def replacement(old_population, old_fitness, new_population, new_fitness):
    """Return the next generation, optionally applying elitism.

    With ``ELITISM`` enabled, a deep copy of the best old individual
    overwrites the worst new individual, together with its fitness. Both
    new lists are modified in place.
    """
    if not ELITISM:
        return new_population, new_fitness

    elite_pos = max(range(len(old_population)), key=old_fitness.__getitem__)
    elite = copy.deepcopy(old_population[elite_pos])
    weakest_pos = min(range(len(new_population)), key=new_fitness.__getitem__)

    new_population[weakest_pos] = elite
    new_fitness[weakest_pos] = old_fitness[elite_pos]
    return new_population, new_fitness

# =========================================
# Adaptive Mutation Rate (Optional)
# =========================================
def get_adaptive_mutation_rate(generation, best_fitness, mutation_rate, no_improve_count):
    """Adjust the mutation rate based on whether the best fitness improved.

    The module-level list ``BEST_FITNESS_HISTORY`` is appended to on every
    call. On the first call (empty history) the rate is returned unchanged
    with a stall counter of 0. Afterwards:

    * improvement over the last recorded value -> ``BASE_MUTATION_RATE`` and
      the stall counter resets to 0;
    * no improvement -> the counter is incremented, and once it reaches 5 the
      rate is multiplied by 1.5, capped at 0.2.

    ``generation`` is accepted but not used.

    Returns:
        ``(new_mutation_rate, new_no_improve_count)``.
    """
    history = BEST_FITNESS_HISTORY
    if not history:
        history.append(best_fitness)
        return mutation_rate, 0

    if best_fitness > history[-1]:
        outcome = (BASE_MUTATION_RATE, 0)
    else:
        stalled = no_improve_count + 1
        boosted = min(0.2, mutation_rate * 1.5) if stalled >= 5 else mutation_rate
        outcome = (boosted, stalled)

    history.append(best_fitness)
    return outcome

# =========================================
# Main GA Loop
# =========================================
def run_genetic_algorithm(hero_id, lane, force_items, ban_items):
    """Run the GA and return the best build found.

    Relies on helpers defined elsewhere in this file (``load_item_data``,
    ``load_item_composition``, ``initialize_population``,
    ``evaluate_population``, ``selection``, ``repair_chromosome``) and on the
    module constants ``POPULATION_SIZE``, ``MAX_GENERATIONS`` and
    ``BASE_MUTATION_RATE``.

    Side effects:
        Resets the module globals ``no_improve_count`` and
        ``BEST_FITNESS_HISTORY`` and prints a message on early stopping.

    Returns:
        ``(best_solution, best_fitness)``, the best chromosome seen in any
        generation (including the initial one) and its fitness.
    """
    global no_improve_count, BEST_FITNESS_HISTORY
    no_improve_count = 0
    BEST_FITNESS_HISTORY = []

    item_data = load_item_data()
    comp_data = load_item_composition()
    item_pool = [it_id for it_id in item_data if it_id not in ban_items]

    population = initialize_population(POPULATION_SIZE, item_pool, force_items, ban_items, lane, item_data)
    fitness_values = evaluate_population(population, hero_id, lane, item_data, comp_data, force_items, ban_items)

    best_solution = None
    best_fitness = float("-inf")
    # Record the best of the initial population so it cannot be lost when
    # elitism is disabled.
    if population:
        start_best = max(range(len(population)), key=lambda i: fitness_values[i])
        best_fitness = fitness_values[start_best]
        best_solution = copy.deepcopy(population[start_best])

    current_mutation_rate = BASE_MUTATION_RATE
    generation = 0

    while generation < MAX_GENERATIONS:
        mating_pool = selection(population, fitness_values)
        new_population = []
        for i in range(0, len(mating_pool), 2):
            parent1 = mating_pool[i]
            # An odd-sized pool pairs its last member with the first one.
            parent2 = mating_pool[i + 1] if i + 1 < len(mating_pool) else mating_pool[0]
            child1, child2 = crossover(parent1, parent2)
            new_population.extend([child1, child2])
        # Pairing always emits two children, so an odd pool would otherwise
        # grow the population by one individual every generation.
        del new_population[len(mating_pool):]

        for i, child in enumerate(new_population):
            child = mutation(child, item_pool, ban_items, current_mutation_rate)
            new_population[i] = repair_chromosome(child, item_pool, ban_items, force_items, lane, item_data)
        new_fitness = evaluate_population(new_population, hero_id, lane, item_data, comp_data, force_items, ban_items)
        population, fitness_values = replacement(population, fitness_values, new_population, new_fitness)

        curr_best_index = max(range(len(population)), key=lambda i: fitness_values[i])
        curr_best_fit = fitness_values[curr_best_index]
        if curr_best_fit > best_fitness:
            best_fitness = curr_best_fit
            best_solution = copy.deepcopy(population[curr_best_index])

        current_mutation_rate, no_improve_count = get_adaptive_mutation_rate(
            generation, best_fitness, current_mutation_rate, no_improve_count
        )
        # Early stopping condition (optional)
        if no_improve_count >= 10:
            print(f"Early stopping at generation {generation}")
            break

        generation += 1

    return best_solution, best_fitness

# =========================================
# Example Main Driver
# =========================================
# Script entry point: run the GA once for a sample hero/lane and print the result.
if __name__ == "__main__":
    hero_id_example = "H001"       # example hero
    lane_example = "Jungle"        # use the Jungle lane
    # NOTE(review): both values below are empty dicts although the names say
    # "list"; membership tests and iteration work on a dict, but any callee
    # that appends to them would fail. Confirm and consider [] instead.
    force_list = {}  # forced items
    ban_list = {}            # banned items

    best_build, best_fit = run_genetic_algorithm(
        hero_id=hero_id_example,
        lane=lane_example,
        force_items=force_list,
        ban_items=ban_list
    )

    print("Best Build:", best_build)
    print("Best Fitness:", best_fit)


0.0.2

In [None]:
import random
import copy
import os
from dotenv import load_dotenv
import mysql.connector
import sys

# Load environment variables (the DB_* settings read by connect_db) from the
# .env file located two directories above the current working directory.
dotenv_path = os.path.join(os.getcwd(), "..", "..", ".env")
load_dotenv(dotenv_path)

# =========================================
# 0. เชื่อมต่อฐานข้อมูล (ใช้ try-except)
# =========================================
def connect_db():
    """Open a MySQL connection configured from the DB_* environment variables.

    Reads DB_HOST, DB_PRIMARY_PORT, DB_USER, DB_PASSWORD and DB_NAME.

    Returns:
        The connection returned by ``mysql.connector.connect``.

    Exits the process with status 1, after printing the reason, when
    DB_PRIMARY_PORT is missing or not an integer, or when the connection
    attempt raises ``mysql.connector.Error``.
    """
    raw_port = os.getenv("DB_PRIMARY_PORT")
    try:
        port = int(raw_port)
    except (TypeError, ValueError):
        # int(None) would otherwise escape as an unrelated-looking TypeError.
        print(f"Database connection failed: invalid DB_PRIMARY_PORT {raw_port!r}")
        sys.exit(1)

    try:
        return mysql.connector.connect(
            host=os.getenv("DB_HOST"),
            port=port,
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            database=os.getenv("DB_NAME"),
        )
    except mysql.connector.Error as e:
        print(f"Database connection failed: {e}")
        # sys.exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        sys.exit(1)

# =========================================
# 1. ฟังก์ชันดึงค่าสูงสุดจากฐานข้อมูล (StatCap)
# =========================================
def load_stat_caps(buffer_pct=0.2):
    """Return normalisation caps for each stat, derived from the items table.

    For PhysicalAttack, MagicPower, Defense and HP the cap is the column
    maximum scaled by ``1 + buffer_pct``; MoveSpeed always uses a fixed 10%
    buffer. A missing/NULL/zero maximum falls back to 1 so later divisions
    are safe. The CDR and crit caps are hard-coded (0.40 and 1.0).

    Args:
        buffer_pct: Fractional head-room added on top of each column maximum.

    Returns:
        Dict with keys MAX_PATK, MAX_AP, MAX_DEF, MAX_HP, MAX_MS, MAX_CDR,
        MAX_CRIT.
    """
    # All maxima are fetched with a single SQL statement.
    query = """
        SELECT
            MAX(PhysicalAttack) * (1 + %s) AS max_patk,
            MAX(MagicPower) * (1 + %s) AS max_ap,
            MAX(Defense) * (1 + %s) AS max_def,
            MAX(HP) * (1 + %s) AS max_hp,
            MAX(MoveSpeed) * (1 + %s) AS max_ms
        FROM items
    """
    buffer_values = [buffer_pct] * 4 + [0.1]

    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, buffer_values)
            result = cursor.fetchone() or {}
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return {
        "MAX_PATK": result.get("max_patk") or 1,
        "MAX_AP": result.get("max_ap") or 1,
        "MAX_DEF": result.get("max_def") or 1,
        "MAX_HP": result.get("max_hp") or 1,
        "MAX_MS": result.get("max_ms") or 1,
        "MAX_CDR": 0.40,  # fixed value
        "MAX_CRIT": 1.0,  # fixed value
    }

# =========================================
# 2. คำนวณการ Normalize (score_stats)
# =========================================
def score_stats(chromosome, item_data, hero_class, stat_caps):
    """Score a build as a class-weighted sum of normalised stat totals.

    Each stat is summed over the items of ``chromosome`` (IDs missing from
    ``item_data`` are skipped), divided by its cap from ``stat_caps`` and
    clipped to 1.0, then multiplied by the weight for ``hero_class``.
    Unknown classes use the "Fighter" weights.

    NULL (``None``) stat values count as 0 and every value is converted to
    ``float``, so rows holding ``Decimal`` or ``None`` do not raise. A
    non-positive cap contributes 0 for that stat.

    Returns:
        The weighted score as a float.
    """
    # stat key -> (item column, cap key)
    stat_columns = {
        "patk": ("PhysicalAttack", "MAX_PATK"),
        "ap": ("MagicPower", "MAX_AP"),
        "def": ("Defense", "MAX_DEF"),
        "hp": ("HP", "MAX_HP"),
        "cdr": ("CDR", "MAX_CDR"),
        "crit": ("CritChance", "MAX_CRIT"),
        "ms": ("MoveSpeed", "MAX_MS"),
    }

    totals = dict.fromkeys(stat_columns, 0.0)
    for item_id in chromosome:
        row = item_data.get(item_id)
        if row is None:
            continue
        for stat, (column, _) in stat_columns.items():
            # `or 0` covers NULL columns: the key exists, so a .get default
            # alone would still return None.
            totals[stat] += float(row.get(column) or 0)

    # Weight table per hero class.
    # NOTE(review): the Assassin weights sum to 1.1 and the Support weights
    # to 1.3 while the other classes sum to 1.0; confirm this is intended.
    weight_table = {
        "Fighter":  {"patk": 0.3, "ap": 0.0, "def": 0.2, "hp": 0.2, "cdr": 0.2, "crit": 0.1, "ms": 0.0},
        "Tank":     {"patk": 0.1, "ap": 0.0, "def": 0.4, "hp": 0.4, "cdr": 0.0, "crit": 0.0, "ms": 0.1},
        "Assassin": {"patk": 0.4, "ap": 0.1, "def": 0.1, "hp": 0.1, "cdr": 0.3, "crit": 0.1, "ms": 0.0},
        "Mage":     {"patk": 0.0, "ap": 0.5, "def": 0.1, "hp": 0.1, "cdr": 0.2, "crit": 0.0, "ms": 0.1},
        "Carry":    {"patk": 0.4, "ap": 0.0, "def": 0.1, "hp": 0.1, "cdr": 0.1, "crit": 0.3, "ms": 0.0},
        "Support":  {"patk": 0.1, "ap": 0.3, "def": 0.2, "hp": 0.3, "cdr": 0.3, "crit": 0.0, "ms": 0.1},
    }
    weights = weight_table.get(hero_class, weight_table["Fighter"])

    score = 0.0
    for stat, (_, cap_key) in stat_columns.items():
        cap = float(stat_caps[cap_key])
        normalized = min(totals[stat] / cap, 1.0) if cap > 0 else 0.0
        score += weights[stat] * normalized
    return score

# =========================================
# 3. เริ่มต้น GA (การคำนวณ Fitness และการ Population)
# =========================================
def run_genetic_algorithm(hero_id, lane, force_items, ban_items, item_data, hero_class):
    """Run the GA for a fixed number of generations and return the best build.

    Uses ``calculate_fitness``, ``initialize_population`` and ``elitism``,
    which are not defined in this cell and must already exist in the session.
    ``hero_id`` is accepted but not used here.

    Returns:
        ``(best_chromosome, best_fitness)`` taken from the final population.
    """
    # Load the normalisation caps from the database.
    stat_caps = load_stat_caps()

    population_size = 100
    max_generations = 50
    crossover_rate = 0.8
    mutation_rate = 0.05

    def evaluate(chromosomes):
        return [
            calculate_fitness(chromo, item_data, stat_caps, hero_class, lane, force_items, ban_items)
            for chromo in chromosomes
        ]

    # Create and score the initial population.
    population = initialize_population(population_size, item_data, force_items, ban_items)
    fitness = evaluate(population)

    for _ in range(max_generations):
        # Pick parents with tournament selection.
        parents = tournament_selection(population, fitness, tournament_size=3)

        # Crossover until enough offspring exist.
        offspring = []
        while len(offspring) < population_size:
            p1, p2 = random.sample(parents, 2)
            child1, child2 = one_point_crossover(p1, p2, crossover_rate)
            offspring.extend([child1, child2])

        # Mutation (mutate also repairs the chromosome).
        offspring = [
            mutate(child, item_data, mutation_rate, ban_items, force_items, lane)
            for child in offspring
        ]

        # The offspring must be scored before replacement; previously this
        # name was never assigned and the call below raised NameError.
        offspring_fitness = evaluate(offspring)
        population = elitism(population, offspring, fitness, offspring_fitness, elite_size=2)

        # Re-score so `fitness` always lines up with `population`; the old
        # code indexed the new population with the previous fitness list.
        fitness = evaluate(population)

    best_index = fitness.index(max(fitness))
    return population[best_index], fitness[best_index]

# =========================================
# 4. การเลือกพ่อแม่แบบ Tournament
# =========================================
def tournament_selection(population, fitness, tournament_size=3):
    """Select ``len(population)`` parents by tournament.

    For every slot, ``tournament_size`` distinct individuals are sampled and
    the one with the highest fitness wins. The tournament size is clamped to
    the number of available individuals so small populations do not raise.

    Returns:
        List of winning chromosomes (references to the originals, not copies).
    """
    # Build the (chromosome, fitness) pairs once instead of once per round.
    contenders = list(zip(population, fitness))
    if not contenders:
        return []
    draw = min(tournament_size, len(contenders))
    return [
        max(random.sample(contenders, draw), key=lambda pair: pair[1])[0]
        for _ in range(len(population))
    ]

# =========================================
# 5. Crossover แบบ One-Point
# =========================================
def one_point_crossover(p1, p2, crossover_rate):
    """Recombine two parents with one-point crossover.

    With probability ``crossover_rate`` a cut point is drawn between 1 and
    ``min(len(p1), len(p2)) - 1`` and the tails are swapped. For 6-gene
    parents this is the previous hard-coded ``randint(1, 5)``. Parents too
    short to be cut, or a failed probability roll, yield shallow copies.

    Returns:
        ``(child1, child2)``, new lists that never alias the parents.
    """
    # Roll first so the random stream does not depend on parent length.
    do_cross = random.random() < crossover_rate
    shortest = min(len(p1), len(p2))
    if do_cross and shortest >= 2:
        point = random.randint(1, shortest - 1)
        return p1[:point] + p2[point:], p2[:point] + p1[point:]
    return list(p1), list(p2)

# =========================================
# 6. Mutation และ Repair Chromosome
# =========================================
def mutate(chromosome, item_data, mutation_rate, ban_items, force_items, lane):
    """Return a mutated, repaired copy of ``chromosome``.

    Each slot is selected with probability ``mutation_rate``. A selected slot
    that does not hold a forced item is replaced by a random item that is not
    banned and not yet used; every replacement is removed from the candidate
    pool. The result is passed through ``repair_chromosome``. An empty
    chromosome yields ``[]``.
    """
    if not chromosome:
        return []

    mutated = chromosome.copy()
    pool = []
    for key in item_data:
        if key in ban_items or key in mutated:
            continue
        pool.append(key)

    for slot, gene in enumerate(mutated):
        roll = random.random()
        # Forced items are never mutated away.
        if roll >= mutation_rate or not pool or gene in force_items:
            continue
        substitute = random.choice(pool)
        pool.remove(substitute)
        mutated[slot] = substitute

    return repair_chromosome(mutated, item_data, lane, force_items, ban_items)

def repair_chromosome(chromosome, item_data, lane, force_items, ban_items, build_size=6):
    """Return a valid build derived from ``chromosome``.

    Steps, in order:
      1. Drop banned items and duplicates (first occurrence wins) and trim
         to ``build_size``.
      2. Insert every missing forced item: appended while there is room,
         otherwise it overwrites a random non-forced slot. Forced items take
         precedence over the ban list.
      3. For lane "Support" / "Jungle", keep at most one item whose
         ``ItemType`` equals the lane. Forced items of that type are always
         kept; otherwise one is kept at random.
      4. Refill with random unused, non-banned items without breaking rule 3.

    The result holds fewer than ``build_size`` items only when ``item_data``
    does not contain enough eligible items (the loop can no longer spin
    forever in that case).

    Args:
        build_size: Target number of items; defaults to the previous fixed 6.
    """
    forced = list(dict.fromkeys(force_items))[:build_size]
    forced_set = set(forced)
    banned = set(ban_items) - forced_set
    restricted_type = lane if lane in ("Support", "Jungle") else None

    def is_restricted(item):
        if restricted_type is None:
            return False
        return item_data.get(item, {}).get('ItemType') == restricted_type

    # 1. Remove banned items and duplicates, preserving order.
    repaired = [item for item in dict.fromkeys(chromosome) if item not in banned]
    repaired = repaired[:build_size]

    # 2. Add missing forced items without displacing other forced items.
    for fi in forced:
        if fi in repaired:
            continue
        replaceable = [i for i, item in enumerate(repaired) if item not in forced_set]
        if len(repaired) >= build_size and replaceable:
            repaired[random.choice(replaceable)] = fi
        else:
            repaired.append(fi)

    # 3. Lane rule: at most one lane-typed item.
    if restricted_type is not None:
        lane_items = [item for item in repaired if is_restricted(item)]
        if len(lane_items) > 1:
            forced_lane = {item for item in lane_items if item in forced_set}
            keep = forced_lane or {random.choice(lane_items)}
            repaired = [item for item in repaired if item in keep or not is_restricted(item)]

    # 4. Refill up to build_size from a shuffled pool of eligible items.
    has_restricted = any(is_restricted(item) for item in repaired)
    present = set(repaired)
    candidates = [item for item in item_data if item not in banned and item not in present]
    random.shuffle(candidates)
    for candidate in candidates:
        if len(repaired) >= build_size:
            break
        if is_restricted(candidate):
            if has_restricted:
                continue
            has_restricted = True
        repaired.append(candidate)
    return repaired

# =========================================
# 7. เชื่อมต่อกับฐานข้อมูล
# =========================================
def load_item_data():
    """Load every row of the ``items`` table.

    Opens its own connection through ``connect_db`` and always closes the
    cursor and the connection, even when the query raises.

    Returns:
        Dict mapping each row's ``ItemID`` to the full row dict.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM items")
            return {row['ItemID']: row for row in cursor.fetchall()}
        finally:
            cursor.close()
    finally:
        conn.close()

# =========================================
# Main Function
# =========================================
# Script entry point: run the GA once with sample inputs and print the result.
if __name__ == "__main__":
    hero_id_example = "H001"  # example hero ID
    lane_example = "Jungle"  # example lane
    force_list = []  # items that must be part of the build
    ban_list = []  # items that must not be part of the build
    hero_class = "Fighter"  # example hero class

    # Run the GA; item data is loaded from the database for this call.
    best_build, best_fitness = run_genetic_algorithm(
        hero_id=hero_id_example,
        lane=lane_example,
        force_items=force_list,
        ban_items=ban_list,
        item_data=load_item_data(),
        hero_class=hero_class
    )

    print("Best Build:", best_build)
    print("Best Fitness:", best_fitness)


0.0.3

In [None]:
import random
import copy
import os
from dotenv import load_dotenv
import mysql.connector
import sys

# Load environment variables (the DB_* settings read by connect_db) from the
# .env file located two directories above the current working directory.
dotenv_path = os.path.join(os.getcwd(), "..", "..", ".env")
load_dotenv(dotenv_path)

# =========================================
# 0. เชื่อมต่อฐานข้อมูล
# =========================================
def connect_db():
    """Open a MySQL connection configured from the DB_* environment variables.

    Reads DB_HOST, DB_PRIMARY_PORT, DB_USER, DB_PASSWORD and DB_NAME.

    Returns:
        The connection returned by ``mysql.connector.connect``.

    Exits the process with status 1, after printing the reason, when
    DB_PRIMARY_PORT is missing or not an integer, or when the connection
    attempt raises ``mysql.connector.Error``.
    """
    raw_port = os.getenv("DB_PRIMARY_PORT")
    try:
        port = int(raw_port)
    except (TypeError, ValueError):
        # int(None) would otherwise escape as an unrelated-looking TypeError.
        print(f"Database connection failed: invalid DB_PRIMARY_PORT {raw_port!r}")
        sys.exit(1)

    try:
        return mysql.connector.connect(
            host=os.getenv("DB_HOST"),
            port=port,
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            database=os.getenv("DB_NAME"),
        )
    except mysql.connector.Error as e:
        print(f"Database connection failed: {e}")
        # sys.exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        sys.exit(1)

# =========================================
# 1. ข้อมูลไอเทมจากฐานข้อมูล
# =========================================
def load_item_data(conn):
    """Load every row of the ``items`` table using an existing connection.

    The cursor is closed even when the query raises; the connection itself
    is left open for the caller.

    Returns:
        Dict mapping each row's ``ItemID`` to the full row dict.
    """
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute("SELECT * FROM items")
        return {row['ItemID']: row for row in cursor.fetchall()}
    finally:
        cursor.close()

# =========================================
# 2. ค่าของ (Stat Caps)
# =========================================
def load_stat_caps(conn, buffer_pct=0.2):
    """Return normalisation caps for each stat, derived from the items table.

    For Phys_ATK, Magic_Power, Phys_Defense and HP the cap is the column
    maximum scaled by ``1 + buffer_pct``; Movement_Speed always uses a fixed
    10% buffer. A missing/NULL/zero maximum falls back to 1. All database
    values are converted to ``float``. The CDR and crit caps are hard-coded
    (0.40 and 1.0).

    Args:
        conn: Open database connection; it is not closed here.
        buffer_pct: Fractional head-room added on top of each column maximum.

    Returns:
        Dict with keys MAX_PATK, MAX_AP, MAX_DEF, MAX_HP, MAX_MS, MAX_CDR,
        MAX_CRIT.
    """
    query = """
        SELECT
            MAX(Phys_ATK) * (1 + %s) AS max_patk,
            MAX(Magic_Power) * (1 + %s) AS max_ap,
            MAX(Phys_Defense) * (1 + %s) AS max_def,
            MAX(HP) * (1 + %s) AS max_hp,
            MAX(Movement_Speed) * (1 + %s) AS max_ms
        FROM items
    """
    buffer_values = [buffer_pct, buffer_pct, buffer_pct, buffer_pct, 0.1]

    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, buffer_values)
        result = cursor.fetchone() or {}
    finally:
        # Close the cursor even when the query fails.
        cursor.close()

    def as_cap(column):
        # Convert Decimal values to float; NULL/0 falls back to 1.
        return float(result.get(column) or 1)

    return {
        "MAX_PATK": as_cap("max_patk"),
        "MAX_AP": as_cap("max_ap"),
        "MAX_DEF": as_cap("max_def"),
        "MAX_HP": as_cap("max_hp"),
        "MAX_MS": as_cap("max_ms"),
        "MAX_CDR": 0.40,
        "MAX_CRIT": 1.0,
    }

# =========================================
# 3. คำนวณคะแนน Normalize
# =========================================
def score_stats(chromosome, item_data, hero_class, stat_caps):
    """Score a build as a class-weighted sum of normalised stat totals.

    Each stat column is summed over the items of ``chromosome`` (IDs missing
    from ``item_data`` are skipped), divided by its cap from ``stat_caps``
    and clipped to 1.0, then multiplied by the weight for ``hero_class``.
    Unknown classes use the "Fighter" weights.

    NULL (``None``) stat values count as 0 and every value is converted to
    ``float``, so rows holding ``Decimal`` or ``None`` do not raise. A
    non-positive cap contributes 0 for that stat.

    Returns:
        The weighted score as a float.
    """
    # item column -> cap key
    cap_keys = {
        "Phys_ATK": "MAX_PATK",
        "Magic_Power": "MAX_AP",
        "Phys_Defense": "MAX_DEF",
        "HP": "MAX_HP",
        "Cooldown_Reduction": "MAX_CDR",
        "Critical_Rate": "MAX_CRIT",
        "Movement_Speed": "MAX_MS",
    }

    totals = dict.fromkeys(cap_keys, 0.0)
    for item_id in chromosome:
        row = item_data.get(item_id)
        if row is None:
            continue
        for column in cap_keys:
            # `or 0` covers NULL columns: the key exists, so a .get default
            # alone would still return None.
            totals[column] += float(row.get(column) or 0)

    # Weight table per hero class.
    # NOTE(review): the Assassin weights sum to 1.1 and the Support weights
    # to 1.3 while the other classes sum to 1.0; confirm this is intended.
    weight_table = {
        "Fighter": {"Phys_ATK": 0.3, "Magic_Power": 0.0, "Phys_Defense": 0.2, "HP": 0.2, "Cooldown_Reduction": 0.2, "Critical_Rate": 0.1, "Movement_Speed": 0.0},
        "Tank": {"Phys_ATK": 0.1, "Magic_Power": 0.0, "Phys_Defense": 0.4, "HP": 0.4, "Cooldown_Reduction": 0.0, "Critical_Rate": 0.0, "Movement_Speed": 0.1},
        "Assassin": {"Phys_ATK": 0.4, "Magic_Power": 0.1, "Phys_Defense": 0.1, "HP": 0.1, "Cooldown_Reduction": 0.3, "Critical_Rate": 0.1, "Movement_Speed": 0.0},
        "Mage": {"Phys_ATK": 0.0, "Magic_Power": 0.5, "Phys_Defense": 0.1, "HP": 0.1, "Cooldown_Reduction": 0.2, "Critical_Rate": 0.0, "Movement_Speed": 0.1},
        "Carry": {"Phys_ATK": 0.4, "Magic_Power": 0.0, "Phys_Defense": 0.1, "HP": 0.1, "Cooldown_Reduction": 0.1, "Critical_Rate": 0.3, "Movement_Speed": 0.0},
        "Support": {"Phys_ATK": 0.1, "Magic_Power": 0.3, "Phys_Defense": 0.2, "HP": 0.3, "Cooldown_Reduction": 0.3, "Critical_Rate": 0.0, "Movement_Speed": 0.1},
    }
    weights = weight_table.get(hero_class, weight_table["Fighter"])

    score = 0.0
    for column, cap_key in cap_keys.items():
        cap = float(stat_caps[cap_key])
        normalized = min(totals[column] / cap, 1.0) if cap > 0 else 0.0
        score += weights[column] * normalized
    return score

# =========================================
# 4. คำนวณ Fitness
# =========================================
def calculate_fitness(chromosome, item_data, stat_caps, hero_class):
    """Return the fitness of ``chromosome``.

    Thin wrapper that forwards to ``score_stats`` (note the different
    argument order) and returns its result unchanged.
    """
    fitness_score = score_stats(chromosome, item_data, hero_class, stat_caps)
    return fitness_score

# =========================================
# 5. Initialize the population
# =========================================
def initialize_population(size, item_data, force_items, ban_items, lane):
    """Create ``size`` random chromosomes via ``create_random_chromosome``.

    Returns:
        List of chromosomes; empty when ``size`` is 0 or negative.
    """
    return [
        create_random_chromosome(item_data, force_items, ban_items, lane)
        for _ in range(size)
    ]

def create_random_chromosome(item_data, force_items, ban_items, lane):
    """Build one random chromosome that starts with the forced items.

    The forced items come first; the remaining slots (up to 6) are filled
    with distinct random item IDs that are neither banned nor already
    present. The result is then passed through ``repair_chromosome``.

    Items are sampled from the eligible pool in one step, so the function
    terminates even when fewer than 6 eligible items exist (the previous
    retry loop never ended in that case). ``force_items`` may be any
    iterable and is not modified.
    """
    chromosome = list(force_items)
    missing = 6 - len(chromosome)
    if missing > 0:
        pool = [
            item_id for item_id in item_data
            if item_id not in ban_items and item_id not in chromosome
        ]
        chromosome.extend(random.sample(pool, min(missing, len(pool))))
    return repair_chromosome(chromosome, item_data, lane, force_items, ban_items)

# =========================================
# 6. Tournament Selection
# =========================================
def tournament_selection(population, fitness, tournament_size=3):
    """Select ``len(population)`` parents by tournament.

    For every slot, ``tournament_size`` distinct individuals are sampled and
    the one with the highest fitness wins. The tournament size is clamped to
    the number of available individuals so small populations do not raise.

    Returns:
        List of winning chromosomes (references to the originals, not copies).
    """
    # Build the (chromosome, fitness) pairs once instead of once per round.
    contenders = list(zip(population, fitness))
    if not contenders:
        return []
    draw = min(tournament_size, len(contenders))
    return [
        max(random.sample(contenders, draw), key=lambda pair: pair[1])[0]
        for _ in range(len(population))
    ]

# =========================================
# 7. One-Point Crossover
# =========================================
def one_point_crossover(p1, p2, crossover_rate):
    """Recombine two parents with one-point crossover.

    With probability ``crossover_rate`` a cut point is drawn between 1 and
    ``min(len(p1), len(p2)) - 1`` and the tails are swapped. For 6-gene
    parents this is the previous hard-coded ``randint(1, 5)``. Parents too
    short to be cut, or a failed probability roll, yield shallow copies.

    Returns:
        ``(child1, child2)``, new lists that never alias the parents.
    """
    # Roll first so the random stream does not depend on parent length.
    do_cross = random.random() < crossover_rate
    shortest = min(len(p1), len(p2))
    if do_cross and shortest >= 2:
        point = random.randint(1, shortest - 1)
        return p1[:point] + p2[point:], p2[:point] + p1[point:]
    return list(p1), list(p2)

# =========================================
# 8. Mutation และ Repair
# =========================================
def mutate(chromosome, item_data, mutation_rate, ban_items, force_items, lane):
    """Return a mutated, repaired copy of ``chromosome``.

    Each slot is selected with probability ``mutation_rate``. A selected slot
    that does not hold a forced item is replaced by a random item that is not
    banned and not yet used; every replacement is removed from the candidate
    pool. The result is passed through ``repair_chromosome``. An empty
    chromosome yields ``[]``.
    """
    if not chromosome:
        return []

    mutated = chromosome.copy()
    pool = []
    for key in item_data:
        if key in ban_items or key in mutated:
            continue
        pool.append(key)

    for slot, gene in enumerate(mutated):
        roll = random.random()
        # Forced items are never mutated away.
        if roll >= mutation_rate or not pool or gene in force_items:
            continue
        substitute = random.choice(pool)
        pool.remove(substitute)
        mutated[slot] = substitute

    return repair_chromosome(mutated, item_data, lane, force_items, ban_items)

def repair_chromosome(chromosome, item_data, lane, force_items, ban_items, build_size=6):
    """Return a valid build derived from ``chromosome``.

    Steps, in order:
      1. Drop banned items and duplicates (first occurrence wins) and trim
         to ``build_size``.
      2. Insert every missing forced item: appended while there is room,
         otherwise it overwrites a random non-forced slot. Previously a
         forced item appended to a full chromosome was cut off again by the
         final ``[:6]``. Forced items take precedence over the ban list.
      3. For lane "Support" / "Jungle", keep at most one item whose
         ``Class`` equals the lane. Forced items of that class are always
         kept; otherwise one is kept at random.
      4. Refill with random unused, non-banned items without breaking rule 3.

    The result holds fewer than ``build_size`` items only when ``item_data``
    does not contain enough eligible items (the loop can no longer spin
    forever in that case).

    Args:
        build_size: Target number of items; defaults to the previous fixed 6.
    """
    forced = list(dict.fromkeys(force_items))[:build_size]
    forced_set = set(forced)
    banned = set(ban_items) - forced_set
    restricted_class = lane if lane in ("Support", "Jungle") else None

    def is_restricted(item):
        if restricted_class is None:
            return False
        return item_data.get(item, {}).get('Class') == restricted_class

    # 1. Remove banned items and duplicates, preserving order.
    repaired = [item for item in dict.fromkeys(chromosome) if item not in banned]
    repaired = repaired[:build_size]

    # 2. Add missing forced items without displacing other forced items.
    for fi in forced:
        if fi in repaired:
            continue
        replaceable = [i for i, item in enumerate(repaired) if item not in forced_set]
        if len(repaired) >= build_size and replaceable:
            repaired[random.choice(replaceable)] = fi
        else:
            repaired.append(fi)

    # 3. Lane rule: at most one lane-class item.
    if restricted_class is not None:
        lane_items = [item for item in repaired if is_restricted(item)]
        if len(lane_items) > 1:
            forced_lane = {item for item in lane_items if item in forced_set}
            keep = forced_lane or {random.choice(lane_items)}
            repaired = [item for item in repaired if item in keep or not is_restricted(item)]

    # 4. Refill up to build_size from a shuffled pool of eligible items.
    has_restricted = any(is_restricted(item) for item in repaired)
    present = set(repaired)
    candidates = [item for item in item_data if item not in banned and item not in present]
    random.shuffle(candidates)
    for candidate in candidates:
        if len(repaired) >= build_size:
            break
        if is_restricted(candidate):
            if has_restricted:
                continue
            has_restricted = True
        repaired.append(candidate)
    return repaired

# =========================================
# 9. Elitism
# =========================================
def elitism(population, offspring, fitness, offspring_fitness, elite_size):
    """Keep the fittest individuals from parents and offspring combined.

    Parents and offspring are pooled, ranked by fitness in descending order
    (ties keep their pooled order, parents first) and the top
    ``len(population)`` chromosomes are returned.

    Note: ``elite_size`` is accepted but not used by this implementation.

    Returns:
        The new population as a list of chromosomes (no fitness values).
    """
    pooled = [*zip(population, fitness), *zip(offspring, offspring_fitness)]
    ranked = sorted(pooled, key=lambda pair: pair[1], reverse=True)
    survivors = ranked[:len(population)]
    return [chromo for chromo, _ in survivors]

# =========================================
# 10. Genetic Algorithm
# =========================================
def run_genetic_algorithm(hero_id, lane, force_items, ban_items, item_data, hero_class, conn):
    """Run the GA for a fixed number of generations and return the best build.

    Args:
        hero_id: Accepted but not used by this function.
        lane: Lane name forwarded to population creation and mutation.
        force_items: Item IDs that must be part of every build.
        ban_items: Item IDs that must not be used.
        item_data: Mapping of item ID to item row.
        hero_class: Class name used to pick the stat weights.
        conn: Open database connection used to load the stat caps.

    Returns:
        ``(best_chromosome, best_fitness)`` taken from the final population.
    """
    stat_caps = load_stat_caps(conn)
    population_size = 1000
    max_generations = 50
    crossover_rate = 0.8
    mutation_rate = 0.05

    def evaluate(chromosomes):
        return [calculate_fitness(chromo, item_data, stat_caps, hero_class) for chromo in chromosomes]

    population = initialize_population(population_size, item_data, force_items, ban_items, lane)
    fitness = evaluate(population)

    for _ in range(max_generations):
        parents = tournament_selection(population, fitness, tournament_size=3)
        offspring = []
        while len(offspring) < population_size:
            p1, p2 = random.sample(parents, 2)
            child1, child2 = one_point_crossover(p1, p2, crossover_rate)
            offspring.extend([child1, child2])
        offspring = [
            mutate(child, item_data, mutation_rate, ban_items, force_items, lane)
            for child in offspring
        ]
        offspring_fitness = evaluate(offspring)
        population = elitism(population, offspring, fitness, offspring_fitness, elite_size=2)
        # Re-score after replacement so `fitness` always lines up with
        # `population`; the old code picked the final best with the fitness
        # list of the previous population.
        fitness = evaluate(population)

    best_index = fitness.index(max(fitness))
    return population[best_index], fitness[best_index]

# =========================================
# Main Function
# =========================================
# Script entry point: run the GA once with sample inputs and print the result.
if __name__ == "__main__":
    conn = connect_db()
    try:
        item_data = load_item_data(conn)
        hero_id_example = "H001"
        lane_example = "Jungle"
        force_list = []
        ban_list = []
        hero_class = "Fighter"

        best_build, best_fitness = run_genetic_algorithm(
            hero_id=hero_id_example,
            lane=lane_example,
            force_items=force_list,
            ban_items=ban_list,
            item_data=item_data,
            hero_class=hero_class,
            conn=conn
        )

        print("Best Build:", best_build)
        print("Best Fitness:", best_fitness)
    finally:
        # Close the connection even if loading data or the GA run raises.
        conn.close()


0.0.4

In [None]:
import random
import copy
import os
from dotenv import load_dotenv
import mysql.connector
import sys

# Load environment variables (the DB_* settings read by connect_db) from the
# .env file located two directories above the current working directory.
dotenv_path = os.path.join(os.getcwd(), "..", "..", ".env")
load_dotenv(dotenv_path)

# =========================================
# 0. เชื่อมต่อฐานข้อมูล
# =========================================
def connect_db():
    """Open a MySQL connection configured from the DB_* environment variables.

    Reads DB_HOST, DB_PRIMARY_PORT, DB_USER, DB_PASSWORD and DB_NAME.

    Returns:
        The connection returned by ``mysql.connector.connect``.

    Exits the process with status 1, after printing the reason, when
    DB_PRIMARY_PORT is missing or not an integer, or when the connection
    attempt raises ``mysql.connector.Error``.
    """
    raw_port = os.getenv("DB_PRIMARY_PORT")
    try:
        port = int(raw_port)
    except (TypeError, ValueError):
        # int(None) would otherwise escape as an unrelated-looking TypeError.
        print(f"Database connection failed: invalid DB_PRIMARY_PORT {raw_port!r}")
        sys.exit(1)

    try:
        return mysql.connector.connect(
            host=os.getenv("DB_HOST"),
            port=port,
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            database=os.getenv("DB_NAME"),
        )
    except mysql.connector.Error as e:
        print(f"Database connection failed: {e}")
        # sys.exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        sys.exit(1)

# =========================================
# 1. ข้อมูลไอเทมจากฐานข้อมูล
# =========================================
def load_item_data(conn):
    """Load every row of the ``items`` table using an existing connection.

    The cursor is closed even when the query raises; the connection itself
    is left open for the caller.

    Returns:
        Dict mapping each row's ``ItemID`` to the full row dict.
    """
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute("SELECT * FROM items")
        return {row['ItemID']: row for row in cursor.fetchall()}
    finally:
        cursor.close()

# =========================================
# 2. ค่าของ (Stat Caps)
# =========================================
def load_stat_caps(conn, buffer_pct=0.2):
    """Return normalisation caps for each stat, derived from the items table.

    For Phys_ATK, Magic_Power, Phys_Defense and HP the cap is the column
    maximum scaled by ``1 + buffer_pct``; Movement_Speed always uses a fixed
    10% buffer. A missing/NULL/zero maximum falls back to 1. All database
    values are converted to ``float``. The CDR and crit caps are hard-coded
    (0.40 and 1.0).

    Args:
        conn: Open database connection; it is not closed here.
        buffer_pct: Fractional head-room added on top of each column maximum.

    Returns:
        Dict with keys MAX_PATK, MAX_AP, MAX_DEF, MAX_HP, MAX_MS, MAX_CDR,
        MAX_CRIT.
    """
    query = """
        SELECT
            MAX(Phys_ATK) * (1 + %s) AS max_patk,
            MAX(Magic_Power) * (1 + %s) AS max_ap,
            MAX(Phys_Defense) * (1 + %s) AS max_def,
            MAX(HP) * (1 + %s) AS max_hp,
            MAX(Movement_Speed) * (1 + %s) AS max_ms
        FROM items
    """
    buffer_values = [buffer_pct, buffer_pct, buffer_pct, buffer_pct, 0.1]

    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, buffer_values)
        result = cursor.fetchone() or {}
    finally:
        # Close the cursor even when the query fails.
        cursor.close()

    def as_cap(column):
        # Convert Decimal values to float; NULL/0 falls back to 1.
        return float(result.get(column) or 1)

    return {
        "MAX_PATK": as_cap("max_patk"),
        "MAX_AP": as_cap("max_ap"),
        "MAX_DEF": as_cap("max_def"),
        "MAX_HP": as_cap("max_hp"),
        "MAX_MS": as_cap("max_ms"),
        "MAX_CDR": 0.40,
        "MAX_CRIT": 1.0,
    }

# =========================================
# 3. คำนวณคะแนน Normalize
# =========================================
def score_stats(chromosome, item_data, hero_class, stat_caps, level, budget):
    """Score a build: class-weighted normalised stats scaled by phase factors.

    Each stat column is summed over the items of ``chromosome`` (IDs missing
    from ``item_data`` are skipped), divided by its cap from ``stat_caps``
    and clipped to 1.0, then multiplied by the weight for ``hero_class``
    (unknown classes use "Fighter"). The sum is multiplied by
    ``min(budget / 15000, 1)`` and by ``min(level / 15, 1)``.

    NULL (``None``) stat values count as 0 and every value is converted to
    ``float``, so rows holding ``Decimal`` or ``None`` do not raise. A
    non-positive cap contributes 0 for that stat.

    NOTE(review): the budget and level factors depend only on the arguments,
    not on the chromosome, so within one phase they scale every build equally
    and do not change the ranking; the item cost is never compared against
    ``budget`` here.

    Returns:
        The scaled score as a float.
    """
    # item column -> cap key
    cap_keys = {
        "Phys_ATK": "MAX_PATK",
        "Magic_Power": "MAX_AP",
        "Phys_Defense": "MAX_DEF",
        "HP": "MAX_HP",
        "Cooldown_Reduction": "MAX_CDR",
        "Critical_Rate": "MAX_CRIT",
        "Movement_Speed": "MAX_MS",
    }

    totals = dict.fromkeys(cap_keys, 0.0)
    for item_id in chromosome:
        row = item_data.get(item_id)
        if row is None:
            continue
        for column in cap_keys:
            # `or 0` covers NULL columns: the key exists, so a .get default
            # alone would still return None.
            totals[column] += float(row.get(column) or 0)

    # Weight table per hero class.
    # NOTE(review): the Assassin weights sum to 1.1 and the Support weights
    # to 1.3 while the other classes sum to 1.0; confirm this is intended.
    weight_table = {
        "Fighter": {"Phys_ATK": 0.3, "Magic_Power": 0.0, "Phys_Defense": 0.2, "HP": 0.2, "Cooldown_Reduction": 0.2, "Critical_Rate": 0.1, "Movement_Speed": 0.0},
        "Tank": {"Phys_ATK": 0.1, "Magic_Power": 0.0, "Phys_Defense": 0.4, "HP": 0.4, "Cooldown_Reduction": 0.0, "Critical_Rate": 0.0, "Movement_Speed": 0.1},
        "Assassin": {"Phys_ATK": 0.4, "Magic_Power": 0.1, "Phys_Defense": 0.1, "HP": 0.1, "Cooldown_Reduction": 0.3, "Critical_Rate": 0.1, "Movement_Speed": 0.0},
        "Mage": {"Phys_ATK": 0.0, "Magic_Power": 0.5, "Phys_Defense": 0.1, "HP": 0.1, "Cooldown_Reduction": 0.2, "Critical_Rate": 0.0, "Movement_Speed": 0.1},
        "Carry": {"Phys_ATK": 0.4, "Magic_Power": 0.0, "Phys_Defense": 0.1, "HP": 0.1, "Cooldown_Reduction": 0.1, "Critical_Rate": 0.3, "Movement_Speed": 0.0},
        "Support": {"Phys_ATK": 0.1, "Magic_Power": 0.3, "Phys_Defense": 0.2, "HP": 0.3, "Cooldown_Reduction": 0.3, "Critical_Rate": 0.0, "Movement_Speed": 0.1},
    }
    weights = weight_table.get(hero_class, weight_table["Fighter"])

    fitness = 0.0
    for column, cap_key in cap_keys.items():
        cap = float(stat_caps[cap_key])
        normalized = min(totals[column] / cap, 1.0) if cap > 0 else 0.0
        fitness += weights[column] * normalized

    # Scale by the share of a 15000 budget available in this phase.
    fitness *= min(budget / 15000, 1.0)
    # Scale by the hero level relative to level 15.
    fitness *= min(level / 15, 1.0)
    return fitness

# =========================================
# 4. คำนวณ Fitness
# =========================================
def calculate_fitness(chromosome, item_data, stat_caps, hero_class, level, budget):
    """Return the fitness of ``chromosome`` for the given level and budget.

    Thin wrapper that forwards to ``score_stats`` (note the different
    argument order) and returns its result unchanged.
    """
    fitness_score = score_stats(chromosome, item_data, hero_class, stat_caps, level, budget)
    return fitness_score

# =========================================
# 5. Initialize the population
# =========================================
def initialize_population(size, item_data, force_items, ban_items, lane):
    """Create ``size`` random chromosomes via ``create_random_chromosome``.

    ``lane`` is accepted but not used by this version.

    Returns:
        List of chromosomes; empty when ``size`` is 0 or negative.
    """
    return [
        create_random_chromosome(item_data, force_items, ban_items)
        for _ in range(size)
    ]

def create_random_chromosome(item_data, force_items, ban_items):
    """Build one random chromosome that starts with the forced items.

    The forced items come first; the remaining slots (up to 6) are filled
    with distinct random item IDs that are neither banned nor already
    present.

    Items are sampled from the eligible pool in one step, so the function
    terminates even when fewer than 6 eligible items exist (the previous
    retry loop never ended in that case); the result is then shorter than 6.
    ``force_items`` may be any iterable and is not modified.
    """
    chromosome = list(force_items)
    missing = 6 - len(chromosome)
    if missing > 0:
        pool = [
            item_id for item_id in item_data
            if item_id not in ban_items and item_id not in chromosome
        ]
        chromosome.extend(random.sample(pool, min(missing, len(pool))))
    return chromosome

# =========================================
# 6. Tournament Selection
# =========================================
def tournament_selection(population, fitness, tournament_size=3):
    """Select ``len(population)`` parents by tournament.

    For every slot, ``tournament_size`` distinct individuals are sampled and
    the one with the highest fitness wins. The tournament size is clamped to
    the number of available individuals so small populations do not raise.

    Returns:
        List of winning chromosomes (references to the originals, not copies).
    """
    # Build the (chromosome, fitness) pairs once instead of once per round.
    contenders = list(zip(population, fitness))
    if not contenders:
        return []
    draw = min(tournament_size, len(contenders))
    return [
        max(random.sample(contenders, draw), key=lambda pair: pair[1])[0]
        for _ in range(len(population))
    ]

# =========================================
# 7. One-Point Crossover
# =========================================
def one_point_crossover(p1, p2, crossover_rate):
    """Recombine two parents with one-point crossover.

    With probability ``crossover_rate`` a cut point is drawn between 1 and
    ``min(len(p1), len(p2)) - 1`` and the tails are swapped. For 6-gene
    parents this is the previous hard-coded ``randint(1, 5)``. Parents too
    short to be cut, or a failed probability roll, yield shallow copies.

    Returns:
        ``(child1, child2)``, new lists that never alias the parents.
    """
    # Roll first so the random stream does not depend on parent length.
    do_cross = random.random() < crossover_rate
    shortest = min(len(p1), len(p2))
    if do_cross and shortest >= 2:
        point = random.randint(1, shortest - 1)
        return p1[:point] + p2[point:], p2[:point] + p1[point:]
    return list(p1), list(p2)

# =========================================
# 8. Mutation
# =========================================
def mutate(chromosome, item_data, mutation_rate, ban_items, force_items, lane):
    """Return a mutated copy of ``chromosome``.

    Every gene is replaced with probability ``mutation_rate`` by an item
    that is not banned and was not in the chromosome when mutation started.
    Genes listed in ``force_items`` are never replaced. Each replacement is
    used at most once. ``lane`` is accepted but not used.
    """
    if not chromosome:
        return []

    mutated = chromosome.copy()
    replacements = [
        candidate for candidate in item_data
        if not (candidate in ban_items or candidate in mutated)
    ]
    for slot, gene in enumerate(mutated):
        # The random draw happens for every gene, forced or not.
        roll = random.random()
        if roll >= mutation_rate or not replacements:
            continue
        if gene in force_items:
            # Forced items must stay in the build.
            continue
        pick = random.choice(replacements)
        mutated[slot] = pick
        replacements.remove(pick)
    return mutated

# =========================================
# 9. Elitism
# =========================================
def elitism(population, offspring, fitness, offspring_fitness, elite_size):
    """Keep the fittest chromosomes out of parents and offspring.

    Both generations are pooled and ranked by fitness in descending order;
    ties keep their pooled order (parents first). The best
    ``len(population)`` chromosomes are returned. ``elite_size`` is accepted
    but not used.
    """
    ranked = sorted(
        [*zip(population, fitness), *zip(offspring, offspring_fitness)],
        key=lambda pair: pair[1],
        reverse=True,
    )
    survivors = []
    for chromo, _score in ranked[:len(population)]:
        survivors.append(chromo)
    return survivors

# =========================================
# 10. Genetic Algorithm
# =========================================
def run_genetic_algorithm(hero_id, lane, force_items, ban_items, item_data, hero_class, conn):
    """Evolve item builds and report the best one for each game phase.

    For every phase (Early, Mid, Late) the current population is scored with
    that phase's level and budget, one round of selection, crossover and
    mutation is applied, and the fittest of parents plus offspring survive.
    The surviving population carries over into the next phase.

    NOTE(review): only a single generation is evolved per phase. The
    original code defined ``max_generations = 50`` but never used it, so it
    has been dropped here.

    Args:
        hero_id: Hero identifier (not used by this function).
        lane: Lane name, forwarded to the population and mutation helpers.
        force_items: Item IDs that must be part of every build.
        ban_items: Item IDs that must not be drawn.
        item_data: Mapping of item ID to item row.
        hero_class: Hero class forwarded to ``calculate_fitness``.
        conn: Open DB connection used to load the stat caps.

    Returns:
        Dict mapping phase name to ``{"Best Build": [...], "Best Fitness": x}``.
    """
    stat_caps = load_stat_caps(conn)
    population_size = 1000
    crossover_rate = 0.8
    mutation_rate = 0.05

    population = initialize_population(population_size, item_data, force_items, ban_items, lane)

    # Hero level and gold budget used to score builds in each phase.
    phases = {
        "Early Game": {"level": 3, "budget": 2700},
        "Mid Game": {"level": 9, "budget": 7500},
        "Late Game": {"level": 15, "budget": 14000},
    }

    def evaluate(chromosomes, level, budget):
        """Score every chromosome for one phase."""
        return [
            calculate_fitness(chromo, item_data, stat_caps, hero_class, level, budget)
            for chromo in chromosomes
        ]

    phase_results = {}

    for phase_name, phase_data in phases.items():
        print(f"Calculating for {phase_name}...")
        level, budget = phase_data["level"], phase_data["budget"]

        fitness = evaluate(population, level, budget)
        parents = tournament_selection(population, fitness, tournament_size=3)

        offspring = []
        while len(offspring) < population_size:
            p1, p2 = random.sample(parents, 2)
            offspring.extend(one_point_crossover(p1, p2, crossover_rate))
        offspring = [
            mutate(child, item_data, mutation_rate, ban_items, force_items, lane)
            for child in offspring
        ]
        offspring_fitness = evaluate(offspring, level, budget)

        # Pick the winner from the same (chromosome, fitness) pairs that
        # elitism ranks. The old code indexed the *new* population with an
        # index computed from the *old* fitness list, so the reported build
        # and fitness did not belong together.
        best_build, best_fitness = max(
            zip(population + offspring, fitness + offspring_fitness),
            key=lambda pair: pair[1],
        )
        population = elitism(population, offspring, fitness, offspring_fitness, elite_size=2)

        phase_results[phase_name] = {
            "Best Build": list(best_build),
            "Best Fitness": best_fitness,
        }

    return phase_results

# =========================================
# Main Function
# =========================================
if __name__ == "__main__":
    # Demo run: one hero, no forced or banned items.
    conn = connect_db()
    try:
        item_data = load_item_data(conn)
        hero_id_example = "H001"
        lane_example = "Jungle"
        force_list = []
        ban_list = []
        hero_class = "Fighter"

        phase_results = run_genetic_algorithm(
            hero_id=hero_id_example,
            lane=lane_example,
            force_items=force_list,
            ban_items=ban_list,
            item_data=item_data,
            hero_class=hero_class,
            conn=conn
        )

        for phase, result in phase_results.items():
            print(f"Results for {phase}:")
            print(f"Best Build: {result['Best Build']}")
            print(f"Best Fitness: {result['Best Fitness']}")
    finally:
        # Release the connection even when loading or the GA run fails.
        conn.close()

0.0.5

In [None]:
import mysql.connector
import random
import copy
import os
from dotenv import load_dotenv

# Load environment variables (the DB_* settings read by connect_db) from the
# .env file two directories above the current working directory.
dotenv_path = os.path.join(os.getcwd(), "..", "..", ".env")
load_dotenv(dotenv_path)

# GA Parameters
POPULATION_SIZE = 100  # chromosomes kept after each generation (see elitism)
MAX_GENERATIONS = 50  # iterations of the main loop in run_genetic_algorithm
CROSSOVER_RATE = 0.8  # probability that a parent pair is recombined
BASE_MUTATION_RATE = 0.05  # per-gene mutation probability passed to mutate()
# Gold budget per game phase (looked up by get_phase_budget).
EARLY_BUDGET = 2700
MID_BUDGET = 7500
LATE_BUDGET = 14000
# Multiplier applied to each phase's score in calculate_fitness.
PHASE_WEIGHTS = {"Early": 0.2, "Mid": 0.3, "Late": 0.5}

# Database connection
def connect_db():
    """Open a MySQL connection configured from environment variables.

    Reads DB_HOST, DB_PRIMARY_PORT, DB_USER, DB_PASSWORD and DB_NAME.
    DB_PRIMARY_PORT falls back to 3306 when unset, instead of crashing on
    ``int(None)``.

    Returns:
        The open connection object.

    Raises:
        SystemExit: With status 1 when the connector reports an error. The
            error is printed first. ``SystemExit`` is raised directly
            because the ``exit()`` helper is meant for interactive sessions
            only.
    """
    try:
        return mysql.connector.connect(
            host=os.getenv("DB_HOST"),
            port=int(os.getenv("DB_PRIMARY_PORT", 3306)),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            database=os.getenv("DB_NAME")
        )
    except mysql.connector.Error as e:
        print(f"การเชื่อมต่อฐานข้อมูลล้มเหลว: {e}")
        raise SystemExit(1) from e

# Load item data from the database
def load_item_data(conn):
    """Fetch every row of the ``items`` table, keyed by ``ItemID``.

    Args:
        conn: Open DB connection; ``conn.cursor(dictionary=True)`` must
            yield rows as dicts.

    Returns:
        Dict mapping ItemID to the full row dict.
    """
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute("SELECT * FROM items")
        return {row['ItemID']: row for row in cursor.fetchall()}
    finally:
        # Close the cursor even if the query or fetch fails.
        cursor.close()

# Load a hero's stats from the database
def load_hero_stats(conn, hero_id, level):
    """Fetch the ``herostats`` row for one hero at one level.

    Args:
        conn: Open DB connection.
        hero_id: Value matched against the ``HeroID`` column.
        level: Value matched against the ``Level`` column.

    Returns:
        The first matching row as a dict, or ``{}`` when no row matches.
    """
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute("SELECT * FROM herostats WHERE HeroID = %s AND Level = %s", (hero_id, level))
        stats = cursor.fetchone()
    finally:
        # Close the cursor even if the query fails.
        cursor.close()
    return stats or {}

# Load stat caps
def load_stat_caps(conn, buffer_pct=0.2):
    """Derive normalisation caps for each stat from the ``items`` table.

    Each cap is the largest single-item value of a column scaled by
    ``1 + buffer``. ``buffer_pct`` is used for attack, magic power, defense
    and HP; movement speed always uses a fixed 0.1 buffer. Cooldown
    reduction and crit rate use the constants 0.40 and 1.0.

    Args:
        conn: Open DB connection.
        buffer_pct: Headroom added on top of the largest item value.

    Returns:
        Dict with keys MAX_PATK, MAX_AP, MAX_DEF, MAX_HP, MAX_MS, MAX_CDR
        and MAX_CRIT. A missing or zero maximum becomes 1.0 so the caps can
        be used as divisors.
    """
    query = """
        SELECT
            MAX(Phys_ATK) * (1 + %s) AS max_patk,
            MAX(Magic_Power) * (1 + %s) AS max_ap,
            MAX(Phys_Defense) * (1 + %s) AS max_def,
            MAX(HP) * (1 + %s) AS max_hp,
            MAX(Movement_Speed) * (1 + %s) AS max_ms
        FROM items
    """
    buffer_values = [buffer_pct, buffer_pct, buffer_pct, buffer_pct, 0.1]

    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(query, buffer_values)
        result = cursor.fetchone() or {}
    finally:
        # Close the cursor even if the query fails.
        cursor.close()

    def cap(column):
        """Convert one aggregate to float; NULL or 0 becomes 1."""
        return float(result.get(column) or 1)

    return {
        "MAX_PATK": cap("max_patk"),
        "MAX_AP": cap("max_ap"),
        "MAX_DEF": cap("max_def"),
        "MAX_HP": cap("max_hp"),
        "MAX_MS": cap("max_ms"),
        "MAX_CDR": 0.40,
        "MAX_CRIT": 1.0
    }

# Compute the fitness of a build for one phase
def calculate_fitness(chromosome, item_data, stat_caps, hero_id, phase, conn):
    """Return the phase-weighted fitness of ``chromosome``.

    The stat score and the budget score are added and the sum is multiplied
    by ``PHASE_WEIGHTS[phase]``. Level and budget are looked up from the
    phase name, and the hero's base stats are loaded for that level.
    """
    phase_level = get_phase_level(phase)
    phase_budget = get_phase_budget(phase)
    base_stats = load_hero_stats(conn, hero_id, phase_level)

    stat_part = score_stats(
        chromosome, item_data, base_stats, stat_caps, phase, hero_id, conn
    )
    budget_part = score_budget(chromosome, item_data, phase_budget)
    combined = stat_part + budget_part
    return combined * PHASE_WEIGHTS[phase]

def get_phase_level(phase):
    """Map a phase name to the hero level used for scoring.

    "Early" -> 3, "Mid" -> 9, "Late" -> 15; any other name falls back to 15.
    """
    levels = {"Early": 3, "Mid": 9, "Late": 15}
    try:
        return levels[phase]
    except KeyError:
        return 15

def get_phase_budget(phase):
    """Map a phase name to its gold budget.

    Unknown phase names fall back to ``LATE_BUDGET``.
    """
    budgets = {"Early": EARLY_BUDGET, "Mid": MID_BUDGET, "Late": LATE_BUDGET}
    try:
        return budgets[phase]
    except KeyError:
        return LATE_BUDGET

# Compute the stat score
def score_stats(chromosome, item_data, hero_base_stats, stat_caps, phase, hero_id, conn):
    """Score a build by its weighted, cap-normalised total stats.

    Hero base stats and the stats of every item in ``chromosome`` are
    summed. Each total is divided by its cap and clamped to 1.0, and the
    results are combined with the class/phase weights from
    ``get_phase_weight``.

    Database rows may contain NULL (``None``) or ``Decimal`` values, which
    break ``+=`` and division by the float caps. Every value is therefore
    coerced to ``float`` first, with NULL counted as 0.

    Args:
        chromosome: Iterable of item IDs; unknown IDs contribute nothing.
        item_data: Mapping of item ID to item row.
        hero_base_stats: Hero stat row (may be empty).
        stat_caps: Dict with MAX_PATK, MAX_AP, MAX_DEF, MAX_HP, MAX_CDR,
            MAX_CRIT and MAX_MS.
        phase: Phase name forwarded to ``get_phase_weight``.
        hero_id: Hero whose class selects the weights.
        conn: Open DB connection used to look up the hero class.

    Returns:
        The weighted score as a float.
    """
    def _num(value):
        """Coerce a DB value to float, treating NULL as 0."""
        return float(value or 0)

    # stat key -> (row column, stat_caps key, hero base contributes?)
    # Cooldown reduction and crit start from 0, as before.
    stat_spec = {
        "patk": ("Phys_ATK", "MAX_PATK", True),
        "ap": ("Magic_Power", "MAX_AP", True),
        "def": ("Phys_Defense", "MAX_DEF", True),
        "hp": ("HP", "MAX_HP", True),
        "cdr": ("Cooldown_Reduction", "MAX_CDR", False),
        "crit": ("Critical_Rate", "MAX_CRIT", False),
        "ms": ("Movement_Speed", "MAX_MS", True),
    }

    total_stats = {
        stat: _num(hero_base_stats.get(column)) if from_hero else 0.0
        for stat, (column, _cap, from_hero) in stat_spec.items()
    }
    for item_id in chromosome:
        it = item_data.get(item_id, {})
        for stat, (column, _cap, _from_hero) in stat_spec.items():
            total_stats[stat] += _num(it.get(column))

    hero_class = get_hero_class(hero_id, conn)
    weights = get_phase_weight(hero_class, phase)

    score = 0.0
    for stat, (_column, cap_key, _from_hero) in stat_spec.items():
        normalised = min(total_stats[stat] / stat_caps[cap_key], 1.0)
        score += weights[stat] * normalised
    return score

def get_hero_class(hero_id, conn):
    """Look up a hero's ``First_Class`` in the ``heroes`` table.

    Args:
        hero_id: Value matched against the ``HeroID`` column.
        conn: Open DB connection.

    Returns:
        The first column of the matching row, or ``"Fighter"`` when no row
        is found.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT First_Class FROM heroes WHERE HeroID = %s", (hero_id,))
        result = cursor.fetchone()
    finally:
        # Close the cursor even if the query fails.
        cursor.close()
    return result[0] if result else "Fighter"

def get_phase_weight(hero_class, phase):
    """Return a fresh stat-weight dict for a hero class in a given phase.

    Unknown classes use the Fighter weights. The "Early" phase adds 0.1 to
    ``patk`` and the "Late" phase adds 0.2 to ``hp``; other phases leave the
    class weights untouched.
    """
    fighter = {"patk": 0.3, "ap": 0.0, "def": 0.2, "hp": 0.2, "cdr": 0.2, "crit": 0.1, "ms": 0.0}
    per_class = {
        "Fighter": fighter,
        "Tank": {"patk": 0.1, "ap": 0.0, "def": 0.4, "hp": 0.4, "cdr": 0.0, "crit": 0.0, "ms": 0.1},
        "Mage": {"patk": 0.0, "ap": 0.5, "def": 0.1, "hp": 0.1, "cdr": 0.2, "crit": 0.0, "ms": 0.1},
        "Assassin": {"patk": 0.4, "ap": 0.1, "def": 0.1, "hp": 0.1, "cdr": 0.3, "crit": 0.1, "ms": 0.0},
        "Carry": {"patk": 0.4, "ap": 0.0, "def": 0.1, "hp": 0.1, "cdr": 0.1, "crit": 0.3, "ms": 0.0},
        "Support": {"patk": 0.1, "ap": 0.3, "def": 0.2, "hp": 0.3, "cdr": 0.3, "crit": 0.0, "ms": 0.1},
    }
    chosen = per_class.get(hero_class)
    if chosen is None:
        chosen = fighter

    # The table rows are flat dicts of floats, so a shallow copy is enough.
    weights = dict(chosen)
    if phase == "Late":
        weights["hp"] += 0.2
    elif phase == "Early":
        weights["patk"] += 0.1
    return weights

# Score the build against the budget
def score_budget(chromosome, item_data, budget):
    """Return 1.0 for an affordable build, less when it is over budget.

    The total cost is the sum of each item's ``Cost`` (0 for unknown items
    or rows without that key). Over budget, the score is
    ``1 - overspend_ratio ** 2`` floored at 0.
    """
    rows = (item_data.get(item_id, {}) for item_id in chromosome)
    cost = sum(row.get("Cost", 0) for row in rows)
    if cost <= budget:
        return 1.0
    overspend_ratio = (cost - budget) / budget
    return max(0, 1 - overspend_ratio ** 2)

# Initialize the population
def initialize_population(size, item_pool, force_items, ban_items, lane):
    """Create ``size`` repaired random chromosomes.

    Each chromosome starts as six independent random draws from
    ``item_pool`` (duplicates possible) and is then passed through
    ``repair_chromosome``.
    """
    population = []
    for _ in range(size):
        genes = []
        for _slot in range(6):
            genes.append(random.choice(item_pool))
        population.append(
            repair_chromosome(genes, lane, item_pool, ban_items, force_items)
        )
    return population

# Tournament Selection
def tournament_selection(population, fitness, tournament_size=3):
    """Select ``len(population)`` parents through random tournaments.

    Every round samples ``tournament_size`` distinct (chromosome, fitness)
    pairs and keeps the chromosome with the best fitness.

    Args:
        population: List of chromosomes.
        fitness: Fitness values aligned with ``population``.
        tournament_size: Contestants per round. It is capped at the number
            of available pairs so that small populations do not make
            ``random.sample`` raise ``ValueError``.

    Returns:
        List of winning chromosomes (references, not copies).
    """
    # The pairing is loop-invariant, so build it once.
    scored = list(zip(population, fitness))
    contestants = min(tournament_size, len(scored))
    selected = []
    for _ in range(len(population)):
        candidates = random.sample(scored, contestants)
        winner = max(candidates, key=lambda pair: pair[1])
        selected.append(winner[0])
    return selected

# One-Point Crossover
def one_point_crossover(p1, p2):
    """Swap the tails of two parents at a random cut point.

    Recombination happens with probability ``CROSSOVER_RATE`` at a cut
    index drawn from 1..5; otherwise copies of both parents are returned.
    """
    if random.random() >= CROSSOVER_RATE:
        return p1.copy(), p2.copy()

    cut = random.randint(1, 5)
    first_child = p1[:cut] + p2[cut:]
    second_child = p2[:cut] + p1[cut:]
    return first_child, second_child

# Mutation
def mutate(chromo, item_pool, mutation_rate, ban_items, force_items, lane):
    """Return a mutated, repaired copy of ``chromo``.

    Each gene is replaced with probability ``mutation_rate`` by a pool item
    that is not banned and was not in the chromosome when mutation started.
    Forced items are never replaced. The result is passed through
    ``repair_chromosome``. An empty chromosome yields ``[]``.
    """
    if not chromo:
        return []

    mutated = chromo.copy()
    replacements = [
        candidate for candidate in item_pool
        if not (candidate in ban_items or candidate in mutated)
    ]
    for slot, gene in enumerate(mutated):
        # The random draw happens for every gene, forced or not.
        roll = random.random()
        if roll >= mutation_rate or not replacements:
            continue
        if gene in force_items:
            # Forced items must stay in the build.
            continue
        pick = random.choice(replacements)
        mutated[slot] = pick
        replacements.remove(pick)
    return repair_chromosome(mutated, lane, item_pool, ban_items, force_items)

# Repair Chromosome
def repair_chromosome(chromo, lane, item_pool, ban_items, force_items):
    """Turn ``chromo`` into a valid build of six distinct items.

    Steps:
      1. Drop banned genes and repeated genes (first occurrence wins).
      2. Append every forced item that is still missing.
      3. If the build is now too long, drop non-forced genes from the end.
         Previously the plain ``[:6]`` slice cut off the forced items that
         had just been appended.
      4. Top up with random pool items that are neither banned nor present.

    Args:
        chromo: Candidate list of item IDs.
        lane: Lane name (accepted but not used).
        item_pool: Item IDs available for topping up.
        ban_items: Item IDs that must not appear unless they are forced.
        force_items: Item IDs that must appear.

    Returns:
        A list of at most six item IDs. It is exactly six unless more than
        six items are forced, in which case the first six are returned.

    Raises:
        ValueError: If the pool has too few eligible items to reach six.
            The previous rejection-sampling loop never terminated then.
    """
    forced = list(dict.fromkeys(force_items))
    forced_set = set(forced)

    seen = set()
    valid = []
    for item in chromo:
        if item in ban_items or item in seen:
            continue
        seen.add(item)
        valid.append(item)
    for item in forced:
        if item not in seen:
            seen.add(item)
            valid.append(item)

    # Make room by removing optional genes, scanning from the back.
    overflow = len(valid) - 6
    if overflow > 0:
        kept = []
        for item in reversed(valid):
            if overflow and item not in forced_set:
                overflow -= 1
            else:
                kept.append(item)
        kept.reverse()
        valid = kept

    missing = 6 - len(valid)
    if missing > 0:
        taken = set(valid)
        spare = [
            item for item in dict.fromkeys(item_pool)
            if item not in ban_items and item not in taken
        ]
        if len(spare) < missing:
            raise ValueError(
                f"Need {missing} more item(s) but only {len(spare)} are eligible"
            )
        valid.extend(random.sample(spare, missing))
    return valid[:6]

# Elitism
def elitism(population, offspring, fitness, item_data, stat_caps, hero_id, conn):
    """Keep the ``POPULATION_SIZE`` fittest of parents plus offspring.

    Offspring are scored here as the sum of ``calculate_fitness`` over every
    phase in ``PHASE_WEIGHTS``. ``fitness`` must already hold the scores of
    ``population``. Ties keep their pooled order (parents first).
    """
    def total_fitness(chromo):
        # Sum of the per-phase fitness values of one chromosome.
        return sum(
            calculate_fitness(chromo, item_data, stat_caps, hero_id, phase, conn)
            for phase in PHASE_WEIGHTS
        )

    # Score the offspring.
    offspring_fitness = [total_fitness(chromo) for chromo in offspring]

    # Pool parents and offspring and rank them, best first.
    ranked = sorted(
        zip(population + offspring, fitness + offspring_fitness),
        key=lambda pair: pair[1],
        reverse=True,
    )

    # Keep the best chromosomes.
    return [chromo for chromo, _score in ranked[:POPULATION_SIZE]]

# Main Genetic Algorithm
def run_genetic_algorithm(hero_id, lane, force_items, ban_items):
    """Run the GA for one hero and return the best build found.

    A chromosome's fitness is the sum of ``calculate_fitness`` over every
    phase in ``PHASE_WEIGHTS``. Each generation applies tournament
    selection, one-point crossover, mutation and elitist survivor
    selection.

    Changes over the previous version:
      * the DB connection is closed even when an error occurs;
      * the survivors of the final generation are scored too, so an
        improvement found by the last round of offspring is not lost;
      * the unused hero-class lookup (an extra DB query) was removed.

    Args:
        hero_id: Hero whose stats and class drive the scoring.
        lane: Lane name forwarded to population and mutation helpers.
        force_items: Item IDs that must be in every build.
        ban_items: Item IDs excluded from the item pool.

    Returns:
        Tuple ``(best_chromosome, best_fitness)``.
    """
    conn = connect_db()
    try:
        item_data = load_item_data(conn)
        stat_caps = load_stat_caps(conn)
        item_pool = [item for item in item_data if item not in ban_items]

        def evaluate(chromosomes):
            """Total fitness over all phases for each chromosome."""
            return [
                sum(
                    calculate_fitness(chromo, item_data, stat_caps, hero_id, phase, conn)
                    for phase in PHASE_WEIGHTS
                )
                for chromo in chromosomes
            ]

        population = initialize_population(POPULATION_SIZE, item_pool, force_items, ban_items, lane)
        best_fitness = float("-inf")
        best_chromo = None

        for _ in range(MAX_GENERATIONS):
            fitness = evaluate(population)

            current_best = max(fitness)
            if current_best > best_fitness:
                best_fitness = current_best
                best_chromo = population[fitness.index(current_best)].copy()

            parents = tournament_selection(population, fitness)
            offspring = []
            while len(offspring) < POPULATION_SIZE:
                p1, p2 = random.sample(parents, 2)
                offspring.extend(one_point_crossover(p1, p2))

            offspring = [
                mutate(chromo, item_pool, BASE_MUTATION_RATE, ban_items, force_items, lane)
                for chromo in offspring
            ]

            population = elitism(population, offspring, fitness, item_data, stat_caps, hero_id, conn)

        # The loop only scores a population at the start of an iteration,
        # so the final survivors still need one evaluation.
        final_fitness = evaluate(population)
        if final_fitness:
            final_best = max(final_fitness)
            if final_best > best_fitness:
                best_fitness = final_best
                best_chromo = population[final_fitness.index(final_best)].copy()

        return best_chromo, best_fitness
    finally:
        conn.close()

# Main Execution
if __name__ == "__main__":
    # Demo inputs: an example hero ID and lane, nothing forced or banned.
    hero_id, lane = "H001", "Jungle"
    force_items, ban_items = [], []

    best_build, best_fitness = run_genetic_algorithm(
        hero_id=hero_id,
        lane=lane,
        force_items=force_items,
        ban_items=ban_items,
    )
    print("Best Build:", best_build)
    print("Best Fitness:", best_fitness)



0.0.6

In [None]:
# rov_ga_recommendation.py – COMPLETE
"""roV Item Recommender (GA)
============================================================
✓ Boots≤1  ✓ Jungle≥1  ✓ Composition valid  ✓ Force/Ban clash check
✓ Dynamic class/phase weights  ✓ Skill/Item synergy  ✓ StatCap hero + item
✓ Adaptive mutation  ✓ Elitism  ✓ Early‑stop  ✓ Connection‑pool + cache
Note: requires MySQL DB schema (heroes, herostats, heroskills, items, itemcomposition)
"""
from __future__ import annotations

import os, random, math, time, copy
from functools import lru_cache
from typing import Dict, List, Tuple

import mysql.connector
from mysql.connector import pooling, Error
from dotenv import load_dotenv

# =============================================================
# 1. ENV / DB --------------------------------------------------
# =============================================================
# Load the DB_* settings from the .env file; the path is relative.
load_dotenv('../config/.env')

# Keyword arguments handed to the connection pool below.
DB_CONFIG = {
    "host": os.getenv("DB_HOST"),
    "port": int(os.getenv("DB_PRIMARY_PORT", 3306)),  # 3306 when the variable is unset
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "charset": "utf8mb4",
}
# Small pool: the GA runs single-threaded.
# NOTE: the pool is created as soon as this code executes (module level).
POOL = pooling.MySQLConnectionPool(pool_name="rov_pool", pool_size=3, **DB_CONFIG)


def get_conn():
    """Borrow a connection from ``POOL``, retrying on connector errors.

    Up to three attempts are made with a one-second pause between them. No
    pause follows the final failed attempt.

    Returns:
        A pooled connection.

    Raises:
        RuntimeError: If all attempts fail. The last connector error is
            attached as ``__cause__`` so the root cause stays visible.
    """
    max_tries = 3
    last_error = None
    for attempt in range(max_tries):
        try:
            return POOL.get_connection()
        except Error as exc:
            last_error = exc
            if attempt < max_tries - 1:
                time.sleep(1)
    raise RuntimeError("Cannot acquire DB connection") from last_error

# =============================================================
# 2. CONSTANTS -------------------------------------------------
# =============================================================
POP_SIZE = 100  # chromosomes per generation
MAX_GEN = 50  # upper bound on generations in run_genetic_algorithm
CROSSOVER_RATE = 0.8  # probability that crossover() recombines a pair
BASE_MUT_RATE = 0.05  # starting mutation rate, decayed by adaptive_mut_rate()
STAGNANT_LIMIT = 6  # generations without improvement before the GA stops early
# Multiplier applied to each phase's score in calculate_fitness.
PHASE_WEIGHTS = {"Early": 0.2, "Mid": 0.3, "Late": 0.5}
# Gold budget per phase before the lane adjustment in calculate_fitness.
BASE_BUDGET = {"Early": 2700, "Mid": 7500, "Late": 14000}

# Base stat weights per hero class; get_phase_weight() copies and tweaks them.
CLASS_BASE: Dict[str, Dict[str, float]] = {
    "Fighter":  {"patk": .35, "ap": .05, "def": .15, "hp": .15, "cdr": .15, "crit": .05, "ms": .05},
    "Tank":     {"patk": .1,  "ap": .0,  "def": .35, "hp": .3,  "cdr": .1,  "crit": .0,  "ms": .15},
    "Mage":     {"patk": .05, "ap": .45, "def": .1,  "hp": .1,  "cdr": .2,  "crit": .0,  "ms": .1},
    "Assassin": {"patk": .45, "ap": .05, "def": .1,  "hp": .1,  "cdr": .2,  "crit": .1,  "ms": .0},
    "Carry":    {"patk": .45, "ap": .0,  "def": .1,  "hp": .1,  "cdr": .1,  "crit": .2,  "ms": .05},
    "Support":  {"patk": .05, "ap": .25, "def": .2,  "hp": .25, "cdr": .2,  "crit": .0,  "ms": .05},
}
# Safeguard: make sure every class has an "ms" weight. Every row above
# already defines it, so this is currently a no-op.
for v in CLASS_BASE.values():
    v.setdefault("ms", .05)

# Simple synergy table (extend as needed): a build that contains every item
# of a set earns that bonus in calc_synergy().
SYNERGY_RULES: List[Tuple[set, float]] = [
    ({"I006", "I001"}, 0.2),   # Cleaving Claymore + Short Sword
    ({"I020", "I015"}, 0.25),  # Claves Sancti + Bow of Slaughter
]

# Items that may appear at most once in a build (e.g. Blade of Eternity).
# is_valid_chromo() rejects duplicates; calculate_fitness() adds a bonus for each.
UNIQUE_SINGLE = {"I049"}

# =============================================================
# 3. DATA LOADERS (cached) ------------------------------------
# =============================================================
@lru_cache(maxsize=None)
def load_item_data() -> Dict[str, Dict]:
    """Return all rows of the ``items`` table keyed by ``ItemID``.

    The result is cached, so every caller receives the same dict object.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT * FROM items")
        by_id = {}
        for record in cur.fetchall():
            by_id[record["ItemID"]] = record
        return by_id

@lru_cache(maxsize=None)
def load_item_comp() -> Dict[str, List[str]]:
    """Return the item recipes from the ``itemcomposition`` table.

    Maps each ``Composite_ItemID`` to the list of its ``BaseItemID`` values
    in the order the rows were fetched. The result is cached.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT Composite_ItemID, BaseItemID FROM itemcomposition")
        recipes: Dict[str, List[str]] = {}
        for link in cur.fetchall():
            composite = link["Composite_ItemID"]
            if composite not in recipes:
                recipes[composite] = []
            recipes[composite].append(link["BaseItemID"])
        return recipes

@lru_cache(maxsize=128)
def load_hero_stats(hero_id: str, level: int) -> Dict:
    """Return the ``herostats`` row for a hero at a level, or ``{}``.

    Results are cached per ``(hero_id, level)`` pair.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT * FROM herostats WHERE HeroID=%s AND Level=%s", (hero_id, level))
        stats = cur.fetchone()
        if not stats:
            return {}
        return stats

@lru_cache(maxsize=128)
def get_recommended_item_types(hero_id: str) -> List[str]:
    """Return the distinct, non-empty ``RecommendItemType`` values of a hero.

    Values come from the ``heroskills`` table; the result is cached per hero.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT DISTINCT RecommendItemType FROM heroskills WHERE HeroID=%s", (hero_id,))
        item_types = []
        for record in cur.fetchall():
            if record[0]:
                item_types.append(record[0])
        return item_types

@lru_cache(maxsize=128)
def get_hero_info(hero_id: str) -> Dict:
    """Return primary class, secondary class and lane of a hero.

    Unknown heroes yield a Fighter / no secondary class / Mid default.
    A falsy secondary class becomes ``None`` and a falsy lane becomes
    ``"Mid"``. The returned dict is cached and therefore shared between
    callers.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT First_Class, Second_Class, First_Lane FROM heroes WHERE HeroID=%s", (hero_id,))
        record = cur.fetchone()
        if record:
            secondary = record[1] or None
            lane = record[2] or "Mid"
            return {"primary": record[0], "secondary": secondary, "lane": lane}
        return {"primary": "Fighter", "secondary": None, "lane": "Mid"}

# =============================================================
# 4. STAT CAPS -------------------------------------------------
# =============================================================
@lru_cache(maxsize=128)
def load_stat_caps(hero_id: str, buffer: float = 0.2) -> Dict[str, float]:
    """Return the caps used to normalise each stat.

    A cap is the largest value any single item has for that stat, scaled by
    ``1 + buffer``. Movement speed always uses a fixed 10 % buffer. CDR and
    crit use the constants 0.4 and 1.0.

    NOTE(review): the hero's own base stats are not part of the caps. The
    previous version loaded them with an extra DB query but never used the
    result, so that lookup was removed. ``hero_id`` now only acts as part of
    the cache key. Confirm whether hero stats were meant to be included,
    given the module header's "StatCap hero + item".

    Args:
        hero_id: Hero identifier (cache key only).
        buffer: Headroom added on top of the largest item value.

    Returns:
        Dict with keys PATK, AP, DEF, HP, MS, CDR and CRIT.
    """
    items = load_item_data().values()

    def item_max(key: str) -> float:
        # `or 0` maps NULL columns to 0, `default=0` covers an empty table,
        # and float() keeps Decimal values from clashing with the float
        # buffer multiplication.
        return float(max((r.get(key) or 0 for r in items), default=0))

    return {
        "PATK": item_max("Phys_ATK") * (1 + buffer),
        "AP":   item_max("Magic_Power") * (1 + buffer),
        "DEF":  item_max("Phys_Defense") * (1 + buffer),
        "HP":   item_max("HP") * (1 + buffer),
        "MS":   item_max("Movement_Speed") * 1.1,
        "CDR":  0.4,
        "CRIT": 1.0,
    }

# =============================================================
# 5. HELPER: WEIGHT & SYNERGY ---------------------------------
# =============================================================

def get_phase_weight(hero_info: Dict, phase: str) -> Dict[str, float]:
    """Return the stat weights for a hero in a given phase.

    Starts from a copy of the hero's primary-class row in ``CLASS_BASE``
    (Fighter when the class is unknown) and then adds:
      * +0.05 patk for a "Dark Slayer" secondary class,
      * +0.05 hp and +0.05 cdr for the "Support" lane,
      * +0.05 patk in the "Early" phase or +0.1 hp in the "Late" phase.
    """
    base_row = CLASS_BASE.get(hero_info["primary"], CLASS_BASE["Fighter"])
    # Rows are flat dicts of floats, so a shallow copy is a full copy.
    weights = dict(base_row)

    # Secondary-class and lane tweaks.
    if hero_info["secondary"] == "Dark Slayer":
        weights["patk"] += 0.05
    if hero_info["lane"] == "Support":
        weights["hp"] += 0.05
        weights["cdr"] += 0.05

    # Phase tweak.
    if phase == "Late":
        weights["hp"] += 0.1
    elif phase == "Early":
        weights["patk"] += 0.05
    return weights


def calc_synergy(chromo: List[str]) -> float:
    """Return the synergy bonus of a build.

    Adds the bonus of every ``SYNERGY_RULES`` entry whose item set is fully
    contained in the build, plus 0.2 when at least three items have the
    class "Defense".
    """
    owned = set(chromo)
    bonus_total = 0.0
    for required, bonus in SYNERGY_RULES:
        if required.issubset(owned):
            bonus_total += bonus

    # Simple tank synergy: three or more Defense items.
    catalog = load_item_data()
    defense_count = 0
    for item_id in chromo:
        if catalog[item_id]["Class"] == "Defense":
            defense_count += 1
    if defense_count >= 3:
        bonus_total += 0.2
    return bonus_total

# =============================================================
# 6. VALID / REPAIR -------------------------------------------
# =============================================================
# Module-level item table and recipe table used by the validation, repair
# and fitness helpers below.
# NOTE: both loaders query the database the first time they run, so this
# happens as soon as this code is executed.
item_data = load_item_data()
item_comp = load_item_comp()

def is_valid_chromo(ch: List[str]) -> bool:
    """Check a build against the boots, composition and uniqueness rules.

    A build is invalid when it
      * contains more than one item of class "Movement",
      * contains an item listed in ``item_comp`` without also containing
        all of that item's base items, or
      * contains any ``UNIQUE_SINGLE`` item more than once.
    """
    # Boots limit.
    boots = [it for it in ch if item_data[it]["Class"] == "Movement"]
    if len(boots) > 1:
        return False

    # Composition: every base item of a composite must be present too.
    for it in ch:
        if it not in item_comp:
            continue
        for base in item_comp[it]:
            if base not in ch:
                return False

    # Unique single items.
    return not any(ch.count(unique) > 1 for unique in UNIQUE_SINGLE)


def repair_chromosome(ch: List[str], lane: str, ban: set, force: set) -> List[str]:
    """Turn ``ch`` into a six-item build that honours force, ban and lane rules.

    Steps:
      1. Drop banned and repeated genes (first occurrence wins).
      2. Append missing forced items.
      3. For the "Jungle" lane, make sure one item of class "Jungle" is
         present. The rule is skipped when no unbanned jungle item exists;
         previously that case raised ``IndexError``.
      4. If the build is too long, drop unprotected genes from the end.
         Forced items and the jungle item are protected, so they are no
         longer cut off by the final ``[:6]`` slice.
      5. Top up with random unbanned items.
      6. Make up to 10 attempts to fix a build that ``is_valid_chromo``
         rejects, by swapping a random unprotected slot for an unused
         item. The result may still be invalid afterwards.

    Args:
        ch: Candidate list of item IDs.
        lane: Lane name; only "Jungle" triggers a rule.
        ban: Item IDs that must not be used.
        force: Item IDs that must be in the build.

    Returns:
        A list of six item IDs (the first six when more than six are forced).

    Raises:
        ValueError: If a forced item is banned, or if too few unbanned
            items exist to fill the build. The old fill loop never
            terminated in the latter case.
    """
    size = 6
    if any(it in ban for it in force):
        raise ValueError("Force item in ban list")

    pool = [it for it in item_data if it not in ban]

    # 1. Remove duplicates and banned items.
    seen, out = set(), []
    for it in ch:
        if it not in ban and it not in seen:
            out.append(it)
            seen.add(it)

    # 2. Add forced items.
    for it in force:
        if it not in seen:
            out.append(it)
            seen.add(it)
    protected = set(force)

    # 3. Jungle rule.
    if lane == "Jungle":
        owned_jungle = [it for it in out if item_data[it]["Class"] == "Jungle"]
        if owned_jungle:
            protected.add(owned_jungle[0])
        else:
            jungle_pool = [it for it in pool if item_data[it]["Class"] == "Jungle"]
            if jungle_pool:
                pick = random.choice(jungle_pool)
                out.append(pick)
                protected.add(pick)

    # 4. Trim overflow, sacrificing unprotected genes from the back.
    overflow = len(out) - size
    if overflow > 0:
        kept = []
        for it in reversed(out):
            if overflow and it not in protected:
                overflow -= 1
            else:
                kept.append(it)
        kept.reverse()
        out = kept

    # 5. Fill until the build has six items.
    missing = size - len(out)
    if missing > 0:
        spare = [it for it in pool if it not in out]
        if len(spare) < missing:
            raise ValueError("Not enough unbanned items to fill a 6-item build")
        out.extend(random.sample(spare, missing))

    # 6. Fix invalidities (boots / composition) without touching protected slots.
    tries = 0
    while tries < 10 and not is_valid_chromo(out):
        slots = [i for i, it in enumerate(out[:size]) if it not in protected]
        spare = [it for it in pool if it not in out]
        if not slots or not spare:
            break
        out[random.choice(slots)] = random.choice(spare)
        tries += 1
    return out[:size]

# =============================================================
# 7. FITNESS ---------------------------------------------------
# =============================================================

def score_stats(ch: List[str], hero_base: Dict, caps: Dict, weights: Dict) -> float:
    """Return the weighted, cap-normalised stat score of a build.

    For each stat, the hero's base value and the values of all items in
    ``ch`` are summed, divided by the stat's cap, clamped to 1.0 and
    multiplied by the stat's weight. Cooldown reduction and crit start at 0
    instead of a hero base value.

    Database values may be NULL (``None``) or ``Decimal``. Both would raise
    when added to, or divided by, floats, so every value is coerced to
    ``float`` with NULL counted as 0.

    Args:
        ch: Item IDs of the build; each must be a key of ``item_data``.
        hero_base: Hero stat row (may be empty).
        caps: Dict with PATK, AP, DEF, HP, CDR, CRIT and MS caps.
        weights: Dict with patk, ap, def, hp, cdr, crit and ms weights.

    Returns:
        The score as a float. A zero cap contributes 0 for that stat.
    """
    def num(value) -> float:
        """Coerce a DB value to float, treating NULL as 0."""
        return float(value or 0)

    # (weight key, row column, caps key, hero base contributes?)
    stat_spec = (
        ("patk", "Phys_ATK", "PATK", True),
        ("ap", "Magic_Power", "AP", True),
        ("def", "Phys_Defense", "DEF", True),
        ("hp", "HP", "HP", True),
        ("cdr", "Cooldown_Reduction", "CDR", False),
        ("crit", "Critical_Rate", "CRIT", False),
        ("ms", "Movement_Speed", "MS", True),
    )
    rows = [item_data[it] for it in ch]

    score = 0.0
    for stat, column, cap_key, from_hero in stat_spec:
        total = num(hero_base.get(column)) if from_hero else 0.0
        for row in rows:
            total += num(row.get(column))
        cap = caps[cap_key]
        # Normalise against the cap and clamp at 1.0.
        normalised = min(total / cap, 1.0) if cap else 0.0
        score += weights[stat] * normalised
    return score


def score_budget(cost: int, budget: int, phase: str) -> float:
    """Return 1.0 for an affordable build and less when it is over budget.

    Over budget, the relative overshoot is raised to the power 1.2 in the
    "Late" phase, or 1.5 in any other phase, and subtracted from 1. The
    result is floored at 0.0.
    """
    if cost > budget:
        exponent = 1.2 if phase == "Late" else 1.5
        overshoot = (cost - budget) / budget
        return max(0.0, 1 - overshoot ** exponent)
    return 1.0


def calculate_fitness(ch: List[str], hero_id: str, hero_info: Dict, phase: str, caps: Dict) -> float:
    """Return the phase-weighted fitness of a build.

    The score is the sum of the stat score, the budget score, 0.2 per item
    whose class is among the hero's recommended item types, the synergy
    bonus and 0.15 per ``UNIQUE_SINGLE`` item. That sum is multiplied by
    ``PHASE_WEIGHTS[phase]``. The Jungle lane gets a 10 % larger budget in
    the Early phase.
    """
    phase_levels = {"Early": 3, "Mid": 9, "Late": 15}
    hero_base = load_hero_stats(hero_id, phase_levels[phase])
    weights = get_phase_weight(hero_info, phase)
    stat_score = score_stats(ch, hero_base, caps, weights)

    # Budget part.
    cost = sum(item_data[it]["Price"] for it in ch)
    base_budget = BASE_BUDGET[phase]
    jungle_early = hero_info["lane"] == "Jungle" and phase == "Early"
    budget = base_budget * (1.1 if jungle_early else 1.0)
    budget_score = score_budget(cost, budget, phase)

    # Bonuses.
    rec_types = get_recommended_item_types(hero_id)
    skill_bonus = sum(0.2 for it in ch if item_data[it]["Class"] in rec_types)
    synergy = calc_synergy(ch)
    unique_bonus = sum(0.15 for it in ch if it in UNIQUE_SINGLE)

    raw_total = stat_score + budget_score + skill_bonus + synergy + unique_bonus
    return raw_total * PHASE_WEIGHTS[phase]

# =============================================================
# 8. GA OPERATORS ---------------------------------------------
# =============================================================

def tournament_select(pop: List[List[str]], fitness: List[float], k: int = 3) -> List[str]:
    """Return the fittest of ``k`` randomly sampled chromosomes.

    ``fitness`` must be aligned with ``pop``. The winning chromosome is
    returned by reference.
    """
    pairs = list(zip(pop, fitness))
    entrants = random.sample(pairs, k)
    chromo, _score = max(entrants, key=lambda entry: entry[1])
    return chromo


def crossover(p1: List[str], p2: List[str]) -> Tuple[List[str], List[str]]:
    """One-point crossover of two parents.

    With probability ``CROSSOVER_RATE`` the parents swap tails at a cut
    index drawn from 1..5; otherwise shallow copies of both parents are
    returned.
    """
    if random.random() >= CROSSOVER_RATE:
        return p1[:], p2[:]

    cut = random.randint(1, 5)
    first_child = p1[:cut] + p2[cut:]
    second_child = p2[:cut] + p1[cut:]
    return first_child, second_child


def adaptive_mut_rate(gen: int) -> float:
    """Return the mutation rate for generation ``gen``.

    The rate starts at ``BASE_MUT_RATE`` and decays exponentially with a
    time constant of ``0.6 * MAX_GEN`` generations.
    """
    time_constant = 0.6 * MAX_GEN
    decay = math.exp(-gen / time_constant)
    return BASE_MUT_RATE * decay


def mutate(ch: List[str], pool: List[str], rate: float, lane: str, ban: set, force: set) -> List[str]:
    """Return a mutated, repaired copy of a six-gene chromosome.

    Each non-forced gene is replaced with probability ``rate`` by a pool
    item that is not banned and was not in the chromosome when mutation
    started. Every replacement is used at most once. The result is passed
    through ``repair_chromosome``.
    """
    mutated = ch[:]
    spare = [it for it in pool if not (it in mutated or it in ban)]
    for slot in range(6):
        if mutated[slot] in force:
            # Forced genes are skipped before any random draw is made.
            continue
        if random.random() >= rate:
            continue
        if not spare:
            continue
        pick = random.choice(spare)
        mutated[slot] = pick
        spare.remove(pick)
    return repair_chromosome(mutated, lane, ban, force)

# =============================================================
# 9. MAIN GA ---------------------------------------------------
# =============================================================

def run_genetic_algorithm(hero_id: str, lane: str, force_items: List[str] | None = None, ban_items: List[str] | None = None):
    """Run the GA for one hero and return the best build found.

    A chromosome's fitness is the sum of ``calculate_fitness`` over every
    phase in ``PHASE_WEIGHTS``. The loop stops after ``MAX_GEN`` generations
    or after ``STAGNANT_LIMIT`` generations without an improvement larger
    than 1e-6. The best chromosome so far is re-inserted into each new
    generation at a random position.

    Changes over the previous version:
      * the hero info is copied before the lane override, because
        ``get_hero_info`` is ``lru_cache``d and the old in-place write
        changed the cached dict for every later caller;
      * too few unbanned items now produce a clear ``ValueError``;
      * the final generation no longer breeds offspring that were never
        evaluated.

    Args:
        hero_id: Hero to optimise for.
        lane: Lane to play; overrides the hero's default lane.
        force_items: Item IDs that must be in the build.
        ban_items: Item IDs that must not be used.

    Returns:
        Tuple ``(best_chromosome, best_fitness)``.

    Raises:
        ValueError: If an item is both forced and banned, or fewer than six
            unbanned items exist.
    """
    force_set, ban_set = set(force_items or []), set(ban_items or [])
    if force_set & ban_set:
        raise ValueError("force_items conflict with ban_items")

    # Copy the cached dict before overriding the lane.
    hero_info = dict(get_hero_info(hero_id))
    hero_info["lane"] = lane
    caps = load_stat_caps(hero_id)

    pool = [it for it in item_data if it not in ban_set]
    if len(pool) < 6:
        raise ValueError("Need at least 6 unbanned items to build a chromosome")
    pop = [repair_chromosome(random.sample(pool, 6), lane, ban_set, force_set) for _ in range(POP_SIZE)]

    best_fit = float("-inf")
    best_ch = None
    stagnant = 0

    for gen in range(MAX_GEN):
        # Fitness of the current generation.
        fits = [sum(calculate_fitness(ch, hero_id, hero_info, ph, caps) for ph in PHASE_WEIGHTS) for ch in pop]
        gen_best = max(fits)
        if gen_best > best_fit + 1e-6:
            best_fit = gen_best
            best_ch = pop[fits.index(gen_best)][:]
            stagnant = 0
        else:
            stagnant += 1
        # Stop on stagnation, or on the last generation, whose offspring
        # would never be evaluated.
        if stagnant >= STAGNANT_LIMIT or gen == MAX_GEN - 1:
            break

        # Selection and offspring.
        offspring: List[List[str]] = []
        while len(offspring) < POP_SIZE:
            p1 = tournament_select(pop, fits)
            p2 = tournament_select(pop, fits)
            c1, c2 = crossover(p1, p2)
            offspring.extend([c1, c2])

        # Mutation and repair.
        rate = adaptive_mut_rate(gen)
        offspring = [mutate(ch, pool, rate, lane, ban_set, force_set) for ch in offspring[:POP_SIZE]]

        # Elitism: keep the best build alive in the next generation.
        offspring[random.randrange(POP_SIZE)] = best_ch[:]
        pop = offspring
    return best_ch, best_fit

# =============================================================
# 10. Demo (remove in production) -----------------------------
# =============================================================
if __name__ == "__main__":
    # Demo run: hero "H001" on the Mid lane with no forced or banned items.
    build, fit = run_genetic_algorithm(hero_id="H001", lane="Mid")
    print("Best Build:", build)
    print("Fitness:", fit)


0.0.7

In [3]:
from __future__ import annotations
import os, random, math, time, copy
from functools import lru_cache
from typing import Dict, List, Tuple
import mysql.connector
from mysql.connector import pooling, Error
from dotenv import load_dotenv

# =============================================================
# 1. ENV / DB
# =============================================================
# Load the DB_* settings from the .env file; the path is relative.
load_dotenv('../config/.env')

# Keyword arguments handed to the connection pool below.
DB_CONFIG = {
    "host":     os.getenv("DB_HOST"),
    "port":     int(os.getenv("DB_PRIMARY_PORT", 3306)),  # 3306 when the variable is unset
    "user":     os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "charset":  "utf8mb4",
}

# Pool of three connections, created as soon as this code executes.
POOL = pooling.MySQLConnectionPool(pool_name="rov_pool",
                                   pool_size=3,
                                   **DB_CONFIG)


def get_conn():
    """Borrow a connection from ``POOL``, retrying on connector errors.

    Makes up to three attempts with a one-second pause between them. No
    pause follows the final failed attempt.

    Returns:
        A pooled connection.

    Raises:
        RuntimeError: If all attempts fail. The last connector error is
            attached as ``__cause__`` instead of being discarded.
    """
    max_tries = 3
    last_error = None
    for attempt in range(max_tries):
        try:
            return POOL.get_connection()
        except Error as exc:
            last_error = exc
            if attempt < max_tries - 1:
                time.sleep(1)
    raise RuntimeError("Cannot acquire DB connection") from last_error

# =============================================================
# 2. CONSTANTS
# =============================================================
# GA tuning constants. Their consumers lie outside the visible part of this
# cell; judging by the names they are population size, generation limit,
# crossover probability, base mutation rate and the early-stop patience
# (TODO confirm against the GA loop).
POP_SIZE        = 100
MAX_GEN         = 50
CROSSOVER_RATE  = 0.8
BASE_MUT_RATE   = 0.05
STAGNANT_LIMIT  = 6

# Per-phase values keyed by phase name. Presumably PHASE_WEIGHTS scales each
# phase's score and BASE_BUDGET is the gold budget (verify against the
# fitness function).
PHASE_WEIGHTS = {"Early": 0.2, "Mid": 0.3, "Late": 0.5}
BASE_BUDGET   = {"Early": 2700, "Mid": 7500, "Late": 14000}

# Stat weights per hero class, keyed by patk/ap/def/hp/cdr/crit/ms.
CLASS_BASE: Dict[str, Dict[str, float]] = {
    "Fighter":  {"patk": .35, "ap": .05, "def": .15, "hp": .15, "cdr": .15, "crit": .05, "ms": .05},
    "Tank":     {"patk": .10, "ap": .00, "def": .35, "hp": .30, "cdr": .10, "crit": .00, "ms": .15},
    "Mage":     {"patk": .05, "ap": .45, "def": .10, "hp": .10, "cdr": .20, "crit": .00, "ms": .10},
    "Assassin": {"patk": .45, "ap": .05, "def": .10, "hp": .10, "cdr": .20, "crit": .10, "ms": .00},
    "Carry":    {"patk": .45, "ap": .00, "def": .10, "hp": .10, "cdr": .10, "crit": .20, "ms": .05},
    "Support":  {"patk": .05, "ap": .25, "def": .20, "hp": .25, "cdr": .20, "crit": .00, "ms": .05},
}
# Safeguard: give every class an "ms" weight. Every row above already
# defines it, so this is currently a no-op.
for v in CLASS_BASE.values():
    v.setdefault("ms", .05)

# -----------------------------------------------------------------
# The class names used by the DB and by the skills do not match, so map
# every known spelling to one canonical name.
CLASS_ALIAS = {
    "Magic": "Magic", "Mage": "Magic",
    "Attack": "Attack", "Physical": "Attack",
    "Defense": "Defense",
    "Movement": "Movement", "Boot": "Movement",
    "Jungle": "Jungle",
    "Support": "Support",
}
def _norm_class(name: str | None) -> str:
    """Map a class name to its canonical form via ``CLASS_ALIAS``.

    ``None`` and other falsy names are treated as the empty string. Names
    without an alias are returned unchanged.
    """
    key = name or ""
    return CLASS_ALIAS.get(key, key)
# -----------------------------------------------------------------

# Item-set synergy table: each entry pairs a set of item IDs with a bonus
# value.
SYNERGY_RULES: List[Tuple[set, float]] = [
    ({"I006", "I001"}, 0.20),     # Cleaving Claymore + Short Sword
    ({"I020", "I015"}, 0.25),     # Claves Sancti + Bow of Slaughter
]

UNIQUE_SINGLE = {"I049"}          # Blade of Eternity (one piece per game)

# =============================================================
# 3. DATA LOADERS
# =============================================================
@lru_cache(maxsize=None)
def load_item_data() -> Dict[str, Dict]:
    """Load all items, normalise their class and cast values to numbers.

    Every row of the ``items`` table is returned keyed by ``ItemID``.
    ``Class`` is passed through ``_norm_class``. Every other non-NULL column
    except ``ItemID`` and ``ItemName`` is converted to ``float``; values
    that cannot be converted become 0.0. NULL values stay ``None``. The
    result is cached.
    """
    text_columns = ("ItemID", "ItemName", "Class")
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        cur.execute("SELECT * FROM items")
        catalog: Dict[str, Dict] = {}
        for record in cur.fetchall():
            record["Class"] = _norm_class(record.get("Class"))
            # Iterate over a snapshot of the keys while rewriting values.
            for column in list(record):
                raw = record[column]
                if raw is None or column in text_columns:
                    continue
                try:
                    number = float(raw)
                except (ValueError, TypeError):
                    number = 0.0
                record[column] = number
            catalog[record["ItemID"]] = record
        return catalog


@lru_cache(maxsize=None)
def load_item_comp() -> Dict[str, List[str]]:
    """Return ``{Composite_ItemID: [BaseItemID, ...]}`` from ``itemcomposition``.

    The result is memoised by ``lru_cache`` and therefore shared between
    callers; treat it as read-only.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT Composite_ItemID, BaseItemID FROM itemcomposition")
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails

    comp: Dict[str, List[str]] = {}
    for r in rows:
        comp.setdefault(r["Composite_ItemID"], []).append(r["BaseItemID"])
    return comp


@lru_cache(maxsize=128)
def load_hero_stats(hero_id: str, level: int) -> Dict:
    """Return the ``herostats`` row for *hero_id* at *level*, or ``{}`` if none.

    The result is memoised by ``lru_cache``; callers share the returned dict
    and should not mutate it.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT * FROM herostats WHERE HeroID=%s AND Level=%s",
                        (hero_id, level))
            # Drain the whole result set so no unread rows are left on the
            # pooled connection, then keep the first match.
            rows = cur.fetchall()
        finally:
            cur.close()
    return rows[0] if rows else {}


@lru_cache(maxsize=128)
def get_recommended_item_types(hero_id: str) -> List[str]:
    """Return the item classes recommended by the hero's skills (alias-normalised).

    Reads the distinct non-empty ``RecommendItemType`` values of ``heroskills``
    for *hero_id* and maps each through ``_norm_class``. The list is memoised
    by ``lru_cache`` and shared between callers.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute("SELECT DISTINCT RecommendItemType FROM heroskills "
                        "WHERE HeroID=%s", (hero_id,))
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails
    return [_norm_class(r[0]) for r in rows if r[0]]


@lru_cache(maxsize=128)
def get_hero_info(hero_id: str) -> Dict:
    """Return ``{"primary", "secondary", "lane"}`` for *hero_id* from ``heroes``.

    Falls back to a Fighter / no secondary / Mid record when the hero is not
    found; an empty ``Second_Class`` becomes ``None`` and an empty
    ``First_Lane`` becomes ``"Mid"``.

    NOTE: the dict is memoised by ``lru_cache``, so the same object is handed
    to every caller. Copy it before modifying it.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute("SELECT First_Class, Second_Class, First_Lane "
                        "FROM heroes WHERE HeroID=%s", (hero_id,))
            # Drain the result set so nothing unread stays on the pooled connection.
            rows = cur.fetchall()
        finally:
            cur.close()
    if not rows:
        return {"primary": "Fighter", "secondary": None, "lane": "Mid"}
    row = rows[0]
    return {"primary": row[0], "secondary": row[1] or None,
            "lane": row[2] or "Mid"}

# =============================================================
# 4. STAT CAP (uses max(hero, item) + buffer)
# =============================================================
@lru_cache(maxsize=128)
def load_stat_caps(hero_id: str, buffer: float = 0.20) -> Dict[str, float]:
    """Return the normalisation cap for each scored stat of *hero_id*.

    For PATK/AP/DEF/HP the cap is the larger of the hero's level-15 value and
    the largest single-item value, scaled by ``1 + buffer``; MS uses a fixed
    1.10 factor; CDR and CRIT are the constants 0.40 and 1.00.

    NULL or missing columns count as 0 and all values are converted to
    ``float`` first, so ``None``/``Decimal`` values from the DB cannot break
    the comparison or the scaling. An empty item table yields item maxima of 0.
    """
    items = list(load_item_data().values())
    hero_max = load_hero_stats(hero_id, 15)            # level-15 row

    def cap(column: str, factor: float) -> float:
        hero_val = float(hero_max.get(column) or 0)
        item_val = max((float(r.get(column) or 0) for r in items), default=0.0)
        return max(hero_val, item_val) * factor

    grow = 1 + buffer
    return {
        "PATK": cap("Phys_ATK", grow),
        "AP":   cap("Magic_Power", grow),
        "DEF":  cap("Phys_Defense", grow),
        "HP":   cap("HP", grow),
        "MS":   cap("Movement_Speed", 1.10),
        "CDR":  0.40,
        "CRIT": 1.00,
    }

# =============================================================
# 5. HELPER  (weight + synergy)
# =============================================================
def get_phase_weight(hero_info: Dict, phase: str) -> Dict[str, float]:
    """Return a fresh stat-weight dict for the hero's class, adjusted for context.

    Starts from ``CLASS_BASE[primary]`` (Fighter when the class is unknown) and
    then adds: +0.05 patk for a "Dark Slayer" secondary, +0.05 hp and cdr on the
    Support lane, +0.05 patk in the Early phase or +0.10 hp in the Late phase.
    """
    template = CLASS_BASE.get(hero_info["primary"], CLASS_BASE["Fighter"])
    weights = dict(template)  # flat dict of floats, so a shallow copy is enough

    # Collect the adjustments first, then apply them in the original order.
    bumps = []
    if hero_info["secondary"] == "Dark Slayer":
        bumps.append(("patk", 0.05))
    if hero_info["lane"] == "Support":
        bumps.extend([("hp", 0.05), ("cdr", 0.05)])
    if phase == "Early":
        bumps.append(("patk", 0.05))
    elif phase == "Late":
        bumps.append(("hp", 0.10))

    for stat, delta in bumps:
        weights[stat] += delta
    return weights


def calc_synergy(chromo: List[str]) -> float:
    """Return the synergy bonus earned by the build *chromo*.

    Adds the bonus of every ``SYNERGY_RULES`` entry whose item set is fully
    contained in the build, plus 0.20 when at least three items are of class
    "Defense".
    """
    owned = set(chromo)
    total = 0.0
    for required, bonus in SYNERGY_RULES:
        if required <= owned:
            total += bonus

    # Mini-rule: three or more Defense items earn a small extra bonus.
    defense_items = [it for it in chromo if item_data[it]["Class"] == "Defense"]
    if len(defense_items) >= 3:
        total += 0.20
    return total

# =============================================================
# 6. VALID / REPAIR
# =============================================================
# Module-level lookup tables used by the validity, repair and fitness helpers.
# Both calls query the database when this code is executed.
item_data = load_item_data()   # {ItemID: item row}
item_comp = load_item_comp()   # {Composite_ItemID: [BaseItemID, ...]}


def is_valid_chromo(ch: List[str]) -> bool:
    """Return True when the build *ch* satisfies all three structural rules."""
    # Rule 1: at most one Movement-class item.
    movement_count = len([it for it in ch if item_data[it]["Class"] == "Movement"])
    if movement_count > 1:
        return False

    # Rule 2: every base item listed in item_comp for a build item must be in the build.
    if any(base not in ch for it in ch for base in item_comp.get(it, ())):
        return False

    # Rule 3: items in UNIQUE_SINGLE may appear at most once.
    return not any(ch.count(u) > 1 for u in UNIQUE_SINGLE)


def repair_chromosome(ch: List[str],
                      lane: str,
                      ban: set,
                      force: set) -> List[str]:
    """Turn *ch* into a build of at most six distinct, non-banned items.

    Steps:
      1. Forced items are placed first so the six-slot cap can never drop them.
      2. The remaining genes of *ch* follow, minus banned items and duplicates.
      3. On the "Jungle" lane one Jungle-class item is locked into the build
         (an existing one if present, otherwise a random one from the pool).
      4. Free slots are padded with random unused items from the pool.
      5. While ``is_valid_chromo`` fails, up to ten random replacements are
         made, touching only slots that hold neither a forced item nor the
         locked jungle item.

    The result can be shorter than six only when the non-banned pool runs out,
    and may still be invalid if ten replacement attempts were not enough.

    Raises:
        ValueError: if an item of *force* is also in *ban*.
    """
    size = 6
    pool = [it for it in item_data if it not in ban]

    def is_jungle(it: str) -> bool:
        return item_data[it]["Class"] == "Jungle"

    # 1) Forced items take the first slots.
    locked: List[str] = []
    for it in force:
        if it in ban:
            raise ValueError("force item in ban list")
        if it not in locked:
            locked.append(it)

    # 2) Surviving genes keep their original order.
    seen = set(locked)
    genes: List[str] = []
    for it in ch:
        if it not in ban and it not in seen:
            genes.append(it)
            seen.add(it)

    # 3) Jungle lane: lock exactly one jungle item while a slot is still free.
    if (lane == "Jungle" and len(locked) < size
            and not any(is_jungle(it) for it in locked)):
        room = size - len(locked)
        carried = [it for it in genes[:room] if is_jungle(it)]
        if carried:
            pick = carried[0]
        else:
            jungle_pool = [it for it in pool if is_jungle(it)]
            pick = random.choice(jungle_pool) if jungle_pool else None
        if pick is not None:
            if pick in genes:
                genes.remove(pick)
            locked.append(pick)

    out = (locked + genes)[:size]

    # 4) Pad with unused pool items; stop instead of spinning when none are left.
    spare = [it for it in pool if it not in out]
    random.shuffle(spare)
    while len(out) < size and spare:
        out.append(spare.pop())

    # 5) Best-effort repair that never displaces a locked item.
    free_slots = [i for i, it in enumerate(out) if it not in locked]
    tries = 0
    while tries < 10 and free_slots and not is_valid_chromo(out):
        spare = [it for it in pool if it not in out]
        if not spare:
            break
        out[random.choice(free_slots)] = random.choice(spare)
        tries += 1
    return out

# =============================================================
# 7. FITNESS
# =============================================================
def _n(x):
    """Safely cast *x* to ``float``.

    ``None``, ``""``, ``"NULL"`` and any value ``float()`` rejects (for example
    free text) all yield ``0.0``, the same fallback ``load_item_data`` uses for
    columns it cannot cast.
    """
    if x in (None, "", "NULL"):
        return 0.0
    try:
        return float(x)
    except (TypeError, ValueError):
        return 0.0


def score_stats(ch: List[str],
                hero_base: Dict,
                caps: Dict,
                weights: Dict) -> float:
    """Return the weighted, cap-normalised stat score of build *ch*.

    Each stat total is the hero's base value (cdr and crit start at 0) plus the
    item values; it is divided by its cap, clipped at 1.0, multiplied by its
    weight, and the seven terms are summed. A falsy cap contributes 0.
    """
    # (weight key, DB column, cap key) in scoring order.
    columns = (
        ("patk", "Phys_ATK", "PATK"),
        ("ap", "Magic_Power", "AP"),
        ("def", "Phys_Defense", "DEF"),
        ("hp", "HP", "HP"),
        ("cdr", "Cooldown_Reduction", "CDR"),
        ("crit", "Critical_Rate", "CRIT"),
        ("ms", "Movement_Speed", "MS"),
    )
    item_only = ("cdr", "crit")  # stats the hero base row does not contribute to

    totals = {}
    for stat, column, _ in columns:
        totals[stat] = 0.0 if stat in item_only else _n(hero_base.get(column))

    for item_id in ch:
        row = item_data[item_id]
        for stat, column, _ in columns:
            totals[stat] += _n(row.get(column))

    score = None
    for stat, _, cap_key in columns:
        weight = weights[stat]
        cap = caps[cap_key]
        ratio = min(totals[stat] / cap, 1.0) if cap else 0.0
        term = weight * ratio
        score = term if score is None else score + term
    return score


def score_budget(cost: int, budget: int, phase: str) -> float:
    """Return 1.0 when *cost* fits *budget*, otherwise a penalised score in [0, 1).

    The relative overshoot is raised to the power 1.2 in the "Late" phase and
    1.5 in any other phase, then subtracted from 1 and floored at 0.
    """
    if cost <= budget:
        return 1.0
    exponent = 1.2 if phase == "Late" else 1.5
    overshoot = (cost - budget) / budget
    remaining = 1 - overshoot ** exponent
    return remaining if remaining > 0.0 else 0.0


def calculate_fitness(ch: List[str],
                      hero_id: str,
                      hero_info: Dict,
                      phase: str,
                      caps: Dict) -> float:
    """Return the fitness of build *ch* for one game phase.

    Sums the stat score, budget score, skill bonus (0.20 per item whose class
    the hero's skills recommend), synergy bonus and unique-item bonus (0.15 per
    item in ``UNIQUE_SINGLE``), then scales the sum by ``PHASE_WEIGHTS[phase]``.
    Hero base stats are taken at level 3 / 9 / 15 for Early / Mid / Late.
    """
    phase_level = {"Early": 3, "Mid": 9, "Late": 15}
    base_stats = load_hero_stats(hero_id, phase_level[phase])
    stat_part = score_stats(ch, base_stats, caps, get_phase_weight(hero_info, phase))

    total_price = int(sum(item_data[it]["Price"] for it in ch))
    base_budget = BASE_BUDGET[phase]
    # Junglers get a 10% larger budget in the Early phase.
    jungle_early = hero_info["lane"] == "Jungle" and phase == "Early"
    allowance = base_budget * (1.1 if jungle_early else 1.0)
    budget_part = score_budget(total_price, allowance, phase)

    recommended = get_recommended_item_types(hero_id)
    skill_part = sum(0.20 for it in ch if item_data[it]["Class"] in recommended)
    synergy_part = calc_synergy(ch)
    unique_part = sum(0.15 for it in ch if it in UNIQUE_SINGLE)

    raw = stat_part + budget_part + skill_part + synergy_part + unique_part
    return raw * PHASE_WEIGHTS[phase]

# =============================================================
# 8. GA OPERATORS
# =============================================================
def tournament_select(pop: List[List[str]], fitness: List[float], k: int = 3) -> List[str]:
    """Draw *k* distinct individuals at random and return the fittest of them."""
    paired_count = min(len(pop), len(fitness))
    contenders = random.sample(range(paired_count), k)
    winner = max(contenders, key=fitness.__getitem__)
    return pop[winner]


def crossover(p1: List[str], p2: List[str]) -> Tuple[List[str], List[str]]:
    """Return two children of *p1* and *p2*.

    With probability ``CROSSOVER_RATE`` the parents swap tails at a random cut
    between positions 1 and 5; otherwise the children are plain copies.
    """
    if not random.random() < CROSSOVER_RATE:
        return list(p1), list(p2)
    cut = random.randint(1, 5)
    first_child = p1[:cut] + p2[cut:]
    second_child = p2[:cut] + p1[cut:]
    return first_child, second_child


def adaptive_mut_rate(gen: int) -> float:
    """Return the mutation rate for generation *gen*.

    The rate starts at ``BASE_MUT_RATE`` and decays exponentially with a time
    constant of ``0.6 * MAX_GEN`` generations.
    """
    time_constant = 0.6 * MAX_GEN
    decay = math.exp(-gen / time_constant)
    return BASE_MUT_RATE * decay


def mutate(ch: List[str],
           pool: List[str],
           rate: float,
           lane: str,
           ban: set,
           force: set) -> List[str]:
    """Return a mutated, repaired copy of *ch*.

    Each of the six genes that is not a forced item is replaced with
    probability *rate* by a random pool item that is neither banned nor already
    in the build; the result is passed through ``repair_chromosome``.
    """
    child = list(ch)
    taken = set(child)
    spare = [it for it in pool if not (it in taken or it in ban)]

    for slot in range(6):
        if child[slot] in force:
            continue
        if random.random() >= rate:
            continue
        if not spare:
            continue
        pick = random.choice(spare)
        spare.remove(pick)  # a replacement may be used only once
        child[slot] = pick
    return repair_chromosome(child, lane, ban, force)

# =============================================================
# 9. MAIN GA
# =============================================================
def run_genetic_algorithm(hero_id: str,
                          lane: str,
                          force_items: List[str] | None = None,
                          ban_items:   List[str] | None = None):
    """Search for the best six-item build for *hero_id* played on *lane*.

    Args:
        hero_id: hero identifier used for the DB lookups.
        lane: lane to optimise for; overrides the hero's stored lane.
        force_items: item IDs the mutation step must not replace.
        ban_items: item IDs excluded from the item pool.

    Returns:
        ``(best_build, best_fitness)`` where the fitness is the sum of
        ``calculate_fitness`` over all phases in ``PHASE_WEIGHTS``.

    Raises:
        ValueError: if an item is both forced and banned, or fewer than six
            non-banned items are available.
    """
    force_set, ban_set = set(force_items or []), set(ban_items or [])
    if force_set & ban_set:
        raise ValueError("force_items conflict with ban_items")

    # get_hero_info() is memoised: work on a copy so the lane override below
    # does not leak into the cache and affect later runs for the same hero.
    hero_info = dict(get_hero_info(hero_id))
    hero_info["lane"] = lane
    caps = load_stat_caps(hero_id)

    pool = [it for it in item_data if it not in ban_set]
    if len(pool) < 6:
        raise ValueError("need at least 6 non-banned items to build a chromosome")
    pop  = [repair_chromosome(random.sample(pool, 6), lane, ban_set, force_set)
            for _ in range(POP_SIZE)]

    best_fit, best_ch, stagnant = -1.0, None, 0

    for gen in range(MAX_GEN):
        fits = [sum(calculate_fitness(ch, hero_id, hero_info, ph, caps)
                    for ph in PHASE_WEIGHTS)
                for ch in pop]

        gen_best = max(fits)
        if gen_best > best_fit + 1e-6:
            best_fit = gen_best
            best_ch  = pop[fits.index(gen_best)][:]   # copy, pop is replaced below
            stagnant = 0
        else:
            stagnant += 1
        # Early stop once the best score has not improved for STAGNANT_LIMIT generations.
        if stagnant >= STAGNANT_LIMIT:
            break

        offspring: List[List[str]] = []
        while len(offspring) < POP_SIZE:
            p1 = tournament_select(pop, fits)
            p2 = tournament_select(pop, fits)
            c1, c2 = crossover(p1, p2)
            offspring.extend([c1, c2])

        rate = adaptive_mut_rate(gen)
        offspring = [mutate(ch, pool, rate, lane, ban_set, force_set)
                     for ch in offspring[:POP_SIZE]]

        # Elitism: the best build so far overwrites one random offspring.
        offspring[random.randrange(POP_SIZE)] = best_ch[:]
        pop = offspring

    return best_ch, best_fit

# =============================================================
# 10. Demo (remove in production)
# =============================================================
if __name__ == "__main__":
    # Smoke run: optimise a build for hero "H001" on the "Mid" lane and print it.
    build, fit = run_genetic_algorithm("H001", "Mid")
    print("Best Build:", build)
    print("Fitness:", fit)


Best Build: ['I059', 'I069', 'I071', 'I070', 'I064', 'I078']
Fitness: 2.773533131489461


0.0.8

In [None]:
from __future__ import annotations
import os, random, math, time, copy
from functools import lru_cache
from typing import Dict, List, Tuple

import mysql.connector
from mysql.connector import pooling, Error
from dotenv import load_dotenv

# =============================================================
# 1. ENV / DB
# =============================================================
# Load environment variables from the .env file (path is relative to the
# current working directory).
load_dotenv('../config/.env')
# Connection settings read from the environment; the port falls back to 3306
# when DB_PRIMARY_PORT is not set.
DB_CONFIG = {
    "host":     os.getenv("DB_HOST"),
    "port":     int(os.getenv("DB_PRIMARY_PORT", 3306)),
    "user":     os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "charset":  "utf8mb4",
}
# Shared pool of three connections, created as soon as this code runs;
# get_conn() borrows connections from it.
POOL = pooling.MySQLConnectionPool(pool_name="rov_pool", pool_size=3, **DB_CONFIG)


def get_conn():
    """Borrow a connection from ``POOL``, retrying on connector errors.

    Makes up to three attempts, sleeping one second between attempts.

    Raises:
        RuntimeError: if every attempt fails; the last connector ``Error`` is
            attached as the exception's cause.
    """
    attempts = 3
    last_err = None
    for attempt in range(attempts):
        try:
            return POOL.get_connection()
        except Error as err:
            last_err = err
            if attempt < attempts - 1:
                time.sleep(1)  # no point waiting after the final failure
    raise RuntimeError("Cannot acquire DB connection") from last_err

# =============================================================
# 2. CONSTANTS
# =============================================================
# GA hyper-parameters.
POP_SIZE, MAX_GEN = 100, 50                  # chromosomes per generation / generation cap
CROSSOVER_RATE, BASE_MUT_RATE = 0.8, 0.05    # recombination probability / mutation rate at gen 0
STAGNANT_LIMIT = 6                           # generations without improvement before stopping

# Per-phase multiplier applied to the fitness, and per-phase gold budget.
PHASE_WEIGHTS = dict(Early=.2, Mid=.3, Late=.5)
BASE_BUDGET = dict(Early=2700, Mid=7500, Late=14000)

# Stat weights per hero class, stored as one row per class in _STAT_ORDER order.
_STAT_ORDER = ("patk", "ap", "def", "hp", "cdr", "crit", "ms")
CLASS_BASE: Dict[str, Dict[str, float]] = {
    hero_class: dict(zip(_STAT_ORDER, row))
    for hero_class, row in (
        ("Fighter",  (.35, .05, .15, .15, .15, .05, .05)),
        ("Tank",     (.10, .00, .35, .30, .10, .00, .15)),
        ("Mage",     (.05, .45, .10, .10, .20, .00, .10)),
        ("Assassin", (.45, .05, .10, .10, .20, .10, .00)),
        ("Carry",    (.45, .00, .10, .10, .10, .20, .05)),
        ("Support",  (.05, .25, .20, .25, .20, .00, .05)),
    )
}
# Safety net in case a future class entry omits the "ms" key.
for v in CLASS_BASE.values():
    if "ms" not in v:
        v["ms"] = .05

# ---- class alias (Fix #1) -----------------------------------
# Every known spelling of an item class, mapped to its canonical name.
CLASS_ALIAS = {
    "Magic": "Magic",
    "Mage": "Magic",
    "Attack": "Attack",
    "Physical": "Attack",
    "Defense": "Defense",
    "Movement": "Movement",
    "Boot": "Movement",
    "Jungle": "Jungle",
    "Support": "Support",
}
def _norm_class(n: str | None) -> str:
    """Return the canonical class for *n*; unknown names pass through, falsy ones become ""."""
    key = n if n else ""
    return CLASS_ALIAS[key] if key in CLASS_ALIAS else key
# -------------------------------------------------------------

# Each rule is (set of item IDs, bonus): calc_synergy() adds the bonus when
# every item of the set is present in the build.
SYNERGY_RULES: List[Tuple[set, float]] = [
    ({"I006", "I001"}, .20),
    ({"I020", "I015"}, .25),
]
# Items that may appear at most once in a build (checked by is_valid_chromo()).
UNIQUE_SINGLE = {"I049"}

# =============================================================
# 3. DATA LOADERS  (Fix #2 & #3)
# =============================================================
@lru_cache(maxsize=None)
def load_item_data() -> Dict[str, Dict]:
    """Load every row of the ``items`` table, keyed by ``ItemID``.

    ``Class`` is normalised through ``_norm_class``. Every other column except
    ``ItemID`` and ``ItemName`` is cast to ``float``; a non-NULL value that
    cannot be cast becomes ``0.0`` and NULLs stay ``None``.

    The result is memoised by ``lru_cache``, so every caller receives the same
    dict objects and should treat them as read-only.
    """
    text_columns = ("ItemID", "ItemName", "Class")
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT * FROM items")
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails

    out: Dict[str, Dict] = {}
    for row in rows:
        row["Class"] = _norm_class(row.get("Class"))
        # Iterate over a snapshot so the row is not mutated while being walked.
        for col, val in list(row.items()):
            if col in text_columns or val is None:
                continue
            try:
                row[col] = float(val)
            except (TypeError, ValueError):
                row[col] = 0.0
        out[row["ItemID"]] = row
    return out


@lru_cache(maxsize=None)
def load_item_comp() -> Dict[str, List[str]]:
    """Return ``{Composite_ItemID: [BaseItemID, ...]}`` from ``itemcomposition``.

    The result is memoised by ``lru_cache`` and therefore shared between
    callers; treat it as read-only.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT Composite_ItemID, BaseItemID FROM itemcomposition")
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails

    comp: Dict[str, List[str]] = {}
    for r in rows:
        comp.setdefault(r["Composite_ItemID"], []).append(r["BaseItemID"])
    return comp


@lru_cache(maxsize=128)
def load_hero_stats(hero_id: str, level: int) -> Dict:
    """Return the ``herostats`` row for *hero_id* at *level*, or ``{}`` if none.

    The result is memoised by ``lru_cache``; callers share the returned dict
    and should not mutate it.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT * FROM herostats WHERE HeroID=%s AND Level=%s",
                        (hero_id, level))
            # Drain the whole result set so no unread rows are left on the
            # pooled connection, then keep the first match.
            rows = cur.fetchall()
        finally:
            cur.close()
    return rows[0] if rows else {}


@lru_cache(maxsize=128)
def get_recommended_item_types(hero_id: str) -> List[str]:
    """Return the item classes recommended by the hero's skills (alias-normalised).

    Reads the distinct non-empty ``RecommendItemType`` values of ``heroskills``
    for *hero_id* and maps each through ``_norm_class``. The list is memoised
    by ``lru_cache`` and shared between callers.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute("SELECT DISTINCT RecommendItemType "
                        "FROM heroskills WHERE HeroID=%s", (hero_id,))
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails
    return [_norm_class(r[0]) for r in rows if r[0]]


@lru_cache(maxsize=128)
def get_hero_info(hero_id: str) -> Dict:
    """Return ``{"primary", "secondary", "lane"}`` for *hero_id* from ``heroes``.

    Falls back to a Fighter / no secondary / Mid record when the hero is not
    found; an empty ``Second_Class`` becomes ``None`` and an empty
    ``First_Lane`` becomes ``"Mid"``.

    NOTE: the dict is memoised by ``lru_cache``, so the same object is handed
    to every caller. Copy it before modifying it.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute("SELECT First_Class, Second_Class, First_Lane "
                        "FROM heroes WHERE HeroID=%s", (hero_id,))
            # Drain the result set so nothing unread stays on the pooled connection.
            rows = cur.fetchall()
        finally:
            cur.close()
    if not rows:
        return {"primary": "Fighter", "secondary": None, "lane": "Mid"}
    r = rows[0]
    return {"primary": r[0], "secondary": r[1] or None, "lane": r[2] or "Mid"}

# =============================================================
# 4. STAT CAP  (Fix #4)
# =============================================================
@lru_cache(maxsize=128)
def load_stat_caps(hero_id: str, buf: float = .20) -> Dict[str, float]:
    """Return the normalisation cap for each scored stat of *hero_id*.

    For PATK/AP/DEF/HP the cap is the larger of the hero's level-15 value and
    the largest single-item value, scaled by ``1 + buf``; MS uses a fixed 1.10
    factor; CDR and CRIT are the constants 0.40 and 1.00.

    NULL or missing columns count as 0 and all values are converted to
    ``float`` first, so ``None``/``Decimal`` values from the DB cannot break
    the comparison or the scaling. An empty item table yields item maxima of 0.
    """
    items = list(load_item_data().values())
    hero_max = load_hero_stats(hero_id, 15)

    def cap(column: str, factor: float) -> float:
        hero_val = float(hero_max.get(column) or 0)
        item_val = max((float(r.get(column) or 0) for r in items), default=0.0)
        return max(hero_val, item_val) * factor

    grow = 1 + buf
    return {
        "PATK": cap("Phys_ATK", grow),
        "AP":   cap("Magic_Power", grow),
        "DEF":  cap("Phys_Defense", grow),
        "HP":   cap("HP", grow),
        "MS":   cap("Movement_Speed", 1.10),
        "CDR":  .40,
        "CRIT": 1.00,
    }

# =============================================================
# 5. HELPERS
# =============================================================
def get_phase_weight(hero_info: Dict, phase: str) -> Dict[str, float]:
    """Return a fresh stat-weight dict for the hero's class, adjusted for context.

    Starts from ``CLASS_BASE[primary]`` (Fighter when the class is unknown) and
    then adds: +0.05 patk for a "Dark Slayer" secondary, +0.05 hp and cdr on the
    Support lane, +0.05 patk in the Early phase or +0.10 hp in the Late phase.
    """
    template = CLASS_BASE.get(hero_info["primary"], CLASS_BASE["Fighter"])
    weights = dict(template)  # flat dict of floats, so a shallow copy is enough

    # Collect the adjustments first, then apply them in the original order.
    bumps = []
    if hero_info["secondary"] == "Dark Slayer":
        bumps.append(("patk", .05))
    if hero_info["lane"] == "Support":
        bumps.extend([("hp", .05), ("cdr", .05)])
    if phase == "Early":
        bumps.append(("patk", .05))
    elif phase == "Late":
        bumps.append(("hp", .10))

    for stat, delta in bumps:
        weights[stat] += delta
    return weights


def calc_synergy(ch: List[str]) -> float:
    """Return the synergy bonus earned by the build *ch*.

    Adds the bonus of every ``SYNERGY_RULES`` entry whose item set is fully
    contained in the build, plus 0.20 when at least three items are of class
    "Defense".
    """
    owned = set(ch)
    total = 0.0
    for required, bonus in SYNERGY_RULES:
        if required <= owned:
            total += bonus

    defense_items = [it for it in ch if item_data[it]["Class"] == "Defense"]
    if len(defense_items) >= 3:
        total += .20
    return total

# =============================================================
# 6. VALID / REPAIR
# =============================================================
# Module-level lookup tables used by the validity, repair and fitness helpers.
# Both calls query the database when this code is executed.
item_data = load_item_data()   # {ItemID: item row}
item_comp = load_item_comp()   # {Composite_ItemID: [BaseItemID, ...]}


def is_valid_chromo(ch: List[str]) -> bool:
    """Return True when the build *ch* satisfies all three structural rules."""
    # Rule 1: at most one Movement-class item.
    movement_count = len([it for it in ch if item_data[it]["Class"] == "Movement"])
    if movement_count > 1:
        return False

    # Rule 2: every base item listed in item_comp for a build item must be in the build.
    if any(base not in ch for it in ch for base in item_comp.get(it, ())):
        return False

    # Rule 3: items in UNIQUE_SINGLE may appear at most once.
    return not any(ch.count(u) > 1 for u in UNIQUE_SINGLE)


def repair_chromosome(ch: List[str], lane: str, ban: set, force: set) -> List[str]:
    """Turn *ch* into a build of at most six distinct, non-banned items.

    Steps:
      1. Forced items are placed first so the six-slot cap can never drop them.
      2. The remaining genes of *ch* follow, minus banned items and duplicates.
      3. On the "Jungle" lane one Jungle-class item is locked into the build
         (an existing one if present, otherwise a random one from the pool).
      4. Free slots are padded with random unused items from the pool.
      5. While ``is_valid_chromo`` fails, up to ten random replacements are
         made, touching only slots that hold neither a forced item nor the
         locked jungle item.

    The result can be shorter than six only when the non-banned pool runs out,
    and may still be invalid if ten replacement attempts were not enough.

    Raises:
        ValueError: if an item of *force* is also in *ban*.
    """
    size = 6
    pool = [i for i in item_data if i not in ban]

    def is_jungle(it: str) -> bool:
        return item_data[it]["Class"] == "Jungle"

    # 1) Forced items take the first slots.
    locked: List[str] = []
    for it in force:
        if it in ban:
            raise ValueError("force item in ban list")
        if it not in locked:
            locked.append(it)

    # 2) Surviving genes keep their original order.
    seen = set(locked)
    genes: List[str] = []
    for it in ch:
        if it not in ban and it not in seen:
            genes.append(it)
            seen.add(it)

    # 3) Jungle lane: lock exactly one jungle item while a slot is still free.
    if (lane == "Jungle" and len(locked) < size
            and not any(is_jungle(it) for it in locked)):
        room = size - len(locked)
        carried = [it for it in genes[:room] if is_jungle(it)]
        if carried:
            pick = carried[0]
        else:
            jungle_pool = [i for i in pool if is_jungle(i)]
            pick = random.choice(jungle_pool) if jungle_pool else None
        if pick is not None:
            if pick in genes:
                genes.remove(pick)
            locked.append(pick)

    out = (locked + genes)[:size]

    # 4) Pad with unused pool items; stop instead of spinning when none are left.
    spare = [i for i in pool if i not in out]
    random.shuffle(spare)
    while len(out) < size and spare:
        out.append(spare.pop())

    # 5) Best-effort repair that never displaces a locked item.
    free_slots = [i for i, it in enumerate(out) if it not in locked]
    tries = 0
    while tries < 10 and free_slots and not is_valid_chromo(out):
        spare = [i for i in pool if i not in out]
        if not spare:
            break
        out[random.choice(free_slots)] = random.choice(spare)
        tries += 1
    return out

# =============================================================
# 7. FITNESS (Fix #5, #6, #7)
# =============================================================
def _n(x):
    """Safely cast *x* to ``float``.

    ``None``, ``""``, ``"NULL"`` and any value ``float()`` rejects (for example
    free text) all yield ``0.0``, the same fallback ``load_item_data`` uses for
    columns it cannot cast.
    """
    if x in (None, "", "NULL"):
        return 0.0
    try:
        return float(x)
    except (TypeError, ValueError):
        return 0.0


def score_stats(ch: List[str], hero_base: Dict, caps: Dict, w: Dict) -> float:
    """Return the weighted, cap-normalised stat score of build *ch*.

    Each stat total is the hero's base value (cdr and crit start at 0) plus the
    item values; it is divided by its cap, clipped at 1.0, multiplied by its
    weight from *w*, and the seven terms are summed. A falsy cap contributes 0.
    """
    # (weight key, DB column, cap key) in scoring order.
    columns = (
        ("patk", "Phys_ATK", "PATK"),
        ("ap", "Magic_Power", "AP"),
        ("def", "Phys_Defense", "DEF"),
        ("hp", "HP", "HP"),
        ("cdr", "Cooldown_Reduction", "CDR"),
        ("crit", "Critical_Rate", "CRIT"),
        ("ms", "Movement_Speed", "MS"),
    )
    item_only = ("cdr", "crit")  # stats the hero base row does not contribute to

    totals = {}
    for stat, column, _ in columns:
        totals[stat] = 0.0 if stat in item_only else _n(hero_base.get(column))

    for item_id in ch:
        row = item_data[item_id]
        for stat, column, _ in columns:
            totals[stat] += _n(row.get(column))

    score = None
    for stat, _, cap_key in columns:
        weight = w[stat]
        cap = caps[cap_key]
        ratio = min(totals[stat] / cap, 1.0) if cap else 0.0
        term = weight * ratio
        score = term if score is None else score + term
    return score


def score_budget(cost: int, budget: int, phase: str) -> float:
    """Return 1.0 when *cost* fits *budget*, otherwise a penalised score in [0, 1).

    The relative overshoot is raised to the power 1.2 in the "Late" phase and
    1.5 in any other phase, then subtracted from 1 and floored at 0.
    """
    if cost <= budget:
        return 1.0
    exponent = 1.2 if phase == "Late" else 1.5
    overshoot = (cost - budget) / budget
    remaining = 1 - overshoot ** exponent
    return remaining if remaining > 0.0 else 0.0


def calculate_fitness(ch: List[str], hero_id: str, hero_info: Dict,
                      phase: str, caps: Dict) -> float:
    """Return the fitness of build *ch* for one game phase.

    Sums the stat score, budget score, skill bonus (0.20 per item whose class
    the hero's skills recommend), synergy bonus and unique-item bonus (0.15 per
    item in ``UNIQUE_SINGLE``), then scales the sum by ``PHASE_WEIGHTS[phase]``.
    Hero base stats are taken at level 3 / 9 / 15 for Early / Mid / Late.
    """
    phase_level = {"Early": 3, "Mid": 9, "Late": 15}
    base_stats = load_hero_stats(hero_id, phase_level[phase])
    stat_part = score_stats(ch, base_stats, caps, get_phase_weight(hero_info, phase))

    total_price = int(sum(item_data[i]["Price"] for i in ch))
    base_budget = BASE_BUDGET[phase]
    # Junglers get a 10% larger budget in the Early phase.
    jungle_early = hero_info["lane"] == "Jungle" and phase == "Early"
    allowance = base_budget * (1.1 if jungle_early else 1.0)
    budget_part = score_budget(total_price, allowance, phase)

    recommended = get_recommended_item_types(hero_id)
    skill_part = sum(.20 for it in ch if item_data[it]["Class"] in recommended)
    synergy_part = calc_synergy(ch)
    unique_part = sum(.15 for it in ch if it in UNIQUE_SINGLE)

    raw = stat_part + budget_part + skill_part + synergy_part + unique_part
    return raw * PHASE_WEIGHTS[phase]

# =============================================================
# 8. GA OPERATORS
# =============================================================
def tournament_select(pop: List[List[str]], fit: List[float], k: int = 3) -> List[str]:
    """Draw *k* distinct individuals at random and return the fittest of them."""
    paired_count = min(len(pop), len(fit))
    contenders = random.sample(range(paired_count), k)
    winner = max(contenders, key=fit.__getitem__)
    return pop[winner]


def crossover(p1: List[str], p2: List[str]) -> Tuple[List[str], List[str]]:
    """Return two children of *p1* and *p2*.

    With probability ``CROSSOVER_RATE`` the parents swap tails at a random cut
    between positions 1 and 5; otherwise the children are plain copies.
    """
    if not random.random() < CROSSOVER_RATE:
        return list(p1), list(p2)
    cut = random.randint(1, 5)
    first_child = p1[:cut] + p2[cut:]
    second_child = p2[:cut] + p1[cut:]
    return first_child, second_child


def adaptive_mut_rate(gen: int) -> float:
    """Return the mutation rate for generation *gen*.

    The rate starts at ``BASE_MUT_RATE`` and decays exponentially with a time
    constant of ``0.6 * MAX_GEN`` generations.
    """
    time_constant = 0.6 * MAX_GEN
    decay = math.exp(-gen / time_constant)
    return BASE_MUT_RATE * decay


def mutate(ch: List[str], pool: List[str], rate: float,
           lane: str, ban: set, force: set) -> List[str]:
    """Return a mutated, repaired copy of *ch*.

    Each of the six genes that is not a forced item is replaced with
    probability *rate* by a random pool item that is neither banned nor already
    in the build; the result is passed through ``repair_chromosome``.
    """
    child = list(ch)
    taken = set(child)
    spare = [it for it in pool if not (it in taken or it in ban)]

    for slot in range(6):
        if child[slot] in force:
            continue
        if random.random() >= rate:
            continue
        if not spare:
            continue
        pick = random.choice(spare)
        spare.remove(pick)  # a replacement may be used only once
        child[slot] = pick
    return repair_chromosome(child, lane, ban, force)

# =============================================================
# 9. MAIN GA
# =============================================================
def run_genetic_algorithm(hero_id: str, lane: str,
                          force_items: List[str] | None = None,
                          ban_items:   List[str] | None = None):
    """Search for the best six-item build for *hero_id* played on *lane*.

    Args:
        hero_id: hero identifier used for the DB lookups.
        lane: lane to optimise for; overrides the hero's stored lane.
        force_items: item IDs the mutation step must not replace.
        ban_items: item IDs excluded from the item pool.

    Returns:
        ``(best_build, best_fitness)`` where the fitness is the sum of
        ``calculate_fitness`` over all phases in ``PHASE_WEIGHTS``.

    Raises:
        ValueError: if an item is both forced and banned, or fewer than six
            non-banned items are available.
    """
    force, ban = set(force_items or []), set(ban_items or [])
    if force & ban:
        raise ValueError("force_items conflict with ban_items")

    # get_hero_info() is memoised: work on a copy so the lane override below
    # does not leak into the cache and affect later runs for the same hero.
    hero_info = dict(get_hero_info(hero_id))
    hero_info["lane"] = lane
    caps = load_stat_caps(hero_id)

    pool = [i for i in item_data if i not in ban]
    if len(pool) < 6:
        raise ValueError("need at least 6 non-banned items to build a chromosome")
    pop = [repair_chromosome(random.sample(pool, 6), lane, ban, force)
           for _ in range(POP_SIZE)]

    best_fit, best_ch, stagn = -1.0, None, 0
    for gen in range(MAX_GEN):
        fitness = [sum(calculate_fitness(ch, hero_id, hero_info, ph, caps)
                       for ph in PHASE_WEIGHTS) for ch in pop]
        gen_best = max(fitness)
        if gen_best > best_fit + 1e-6:
            # Copy the winner: pop is replaced at the end of the generation.
            best_fit, best_ch, stagn = gen_best, pop[fitness.index(gen_best)][:], 0
        else:
            stagn += 1
        # Early stop once the best score has not improved for STAGNANT_LIMIT generations.
        if stagn >= STAGNANT_LIMIT:
            break

        offs: List[List[str]] = []
        while len(offs) < POP_SIZE:
            p1, p2 = tournament_select(pop, fitness), tournament_select(pop, fitness)
            offs.extend(crossover(p1, p2))
        rate = adaptive_mut_rate(gen)
        offs = [mutate(ch, pool, rate, lane, ban, force) for ch in offs[:POP_SIZE]]
        # Elitism: the best build so far overwrites one random offspring.
        offs[random.randrange(POP_SIZE)] = best_ch[:]
        pop = offs
    return best_ch, best_fit

# =============================================================
# demo
# =============================================================
if __name__ == "__main__":
    # Smoke run: optimise a build for hero "H001" on the "Mid" lane and print it.
    build, fit = run_genetic_algorithm("H001", "Mid")
    print("Best Build:", build)
    print("Fitness:", fit)


Best Build: ['I059', 'I079', 'I066', 'I064', 'I057', 'I082']
Fitness: 2.7705819191972436


0.1.0

In [4]:
from __future__ import annotations
import os, random, math, time, copy
from functools import lru_cache
from typing import Dict, List, Tuple

import mysql.connector
from mysql.connector import pooling, Error
from dotenv import load_dotenv

# =============================================================
# 1. ENV / DB
# =============================================================
# Load environment variables from the .env file (path is relative to the
# current working directory).
load_dotenv('../config/.env')
# Connection settings read from the environment; the port falls back to 3306
# when DB_PRIMARY_PORT is not set.
DB_CONFIG = {
    "host":     os.getenv("DB_HOST"),
    "port":     int(os.getenv("DB_PRIMARY_PORT", 3306)),
    "user":     os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "charset":  "utf8mb4",
}
# Shared pool of three connections, created as soon as this code runs;
# get_conn() borrows connections from it.
POOL = pooling.MySQLConnectionPool(pool_name="rov_pool", pool_size=3, **DB_CONFIG)

def get_conn():
    """Borrow a connection from ``POOL``, retrying on connector errors.

    Makes up to three attempts, sleeping one second between attempts.

    Raises:
        RuntimeError: if every attempt fails; the last connector ``Error`` is
            attached as the exception's cause.
    """
    attempts = 3
    last_err = None
    for attempt in range(attempts):
        try:
            return POOL.get_connection()
        except Error as err:
            last_err = err
            if attempt < attempts - 1:
                time.sleep(1)  # no point waiting after the final failure
    raise RuntimeError("Cannot acquire DB connection") from last_err

# =============================================================
# 2. CONSTANTS
# =============================================================
# GA hyper-parameters.
POP_SIZE, MAX_GEN = 100, 50                  # chromosomes per generation / generation cap
CROSSOVER_RATE, BASE_MUT_RATE = 0.8, 0.05    # recombination probability / mutation rate at gen 0
STAGNANT_LIMIT = 6                           # generations without improvement before stopping

# Per-phase multiplier applied to the fitness, and per-phase gold budget.
PHASE_WEIGHTS = dict(Early=.2, Mid=.3, Late=.5)
BASE_BUDGET = dict(Early=2700, Mid=7500, Late=14000)

# Stat weights per hero class, stored as one row per class in _STAT_ORDER order.
_STAT_ORDER = ("patk", "ap", "def", "hp", "cdr", "crit", "ms")
CLASS_BASE: Dict[str, Dict[str, float]] = {
    hero_class: dict(zip(_STAT_ORDER, row))
    for hero_class, row in (
        ("Fighter",  (.35, .05, .15, .15, .15, .05, .05)),
        ("Tank",     (.10, .00, .35, .30, .10, .00, .15)),
        ("Mage",     (.05, .45, .10, .10, .20, .00, .10)),
        ("Assassin", (.45, .05, .10, .10, .20, .10, .00)),
        ("Carry",    (.45, .00, .10, .10, .10, .20, .05)),
        ("Support",  (.05, .25, .20, .25, .20, .00, .05)),
    )
}
# Make sure every class carries an "ms" weight.
for v in CLASS_BASE.values():
    if "ms" not in v:
        v["ms"] = .05

# ---- extend CLASS_BASE for new stats from herostats ----------------
# Short weight key -> herostats column it stands for (per the inline notes).
NEW_STATS = [
    "as",       # Attack_Speed
    "r5",       # HP_5_sec
    "mpr",      # Mana_5_sec
    "apierce",  # Armor_Pierce
    "mpierce",  # Magic_Pierce
    "ls",       # Life_Steal
    "mls",      # Magic_Life_Steal
    "mana",     # Max_Mana
    "mdef",     # Magic_Defense
    "res",      # Resistance
]
# Every class gets a 0.0 weight for each new stat unless one is already set.
for cls_w in CLASS_BASE.values():
    for st in NEW_STATS:
        if st not in cls_w:
            cls_w[st] = 0.0
# -------------------------------------------------------------------

# class alias normalization: every known spelling -> canonical class name
CLASS_ALIAS = {
    "Magic": "Magic",
    "Mage": "Magic",
    "Attack": "Attack",
    "Physical": "Attack",
    "Defense": "Defense",
    "Movement": "Movement",
    "Boot": "Movement",
    "Jungle": "Jungle",
    "Support": "Support",
}
def _norm_class(n: str | None) -> str:
    """Return the canonical class for *n*; unknown names pass through, falsy ones become ""."""
    key = n if n else ""
    return CLASS_ALIAS[key] if key in CLASS_ALIAS else key

# Each rule is (set of item IDs, bonus) for a set of items that belong together.
SYNERGY_RULES: List[Tuple[set, float]] = [
    ({"I006", "I001"}, .20),
    ({"I020", "I015"}, .25),
]
# Item IDs flagged as single-copy items.
UNIQUE_SINGLE = {"I049"}

# =============================================================
# 3. DATA LOADERS
# =============================================================
@lru_cache(maxsize=None)
def load_item_data() -> Dict[str, Dict]:
    """Load every row of the ``items`` table, keyed by ``ItemID``.

    ``Class`` is normalised through ``_norm_class``. Every other column except
    ``ItemID`` and ``ItemName`` is cast to ``float``; a non-NULL value that
    cannot be cast becomes ``0.0`` and NULLs stay ``None``.

    The result is memoised by ``lru_cache``, so every caller receives the same
    dict objects and should treat them as read-only.
    """
    text_columns = ("ItemID", "ItemName", "Class")
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT * FROM items")
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails

    out: Dict[str, Dict] = {}
    for row in rows:
        row["Class"] = _norm_class(row.get("Class"))
        # Iterate over a snapshot so the row is not mutated while being walked.
        for col, val in list(row.items()):
            if col in text_columns or val is None:
                continue
            try:
                row[col] = float(val)
            except (TypeError, ValueError):
                row[col] = 0.0
        out[row["ItemID"]] = row
    return out

@lru_cache(maxsize=None)
def load_item_comp() -> Dict[str, List[str]]:
    """Return ``{Composite_ItemID: [BaseItemID, ...]}`` from ``itemcomposition``.

    The result is memoised by ``lru_cache`` and therefore shared between
    callers; treat it as read-only.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT Composite_ItemID, BaseItemID FROM itemcomposition")
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails

    comp: Dict[str, List[str]] = {}
    for r in rows:
        comp.setdefault(r["Composite_ItemID"], []).append(r["BaseItemID"])
    return comp

@lru_cache(maxsize=128)
def load_hero_stats(hero_id: str, level: int) -> Dict:
    """Return the ``herostats`` row for *hero_id* at *level*, or ``{}`` if none.

    The result is memoised by ``lru_cache``; callers share the returned dict
    and should not mutate it.
    """
    with get_conn() as conn:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("SELECT * FROM herostats WHERE HeroID=%s AND Level=%s",
                        (hero_id, level))
            # Drain the whole result set so no unread rows are left on the
            # pooled connection, then keep the first match.
            rows = cur.fetchall()
        finally:
            cur.close()
    return rows[0] if rows else {}

@lru_cache(maxsize=128)
def get_recommended_item_types(hero_id: str) -> List[str]:
    """Return the item classes recommended by the hero's skills (alias-normalised).

    Reads the distinct non-empty ``RecommendItemType`` values of ``heroskills``
    for *hero_id* and maps each through ``_norm_class``. The list is memoised
    by ``lru_cache`` and shared between callers.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute("SELECT DISTINCT RecommendItemType FROM heroskills WHERE HeroID=%s",
                        (hero_id,))
            rows = cur.fetchall()
        finally:
            cur.close()  # release the cursor even when the query fails
    return [_norm_class(r[0]) for r in rows if r[0]]

@lru_cache(maxsize=128)
def get_hero_info(hero_id: str) -> Dict:
    """Return ``{"primary", "secondary", "lane"}`` for *hero_id* from ``heroes``.

    Falls back to a Fighter / no secondary / Mid record when the hero is not
    found; an empty ``Second_Class`` becomes ``None`` and an empty
    ``First_Lane`` becomes ``"Mid"``.

    NOTE: the dict is memoised by ``lru_cache``, so the same object is handed
    to every caller. Copy it before modifying it.
    """
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute("SELECT First_Class, Second_Class, First_Lane FROM heroes WHERE HeroID=%s",
                        (hero_id,))
            # Drain the result set so nothing unread stays on the pooled connection.
            rows = cur.fetchall()
        finally:
            cur.close()
    if not rows:
        return {"primary": "Fighter", "secondary": None, "lane": "Mid"}
    r = rows[0]
    return {"primary": r[0], "secondary": r[1] or None, "lane": r[2] or "Mid"}

# =============================================================
# 4. STAT CAPS
# =============================================================
@lru_cache(maxsize=128)
def load_stat_caps(hero_id: str, buf: float = .20) -> Dict[str, float]:
    """Return the normalisation cap for each stat of *hero_id*.

    Except for MS, CDR and CRIT, a cap is the larger of the hero's level-15
    value and the largest single-item value for that column, scaled by
    ``1 + buf``. MS uses a fixed 1.10 factor; CDR and CRIT are the constants
    0.40 and 1.00.

    NULL or missing columns count as 0 and all values are converted to
    ``float`` first, so ``None``/``Decimal`` values from the DB cannot break
    the comparison or the scaling. An empty item table yields item maxima of 0.
    """
    items    = list(load_item_data().values())
    hero_max = load_hero_stats(hero_id, 15)

    def cap(column: str, factor: float) -> float:
        hero_val = float(hero_max.get(column) or 0)
        item_val = max((float(r.get(column) or 0) for r in items), default=0.0)
        return max(hero_val, item_val) * factor

    grow = 1 + buf
    return {
        "PATK":     cap("Phys_ATK", grow),
        "AP":       cap("Magic_Power", grow),
        "DEF":      cap("Phys_Defense", grow),
        "HP":       cap("HP", grow),
        "MS":       cap("Movement_Speed", 1.10),
        "CDR":      0.40,
        "CRIT":     1.00,
        # extended caps
        "AS":       cap("Attack_Speed", grow),
        "R5":       cap("HP_5_sec", grow),
        "MPR":      cap("Mana_5_sec", grow),
        "APIERCE":  cap("Armor_Pierce", grow),
        "MPIERCE":  cap("Magic_Pierce", grow),
        "LS":       cap("Life_Steal", grow),
        "MLS":      cap("Magic_Life_Steal", grow),
        "MANA":     cap("Max_Mana", grow),
        "MDEF":     cap("Magic_Defense", grow),
        "RES":      cap("Resistance", grow),
    }

# =============================================================
# 5. HELPERS
# =============================================================
def get_phase_weight(hero_info: Dict, phase: str) -> Dict[str, float]:
    """Return the stat weights for a hero in a given game phase.

    Starts from a deep copy of the hero's primary-class entry in
    ``CLASS_BASE`` (falling back to the ``"Fighter"`` entry) and then applies
    small additive adjustments:

    * secondary class ``"Dark Slayer"``: ``patk`` +0.05
    * lane ``"Support"``: ``hp`` +0.05 and ``cdr`` +0.05
    * phase ``"Early"``: ``patk`` +0.05; phase ``"Late"``: ``hp`` +0.10

    Args:
        hero_info: Dict with the keys ``"primary"``, ``"secondary"`` and
            ``"lane"``.
        phase: Phase name; only ``"Early"`` and ``"Late"`` change the weights.

    Returns:
        A new weight dict; ``CLASS_BASE`` itself is not modified.
    """
    weights = copy.deepcopy(CLASS_BASE.get(hero_info["primary"], CLASS_BASE["Fighter"]))

    # Collect the adjustments first, then apply them in order.
    bumps = []
    if hero_info["secondary"] == "Dark Slayer":
        bumps.append(("patk", .05))
    if hero_info["lane"] == "Support":
        bumps.append(("hp", .05))
        bumps.append(("cdr", .05))
    if phase == "Early":
        bumps.append(("patk", .05))
    elif phase == "Late":
        bumps.append(("hp", .10))

    for stat, delta in bumps:
        weights[stat] += delta
    return weights

def calc_synergy(ch: List[str]) -> float:
    """Score the item-combination bonuses of a build.

    Each ``(required_items, bonus)`` rule in ``SYNERGY_RULES`` contributes its
    bonus when every required item is present in the build. A further 0.20 is
    added when at least three items have the class ``"Defense"``.

    Args:
        ch: Item IDs making up the build.

    Returns:
        The summed synergy bonus (0.0 when nothing applies).
    """
    total = 0.0
    owned = set(ch)
    for required, bonus in SYNERGY_RULES:
        if not required.issubset(owned):
            continue
        total += bonus

    # Duplicated entries in ch are counted once per occurrence.
    defense_count = 0
    for item_id in ch:
        if item_data[item_id]["Class"] == "Defense":
            defense_count += 1
    if defense_count >= 3:
        total += .20
    return total

# =============================================================
# 6. VALID / REPAIR
# =============================================================
# Module-level lookup tables, loaded from the database when this section runs.
# The validation, repair, fitness and GA functions in this file read them as
# globals.
item_data = load_item_data()  # indexed by item ID; each value is that item's row
item_comp = load_item_comp()  # composite item ID -> list of base item IDs

def is_valid_chromo(ch: List[str]) -> bool:
    """Check a build against the hard constraints.

    A build is rejected when any of the following holds:

    * it contains more than one item of class ``"Movement"``;
    * it contains an item listed in ``item_comp`` without also containing
      every base item of that recipe;
    * an item listed in ``UNIQUE_SINGLE`` appears more than once.

    Args:
        ch: Item IDs making up the build.

    Returns:
        ``True`` when none of the constraints is violated.
    """
    movement_items = [it for it in ch if item_data[it]["Class"] == "Movement"]
    if len(movement_items) > 1:
        return False

    for it in ch:
        if it not in item_comp:
            continue
        if any(base not in ch for base in item_comp[it]):
            return False

    return not any(ch.count(unique_id) > 1 for unique_id in UNIQUE_SINGLE)

def repair_chromosome(ch: List[str], lane: str, ban: set, force: set) -> List[str]:
    """Turn an arbitrary gene list into a usable 6-item build.

    Steps, in order:

    1. Place every forced item, then the genes of ``ch`` that are neither
       banned nor duplicates, and cut the result to 6 slots. Forced items go
       first so that the cut can never drop them (previously they were
       appended last and silently truncated away).
    2. For lane ``"Jungle"``, make sure one item of class ``"Jungle"`` is
       present, replacing a non-forced slot when the build is already full.
    3. Fill remaining slots with random unbanned items not yet in the build.
    4. Make up to 10 random replacements of unprotected slots while
       ``is_valid_chromo`` rejects the build. This is best-effort: the build
       is returned even if it is still invalid afterwards.

    Args:
        ch: Candidate item IDs (may contain duplicates or banned items).
        lane: Lane being played; only ``"Jungle"`` triggers extra handling.
        ban: Item IDs that must not appear in the build.
        force: Item IDs that must appear in the build.

    Returns:
        A list of exactly 6 distinct item IDs.

    Raises:
        ValueError: If a forced item is also banned, or if there are not
            enough unbanned items to fill 6 slots.
    """
    size = 6
    if any(it in ban for it in force):
        raise ValueError("force item in ban list")

    pool = [i for i in item_data if i not in ban]

    # 1. forced items first, then surviving genes, no duplicates.
    out: List[str] = []
    seen = set()
    for it in list(force) + list(ch):
        if it in ban or it in seen:
            continue
        out.append(it)
        seen.add(it)
    # More than 6 forced items cannot all fit; keep the first 6 as before.
    out = out[:size]

    # Slots holding these items are never overwritten by later steps.
    protected = set(force)

    # 2. jungle requirement.
    if lane == "Jungle" and not any(item_data[it]["Class"] == "Jungle" for it in out):
        jungle_pool = [i for i in pool if item_data[i]["Class"] == "Jungle"]
        if jungle_pool:
            pick = random.choice(jungle_pool)
            if len(out) < size:
                out.append(pick)
                protected.add(pick)
            else:
                free_slots = [idx for idx, it in enumerate(out) if it not in protected]
                if free_slots:
                    out[free_slots[-1]] = pick
                    protected.add(pick)

    # 3. fill up to 6 distinct items.
    missing = size - len(out)
    if missing > 0:
        spare = [i for i in pool if i not in out]
        if len(spare) < missing:
            # The old rejection loop would spin forever in this situation.
            raise ValueError("not enough unbanned items to fill a build of 6")
        out.extend(random.sample(spare, missing))

    # 4. bounded random repair of constraint violations.
    for _ in range(10):
        if is_valid_chromo(out):
            break
        free_slots = [idx for idx, it in enumerate(out) if it not in protected]
        spare = [i for i in pool if i not in out]
        if not free_slots or not spare:
            break  # nothing left that may be swapped
        out[random.choice(free_slots)] = random.choice(spare)
    return out

# =============================================================
# 7. FITNESS
# =============================================================
def _n(x):
    return float(x) if x not in (None, "", "NULL") else 0.0

def score_stats(ch: List[str], hero_base: Dict, caps: Dict, w: Dict) -> float:
    """Compute the weighted, cap-normalised stat score of a build.

    The hero's base stats and the stats of every item in ``ch`` are summed
    per stat. Each total is divided by its cap and clipped at 1.0 (a cap of
    0 yields 0.0), multiplied by the stat's weight, and the products are
    added up.

    Args:
        ch: Item IDs making up the build.
        hero_base: Hero stat row keyed by column name.
        caps: Cap per stat, keyed by upper-case cap name (``"PATK"`` ...).
        w: Weight per stat, keyed by lower-case stat name (``"patk"`` ...).

    Returns:
        The summed weighted score.

    Raises:
        KeyError: If ``w`` contains a stat name not known to this function.
    """
    # (weight key, DB column, cap key) for every stat that can be scored.
    stat_table = (
        ("patk",    "Phys_ATK",           "PATK"),
        ("ap",      "Magic_Power",        "AP"),
        ("def",     "Phys_Defense",       "DEF"),
        ("hp",      "HP",                 "HP"),
        ("cdr",     "Cooldown_Reduction", "CDR"),
        ("crit",    "Critical_Rate",      "CRIT"),
        ("ms",      "Movement_Speed",     "MS"),
        ("as",      "Attack_Speed",       "AS"),
        ("r5",      "HP_5_sec",           "R5"),
        ("mpr",     "Mana_5_sec",         "MPR"),
        ("apierce", "Armor_Pierce",       "APIERCE"),
        ("mpierce", "Magic_Pierce",       "MPIERCE"),
        ("ls",      "Life_Steal",         "LS"),
        ("mls",     "Magic_Life_Steal",   "MLS"),
        ("mana",    "Max_Mana",           "MANA"),
        ("mdef",    "Magic_Defense",      "MDEF"),
        ("res",     "Resistance",         "RES"),
    )

    # Start from the hero's own stats, then add each item's contribution.
    total = {stat: _n(hero_base.get(column)) for stat, column, _ in stat_table}
    for item_id in ch:
        row = item_data[item_id]
        for stat, column, _ in stat_table:
            total[stat] += _n(row.get(column))

    cap_name = {stat: cap_key for stat, _, cap_key in stat_table}

    score = 0.0
    for stat, weight in w.items():
        cap = caps[cap_name[stat]]
        if cap:
            ratio = min(total[stat] / cap, 1.0)
        else:
            ratio = 0.0
        score += weight * ratio
    return score

def score_budget(cost: int, budget: int, phase: str) -> float:
    """Score how well a build's cost fits the phase budget.

    Args:
        cost: Total price of the build.
        budget: Gold available in this phase.
        phase: Phase name; ``"Late"`` uses a softer overspend exponent (1.2)
            than the other phases (1.5).

    Returns:
        1.0 when the build is within budget; otherwise one minus the
        relative overspend raised to the phase exponent, floored at 0.0.
    """
    if cost <= budget:
        return 1.0
    exponent = 1.2 if phase == "Late" else 1.5
    overshoot = (cost - budget) / budget
    remaining = 1 - overshoot ** exponent
    return max(0.0, remaining)

def calculate_fitness(ch: List[str], hero_id: str, hero_info: Dict,
                      phase: str, caps: Dict) -> float:
    """Compute the fitness of a build for one game phase.

    The fitness is the sum of five components, multiplied by
    ``PHASE_WEIGHTS[phase]``:

    * the stat score of the build on top of the hero's stats at the phase
      level (Early 3, Mid 9, Late 15);
    * the budget score against ``BASE_BUDGET[phase]`` (raised by 10% for a
      Jungle hero in the Early phase);
    * 0.20 per item whose class is among the hero's recommended item types;
    * the synergy bonus of the build;
    * 0.15 per item listed in ``UNIQUE_SINGLE``.

    Args:
        ch: Item IDs making up the build.
        hero_id: Hero the build is evaluated for.
        hero_info: Dict with at least ``"primary"``, ``"secondary"`` and
            ``"lane"``.
        phase: ``"Early"``, ``"Mid"`` or ``"Late"``.
        caps: Stat caps used for normalisation.

    Returns:
        The phase-weighted fitness value.
    """
    phase_level = {"Early": 3, "Mid": 9, "Late": 15}
    hero_base = load_hero_stats(hero_id, phase_level[phase])
    weights = get_phase_weight(hero_info, phase)
    stat_score = score_stats(ch, hero_base, caps, weights)

    cost = int(sum(item_data[i]["Price"] for i in ch))
    jungle_early = hero_info["lane"] == "Jungle" and phase == "Early"
    budget = BASE_BUDGET[phase] * (1.1 if jungle_early else 1.0)
    budget_sc = score_budget(cost, budget, phase)

    rec_types = get_recommended_item_types(hero_id)
    recommended = [it for it in ch if item_data[it]["Class"] in rec_types]
    skill_bonus = sum(.20 for _ in recommended)
    synergy = calc_synergy(ch)
    unique_items = [it for it in ch if it in UNIQUE_SINGLE]
    uniq_bonus = sum(.15 for _ in unique_items)

    raw_total = stat_score + budget_sc + skill_bonus + synergy + uniq_bonus
    return raw_total * PHASE_WEIGHTS[phase]

# =============================================================
# 8. GA OPERATORS
# =============================================================
def tournament_select(pop: List[List[str]], fit: List[float], k: int = 3) -> List[str]:
    """Pick one parent by tournament selection.

    Draws ``k`` distinct individuals at random and returns the one with the
    highest fitness (the earliest drawn wins a tie).

    Args:
        pop: Population of chromosomes.
        fit: Fitness of each chromosome, aligned with ``pop``.
        k: Tournament size; must not exceed the population size.

    Returns:
        The winning chromosome (the same list object held in ``pop``).
    """
    entrants = range(min(len(pop), len(fit)))
    drawn = random.sample(entrants, k)
    winner = max(drawn, key=lambda idx: fit[idx])
    return pop[winner]

def crossover(p1: List[str], p2: List[str]) -> Tuple[List[str], List[str]]:
    """Produce two children by single-point crossover.

    With probability ``CROSSOVER_RATE`` the parents are cut at a random
    point between positions 1 and 5 and their tails are swapped; otherwise
    the children are plain copies of the parents.

    Args:
        p1: First parent chromosome.
        p2: Second parent chromosome.

    Returns:
        A pair of new child lists; the parents are not modified.
    """
    if not random.random() < CROSSOVER_RATE:
        return p1[:], p2[:]
    cut = random.randint(1, 5)
    child_a = p1[:cut] + p2[cut:]
    child_b = p2[:cut] + p1[cut:]
    return child_a, child_b

def adaptive_mut_rate(gen: int) -> float:
    """Return the mutation rate for generation ``gen``.

    The rate starts at ``BASE_MUT_RATE`` and decays exponentially with the
    generation number, using ``0.6 * MAX_GEN`` as the decay constant.
    """
    decay_span = 0.6 * MAX_GEN
    return BASE_MUT_RATE * math.exp(-gen / decay_span)

def mutate(ch: List[str], pool: List[str], rate: float,
           lane: str, ban: set, force: set) -> List[str]:
    """Randomly replace genes of a chromosome, then repair the result.

    Each of the 6 slots that does not hold a forced item is replaced with
    probability ``rate`` by a random item from ``pool`` that is neither
    banned nor already in the chromosome. An item used as a replacement is
    not offered again during the same call.

    Args:
        ch: Chromosome to mutate (left unmodified).
        pool: Item IDs replacements are drawn from.
        rate: Per-slot mutation probability.
        lane: Lane passed through to ``repair_chromosome``.
        ban: Banned item IDs.
        force: Forced item IDs; slots holding them are never mutated.

    Returns:
        The mutated chromosome after ``repair_chromosome``.
    """
    child = list(ch)
    candidates = [item for item in pool if item not in child and item not in ban]
    for slot in range(6):
        if child[slot] in force:
            continue
        # Draw only for non-forced slots, as the random stream depends on it.
        if not random.random() < rate:
            continue
        if not candidates:
            continue
        pick = random.choice(candidates)
        child[slot] = pick
        candidates.remove(pick)
    return repair_chromosome(child, lane, ban, force)

# =============================================================
# 9. MAIN GA
# =============================================================
def run_genetic_algorithm(hero_id: str, lane: str,
                          force_items: List[str] | None = None,
                          ban_items:   List[str] | None = None):
    """Search for the best 6-item build for a hero with a genetic algorithm.

    Each generation scores every chromosome as the sum of
    ``calculate_fitness`` over all phases in ``PHASE_WEIGHTS``, then builds
    the next population by tournament selection, crossover and mutation. One
    random offspring is overwritten with the best build seen so far
    (elitism). The search stops after ``MAX_GEN`` generations or once the
    best fitness has not improved for ``STAGNANT_LIMIT`` generations.

    Args:
        hero_id: Hero to optimise for.
        lane: Lane the hero is played in; overrides the hero's stored lane.
        force_items: Item IDs that should be part of every build.
        ban_items: Item IDs that must not be used.

    Returns:
        A tuple ``(best_build, best_fitness)``.

    Raises:
        ValueError: If an item is both forced and banned.
    """
    force, ban = set(force_items or []), set(ban_items or [])
    if force & ban:
        raise ValueError("force_items conflict with ban_items")

    # get_hero_info is memoised and hands out its cached dict. Work on a copy
    # so the lane override does not leak into the cache and change the
    # hero's lane for every later caller.
    hero_info = dict(get_hero_info(hero_id))
    hero_info["lane"] = lane
    caps      = load_stat_caps(hero_id)
    pool      = [i for i in item_data if i not in ban]

    pop = [
        repair_chromosome(random.sample(pool, 6), lane, ban, force)
        for _ in range(POP_SIZE)
    ]

    best_fit, best_ch, stagn = -1.0, None, 0
    for gen in range(MAX_GEN):
        fitness = [
            sum(calculate_fitness(ch, hero_id, hero_info, ph, caps)
                for ph in PHASE_WEIGHTS)
            for ch in pop
        ]
        gen_best = max(fitness)
        # The small epsilon keeps float noise from counting as progress.
        if gen_best > best_fit + 1e-6:
            best_fit = gen_best
            best_ch  = pop[fitness.index(gen_best)][:]
            stagn    = 0
        else:
            stagn += 1
        if stagn >= STAGNANT_LIMIT:
            break

        offs: List[List[str]] = []
        while len(offs) < POP_SIZE:
            p1 = tournament_select(pop, fitness)
            p2 = tournament_select(pop, fitness)
            c1, c2 = crossover(p1, p2)
            offs.extend([c1, c2])
        rate = adaptive_mut_rate(gen)
        # offs may hold one chromosome too many when POP_SIZE is odd.
        offs = [mutate(ch, pool, rate, lane, ban, force) for ch in offs[:POP_SIZE]]
        # Elitism: re-insert the best build so it cannot be lost.
        if best_ch is not None:
            offs[random.randrange(POP_SIZE)] = best_ch[:]
        pop = offs

    return best_ch, best_fit

# =============================================================
# demo
# =============================================================
if __name__ == "__main__":
    # Demo run: search a build for hero "H001" in the "Mid" lane with no
    # forced or banned items, then print the best build and its fitness.
    build, fit = run_genetic_algorithm("H001", "Mid")
    print("Best Build:", build)
    print("Fitness:", fit)


Best Build: ['I023', 'I061', 'I055', 'I064', 'I071', 'I060']
Fitness: 2.7312142644535866
