In [800]:
import numpy as np
from docplex.mp.model import Model

# from docplex.mp.callbacks.cb_mixin import ConstraintCallbackMixin
# from docplex.mp.callbacks.cb_mixin import LazyConstraintCallback
import matplotlib.pyplot as plt

In [801]:
import cplex
from cplex.callbacks import UserCutCallback, LazyConstraintCallback

In [None]:
# 需要点と施設候補の座標データ
demand_points = [(2, 22), (42, 6), (48, 50)]
candidate_sites = [(10, 10), (20, 20), (30, 30)]

In [803]:
J = len(candidate_sites)  # 施設候補の数
D = len(demand_points)  # 需要点の数
alpha = 0
beta = 0.1

p = 1
r = 1

In [804]:
h_i = np.full(D, 1 / D)

In [805]:
# 既存のリーダーの施設セット J_L を仮定
J_L = {}  # インデックスとして候補施設の一部を選択
J_F = {}

In [806]:
# ユークリッド距離の計算
def compute_distances(demand_points, candidate_sites):
    distances = np.zeros((D, J))
    for d in range(D):
        for j in range(J):
            distances[d][j] = np.sqrt(
                (demand_points[d][0] - candidate_sites[j][0]) ** 2
                + (demand_points[d][1] - candidate_sites[j][1]) ** 2
            )
    return distances


distances = compute_distances(demand_points, candidate_sites)

In [807]:
# w_ij の計算
wij_matrix = np.exp(alpha - beta * distances)

In [808]:
def compute_Ui_L(wij_matrix, J_L):
    """
    リーダーの既存施設による U_i^L を計算する関数

    Parameters:
        wij_matrix (np.array): D × J の w_ij の重み行列
        J_L (set): リーダーが既に持っている施設のインデックス集合

    Returns:
        np.array: 各需要点 i に対する U_i^L のベクトル
    """
    return wij_matrix[:, list(J_L)].sum(axis=1)


def compute_Ui_F(wij_matrix, J_F):
    """
    リーダーの既存施設による U_i^L を計算する関数

    Parameters:
        wij_matrix (np.array): D × J の w_ij の重み行列
        J_L (set): リーダーが既に持っている施設のインデックス集合

    Returns:
        np.array: 各需要点 i に対する U_i^L のベクトル
    """
    return wij_matrix[:, list(J_F)].sum(axis=1)


# U_i^L の計算
Ui_L = compute_Ui_L(wij_matrix, J_L)

# U_i^F の計算
Ui_F = compute_Ui_F(wij_matrix, J_F)

print(f"Ui_L: {Ui_L}")
print(f"Ui_F: {Ui_F}")

Ui_L: [0. 0. 0.]
Ui_F: [0. 0. 0.]


In [809]:
def compute_L(h_i, Ui_L, Ui_F, wij, x, y):
    """
    関数 L(x, y) を計算する

    Parameters:
        h (np.array): 需要点ごとの人口密度ベクトル (D,)
        Ui_L (np.array): 各需要点におけるリーダーの影響度 (D,)
        Ui_F (np.array): 各需要点におけるフォロワーの影響度 (D,)
        wij (np.array): 需要点と施設候補の重み行列 (D, J)
        x (np.array): リーダーが選択した施設配置 (J,)
        y (np.array): フォロワーが選択した施設配置 (J,)

    Returns:
        float: L(x, y) の計算結果
    """
    numerator = Ui_L + (wij @ x)  # 分子: リーダーの影響度 + 選択した施設の影響
    denominator = Ui_L + Ui_F + (wij @ np.maximum(x, y))  # 分母: 総合影響度

    return np.sum(h_i * (numerator / denominator))


x = np.random.randint(0, 2, J)  # ランダムなリーダーの施設選択 (0 or 1)
y = np.random.randint(0, 2, J)  # ランダムなフォロワーの施設選択 (0 or 1)

print(f"x: {x}")
print(f"y: {y}")

# L(x, y) の計算
L_value = compute_L(h_i, Ui_L, Ui_F, wij_matrix, x, y)

# 計算結果の表示
print(f"L(x, y): {L_value}")

x: [1 1 0]
y: [1 1 1]
L(x, y): 0.5789196158068347


### 大枠から作っていきますか

In [None]:
import cplex


def solve_s_cflp():
    """S-CFLPのBranch-and-Cutフレームワークを実装"""
    global F  # `F` をグローバル変数として扱う

    # **初期化**
    F = initialize_problem()

    # Branch-and-Cut のループ
    while True:
        # **連続緩和問題を解く**
        incumbent_solution = solve_relaxation(F)  # `F` を渡す

        # **分離問題を解く**
        y_star = separation_problem(incumbent_solution)

        # **カットを追加するかチェック**
        if should_add_cut(incumbent_solution, y_star):
            add_cuts(incumbent_solution, y_star)
        else:
            # **最適整数解なら終了**
            if is_integer_solution(incumbent_solution):
                update_best_solution(incumbent_solution)
                break
            else:
                # **分枝処理を行う**
                branch_and_bound(incumbent_solution)

In [811]:
def initialize_problem():
    """
    Branch-and-Cutフレームワークの初期化を行う。
    連続緩和問題をセットアップし、Fに追加する。
    """
    problem = cplex.Cplex()

    # **最大化問題に設定**
    problem.objective.set_sense(problem.objective.sense.maximize)  # ここを明示的に指定

    # 変数 x_j の追加（リーダーの施設配置、0 ≤ x_j ≤ 1）
    x_names = [f"x{j}" for j in range(J)]
    x_types = [problem.variables.type.continuous] * J  # 連続変数
    x_lb = [0] * J  # 下限
    x_ub = [1] * J  # 上限
    x_obj = [0] * J  # 初期の目的関数係数（後で更新）

    # 目的変数 θ の追加
    theta_name = "theta"
    problem.variables.add(
        names=[theta_name],
        types=[problem.variables.type.continuous],  # 連続変数
        lb=[0],  # 下限 0
        ub=[1],  # 上限 1
        obj=[1],  # **目的関数: max θ**
    )

    # 連続変数 x の追加
    problem.variables.add(names=x_names, types=x_types, lb=x_lb, ub=x_ub, obj=x_obj)

    # 施設数制約: Σ x_j = p
    problem.linear_constraints.add(
        lin_expr=[cplex.SparsePair(ind=x_names, val=[1.0] * J)],
        senses=["E"],  # "E" は equal (=)
        rhs=[p],  # 右辺
        names=["facility_limit"],
    )

    # 初期解のセットアップ
    F = [problem]  # 定式化のセット
    BestSol = None  # 最良解の初期化
    theta_LB = 0  # 最良値の初期化

    return F, BestSol, theta_LB

In [812]:
def solve_relaxation(F):
    """
    連続緩和問題を解き、インカンベント解 (x_hat, theta_hat) を取得する。

    Parameters:
        F (list): 定式化のセット（Cplex 問題オブジェクトのリスト）

    Returns:
        dict: インカンベント解 {'x': np.array, 'theta': float}
    """
    # F から最新の Cplex 問題オブジェクトを取得
    problem = F[-1]

    # 問題を解く
    problem.solve()

    # CPLEX の解の取得（変数名のアンダースコアを削除）
    x_values = np.array(
        problem.solution.get_values([f"x{j}" for j in range(J)])
    )  # "x_0" → "x0"
    theta_value = problem.solution.get_values("theta")

    # インカンベント解を辞書形式で返す
    return {"x": x_values, "theta": theta_value}

In [813]:
import numpy as np


def separation_problem(incumbent_solution):
    """
    分離問題を解く。Proposition 5 に基づき、最適なフォロワーの施設選択 ŷ を決定する。

    Parameters:
        incumbent_solution (dict): インカンベント解 {'x': np.array, 'theta': float}

    Returns:
        np.array: 最適なフォロワーの施設配置 y_hat (J,)
    """
    # インカンベント解 (x_hat)
    x_hat = incumbent_solution["x"]

    # 定数 a_i(x_hat) の計算
    a_i = Ui_L + (wij_matrix @ x_hat)

    # 定数 w_i^L(x_hat) および w_i^U(x_hat) の計算
    w_i_L = np.min(Ui_F[:, np.newaxis] + wij_matrix * (1 - x_hat), axis=1)
    w_i_U = np.max(Ui_F[:, np.newaxis] + wij_matrix * (1 - x_hat), axis=1)

    # β(x_hat) の計算
    beta_hat = np.sum(
        h_i[:, np.newaxis]
        * (
            (a_i[:, np.newaxis] * wij_matrix * (1 - x_hat))
            / (
                (a_i[:, np.newaxis] + w_i_U[:, np.newaxis])
                * (a_i[:, np.newaxis] + w_i_L[:, np.newaxis])
            )
        ),
        axis=0,
    )

    # β(x_hat) の降順ソート
    sorted_indices = np.argsort(-beta_hat)  # 降順ソート

    # フォロワーの最適な選択 y_hat
    y_hat = np.zeros(J)
    y_hat[sorted_indices[:r]] = 1  # 上位 r 個を選択

    return y_hat

これも正しく動いているかチェック

In [814]:
y_hat = separation_problem(solution)
print(f"y_hat: {y_hat}")

y_hat: [0. 0. 1.]


In [815]:
def should_add_cut(incumbent_solution, y_star):
    """
    カットを追加すべきか判定

    Parameters:
        incumbent_solution (dict): インカンベント解 {'x': np.array, 'theta': float}
        y_star (np.array): フォロワーの最適な施設配置 y_hat (J,)

    Returns:
        bool: カットを追加すべきなら True, そうでなければ False
    """
    # インカンベント解 (x_hat, theta_hat)
    x_hat = incumbent_solution["x"]
    theta_hat = incumbent_solution["theta"]

    # L(x_hat, y_star) を計算
    L_value = compute_L(h_i, Ui_L, Ui_F, wij_matrix, x_hat, y_star)

    # カット追加判定
    return theta_hat > L_value

In [None]:
import itertools
import cplex
import numpy as np

import itertools


def generate_subsets(J):
    """
    J のすべての部分集合 S ⊆ J を生成する（全探索）

    Parameters:
        J (int): 施設候補の数（candidate_sites の長さ）

    Returns:
        list: すべての部分集合 S のリスト（各要素は set 型）
    """
    candidate_indices = set(range(J))  # 修正: `J` を `int` として受け取る
    subsets = []

    # J のすべての部分集合を生成（空集合から J まで）
    for rr in range(len(candidate_indices) + 1):
        for subset in itertools.combinations(candidate_indices, rr):
            subsets.append(set(subset))  # `set` 型で格納

    return subsets


def compute_L_Y(S, Y):
    """
    L_Y(S) を計算する

    Parameters:
        S (set): リーダーが開設する施設の部分集合
        Y (set): フォロワーが開設する施設の集合

    Returns:
        float: L_Y(S)
    """
    # この関数はあってそう
    # print("=== compute_L_Y ===")
    # print(f"S: {S}")
    # print(f"Y: {Y}")
    # print(f"wij_matrix[:, list(S)] : {wij_matrix[:, list(S)]}")
    # print(f"wij_matrix : {wij_matrix}")
    # print("=====================")

    # wij_matrix[:, list(S)]は D × |S| の行列を返却

    f_i_Y = (Ui_L[:, np.newaxis] + wij_matrix[:, list(S)].sum(axis=1)) / (
        Ui_L[:, np.newaxis]
        + Ui_F[:, np.newaxis]
        + wij_matrix[:, list(S.union(Y))].sum(axis=1)
    )
    return np.sum(h_i * f_i_Y)


def compute_rho_Y(S, k, Y):
    """
    rho_Y(S, k) を計算する

    Parameters:
        S (set): リーダーが開設する施設の部分集合
        k (int): 追加する施設のインデックス
        Y (set): フォロワーの施設配置

    Returns:
        float: rho_Y(S, k)
    """
    return compute_L_Y(S.union({k}), Y) - compute_L_Y(S, Y)

In [None]:
def add_cuts(incumbent_solution, y_star):
    """
    カット（制約）を追加する。Algorithm 1 に基づき、サブモジュラー不等式 (8) を適用し、
    強化した定式化を F に戻す。

    Parameters:
        incumbent_solution (dict): インカンベント解 {'x': np.array, 'theta': float}
        y_star (np.array): フォロワーの最適な施設配置 y_hat (J,)
    """
    # インカンベント解 (x_hat, theta_hat)
    x_hat = incumbent_solution["x"]
    theta_hat = incumbent_solution["theta"]

    # フォロワーが選んだ施設の集合 Y
    Y = {j for j in range(J) if y_star[j] == 1}
    print(f"in add_cuts Y: {Y}")

    # 候補施設の集合をセット型で定義
    J_set = set(range(J))
    print(f"in add_cuts J_set: {J_set}")

    # 最も違反する S を探索
    min_cut_value = float("inf")
    print(f"in add_cuts min_cut_value: {min_cut_value}")
    best_S = None

    for S in generate_subsets(J):
        S_set = set(S)

        print("======in loop======")
        print(f"S_set: {S_set}")
        print(f"J_set: {J_set}")
        print(f"Y: {Y}")

        # k の値を明示的に表示するため、リスト内包表記をループに変更
        rho_sum_1 = 0
        rho_sum_2 = 0

        for k in S_set:
            rho_value = compute_rho_Y(J_set - {k}, k, Y)
            rho_sum_1 += rho_value
            print(f"k={k}, compute_rho_Y(J_set - {{k}}, k, Y)={rho_value}")

        for k in S_set:
            rho_value = compute_rho_Y(J_set - {k}, k, Y) * x_hat[k]
            rho_sum_2 += rho_value
            print(f"k={k}, compute_rho_Y(J_set - {{k}}, k, Y) * x_hat[k]={rho_value}")

        rho_sum_3 = sum(compute_rho_Y(S_set, k, Y) * x_hat[k] for k in (J_set - S_set))

        print(f"compute_L_Y(S_set, Y) : {compute_L_Y(S_set, Y)}")
        print(f"sum(compute_rho_Y(J_set - {{k}}, k, Y) for k in S_set) : {rho_sum_1}")
        print(
            f"sum(compute_rho_Y(J_set - {{k}}, k, Y) * x_hat[k] for k in S_set) : {rho_sum_2}"
        )

        # 修正後のカット値計算
        cut_value = (
            compute_L_Y(S_set, Y) - rho_sum_1 + rho_sum_2 + rho_sum_3  # 定数項の補正
        )

        print(f"Computed cut_value: {cut_value}")

        print("===Judge===")
        print(f"in roop cut_value: {cut_value}")
        print(f"in roop min_cut_value: {min_cut_value}")
        if cut_value < min_cut_value:
            min_cut_value = cut_value
            best_S = S_set

    # `best_S` の出力を追加
    print("\n=== Best S Found ===")
    if best_S:
        print(f"S* = {sorted(best_S)} (最も違反する部分集合)")
    else:
        print("S* = None (カットを追加する必要なし)")
    print("====================\n")

    # CPLEX 問題の取得
    problem = F[-1]

    # サブモジュラー不等式 (8) に基づくカットを追加
    if best_S is not None:
        x_coeffs_s = {k: compute_rho_Y(J_set - {k}, k, Y) for k in best_S}  # S内の係数
        x_coeffs_js = {
            k: compute_rho_Y(best_S, k, Y) for k in (J_set - best_S)
        }  # J\S の係数

        # 定数項の修正
        constant_term = compute_L_Y(best_S, Y) - sum(
            compute_rho_Y(J_set - {k}, k, Y) for k in best_S
        )
        rhs_value = min_cut_value + constant_term

        submodular_cut_expr = cplex.SparsePair(
            ind=[f"x{k}" for k in J_set],
            val=list(x_coeffs_s.values()) + list(x_coeffs_js.values()),
        )

        # 制約の数式を表示
        constraint_str = f"θ ≤ {constant_term:.4f}"
        for k, coeff in x_coeffs_s.items():
            constraint_str += f" + ({coeff:.4f})x_{k}"
        for k, coeff in x_coeffs_js.items():
            constraint_str += f" + ({coeff:.4f})x_{k}"

        print("\n=== Added Submodular Cut ===")
        print(constraint_str)
        print("==============================\n")

        # CPLEX に制約を追加
        problem.linear_constraints.add(
            lin_expr=[submodular_cut_expr],
            senses=["L"],
            rhs=[rhs_value],
            names=[f"cut_submodular_{len(problem.linear_constraints.get_names())}"],
        )

    # 強化した定式化を F に戻す
    F.append(problem)

どんな制約が返ってくるかチェック

In [818]:
# add_cuts(solution, y_hat)

# ↑要検討。間違ってる可能性の方が高い

とりあえず次に進む

In [None]:
def is_integer_solution(incumbent_solution, theta_LB, tol=1e-5):
    """
    インカンベント解のリーダー施設配置 x が整数 (0 or 1) であり、
    かつ目的関数値 theta が θ_LB を超えているか判定。

    Parameters:
        incumbent_solution (dict): インカンベント解 {'x': np.array, 'theta': float}
        theta_LB (float): 現在の目的関数の下界
        tol (float): 数値誤差の許容範囲（デフォルト 1e-5）

    Returns:
        bool: すべての x_j が整数であり、かつ θ > θ_LB なら True, そうでなければ False
    """
    # 条件1: 目的関数値 θ が θ_LB を超えているか
    if incumbent_solution["theta"] <= theta_LB:
        return False

    # 条件2: x のすべての要素が 0 or 1 の整数か
    x_values = incumbent_solution["x"]
    if np.all(np.abs(x_values - np.round(x_values)) < tol):  # すべて整数に近い
        return True
    else:
        return False

ちゃんと判断できるかチェック

In [None]:
def update_best_solution(incumbent_solution):
    """
    インカンベント解 (x, theta) を用いて最良解 BestSol を更新し、
    目的関数の下界 θ_LB を更新する。

    Parameters:
        incumbent_solution (dict): インカンベント解 {'x': np.array, 'theta': float}
    """
    global BestSol, θ_LB  # グローバル変数を更新

    # BestSol の更新（x のコピーを保存）
    BestSol = incumbent_solution["x"].copy()

    # θ_LB の更新
    θ_LB = incumbent_solution["theta"]

    print("\n=== Best Solution Updated ===")
    print(f"BestSol: {BestSol}")
    print(f"θ_LB: {θ_LB:.4f}")
    print("==============================\n")

In [None]:
def branch_and_bound(incumbent_solution):
    """
    分枝限定法を適用し、最も整数から遠い x_j を固定して
    2つの新しい定式化を作成し、リスト F に追加する。

    Parameters:
        incumbent_solution (dict): インカンベント解 {'x': np.array, 'theta': float}
    """
    global F  # 定式化のリスト F を更新

    x_hat = incumbent_solution["x"]

    # **最も整数から遠い x_j を選択**
    fractional_indices = np.where((x_hat > 1e-5) & (x_hat < 1 - 1e-5))[0]
    if len(fractional_indices) == 0:
        print("No fractional x found. Branching is not needed.")
        return

    # 最大誤差のある x_j を選択
    j_star = fractional_indices[np.argmax(np.abs(x_hat[fractional_indices] - 0.5))]

    print("\n=== Branching on x_{} ===".format(j_star))
    print("Fractional value: x_{} = {:.4f}".format(j_star, x_hat[j_star]))
    print("==============================\n")

    # **現在の定式化をコピー**
    problem = F[-1]  # 現在の問題
    problem_0 = problem.copy()  # x_j = 0 の場合
    problem_1 = problem.copy()  # x_j = 1 の場合

    # **制約 x_j = 0 を追加**
    problem_0.linear_constraints.add(
        lin_expr=[cplex.SparsePair(ind=[f"x_{j_star}"], val=[1.0])],
        senses=["E"],  # "=" 制約
        rhs=[0.0],
        names=[f"branch_x_{j_star}_0"],
    )

    # **制約 x_j = 1 を追加**
    problem_1.linear_constraints.add(
        lin_expr=[cplex.SparsePair(ind=[f"x_{j_star}"], val=[1.0])],
        senses=["E"],  # "=" 制約
        rhs=[1.0],
        names=[f"branch_x_{j_star}_1"],
    )

    # **新しい定式化をリスト F に追加**
    F.append(problem_0)
    F.append(problem_1)

    print("Added two new formulations for x_{} = 0 and x_{} = 1".format(j_star, j_star))

: 