In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import polars as pl

# One trading day of minute aggregates, read from the parquet lake.
stock_path = "../data/lake/us_stocks_sip/minute_aggs_v1/2025/08/2025-08-08.parquet"

# Keep only the AAPL bars of that day.
is_aapl = pl.col("ticker") == 'AAPL'
df = pl.read_parquet(stock_path).filter(is_aapl)

# Quick look at the first rows and the per-column non-null counts.
print(df.head())

print(df.count())

# Volume-weighted close over the day: sum(volume * close) / sum(volume).
weighted_close_total = (df['volume'] * df['close']).sum()
volume_total = df['volume'].sum()
vprice_day = weighted_close_total / volume_total

print("一日的vprice:", vprice_day)


In [None]:
import ipywidgets as widgets

# Dropdown selecting the run mode; starts on 'backtest'.
mode = widgets.Dropdown(options=['backtest', 'live'], value='backtest', description='Mode:')

# Free-text field for the prefix; starts as 'demo'.
prefix = widgets.Text(value='demo', description='Prefix:')

# Render both controls in the cell output.
display(mode, prefix)

def run_strategy(m, p):
    """Print the selected run mode ``m`` and prefix ``p`` on one line."""
    message = f"运行模式: {m}, 前缀: {p}"
    print(message)

# Bind the two controls to run_strategy; leaving the panel as the cell's last
# expression renders it as the cell output.
interactive_panel = widgets.interactive(run_strategy, m=mode, p=prefix)
interactive_panel


In [10]:
import glob
import polars as pl
from datetime import datetime, timedelta

# Glob matching every July 2025 parquet file.
# valid columns: ["ticker", "volume", "open", "close", "high", "low", "window_start", "transactions"]
data_dir = "../data/lake/us_stocks_sip/minute_aggs_v1/2025/07/*.parquet"

# Lazily scan the month, keep AAPL only, and derive `datetime` from the
# nanosecond epoch stored in `window_start`. No time-zone conversion is applied
# here, so the calendar-day checks below operate on the unconverted timestamps.
aapl_scan = pl.scan_parquet(data_dir).filter(pl.col("ticker") == "AAPL")
lf = aapl_scan.with_columns(
    pl.from_epoch(pl.col("window_start"), time_unit='ns').alias('datetime')
)

df_all = lf.collect(engine='streaming')

# Month-wide volume-weighted close: sum(volume * close) / sum(volume).
weighted_close_total = (df_all["volume"] * df_all["close"]).sum()
volume_total = df_all["volume"].sum()
vprice_month = weighted_close_total / volume_total
print(f"\n2025-07 VWAP: {vprice_month}")


# Count rows that fall on the weekend of 5-6 July 2025.
print("检查周末数据...")
weekend_start = pl.datetime(2025, 7, 5)
weekend_end = pl.datetime(2025, 7, 6, 23, 59, 59)
in_weekend = (pl.col("datetime") >= weekend_start) & (pl.col("datetime") <= weekend_end)
week_data = df_all.filter(in_weekend)
print(f"2025-07-05到2025-07-06的数据行数: {week_data.height}")

# Per-day row counts for a few dates, each labelled with its weekday name.
print("\n详细的日期检查:")
dates = ['2025-07-04', '2025-07-05', '2025-07-06', '2025-07-07']
# Indexed by datetime.weekday(): 0 = Monday ... 6 = Sunday.
weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
for date_str in dates:
    year, month, day = (int(part) for part in date_str.split('-'))
    day_start = pl.datetime(year, month, day)
    day_end = pl.datetime(year, month, day, 23, 59, 59)
    within_day = (pl.col("datetime") >= day_start) & (pl.col("datetime") <= day_end)

    date_data = df_all.filter(within_day)

    weekday = weekday_names[datetime(year, month, day).weekday()]

    print(f"{date_str} ({weekday}): {date_data.height} 行数据")

# Distribution of rows by weekday name, most frequent first.
print("\n按星期几统计数据量:")
weekday_name = pl.col("datetime").dt.strftime("%A").alias("weekday")
df_with_weekday = df_all.with_columns(weekday_name)
weekday_counts = (
    df_with_weekday
    .group_by("weekday")
    .agg(pl.len().alias("count"))
    .sort("count", descending=True)
)
print(weekday_counts)



2025-07 VWAP: 211.3413595311783
检查周末数据...
2025-07-05到2025-07-06的数据行数: 0

详细的日期检查:
2025-07-04 (Friday): 0 行数据
2025-07-05 (Saturday): 0 行数据
2025-07-06 (Sunday): 0 行数据
2025-07-07 (Monday): 828 行数据

按星期几统计数据量:
shape: (5, 2)
┌───────────┬───────┐
│ weekday   ┆ count │
│ ---       ┆ ---   │
│ str       ┆ u32   │
╞═══════════╪═══════╡
│ Tuesday   ┆ 3925  │
│ Wednesday ┆ 3803  │
│ Thursday  ┆ 3583  │
│ Monday    ┆ 3071  │
│ Friday    ┆ 2107  │
└───────────┴───────┘


In [None]:
# use yfinance to double check splits discrepancy
import yfinance as yf
import pandas as pd

# Symbol whose split history is cross-checked against Yahoo Finance.
ticker = 'ENVX'

# actions=True asks yfinance to include the corporate-action columns
# ('Dividends', 'Stock Splits') alongside the price history.
df = yf.download(ticker, period='10y', actions=True)
df = pd.DataFrame(df)

# Recent yfinance releases return two-level columns (field, ticker) even for a
# single symbol. With such columns `df['Stock Splits']` is a DataFrame, and the
# boolean mask below would no longer select rows. Flatten to the field level so
# the filter works with both column layouts.
if isinstance(df.columns, pd.MultiIndex):
    df.columns = df.columns.get_level_values(0)

with pd.option_context('display.max_rows', 50,
                       'display.max_columns', 20,
                       'display.max_colwidth', 100,
                       'display.width', None,           # no limit on total width
                       'display.expand_frame_repr', False):  # do not wrap wide frames
    # Keep only the days on which a split was recorded.
    df = df[df['Stock Splits'] > 0].dropna()
    print(df)

In [None]:
def apply_split_adjustments(df: pl.DataFrame, splits: pl.DataFrame, price_columns: list = None) -> pl.DataFrame:
    """
    Back-adjust prices and volume for splits that take effect after each bar.

    For every row, all splits of the same ticker whose execution date is
    strictly later than the row's calendar date are combined. Prices are
    multiplied by the product of ``split_from / split_to``; volume is
    multiplied by the product of the reciprocals. Rows dated on or after a
    split's execution date are not affected by that split.

    The factor is computed once per (ticker, date) pair and joined back, so
    the whole frame is adjusted in a single vectorised pass and every row ends
    up with the same column dtypes.

    Args:
        df: Bar data; must contain 'ticker' and 'datetime' columns.
        splits: Split events with 'ticker', 'execution_date', 'split_from' and
            'split_to' columns. 'execution_date' may be a date string or an
            already-parsed date/datetime column.
        price_columns: Names of the price columns to adjust. Defaults to
            ['open', 'close', 'high', 'low']. Names missing from ``df`` are
            skipped. 'volume' is always handled separately (inverse factor).

    Returns:
        A new DataFrame sorted by ['ticker', 'datetime']. When no row has a
        later split, the input is returned with values and dtypes unchanged;
        otherwise the adjusted columns are the product of the original column
        and a floating-point factor.
    """
    if price_columns is None:
        price_columns = ['open', 'close', 'high', 'low']

    # Parse the execution date only when it is still a string; cast otherwise.
    if splits['execution_date'].dtype == pl.Utf8:
        split_date = pl.col('execution_date').str.to_date()
    else:
        split_date = pl.col('execution_date').cast(pl.Date)

    split_events = splits.select([
        pl.col('ticker'),
        split_date.alias('split_date'),
        (pl.col('split_from') / pl.col('split_to')).alias('split_ratio'),
    ]).sort(['ticker', 'split_date'])

    # Calendar date of each bar, used to decide which splits are still ahead.
    dated = df.with_columns(pl.col('datetime').dt.date().alias('_data_date'))

    # One factor per (ticker, date): the product over all strictly later splits.
    factors = (
        dated.select(['ticker', '_data_date'])
        .unique()
        .join(split_events, on='ticker', how='inner')
        .filter(pl.col('split_date') > pl.col('_data_date'))
        .group_by(['ticker', '_data_date'])
        .agg([
            pl.col('split_ratio').product().alias('_price_factor'),
            (1 / pl.col('split_ratio')).product().alias('_volume_factor'),
        ])
    )

    # Nothing to adjust: hand back the data untouched (apart from the sort).
    if factors.height == 0:
        return df.sort(['ticker', 'datetime'])

    # De-duplicate requested names and keep only those present in the data.
    price_targets = [
        name for name in dict.fromkeys(price_columns)
        if name in df.columns and name != 'volume'
    ]
    adjustments = [
        (pl.col(name) * pl.col('_price_factor')).alias(name)
        for name in price_targets
    ]
    if 'volume' in df.columns:
        # Volume moves opposite to price when a split is applied.
        adjustments.append(
            (pl.col('volume') * pl.col('_volume_factor')).alias('volume')
        )

    return (
        dated.join(factors, on=['ticker', '_data_date'], how='left')
        # Rows with no later split keep their values (factor 1.0).
        .with_columns([
            pl.col('_price_factor').fill_null(1.0),
            pl.col('_volume_factor').fill_null(1.0),
        ])
        .with_columns(adjustments)
        .drop(['_data_date', '_price_factor', '_volume_factor'])
        .sort(['ticker', 'datetime'])
    )

In [None]:
import matplotlib
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import polars as pl
import seolpyo_mplchart as mc

# warnings.filterwarnings("ignore", message=".*Font family.*not found.*")
matplotlib.rcParams["font.family"] = "DejaVu Sans"

# Glob matching every July 2025 parquet file.
# valid columns: ["ticker", "volume", "open", "close", "high", "low", "window_start", "transactions"]
data_dir = "../data/lake/us_stocks_sip/minute_aggs_v1/2025/07/*.parquet"

# splits data
splits_dir = "../data/raw/us_stocks_sip/splits/splits.parquet"
splits_error_dir = "../data/raw/us_stocks_sip/splits/splits_error.parquet"

splits_original = pl.read_parquet(splits_dir)
splits_errors = pl.read_parquet(splits_error_dir)

# Drop every split whose id is listed in the error file.
splits = splits_original.filter(~pl.col("id").is_in(splits_errors["id"].implode()))

print(splits.sort(['execution_date']).filter(pl.col('ticker') == 'BIVI'))

# Filter on the ticker inside the lazy plan, before collecting, so only the
# BIVI rows are materialised instead of the whole month for every ticker
# (same pattern as the AAPL scan earlier in this notebook).
lf = (
    pl.scan_parquet(data_dir)
    .filter(pl.col("ticker") == "BIVI")
    .with_columns(
        pl.from_epoch(pl.col("window_start"), time_unit="ns")
        .dt.convert_time_zone("America/New_York")
        .alias("datetime")
    )
    .collect(engine="streaming")
)

print(f"before:{lf.head(1)}")
print(f"before tail:{lf.tail(1)}")

# Split adjustment of the historical bars is currently disabled, so the
# "after" print below shows unadjusted data.
# NOTE(review): the disabled call passes volume_integer=True, which the
# apply_split_adjustments definition visible in this notebook does not accept.
# lf = apply_split_adjustments(lf, splits,volume_integer=True).sort(['datetime'])

# lf already holds BIVI only; just order it by time.
df_test = lf.sort(["datetime"])

print(f"after:{df_test.head(1)}")


shape: (3, 5)
┌─────────────────────────────────┬────────────────┬────────────┬──────────┬────────┐
│ id                              ┆ execution_date ┆ split_from ┆ split_to ┆ ticker │
│ ---                             ┆ ---            ┆ ---        ┆ ---      ┆ ---    │
│ str                             ┆ str            ┆ f64        ┆ f64      ┆ str    │
╞═════════════════════════════════╪════════════════╪════════════╪══════════╪════════╡
│ E0968510ccf34e46e16724e0827424… ┆ 2019-11-22     ┆ 125.0      ┆ 1.0      ┆ BIVI   │
│ E1441dc663515409c4c912da970e37… ┆ 2024-08-06     ┆ 10.0       ┆ 1.0      ┆ BIVI   │
│ E5aaf42e4160c06b45b7b57279eac4… ┆ 2025-07-07     ┆ 10.0       ┆ 1.0      ┆ BIVI   │
└─────────────────────────────────┴────────────────┴────────────┴──────────┴────────┘
before:shape: (1, 9)
┌────────┬────────┬──────┬───────┬───┬──────┬───────────────────┬──────────────┬───────────────────┐
│ ticker ┆ volume ┆ open ┆ close ┆ … ┆ low  ┆ window_start      ┆ transactions ┆ datetime 