### TFM Data Transform & Structuring

Transform local data (`local_data/`) into readable, analysis-ready structures.

**Prerequisites**
1. Run `uv run preprocess` to fetch local data
2. `local_data/tfm.db` exists with games, game_results, user_game_results tables, and `local_data/` also contains `user_rank.csv` and `users.csv`

**Output**: `local_data/tfm_analysis.db` - SQLite database with structured tables

**Memory Optimization**: Uses chunked processing to handle large datasets

In [3]:
import pandas as pd
import sqlite3
import json
import os
import gc
from datetime import datetime

# Configuration: every input and output path is derived from LOCAL_DATA_DIR
LOCAL_DATA_DIR = './local_data'
SOURCE_DB_PATH = os.path.join(LOCAL_DATA_DIR, 'tfm.db')
ANALYSIS_DB_PATH = os.path.join(LOCAL_DATA_DIR, 'tfm_analysis.db')

# Chunk size for processing large tables (adjust based on available memory)
CHUNK_SIZE = 500  # Process 500 games at a time

# Fail fast with an actionable message when the source database is missing,
# instead of letting a later sqlite3/pandas call fail less clearly.
if not os.path.exists(SOURCE_DB_PATH):
    raise FileNotFoundError(f"Source DB not found: {SOURCE_DB_PATH}\nRun 'uv run preprocess' first")

# Echo the effective settings so the run is self-describing
print(f"Source DB: {SOURCE_DB_PATH}")
print(f"Analysis DB: {ANALYSIS_DB_PATH}")
print(f"Chunk size: {CHUNK_SIZE} games per batch")

Source DB: ./local_data/tfm.db
Analysis DB: ./local_data/tfm_analysis.db
Chunk size: 500 games per batch


#### 1. Load Source Data (Non-game Tables)

In [4]:
# Load the smaller result tables fully into memory. The `games` table is only
# counted here; its rows are read later in chunks.
source_conn = sqlite3.connect(SOURCE_DB_PATH)
try:
    # Get row counts first
    games_count = pd.read_sql('SELECT COUNT(*) as cnt FROM games', source_conn)['cnt'][0]
    game_results_df = pd.read_sql('SELECT * FROM game_results', source_conn)
    user_game_results_df = pd.read_sql('SELECT * FROM user_game_results', source_conn)
finally:
    # Release the SQLite handle even when one of the queries fails, and do not
    # keep it open while the (unrelated) CSV files are being read.
    source_conn.close()

# Load CSV
user_rank_df = pd.read_csv(os.path.join(LOCAL_DATA_DIR, 'user_rank.csv'))
users_df = pd.read_csv(os.path.join(LOCAL_DATA_DIR, 'users.csv'))

print(f"games: {games_count} rows (will process in chunks)")
print(f"game_results: {len(game_results_df)} rows")
print(f"user_game_results: {len(user_game_results_df)} rows")
print(f"user_rank: {len(user_rank_df)} rows")
print(f"users: {len(users_df)} rows")

games: 17201 rows (will process in chunks)
game_results: 17817 rows
user_game_results: 57102 rows
user_rank: 6397 rows
users: 7336 rows


#### 2. Utility Functions

In [None]:
def parse_json_safe(json_str):
    """Parse a JSON (or Python-literal) string without raising on bad input.

    Args:
        json_str: Raw cell value. May be a string, an already-decoded
            dict/list, or a missing value (None/NaN).

    Returns:
        The decoded object; the input itself when it is already a dict or
        list; None when the value is missing, is of another type, or cannot
        be parsed.
    """
    import ast  # local import: only needed by the literal fallback below

    # Containers are handled before any NaN test: pd.isna() on a list returns
    # an array whose truth value is ambiguous.
    if isinstance(json_str, (dict, list)):
        return json_str
    # None, NaN and every other non-string value carry nothing to parse.
    if not isinstance(json_str, str):
        return None

    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        pass

    # Fallback for Python-repr payloads (single quotes, True/False/None).
    # ast.literal_eval only accepts literals, so database content can never
    # execute code here, unlike the eval() this replaces.
    try:
        return ast.literal_eval(json_str)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None


def split_corporation(corp_str):
    """Break a '|'-separated corporation string into a list of trimmed names.

    Missing values (None/NaN) and the empty string give an empty list;
    segments that are blank after trimming are dropped.
    """
    if pd.isna(corp_str) or corp_str == '':
        return []
    trimmed = (segment.strip() for segment in str(corp_str).split('|'))
    return [segment for segment in trimmed if segment]


def clean_player_name(name):
    """Normalize a player name for matching.

    Drops at most one leading marker character (~, @ or ～) and lowercases
    the rest. Missing values (None/NaN) yield None.
    """
    if pd.isna(name):
        return None
    text = str(name)
    # startswith() is False for the empty string, so no length check is needed
    if text.startswith(('~', '@', '～')):
        text = text[1:]
    return text.lower()


def is_bot_player(name):
    """Tell whether a player name denotes a bot.

    A missing name (None/NaN) counts as a bot. Otherwise the lowercased name
    is compared against a fixed list of placeholder names (colour names in
    English and Chinese, and the digits 1-4).
    """
    if pd.isna(name):
        return True
    placeholder_names = {
        'green', 'red', 'blue', 'yellow',
        '1', '2', '3', '4',
        '绿色', '红色', '黄色', '蓝色',
    }
    return str(name).lower() in placeholder_names

#### 3. Flatten game_results Scores

In [None]:
def flatten_game_results(df):
    """
    Flatten game_results.scores into one row per player per game.

    Args:
        df: game_results DataFrame. The columns game_id, players,
            generations, createtime and scores are read; seed_game_id is
            copied when present.

    Returns:
        DataFrame with the game-level columns repeated per player, the
        player's position in the scores list, cleaned and raw name, score,
        up to three split corporation columns, an is_bot flag and a per-game
        `rank` (1 = highest score, ties share the lower rank number).
        An empty DataFrame without columns when no score entry was usable.
    """
    df = df.copy()
    df['scores_parsed'] = df['scores'].apply(parse_json_safe)

    rows = []
    for _, row in df.iterrows():
        scores = row['scores_parsed']
        # Skip games whose scores field is missing, empty or not a list
        if not scores or not isinstance(scores, list):
            continue

        # Position counts every list entry, including malformed ones that
        # are skipped, so it reflects the index in the stored list.
        for position, score_data in enumerate(scores, start=1):
            if not isinstance(score_data, dict):
                continue

            flat_row = {
                'game_id': row['game_id'],
                'seed_game_id': row.get('seed_game_id'),
                'players': row['players'],
                'generations': row['generations'],
                'createtime': row['createtime'],
                'position': position,
                'player_name': clean_player_name(score_data.get('player')),
                'player_name_raw': score_data.get('player'),
                'player_score': score_data.get('playerScore'),
                'corporation_raw': score_data.get('corporation'),
            }

            corps = split_corporation(score_data.get('corporation'))
            flat_row['corporation_1'] = corps[0] if len(corps) > 0 else None
            flat_row['corporation_2'] = corps[1] if len(corps) > 1 else None
            flat_row['corporation_3'] = corps[2] if len(corps) > 2 else None
            flat_row['corporation_count'] = len(corps)
            flat_row['is_bot'] = is_bot_player(flat_row['player_name'])

            rows.append(flat_row)

    result_df = pd.DataFrame(rows)

    if len(result_df) > 0:
        # Rank on a numeric view of the scores so a missing or non-numeric
        # playerScore yields a missing rank instead of an exception.
        numeric_scores = pd.to_numeric(result_df['player_score'], errors='coerce')
        ranks = numeric_scores.groupby(result_df['game_id']) \
            .rank(method='min', ascending=False)
        if ranks.isna().any():
            # Plain int cannot hold NaN; the nullable integer dtype can.
            result_df['rank'] = ranks.astype('Int64')
        else:
            result_df['rank'] = ranks.astype(int)

    return result_df

# Flatten the full game_results table loaded earlier into per-player rows.
flat_game_results_df = flatten_game_results(game_results_df)
print(f"Flattened game_results: {len(flat_game_results_df)} rows")
# Trailing bare expression: previews the first rows as the cell result.
flat_game_results_df.head(10)

#### 4. Process user_game_results

In [None]:
def process_user_game_results(df):
    """
    Add split-corporation columns to a copy of user_game_results.

    The 'corporation' column is split with split_corporation() and exposed
    as corporation_1..corporation_3 (None where a player has fewer names)
    plus corporation_count. The input frame is left untouched and no other
    column is modified.
    """
    enriched = df.copy()
    corp_lists = enriched['corporation'].apply(split_corporation)

    slot_columns = ('corporation_1', 'corporation_2', 'corporation_3')
    for slot, column in enumerate(slot_columns):
        # slot is bound as a default so each lambda keeps its own index
        enriched[column] = corp_lists.apply(
            lambda names, slot=slot: names[slot] if len(names) > slot else None
        )
    enriched['corporation_count'] = corp_lists.apply(len)

    return enriched

# Add the split-corporation columns to the user_game_results table loaded earlier.
processed_ugr_df = process_user_game_results(user_game_results_df)
print(f"Processed user_game_results: {len(processed_ugr_df)} rows")
# Trailing bare expression: previews the first rows as the cell result.
processed_ugr_df.head(10)

#### 5. Chunked Game Data Processor

Process games table in chunks to avoid memory issues.

**Strategy**: Only process finished/ended games that have corresponding game_results (INNER JOIN logic)

In [None]:
def process_game_row(row):
    """
    Extract the analysis rows contained in one row of the games table.

    Args:
        row: Mapping-like row exposing 'game' (serialized game state),
            'game_id' and 'createtime'.

    Returns:
        dict with the keys cards, stats, milestones, awards and globals.
        Each value is a list of flat dicts (one per played card, per player,
        per claimed milestone, per funded award and per generation
        respectively). All lists are empty when the game payload is missing,
        unparseable or not a JSON object.
    """
    result = {
        'cards': [],
        'stats': [],
        'milestones': [],
        'awards': [],
        'globals': []
    }

    game_data = parse_json_safe(row['game'])
    # Only a non-empty JSON object carries game state; anything else
    # (None, list, scalar) has no fields to read.
    if not game_data or not isinstance(game_data, dict):
        return result

    game_id = row['game_id']
    createtime = row['createtime']
    generation = game_data.get('generation')
    # `or []` / `or {}` is used for every container field below: a .get()
    # default only applies to absent keys, not to keys stored as JSON null.
    players = game_data.get('players') or []
    player_count = len(players)

    # Extract player data
    for player in players:
        user_id = player.get('userId')
        player_id = player.get('id')
        player_name = player.get('name')
        player_name_clean = clean_player_name(player_name)
        tr = player.get('terraformRating')

        corps = player.get('corporations') or []
        corp_names = [c.get('name') for c in corps if c.get('name')]
        corp_1 = corp_names[0] if len(corp_names) > 0 else None
        corp_2 = corp_names[1] if len(corp_names) > 1 else None

        # Extract played cards; card_order is the 1-based index in the stored list
        played_cards = player.get('playedCards') or []
        for card_order, card in enumerate(played_cards, start=1):
            result['cards'].append({
                'game_id': game_id,
                'createtime': createtime,
                'user_id': user_id,
                'player_id': player_id,
                'player_name': player_name_clean,
                'player_name_raw': player_name,
                'terraform_rating': tr,
                'corporation_1': corp_1,
                'corporation_2': corp_2,
                'card_order': card_order,
                'card_name': card.get('name'),
                'resource_count': card.get('resourceCount', 0),
                'clone_tag': card.get('cloneTag'),
                'bonus_resource': card.get('bonusResource'),
                'is_disabled': card.get('isDisabled', False),
            })

        # Extract player stats
        gps = player.get('globalParameterSteps') or {}
        timer = player.get('timer') or {}
        vp_by_gen = player.get('victoryPointsByGeneration') or []

        result['stats'].append({
            'game_id': game_id,
            'createtime': createtime,
            'generation': generation,
            'user_id': user_id,
            'player_id': player_id,
            'player_name': player_name_clean,
            'player_name_raw': player_name,
            'corporation_1': corp_1,
            'corporation_2': corp_2,
            'terraform_rating': tr,
            # Last entry of the per-generation list, when there is one
            'victory_points_final': vp_by_gen[-1] if vp_by_gen else None,
            'colony_victory_points': player.get('colonyVictoryPoints', 0),
            'mega_credits': player.get('megaCredits'),
            'mc_production': player.get('megaCreditProduction'),
            'steel': player.get('steel'),
            'steel_production': player.get('steelProduction'),
            'titanium': player.get('titanium'),
            'titanium_production': player.get('titaniumProduction'),
            'plants': player.get('plants'),
            'plant_production': player.get('plantProduction'),
            'energy': player.get('energy'),
            'energy_production': player.get('energyProduction'),
            'heat': player.get('heat'),
            'heat_production': player.get('heatProduction'),
            'actions_taken': player.get('actionsTakenThisGame'),
            'delegates_placed': player.get('totalDelegatesPlaced'),
            'oceans_contributed': gps.get('oceans', 0),
            'oxygen_contributed': gps.get('oxygen', 0),
            'temperature_contributed': gps.get('temperature', 0),
            'venus_contributed': gps.get('venus', 0),
            'time_elapsed_ms': timer.get('sumElapsed'),
            'fleet_size': player.get('fleetSize'),
            'trades_this_generation': player.get('tradesThisGeneration'),
            'cards_played_count': len(played_cards),
            'cards_in_hand_count': len(player.get('cardsInHand') or []),
        })

    # Extract milestones in stored order
    for order, m in enumerate(game_data.get('claimedMilestones') or [], start=1):
        result['milestones'].append({
            'game_id': game_id,
            'createtime': createtime,
            'milestone_name': (m.get('milestone') or {}).get('name'),
            'player_id': (m.get('player') or {}).get('id'),
            'claim_order': order,
        })

    # Extract awards in stored order
    for order, a in enumerate(game_data.get('fundedAwards') or [], start=1):
        result['awards'].append({
            'game_id': game_id,
            'createtime': createtime,
            'award_name': (a.get('award') or {}).get('name'),
            'funder_player_id': (a.get('player') or {}).get('id'),
            'fund_order': order,
        })

    # Extract global parameters per generation (generation numbers start at 1)
    per_generation = game_data.get('globalsPerGeneration') or []
    for gen_number, gen_data in enumerate(per_generation, start=1):
        result['globals'].append({
            'game_id': game_id,
            'createtime': createtime,
            'player_count': player_count,
            'generation': gen_number,
            'temperature': gen_data.get('temperature'),
            'oxygen': gen_data.get('oxygen'),
            'oceans': gen_data.get('oceans'),
            'venus': gen_data.get('venus'),
        })

    return result

In [None]:
def process_games_chunked(source_db_path, chunk_size=500):
    """
    Process games table in chunks to avoid memory issues

    Only games with status 'finished' or 'end' that have at least one row in
    game_results are processed (INNER JOIN). The eligible game ids are read
    once in sorted order; each chunk then loads the rows whose id lies
    between the chunk's first and last id. Unlike LIMIT/OFFSET without
    ORDER BY, this gives every chunk a well-defined, non-overlapping set of
    games.

    Args:
        source_db_path: Path to source SQLite database
        chunk_size: Number of distinct game ids to process per chunk

    Returns:
        dict with DataFrames: cards, stats, milestones, awards, globals
        (an empty DataFrame for a category without rows)

    Raises:
        ValueError: if chunk_size is smaller than 1
    """
    chunk_size = int(chunk_size)
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # FROM/WHERE clause shared by the id query and the chunk query so both
    # always select from the same set of games.
    eligible_games_sql = """
        FROM games g
        INNER JOIN (SELECT DISTINCT game_id FROM game_results) gr
        ON g.game_id = gr.game_id
        WHERE g.status IN ('finished', 'end')
    """
    chunk_sql = f"""
        SELECT g.game_id, g.game, g.status, g.createtime
        {eligible_games_sql}
        AND g.game_id BETWEEN ? AND ?
        ORDER BY g.game_id
    """

    # Initialize result containers
    all_cards = []
    all_stats = []
    all_milestones = []
    all_awards = []
    all_globals = []

    conn = sqlite3.connect(source_db_path)
    try:
        # Ids only (no game payload), so this list stays small. A raw cursor
        # returns native Python values that can be bound as parameters again.
        game_ids = [
            record[0] for record in conn.execute(
                f"SELECT DISTINCT g.game_id {eligible_games_sql} ORDER BY g.game_id"
            )
        ]
        total_count = len(game_ids)

        # The NULL filter keeps NOT IN meaningful: a single NULL in the
        # subquery would otherwise make the predicate unknown for every row.
        games_without_results = conn.execute(
            """SELECT COUNT(*) FROM games g
               WHERE g.status IN ('finished', 'end')
               AND g.game_id NOT IN (
                   SELECT game_id FROM game_results WHERE game_id IS NOT NULL
               )"""
        ).fetchone()[0]

        print(f"Games with game_results to process: {total_count}")
        print(f"Games without game_results (skipped): {games_without_results}")
        print(f"Chunk size: {chunk_size}")
        print(f"Estimated chunks: {(total_count + chunk_size - 1) // chunk_size}")
        print("="*50)

        for chunk_num, offset in enumerate(range(0, total_count, chunk_size), start=1):
            chunk_ids = game_ids[offset:offset + chunk_size]

            # Fetch chunk - only games with game_results, bounded by id range
            chunk_df = pd.read_sql(chunk_sql, conn, params=(chunk_ids[0], chunk_ids[-1]))

            if len(chunk_df) == 0:
                break

            print(f"Chunk {chunk_num}: Processing {len(chunk_df)} games (offset {offset})...")

            # Process each game in chunk
            for _, row in chunk_df.iterrows():
                result = process_game_row(row)
                all_cards.extend(result['cards'])
                all_stats.extend(result['stats'])
                all_milestones.extend(result['milestones'])
                all_awards.extend(result['awards'])
                all_globals.extend(result['globals'])

            # Clear chunk from memory before the next one is loaded
            del chunk_df
            gc.collect()

            # Progress report
            processed = min(offset + chunk_size, total_count)
            print(f"  -> Progress: {processed}/{total_count} ({processed*100//total_count}%)")
            print(f"  -> Cards: {len(all_cards)}, Stats: {len(all_stats)}, Milestones: {len(all_milestones)}")
    finally:
        # Close the connection on success and on any failure above
        conn.close()

    print("="*50)
    print("Processing complete!")

    return {
        'cards': pd.DataFrame(all_cards) if all_cards else pd.DataFrame(),
        'stats': pd.DataFrame(all_stats) if all_stats else pd.DataFrame(),
        'milestones': pd.DataFrame(all_milestones) if all_milestones else pd.DataFrame(),
        'awards': pd.DataFrame(all_awards) if all_awards else pd.DataFrame(),
        'globals': pd.DataFrame(all_globals) if all_globals else pd.DataFrame(),
    }

#### 6. Execute Chunked Processing

In [None]:
# Process games in chunks
print("Starting chunked game processing...")
print(f"Memory-efficient mode: {CHUNK_SIZE} games per batch\n")

# Returns a dict of DataFrames keyed by cards/stats/milestones/awards/globals
game_data = process_games_chunked(SOURCE_DB_PATH, chunk_size=CHUNK_SIZE)

# Assign to individual DataFrames
played_cards_df = game_data['cards']
player_stats_df = game_data['stats']
milestones_df = game_data['milestones']
awards_df = game_data['awards']
globals_df = game_data['globals']

# Clean up: drops only the wrapper dict; the DataFrames stay alive through
# the individual variables assigned above.
del game_data
gc.collect()

print(f"\nExtracted data summary:")
print(f"  played_cards: {len(played_cards_df)} rows")
print(f"  player_stats: {len(player_stats_df)} rows")
print(f"  milestones: {len(milestones_df)} rows")
print(f"  awards: {len(awards_df)} rows")
print(f"  global_parameters: {len(globals_df)} rows")

#### 7. Save to Analysis Database

In [None]:
def save_to_analysis_db(dataframes_dict, db_path):
    """
    Write each DataFrame of a {table_name: DataFrame} mapping to SQLite.

    A table that already exists is replaced. Entries that are None or have
    no rows are reported and skipped, so no table is written for them. The
    connection is closed even when a write fails.
    """
    db_conn = sqlite3.connect(db_path)

    try:
        for table_name in dataframes_dict:
            frame = dataframes_dict[table_name]
            # Guard clause: nothing to write for missing or empty frames
            if frame is None or len(frame) == 0:
                print(f"⚠ Skipped {table_name}: no data")
                continue
            frame.to_sql(table_name, db_conn, if_exists='replace', index=False)
            print(f"✓ Saved {table_name}: {len(frame)} rows")
    finally:
        db_conn.close()

    print(f"\nDatabase saved: {db_path}")

# Keys become the table names in the analysis database; values are the
# DataFrames built in the previous sections.
tables_to_save = {
    'flat_game_results': flat_game_results_df,
    'processed_user_game_results': processed_ugr_df,
    'user_rank': user_rank_df,
    'users': users_df,
    'played_cards': played_cards_df,
    'player_stats': player_stats_df,
    'milestones': milestones_df,
    'awards': awards_df,
    'global_parameters': globals_df,
}

# Empty or missing frames are skipped by save_to_analysis_db
save_to_analysis_db(tables_to_save, ANALYSIS_DB_PATH)

#### 8. Verify Data

In [None]:
# Print a short schema summary (row count and first columns) for every table
# found in the analysis database.
conn = sqlite3.connect(ANALYSIS_DB_PATH)
try:
    tables = pd.read_sql(
        "SELECT name FROM sqlite_master WHERE type='table'", conn
    )['name'].tolist()

    print("=" * 50)
    print("Analysis Database Schema")
    print("=" * 50)

    for table in tables:
        # Identifiers cannot be bound as parameters; quote the name so an
        # unusual table name cannot break or alter the statement.
        quoted = '"' + table.replace('"', '""') + '"'
        count = pd.read_sql(f'SELECT COUNT(*) as cnt FROM {quoted}', conn)['cnt'][0]
        cols = pd.read_sql(f'PRAGMA table_info({quoted})', conn)['name'].tolist()
        print(f"\n[{table}] {count} rows")
        # Only the first 8 column names are listed to keep the summary short
        print(f"  Columns: {', '.join(cols[:8])}{'...' if len(cols) > 8 else ''}")
finally:
    # Close the handle even when one of the queries fails
    conn.close()

#### 9. Usage Examples

In [None]:
# Connection shared by the example queries below; it is closed in the final cell.
conn = sqlite3.connect(ANALYSIS_DB_PATH)

# Example 1: Corporation win rate (excluding bots)
# Groups by first corporation and the game's `players` value. A "win" is
# rank = 1; rank is computed per game with ties sharing the lower number, so
# tied top scorers all count as winners.
# NOTE(review): `is_bot = 0` assumes the boolean flag is stored as 0/1 by
# to_sql - confirm against the written table.
sql_corp_win_rate = """
SELECT
    corporation_1 as corporation,
    players,
    COUNT(*) as total_games,
    SUM(CASE WHEN rank = 1 THEN 1 ELSE 0 END) as wins,
    ROUND(SUM(CASE WHEN rank = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as win_rate,
    ROUND(AVG(player_score), 1) as avg_score
FROM flat_game_results
WHERE is_bot = 0
  AND corporation_1 IS NOT NULL
GROUP BY corporation_1, players
HAVING total_games >= 10
ORDER BY players, win_rate DESC
"""
print("Corporation Win Rate (excl. bots, min 10 games):")
# Trailing bare expression: the query result is shown as the cell result.
pd.read_sql(sql_corp_win_rate, conn)

In [None]:
# Example 2: Card win rate (joined with user_game_results)
# Each played-card row is matched to the player's result row via
# (game_id, user_id); play_count therefore counts joined card rows.
# NOTE(review): the `position`, `phase`, `player_score` and `user_id` columns
# are taken from the source user_game_results table, whose schema is not
# defined in this notebook - confirm they exist with these meanings.
sql_card_win_rate = """
SELECT
    pc.card_name,
    COUNT(*) as play_count,
    SUM(CASE WHEN ugr.position = 1 THEN 1 ELSE 0 END) as wins,
    ROUND(SUM(CASE WHEN ugr.position = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as win_rate,
    ROUND(AVG(ugr.player_score), 1) as avg_score
FROM played_cards pc
JOIN processed_user_game_results ugr
    ON pc.game_id = ugr.game_id AND pc.user_id = ugr.user_id
WHERE ugr.phase = 'end'
GROUP BY pc.card_name
HAVING play_count >= 20
ORDER BY win_rate DESC
LIMIT 20
"""
print("Card Win Rate TOP 20 (min 20 plays):")
# Trailing bare expression: the query result is shown as the cell result.
pd.read_sql(sql_card_win_rate, conn)

In [None]:
# Close the connection opened for the usage examples above.
conn.close()
print("\nData transform complete!")
print(f"Analysis DB: {ANALYSIS_DB_PATH}")