In [71]:
import math
from datetime import datetime
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Border, Side, Alignment, PatternFill
from openpyxl.utils import get_column_letter

# ==============================================================================
# 1. ВСПОМОГАТЕЛЬНЫЕ КЛАССЫ (без изменений)
# ==============================================================================
class CurrencyConverter:
    def __init__(self):
        self.rates = {"USD": 80.50, "EUR": 95.00, "CNY": 12.00, "RUB": 1.0}
    def convert_to_rub(self, amount: float, currency: str) -> float:
        rate = self.rates.get(currency.upper().strip())
        if rate is None: raise ValueError(f"Курс для валюты {currency} не задан.")
        return amount * rate

# ==============================================================================
# 2. ОСНОВНЫЕ КЛАССЫ ДАННЫХ (без изменений)
# ==============================================================================
class Deal:
    def __init__(self, expiration_date: str, nominal_amount: float, nominal_currency: str, fair_value: float, asset_type: str, precalculated_ea: float = 0, netting_indicator: int = 0, netting_agreement_id: str = None):
        self.expiration_date = datetime.strptime(expiration_date, "%Y-%m-%d")
        self.nominal_amount, self.nominal_currency = nominal_amount, nominal_currency
        self.fair_value, self.asset_type = fair_value, asset_type
        self.precalculated_ea, self.netting_indicator = precalculated_ea, netting_indicator
        self.netting_agreement_id = netting_agreement_id
    def calculate_mi(self, rsk_date: datetime) -> float:
        return max(1.0, round((self.expiration_date - rsk_date).days / 365.0, 2))
    def get_nominal_in_rub(self, converter: CurrencyConverter) -> float:
        return converter.convert_to_rub(self.nominal_amount, self.nominal_currency)

class CreditSwap:
    def __init__(self, maturity: float, notional: float):
        self.maturity, self.notional = max(1.0, maturity), notional
    def calculate_swap_part(self) -> dict:
        d_k = (1 - math.exp(-0.05 * self.maturity)) / (0.05 * self.maturity)
        contribution = self.maturity * self.notional * d_k
        return {'m': self.maturity, 'n': self.notional, 'd': d_k, 'contribution': contribution}

class IndexCreditSwap:
    def __init__(self, id: str, weight: float, maturity: float, notional: float):
        self.id = id
        self.weight, self.maturity, self.notional = weight / 100.0, max(1.0, maturity), notional
    def calculate_aind(self) -> dict:
        d_ind = (1 - math.exp(-0.05 * self.maturity)) / (0.05 * self.maturity)
        a_ind = self.weight * self.maturity * self.notional * d_ind
        return {'id': self.id, 'w': self.weight, 'm': self.maturity, 'n': self.notional, 'd': d_ind, 'a_ind': a_ind}

# ==============================================================================
# 3. КЛАССЫ-КАЛЬКУЛЯТОРЫ (с изменениями для отчётности)
# ==============================================================================
class ExposureCalculator:
    @staticmethod
    def _get_potential_risk_coefficient(asset_type: str, maturity: float) -> float:
        coeffs = {'currency':[0.01,0.05,0.075],'interest':[0.005,0.005,0.015],'security':[0.06,0.08,0.1],'metal':[0.07,0.07,0.08],'other':[0.1,0.12,0.15]}
        rates = coeffs.get(asset_type)
        if not rates: raise ValueError(f"Неизвестный тип актива: {asset_type}")
        if maturity < 1: return rates[0]
        elif 1 <= maturity <= 5: return rates[1]
        else: return rates[2]
    @staticmethod
    def calculate(deals: list[Deal], rsk_date: datetime) -> float:
        is_netting = len(deals) > 1 or (len(deals) == 1 and deals[0].netting_indicator == 1)
        current_risk = max(0, sum(d.fair_value for d in deals)) if is_netting else max(0, deals[0].fair_value)
        if not is_netting:
            deal = deals[0]
            coeff = ExposureCalculator._get_potential_risk_coefficient(deal.asset_type, deal.calculate_mi(rsk_date))
            potential_risk = deal.nominal_amount * coeff
        else:
            vpr_v = sum(d.nominal_amount * ExposureCalculator._get_potential_risk_coefficient(d.asset_type, d.calculate_mi(rsk_date)) for d in deals)
            tsz = sum(max(0, d.fair_value) for d in deals)
            k = current_risk / tsz if tsz > 0 else 0
            potential_risk = (0.4 * vpr_v) + (0.6 * k * vpr_v)
        return current_risk + potential_risk

class RSKCalculator:
    def __init__(self, counterparties: list, index_swaps: list = None):
        self.counterparties, self.index_swaps = counterparties, index_swaps or []
        self.converter = CurrencyConverter()
    def calculate_full_report(self, rsk_date_str: str, use_full_ea_calc: bool = False) -> dict:
        rsk_date = datetime.strptime(rsk_date_str, "%Y-%m-%d")
        cp_reports = [cp.calculate_ai_report(rsk_date, self.converter, use_full_ea_calc) for cp in self.counterparties]
        aind_reports = [swap.calculate_aind() for swap in self.index_swaps]
        
        all_ai = [rep['a_i'] for rep in cp_reports]
        all_aind_values = [rep['a_ind'] for rep in aind_reports]
        
        sum_ai, sum_aind = sum(all_ai), sum(all_aind_values)
        sum_ai_sq = sum(a**2 for a in all_ai)
        term1, term2 = (0.5 * sum_ai - sum_aind)**2, 0.75 * sum_ai_sq
        
        rsk_total = 12.5 * 2.33 * math.sqrt(term1 + term2) if term1 + term2 >= 0 else 0.0
        
        return {'rsk_total': rsk_total, 'date': rsk_date_str, 'counterparties': cp_reports, 'index_swaps': aind_reports}

# ==============================================================================
# 4. КЛАССЫ-АГРЕГАТОРЫ (с изменениями для отчётности)
# ==============================================================================
class NettingAgreement:
    def __init__(self, deals: list):
        self.deals = deals
    def calculate_mj(self, rsk_date: datetime, converter: CurrencyConverter) -> float:
        if len(self.deals) == 1 and self.deals[0].netting_indicator == 0:
            return self.deals[0].calculate_mi(rsk_date)
        total_weighted = sum(d.get_nominal_in_rub(converter) * d.calculate_mi(rsk_date) for d in self.deals)
        total_nominal = sum(d.get_nominal_in_rub(converter) for d in self.deals)
        if total_nominal == 0: return 1.0
        return max(1.0, round(total_weighted / total_nominal, 2))

class Counterparty:
    rating_map = {"AAA": 0.7, "AA": 0.7, "A": 0.8, "BBB": 1.0, "BB": 2.0, "B": 3.0, "CCC": 10.0}
    def __init__(self, id:str, rating: str, deals: list[Deal], swaps: list = None):
        self.id, self.rating = id, rating.upper().strip()
        self.swaps = swaps or []
        self.deal_groups = self._group_deals(deals)
    def _group_deals(self, deals: list[Deal]) -> dict:
        groups = {}; i = 0
        for deal in deals:
            key = deal.netting_agreement_id if deal.netting_indicator == 1 and deal.netting_agreement_id else f"individual_{i}"; i+=1
            groups.setdefault(key, []).append(deal)
        return groups
    def get_risk_weight(self) -> float:
        return self.rating_map.get(self.rating, 4.0) / 100.0
    def calculate_ai_report(self, rsk_date: datetime, converter: CurrencyConverter, use_full_ea_calc: bool = False) -> dict:
        deals_report_data = []
        total_deal_contrib = 0
        for group_id, deal_group in self.deal_groups.items():
            agreement = NettingAgreement(deal_group)
            mj = agreement.calculate_mj(rsk_date, converter)
            dj = (1 - math.exp(-0.05 * mj)) / (0.05 * mj)
            eaj = ExposureCalculator.calculate(deal_group, rsk_date) if use_full_ea_calc else sum(d.precalculated_ea for d in deal_group)
            contribution = mj * eaj * dj
            total_deal_contrib += contribution
            deals_report_data.append({'id': group_id, 'm': mj, 'ea': eaj, 'd': dj, 'contribution': contribution})
        
        swaps_report_data = [swap.calculate_swap_part() for swap in self.swaps]
        total_swap_contrib = sum(item['contribution'] for item in swaps_report_data)
        
        w_i = self.get_risk_weight()
        a_i = w_i * (total_deal_contrib - total_swap_contrib)

        return {'id': self.id, 'rating': self.rating, 'w': w_i, 'a_i': a_i, 'deals': deals_report_data, 'swaps': swaps_report_data}





In [None]:
from openpyxl import Workbook
from openpyxl.styles import Font, Border, Side, Alignment, PatternFill
from openpyxl.utils import get_column_letter
from typing import List, Dict, Any
from math import isnan

class ExcelReportGenerator:
    """
    Генерирует отчёт РСК:
    """
    # Настройки внешнего вида
    THIN = Side(style="thin", color="000000")
    BORDER_ALL = Border(left=THIN, right=THIN, top=THIN, bottom=THIN)
    H_CENTER = Alignment(horizontal="center", vertical="center", wrap_text=True)
    H_RIGHT  = Alignment(horizontal="right", vertical="center")
    H_LEFT   = Alignment(horizontal="left",  vertical="center", wrap_text=True)

    HEAD_FILL = PatternFill("solid")
    TITLE_FILL = PatternFill("solid")

    NUM0 = "#,##0"
    NUM2 = "#,##0.00"
    PCT1 = "0.0%"

    def __init__(self, report_data: Dict[str, Any]):
        self.data = report_data
        self.wb = Workbook()
        self.ws = self.wb.active
        self.ws.title = "РСК"

    # ---------- публичный метод ----------
    def generate(self, output_path: str):
        ws = self.ws
        row = 1

        # Верхний заголовок
        ws.merge_cells(start_row=row, start_column=2, end_row=row, end_column=10)
        ws.cell(row=row, column=2, value="Риск изменения стоимости кредитного требования").alignment = self.H_CENTER
        row += 1; ws.cell(row=row, column=2, value="в результате ухудшения кредитного качества контрагента").alignment = self.H_CENTER
        ws.merge_cells(start_row=row, start_column=2, end_row=row, end_column=10)
        row += 1; ws.cell(row=row, column=2, value="по состоянию на").alignment = self.H_CENTER
        ws.merge_cells(start_row=row, start_column=2, end_row=row, end_column=10)
        row += 2

        # Шапка «Перечень контрагентов» и общий РСК справа
        ws.merge_cells(start_row=row, start_column=2, end_row=row, end_column=10)
        t = ws.cell(row=row, column=2, value="Перечень контрагентов")
        t.alignment = self.H_CENTER; t.font = Font(bold=True); t.fill = self.TITLE_FILL

        ws.merge_cells(start_row=row, start_column=11, end_row=row, end_column=11)
        hdr_rsk = ws.cell(row=row, column=11, value="РСК")
        hdr_rsk.alignment = self.H_CENTER; hdr_rsk.font = Font(bold=True); hdr_rsk.fill = self.TITLE_FILL
        row += 1

        start_table_row = row

        # Внутренняя «малая» шапка
        self._put_head_row(row)
        row += 1

        # Тело по контрагентам
        total_rsk = self._safe(self.data.get("rsk_total"))
        cp_start_row = row
        for cp in self.data.get("counterparties", []):
            row = self._write_counterparty_block(row, cp)

        # Рамка вокруг всей таблицы
        self._outline_table(start_table_row-1, row - 1)
        self._outline_table(start_table_row, row - 1)

        # Общий РСК в правом большом столбце (группируем по высоте всей таблицы)
        ws.merge_cells(start_row=start_table_row, start_column=11, end_row=row - 1, end_column=11)
        cell_total = ws.cell(row=start_table_row, column=11, value=total_rsk)
        cell_total.number_format = self.NUM2
        cell_total.alignment = self.H_CENTER
        cell_total.font = Font(bold=True)

        # Автоширина и базовые стили
        self._auto_width(1, 11)
        self._thin_borders( start_table_row, 11, row - 1, 11 )

        self.wb.save(output_path)

    # ---------- построение блоков ----------
    def _write_counterparty_block(self, row: int, cp: Dict[str, Any]) -> int:
        """
        Рисует блок контрагента:
        слева объединённый столбец 'i' (название), рядом рейтинг и Wi,
        дальше подзаголовок и строки инструментов (deals), затем — при наличии — блок 'Свопы'.
        Справа — ячейка Ai, объединённая по высоте блока.
        """
        ws = self.ws

        # Подсчитать кол-во строк внутри блока (нужно для объединений)
        n_deals = max(1, len(cp.get("deals", [])))
        n_swaps = len(cp.get("swaps", []))
        block_rows = 1 + n_deals + (1 if n_swaps else 0) + n_swaps  # строка подзаголовка deals + сами deals + (подзаголовок swaps) + swaps

        block_first = row
        block_last  = row + block_rows

        # Левая часть: i / рейтинг / Wi
        self._merge_and_set(block_first, block_last, 2, 2, cp.get("id"))          # i (название)
        self._merge_and_set(block_first, block_last, 3, 3, cp.get("rating"))      # рейтинг
        self._merge_and_set(block_first, block_last, 4, 4, self._safe(cp.get("w")), numfmt=self.PCT1)  # Wi

        # Средняя часть: подзаголовок + строки deals
        ws.merge_cells(start_row=row, start_column=5, end_row=row, end_column=9)
        cap = ws.cell(row=row, column=5, value="Производный финансовый инструмент (соглашение о неттинге)")
        cap.alignment = self.H_CENTER; cap.font = Font(bold=True); cap.fill = self.HEAD_FILL
        row += 1

        # Заголовок колонок j, Mj, Dj, EAj, EAj×Dj×Mj
        self._put_inner_header(row)
        row += 1

        # Строки сделок
        for d in cp.get("deals", []) or [{"id": "", "m": "", "d": "", "ea": "", "contribution": ""}]:
            self._put_deal_row(row, d)
            row += 1

        # Блок «свопов» (если есть)
        if n_swaps:
            ws.merge_cells(start_row=row, start_column=5, end_row=row, end_column=9)
            cap2 = ws.cell(row=row, column=5, value="Производный финансовый инструмент (соглашение о неттинге)")
            cap2.alignment = self.H_CENTER; cap2.font = Font(bold=True); cap2.fill = self.HEAD_FILL
            row += 1
            self._put_inner_header(row); row += 1
            for s in cp.get("swaps", []):
                self._put_deal_row(row, s)
                row += 1

        # Правая часть: Ai
        self._merge_and_set(block_first, block_last, 10, 10, self._safe(cp.get("ai")), numfmt=self.NUM2, bold=False, align=self.H_RIGHT)

        # Вертикальные разделители внутри блока
        self._thin_borders(block_first, 2, block_last, 10)

        return row

    # ---------- элементы таблицы ----------
    def _put_head_row(self, row: int):
        ws = self.ws
        # широкая рамочная строка под «i/рейтинг/Wi | инструменты | Ai»
        # колонки: B(i) C(рейтинг) D(Wi) | E..I (внутр. таблица) | K(Ai) | L(РСК)
        hdrs = [("i", 2), ("Рейтинг контрагента", 3), ("Wi", 4)]
        for text, col in hdrs:
            c = ws.cell(row=row, column=col, value=text)
            c.alignment = self.H_CENTER; c.font = Font(bold=True); c.fill = self.HEAD_FILL

        ws.merge_cells(start_row=row, start_column=5, end_row=row, end_column=9)
        c_mid = ws.cell(row=row, column=5, value="")
        c_mid.alignment = self.H_CENTER; c_mid.font = Font(bold=True); c_mid.fill = self.HEAD_FILL

        c_ai = ws.cell(row=row, column=10, value="Ai")
        c_ai.alignment = self.H_CENTER; c_ai.font = Font(bold=True); c_ai.fill = self.HEAD_FILL

        # «РСК» заголовок стоит в строке выше (кол. 12)

    def _put_inner_header(self, row: int):
        ws = self.ws
        labels = ["j", "Mj", "Dj", "EAj", "EAj x Dj x Mj"]
        for j, text in enumerate(labels, start=5):
            cell = ws.cell(row=row, column=j, value=text)
            cell.alignment = self.H_CENTER
            cell.font = Font(bold=True)
            cell.fill = self.HEAD_FILL

    def _put_deal_row(self, row: int, d: Dict[str, Any]):
        ws = self.ws
        vals = [
            d.get("id", ""),
            self._safe(d.get("m")),
            self._safe(d.get("d")),
            self._safe(d.get("ea")),
            self._safe(d.get("contribution")),
        ]
        numfmts = [None, self.NUM2, self.NUM2, self.NUM2, self.NUM2]
        aligns  = [self.H_LEFT, self.H_RIGHT, self.H_RIGHT, self.H_RIGHT, self.H_RIGHT]

        for off, (v, nf, al) in enumerate(zip(vals, numfmts, aligns)):
            c = ws.cell(row=row, column=5 + off, value=v)
            c.alignment = al
            if nf: c.number_format = nf

    # ---------- утилиты оформления ----------
    def _merge_and_set(self, r1, r2, c1, c2, value, numfmt=None, bold=False, align=None):
        ws = self.ws
        ws.merge_cells(start_row=r1, start_column=c1, end_row=r2, end_column=c2)
        cell = ws.cell(row=r1, column=c1, value=value)
        cell.alignment = align or self.H_CENTER
        cell.font = Font(bold=bold)
        if numfmt: cell.number_format = numfmt

    def _outline_table(self, r1, r2):
        # рамка по внешнему контуру таблицы B..L
        for col in range(2, 12):
            self._set_border(r1, col)
            self._set_border(r2, col)
        for row in range(r1, r2 + 1):
            self._set_border(row, 2)
            self._set_border(row, 11)

    def _set_border(self, r, c):
        self.ws.cell(row=r, column=c).border = self.BORDER_ALL

    def _thin_borders(self, r1, c1, r2, c2):
        for r in range(r1, r2 + 1):
            for c in range(c1, c2 + 1):
                self.ws.cell(row=r, column=c).border = self.BORDER_ALL

    def _auto_width(self, c1, c2):
        ws = self.ws
        for col in range(c1, c2 + 1):
            letter = get_column_letter(col)
            max_len = 0
            for cell in ws[letter]:
                val = str(cell.value) if cell.value is not None else ""
                max_len = max(max_len, len(val))
            ws.column_dimensions[letter].width = min(max(11, max_len + 2), 40)

    @staticmethod
    def _safe(x):
        try:
            return float(x)
        except Exception:
            return x if x not in (None, float("nan")) else ""


In [73]:
# ==============================================================================
# 6. ДЕМОНСТРАЦИОННЫЙ ЗАПУСК С ВЫВОДОМ В EXCEL
# ==============================================================================
if __name__ == "__main__":
    try:
        print("--- Расчёт РСК на основе данных из Excel-файла ---")
        
        excel_file_path = 'FinancialDataTemplate (2).xlsx'
        all_sheets = pd.read_excel(excel_file_path, sheet_name=None)
        
        df_counterparties, df_deals = all_sheets['Counterparties'], all_sheets['Deals']
        df_credit_swaps, df_index_swaps = all_sheets['CreditSwaps'], all_sheets['IndexSwaps']

        index_swaps_list = [IndexCreditSwap(id=row['IndexSwapID'], weight=row['Weight'], maturity=row['Maturity'], notional=row['Notional']) for _, row in df_index_swaps.iterrows()]
        deals_by_cp = {cp_id: [Deal(expiration_date=d['ExpirationDate'].strftime('%Y-%m-%d'), nominal_amount=d['NominalAmount'], nominal_currency=d['NominalCurrency'], fair_value=d['FairValue'], asset_type=d['AssetType'], netting_indicator=d['NettingIndicator'], netting_agreement_id=d['NettingAgreementID']) for _, d in group.iterrows()] for cp_id, group in df_deals.groupby('CounterpartyID')}
        swaps_by_cp = {cp_id: [CreditSwap(maturity=s['Maturity'], notional=s['Notional']) for _, s in group.iterrows()] for cp_id, group in df_credit_swaps.groupby('CounterpartyID')}
        
        counterparties_list = []
        for _, row in df_counterparties.iterrows():
            cp_id = str(row['CounterpartyID'])
            counterparties_list.append(Counterparty(id=cp_id, rating=str(row['Rating']), deals=deals_by_cp.get(cp_id, []), swaps=swaps_by_cp.get(cp_id, [])))

        rsk_calculator = RSKCalculator(counterparties=counterparties_list, index_swaps=index_swaps_list)
        report_data = rsk_calculator.calculate_full_report("2025-02-01", use_full_ea_calc=True)
        
        report_generator = ExcelReportGenerator(report_data)
        report_generator.generate("RSK_Report.xlsx")

    except FileNotFoundError:
        print(f"ОШИБКА: Файл с входными данными не найден: {excel_file_path}")
    except Exception as e:
        print(f"Произошла непредвиденная ошибка: {e}")

--- Расчёт РСК на основе данных из Excel-файла ---


  warn(msg)
