In [1]:
import numpy as np
from ppopt.mplp_program import MPLP_Program
from ppopt.mpmodel import MPModeler
from ppopt.mp_solvers.solve_mpqp import solve_mpqp, mpqp_algorithm
from numpy.polynomial.legendre import leggauss
from scipy.optimize import linprog
from typing import Union
from collections import defaultdict
from itertools import product
import time
import sympy as sp
from typing import Union, List
import pickle
# from sympy import symbols, expand, lambdify, Expr, simplify, pprint, Rel
import math
from pyomo.environ import *
import pyomo.environ as pyo
from typing import List
import itertools
import numpy as np
from IPython.display import display

In [2]:
def gauss_legendre_between_bounds(expr_coeffs: np.ndarray, n_gl: int, max_idx: int = 0, min_idx: int = 1):
    """
    Generate n Gauss–Legendre quadrature points and weights between min and max bounds
    defined by two linear expressions.

    Parameters:
        expr_coeffs (np.ndarray): 2xD array. Row 0 = max point co`efficients, Row 1 = min.
        n (int): Number of quadrature points.

    Returns:
        points (np.ndarray): (n, D) array of quadrature points.
        weights (np.ndarray): (n,) array of weights.
    """
    if expr_coeffs.shape[0] != 2:
        raise ValueError("expr_coeffs must have two rows")

    max_coeffs = expr_coeffs[max_idx]
    min_coeffs = expr_coeffs[min_idx]

    # Get Gauss–Legendre points and weights on [-1, 1]
    nodes, weights = leggauss(n_gl)
    weights = weights.reshape(-1,1)
    
    # Affine transformation to domain [min_coeffs, max_coeffs]
    points = 0.5 * (np.outer((nodes + 1), max_coeffs) + np.outer((1 - nodes), min_coeffs))

    # Adjust weights to match new domain
    weights = 0.5 * weights@(max_coeffs - min_coeffs).reshape(1,-1)

    return points, weights

In [3]:
def get_quadrature_points(solution, nq: int, t_vector: np.ndarray):
    # Augment t_vector once
    t_vector_aug = np.append(t_vector, 1).reshape(-1, 1)

    if isinstance(solution, list):
        qpoints, qweights = np.polynomial.legendre.leggauss(nq)
        min, max = solution[0], solution[1]
        qps_mapped = 0.5*(max*(1+qpoints) + min*(1-qpoints))
        qws_mapped = 0.5*(max-min)*qweights
        # print(max, min, qps_mapped, qws_mapped)
        return max, min, qps_mapped, qws_mapped

    for region in solution.critical_regions:
        if region.is_inside(t_vector):
            coeffs = np.concatenate([region.A, region.b], axis=1)[:2, :]
            qpoints, qweights = gauss_legendre_between_bounds(expr_coeffs=coeffs, n_gl=nq)
            return coeffs[0] @ t_vector_aug, coeffs[1] @ t_vector_aug, qpoints @ t_vector_aug, qweights @ t_vector_aug

    # print(f't_vector: {t_vector}')
    # print(f'solution:{solution}')
    raise ValueError("No region found that contains the given t_vector.")


In [4]:
def mpformulate_theta_bounds(flex_sol, num_theta:int, theta_bounds:list, num_design:int=0, design_bounds:list=None, psi_idx:int=0, theta_m:int=0):
    A0, b0, F0 = np.empty((len(flex_sol), num_theta)), np.empty((len(flex_sol), 1)), np.empty((len(flex_sol), num_design))
    num_cr = len(flex_sol.critical_regions)
    for i, region in enumerate(flex_sol.critical_regions):
        A0[i] = region.A[psi_idx,:num_theta]
        b0[i] = -region.b[psi_idx]
        F0[i] = -region.A[psi_idx, num_theta:num_theta+num_design]
    # print(f'num_cr:{num_cr}')
    # print(f'num_theta:{num_theta}')
    # print(f'num_design:{num_design}')
    # print(f"A0: {A0}")
    # print(f"b0: {b0}")
    # print(f"F0: {F0}")
    
    c = np.hstack([np.array([-1, 1]).reshape(1, -1), np.zeros((1, 2 * (num_theta - 1 - theta_m)))]).reshape(-1,1)
    # print(f'c:{c}')
    # print(f'c.shape: {c.shape}')
    
    row1_block = np.hstack([block for i in range(theta_m, num_theta) for block in (A0[:, [i]], np.zeros((num_cr, 1)))])
    row2_block = np.hstack([block for i in range(theta_m, num_theta) for block in (np.zeros((num_cr, 1)), A0[:, [i]])])
    bound_row = np.hstack([np.array([-1, 1]).reshape(1, -1), np.zeros((1, 2 * (num_theta - 1 - theta_m)))])
    A = np.vstack([row1_block, row2_block, bound_row, -np.eye(2*(num_theta-theta_m)), np.eye(2*(num_theta-theta_m))])
    # print(f'A: {A}')
    # print(f'A.shape: {A.shape}')
    
    x_lb = np.array([val for i in range(theta_m, len(theta_bounds)) for val in [theta_bounds[i][0]] * 2])
    x_ub = np.array([val for i in range(theta_m, len(theta_bounds)) for val in [theta_bounds[i][1]] * 2])
    b = np.vstack([b0, b0, np.zeros((1,1)), -x_lb.reshape(-1,1), x_ub.reshape(-1,1)])
    # print(f'b: {b}')
    # print(f'b.shape: {b.shape}')
    
    if F0.size==0 and theta_m==0:
        # print('here')
        return A, b, c, np.array([]), np.array([]), np.array([]), np.array([]) 
    
    F = np.vstack([F0, F0, np.zeros((1,num_design)), np.zeros((4*(num_theta-theta_m), num_design))]) if num_design>0 else np.vstack([F0, F0])
    # print(f'F:{F}')
    # print(f'F.shape: {F.shape}')
    if theta_m > 0:
        F_lltheta = np.hstack([A0[:, [i]] for i in range(theta_m)])
        # print(f'F_lltheta: {F_lltheta}')
        # print(f'F_lltheta.shape: {F_lltheta.shape}')
        F = np.hstack([np.vstack([-F_lltheta, -F_lltheta, np.zeros((1,len(range(theta_m)))), np.zeros((4*(num_theta-theta_m), theta_m))]), F]) if F.size > 0 else np.vstack([-F_lltheta, -F_lltheta, np.zeros((1,len(range(theta_m)))), np.zeros((4*(num_theta-theta_m), theta_m))])
    # print(f'F:{F}')
    # print(f'F.shape: {F.shape}')
    
    H = np.zeros((2*(num_theta-theta_m), theta_m+num_design))
    # print(f'H:{H}')
    # print(f'H.shape: {H.shape}')
    
    A_t = np.vstack([-np.eye(theta_m+num_design), np.eye(theta_m+num_design)])
    # print(f'A_t:{A_t}')
    # print(f'A_t.shape: {A_t.shape}')
    
    theta_lb = np.array([-theta_bounds[i][0] for i in range(theta_m)] + ([-j[0] for j in design_bounds] if isinstance(design_bounds, list) 
                                                                        else [])).reshape(-1, 1)
    theta_ub = np.array([theta_bounds[i][1] for i in range(theta_m)] + ([j[1] for j in design_bounds] if isinstance(design_bounds, list) 
                                                                        else [])).reshape(-1, 1)
    
    b_t = np.vstack([theta_lb, theta_ub])
    # print(f'b_t:{b_t}')
    # print(f'b_t.shape: {b_t.shape}')
    
    return A, b, c, H, A_t, b_t, F

In [5]:
def get_theta_bounds(flex_sol, numt, tbounds, numd:int=0, dbounds:list=None):
    
    theta_bound_dict = defaultdict(dict)
    prob_dict = defaultdict(dict)
    for i in range(numt):
        A, b, c, H, A_t, b_t, F = mpformulate_theta_bounds(flex_sol=flex_sol, num_theta=numt ,num_design=numd, theta_bounds=tbounds, design_bounds=dbounds, theta_m=i)
        if F.size != 0:
            prob = MPLP_Program(A=A, b=b, c=c, H=H, A_t=A_t, b_t=b_t, F=F)
            prob.process_constraints()
            solution = solve_mpqp(problem=prob, algorithm=mpqp_algorithm.geometric)
            prob_dict[f't{i}'] = prob
            theta_bound_dict[f't{i}'] = solution
        else:
            linsol = linprog(c=c, A_ub=A, b_ub=b)
            prob_dict[f't{i}'] = linsol
            theta_bound_dict[f't{i}'] = [linsol.x[1], linsol.x[0]]
            # if linsol.success:
                # print("Optimal value:", linsol.fun)
                # print("Optimal x:", linsol.x)
        print(f'Finished solving for theta{i+1}')
    probs = [p for key, p in prob_dict.items()]
    sols = [sol for key, sol in theta_bound_dict.items()]
    
    return probs, sols
    

In [6]:
def calculate_stocflexibility(sols, nq: Union[int, list], joint_func, d_vector: np.ndarray = None):
    
    # Validate nq if it's a list
    if isinstance(nq, list):
        if len(nq) != len(sols):
            raise ValueError("If nq is a list, it must have the same length as sols")

    def recurse(level: int, theta_prev: list, weight_prev: float) -> float:
        """
        Recursive inner function to compute nested quadrature.
        """
        # print(f'level:{level}')
        if level == len(sols):
            return weight_prev * joint_func(theta_prev)

        # Use nq[level] if nq is a list, otherwise use scalar nq
        nql = nq[level] if isinstance(nq, list) else nq

        t_vector = np.block([np.array(theta_prev), d_vector]) if isinstance(d_vector, np.ndarray) else np.array(theta_prev)
        # print(f't_vector:{t_vector}')
        # print(f'probs[{level}].A:{probs[level].A}')
        _, _, t_points, t_weights = get_quadrature_points(solution=sols[level], nq=nql, t_vector=t_vector)

        t_points = t_points.flatten()
        t_weights = t_weights.flatten()
        # print('t_points:', t_points)
        # print(f'theta_prev: {theta_prev}')
        return sum(recurse(level + 1, theta_prev + [v], weight_prev * w) for v, w in zip(t_points, t_weights))
    s = time.time()
    stflex =  recurse(level=0, theta_prev=[], weight_prev=1.0)
    e = time.time()
    print(f'Elapsed time for calculating sf index: {e- s}')
    
    return stflex
    

In [7]:
# Bansal (2000) Illustrative Example
t_bounds=[(0,4),(0,4)]
d_bounds=[(0,5), (0,5)]
nt = len(t_bounds)
nd = len(d_bounds)

m = MPModeler()

u = m.add_var(name='u')
x = m.add_var(name='x')
z = m.add_var(name='z')

t1 = m.add_param(name='t1')
t2 = m.add_param(name='t2')
d1 = m.add_param(name='d1')
d2 = m.add_param(name='d2')
m.add_constr(2*x - 3*z + t1 - d2 == 0)
m.add_constr(x - z/2 -t1/2 +t2/2 +d1 -7*d2/2 <= u)
m.add_constr(-2*x +2*z -4*t1/3 -t2 +2*d2 +1/3<= u)
m.add_constr(-x + 5*z/2 +t1/2 -t2 -d1 +d2/2 -1 <= u)
m.add_constr(-50 <= x)
m.add_constr(-50 <= z)
m.add_constr(t_bounds[0][0] <= t1)
m.add_constr(t_bounds[1][0] <= t2)
m.add_constr(d_bounds[0][0] <= d1)
m.add_constr(d_bounds[1][0] <= d2)
m.add_constr(t1 <= t_bounds[0][1])
m.add_constr(t2 <= t_bounds[1][1])
m.add_constr(d1 <= d_bounds[0][1])
m.add_constr(d2 <= d_bounds[1][1])
m.set_objective(u)
prob = m.formulate_problem()
prob.process_constraints()
solution_flexibility = solve_mpqp(problem=prob, algorithm=mpqp_algorithm.geometric)

start_time = time.time()
prob_list, sol_list = get_theta_bounds(flex_sol=solution_flexibility, numt=nt, numd=nd, tbounds=t_bounds, dbounds=d_bounds)
end_time = time.time()
print(f'Elapsed time for solving mp problems: {end_time-start_time}')

def joint_pdf(theta:list):
    return (2/np.pi)*np.exp(-2*((theta[0]-2)**2 + (theta[1]-2)**2))

Set parameter Username
Academic license - for non-commercial use only - expires 2025-12-20
Using a found active set [0, 1, 2]
Using a found active set [6, 9, 11, 12]
Finished solving for theta1
Using a found active set [6, 7]
Finished solving for theta2
Elapsed time for solving mp problems: 0.08403539657592773


In [8]:
# d_vector = np.array([5, 1.159054910657785])
# nq = 5
# sf_idx = calculate_stocflexibility(sols=sol_list, nq=nq, joint_func=joint_pdf, d_vector=d_vector)
# print(f'Stochastic Flexibility Index: {sf_idx:.4}')

Elapsed time for calculating sf index: 0.0020325183868408203
Stochastic Flexibility Index: 0.8


## Finished Calculation of stochastic flexibility index

In [10]:
def get_bounds_regions(sols:List, min_idx:int=1, max_idx:int=0):
    theta_bounds_list = list()
    theta_regions_list = list()
    
    for theta_sol in sols:
        min_max_list = list()
        region_list = list()
        for cr in theta_sol.critical_regions:
            Ab = np.concatenate([cr.A, cr.b], axis=1)[:2]
            min_max_list.append([Ab[min_idx].tolist(), Ab[max_idx].tolist()])
    
            Ef = np.concatenate([cr.E, -cr.f], axis=1)
            region_list.append([row.tolist() for row in Ef])
    
        theta_bounds_list.append(np.array(min_max_list))
        theta_regions_list.append(region_list)  
    
    return theta_bounds_list, theta_regions_list

In [11]:
t_bounds_list, t_regions_list = get_bounds_regions(sols=sol_list)