**Библиотеки**

In [None]:
import logging
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable, Tuple, List, Optional

import numpy as np
import sympy as sp
import jax.numpy as jnp
from jax import jacfwd
from sympy import Eq, Rel

In [None]:
@dataclass
class OptimizationConfig:
    """Tunable parameters of the barrier (outer) / Newton (inner) solver.

    Raises:
        ValueError: On construction, if a parameter would make the solver
            loops ill-defined (see ``__post_init__``).
    """

    # Inner Newton iterations (for fixed μ)
    max_newton_iterations: int = 1000

    # Barrier / outer loop parameters
    mu_factor: float = 0.1          # μ_{k+1} = mu_factor * μ_k
    mu_min: float = 1e-6            # stop when μ <= mu_min
    residual_tolerance_factor: float = 0.1  # require ||F|| < t * μ inside inner loop

    # Safeguards for positivity of slack variables s
    s_margin: float = 1e-6          # lower bound u: every slack must stay > s_margin
    max_step_reductions: int = 30   # max attempts to shrink Newton step
    step_reduction_factor: float = 0.5  # shrink coefficient (0.5, 0.25, ...)
    min_step_size: float = 1e-10    # minimal admissible step

    # Input / logging
    input_file_path: Optional[str] = None
    log_to_file: bool = True

    def __post_init__(self) -> None:
        """Reject parameter values that break the outer or inner loops."""
        # μ is updated as μ *= mu_factor until μ <= mu_min; a factor outside
        # (0, 1) never gets there (or produces a non-positive μ).
        if not 0.0 < self.mu_factor < 1.0:
            raise ValueError(f"mu_factor must lie in (0, 1), got {self.mu_factor}")
        if self.mu_min <= 0.0:
            raise ValueError(f"mu_min must be positive, got {self.mu_min}")
        # The inner loop stops on ||F|| < t * μ, which is unreachable for t <= 0.
        if self.residual_tolerance_factor <= 0.0:
            raise ValueError(
                f"residual_tolerance_factor must be positive, got {self.residual_tolerance_factor}"
            )
        # The barrier uses log(s), so the admissible region must exclude s <= 0.
        if self.s_margin < 0.0:
            raise ValueError(f"s_margin must be non-negative, got {self.s_margin}")
        # Backtracking multiplies the step by this factor; it has to shrink it.
        if not 0.0 < self.step_reduction_factor < 1.0:
            raise ValueError(
                f"step_reduction_factor must lie in (0, 1), got {self.step_reduction_factor}"
            )


In [None]:
@dataclass
class OptimizationProblem:
    """Symbolic description of an optimization problem with constraints.

    Instances are built by ``read_problem_from_file`` and consumed by
    ``create_lagrangian`` / ``solve_interior_point``.
    """
    # Numeric callable for the objective (built with sp.lambdify over x_symbols).
    target_function: Callable
    # Symbolic objective expression; the starting term of the Lagrangian.
    target_expression: sp.Expr
    # Decision-variable symbols; their order defines the positional order of x values.
    x_symbols: List[sp.Symbol]
    # Equality-constraint expressions h, each multiplied by one λ in the Lagrangian.
    equalities: List[sp.Expr]
    # Inequality-constraint expressions g, each paired with one slack via the term ν·(g + s).
    inequalities: List[sp.Expr]
    # One multiplier symbol per equality constraint.
    lambda_symbols: List[sp.Symbol]
    # One multiplier symbol per inequality constraint (replaced by μ/s in create_lagrangian).
    nu_symbols: List[sp.Symbol]
    # One slack symbol per inequality constraint.
    s_symbols: List[sp.Symbol]

In [None]:
@dataclass
class InitialPoint:
    """Starting values handed to the solver."""
    # Starting values of the decision variables.
    x: np.ndarray
    # Starting slack values; solve_interior_point rejects any s_j <= config.s_margin.
    s: np.ndarray
    # Starting values of the equality multipliers λ.
    lambdas: np.ndarray
    # Initial barrier parameter μ for the outer loop.
    mu: float
    # Read from the input file and logged; not referenced by solve_interior_point in this file.
    epsilon: float

In [None]:
def setup_logger(log_to_file: bool) -> logging.Logger:
    """
    Configure the optimizer's logger with console and optional file output.

    The logger is a named singleton, so calling this function again
    reconfigures the same object. Handlers from a previous call are removed
    and closed first, so repeated calls (e.g. re-running a notebook cell) do
    not leave earlier log files open.

    Args:
        log_to_file: Whether to also write messages to a timestamped file
            under ``logs/`` (the directory is created if missing).

    Returns:
        Configured logger instance
    """
    logger = logging.getLogger("interior_point_optimizer")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Detach AND close old handlers; merely clearing the list would keep the
    # previous FileHandler's file descriptor open.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(console_handler)

    # File handler
    if log_to_file:
        logs_dir = Path("logs")
        logs_dir.mkdir(exist_ok=True)

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        file_path = logs_dir / f"optimization_log_{timestamp}.txt"

        file_handler = logging.FileHandler(file_path, encoding="utf-8")
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

    return logger

In [None]:
def parse_constraint(constraint_string: str) -> Tuple[Optional[str], Optional[sp.Expr]]:
    """
    Parse a constraint string into its type and a normalized expression.

    Equalities ``lhs = rhs`` (or ``lhs == rhs``) are returned as ``lhs - rhs``,
    to be driven to zero. Inequalities are normalized to the form
    ``g <= 0``, which is what the slack formulation ``g + s = 0, s > 0``
    requires: ``lhs <= rhs`` / ``lhs < rhs`` become ``lhs - rhs`` and
    ``lhs >= rhs`` / ``lhs > rhs`` become ``rhs - lhs``.

    Args:
        constraint_string: String representation of the constraint

    Returns:
        Tuple of (constraint_type, expression) where type is 'equality' or
        'inequality'. ``(None, None)`` is returned when the string cannot be
        parsed or is not a supported relation (e.g. ``!=``); parse failures
        are logged, not raised.
    """
    constraint_string = constraint_string.strip()

    try:
        # A plain "=" cannot be handed to sympify (it is not an expression),
        # so equalities are split by hand.
        if "=" in constraint_string and all(
            op not in constraint_string for op in (">=", "<=", "!=")
        ):
            normalized = constraint_string.replace("==", "=")
            left, right = normalized.split("=", 1)
            return "equality", sp.sympify(left) - sp.sympify(right)

        expr = sp.sympify(constraint_string)

        if isinstance(expr, Eq):
            return "equality", expr.lhs - expr.rhs

        # Ne is a Relational too, but "lhs != rhs" has no g <= 0 form.
        if isinstance(expr, sp.Ne):
            logging.error(f"Unsupported constraint (!=): '{constraint_string}'")
            return None, None

        # lhs >= rhs  <=>  rhs - lhs <= 0: flip the sign so every stored
        # inequality follows the same g <= 0 convention.
        if isinstance(expr, (sp.GreaterThan, sp.StrictGreaterThan)):
            return "inequality", expr.rhs - expr.lhs

        if isinstance(expr, Rel):
            return "inequality", expr.lhs - expr.rhs

        return None, None

    except Exception as e:
        logging.error(f"Error parsing constraint '{constraint_string}': {e}")
        return None, None

In [None]:
def _symbol_sort_key(sym: sp.Symbol) -> Tuple[str, int, str]:
    """Order symbols by name prefix, then numeric suffix, so x2 sorts before x10."""
    name = sym.name
    prefix = name.rstrip("0123456789")
    suffix = name[len(prefix):]
    return prefix, int(suffix) if suffix else -1, name


def _parse_float_vector(text: str) -> np.ndarray:
    """Convert a whitespace-separated list of numbers into a float array."""
    return np.array([float(token) for token in text.split()], dtype=float)


def read_problem_from_file(file_path: Path) -> Tuple[OptimizationProblem, InitialPoint]:
    """
    Read optimization problem and initial values from file.

    Expected format (example):
        target_function: x1**2 + x2**2
        x: 1 1
        s: 1 1
        lambdas: 0
        mu: 1
        epsilon: 1e-8
        constraints:
            x1 + x2 = 1
            x1 - 2 <= 0

    Notes:
      * Every free symbol of the objective and the constraints is treated as
        a decision variable. Variables are ordered by name with numeric
        suffixes compared as numbers (x1, x2, ..., x10), and the values on
        the ``x:`` line are matched to that order positionally.
      * Initial slack variables must satisfy s_j > 0 (we take log(s_j)).
      * It is also desirable (for good start) that g_j(x) + s_j ≈ 0 for each inequality.
      * Everything after the ``constraints`` line is parsed as a constraint;
        lines that cannot be parsed are skipped with a warning.

    Args:
        file_path: Path of the problem description file.

    Returns:
        Tuple of (problem, initial point).

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If a required entry is missing, a vector length does not
            match the problem dimensions, mu is not positive, or an initial
            slack is non-positive.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File '{file_path}' does not exist!")

    x, s, lambdas, mu, epsilon = None, None, None, None, None
    equalities, inequalities = [], []
    target_expr = None
    in_constraints_section = False

    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()

            # Skip empty lines and comments
            if not line or line.startswith("#"):
                continue

            # Check for constraints section
            if line.lower().startswith("constraints"):
                in_constraints_section = True
                continue

            # Parse constraints
            if in_constraints_section:
                constraint_type, expr = parse_constraint(line)
                if constraint_type == "equality":
                    equalities.append(expr)
                elif constraint_type == "inequality":
                    inequalities.append(expr)
                else:
                    # Dropping a constraint changes the problem being solved,
                    # so make the omission visible.
                    logging.warning(f"Ignoring unrecognized constraint line: '{line}'")
                continue

            # Parse key-value pairs
            if ":" not in line:
                continue

            key, value = line.split(":", 1)
            key = key.strip()
            value = value.strip()

            if key == "target_function":
                target_expr = sp.sympify(value)
            elif key == "x":
                x = _parse_float_vector(value)
            elif key == "s":
                s = _parse_float_vector(value)
            elif key == "lambdas":
                lambdas = _parse_float_vector(value)
            elif key == "mu":
                mu = float(value)
            elif key in ("epsilon", "eps"):
                epsilon = float(value)

    if target_expr is None:
        raise ValueError("target_function is missing in the input file")
    if x is None or s is None or lambdas is None or mu is None or epsilon is None:
        raise ValueError("Input file must contain x, s, lambdas, mu, and epsilon")
    if mu <= 0.0:
        raise ValueError(f"Barrier parameter mu must be positive, got {mu}")

    # Collect ALL x-symbols used in target and constraints (not only in the target)
    all_syms = set(target_expr.free_symbols)
    for expr in equalities + inequalities:
        all_syms |= set(expr.free_symbols)
    # Natural ordering: a plain sort by name would put x10 before x2 and
    # silently misassign the positional values from the "x:" line.
    x_symbols = sorted(all_syms, key=_symbol_sort_key)

    # Create symbolic variables for multipliers/slacks
    lambda_symbols = sp.symbols(f'λ1:{len(equalities) + 1}')
    nu_symbols = sp.symbols(f'ν1:{len(inequalities) + 1}')
    s_symbols = sp.symbols(f's1:{len(inequalities) + 1}')

    # Basic dimension checks
    if len(x) != len(x_symbols):
        raise ValueError(
            f"Length of x ({len(x)}) must equal number of variables ({len(x_symbols)}): "
            f"{[sym.name for sym in x_symbols]}"
        )
    if len(s) != len(inequalities):
        raise ValueError(f"Length of s ({len(s)}) must equal number of inequalities ({len(inequalities)})")
    if len(lambdas) != len(equalities):
        raise ValueError(f"Length of lambdas ({len(lambdas)}) must equal number of equalities ({len(equalities)})")

    # Feasibility checks for initial point
    if np.any(s <= 0.0):
        bad = np.where(s <= 0.0)[0] + 1
        raise ValueError(f"Initial slack variables must satisfy s_j > 0. Non-positive indices: {bad.tolist()}")
    # Consistency g(x) + s ≈ 0 is strongly recommended
    if len(inequalities) > 0:
        g_funcs = [sp.lambdify(x_symbols, g, modules="numpy") for g in inequalities]
        gx = np.array([float(gf(*x)) for gf in g_funcs], dtype=float)
        violation = gx + s
        # not fatal, but warn loudly (it impacts convergence)
        if np.linalg.norm(violation, ord=np.inf) > 1e-4:
            logging.warning(
                "Initial point is poorly consistent with g(x)+s=0. "
                f"Max |g(x)+s| = {float(np.max(np.abs(violation))):.3e}. "
                "Consider adjusting initial s or x."
            )

    # Create problem and initial point
    problem = OptimizationProblem(
        target_function=sp.lambdify(x_symbols, target_expr),
        target_expression=target_expr,
        x_symbols=x_symbols,
        equalities=equalities,
        inequalities=inequalities,
        lambda_symbols=list(lambda_symbols),
        nu_symbols=list(nu_symbols),
        s_symbols=list(s_symbols)
    )

    initial = InitialPoint(x=x, s=s, lambdas=lambdas, mu=mu, epsilon=epsilon)
    return problem, initial


In [None]:
def create_lagrangian(
    problem: OptimizationProblem,
    mu: float
) -> sp.Expr:
    """
    Build the barrier Lagrangian for a fixed barrier parameter.

    The expression is assembled as
        f(x) - μ·Σ log(s_j) + Σ λ_i·h_i(x) + Σ ν_j·(g_j(x) + s_j)
    and every ν_j is then replaced by μ / s_j, so the result depends only on
    the x, λ and s symbols.

    Args:
        problem: Optimization problem specification
        mu: Barrier parameter

    Returns:
        Symbolic Lagrangian expression
    """
    slacks = problem.s_symbols
    duals = problem.nu_symbols

    # Objective minus the logarithmic barrier on the slacks.
    log_barrier = sum(sp.log(slack) for slack in slacks)
    expression = problem.target_expression - mu * log_barrier

    # One multiplier term per equality constraint.
    for multiplier, h_expr in zip(problem.lambda_symbols, problem.equalities):
        expression = expression + multiplier * h_expr

    # One multiplier term per slack-augmented inequality constraint.
    for dual, g_expr, slack in zip(duals, problem.inequalities, slacks):
        expression = expression + dual * (g_expr + slack)

    # Eliminate the inequality multipliers through ν = μ / s.
    dual_elimination = dict(zip(duals, (mu / slack for slack in slacks)))
    return expression.subs(dual_elimination)

In [None]:
def create_kkt_system(
    lagrangian: sp.Expr,
    x_symbols: List[sp.Symbol],
    s_symbols: List[sp.Symbol],
    lambda_symbols: List[sp.Symbol]
) -> Callable:
    """
    Turn the Lagrangian into a numeric residual function of its stationarity system.

    The unknowns are ordered as [x, λ, s]; the residual vector holds the
    partial derivatives of the Lagrangian with respect to each unknown, in
    that same order.

    Args:
        lagrangian: Lagrangian expression
        x_symbols: Decision variables
        s_symbols: Slack variables
        lambda_symbols: Lagrange multipliers

    Returns:
        Function mapping a flat vector y = [x, λ, s] to the JAX array of residuals
    """
    # Same ordering for the unknowns and for the equations they generate.
    unknowns = [*x_symbols, *lambda_symbols, *s_symbols]
    residual_exprs = [sp.diff(lagrangian, unknown) for unknown in unknowns]

    # JAX-backed evaluator taking one positional argument per unknown.
    evaluate = sp.lambdify(unknowns, residual_exprs, modules="jax")

    def kkt_residual(y: jnp.ndarray) -> jnp.ndarray:
        components = evaluate(*y)
        return jnp.array(components)

    return kkt_residual

In [None]:
def solve_interior_point(
    problem: OptimizationProblem,
    initial: InitialPoint,
    config: OptimizationConfig,
    logger: logging.Logger
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
    """
    Solve optimization problem using an interior point / barrier method.

    Structure:
      * OUTER loop: while μ > mu_min, solve the barrier problem for the
        current μ, then set μ <- mu_factor * μ and warm-start from the
        point just obtained.
      * INNER loop: Newton iterations on the stationarity system of the
        barrier Lagrangian, stopped when ||F|| < residual_tolerance_factor * μ
        or after max_newton_iterations.
      * Every Newton step is damped (step, step * step_reduction_factor, ...)
        until all slack variables satisfy s > s_margin; if no admissible
        step is found the inner loop stops and the current point is kept.

    NOTE(review): JAX computes in single precision unless the
    ``jax_enable_x64`` flag is enabled; the tolerances used here go down to
    residual_tolerance_factor * mu_min, so confirm the caller enables it.

    Args:
        problem: Symbolic problem description.
        initial: Starting x, λ, s and μ.
        config: Solver parameters.
        logger: Destination for progress messages.

    Returns:
        (x_opt, lambda_opt, s_opt, mu_final) where mu_final is the value of μ
        after the last reduction.

    Raises:
        ValueError: If mu_factor is not in (0, 1) or an initial slack is not
            above s_margin.
        RuntimeError: If a slack is not above s_margin after an inner solve.
    """
    # μ *= mu_factor must strictly decrease μ, otherwise the outer loop never ends.
    if not 0.0 < config.mu_factor < 1.0:
        raise ValueError(
            f"mu_factor must lie in (0, 1) so that μ decreases, got {config.mu_factor}"
        )

    x_dim = len(initial.x)
    lambda_dim = len(initial.lambdas)
    # Slack entries of y = [x, λ, s] start here. Slicing from the front is
    # deliberate: a negative slice y[-s_dim:] returns the WHOLE vector when
    # there are no slacks (s_dim == 0).
    s_start = x_dim + lambda_dim

    # Start from provided initial point
    x = np.array(initial.x, dtype=float)
    lambdas = np.array(initial.lambdas, dtype=float)
    s = np.array(initial.s, dtype=float)

    # Safety: initial slack must be strictly inside
    if np.any(s <= config.s_margin):
        bad = np.where(s <= config.s_margin)[0] + 1
        raise ValueError(
            f"Initial slack variables must satisfy s_j > u, u={config.s_margin}. Bad indices: {bad.tolist()}"
        )

    mu = float(initial.mu)

    outer_iter = 0
    while mu > config.mu_min:
        outer_iter += 1
        logger.info("=" * 80)
        logger.info(f"Outer iteration {outer_iter}: solving barrier problem with μ = {mu:.3e}")
        logger.info("=" * 80)

        # Build KKT system for current μ
        lagrangian = create_lagrangian(problem, mu)
        kkt_system = create_kkt_system(
            lagrangian,
            problem.x_symbols,
            problem.s_symbols,
            problem.lambda_symbols
        )
        # The Jacobian function depends only on μ, so build it once per outer iteration.
        kkt_jacobian = jacfwd(kkt_system)

        # Pack variables into one vector y = [x, λ, s]
        y = jnp.array(np.concatenate([x, lambdas, s]))

        # Inner Newton loop
        for _ in range(config.max_newton_iterations):
            F = kkt_system(y)
            residual = float(jnp.linalg.norm(F))

            logger.info(f"Residual: {residual:.6e}")

            # Convergence for current μ
            if residual < config.residual_tolerance_factor * mu:
                logger.info(
                    f"Inner loop converged: ||F|| = {residual:.3e} < "
                    f"{config.residual_tolerance_factor} * μ = {(config.residual_tolerance_factor * mu):.3e}\n"
                )
                break

            # Newton direction
            J = kkt_jacobian(y)
            delta = jnp.linalg.solve(J, -F)

            # A singular Jacobian yields NaN/inf instead of an exception;
            # never step along such a direction.
            if not bool(jnp.all(jnp.isfinite(delta))):
                logger.warning(
                    "Newton direction contains non-finite values. "
                    "Stopping inner iterations."
                )
                break

            # Safeguarded step to keep s > u
            step = 1.0
            for attempt in range(config.max_step_reductions):
                y_candidate = y + step * delta
                s_new = np.array(y_candidate[s_start:], dtype=float)

                if np.all(s_new > config.s_margin):
                    break

                step *= config.step_reduction_factor
                logger.info(
                    f"Slack would violate s>u (u={config.s_margin}). "
                    f"Reducing step: attempt {attempt + 1}, step={step:.3e}"
                )

                if step < config.min_step_size:
                    logger.warning(
                        f"Step became too small (step={step:.3e} < {config.min_step_size}). "
                        "Stopping inner iterations."
                    )
                    break
            else:
                # No admissible step found: leave the Newton loop, keeping y.
                logger.warning(
                    "Could not find a step maintaining s>u within the max number of reductions. "
                    "Stopping inner iterations."
                )
                break

            # The damping loop gave up on step size: leave the Newton loop, keeping y.
            if step < config.min_step_size:
                break

            y = y_candidate

        else:
            logger.info("Maximum Newton iterations reached for current μ.")

        # Unpack warm-start point for the next μ
        y_np = np.array(y, dtype=float)
        x = y_np[:x_dim]
        lambdas = y_np[x_dim:s_start]
        s = y_np[s_start:]

        # Enforce feasibility (hard) before moving on
        if np.any(s <= config.s_margin):
            bad = np.where(s <= config.s_margin)[0] + 1
            raise RuntimeError(
                f"After inner solve, slack left feasible region s>u. "
                f"Bad indices: {bad.tolist()}, min(s)={float(np.min(s)):.3e}"
            )

        # Reduce μ
        mu *= config.mu_factor

    logger.info("Barrier loop finished: μ <= μ_min")
    return x, lambdas, s, mu


In [None]:
def log_initial_parameters(
    initial: InitialPoint,
    problem: OptimizationProblem,
    logger: logging.Logger
) -> None:
    """Write the starting x, s, λ values and the scalars μ, ε to the logger.

    ``problem`` is accepted but not used.
    """
    emit = logger.info

    # (section title, printed label, attribute of `initial`, blank line after?)
    sections = (
        ('Initial approximation x:', 'x', 'x', True),
        ('Initial slack variables s:', 's', 's', False),
        ('Initial Lagrange multipliers λ:', 'λ', 'lambdas', True),
    )

    emit('Initial Parameters:\n')
    emit('')if False else None

    for title, label, attribute, blank_after in sections:
        emit(title)
        for position, value in enumerate(getattr(initial, attribute), start=1):
            emit(f'{label}{position} = {value}')
        if blank_after:
            emit('')

    emit(f'μ = {initial.mu}')
    emit(f'ε = {initial.epsilon}\n')


def log_solution(
    x_opt: np.ndarray,
    lambda_opt: np.ndarray,
    s_opt: np.ndarray,
    mu_final: float,
    target_function: Callable,
    logger: logging.Logger
) -> None:
    """Write the solver result to the logger.

    Reports x*, the equality multipliers λ* (or a note when there are none),
    the slacks s*, the final μ and the objective evaluated at x*.
    """
    emit = logger.info

    def emit_components(label, values):
        # One line per component, numbered from 1.
        for position, value in enumerate(values, start=1):
            emit(f'{label}{position}* = {value}')

    emit('Optimal Solution:')
    emit_components('x', x_opt)

    if len(lambda_opt) > 0:
        emit_components('λ', lambda_opt)
    else:
        emit('No equality multipliers (no equality constraints).')

    emit_components('s', s_opt)
    emit(f'Final μ (after loop) = {mu_final:.3e}')

    # target_function takes the components of x* as separate positional arguments.
    emit(f'Objective function value F(x*) = {target_function(*x_opt)}')


In [None]:
def main():
    """Run the full pipeline: configure, read the problem, solve, report.

    Raises:
        NotImplementedError: If no input file path is configured.
    """
    # JAX computes in single precision by default, which is too coarse for
    # the solver's tolerances (down to residual_tolerance_factor * mu_min).
    # Enable 64-bit mode before any JAX array is created. The import is local
    # because the file only imports jax.numpy and jacfwd at module level.
    import jax
    jax.config.update("jax_enable_x64", True)

    # Configuration
    config = OptimizationConfig(
        max_newton_iterations=1000,
        mu_factor=0.1,
        mu_min=1e-6,
        residual_tolerance_factor=0.1,
        s_margin=1e-6,
        max_step_reductions=30,
        step_reduction_factor=0.5,
        min_step_size=1e-10,
        input_file_path="input_example.txt",  # Set to filename for file input
        log_to_file=True
    )

    logger = setup_logger(config.log_to_file)

    # Load problem
    if config.input_file_path:
        problem, initial = read_problem_from_file(Path(config.input_file_path))
    else:
        raise NotImplementedError("Interactive input not implemented in refactored version")

    # Log initial state
    log_initial_parameters(initial, problem, logger)

    # Solve (barrier outer loop + Newton inner loop)
    x_opt, lambda_opt, s_opt, mu_final = solve_interior_point(
        problem=problem,
        initial=initial,
        config=config,
        logger=logger
    )

    # Log results
    log_solution(x_opt, lambda_opt, s_opt, mu_final, problem.target_function, logger)


if __name__ == "__main__":
    main()
