# Import Library

In [1]:
import yfinance as yf
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import seaborn as sns
import datetime as dt
from matplotlib.ticker import PercentFormatter
from scipy.optimize import minimize_scalar

# import os
# from fredapi import Fred

from marketobserve import *

# Return Distribution

## Download Data

In [2]:
# Example usage:
start = dt.date(1900, 1, 1)
end = dt.date(2026, 1, 1)
ticker = "^HSI"
data = yf_download(ticker, start, end)
# data.index = data.index.strftime('%Y-%m')
# data.to_excel(f"{ticker}.xlsx")
data.describe()

[*********************100%***********************]  1 of 1 completed


Price,Open,High,Low,Close,Adj Close,Volume
count,9445.0,9445.0,9445.0,9445.0,9445.0,9445.0
mean,15731.337912,15836.078826,15608.299126,15725.260673,15725.260673,1016739000.0
std,7934.27969,7977.676224,7875.395237,7926.633924,7926.633924,1157914000.0
min,1950.5,1950.5,1894.900024,1894.900024,1894.900024,0.0
25%,9633.429688,9696.700195,9554.30957,9632.5,9632.5,0.0
50%,15747.55957,15861.169922,15613.200195,15742.299805,15742.299805,422676000.0
75%,22477.599609,22588.689453,22300.089844,22455.839844,22455.839844,1788490000.0
max,33335.480469,33484.078125,32897.039062,33154.121094,33154.121094,12686500000.0


In [None]:
def create_data_sources(df, periods, all_period_start, frequency):
    """
    创建不同时间周期的数据来源
    """
    current_date = pd.Timestamp.now()

    # 根据 frequency 筛选数据到本周/本月/本季度的第一天
    if frequency == 'ME':
        end_date = current_date.replace(day=1)
    elif frequency == 'W':
        end_date = current_date - pd.DateOffset(days=current_date.weekday())
    elif frequency == 'QE':
        end_date = current_date - pd.tseries.offsets.QuarterBegin()
    else:
        raise ValueError("Invalid frequency value. Allowed values are 'ME', 'W', 'QE'.")

    df = df[df.index < end_date]
    last_date = df.index[-1]

    if all_period_start is None:
        all_period_start = "2010-01-01"

    data_sources = {}
    for period in periods:
        if isinstance(period, int):
            if frequency in ['ME', 'W']:
                start_date = last_date - pd.DateOffset(months=period - 1)
            elif frequency == 'QE':
                start_date = last_date - pd.DateOffset(quarters=period - 1)
            col_name = f"{start_date.strftime('%y%b')}-{last_date.strftime('%y%b')}"
            data_sources[col_name] = df.loc[df.index >= start_date]
        elif period == "ALL":
            col_name = f"{pd.to_datetime(all_period_start).strftime('%y%b')}-{last_date.strftime('%y%b')}"
            data_sources[col_name] = df.loc[df.index >= all_period_start]
        else:
            raise ValueError("Invalid period value")

    return data_sources


def refrequency(df, frequency: str):
    """
    对数据进行重采样
    """
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be a DatetimeIndex")
    if not {'Open', 'High', 'Low', 'Close'}.issubset(df.columns):
        raise ValueError("DataFrame must contain OHLC columns")

    try:
        refrequency_df = df.resample(frequency).agg({
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last',
            'Adj Close': 'last',
            'Volume': 'sum'
        }).dropna()

    except KeyError as e:
        import logging
        logging.error(f"Missing column {e} in DataFrame")
        raise ValueError(f"Error processing data: Missing column {e}")
    except Exception as e:
        import logging
        logging.error(f"Unexpected error: {str(e)}")
        raise ValueError(f"Error processing data: {str(e)}")


    return refrequency_df


def oscillation(df):
    """
    计算震荡指标
    """
    data = df[['Open', 'High', 'Low', 'Close']].copy()
    data['LastClose'] = data["Close"].shift(1)
    data["Oscillation"] = data["High"] - data["Low"]
    data["OscillationPct"] = data["Oscillation"] / data['LastClose'] * 100
    data = data.dropna()
    return data


def tail_stats(df, feature, frequency, periods: list = [12, 36, 60, "ALL"], all_period_start: str = None, interpolation: str = "linear"):
    """
    计算不同时间周期的统计指标
    """
    if not isinstance(periods, list):
        raise TypeError("periods must be a list")
    if not all(isinstance(p, (int, str)) for p in periods):
        raise ValueError("periods must contain integers or strings")

    data_sources = create_data_sources(df, periods, all_period_start, frequency)

    stats_index = pd.Index(["mean", "std", "skew", "kurt", "max", "99th", "95th", "90th"])
    stats_df = pd.DataFrame(index=stats_index)

    for period_name, data in data_sources.items():
        stats_df[period_name] = [
            data[feature].mean(),
            data[feature].std(),
            data[feature].skew(),
            data[feature].kurtosis(),
            data[feature].max(),
            data[feature].quantile(0.99, interpolation=interpolation),
            data[feature].quantile(0.95, interpolation=interpolation),
            data[feature].quantile(0.90, interpolation=interpolation)
        ]

    return stats_df


def tail_plot(df, feature, frequency, periods: list = [12, 36, 60, "ALL"], all_period_start: str = None, interpolation: str = "linear"):
    """
    绘制不同时间周期的特征值分布
    """
    data_sources = create_data_sources(df, periods, all_period_start, frequency)

    if frequency == "ME":
        bin_range = list(range(0, 35, 5))
    elif frequency == "W":
        bin_range = list(range(0, 18, 3))


    for period_name, data in data_sources.items():
        plt.figure(figsize=(10, 6))
        sns.set_style("darkgrid")
        n, bins, patches = plt.hist(data[feature], bins=bin_range, alpha=0.5, color='skyblue', density=True, cumulative=True)

        n_diff = np.insert(np.diff(n), 0, n[0])
        for rect, h_diff, h in zip(patches, n_diff, n):
            height = rect.get_height()
            plt.text(rect.get_x() + rect.get_width() / 2, height, f'{h_diff * 100:.0f}%/{h * 100:.0f}%', ha='center', va='bottom', size=12)

        percentiles = [data[feature].quantile(p, interpolation=interpolation) for p in [0.90, 0.95, 0.99]]
        for p, val in zip([90, 95, 99], percentiles):
            plt.axvline(val, color='red', linestyle=':', alpha=0.3, label=f'{p}th: {val:.1f}%')

        last_three = data[feature].iloc[-3:]
        last_three_dates = last_three.index.strftime('%Y%b')
        for val, date, grayscale in zip(last_three, last_three_dates, np.arange(0.7, 0, -0.3)):
            plt.scatter(val, 0, color=str(grayscale), s=100, zorder=5, label=f'{date}: {val:.1f}%')

        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.title(f"Distribution of {feature} - {period_name}")
        plt.xlabel(f"{feature} (%)")
        plt.ylabel("Density")
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()



def calculate_projections(data, feature, percentile, interpolation, bias_weight):
    data["ProjectHigh"] = data["LastClose"] + data["LastClose"] * data[feature].quantile(percentile, interpolation=interpolation) / 100 * bias_weight
    data["ProjectLow"] = data["LastClose"] - data["LastClose"] * data[feature].quantile(percentile, interpolation=interpolation) / 100 * (1 - bias_weight)
    data["ActualClosingStatus"] = np.where(data["Close"] > data["ProjectHigh"], 1, 
                                       np.where(data["Close"] < data["ProjectLow"], -1, 0))
    realized_bias = ((data["ActualClosingStatus"] == 1).sum() - ((data["ActualClosingStatus"] == -1).sum())) / len(data)
    
    return realized_bias

def volatility_projection(df, feature, frequency: str = 'ME', percentile: float = 0.90, prefer_bias: float = None, periods: list = [12, 36, 60, "ALL"], all_period_start: str = None, interpolation: str = "linear"):
    """
    计算不同时间周期的波动率预测
    """
    if not isinstance(periods, list):
        raise TypeError("periods must be a list")
    if not all(isinstance(p, (int, str)) for p in periods):
        raise ValueError("periods must contain integers or strings")

    if feature == "OscillationPct":
        refrequency_data = refrequency(df, frequency=frequency)
        refrequency_feature = oscillation(refrequency_data)

        data_sources = create_data_sources(refrequency_feature, periods, all_period_start, frequency)

        volatility_projection_index = pd.Index(
            [
                f"Last: {refrequency_feature.index[-2].strftime('%y%b%d')}", 
                f"{percentile}th {feature}", 
                "RealizedBias%",  
                "ProjectedHighWeight%", 
                "ProjHigh", 
                "ProjLow", 
                f"Today: {df.index[-1].strftime('%y%b%d')}"
                ]
            )
        volatility_projection_df = pd.DataFrame(index=volatility_projection_index)

        for period_name, data in data_sources.items():
            period_end_close = data["Close"].iloc[-1]
            assumed_volatility = data[feature].quantile(percentile, interpolation=interpolation)

            if prefer_bias is not None:
                # 寻找最佳 bias_weight 的逻辑
                proj_high_weights = np.linspace(0.1, 0.9, 90)  # 在 0 到 1 之间生成 100 个等间距的 bias_weight 值
                min_error = float('inf')
                best_proj_high_weight = 0
                for proj_high_weight in proj_high_weights:
                    realized_bias = calculate_projections(data.copy(), feature, percentile, interpolation, proj_high_weight)
                    error = abs(realized_bias - prefer_bias)
                    if error < min_error:
                        min_error = error
                        best_proj_high_weight = proj_high_weight
                proj_high_weight = best_proj_high_weight            
            
            else:
                proj_high_weight = 0.5

            realized_bias = calculate_projections(data, feature, percentile, interpolation, proj_high_weight)

            proj_high = period_end_close + period_end_close * assumed_volatility / 100 * proj_high_weight
            proj_low = period_end_close - period_end_close * assumed_volatility / 100 * (1 - proj_high_weight)
            
            last_close = df["Close"].iloc[-1]

            volatility_projection_df[period_name] = [
                period_end_close,
                assumed_volatility,
                realized_bias*100,
                proj_high_weight*100,
                proj_high,
                proj_low,
                last_close
            ]
        return volatility_projection_df
    else:
        raise ValueError("Invalid feature value")




def tail_table(df, feature, frequency, periods: list = [12, 36, 60, "ALL"], all_period_start: str = None, interpolation: str = "linear"):
    """
    **To Modify Ouput Table Information** 按每个时间段计算表格，输出每个时间段及其对应的表格，最后将结果存储在字典中返回
    """
    data_sources = create_data_sources(df, periods, all_period_start, frequency)

    if frequency == "ME":
        bin_range = list(range(0, 35, 5))
    elif frequency == "W":
        bin_range = list(range(0, 18, 3))

    result_dict = {}
    for period_name, data in data_sources.items():
        # 计算直方图的累积密度
        n, bins = np.histogram(data[feature], bins=bin_range, density=True)
        cumulative_n = np.cumsum(n * np.diff(bins))
        n_diff = np.insert(np.diff(cumulative_n), 0, cumulative_n[0])

        # 计算百分位数
        percentiles = [data[feature].quantile(p, interpolation=interpolation) for p in [0.90, 0.95, 0.99]]

        # 获取最后三个数据点
        last_three = data[feature].iloc[-3:]
        last_three_dates = last_three.index.strftime('%Y%b')

        table_data = []
        for i in range(len(bins) - 1):
            bin_info = {
                "Period": period_name,
                "Bin Start": bins[i],
                "Bin End": bins[i + 1],
                "Diff Percentage": f'{n_diff[i] * 100:.0f}%',
                "Cumulative Percentage": f'{cumulative_n[i] * 100:.0f}%',
                "90th Percentile": percentiles[0],
                "95th Percentile": percentiles[1],
                "99th Percentile": percentiles[2],
            }
            for j, (date, val) in enumerate(zip(last_three_dates, last_three)):
                bin_info[f"Last {3 - j} Date"] = date
                bin_info[f"Last {3 - j} Value"] = val
            table_data.append(bin_info)

        table = pd.DataFrame(table_data)
        print(period_name, table)
        result_dict[period_name] = table

    return result_dict


def frequency_gap_stats(df, feature, frequency, periods: list = [12, 36, 60, "ALL"], all_period_start: str = None, interpolation: str = "linear"):
    """
    计算不同时间周期的频率缺口统计
    Given df, feature, and frequency,
        for each period in frequency,
            compute the gap_return = percentage change of first date open over last period close
            compute statistics of gap_return
            compute frequency_return = percentage change of last date close over last period close
            set days_of_period = len(df[rows only in the period])
            compare distribution of (gap_return+1)**days_of_period-1 with frequency_return distribution
        return distribution table of gap_return 
    """


In [13]:
# frequency = 'ME'
# monthly_data = refrequency(data, frequency=frequency)
# monthly_oscill = oscillation(monthly_data)
# monthly_tail_stats_result =tail_stats(monthly_oscill,"OscillationPct",frequency=frequency)
# print(monthly_tail_stats_result,"\n")

# round_digit = 1
# tail_plot(monthly_oscill,"OscillationPct",frequency=frequency)
# volatility_proj_pbn = volatility_projection(data,"OscillationPct",frequency=frequency,prefer_bias=None).round(round_digit)
# print(volatility_proj_pbn,"\n")
# volatility_proj_pb0 = volatility_projection(data,"OscillationPct",frequency=frequency,prefer_bias=0).round(round_digit)
# print(volatility_proj_pb0)


In [15]:
frequency = 'W'
refrequency_data = refrequency(data, frequency=frequency)
oscill = oscillation(refrequency_data)
tail_stats_result =tail_stats(oscill,"OscillationPct",frequency=frequency)
print(tail_stats_result,"\n")

round_digit = 1
# tail_plot(oscill,"OscillationPct",frequency=frequency)
# tail_hist = tail_table(oscill,"OscillationPct",frequency=frequency)
# print(tail_hist,"\n")
volatility_proj_pbn = volatility_projection(data,"OscillationPct",frequency=frequency,prefer_bias=None).round(round_digit)
print(volatility_proj_pbn,"\n")
volatility_proj_pb0 = volatility_projection(data,"OscillationPct",frequency=frequency,prefer_bias=0).round(round_digit)
print(volatility_proj_pb0)


      24May-25Apr  22May-25Apr  20May-25Apr  10Jan-25Apr
mean     4.719653     4.750659     4.487516     3.667798
std      2.579451     2.200721     2.098890     1.895296
skew     1.925695     1.915009     2.228378     2.034881
kurt     4.548465     5.038169     8.069789     7.257269
max     13.859898    13.859898    16.593874    16.593874
99th    13.652987    13.543815    13.533469    10.378523
95th     9.388461     8.982476     7.598381     6.901116
90th     6.992733     7.050944     6.606538     5.989418 

                      24May-25Apr  22May-25Apr  20May-25Apr  10Jan-25Apr
25Apr06                   22849.8      22849.8      22849.8      22849.8
0.9th OscillationPct          7.0          7.1          6.6          6.0
RealizedBias%                 4.2          2.0         -2.7         -1.0
ProjectedHighWeight%         50.0         50.0         50.0         50.0
ProjHigh                  23648.7      23655.4      23604.6      23534.1
ProjLow                   22050.9      22044.2 

In [None]:
# test
# mo.ChangeDistPlot(data, time_windows=['1Y', ('20240101', '20250401'), '100Y'], frequencies = ['W', 'M'])