In [1]:
!pip install jupyterlab
!pip install ipykernel
!pip install sympy
!pip install openai
!pip install python-dotenv

Collecting jupyterlab
  Downloading jupyterlab-4.4.7-py3-none-any.whl.metadata (16 kB)
Collecting async-lru>=1.0.0 (from jupyterlab)
  Downloading async_lru-2.0.5-py3-none-any.whl.metadata (4.5 kB)
Collecting jupyter-lsp>=2.0.0 (from jupyterlab)
  Downloading jupyter_lsp-2.3.0-py3-none-any.whl.metadata (1.8 kB)
Collecting jupyterlab-server<3,>=2.27.1 (from jupyterlab)
  Downloading jupyterlab_server-2.27.3-py3-none-any.whl.metadata (5.9 kB)
Collecting json5>=0.9.0 (from jupyterlab-server<3,>=2.27.1->jupyterlab)
  Downloading json5-0.12.1-py3-none-any.whl.metadata (36 kB)
Collecting jedi>=0.16 (from ipython>=7.23.1->ipykernel!=6.30.0,>=6.5.0->jupyterlab)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jupyterlab-4.4.7-py3-none-any.whl (12.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m81.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading async_lru-2.0.5-py3-none-any.whl (6.1 kB)
Downloading jupyter_lsp-2.3.0-py3-none-

In [None]:
# Writes a one-line .env file in the current working directory.
# NOTE: the `>` redirect overwrites any existing .env, and `sk-***` is only a
# placeholder -- substitute a real key locally and do not commit it.
!echo 'OPENAI_API_KEY=sk-***' > .env

In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Sequence → Formula (GPT + SymPy) — Two-Phase (No-Hint then Hint)
- Phase A: Try NO_HINT_TRIES attempts with NO hint.
- Phase B: If still unsolved, try HINT_TRIES attempts WITH hint (degree + checker feedback).
- Tracks attempts and whether GPT solved it (no exact interpolation fallback).
- Prints a summary table at the end, including whether a hint was needed.
- Adds a per-model summary with totals, solve rates, and avg attempts.
- Now supports exact rationals, Sum(...), harmonic(n), and common special sequences.

Requirements:
  pip install sympy openai python-dotenv
  echo "OPENAI_API_KEY=sk-***" > .env

Run:
  # Evaluate all demos across listed models
  python run.py

  # Or solve a custom integer sequence (single run using gpt-4o-mini)
  python run.py 1 5 14 30 55 91
"""

import os
import re
import json
import sys
from typing import List, Optional, Dict, Tuple, Any

from dotenv import load_dotenv
from openai import OpenAI

from sympy import (
    symbols, simplify, factor, sympify, Integer, binomial, factorial,
    floor, ceiling, Piecewise, Abs, Sum, harmonic, Rational, nsimplify, S,
    fibonacci, lucas, catalan
)
from sympy.abc import n
from sympy.core.facts import InconsistentAssumptions

# ------------------- Setup -------------------

# Load variables from a local .env file into the process environment, then
# build the shared OpenAI client from OPENAI_API_KEY.
load_dotenv()
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Attempt budgets for the two phases. Each one can be overridden through an
# environment variable of the same name.
NO_HINT_TRIES = int(os.environ.get("NO_HINT_TRIES", 5))  # attempts without a hint
HINT_TRIES = int(os.environ.get("HINT_TRIES", 5))        # attempts with a hint

# Index symbol for Sum(..., (k, 1, n)).
# It is declared integer only: stronger assumptions (e.g. positive=True) can
# clash with the assumptions Sum derives from its bounds and raise
# InconsistentAssumptions.
k = symbols("k", integer=True)

# Names handed to sympify(..., locals=_ALLOWED) when parsing a model's formula.
_ALLOWED = dict(
    # variables
    n=n,
    k=k,
    # elementary building blocks
    binomial=binomial,
    factorial=factorial,
    floor=floor,
    ceiling=ceiling,
    Piecewise=Piecewise,
    Abs=Abs,
    # exact numbers, e.g. Rational(3,2) or S(1)/2
    Integer=Integer,
    Rational=Rational,
    S=S,
    # summation and harmonic numbers
    Sum=Sum,
    harmonic=harmonic,
    # special sequences the model may emit directly
    fibonacci=fibonacci,
    lucas=lucas,
    catalan=catalan,
)

# Globals for anti-repeat tracking.
# Both are reset at the start of every sequence_to_formula() call and updated
# by call_gpt_for_formula().
_last_guess: Optional[str] = None  # most recent formula string returned by the model
_tried_formulas: set = set()  # distinct formula strings already proposed for the current sequence


# ------------------- Helpers -------------------

def _extract_json(text: str) -> dict:
    """Accept raw model output (possibly fenced) and return parsed JSON dict."""
    s = text.strip()
    if s.startswith("```"):
        s = re.sub(r"^```(?:json)?\s*|\s*```$", "", s, flags=re.IGNORECASE | re.DOTALL).strip()
    return json.loads(s)

def _finite_diff_degree(seq: List[Any]) -> Optional[int]:
    """Return polynomial degree if finite differences become constant; else None."""
    if len(seq) < 2:
        return 0
    # numericize using nsimplify where possible
    cur = [nsimplify(x, rational=True) if not isinstance(x, (int, float)) else x for x in seq]
    diffs = [cur[:]]
    while len(diffs[-1]) > 1:
        prev = diffs[-1]
        nxt = []
        for a, b in zip(prev, prev[1:]):
            try:
                nxt.append(nsimplify(b, rational=True) - nsimplify(a, rational=True))
            except Exception:
                nxt.append(b - a)
        diffs.append(nxt)
        try:
            const = all(simplify(diffs[-1][i] - diffs[-1][0]) == 0 for i in range(1, len(diffs[-1])))
        except Exception:
            const = all(float(diffs[-1][i]) == float(diffs[-1][0]) for i in range(1, len(diffs[-1])))
        if const:
            return len(diffs) - 1
    return None

def _mismatch_table(expr, seq: List[Any], max_rows=6) -> str:
    """
    Return a plain text table for the first `max_rows` comparisons between
    expr(n=i) and the target sequence, using exact SymPy values when possible.
    """
    rows = []
    for i, target in enumerate(seq, start=1):
        try:
            got = expr.subs(n, i)
            try:
                got = got.doit()
            except Exception:
                pass
        except Exception:
            got = expr.subs(n, i)
        try:
            target_sym = nsimplify(target, rational=True)
        except Exception:
            target_sym = target
        rows.append((i, target_sym, got))
        if len(rows) >= max_rows:
            break
    hdr = "n | expected | got"
    body_lines = [f"{i} | {e} | {g}" for (i, e, g) in rows]
    return f"{hdr}\n" + "\n".join(body_lines)

def _short(expr, maxlen=60):
    if expr is None:
        return ""
    s = str(expr)
    return s if len(s) <= maxlen else s[: maxlen - 1] + "…"


# ------------------- GPT helper -------------------

def call_gpt_for_formula(seq: List[Any], feedback: str, model: str, include_hint: bool = False) -> dict:
    """
    Request one candidate formula for `seq` from the chat model `model`.

    The system prompt fixes the output contract (JSON with "formula_sympy" and
    "rationale_short"). The user prompt always lists the sequence and the
    formulas already tried; when `include_hint` is true it also carries the
    finite-difference degree hint (if a degree was found) and the checker
    `feedback`. Without the hint flag, `feedback` is not sent.

    Side effects: records the returned formula in the module-level
    `_tried_formulas` / `_last_guess`; if the formula was already recorded, the
    returned "rationale_short" is overwritten with a fixed notice.

    Returns the parsed JSON reply as a dict.
    """
    global _last_guess, _tried_formulas

    degree = _finite_diff_degree(seq)
    if include_hint and degree is not None:
        deg_hint = f"The sequence appears to be a polynomial of degree {degree} in n."
    else:
        deg_hint = ""

    already_tried = sorted(_tried_formulas)

    system_prompt = (
        "You are a mathematical assistant.\n"
        "Given a sequence (1-indexed), return a closed-form formula f(n) in SymPy syntax.\n"
        "Constraints:\n"
        "- Variable is n (1-indexed). Use k as a summation index if needed.\n"
        "- Output JSON ONLY: {\"formula_sympy\": \"...\", \"rationale_short\": \"...\"}.\n"
        "- Keep rationale <= 200 chars.\n"
        "- The expression MUST match the provided terms exactly for n=1..k.\n"
        "- Allowed primitives include: n, binomial, factorial, floor, ceiling, Piecewise, Abs, Integer,\n"
        "  Rational, S, Sum( … , (k, 1, n)), harmonic(n), fibonacci(n), lucas(n), catalan(n).\n"
        "- Do NOT repeat a previous incorrect formula."
    )

    if not include_hint:
        user_prompt = (
            f"Sequence (first terms, 1-indexed): {seq}\n\n"
            f"Previously tried formulas (do not repeat exactly): {already_tried}\n\n"
            "Return JSON ONLY."
        )
    else:
        user_prompt = (
            f"Sequence (first terms, 1-indexed): {seq}\n"
            f"{deg_hint}\n\n"
            f"Previously tried formulas (do not repeat exactly): {already_tried}\n\n"
            f"HINT from checker:\n{feedback}\n\n"
            "Return JSON ONLY."
        )

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    completion = client.chat.completions.create(
        model=model,
        messages=conversation,
        temperature=1.0,
        response_format={"type": "json_object"},
    )
    reply_text = completion.choices[0].message.content or ""
    payload = _extract_json(reply_text)

    proposal = (payload.get("formula_sympy") or "").strip()
    _last_guess = proposal
    if proposal and proposal in _tried_formulas:
        # Same formula as an earlier attempt: keep it, but flag the rationale.
        payload["rationale_short"] = "Avoiding repetition; trying a different closed form."
    elif proposal:
        _tried_formulas.add(proposal)
    return payload


# ------------------- Checker -------------------

def _matches_point(expr, i: int, target) -> bool:
    """Exact match using SymPy equality; supports integers and rationals."""
    try:
        got = expr.subs(n, i)
        try:
            got = got.doit()
        except Exception:
            pass
        try:
            target_sym = nsimplify(target, rational=True)
        except Exception:
            target_sym = target
        try:
            return simplify(got - target_sym) == 0
        except InconsistentAssumptions:
            eq = got.equals(target_sym)  # can be True/False/None
            return bool(eq) is True
    except Exception:
        return False

def holds_for_range(expr, seq: List[Any], k_extra: int = 2):
    """
    Check that expr matches seq (1-indexed) and extrapolates reasonably for k_extra terms.
    If data are nondecreasing, forbid the extrapolation from dropping.
    Uses exact SymPy arithmetic where possible (e.g., 3/2 vs 1.5).

    Returns:
      (ok, reason): ok is True only when every provided term matches and the
      extrapolation check (when it applies) passes; reason is a human-readable
      message that is also fed back to the model.

    Failures to evaluate or compare are reported as (False, reason) rather than
    raised. An empty `seq` is rejected up front.
    """
    L = len(seq)
    if L == 0:
        return False, "No terms provided; nothing to check."

    def _evaluate(i):
        # expr at n=i, evaluated further with doit() (e.g. Sum) when possible
        value = expr.subs(n, i)
        try:
            return value.doit()
        except Exception:
            return value

    def _exact(x):
        try:
            return nsimplify(x, rational=True)
        except Exception:
            return x

    def _to_float(x):
        try:
            return float(nsimplify(x, rational=True))
        except Exception:
            try:
                return float(x)
            except Exception:
                return float(simplify(x).evalf())

    # 1) Exactness on provided data
    for i in range(1, L + 1):
        try:
            got = _evaluate(i)
        except Exception as e:
            return False, f"Expression failed to evaluate at n={i}: {e}"

        target_sym = _exact(seq[i - 1])

        try:
            try:
                equal = simplify(got - target_sym) == 0
            except InconsistentAssumptions:
                # equals() may answer True, False or None; only True is a match.
                equal = bool(got.equals(target_sym))
        except Exception as e:
            # e.g. the model produced a non-arithmetic object; report it
            # instead of aborting the whole run.
            return False, f"Could not compare value at n={i} (got {got!r}): {e}"

        if not equal:
            table = _mismatch_table(expr, seq)
            return False, f"Mismatch on provided data (1-indexed). Example: n={i} got {got}, expected {target_sym}.\n{table}"

    # 2) Light extrapolation sanity (if observed data are nondecreasing)
    try:
        exact_terms = [_exact(x) for x in seq]
        nondec = all(a <= b for a, b in zip(exact_terms, exact_terms[1:]))
    except Exception:
        try:
            nondec = all(_to_float(a) <= _to_float(b) for a, b in zip(seq, seq[1:]))
        except Exception:
            nondec = False  # order undecidable -> skip the extrapolation check

    if nondec:
        prev_val, prevf = seq[-1], _to_float(seq[-1])
        for t in range(L + 1, L + 1 + k_extra):
            try:
                got_t = _evaluate(t)
                gotf = _to_float(got_t)
            except Exception as e:
                return False, f"Expression failed to evaluate at n={t}: {e}"
            if gotf < prevf - 1e-12:
                # Report the value actually compared against (the previous
                # term, which may itself be an extrapolated one).
                return False, f"Suspicious extrapolation: at n={t} got {got_t} < {prev_val} (nondecreasing data)."
            prev_val, prevf = got_t, gotf

    return True, "Matches data and extrapolation OK"


# ------------------- Core Solver (Two-Phase) -------------------

def sequence_to_formula(
    seq: List[Any],
    name: str,
    model: str,
    max_attempts: Optional[int] = None,   # ignored; we use two-phase counts below
    use_gpt: bool = True
) -> Tuple[Optional[object], int, bool, str, Optional[bool]]:
    """
    Two-phase solver:
      - Phase A: NO_HINT_TRIES attempts without hint.
      - Phase B: HINT_TRIES attempts with hint.
    Returns: (expr, attempts, solved, method, needed_hint)
      method ∈ {"gpt", "unsolved"}
      needed_hint:
         - True  → solved in Phase B (needed hint)
         - False → solved in Phase A (no hint)
         - None  → unsolved

    Both phases share one attempt routine; they differ only in the hint flag
    and the label printed. Checker feedback from a failed attempt is carried
    into the next one (and from Phase A into Phase B). A failure inside the
    checker, or inside simplify/factor after a successful check, is reported
    and handled instead of aborting the caller's evaluation loop.

    `max_attempts` is accepted for backward compatibility and ignored.
    With `use_gpt=False` no model is called and the result is "unsolved".
    """
    global _last_guess, _tried_formulas

    print(f"\n=== Sequence → Formula ({name}) — model={model} ===")
    print("Input terms (n=1..k):", seq)

    # reset per-run repetition memory
    _last_guess = None
    _tried_formulas = set()

    if not use_gpt:
        print("  Skipping GPT (use_gpt=False). No fallback; marking unsolved.")
        return None, 0, False, "unsolved", None

    def _attempt(step_no: int, label: str, feedback: str, include_hint: bool):
        """Run one model attempt; return (solved, final_expr, next_feedback)."""
        try:
            data = call_gpt_for_formula(seq, feedback, model=model, include_hint=include_hint)
        except Exception as e:
            print(f"\nStep {step_no} ({label}): API error: {e}")
            return False, None, "API error previously. Please return valid SymPy in variable n (1-indexed)."

        expr_str = (data.get("formula_sympy") or "").strip()
        rationale = data.get("rationale_short", "")

        if not expr_str:
            print(f"\nStep {step_no} ({label}): Model returned empty/invalid formula. Rationale: {rationale}")
            return False, None, "Your last reply omitted 'formula_sympy'. Return valid SymPy in n (1-indexed)."

        # NOTE(security): sympify() parses strings with eval(); `locals` only
        # adds names and is not a sandbox. expr_str is untrusted model output,
        # so run this only in an environment where that is acceptable.
        try:
            expr = sympify(expr_str, locals=_ALLOWED)
        except Exception as e:
            print(f"\nStep {step_no} ({label}): Invalid SymPy expr {expr_str!r} ({e})")
            return False, None, "Invalid SymPy syntax. Use n (1-indexed). Example: n*(n+1)/2, harmonic(n), or Sum(1/k,(k,1,n))."

        try:
            ok, reason = holds_for_range(expr, seq)
        except Exception as e:
            # A checker crash counts as a failed attempt, not a fatal error.
            ok, reason = False, f"Checker could not evaluate the expression: {e}"

        print(f"\nStep {step_no} ({label}):")
        print(f"  Guess:     f(n) = {expr}")
        print(f"  Rationale: {rationale}")
        print(f"  Verdict:   {'✅ correct' if ok else '❌ incorrect'} — {reason}")

        if not ok:
            return False, None, reason

        # The expression is already verified; simplify/factor are cosmetic, so
        # fall back to the previous form if either one fails.
        try:
            simp = simplify(expr)
        except Exception:
            simp = expr
        try:
            fact = factor(simp)
        except Exception:
            fact = simp
        print(f"  Simplified: {simp}")
        print(f"  Factored:   {fact}")
        return True, fact, reason

    attempts = 0
    feedback = ""  # empty at start; will be populated with checker messages

    phases = (
        (False, "no-hint", NO_HINT_TRIES),   # Phase A
        (True, "hint", HINT_TRIES),          # Phase B
    )
    for include_hint, label, budget in phases:
        for _ in range(budget):
            attempts += 1
            solved, final_expr, feedback = _attempt(attempts, label, feedback, include_hint)
            if solved:
                return final_expr, attempts, True, "gpt", include_hint

    print("\nModel failed to fit within two-phase limits. No fallback (interpolation disabled).")
    return None, attempts, False, "unsolved", None


# ------------------- Evaluation Harness -------------------

def run_evaluation(models: List[str]) -> None:
    """
    Evaluate the given OpenAI models on a diverse set of sequences.
    The table columns are:
      - Model, Problem, Attempts, Solved, NeededHint, Method, FinalExpr
    Then a per-model summary is printed: totals, solve rates, avg attempts.

    Every demo sequence lists the terms for n = 1..6; the trailing comment on
    each entry gives a formula in that 1-indexed n where one is known.
    """
    demos: Dict[str, List[Any]] = {
        # Easy polynomials
        "Squares":              [1, 4, 9, 16, 25, 36],          # n**2
        "Triangular Numbers":   [1, 3, 6, 10, 15, 21],          # n*(n+1)/2
        "Sum of Squares":       [1, 5, 14, 30, 55, 91],         # n*(n+1)*(2*n+1)/6

        # Harder polynomial-ish
        "Cubes":                [1, 8, 27, 64, 125, 216],       # n**3
        "Tetrahedral Numbers":  [1, 4, 10, 20, 35, 56],         # n*(n+1)*(n+2)/6

        # Non-polynomial / special functions
        "Factorials":           [1, 2, 6, 24, 120, 720],        # factorial(n)
        "Double Factorial":     [1, 2, 8, 48, 384, 3840],       # (2*n-2)!! = 2**(n-1)*factorial(n-1)
        "Powers of 2":          [1, 2, 4, 8, 16, 32],           # 2**(n-1)
        "Alternating ±1":       [1, -1, 1, -1, 1, -1],          # (-1)**(n+1)
        # The previous terms [2, 6, 20, 72, 272, 1056] were 2**(n-1) + 4**(n-1),
        # which does not match this entry's name; these are (n+1)*2**n.
        "(n+1)*2^n":            [4, 12, 32, 80, 192, 448],      # (n+1)*2**n

        # Fibonacci and relatives
        "Fibonacci (first 6)":  [1, 1, 2, 3, 5, 8],             # fibonacci(n)
        "Lucas Numbers":        [2, 1, 3, 4, 7, 11],            # lucas(n-1)

        # Catalan numbers (combinatorial)
        "Catalan Numbers":      [1, 2, 5, 14, 42, 132],         # catalan(n) or binomial(2*n, n)/(n+1)

        # Central binomial coefficients
        "Central Binomial":     [2, 6, 20, 70, 252, 924],       # binomial(2*n, n)

        # Harmonic numbers (hard: involves summation)
        "Harmonic Numbers":     [1, Rational(3,2), Rational(11,6), Rational(25,12), Rational(137,60), Rational(49,20)],  # H_n

        # Primes (very hard: not closed-form simple polynomial)
        "Primes":               [2, 3, 5, 7, 11, 13],

        # Floor / ceiling functions
        "Floor(n/2)":           [0, 1, 1, 2, 2, 3],
        "Ceiling(n/2)":         [1, 1, 2, 2, 3, 3],

        # Mixed exponential + polynomial
        "n*2^n":                [2, 8, 24, 64, 160, 384],       # n*2**n

        # Partition function (OEIS A000041 — very hard, no simple closed form)
        "Partition Numbers":    [1, 2, 3, 5, 7, 11],
    }

    def _print_table(headers: List[str], rows: List[Dict[str, Any]]) -> None:
        """Print rows as a left-aligned, pipe-separated table with a header rule."""
        widths = {h: max([len(h)] + [len(str(r[h])) for r in rows]) for h in headers}

        def _fmt(row_dict):
            return " | ".join(str(row_dict[h]).ljust(widths[h]) for h in headers)

        print(_fmt({h: h for h in headers}))
        print("-+-".join("-" * widths[h] for h in headers))
        for r in rows:
            print(_fmt(r))

    results = []  # detailed rows for the first table

    for model in models:
        for name, seq in demos.items():
            expr, attempts, solved, method, needed_hint = sequence_to_formula(
                seq=seq,
                name=name,
                model=model,
                use_gpt=True
            )
            results.append({
                "Model": model,
                "Problem": name,
                "Attempts": attempts,
                "Solved": "Yes" if solved else "No",
                "NeededHint": ("Yes" if needed_hint is True
                               else "No" if needed_hint is False
                               else "Unsolved"),
                "Method": method,  # 'gpt' or 'unsolved'
                "FinalExpr": _short(expr),
            })

    # --------- Detailed evaluation table ---------
    print("\n==================== EVALUATION SUMMARY ====================")
    _print_table(
        ["Model", "Problem", "Attempts", "Solved", "NeededHint", "Method", "FinalExpr"],
        results,
    )

    # --------- Per-model rollup summary ---------
    aggregates: Dict[str, Dict[str, float]] = {}
    for r in results:
        ag = aggregates.setdefault(r["Model"], {
            "total": 0,
            "solved": 0,
            "no_hint": 0,
            "with_hint": 0,
            "unsolved": 0,
            "sum_attempts": 0.0,
        })
        ag["total"] += 1
        ag["sum_attempts"] += (r["Attempts"] or 0)
        if r["Solved"] == "Yes":
            ag["solved"] += 1
            if r["NeededHint"] == "No":
                ag["no_hint"] += 1
            elif r["NeededHint"] == "Yes":
                ag["with_hint"] += 1
        else:
            ag["unsolved"] += 1

    print("\n==================== MODEL SUMMARY ====================")
    sum_rows = []
    for model in models:
        ag = aggregates.get(model, None)
        if not ag:
            continue
        total = ag["total"]
        solved = ag["solved"]
        solve_rate = (100.0 * solved / total) if total else 0.0
        avg_attempts = (ag["sum_attempts"] / total) if total else 0.0

        sum_rows.append({
            "Model": model,
            "Total": str(total),
            "Solved": str(solved),
            "Solved_NoHint": str(ag["no_hint"]),
            "Solved_WithHint": str(ag["with_hint"]),
            "Unsolved": str(ag["unsolved"]),
            "SolveRate%": f"{solve_rate:.1f}",
            "AvgAttempts": f"{avg_attempts:.2f}",
        })

    _print_table(
        ["Model", "Total", "Solved", "Solved_NoHint", "Solved_WithHint", "Unsolved", "SolveRate%", "AvgAttempts"],
        sum_rows,
    )

# ------------------- Entry -------------------

def main(argv: List[str]):
    """
    CLI entry point.

    When every argument after the program name looks like an integer, those
    integers are solved as a single custom sequence with one model. In all
    other cases (including no arguments) the full multi-model evaluation runs.

    Example: python run.py 1 5 14 30 55 91
    """
    tokens = argv[1:]
    is_int_sequence = bool(tokens) and all(tok.lstrip("-").isdigit() for tok in tokens)

    if not is_int_sequence:
        # Evaluation across multiple models
        run_evaluation([
            "gpt-3.5-turbo",
            "gpt-4-turbo",
            "gpt-4o-mini",
            "gpt-4o",
            "gpt-5",
        ])
        return

    # CLI: custom sequence passed as ints (kept simple)
    try:
        seq = list(map(int, tokens))
    except ValueError:
        print("Usage: python run.py [sequence...]\nExample: python run.py 1 5 14 30 55 91")
        sys.exit(2)

    cli_model = "gpt-4o-mini"  # model used for the CLI single run
    sequence_to_formula(seq, name="CLI Sequence", model=cli_model)


if __name__ == "__main__":
    main(sys.argv)