# V1 Scoring Algorithm Prototype

## P2 Phase: Data Integration & Algorithm Development

This notebook implements the five-dimensional scoring framework for League of Legends match analysis:
1. Combat Efficiency (30%)
2. Economic Management (25%)
3. Objective Control (25%)
4. Vision & Map Control (10%)
5. Team Contribution (10%)

## 1. Environment Setup

In [None]:
import sys
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
from typing import Any, Dict, List
from datetime import datetime
from dotenv import load_dotenv
from pydantic import BaseModel, Field

# Enable async support in Jupyter
import nest_asyncio
nest_asyncio.apply()

# Load environment variables
load_dotenv()

# Add src to path
sys.path.insert(0, os.path.abspath('..'))

# Import our contracts and adapters
from src.adapters.riot_api import RiotAPIAdapter
from src.contracts.timeline import MatchTimeline, ParticipantFrame

print("✅ Environment setup complete")

## 2. Data Fetching

Using CLI 2's RiotAPIAdapter to fetch real match timeline data.

In [None]:
# Module-level Riot API client used by fetch_match_timeline.
# NOTE(review): constructed without arguments — presumably picks up its
# credentials from the environment populated by load_dotenv(); confirm in the adapter.
riot_api = RiotAPIAdapter()

async def fetch_match_timeline(match_id: str, region: str = "na1") -> MatchTimeline | None:
    """
    Retrieve one match timeline through the shared Riot API adapter.

    Args:
        match_id: Match ID (e.g., "NA1_4497655573")
        region: Riot region code passed through to the adapter

    Returns:
        A MatchTimeline built from the adapter's payload, or None when the
        adapter returns a falsy payload or any exception is raised while
        fetching/parsing (the error is printed, not re-raised).
    """
    try:
        payload = await riot_api.get_match_timeline(match_id, region)
        # A falsy payload means "nothing to parse"; only build the model otherwise.
        parsed = MatchTimeline(**payload) if payload else None
    except Exception as exc:
        print(f"❌ Error fetching match {match_id}: {exc}")
        parsed = None
    return parsed

# Match IDs to analyze. The single entry is a placeholder sample;
# replace it with real match IDs before running.
sample_match_ids = [
    "NA1_4497655573",  # Replace with actual match IDs
]

# Fetch match timelines one at a time (default region of fetch_match_timeline).
# `timelines` is a notebook-level global: every later test cell is guarded by
# `if timelines:` and reads `timelines[0]`.
# NOTE: the top-level `await` below only works inside a notebook/async REPL.
timelines: List[MatchTimeline] = []
for match_id in sample_match_ids:
    print(f"Fetching {match_id}...")
    timeline = await fetch_match_timeline(match_id)
    if timeline:
        timelines.append(timeline)
        print(f"✅ Successfully fetched {match_id}")
    else:
        # fetch_match_timeline returns None on any failure; skip and continue.
        print(f"⚠️  Failed to fetch {match_id}")

print(f"\n📊 Total timelines fetched: {len(timelines)}")

## 3. Scoring Algorithm Implementation

### 3.1 Combat Efficiency (30%)

In [None]:
def calculate_combat_efficiency(timeline: MatchTimeline, participant_id: int) -> Dict[str, float]:
    """
    Calculate combat efficiency metrics for one participant.

    Args:
        timeline: Match timeline; its frames provide the event stream and the
            per-participant snapshots.
        participant_id: Participant ID as it appears in the timeline events.

    Returns:
        Dictionary with normalized scores (0-1) for:
        - kda_score
        - kill_participation
        - damage_efficiency
        plus the raw values raw_kda, kills, deaths and assists.

        A timeline without frames yields zero kills/deaths/assists and a
        damage_efficiency of 0.0 instead of raising IndexError.
    """
    frames = timeline.info.frames

    # Tally kills/deaths/assists from the CHAMPION_KILL events of every frame.
    kills = 0
    deaths = 0
    assists = 0
    for frame in frames:
        for event in frame.events:
            if event.get('type') != 'CHAMPION_KILL':
                continue
            if event.get('killerId') == participant_id:
                kills += 1
            if event.get('victimId') == participant_id:
                deaths += 1
            # `or []` covers both a missing key and an explicit null list.
            if participant_id in (event.get('assistingParticipantIds') or []):
                assists += 1

    # KDA with deaths floored at 1 to avoid division by zero; capped at KDA=10.
    kda = (kills + assists) / max(deaths, 1)
    kda_score = min(kda / 10, 1.0)

    # The timeline helper is treated as returning a percentage (0-100).
    # Clamp so the result honors the documented 0-1 range even if the helper
    # ever reports a value outside it.
    kill_participation = timeline.get_kill_participation(participant_id)
    kill_participation_score = max(0.0, min(kill_participation / 100, 1.0))

    # Damage stats come from the final frame's snapshot, when there is one.
    participant_frame = (
        frames[-1].participant_frames.get(str(participant_id)) if frames else None
    )

    if participant_frame:
        damage_to_champs = participant_frame.damage_stats.total_damage_done_to_champions
        # The snapshot's total_gold field is used as the gold denominator.
        total_gold = participant_frame.total_gold

        # Damage per 1000 gold (denominator floored at 1), then scaled so that
        # 1000 damage per 1000 gold maps to a score of 1.0.
        damage_efficiency_raw = damage_to_champs / max(total_gold / 1000, 1)
        damage_efficiency = min(damage_efficiency_raw / 1000, 1.0)
    else:
        damage_efficiency = 0.0

    return {
        'kda_score': kda_score,
        'kill_participation': kill_participation_score,
        'damage_efficiency': damage_efficiency,
        'raw_kda': kda,
        'kills': kills,
        'deaths': deaths,
        'assists': assists
    }

# Test combat efficiency calculation (runs only if a timeline was fetched).
if timelines:
    # Notebook-level global: the later test cells reuse this participant ID.
    test_participant_id = 1
    combat_metrics = calculate_combat_efficiency(timelines[0], test_participant_id)
    print("Combat Efficiency Metrics:")
    for key, value in combat_metrics.items():
        print(f"  {key}: {value:.3f}")

### 3.2 Economic Management (25%)

In [None]:
def calculate_economic_management(timeline: MatchTimeline, participant_id: int) -> Dict[str, float]:
    """
    Calculate economic management metrics for one participant.

    Args:
        timeline: Match timeline; the last frame supplies the end-of-game
            snapshot and all frames supply ITEM_PURCHASED events.
        participant_id: Participant ID (1-5 are treated as team 100,
            6-10 as team 200).

    Returns:
        Dictionary with normalized scores for:
        - cs_efficiency
        - gold_lead
        - item_timing
        plus the raw values cs_per_min, total_gold and gold_difference.

        When the timeline has no frames or the participant has no snapshot in
        the last frame, a neutral fallback is returned that still contains
        every key, so downstream consumers never hit a KeyError.
    """
    # Neutral result used whenever there is no snapshot to score.
    fallback = {
        'cs_efficiency': 0.0,
        'gold_lead': 0.5,
        'item_timing': 0.5,
        'cs_per_min': 0.0,
        'total_gold': 0,
        'gold_difference': 0.0,
    }

    frames = timeline.info.frames
    if not frames:
        return fallback

    last_frame = frames[-1]
    participant_frame = last_frame.participant_frames.get(str(participant_id))
    if not participant_frame:
        return fallback

    # CS per minute; a zero-length game (last frame at timestamp 0) scores 0
    # instead of dividing by zero.
    game_duration_min = last_frame.timestamp / 60000  # Convert ms to minutes
    total_cs = participant_frame.minions_killed + participant_frame.jungle_minions_killed
    cs_per_min = total_cs / game_duration_min if game_duration_min > 0 else 0.0

    # CS efficiency: 10 CS/min (or more) maps to the maximum score of 1.0.
    cs_efficiency = min(cs_per_min / 10, 1.0)

    # Gold lead: own gold versus the average gold of the opposing team.
    opponent_team_ids = range(6, 11) if participant_id <= 5 else range(1, 6)
    opponent_frames = (
        last_frame.participant_frames.get(str(opp_id)) for opp_id in opponent_team_ids
    )
    opponent_gold = [opp_frame.total_gold for opp_frame in opponent_frames if opp_frame]

    # Without any opponent snapshot the difference collapses to 0 (neutral).
    avg_opponent_gold = (
        float(np.mean(opponent_gold)) if opponent_gold else participant_frame.total_gold
    )
    gold_difference = participant_frame.total_gold - avg_opponent_gold

    # Map the -5000..+5000 range onto 0..1 and clamp anything outside it.
    gold_lead_score = max(0.0, min(1.0, (gold_difference + 5000) / 10000))

    # Item timing (simplified): count purchases whose item ID is >= 3000,
    # used here as a rough heuristic for "major" items.
    major_item_purchases = sum(
        1
        for frame in frames
        for event in frame.events
        if event.get('type') == 'ITEM_PURCHASED'
        and event.get('participantId') == participant_id
        and event.get('itemId', 0) >= 3000
    )

    # Four or more major purchases map to the maximum score.
    item_timing_score = min(major_item_purchases / 4, 1.0)

    return {
        'cs_efficiency': cs_efficiency,
        'gold_lead': gold_lead_score,
        'item_timing': item_timing_score,
        'cs_per_min': cs_per_min,
        'total_gold': participant_frame.total_gold,
        'gold_difference': gold_difference
    }

# Test economic management calculation (runs only if a timeline was fetched).
# Relies on `test_participant_id` set by the combat-efficiency test cell.
if timelines:
    economic_metrics = calculate_economic_management(timelines[0], test_participant_id)
    print("\nEconomic Management Metrics:")
    for key, value in economic_metrics.items():
        print(f"  {key}: {value:.3f}")

### 3.3 Objective Control (25%)

In [None]:
def calculate_objective_control(timeline: MatchTimeline, participant_id: int) -> Dict[str, float]:
    """
    Calculate objective control metrics for one participant.

    An objective counts toward the participant's team when the event's
    killerId belongs to that team (IDs 1-5 or 6-10). The participant is
    credited when they are the killer or appear in the event's
    assistingParticipantIds — the same rule for epic monsters and buildings.

    Args:
        timeline: Match timeline whose frames carry the event stream.
        participant_id: Participant ID (1-5 are treated as team 100,
            6-10 as team 200).

    Returns:
        Dictionary with normalized scores for:
        - epic_monster_participation
        - tower_participation
        - objective_setup
        plus the raw counts epic_monsters and tower_kills.
    """
    team_participant_ids = range(1, 6) if participant_id <= 5 else range(6, 11)

    epic_monsters = 0
    tower_kills = 0
    total_epic_monsters = 0
    total_towers = 0

    for frame in timeline.info.frames:
        for event in frame.events:
            event_type = event.get('type')
            if event_type not in ('ELITE_MONSTER_KILL', 'BUILDING_KILL'):
                continue

            # Events whose killer is not on this team (including killerId 0)
            # are not attributed to the team at all.
            killer_id = event.get('killerId', 0)
            if killer_id not in team_participant_ids:
                continue

            # `or []` covers both a missing key and an explicit null list.
            assisting_ids = event.get('assistingParticipantIds') or []
            took_part = killer_id == participant_id or participant_id in assisting_ids

            if event_type == 'ELITE_MONSTER_KILL':
                total_epic_monsters += 1
                if took_part:
                    epic_monsters += 1
            else:
                total_towers += 1
                if took_part:
                    tower_kills += 1

    # Participation rates; denominators floored at 1 so "no objectives" is 0.
    epic_monster_participation = epic_monsters / max(total_epic_monsters, 1)
    tower_participation = tower_kills / max(total_towers, 1)

    # Simple objective setup score: the mean of the two participation rates.
    objective_setup = (epic_monster_participation + tower_participation) / 2

    return {
        'epic_monster_participation': epic_monster_participation,
        'tower_participation': tower_participation,
        'objective_setup': objective_setup,
        'epic_monsters': epic_monsters,
        'tower_kills': tower_kills
    }

# Test objective control calculation (runs only if a timeline was fetched).
# Relies on `test_participant_id` set by the combat-efficiency test cell.
if timelines:
    objective_metrics = calculate_objective_control(timelines[0], test_participant_id)
    print("\nObjective Control Metrics:")
    for key, value in objective_metrics.items():
        print(f"  {key}: {value:.3f}")

### 3.4 Vision & Map Control (10%)

In [None]:
def calculate_vision_control(timeline: MatchTimeline, participant_id: int) -> Dict[str, float]:
    """
    Calculate vision and map control metrics for one participant.

    Args:
        timeline: Match timeline; events supply ward placements/kills and the
            last frame's timestamp supplies the game duration.
        participant_id: Participant ID matched against WARD_PLACED creatorId
            and WARD_KILL killerId.

    Returns:
        Dictionary with normalized scores for:
        - ward_placement_rate
        - ward_clear_efficiency
        - vision_score
        plus the raw counts wards_placed and wards_killed.

        A timeline with no frames, or whose last frame is at timestamp 0,
        yields a ward_placement_rate of 0.0 instead of raising.
    """
    frames = timeline.info.frames

    wards_placed = 0
    wards_killed = 0
    for frame in frames:
        for event in frame.events:
            event_type = event.get('type')
            if event_type == 'WARD_PLACED' and event.get('creatorId') == participant_id:
                wards_placed += 1
            elif event_type == 'WARD_KILL' and event.get('killerId') == participant_id:
                wards_killed += 1

    # Game duration in minutes from the last frame (ms -> min); guarded so an
    # empty or zero-length timeline does not index past the end or divide by zero.
    game_duration_min = frames[-1].timestamp / 60000 if frames else 0.0
    wards_per_min = wards_placed / game_duration_min if game_duration_min > 0 else 0.0

    # Two wards per minute (or more) maps to the maximum placement score.
    ward_placement_rate = min(wards_per_min / 2, 1.0)

    # Ten cleared wards (or more) maps to the maximum clear score.
    ward_clear_efficiency = min(wards_killed / 10, 1.0)

    # Combined vision score: the mean of the two sub-scores.
    vision_score = (ward_placement_rate + ward_clear_efficiency) / 2

    return {
        'ward_placement_rate': ward_placement_rate,
        'ward_clear_efficiency': ward_clear_efficiency,
        'vision_score': vision_score,
        'wards_placed': wards_placed,
        'wards_killed': wards_killed
    }

# Test vision control calculation (runs only if a timeline was fetched).
# Relies on `test_participant_id` set by the combat-efficiency test cell.
if timelines:
    vision_metrics = calculate_vision_control(timelines[0], test_participant_id)
    print("\nVision Control Metrics:")
    for key, value in vision_metrics.items():
        print(f"  {key}: {value:.3f}")

### 3.5 Team Contribution (10%)

In [None]:
def calculate_team_contribution(timeline: MatchTimeline, participant_id: int) -> Dict[str, float]:
    """
    Derive team contribution metrics for one participant.

    Kill and assist totals are taken from calculate_combat_efficiency;
    objective assists are counted directly from the timeline events.

    Returns:
        Dictionary with normalized scores for:
        - assist_ratio
        - teamfight_presence
        - objective_assists
        plus the raw counts total_assists and objective_assist_count.
    """
    combat_metrics = calculate_combat_efficiency(timeline, participant_id)
    assist_total = combat_metrics['assists']
    kill_total = combat_metrics['kills']

    # Share of the participant's takedowns that were assists (0 when there are none).
    assist_share = assist_total / max(kill_total + assist_total, 1)

    # Number of epic-monster/building kills the participant is listed as assisting.
    objective_event_types = ['ELITE_MONSTER_KILL', 'BUILDING_KILL']
    objective_assist_total = sum(
        1
        for frame in timeline.info.frames
        for event in frame.events
        if event.get('type') in objective_event_types
        and participant_id in event.get('assistingParticipantIds', [])
    )

    return {
        'assist_ratio': assist_share,
        # Simplified: teamfight presence is approximated by the assist ratio.
        'teamfight_presence': assist_share,
        # Five or more objective assists map to the maximum score.
        'objective_assists': min(objective_assist_total / 5, 1.0),
        'total_assists': assist_total,
        'objective_assist_count': objective_assist_total
    }

# Test team contribution calculation (runs only if a timeline was fetched).
# Relies on `test_participant_id` set by the combat-efficiency test cell.
if timelines:
    team_metrics = calculate_team_contribution(timelines[0], test_participant_id)
    print("\nTeam Contribution Metrics:")
    for key, value in team_metrics.items():
        print(f"  {key}: {value:.3f}")

## 4. Weighted Score Calculation

In [None]:
class PlayerScore(BaseModel):
    """Structured player performance score.

    Built by calculate_total_score: one instance per participant, holding the
    weighted total, the five dimension scores, a few raw metrics, and the
    metadata handed to the LLM/TTS stages.
    """
    participant_id: int
    # Weighted total; the Field constraints reject values outside 0-100.
    total_score: float = Field(..., ge=0, le=100)

    # Dimension scores (calculate_total_score stores them on a 0-100 scale)
    combat_efficiency: float
    economic_management: float
    objective_control: float
    vision_control: float
    team_contribution: float

    # Raw metrics
    kda: float
    cs_per_min: float
    # Own gold minus the opposing team's average gold.
    gold_difference: float
    # Stored as a percentage by calculate_total_score.
    kill_participation: float

    # Metadata for LLM
    # Names of the two highest-scoring dimensions.
    strengths: List[str]
    # Names of the two lowest-scoring dimensions.
    improvements: List[str]
    emotion_tag: str = "neutral"  # For TTS integration

def calculate_total_score(timeline: MatchTimeline, participant_id: int) -> PlayerScore:
    """
    Combine the five dimension scores into one weighted PlayerScore.

    Weights:
    - Combat: 30%
    - Economic: 25%
    - Objective: 25%
    - Vision: 10%
    - Team: 10%

    Each dimension score is the mean of its normalized sub-scores (vision
    uses its pre-combined vision_score). The two best dimensions become
    `strengths`, the two worst become `improvements`, and the total selects
    the emotion tag.
    """
    # Per-dimension metric dictionaries
    combat_metrics = calculate_combat_efficiency(timeline, participant_id)
    economic_metrics = calculate_economic_management(timeline, participant_id)
    objective_metrics = calculate_objective_control(timeline, participant_id)
    vision_metrics = calculate_vision_control(timeline, participant_id)
    team_metrics = calculate_team_contribution(timeline, participant_id)

    # Collapse each dimension to a single 0-1 score
    combat_score = np.mean([
        combat_metrics[key]
        for key in ('kda_score', 'kill_participation', 'damage_efficiency')
    ])
    economic_score = np.mean([
        economic_metrics[key]
        for key in ('cs_efficiency', 'gold_lead', 'item_timing')
    ])
    objective_score = np.mean([
        objective_metrics[key]
        for key in ('epic_monster_participation', 'tower_participation', 'objective_setup')
    ])
    vision_score = vision_metrics['vision_score']
    team_score = np.mean([
        team_metrics[key]
        for key in ('assist_ratio', 'teamfight_presence', 'objective_assists')
    ])

    # Weighted sum, then scaled to 0-100
    dimension_weights = {
        'combat': 0.30,
        'economic': 0.25,
        'objective': 0.25,
        'vision': 0.10,
        'team': 0.10
    }
    weighted_sum = (
        combat_score * dimension_weights['combat'] +
        economic_score * dimension_weights['economic'] +
        objective_score * dimension_weights['objective'] +
        vision_score * dimension_weights['vision'] +
        team_score * dimension_weights['team']
    )
    total_score = weighted_sum * 100

    # Rank dimensions from best to worst (stable for ties)
    labelled_scores = {
        'Combat Efficiency': combat_score,
        'Economic Management': economic_score,
        'Objective Control': objective_score,
        'Vision Control': vision_score,
        'Team Contribution': team_score
    }
    ranked = sorted(labelled_scores.items(), key=lambda pair: pair[1], reverse=True)
    strengths = [label for label, _ in ranked[:2]]
    improvements = [label for label, _ in ranked[-2:]]

    # First threshold the total reaches decides the tag; below all -> "concerned"
    emotion = "concerned"
    for threshold, tag in ((80, "excited"), (60, "positive"), (40, "neutral")):
        if total_score >= threshold:
            emotion = tag
            break

    return PlayerScore(
        participant_id=participant_id,
        total_score=total_score,
        combat_efficiency=combat_score * 100,
        economic_management=economic_score * 100,
        objective_control=objective_score * 100,
        vision_control=vision_score * 100,
        team_contribution=team_score * 100,
        kda=combat_metrics['raw_kda'],
        cs_per_min=economic_metrics['cs_per_min'],
        gold_difference=economic_metrics['gold_difference'],
        kill_participation=combat_metrics['kill_participation'] * 100,
        strengths=strengths,
        improvements=improvements,
        emotion_tag=emotion
    )

# Test total score calculation (runs only if a timeline was fetched).
# Relies on `test_participant_id` set by the combat-efficiency test cell.
if timelines:
    player_score = calculate_total_score(timelines[0], test_participant_id)
    print("\n🎯 Player Performance Score:")
    # Pretty-print the pydantic model as JSON.
    print(player_score.model_dump_json(indent=2))

## 5. Visualization

In [None]:
def plot_radar_chart(player_score: PlayerScore):
    """
    Draw a polar (radar) chart of the five dimension scores of one player.

    The radial axis is fixed to 0-100 and the title shows the total score.
    The figure is displayed with plt.show(); nothing is returned.
    """
    axis_labels = [
        'Combat\nEfficiency',
        'Economic\nManagement',
        'Objective\nControl',
        'Vision\nControl',
        'Team\nContribution',
    ]
    dimension_values = [
        player_score.combat_efficiency,
        player_score.economic_management,
        player_score.objective_control,
        player_score.vision_control,
        player_score.team_contribution,
    ]

    # One evenly spaced spoke per dimension
    axis_count = len(axis_labels)
    spoke_angles = [idx / float(axis_count) * 2 * np.pi for idx in range(axis_count)]

    # Repeat the first point at the end so the outline closes on itself
    closed_values = dimension_values + dimension_values[:1]
    closed_angles = spoke_angles + spoke_angles[:1]

    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw={'projection': 'polar'})

    ax.plot(closed_angles, closed_values, 'o-', linewidth=2,
            label=f'Participant {player_score.participant_id}')
    ax.fill(closed_angles, closed_values, alpha=0.25)

    # Scores live on a 0-100 scale
    ax.set_ylim(0, 100)

    # Label each spoke (the duplicated closing angle is excluded)
    ax.set_xticks(spoke_angles)
    ax.set_xticklabels(axis_labels)

    plt.title(f'Performance Radar Chart\nTotal Score: {player_score.total_score:.1f}/100',
              size=16, y=1.08)

    plt.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1))
    plt.tight_layout()
    plt.show()

def plot_gold_timeline(timeline: MatchTimeline, participant_id: int):
    """
    Plot one participant's total gold against game time.

    Frames without a snapshot for the participant are skipped. The figure is
    displayed with plt.show(); nothing is returned.
    """
    lookup_key = str(participant_id)

    # (minute, gold) samples for every frame that has a snapshot for the player
    samples = [
        (frame.timestamp / 60000, snapshot.total_gold)  # ms -> minutes
        for frame in timeline.info.frames
        if (snapshot := frame.participant_frames.get(lookup_key))
    ]
    minutes = [minute for minute, _ in samples]
    gold_totals = [gold for _, gold in samples]

    plt.figure(figsize=(12, 6))
    plt.plot(minutes, gold_totals, marker='o', linewidth=2)
    plt.xlabel('Game Time (minutes)')
    plt.ylabel('Total Gold')
    plt.title(f'Gold Accumulation - Participant {participant_id}')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

# Test visualizations (runs only if a timeline was fetched).
# Relies on `test_participant_id` set by the combat-efficiency test cell.
if timelines:
    # Recompute the score so this cell does not depend on the scoring test cell.
    player_score = calculate_total_score(timelines[0], test_participant_id)
    plot_radar_chart(player_score)
    plot_gold_timeline(timelines[0], test_participant_id)

## 6. Batch Analysis for All Participants

In [None]:
def analyze_full_match(timeline: MatchTimeline) -> List[PlayerScore]:
    """
    Score participants 1 through 10 of a match.

    A participant whose scoring raises is reported with a printed warning and
    left out of the result, so the list may hold fewer than 10 entries.

    Returns:
        List of PlayerScore objects sorted by total score, highest first
    """
    collected = []

    for participant_id in range(1, 11):
        try:
            collected.append(calculate_total_score(timeline, participant_id))
        except Exception as err:
            print(f"⚠️  Error calculating score for participant {participant_id}: {err}")

    # Best performer first
    return sorted(collected, key=lambda entry: entry.total_score, reverse=True)

def display_match_summary(scores: List[PlayerScore]):
    """
    Display a summary table of all participants, followed by the MVP.

    Args:
        scores: Player scores, expected to be sorted best-first (as returned
            by analyze_full_match); the first entry is reported as MVP.

    An empty list prints a short notice instead of raising IndexError when
    looking up the MVP.
    """
    print("\n📊 Match Performance Summary:")

    if not scores:
        # Nothing was scored (e.g. every participant failed); there is no MVP.
        print("⚠️  No participant scores available.")
        return

    # Numeric columns are pre-formatted as strings to fix the displayed precision.
    data = {
        'Participant': [s.participant_id for s in scores],
        'Total Score': [f"{s.total_score:.1f}" for s in scores],
        'KDA': [f"{s.kda:.2f}" for s in scores],
        'CS/min': [f"{s.cs_per_min:.1f}" for s in scores],
        'KP%': [f"{s.kill_participation:.1f}" for s in scores],
        'Strongest': [s.strengths[0] if s.strengths else 'N/A' for s in scores]
    }

    df = pd.DataFrame(data)
    print(df.to_string(index=False))

    # The list is sorted best-first, so the MVP is the first entry.
    mvp = scores[0]
    print(f"\n🏆 MVP: Participant {mvp.participant_id} (Score: {mvp.total_score:.1f})")
    print(f"   Strengths: {', '.join(mvp.strengths)}")
    print(f"   Emotion Tag: {mvp.emotion_tag}")

# Test full match analysis (runs only if a timeline was fetched):
# score every participant of the first timeline and print the summary table.
if timelines:
    all_scores = analyze_full_match(timelines[0])
    display_match_summary(all_scores)

## 7. Export for LLM Integration

In [None]:
class MatchAnalysisOutput(BaseModel):
    """Structured output for LLM consumption.

    Built by generate_llm_input: the per-player scores of one match plus
    match-level aggregates.
    """
    match_id: str
    # Timestamp of the last timeline frame, converted from ms to minutes.
    game_duration_minutes: float
    # Scores as returned by analyze_full_match (sorted best-first).
    player_scores: List[PlayerScore]
    # participant_id of the first (highest-scoring) entry in player_scores.
    mvp_id: int
    # Mean total score of participants with ID <= 5.
    team_blue_avg_score: float
    # Mean total score of participants with ID > 5.
    team_red_avg_score: float
    
def generate_llm_input(timeline: MatchTimeline) -> MatchAnalysisOutput:
    """
    Generate structured output for /讲道理 LLM analysis.

    Scores every participant, derives the per-team average score and the game
    duration, and packs everything into a MatchAnalysisOutput.

    Args:
        timeline: Match timeline to analyze.

    Returns:
        MatchAnalysisOutput with scores sorted best-first; the MVP is the
        first entry. A team for which no participant could be scored gets an
        average of 0.0 (rather than nan).

    Raises:
        IndexError: If no participant could be scored or the timeline has no
            frames.
    """
    scores = analyze_full_match(timeline)

    # Split total scores by team: participant IDs 1-5 are blue, 6-10 are red.
    blue_scores = [s.total_score for s in scores if s.participant_id <= 5]
    red_scores = [s.total_score for s in scores if s.participant_id > 5]

    # np.mean of an empty list yields nan plus a RuntimeWarning; analyze_full_match
    # may drop failed participants, so guard each team separately.
    team_blue_avg = float(np.mean(blue_scores)) if blue_scores else 0.0
    team_red_avg = float(np.mean(red_scores)) if red_scores else 0.0

    # Game duration from the last frame's timestamp (ms -> minutes).
    last_frame = timeline.info.frames[-1]
    game_duration = last_frame.timestamp / 60000

    return MatchAnalysisOutput(
        match_id=timeline.metadata.match_id,
        game_duration_minutes=game_duration,
        player_scores=scores,
        mvp_id=scores[0].participant_id,
        team_blue_avg_score=team_blue_avg,
        team_red_avg_score=team_red_avg
    )

# Test LLM output generation (runs only if a timeline was fetched).
if timelines:
    llm_output = generate_llm_input(timelines[0])
    print("\n📤 LLM Input (JSON):")
    print(llm_output.model_dump_json(indent=2))
    
    # Save the same JSON to a file; the path is relative to the notebook's
    # working directory and the output folder is created if it is missing.
    output_path = "../outputs/match_analysis.json"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, 'w') as f:
        f.write(llm_output.model_dump_json(indent=2))
    print(f"\n✅ Analysis saved to {output_path}")

## Next Steps

### P2 Phase Completion:
1. ✅ V1 Scoring Algorithm Prototype (this notebook)
2. ⏳ Integrate Data Dragon (DDragon) for champion/item names
3. ⏳ Evaluate opgg.py for supplementary data

### P3 Phase Preview:
- Migrate validated algorithm to `src/core/scoring.py`
- Implement comprehensive unit tests
- Create scoring service interface

### P4 Phase Preview:
- Prompt engineering for Gemini LLM
- System prompt design for `/讲道理` command
- Integration with TTS emotion tags