In [None]:
# Cell 1: Environment Setup & Workspace Connection
# ==============================================================================
# 🧩 TACTICAL FORGE V2 - INITIALIZATION
# ==============================================================================

import os
import time
import threading
from google.colab import drive

# ‡ßß. ‡¶≤‡¶æ‡¶á‡¶¨‡ßç‡¶∞‡ßá‡¶∞‡¶ø ‡¶á‡¶®‡¶∏‡ßç‡¶ü‡¶≤
print("‚öôÔ∏è Installing Dependencies...")
!pip install python-chess zstandard

import chess
import zstandard as zstd

# 3. Shared workspace locations (on the mounted Drive); later cells read
#    DATA_FACTORY_DIR to locate backups and the output database.
PROJECT_ROOT = '/content/drive/MyDrive/GambitFlow_Project'
DATA_FACTORY_DIR = os.path.join(PROJECT_ROOT, 'Synapse_Data')

# 2. Mount Google Drive so the workspace paths above become reachable.
print("\nüîó Connecting to Google Drive...")
drive.mount('/content/drive')

# Create the data directory on first run; a no-op when it already exists.
os.makedirs(DATA_FACTORY_DIR, exist_ok=True)
print(f"‚úÖ Workspace: {DATA_FACTORY_DIR}")

# 4. Keep-alive system
def keep_colab_awake():
    """Loop forever, sleeping 60 seconds per iteration; never returns.

    Intended to run on a background daemon thread.

    NOTE(review): the loop only sleeps and produces no output or activity --
    confirm that this actually prevents the runtime from being disconnected.
    """
    nap_seconds = 60
    while True:
        time.sleep(nap_seconds)

# Daemon thread: it does not block interpreter shutdown.
threading.Thread(daemon=True, target=keep_colab_awake).start()
print("‚úÖ Keep-Alive Active. Tactical Forge Ready.")

In [None]:
# Cell 2: Download Lichess Puzzle Database
# ==============================================================================
# 📥 ACQUIRE TACTICAL PUZZLE DATA
# ==============================================================================

import requests
import shutil
import os

# Configuration
PUZZLE_URL = "https://database.lichess.org/lichess_db_puzzle.csv.zst"
FILENAME = "lichess_db_puzzle.csv.zst"

# Paths
LOCAL_DIR = "/content/data"
LOCAL_FILE_PATH = os.path.join(LOCAL_DIR, FILENAME)
DRIVE_BACKUP_PATH = os.path.join(DATA_FACTORY_DIR, FILENAME)

os.makedirs(LOCAL_DIR, exist_ok=True)

print(f"üéØ Target: {FILENAME}")
print(f"üìÇ Local: {LOCAL_FILE_PATH}")

# Acquisition order: existing local file > Drive backup > fresh download.
if os.path.exists(LOCAL_FILE_PATH):
    print("‚úÖ File exists locally!")

elif os.path.exists(DRIVE_BACKUP_PATH):
    print("üì¶ Copying from Drive to Local SSD...")
    shutil.copy(DRIVE_BACKUP_PATH, LOCAL_FILE_PATH)
    print("‚úÖ Copy Complete!")

else:
    print(f"‚¨áÔ∏è Downloading Puzzle Database...")
    # Stream into a temporary ".part" file and rename only on success, so an
    # interrupted download can never be mistaken for a complete local file
    # by the `os.path.exists(LOCAL_FILE_PATH)` check on the next run.
    partial_path = LOCAL_FILE_PATH + ".part"
    progress_step = 50 * 1024 * 1024
    try:
        # (connect timeout, read timeout) in seconds: fail instead of hanging
        # forever on a stalled connection.
        with requests.get(PUZZLE_URL, stream=True, timeout=(10, 60)) as r:
            r.raise_for_status()
            with open(partial_path, 'wb') as f:
                total_dl = 0
                next_report = progress_step
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
                    total_dl += len(chunk)
                    # Threshold-based reporting: does not rely on every chunk
                    # being exactly 8192 bytes long.
                    if total_dl >= next_report:
                        print(f"   Downloaded: {total_dl / (1024*1024):.0f} MB...")
                        next_report += progress_step
        os.replace(partial_path, LOCAL_FILE_PATH)

    except Exception as e:
        print(f"‚ùå Download Failed: {e}")
        # Drop the truncated file, then re-raise: nothing downstream can run
        # without the database, so stop here with the real error.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise

    print("‚úÖ Download Complete!")

    # The Drive backup is best-effort; a failure here must not be reported as
    # a failed download because the local file is already complete.
    try:
        shutil.copy(LOCAL_FILE_PATH, DRIVE_BACKUP_PATH)
    except OSError as e:
        print(f"Drive backup failed (local file is intact): {e}")

print(f"‚öñÔ∏è File Size: {os.path.getsize(LOCAL_FILE_PATH) / (1024*1024):.2f} MB")

üéØ Target: lichess_db_puzzle.csv.zst
üìÇ Local: /content/data/lichess_db_puzzle.csv.zst
‚¨áÔ∏è Downloading Puzzle Database...
   Downloaded: 50 MB...
   Downloaded: 100 MB...
   Downloaded: 150 MB...
   Downloaded: 200 MB...
   Downloaded: 250 MB...
‚úÖ Download Complete!
‚öñÔ∏è File Size: 270.37 MB


In [None]:
# Cell 3: Tactical Pattern Extraction & Labeling
# ==============================================================================
# ⚔️ EXTRACT HIGH-QUALITY PUZZLES WITH PATTERN DETECTION
# ==============================================================================

import sqlite3
import csv
import io
import zstandard as zstd
import chess
import os
import shutil

# Configuration: output database name/location and the puzzle quality filter.
LOCAL_DB_NAME = "tactical_puzzles_v2.db"
DRIVE_DB_PATH = os.path.join(DATA_FACTORY_DIR, LOCAL_DB_NAME)
MIN_RATING = 1800
MIN_POPULARITY = 85

print(f"‚è≥ Processing Puzzles (Rating ‚â• {MIN_RATING}, Popularity ‚â• {MIN_POPULARITY})...")

# SQLite setup: open (or create) the local database and make sure the
# `puzzles` table exists. `conn` and `cursor` are used by the extraction loop.
conn = sqlite3.connect(LOCAL_DB_NAME)
cursor = conn.cursor()
cursor.execute('''
    CREATE TABLE IF NOT EXISTS puzzles (
        puzzle_id TEXT PRIMARY KEY,
        fen TEXT,
        moves TEXT,
        rating INTEGER,
        themes TEXT,
        tactical_pattern TEXT
    )
''')

# Bulk-load tuning pragmas, applied in the same order as before.
for _pragma in ('PRAGMA synchronous = OFF', 'PRAGMA journal_mode = MEMORY'):
    cursor.execute(_pragma)

# Tactical Pattern Detection Functions
def detect_fork(board, move):
    """Report whether `move` leaves the moved piece attacking two or more
    enemy heavy targets (queen, rook or king).

    `board` is not modified; the move is played on a copy.

    Returns:
        bool: True when at least two such targets are attacked from the
        destination square, False otherwise (including when the destination
        square is empty after the move).
    """
    position = board.copy()
    position.push(move)

    landing_sq = move.to_square
    forker = position.piece_at(landing_sq)
    if not forker:
        return False

    heavy_targets = [chess.QUEEN, chess.ROOK, chess.KING]
    attacked_pieces = [position.piece_at(sq) for sq in position.attacks(landing_sq)]
    hits = sum(
        1
        for victim in attacked_pieces
        if victim
        and victim.color != forker.color
        and victim.piece_type in heavy_targets
    )
    return hits >= 2

def detect_pin(board, move):
    """Detect whether `move` makes the moved bishop/rook/queen pin an enemy piece.

    A pin is reported when the moved slider attacks an enemy piece (other than
    the king) and removing that piece would extend the slider's attack *along
    the same line* onto an enemy king or queen.

    `board` is not modified; the move is played on a copy.

    Returns:
        bool: True if such a pin exists after the move, False otherwise.
    """
    board_copy = board.copy()
    board_copy.push(move)

    moving_sq = move.to_square
    piece = board_copy.piece_at(moving_sq)

    if not piece or piece.piece_type not in [chess.BISHOP, chess.ROOK, chess.QUEEN]:
        return False

    # Squares the slider attacks right now. Anything attacked only after a
    # blocker is lifted must lie behind that blocker on the same ray.
    direct_attacks = board_copy.attacks(moving_sq)

    for sq in direct_attacks:
        pinned_piece = board_copy.piece_at(sq)
        if not pinned_piece or pinned_piece.color == piece.color:
            continue
        # A king in front of another piece is a check/skewer, not a pin.
        if pinned_piece.piece_type == chess.KING:
            continue

        board_temp = board_copy.copy()
        board_temp.remove_piece_at(sq)

        for beyond_sq in board_temp.attacks(moving_sq):
            # Skip targets that were already attacked directly (possibly in a
            # different direction); they are not shielded by `pinned_piece`.
            if beyond_sq in direct_attacks:
                continue
            target = board_temp.piece_at(beyond_sq)
            if target and target.color == pinned_piece.color:
                if target.piece_type in [chess.KING, chess.QUEEN]:
                    return True

    return False

def detect_skewer(board, move):
    """Detect whether `move` skewers the enemy king against a piece behind it.

    A skewer is reported when, after the move:
      * the side to move is in check,
      * the moved piece is a bishop, rook or queen that itself attacks the king,
      * and lifting the king off the board extends that piece's attack onto an
        enemy non-pawn piece standing behind the king on the same line.

    `board` is not modified; the move is played on a copy.

    Returns:
        bool: True if such a skewer exists after the move, False otherwise.
    """
    board_copy = board.copy()
    board_copy.push(move)

    if not board_copy.is_check():
        return False

    moving_sq = move.to_square
    piece = board_copy.piece_at(moving_sq)
    if not piece or piece.piece_type not in [chess.BISHOP, chess.ROOK, chess.QUEEN]:
        return False

    # After push() it is the checked side's turn, so this is the checked king.
    king_sq = board_copy.king(board_copy.turn)
    direct_attacks = board_copy.attacks(moving_sq)
    if king_sq is None or king_sq not in direct_attacks:
        # The check comes from another piece (discovered check), not a skewer
        # by the moved piece.
        return False

    # X-ray through the king: squares attacked only once the king is gone are
    # exactly those behind it on the checking line.
    board_copy.remove_piece_at(king_sq)
    for beyond_sq in board_copy.attacks(moving_sq):
        if beyond_sq in direct_attacks:
            continue
        target = board_copy.piece_at(beyond_sq)
        if target and target.color != piece.color and target.piece_type != chess.PAWN:
            return True

    return False

def _detect_discovered_attack(board, move):
    """Return True if `move` uncovers an attack by another friendly slider.

    Compares, for every bishop/rook/queen of the moving side other than the
    moved piece, the squares attacked before and after the move; a newly
    attacked enemy king, queen or rook counts as a discovered attack.
    """
    mover = board.turn
    after = board.copy()
    after.push(move)

    for piece_type in (chess.BISHOP, chess.ROOK, chess.QUEEN):
        for sq in board.pieces(piece_type, mover):
            if sq == move.from_square:
                continue  # the moved piece itself cannot "discover" an attack
            attacked_before = board.attacks(sq)
            for target_sq in after.attacks(sq):
                if target_sq in attacked_before:
                    continue
                target = after.piece_at(target_sq)
                if (target and target.color != mover
                        and target.piece_type in (chess.KING, chess.QUEEN, chess.ROOK)):
                    return True
    return False


def analyze_tactical_pattern(fen, solution_moves, skip_setup_move=True):
    """Identify tactical patterns in the key move of a puzzle.

    Args:
        fen: FEN string of the starting position.
        solution_moves: space-separated UCI moves.
        skip_setup_move: when True (default) and at least two moves are given,
            the first move is treated as the opponent's setup move and played
            on the board; the second move is the one analysed. This matches
            the Lichess puzzle database, where the FEN is the position before
            the opponent's move and the solution starts at the second move.
            Pass False to analyse the first move directly.

    Returns:
        str: comma-separated pattern names drawn from 'fork', 'pin', 'skewer',
        'discovered_attack'; 'other' when none match; 'unknown' when the FEN
        or moves cannot be parsed or are illegal.
    """
    try:
        board = chess.Board(fen)
        uci_moves = solution_moves.split()

        if skip_setup_move and len(uci_moves) >= 2:
            board.push_uci(uci_moves[0])
            key_uci = uci_moves[1]
        else:
            key_uci = uci_moves[0]

        # parse_uci validates legality in the current position.
        key_move = board.parse_uci(key_uci)

        patterns = []

        if detect_fork(board, key_move):
            patterns.append('fork')

        if detect_pin(board, key_move):
            patterns.append('pin')

        if detect_skewer(board, key_move):
            patterns.append('skewer')

        if _detect_discovered_attack(board, key_move):
            patterns.append('discovered_attack')

        return ','.join(patterns) if patterns else 'other'

    except Exception:
        # Best-effort labelling: malformed rows are tagged rather than aborting
        # the run. `Exception` (not a bare except) keeps KeyboardInterrupt and
        # SystemExit working.
        return 'unknown'

# Streaming Decompression & Processing
# Reads the zstd-compressed CSV row by row, keeps puzzles passing the
# rating/popularity filter, labels them and writes them to SQLite in batches.
_INSERT_PUZZLE_SQL = 'INSERT OR IGNORE INTO puzzles VALUES (?, ?, ?, ?, ?, ?)'
_BATCH_SIZE = 5000


def _flush_batch(rows):
    """Insert `rows` into the puzzles table and commit."""
    cursor.executemany(_INSERT_PUZZLE_SQL, rows)
    conn.commit()


dctx = zstd.ZstdDecompressor()
puzzles_found = 0
rows_skipped = 0

try:
    with open(LOCAL_FILE_PATH, 'rb') as ifh:
        with dctx.stream_reader(ifh) as reader:
            text_stream = io.TextIOWrapper(reader, encoding='utf-8')
            csv_reader = csv.reader(text_stream)

            # Skip header (tolerate an empty file instead of raising).
            next(csv_reader, None)

            batch = []
            for row in csv_reader:
                # CSV: PuzzleId, FEN, Moves, Rating, RatingDeviation, Popularity, NbPlays, Themes, GameUrl, OpeningTags
                # Fields are read by position so extra trailing columns do not
                # cause every row to be rejected.
                try:
                    p_id, fen, moves = row[0], row[1], row[2]
                    rating = int(row[3])
                    pop = int(row[5])
                    themes = row[7]
                except (IndexError, ValueError):
                    # Only malformed rows are skipped; database errors and
                    # Ctrl-C are no longer swallowed.
                    rows_skipped += 1
                    continue

                # Filter
                if rating < MIN_RATING or pop < MIN_POPULARITY:
                    continue

                # Detect tactical pattern
                tactical_pattern = analyze_tactical_pattern(fen, moves)

                batch.append((p_id, fen, moves, rating, themes, tactical_pattern))
                puzzles_found += 1

                # Batch insert
                if len(batch) >= _BATCH_SIZE:
                    _flush_batch(batch)
                    batch = []

                    if puzzles_found % 10000 == 0:
                        print(f"   Stored {puzzles_found:,} puzzles...")

            # Insert remaining
            if batch:
                _flush_batch(batch)
finally:
    # Always release the database handle, even if processing failed.
    conn.close()

print(f"‚úÖ Extraction Complete! Total Puzzles: {puzzles_found:,}")
if rows_skipped:
    print(f"   Skipped {rows_skipped:,} malformed rows")

# Save to Drive
print("üöö Saving to Drive...")
shutil.copy(LOCAL_DB_NAME, DRIVE_DB_PATH)
print(f"‚úÖ Database Secured: {DRIVE_DB_PATH}")

‚è≥ Processing Puzzles (Rating ‚â• 1800, Popularity ‚â• 85)...
   Stored 10,000 puzzles...
   Stored 20,000 puzzles...
   Stored 30,000 puzzles...
   Stored 40,000 puzzles...
   Stored 50,000 puzzles...
   Stored 60,000 puzzles...
   Stored 70,000 puzzles...
   Stored 80,000 puzzles...
   Stored 90,000 puzzles...
   Stored 100,000 puzzles...
   Stored 110,000 puzzles...
   Stored 120,000 puzzles...
   Stored 130,000 puzzles...
   Stored 140,000 puzzles...
   Stored 150,000 puzzles...
   Stored 160,000 puzzles...
   Stored 170,000 puzzles...
   Stored 180,000 puzzles...
   Stored 190,000 puzzles...
   Stored 200,000 puzzles...
   Stored 210,000 puzzles...
   Stored 220,000 puzzles...
   Stored 230,000 puzzles...
   Stored 240,000 puzzles...
   Stored 250,000 puzzles...
   Stored 260,000 puzzles...
   Stored 270,000 puzzles...
   Stored 280,000 puzzles...
   Stored 290,000 puzzles...
   Stored 300,000 puzzles...
   Stored 310,000 puzzles...
   Stored 320,000 puzzles...
   Stored 330,000 