In [1]:
import pandas as pd
import datetime
from calc_HV_IV import calc_greeks_letter

In [None]:
class DataService:
    def __init__(self):
        self.calendar = pd.read_pickle("./data/trading_days.pkl")
        self.main_contract = []
        self.dict_history_all = {}
        self.df_results_all = pd.DataFrame()
        self.n = 0.08  # 信号判定参数
        self.alarm_ratio = 0.03
        self.option_static_df = pd.DataFrame()
        self.option_table = pd.DataFrame()
        self.logs = []
        self.max_log_length = 20
        self.latest_results = None
        self.keep_running = True
        self.setup_option()
      
    def setup_option(self):
          today = datetime.now().strftime("%Y%m%d")
          # 把交易日取成list
          if isinstance(self.calendar, pd.DataFrame):
              days = self.calendar.iloc[:, 0].tolist()
          elif isinstance(self.calendar, pd.Series):
              days = self.calendar.tolist()
          else:
              days = list(self.calendar)
          # 判断今天是不是交易日
          if today in days:
              idx = days.index(today)
              last_trading_day = days[idx - 1] if idx > 0 else days[0]
          else:
              last_trading_day = days[-1]  # 取最后一个可用日
          file = f"./data/每日报告_{last_trading_day}.xlsx"
          try:
              # 读取昨日报告合约
              df = pd.read_excel(file)
              df_copy = df.copy()
              df_copy.columns = df_copy.columns.str.strip().str.replace("\n", "")
              df_copy = df_copy.drop("日期", axis=1)
              df_copy = df_copy.dropna()

              # 读主力合约
              with open("./data/code.txt") as f:
                  codes = [line.strip() for line in f]
              # 生成映射，如 {'ag': '.SF', 'al': '.SF', 'cu': '.SHF', ...}
              code_map = {}
              import re

              for code in codes:
                  m = re.match(r"([a-zA-Z]{1,2})[0-9]*\..+", code)
                  if m:
                      short = m.group(1)
                      code_map[short] = "." + code.split(".")[-1]

              # 处理合约
              def add_exchange_suffix(option_code):
                  # 只对像 ag2508C9100 这样无后缀的代码处理
                  if "." in option_code:
                      return option_code
                  m = re.match(r"([a-zA-Z]{1,2})", option_code)
                  if m:
                      short = m.group(1)
                      suffix = code_map.get(short)
                      if suffix and not option_code.endswith(suffix):
                          return option_code + suffix
                  return option_code

              def extract_future_code(option_code):
                  option_code = str(option_code).strip()  # 确保是字符串并去除空格

                  if len(option_code) < 11:
                      return option_code
                  if "-" in option_code:
                      future_code = option_code.split("-")[0] + option_code[-3:]
                      return future_code

                  elif "P" in option_code.upper():  # 处理看跌期权（如 SR405P6200）
                      return (
                          option_code.split("P")[0] + option_code[-3:]
                      )  # 取 P 之前的部分

                  elif "C" in option_code.upper():  # 处理看涨期权（如 SR405C6200）
                      return option_code.split("C")[0] + option_code[-3:]

                  else:  # 未知格式，返回原值或报错
                      return option_code

              # 统一使用option_code列名
              df_copy["option_code"] = df_copy["期权合约代码"].apply(add_exchange_suffix)
              df_copy["future_code"] = df_copy["option_code"].apply(extract_future_code)

              # 计算需要展示的字段
              # 1. 持仓均价=‘平均开仓期权价’
              df_copy["pos_avg_price"] = df_copy["平均开仓期权价"]

              df_copy["pos"] = df_copy["手数"]
              df_copy["multiply"] = df_copy["交易单位"].fillna(1).astype(int)
              # 最终展示字段（你可以调整顺序/筛选字段）
              show_cols = [
                  "option_code",
                  "pos_avg_price",
                  "pos",
                  "future_code",
                  "multiply",
              ]
              result = df_copy[show_cols].copy()
              # 保存/缓存到类变量里，或返回result
              self.option_static_df = result.fillna("")  # 可前端展示用
          except Exception as e:
              print(e)
              self.option_static_df = pd.DataFrame()

    def update_option(self):
          print("option update start")
          if self.option_static_df.empty:
              self.option_table = pd.DataFrame()
              return

          today = datetime.now().strftime("%Y%m%d")
          df = self.option_static_df.copy()

          # 1. 保证 option_code 是索引
          if "option_code" in df.columns:
              df.set_index("option_code", inplace=True)

          df.columns = [c.strip() for c in df.columns]  # 清理空格
          opt_codes = [code for code in df.index if "." in code and len(code) > 11]
          fut_codes = [
              code for code in df["future_code"].unique() if "." in code and len(code) > 11
          ]
          # 2. Greeks
          greeks = calc_greeks_letter(opt_codes)
          deltas = {code: greek.get("delta", None) for code, greek in greeks.items() if len(code) > 11}
          ivs = {code: greek.get("iv", None) for code, greek in greeks.items() if len(code) > 11}
          # 3. 行情数据
          opt_prices_dict = self.get_market_data(opt_codes, "1d", today, today, 1)
          fut_prices_dict = self.get_market_data(fut_codes, "1d", today, today, 1)

          # 4. 填充到df
          def pick_last_close(code, data_dict):
              v = data_dict.get(code, {})
              series = v.get("close", None)
              if series is not None and hasattr(series, "iloc") and len(series) > 0:
                  return series.iloc[-1]
              return None

          df["current_opt_price"] = df.index.map(
              lambda code: pick_last_close(code, opt_prices_dict)
          )
          df["current_fut_price"] = df["future_code"].map(
              lambda code: pick_last_close(code, fut_prices_dict)
          )
          df["iv"] = df.index.map(ivs.get)
          df["delta"] = df.index.map(deltas.get)
          df["option_type"] = df.index.map(
              lambda x: "P" if "P" in x.upper() else ("C" if "C" in x.upper() else "")
          )
          # 5. 浮动盈亏等
          df["gain_loss"] = df.apply(
              lambda x: (x["pos_avg_price"] - x["current_opt_price"])
              * x["pos"]
              * x["multiply"]
              if pd.notnull(x["current_opt_price"])
              and pd.notnull(x["pos_avg_price"])
              and pd.notnull(x["pos"])
              and pd.notnull(x["multiply"])
              and x["pos"] != 0
              and x["multiply"] != 0
              else None,
              axis=1,
          )

          df["gain_loss_pct"] = df.apply(
              lambda x: (
                  x["gain_loss"] / (x["pos_avg_price"] * abs(x["pos"]) * x["multiply"]) * 100
              )
              if pd.notnull(x["gain_loss"])
              and x["pos_avg_price"]
              and x["pos"]
              and x["multiply"]
              else None,
              axis=1,
          )

          # 6. 报警价
          alarm_ratio = getattr(self, "alarm_ratio", 0.02)

          def calc_alarm_price(row):
              if pd.isnull(row["current_fut_price"]):
                  return None
              if row["option_type"] == "P":
                  return row["current_fut_price"] * (1 - alarm_ratio)
              elif row["option_type"] == "C":
                  return row["current_fut_price"] * (1 + alarm_ratio)
              return None

          df["alarm_price"] = df.apply(calc_alarm_price, axis=1)

          show_cols = [
              "current_fut_price",
              "option_type",
              "current_opt_price",
              "pos_avg_price",
              "pos",
              "gain_loss",
              "gain_loss_pct",
              "alarm_price",
              "iv",
              "delta",
          ]
          show_df = df.reset_index()[["option_code"] + show_cols].fillna("")
          num_cols = show_df.select_dtypes(
              include=["float", "float64", "int", "int64"]
          ).columns
          show_df[num_cols] = show_df[num_cols].round(2)
          print(show_df)
          # 保证 option_code 在前端可见
          with self.update_data_lock:
              self.option_table = show_df
