In [None]:
# Set Configuration and Imports
import os, sys, shutil, subprocess, json, textwrap, platform, time, hashlib
from pathlib import Path

# Detect Kaggle paths vs local
# Kaggle kernels expose a writable /kaggle/working directory; its presence is
# used as the "running on Kaggle" signal.
IN_KAGGLE = Path('/kaggle/working').exists()
WORK_DIR = Path('/kaggle/working') if IN_KAGGLE else Path.cwd() / 'working'
REPO_NAME = 'sudoku-ai'
REPO_URL = 'https://github.com/NIKHILSAI71/sudoku-ai.git'
# Everything below is derived from WORK_DIR so the notebook has one root.
REPO_PATH = WORK_DIR / REPO_NAME
EXPORT_DIR = REPO_PATH / 'export'
OUTPUTS_DIR = REPO_PATH / 'outputs'
WORK_DIR.mkdir(parents=True, exist_ok=True)
# OUTPUTS_DIR is intentionally NOT created here: it lives inside REPO_PATH, and
# creating it before the repository is cloned would leave a non-empty directory
# at the clone destination. The "Project Setup Tasks" cell creates it after the
# clone.

def run(cmd, cwd=None, env=None, check=True):
    """Echo a command line and execute it with ``subprocess.run``.

    Args:
        cmd: Sequence of argv elements. Elements may be ``str`` or path-like
            objects; they are stringified only for the echoed line.
        cwd: Working directory for the child process (``None`` = inherit).
        env: Environment mapping for the child process (``None`` = inherit).
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The ``subprocess.CompletedProcess`` returned by ``subprocess.run``.
    """
    # str() each part so Path arguments do not break the echo with a TypeError;
    # the original sequence is still what gets executed.
    print('>>', ' '.join(str(part) for part in cmd))
    return subprocess.run(cmd, cwd=cwd, env=env, check=check)

def print_tree(path: Path, depth: int = 2):
    """Print the directory tree under ``path`` down to ``depth`` levels.

    For every visited directory its sub-directories (with a trailing ``/``)
    are printed first, then its files, each indented by two spaces per level
    below ``path``. Entries are printed in sorted order so the output does not
    depend on filesystem enumeration order.

    Args:
        path: Root directory to list.
        depth: Deepest directory level (relative to ``path``) whose contents
            are printed; ``0`` prints only the direct children of ``path``.
    """
    path = Path(path)
    for root, dirs, files in os.walk(path):
        level = Path(root).relative_to(path).parts
        if len(level) > depth:
            # Only reachable for a negative depth; stop descending as well.
            dirs[:] = []
            continue
        # Sorting in place also fixes the order in which os.walk descends.
        dirs.sort()
        indent = '  ' * len(level)
        for d in dirs:
            print(f"{indent}{d}/")
        for f in sorted(files):
            print(f"{indent}{f}")
        if len(level) == depth:
            # Prune: nothing below this level is printed, so do not walk it.
            dirs[:] = []

# Echo the resolved environment so the run log shows where files will land.
for label, value in (
    ('IN_KAGGLE:', IN_KAGGLE),
    ('WORK_DIR:', WORK_DIR),
    ('REPO_PATH:', REPO_PATH),
):
    print(label, value)

In [None]:
# Clean and Clone Repository from GitHub
import os
import shutil
import subprocess

# --- Configuration ---
# Plain-string paths are kept (with a trailing separator on the working
# directory) because later cells build on `repo_path` as a str.
working_directory = str(WORK_DIR) + '/' if not str(WORK_DIR).endswith('/') else str(WORK_DIR)
repo_name = REPO_NAME
repo_path = os.path.join(working_directory, repo_name)
repo_url = REPO_URL

print('--- Starting Setup ---')
if not os.path.exists(working_directory):
    os.makedirs(working_directory)
    print(f'Created working directory: {working_directory}')

if os.path.exists(repo_path):
    print(f'Found existing directory at {repo_path}. Removing it.')
    try:
        shutil.rmtree(repo_path)
        print('Successfully removed the old directory.')
    except OSError as e:
        # A half-removed checkout would make the clone fail and could still
        # pass an existence check, so stop here instead of continuing.
        print(f'Error removing directory {repo_path}: {e}')
        raise
else:
    print(f'No existing directory found at {repo_path}. Ready to clone.')

# The kernel's working directory is switched on purpose: later cells write and
# read relative paths (e.g. data.jsonl) from here.
os.chdir(working_directory)
print(f'Changed current directory to: {os.getcwd()}')

print(f'Cloning repository from {repo_url}...')
try:
    # Explicit destination so the result does not depend on the current cwd.
    subprocess.run(['git', 'clone', repo_url, repo_path], check=True)
    print('Repository cloned successfully!')
except (subprocess.CalledProcessError, FileNotFoundError) as e:
    # FileNotFoundError covers a missing `git` executable.
    print(f'Failed to clone repository: {e}')
    raise

print('\n--- Verification ---')
# Check for the .git directory rather than just the folder, so that a stale or
# partial directory is not reported as a successful clone.
if os.path.isdir(os.path.join(repo_path, '.git')):
    print(f"Success! The directory '{repo_name}' is now in {working_directory}.")
    print('Contents:')
    for item in os.listdir(repo_path):
        print(f'- {item}')
else:
    print(f"Verification failed. The directory '{repo_name}' was not created.")
    raise RuntimeError(f'No git checkout found at {repo_path}')
print('\n--- Setup Complete ---')

In [None]:
# Verify Clone and List Files
from pathlib import Path

root = Path(repo_path)
assert root.exists(), f'Clone missing at {root}'

# Top-level listing of the checkout, in sorted path order.
print('Top-level:')
for entry in sorted(root.iterdir()):
    print('-', entry.name)

# Report which of the files/packages used by later cells are present.
expected_entries = ('pyproject.toml', 'README.md', 'cli', 'sudoku_ai', 'sudoku_engine', 'tests')
print('\nKey files present:')
for name in expected_entries:
    is_present = (root / name).exists()
    print(name, '->', is_present)

In [None]:
# Initialize and Activate Virtual Environment
import sys

VENV_PATH = Path(repo_path) / '.venv'

# pip is always driven as "<python> -m pip", so only the interpreter path has
# to be resolved here; PIP is kept as an alias of PY.
if IN_KAGGLE:
    # Kaggle: reuse the interpreter that runs this kernel.
    PY = sys.executable
else:
    # Local run: create a venv inside the checkout and use its interpreter.
    run([sys.executable, '-m', 'venv', str(VENV_PATH)])
    venv_python = 'Scripts/python.exe' if platform.system() == 'Windows' else 'bin/python'
    PY = str(VENV_PATH / venv_python)
PIP = PY

print('Using python:', PY)
print('pip will be invoked as: python -m pip')

In [None]:
# Install Project Dependencies
req = Path(repo_path) / 'requirements.txt'
pyproject = Path(repo_path) / 'pyproject.toml'
pip_install = [PY, '-m', 'pip', 'install']

if not req.exists():
    # No requirements file: refresh the build tooling, then install the
    # project itself in editable mode.
    run(pip_install + ['-U', 'pip', 'setuptools', 'wheel'])
    run(pip_install + ['-e', str(REPO_PATH)], cwd=str(REPO_PATH))
else:
    run(pip_install + ['-r', str(req)])

# Optional extras: sat and cpsat backends. A failed install is reported and
# skipped rather than aborting the cell.
optional_specs = (
    ('python-sat[pblib,aiger]>=0.1.8', 'python-sat optional install failed:'),
    ('ortools>=9.10', 'ortools optional install failed:'),
)
for spec, failure_message in optional_specs:
    try:
        run(pip_install + [spec])
    except Exception as e:
        print(failure_message, e)

In [None]:
# Project Setup Tasks
# Make sure every directory later cells write into exists.
for folder in (REPO_PATH / 'checkpoints', REPO_PATH / 'logs', EXPORT_DIR, OUTPUTS_DIR):
    folder.mkdir(parents=True, exist_ok=True)

# Copy sample puzzle to outputs for quick test
sample_puzzle = REPO_PATH / 'examples' / 'easy1.sdk'
shutil.copy(sample_puzzle, OUTPUTS_DIR / 'easy1.sdk')

In [None]:
# Discover Training and Testing Entrypoints
# Two ways into training: the CLI (ai-solve --train) or the Python API
# (sudoku_ai.policy.train_supervised).
for label, usage in (
    ('CLI entry:', 'python -m cli.main ai-solve --help'),
    ('Module entry:', 'from sudoku_ai.policy import train_supervised'),
):
    print(label, usage)

## Prepare dataset from Kaggle CSV

This step converts the Kaggle `sudoku.csv` into three files in the working directory:
- `data.jsonl` (with keys `puzzle` and `solution`)
- `puzzles.txt`
- `solutions.txt`

Make sure the dataset is attached in Kaggle under the Data tab and the CSV path is correct.

In [None]:
import pandas as pd
import json
from pathlib import Path

# Define the input and output file paths
# IMPORTANT: Double-check this path in your Kaggle notebook's right-hand sidebar under "Data".
# You can use the "Copy file path" button to ensure it's correct.
input_file = '/kaggle/input/sudoku/sudoku.csv'
jsonl_output_file = 'data.jsonl'
puzzles_output_file = 'puzzles.txt'
solutions_output_file = 'solutions.txt'

# Use a chunk size to process the large file efficiently
chunk_size = 100_000


def convert_sudoku_csv(csv_path, jsonl_path, puzzles_path, solutions_path, chunksize=100_000):
    """Convert a CSV with ``quizzes``/``solutions`` columns into three files.

    Writes one JSON object per line (keys ``puzzle`` and ``solution``) to
    ``jsonl_path``, and one value per line to ``puzzles_path`` and
    ``solutions_path``. The CSV is streamed in chunks to bound memory use.

    Args:
        csv_path: Input CSV file.
        jsonl_path: Output JSON Lines file (created/overwritten).
        puzzles_path: Output text file with one puzzle per line.
        solutions_path: Output text file with one solution per line.
        chunksize: Number of CSV rows processed per chunk.

    Returns:
        The number of non-empty chunks written.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        KeyError: If the CSV lacks the ``quizzes`` or ``solutions`` column.
    """
    # dtype=str keeps the digit strings verbatim; numeric inference would drop
    # leading zeros (0 marks an empty cell) on values that fit an integer.
    # The reader is created before the outputs are opened so a missing input
    # does not truncate existing output files.
    reader = pd.read_csv(csv_path, chunksize=chunksize, dtype=str)
    chunks_written = 0

    with open(jsonl_path, 'w') as jsonl_file, \
         open(puzzles_path, 'w') as puzzles_file, \
         open(solutions_path, 'w') as solutions_file:

        for chunk_df in reader:
            if chunk_df.empty:
                continue

            # --- 1. JSONL: rename to the target keys and serialise the whole
            # chunk at once (one JSON object per line).
            json_chunk = chunk_df.rename(columns={'quizzes': 'puzzle', 'solutions': 'solution'})
            jsonl_string = json_chunk[['puzzle', 'solution']].to_json(orient='records', lines=True)

            # Depending on the pandas version the string may or may not end
            # with a newline already; normalise to exactly one so no blank
            # lines appear between chunks.
            jsonl_file.write(jsonl_string.rstrip('\n') + '\n')

            # --- 2. Plain text: one value per line, newline-terminated so
            # consecutive chunks stay separated.
            puzzles_file.write('\n'.join(chunk_df['quizzes'].astype(str)) + '\n')
            solutions_file.write('\n'.join(chunk_df['solutions'].astype(str)) + '\n')

            chunks_written += 1
            print(f"Processed chunk {chunks_written}...")

    return chunks_written


print("Starting data processing...")

try:
    convert_sudoku_csv(
        input_file,
        jsonl_output_file,
        puzzles_output_file,
        solutions_output_file,
        chunksize=chunk_size,
    )
    print(f"\nSuccessfully created {jsonl_output_file}, {puzzles_output_file}, and {solutions_output_file}")

except FileNotFoundError:
    print(f"Error: The file was not found at '{input_file}'.")
    print("Please verify the dataset path in your Kaggle notebook's data section.")
    print("Click the 'Copy file path' button next to the CSV file to get the correct path.")
except KeyError:
    # This block runs if the columns 'quizzes' or 'solutions' don't exist
    try:
        # Try to read the first few lines to see what the actual column names are
        sample_df = pd.read_csv(input_file, nrows=5)
        print("Error: The CSV file does not contain 'quizzes' or 'solutions' columns.")
        print(f"The available columns are: {list(sample_df.columns)}")
    except Exception as e:
        print(f"An unexpected error occurred while diagnosing the KeyError: {e}")

In [None]:
# High-parameter training via CLI
import sys
from pathlib import Path

# Ensure the dataset exists. The path is resolved against the kernel's current
# directory (where the dataset prep cell wrote it) and made absolute, because
# the training subprocess below runs from the repository root.
dataset_path = Path('data.jsonl').resolve()
assert dataset_path.exists(), 'data.jsonl not found. Run the dataset prep cell first.'

# Use the Python interpreter resolved earlier (PY) if available; otherwise fallback to sys.executable
try:
    PY
except NameError:
    PY = sys.executable

# Train with high parameters
from subprocess import CalledProcessError
print('Starting high-parameter training...')
args = [
    PY, '-m', 'cli.main', 'ai-solve', '--train',
    '--dataset', str(dataset_path),
    # Relative to cwd=REPO_PATH, i.e. REPO_PATH/checkpoints/policy.pt, which is
    # where the packaging cell looks for the checkpoint.
    '--ckpt', 'checkpoints/policy.pt',
    '--train-epochs', '500',
    '--train-limit', '20000'
]
print('>>', ' '.join(args))
try:
    import subprocess
    # Run from the repository root so `-m cli.main` resolves against the
    # checkout, matching how the evaluation cell invokes the same CLI.
    subprocess.run(args, cwd=str(REPO_PATH), check=True)
except CalledProcessError as e:
    print('Training failed with exit code', e.returncode)

In [None]:
# Run Training Job (production-style)
# The training script runs in a child interpreter with cwd=REPO_PATH. The
# dataset location is passed as argv[1] because data.jsonl is written relative
# to the notebook kernel's working directory, not the repository root.
code = r'''
import json, os, sys, time
from pathlib import Path
from sudoku_ai.policy import train_supervised

ckpt = Path('checkpoints/policy.pt')
ckpt.parent.mkdir(parents=True, exist_ok=True)

# Prefer a prepared dataset if available, otherwise fall back to a tiny puzzles file.
# With `python -c CODE ARG`, sys.argv is ['-c', ARG].
dataset_arg = sys.argv[1] if len(sys.argv) > 1 and sys.argv[1] else 'data.jsonl'
dataset_jsonl = Path(dataset_arg) if Path(dataset_arg).exists() else None
puzzles_path = None
solutions_path = None
if dataset_jsonl is None:
    # Create a tiny puzzles file from example to bootstrap training
    puzzles_path = Path('outputs/_puzzles.txt')
    puzzles_path.parent.mkdir(parents=True, exist_ok=True)
    puzzle = Path('examples/easy1.sdk').read_text().strip()
    puzzles_path.write_text(puzzle + '\n')
    print('Training from example puzzle at:', puzzles_path)
else:
    print('Training from dataset:', dataset_jsonl)

hist = train_supervised(
    out_path=str(ckpt),
    dataset_jsonl=str(dataset_jsonl) if dataset_jsonl else None,
    puzzles_path=str(puzzles_path) if puzzles_path else None,
    solutions_path=str(solutions_path) if solutions_path else None,
    epochs=500,
    batch_size=64,
    lr=3e-4,
    val_split=0.1,
    max_samples=20000,
    augment=True,
    amp=False,
    seed=42,
    overfit=False,
)
print('Saved checkpoint ->', ckpt)
print('Best epoch:', hist.get('best_epoch'))
'''
# Look for the prepared dataset where the prep cell may have written it: the
# kernel's current directory first, then WORK_DIR, then the repository root.
dataset_candidates = [
    Path('data.jsonl').resolve(),
    WORK_DIR / 'data.jsonl',
    REPO_PATH / 'data.jsonl',
]
# If none exists, a non-existent path is passed and the child falls back to
# the bundled example puzzle.
dataset_file = next((c for c in dataset_candidates if c.exists()), dataset_candidates[0])
run([PY, '-c', code, str(dataset_file)], cwd=str(REPO_PATH))

In [None]:
# Run Evaluation/Tests
import json, time

export_dir = REPO_PATH / 'export'
metrics_path = export_dir / 'metrics.json'
export_dir.mkdir(parents=True, exist_ok=True)

# Each entry: (command, message printed if launching it raises).
# 1) a small pytest subset, quiet mode; 2) a CLI sanity run against the
# checkpoint produced by training. Non-zero exit codes are tolerated.
smoke_checks = (
    (
        [PY, '-m', 'pytest', '-q', 'tests/test_engine.py', 'tests/test_solvers.py'],
        'pytest encountered issues:',
    ),
    (
        [PY, '-m', 'cli.main', 'ai-solve', '-i', 'examples/easy1.sdk', '--ckpt', 'checkpoints/policy.pt', '--max-steps', '50'],
        'cli ai-solve failed:',
    ),
)

start = time.time()
for command, failure_message in smoke_checks:
    try:
        run(command, cwd=str(REPO_PATH), check=False)
    except Exception as e:
        print(failure_message, e)
dur = time.time() - start

# Persist the wall-clock duration next to the exported artifacts.
metrics = {'duration_sec': dur, 'notes': 'Light tests on Kaggle'}
metrics_path.write_text(json.dumps(metrics, indent=2), encoding='utf-8')
print('Wrote metrics ->', metrics_path)

In [None]:
# Package Trained Artifacts
from datetime import datetime, timezone
EXPORT_DIR.mkdir(parents=True, exist_ok=True)
ckpt_src = REPO_PATH / 'checkpoints' / 'policy.pt'
ckpt_dst = EXPORT_DIR / 'policy.pt'
# Fail early: nothing below is useful without a trained checkpoint.
if not ckpt_src.exists():
    raise FileNotFoundError('Checkpoint not found at ' + str(ckpt_src))
shutil.copy(ckpt_src, ckpt_dst)
# Minimal config
config = {
    'model': 'ResNetPolicy',
    # Same "naive ISO timestamp + Z" format as before, built from an aware
    # datetime because datetime.utcnow() is deprecated.
    'created': datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z',
    'repo': REPO_URL,
}
(EXPORT_DIR / 'config.json').write_text(json.dumps(config, indent=2), encoding='utf-8')
# Model card. The YAML front matter starts on the very first line of the file,
# and the body no longer carries a stray "," line.
model_card = f'''---
license: mit
library_name: pytorch
tags: [sudoku, policy, deep-learning, gradio]
---

# Sudoku AI Policy
This repository hosts a lightweight Sudoku policy network and a Gradio demo.
Artifacts were prepared from {REPO_URL}.
'''
(EXPORT_DIR / 'README.md').write_text(model_card, encoding='utf-8')
print('Export contents:')
for p in sorted(EXPORT_DIR.iterdir()):
    print('-', p.name, p.stat().st_size, 'bytes')

In [None]:
# Authenticate to Hugging Face Hub
import importlib
import os
import sys
try:
    from huggingface_hub import notebook_login, whoami
except ImportError:
    # The import happens in this kernel, so install into the kernel's own
    # interpreter; PY may point at a separate virtual environment.
    run([sys.executable, '-m', 'pip', 'install', '-U', 'huggingface_hub'])
    importlib.invalidate_caches()  # make the freshly installed package visible
    from huggingface_hub import notebook_login, whoami
# Accept either variable name; normalise to HF_TOKEN for the rest of the run.
HF_TOKEN = os.environ.get('HUGGINGFACEHUB_API_TOKEN') or os.environ.get('HF_TOKEN')
if HF_TOKEN:
    os.environ['HF_TOKEN'] = HF_TOKEN
    print('Using token from environment')
else:
    print('Please login interactively:')
    notebook_login()
# Best-effort identity check; a failure is reported but does not stop the cell.
try:
    print('whoami:', whoami())
except Exception as e:
    print('whoami failed:', e)

## Authenticate to Hugging Face Hub

Set the `HF_TOKEN` (or `HUGGINGFACEHUB_API_TOKEN`) environment variable, or log in interactively when prompted.

In [None]:
# Create/Use Hub Repository and Push Model Artifacts
import importlib
import sys
try:
    from huggingface_hub import HfApi, create_repo, upload_folder
except ImportError:
    # Install into the interpreter running this kernel (PY may be a separate
    # virtual environment), then refresh the import caches.
    run([sys.executable, '-m', 'pip', 'install', '-U', 'huggingface_hub'])
    importlib.invalidate_caches()
    from huggingface_hub import HfApi, create_repo, upload_folder
api = HfApi()
hub_model_id = 'nikhilsaipagidimarri/sudoku-ai'
# exist_ok=True makes re-running the cell safe when the repo already exists.
create_repo(hub_model_id, exist_ok=True, repo_type='model')
print('Pushing model artifacts...')
# Uploads everything currently in EXPORT_DIR as a single commit.
upload_folder(
    repo_id=hub_model_id,
    folder_path=str(EXPORT_DIR),
    repo_type='model',
    commit_message='Add trained checkpoint, config, and metrics'
)
print('Model pushed to https://huggingface.co/' + hub_model_id)

In [None]:
# Create a Gradio Space and push interface
import importlib
import sys
try:
    from huggingface_hub import create_repo, upload_file
except ImportError:
    # Install into the interpreter running this kernel (PY may be a separate
    # virtual environment), then refresh the import caches.
    run([sys.executable, '-m', 'pip', 'install', '-U', 'huggingface_hub'])
    importlib.invalidate_caches()
    from huggingface_hub import create_repo, upload_file
space_id = 'nikhilsaipagidimarri/sudoku-ai-demo'
create_repo(space_id, repo_type='space', exist_ok=True, space_sdk='gradio')
# Local staging directory mirroring the Space repository contents.
SPACE_DIR = WORK_DIR / 'space_work'
SPACE_DIR.mkdir(parents=True, exist_ok=True)
# requirements for space
(SPACE_DIR / 'requirements.txt').write_text('\n'.join(['torch','numpy','rich','gradio']), encoding='utf-8')
# Source of the Space's app.py. This is a plain (non-f) string: it has no
# placeholders, and an f-prefix would break as soon as the template gained a
# literal brace. The app imports sudoku_ai / sudoku_engine, so those packages
# must also be present in the Space for it to start.
app_py = '''
import gradio as gr
import torch
from pathlib import Path
import json
import os
from sudoku_ai.policy import load_policy, board_to_tensor
from sudoku_engine import parse_line, board_to_line
from sudoku_engine.board import Board

CKPT = 'policy.pt'
def load():
    return load_policy(CKPT)

policy = load()

def solve_sudoku(puzzle_line: str):
    line = ''.join(ch for ch in puzzle_line if ch.isdigit())
    if len(line) != 81:
        return 'Input must be 81 digits.'
    b = Board(parse_line(line))
    # Simple greedy decode with policy probabilities and legality mask
    for _ in range(200):
        if b.is_complete():
            break
        x = board_to_tensor(board_to_line(b.grid)).unsqueeze(0)
        with torch.no_grad():
            logits = policy(x)[0]
            probs = torch.softmax(logits, dim=-1)
        masks = b.candidates_mask()
        mask_tensor = torch.zeros(81, 9)
        for idx in range(81):
            r, c = divmod(idx, 9)
            m = int(masks[r, c])
            if b.grid[r, c] != 0 or m == 0:
                continue
            for d in range(1, 10):
                if m & (1 << (d - 1)):
                    mask_tensor[idx, d - 1] = 1.0
        masked = probs * mask_tensor
        flat = masked.view(-1)
        if float(flat.sum().item()) <= 0.0:
            break
        choice = int(torch.argmax(flat).item())
        cell, dig_idx = divmod(choice, 9)
        r, c = divmod(cell, 9)
        d = dig_idx + 1
        b.set_cell(r, c, d)
    return board_to_line(b.grid)

demo = gr.Interface(
    fn=solve_sudoku,
    inputs=gr.Textbox(label='Enter Sudoku (81 digits, 0=empty)', lines=2),
    outputs=gr.Textbox(label='Solved/Final Board'),
    title='Sudoku AI (Policy)',
    description='Paste an 81-digit Sudoku string. Uses policy checkpoint and legality mask.'
)

if __name__ == '__main__':
    demo.launch()
'''
(SPACE_DIR / 'app.py').write_text(app_py, encoding='utf-8')
# copy checkpoint and minimal packages of project files used by interface
shutil.copy(EXPORT_DIR / 'policy.pt', SPACE_DIR / 'policy.pt')
# Upload space files
upload_file(path_or_fileobj=str(SPACE_DIR / 'app.py'), path_in_repo='app.py', repo_id=space_id, repo_type='space')
upload_file(path_or_fileobj=str(SPACE_DIR / 'requirements.txt'), path_in_repo='requirements.txt', repo_id=space_id, repo_type='space')
upload_file(path_or_fileobj=str(SPACE_DIR / 'policy.pt'), path_in_repo='policy.pt', repo_id=space_id, repo_type='space')
print('Space pushed: https://huggingface.co/spaces/' + space_id)

In [None]:
# Record Run Metadata
import sys, subprocess, hashlib, time

# Environment snapshot for this run; timestamps are UTC.
run_meta = dict(
    python=sys.version,
    platform=platform.platform(),
    time_utc=time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
    repo_url=REPO_URL,
    work_dir=str(WORK_DIR),
)

# Commit of the cloned checkout; recorded as None if it cannot be determined.
git_head_cmd = ['git', '-C', str(REPO_PATH), 'rev-parse', 'HEAD']
try:
    run_meta['git_commit'] = subprocess.check_output(git_head_cmd).decode().strip()
except Exception:
    run_meta['git_commit'] = None
# hash checkpoint
def sha256(p: Path):
    """Return the hex SHA-256 digest of the file at ``p``.

    The file is read in 8192-byte blocks so it never has to fit in memory.
    """
    digest = hashlib.sha256()
    with open(p, 'rb') as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
# Size and digest of each exported artifact that exists on disk.
artifact_paths = [EXPORT_DIR / filename for filename in ('policy.pt', 'config.json', 'README.md')]
run_meta['artifacts'] = {
    artifact.name: {'bytes': artifact.stat().st_size, 'sha256': sha256(artifact)}
    for artifact in artifact_paths
    if artifact.exists()
}
run_json_path = EXPORT_DIR / 'run.json'
run_json_path.write_text(json.dumps(run_meta, indent=2), encoding='utf-8')
print('Saved run.json ->', run_json_path)

In [None]:
# Install deps (Kaggle-friendly) and add repo to sys.path
import sys

# Base tools and libs we need regardless of package install
base_pkgs = ['pip', 'setuptools', 'wheel', 'numpy', 'huggingface_hub', 'gradio', 'pytest']
run([PY, '-m', 'pip', 'install', '-U', *base_pkgs])

if not IN_KAGGLE:
    # On local py>=3.11, install the package for scripts entry points
    run([PY, '-m', 'pip', 'install', '-e', str(REPO_PATH)])
else:
    # Avoid pip -e . due to Python>=3.11 constraint; use path insert instead
    repo_entry = str(REPO_PATH)
    if repo_entry not in sys.path:
        sys.path.insert(0, repo_entry)
    print('Added to sys.path:', REPO_PATH)

In [None]:
# Vendor minimal project modules into Space so imports work
import glob
import importlib
import sys
# Bytecode caches are machine-specific build residue; keep them out of the Space.
skip_bytecode = shutil.ignore_patterns('__pycache__', '*.pyc')
for pkg in ['sudoku_ai','sudoku_engine']:
    src = REPO_PATH / pkg
    dst = SPACE_DIR / pkg
    # Replace any previous copy so files deleted upstream do not linger.
    if dst.exists():
        shutil.rmtree(dst)
    shutil.copytree(src, dst, ignore=skip_bytecode)
    print('Copied', pkg, '->', dst)
# upload folders to space (recursive)
try:
    from huggingface_hub import upload_folder
except ImportError:
    # Install into the interpreter running this kernel (PY may be a separate
    # virtual environment), then refresh the import caches.
    run([sys.executable, '-m', 'pip', 'install', '-U', 'huggingface_hub'])
    importlib.invalidate_caches()
    from huggingface_hub import upload_folder
upload_folder(repo_id=space_id, repo_type='space', folder_path=str(SPACE_DIR), commit_message='Update app with vendored modules and checkpoint')