In [58]:
import datetime

import pandas as pd
import polars as pl

import quant_utils.data_moudle as dm
from data_functions.portfolio_data import get_portfolio_info, query_portfolio_nav
from quant_utils.constant import DATE_FORMAT, DB_CONFIG, TODAY
from quant_utils.constant_varialbles import LAST_TRADE_DT
from quant_utils.db_conn import DB_CONN_JJTG_DATA
from quant_utils.send_email import MailSender

# Maps upper-case result column names to their Chinese display labels.
# NOTE(review): not referenced by any function visible in this notebook —
# confirm whether it is still needed.
RENAME_DICT = {
    "TICKER_SYMBOL": "组合名称",
    "CYCLE": "周期",
    "START_DATE": "起始日期",
    "END_DATE": "结束日期",
    "INDICATOR": "指标",
    "PORTFOLIO_VALUE": "组合",
    "PEER_RANK": "同类基金排名",
    "BENCHMARK_VALUE_OUTTER": "对客基准",
    "BENCHMARK_VALUE_INNER": "对内基准",
    "PEER_MEDIAN": "同类中位数",
    "PEER_FOF_RANK": "同类FOF排名",
    "PEER_PORTFOLIO_RANK": "同类投顾排名",
}

# Key columns of the long-format performance frames. The add_peer_* functions
# use them both to select from the rank result and as the join key.
USED_COLUMNS = [
    "TICKER_SYMBOL", 
    "PORTFOLIO_NAME",
    "START_DATE", 
    "END_DATE", 
    "INDICATOR",
]

def crate_database_uri(database_type: str, config: dict) -> str:
    """
    Build a ``scheme://user:password@host:port/database`` connection URI.

    Args:
        database_type: URI scheme, e.g. ``"mysql"``.
        config: mapping with the keys ``user``, ``pwd``, ``host``, ``port``
            and ``database``.

    Returns:
        The connection URI. ``user`` and ``pwd`` are percent-encoded so that
        reserved characters (``@``, ``/``, ``:``, ``#`` ...) inside the
        credentials cannot break the URI structure. Credentials made only of
        unreserved characters are emitted unchanged.
    """
    from urllib.parse import quote

    # safe="" so that "/" is encoded as well; it is not allowed in userinfo.
    user = quote(str(config["user"]), safe="")
    pwd = quote(str(config["pwd"]), safe="")
    return f"{database_type}://{user}:{pwd}@{config['host']}:{config['port']}/{config['database']}"


# MySQL connection URI built from the "jjtg" entry of DB_CONFIG; passed as
# `uri` to the pl.read_database_uri calls in this cell.
JJTF_URI = crate_database_uri("mysql", DB_CONFIG["jjtg"])


def unpivot_dataframe(df: pl.LazyFrame) -> pl.LazyFrame:
    """
    Reshape a wide performance frame into long format.

    The identifier columns are kept as they are; every metric column becomes
    one row, with the metric name in ``INDICATOR`` and its value in
    ``PORTFOLIO_VALUE``.
    """
    id_columns = ["TICKER_SYMBOL", "PORTFOLIO_NAME", "START_DATE", "END_DATE"]
    metric_columns = [
        "CUM_RETURN",
        "ANNUAL_RETURN",
        "ANNUAL_VOLATILITY",
        "SHARP_RATIO_ANNUAL",
        "CALMAR_RATIO_ANNUAL",
        "MAXDD",
    ]
    return df.unpivot(
        on=metric_columns,
        index=id_columns,
        variable_name="INDICATOR",
        value_name="PORTFOLIO_VALUE",
    )

def get_portfolio_performance(
    end_date: str,
    table_name: str,
) -> pl.LazyFrame:
    """
    Load the performance metrics stored in ``table_name`` for ``end_date``.

    ``TICKER_SYMBOL`` is duplicated into ``PORTFOLIO_NAME`` by the query, and
    the result is returned in long format (see ``unpivot_dataframe``).
    """
    query_sql = f"""
    SELECT
        TICKER_SYMBOL,
        TICKER_SYMBOL as PORTFOLIO_NAME,
        START_DATE,
        END_DATE,
        CUM_RETURN,
        ANNUAL_RETURN,
        ANNUAL_VOLATILITY,
        SHARP_RATIO_ANNUAL,
        CALMAR_RATIO_ANNUAL,
        MAXDD
    FROM
        {table_name} 
    WHERE
        1 = 1
        AND END_DATE = '{end_date}'
    """
    wide_perf = pl.read_database_uri(query_sql, uri=JJTF_URI)
    return unpivot_dataframe(wide_perf.lazy())


def get_peer_fund_performance(
    end_date: str,
) -> pl.LazyFrame:
    """
    Build the peer-fund universe of every portfolio for ``end_date``.

    Fund metrics are joined with the ``fund_type_own`` rows of the latest
    report date published on or before ``end_date``. Each portfolio's
    ``PEER_QUERY`` is then applied as a SQL ``where`` clause on that frame,
    the matching funds are tagged with the portfolio's ``PORTFOLIO_NAME``,
    and all per-portfolio results are concatenated in long format.
    """
    portfolio_filters = dm.get_portfolio_info()[["PORTFOLIO_NAME", "PEER_QUERY"]]
    query_sql = f"""
        SELECT
            a.TICKER_SYMBOL,
            a.START_DATE,
            a.END_DATE,
            a.CUM_RETURN,
            a.ANNUAL_RETURN,
            a.ANNUAL_VOLATILITY,
            a.SHARP_RATIO_ANNUAL,
            a.CALMAR_RATIO_ANNUAL,
            a.MAXDD,
            c.LEVEL_1,
            c.LEVEL_2,
            c.LEVEL_3,
            c.EQUITY_RATIO_IN_NA,
            c.SEC_SHORT_NAME
        FROM
            fund_performance_inner a
            JOIN fund_type_own c ON c.TICKER_SYMBOL = a.TICKER_SYMBOL
        WHERE
            1 = 1 
            AND a.END_DATE = '{end_date}' 
            AND ( 
                c.REPORT_DATE = ( 
                    SELECT max( report_date ) 
                    FROM fund_type_own 
                    WHERE PUBLISH_DATE <= '{end_date}' 
                )
            )
        """
    fund_perf = pl.read_database_uri(query_sql, uri=JJTF_URI).lazy()

    def _peers_of(row) -> pl.LazyFrame:
        # PEER_QUERY uses "==" and double quotes; rewrite it into SQL syntax.
        where_clause = row["PEER_QUERY"].replace("==", "=").replace('"', "'")
        peer_query_sql = f""" 
            select 
                TICKER_SYMBOL,
                START_DATE,
                END_DATE,
                CUM_RETURN,
                ANNUAL_RETURN,
                ANNUAL_VOLATILITY,
                SHARP_RATIO_ANNUAL,
                CALMAR_RATIO_ANNUAL,
                MAXDD
            from
                self
            where {where_clause}
        """
        tagged = fund_perf.sql(peer_query_sql).with_columns(
            pl.lit(row["PORTFOLIO_NAME"]).alias("PORTFOLIO_NAME")
        )
        return unpivot_dataframe(tagged)

    return pl.concat([_peers_of(row) for _, row in portfolio_filters.iterrows()])

def get_peer_fof_performance(
    end_date: str,
) -> pl.LazyFrame:
    """
    Load metrics of the funds listed in ``fof_type`` for ``end_date``.

    ``fof_type.INNER_TYPE`` is exposed as ``PORTFOLIO_NAME``; the result is
    returned in long format (see ``unpivot_dataframe``).
    """
    query_sql = f"""
    SELECT
        a.TICKER_SYMBOL,
        c.INNER_TYPE as PORTFOLIO_NAME,
        a.START_DATE,
        a.END_DATE,
        a.CUM_RETURN,
        a.ANNUAL_RETURN,
        a.ANNUAL_VOLATILITY,
        a.SHARP_RATIO_ANNUAL,
        a.CALMAR_RATIO_ANNUAL,
        a.MAXDD 
    FROM
        fund_performance_inner a
        JOIN fof_type c ON c.TICKER_SYMBOL = a.TICKER_SYMBOL 
    WHERE
        1 = 1 
        AND a.END_DATE = '{end_date}' 
    """
    wide_perf = pl.read_database_uri(query_sql, uri=JJTF_URI)
    return unpivot_dataframe(wide_perf.lazy())

def get_peer_portfolio_performance(
    end_date: str,
)-> pl.LazyFrame:
    """
    Load metrics of peer portfolios from ``peer_performance_inner``.

    ``peer_portfolio_type.PORTFOLIO_TYPE`` is exposed as ``PORTFOLIO_NAME``;
    the result is returned in long format (see ``unpivot_dataframe``).
    """
    query_sql = f"""
    SELECT
        a.TICKER_SYMBOL,
        c.PORTFOLIO_TYPE as PORTFOLIO_NAME,
        a.START_DATE,
        a.END_DATE,
        a.CUM_RETURN,
        a.ANNUAL_RETURN,
        a.ANNUAL_VOLATILITY,
        a.SHARP_RATIO_ANNUAL,
        a.CALMAR_RATIO_ANNUAL,
        a.MAXDD 
    FROM
        peer_performance_inner a
        JOIN peer_portfolio_type c ON c.TICKER_SYMBOL = a.TICKER_SYMBOL 
    WHERE
        1 = 1 
        AND a.END_DATE = '{end_date}' 
    """
    wide_perf = pl.read_database_uri(query_sql, uri=JJTF_URI)
    return unpivot_dataframe(wide_perf.lazy())

def get_benchmark_value_outter(end_date: str) -> pl.LazyFrame:
    """
    Load benchmark metrics from ``benchmark_performance_inner`` in long format.

    Args:
        end_date: value matched against ``END_DATE`` in the query.

    Returns:
        A LazyFrame with ``TICKER_SYMBOL``, ``START_DATE``, ``END_DATE``,
        ``INDICATOR`` and the metric value in ``BENCHMARK_VALUE_OUTTER``.
    """
    query = f"""
    SELECT
        a.TICKER_SYMBOL,
        a.START_DATE,
        a.END_DATE,
        a.CUM_RETURN,
        a.ANNUAL_RETURN,
        a.ANNUAL_VOLATILITY,
        a.SHARP_RATIO_ANNUAL,
        a.CALMAR_RATIO_ANNUAL,
        a.MAXDD 
    FROM
        benchmark_performance_inner a
    WHERE
        1 = 1 
        AND a.END_DATE = '{end_date}' 
    """
    df = pl.read_database_uri(query, uri=JJTF_URI).lazy()
    # Name the metric columns explicitly (they are exactly the non-index
    # columns selected above) so the reshape matches unpivot_dataframe.
    df_unpivot = df.unpivot(
        on=[
            "CUM_RETURN",
            "ANNUAL_RETURN",
            "ANNUAL_VOLATILITY",
            "SHARP_RATIO_ANNUAL",
            "CALMAR_RATIO_ANNUAL",
            "MAXDD",
        ],
        index=["TICKER_SYMBOL", "START_DATE", "END_DATE"],
        variable_name="INDICATOR",
        value_name="BENCHMARK_VALUE_OUTTER",
    )
    return df_unpivot


def rank_pct(
    rank_col: str, patition_by: str | list | None = None, descending: bool = True
) -> pl.Expr:
    """
    Expression for the percentile position of ``rank_col`` inside its group.

    Computed as ``100 * (rank - 1) / (count - 1)``.

    Args:
        rank_col: name of the column to rank.
        patition_by: column name(s) defining the groups. ``None`` (the
            default) ranks over the whole frame instead of handing ``None``
            to ``Expr.over``.
        descending: passed to ``Expr.rank``.

    Returns:
        A polars expression. A group with a single non-null value divides
        0 by 0, so it does not produce a finite percentile.
    """
    rank_expr = pl.col(rank_col).rank(descending=descending).cast(pl.UInt32)
    count_expr = pl.col(rank_col).count().cast(pl.UInt32)
    pct_expr = (rank_expr - 1) / (count_expr - 1)
    if patition_by is not None:
        pct_expr = pct_expr.over(patition_by)
    return 100 * pct_expr


def rank_str(
    rank_col: str, patition_by: str | list | None = None, descending: bool = True
) -> pl.Expr:
    """
    Expression for the rank of ``rank_col`` formatted as ``"rank/count"``.

    Args:
        rank_col: name of the column to rank.
        patition_by: column name(s) defining the groups. ``None`` (the
            default) ranks over the whole frame instead of handing ``None``
            to ``Expr.over``.
        descending: passed to ``Expr.rank``.

    Returns:
        A polars string expression such as ``"727/6304"``.
    """
    rank_text = (
        pl.col(rank_col).rank(descending=descending).cast(pl.UInt32).cast(pl.String)
    )
    count_text = pl.col(rank_col).count().cast(pl.UInt32).cast(pl.String)
    if patition_by is not None:
        rank_text = rank_text.over(patition_by)
        count_text = count_text.over(patition_by)
    return rank_text + "/" + count_text


def _cal_performance_rank_helper(
    df: pl.LazyFrame,
    patition_by: str | list = None,
    incicator_list: list = None,
    descending: bool = True,
) -> pl.LazyFrame:
    """
    Rank ``PORTFOLIO_VALUE`` for the indicators in ``incicator_list``.

    Keeps the six long-format columns, filters to the requested indicators
    and adds ``PEER_RANK_PCT`` (percentile) and ``PEER_RANK`` ("rank/count").
    """
    kept_columns = [
        pl.col(name)
        for name in (
            "TICKER_SYMBOL",
            "PORTFOLIO_NAME",
            "START_DATE",
            "END_DATE",
            "INDICATOR",
            "PORTFOLIO_VALUE",
        )
    ]
    selected = df.select(kept_columns).filter(
        pl.col("INDICATOR").is_in(incicator_list)
    )

    # Compute rank and percentile.
    # Note: the polars rank function does not take null values into account.
    pct_rank = rank_pct(
        "PORTFOLIO_VALUE", patition_by=patition_by, descending=descending
    ).alias("PEER_RANK_PCT")
    text_rank = rank_str(
        "PORTFOLIO_VALUE", patition_by=patition_by, descending=descending
    ).alias("PEER_RANK")
    return selected.with_columns(pct_rank, text_rank)


def cal_performance_rank(df: pl.LazyFrame) -> pl.LazyFrame:
    """
    Rank every row within its (PORTFOLIO_NAME, START_DATE, END_DATE, INDICATOR)
    group and keep only the rows where TICKER_SYMBOL equals PORTFOLIO_NAME.

    ``MAXDD`` and ``ANNUAL_VOLATILITY`` are ranked ascending; the return and
    ratio indicators are ranked descending.
    """
    patition_by = ["PORTFOLIO_NAME", "START_DATE", "END_DATE", "INDICATOR"]
    # (indicators, descending) pairs; the ascending group comes first.
    rank_specs = [
        (["MAXDD", "ANNUAL_VOLATILITY"], False),
        (
            [
                "CUM_RETURN",
                "ANNUAL_RETURN",
                "SHARP_RATIO_ANNUAL",
                "CALMAR_RATIO_ANNUAL",
            ],
            True,
        ),
    ]
    # Compute rank and percentile.
    # Note: the polars rank function does not take null values into account.
    ranked_parts = [
        _cal_performance_rank_helper(
            df,
            patition_by=patition_by,
            incicator_list=indicators,
            descending=is_descending,
        )
        for indicators, is_descending in rank_specs
    ]
    is_own_row = pl.col("TICKER_SYMBOL") == pl.col("PORTFOLIO_NAME")
    return pl.concat(ranked_parts).filter(is_own_row)

def get_portfolio_dates(end_date: str) -> pl.LazyFrame:
    """
    Load the date windows stored in ``portfolio_dates`` for ``end_date``.

    ``DATE_NAME`` is returned as ``CYCLE`` and ``PORTFOLIO_NAME`` as ``FLAG``.
    """
    query_sql = f"""
    SELECT
        DATE_NAME AS CYCLE,
        PORTFOLIO_NAME as FLAG,
        START_DATE,
        END_DATE 
    FROM
        portfolio_dates 
    WHERE
        1 = 1 
        AND END_DATE = '{end_date}'
    """
    date_windows = pl.read_database_uri(query_sql, uri=JJTF_URI)
    return date_windows.lazy()


def rename_indicator_col_into_chinese(df: pl.LazyFrame):
    """Replace the English metric codes in ``INDICATOR`` with Chinese labels."""
    indicator_labels = dict(
        CUM_RETURN="累计收益率",
        ANNUAL_RETURN="年化收益率",
        ANNUAL_VOLATILITY="年化波动率",
        SHARP_RATIO_ANNUAL="收益波动比",
        CALMAR_RATIO_ANNUAL="年化收益回撤比",
        MAXDD="最大回撤",
    )
    translated = pl.col("INDICATOR").replace(indicator_labels)
    return df.with_columns(translated.alias("INDICATOR"))

def add_benchmark_value_otter(
    df: pl.LazyFrame,  end_date: str
) -> pl.LazyFrame:
    """
    Left-join the benchmark metrics for ``end_date`` onto ``df``.

    Rows are matched on ticker, date window and indicator; the benchmark
    value arrives in the ``BENCHMARK_VALUE_OUTTER`` column.
    """
    join_keys = ["TICKER_SYMBOL", "START_DATE", "END_DATE", "INDICATOR"]
    return df.join(get_benchmark_value_outter(end_date), on=join_keys, how="left")


def add_peer_fof_performance(
    df: pl.LazyFrame, end_date: str
) -> pl.LazyFrame:
    """
    Add each portfolio's rank among the peer FOF funds to ``df``.

    Args:
        df: long-format performance frame; it may already carry extra
            columns (ranks, peer median, benchmark value).
        end_date: date passed to ``get_peer_fof_performance``.

    Returns:
        ``df`` left-joined with ``PEER_FOF_RANK_PCT`` and ``PEER_FOF_RANK``
        on ``USED_COLUMNS``.
    """
    peer_fof = get_peer_fof_performance(end_date)
    # A vertical concat needs identical schemas. Narrow both inputs to the
    # shared long-format columns explicitly instead of relying on the
    # optimizer pushing the later select down into the concat.
    base_columns = USED_COLUMNS + ["PORTFOLIO_VALUE"]
    combined = pl.concat([df.select(base_columns), peer_fof.select(base_columns)])
    result = (
        cal_performance_rank(combined)
        .rename({"PEER_RANK_PCT": "PEER_FOF_RANK_PCT", "PEER_RANK": "PEER_FOF_RANK"})
        .select(USED_COLUMNS + ["PEER_FOF_RANK_PCT", "PEER_FOF_RANK"])
    )
    return df.join(result, on=USED_COLUMNS, how="left")


def add_peer_portfolio_performance(
    df: pl.LazyFrame, end_date: str
) -> pl.LazyFrame:
    """
    Add each portfolio's rank among the peer advisory portfolios to ``df``.

    Args:
        df: long-format performance frame; it may already carry extra
            columns (ranks, peer median, benchmark value).
        end_date: date passed to ``get_peer_portfolio_performance``.

    Returns:
        ``df`` left-joined with ``PEER_PORTFOLIO_RANK_PCT`` and
        ``PEER_PORTFOLIO_RANK`` on ``USED_COLUMNS``.
    """
    peer_portfolio = get_peer_portfolio_performance(end_date)
    # A vertical concat needs identical schemas. Narrow both inputs to the
    # shared long-format columns explicitly instead of relying on the
    # optimizer pushing the later select down into the concat.
    base_columns = USED_COLUMNS + ["PORTFOLIO_VALUE"]
    combined = pl.concat(
        [df.select(base_columns), peer_portfolio.select(base_columns)]
    )
    result = (
        cal_performance_rank(combined)
        .rename(
            {
                "PEER_RANK_PCT": "PEER_PORTFOLIO_RANK_PCT",
                "PEER_RANK": "PEER_PORTFOLIO_RANK",
            }
        )
        .select(USED_COLUMNS + ["PEER_PORTFOLIO_RANK_PCT", "PEER_PORTFOLIO_RANK"])
    )
    return df.join(result, on=USED_COLUMNS, how="left")


def cal_peer_median(peer_fund_performance: pl.LazyFrame) -> pl.LazyFrame:
    """
    Median of ``PORTFOLIO_VALUE`` per portfolio, date window and indicator,
    returned in the ``PEER_MEDIAN`` column.
    """
    group_keys = ["PORTFOLIO_NAME", "START_DATE", "END_DATE", "INDICATOR"]
    median_expr = pl.col("PORTFOLIO_VALUE").median().alias("PEER_MEDIAN")
    return peer_fund_performance.group_by(group_keys).agg(median_expr)


def _cal_portfolio_performance(
    end_date: str, table_name: str
) -> pl.LazyFrame:
    """
    Rank the portfolios stored in ``table_name`` against their peer funds.

    Portfolio and peer-fund metrics are stacked and ranked together; the
    peer-fund median of each group is then left-joined as ``PEER_MEDIAN``.
    """
    join_keys = ["PORTFOLIO_NAME", "START_DATE", "END_DATE", "INDICATOR"]

    portfolio_perf = get_portfolio_performance(end_date, table_name)
    peer_fund_performance = get_peer_fund_performance(end_date)
    # The median is taken over the peer funds only, before the portfolios
    # are stacked on top of them.
    peer_median = cal_peer_median(peer_fund_performance)

    stacked = pl.concat([portfolio_perf, peer_fund_performance])
    ranked = cal_performance_rank(stacked)
    return ranked.join(peer_median, on=join_keys, how="left")


def get_portfolio_derivatives_rank(end_date: str):
    """
    Rank report for ``portfolio_derivatives_performance_inner`` at ``end_date``.

    The benchmark column is renamed to ``BENCHMARK_VALUE_INNER`` and the
    indicator codes are translated to Chinese. The result is then joined to
    the date windows and limited to windows whose FLAG is the portfolio's
    own name or "ALL".
    """
    portfolio_dates = get_portfolio_dates(end_date)
    base_rank = _cal_portfolio_performance(
        end_date, "portfolio_derivatives_performance_inner"
    )
    with_benchmark = add_benchmark_value_otter(base_rank, end_date).rename(
        {"BENCHMARK_VALUE_OUTTER": "BENCHMARK_VALUE_INNER"}
    )
    perf_rank = rename_indicator_col_into_chinese(with_benchmark)

    window_applies = (pl.col("PORTFOLIO_NAME") == pl.col("FLAG")) | (
        pl.col("FLAG") == "ALL"
    )
    joined = portfolio_dates.join(perf_rank, on=["START_DATE", "END_DATE"])
    return joined.filter(window_applies).select(
        pl.all().exclude(["FLAG", "PORTFOLIO_NAME"])
    )


def get_portfolio_rank(end_date: str):
    """
    Rank report for ``portfolio_performance_inner`` at ``end_date``.

    Adds the benchmark value and the peer-FOF rank. The result is then
    joined to the date windows and limited to windows whose FLAG is the
    portfolio's own name or "ALL".
    """
    portfolio_dates = get_portfolio_dates(end_date)
    base_rank = _cal_portfolio_performance(end_date, "portfolio_performance_inner")
    with_benchmark = add_benchmark_value_otter(base_rank, end_date)
    perf_rank = add_peer_fof_performance(with_benchmark, end_date)
    # Currently disabled steps:
    # perf_rank = add_peer_portfolio_performance(perf_rank, end_date)
    # perf_rank = rename_indicator_col_into_chinese(perf_rank)

    window_applies = (pl.col("PORTFOLIO_NAME") == pl.col("FLAG")) | (
        pl.col("FLAG") == "ALL"
    )
    joined = portfolio_dates.join(perf_rank, on=["START_DATE", "END_DATE"])
    return joined.filter(window_applies).select(
        pl.all().exclude(["FLAG", "PORTFOLIO_NAME"])
    )


In [59]:
# Materialize the lazy rank report for 2024-11-22; the table below is its output.
get_portfolio_rank("2024-11-22").collect()

CYCLE,START_DATE,END_DATE,TICKER_SYMBOL,INDICATOR,PORTFOLIO_VALUE,PEER_RANK_PCT,PEER_RANK,PEER_MEDIAN,BENCHMARK_VALUE_OUTTER,PEER_FOF_RANK_PCT,PEER_FOF_RANK
str,date,date,str,str,f64,f64,str,f64,f64,f64,str
"""对客日""",2023-03-22,2024-11-22,"""兴证全明星精选-蚂蚁""","""ANNUAL_VOLATILITY""",16.474475,11.518325,"""727/6304""",21.813618,18.682019,,"""1/1"""
"""成立日""",2023-03-22,2024-11-22,"""兴证全明星精选-蚂蚁""","""ANNUAL_VOLATILITY""",16.474475,11.518325,"""727/6304""",21.813618,18.682019,,"""1/1"""
"""近1年""",2023-11-22,2024-11-22,"""兴证全明星精选-蚂蚁""","""ANNUAL_VOLATILITY""",19.395761,16.54006,"""1155/6978""",24.912297,21.92104,,"""1/1"""
"""YTD""",2023-12-29,2024-11-22,"""兴证全明星精选-蚂蚁""","""ANNUAL_VOLATILITY""",20.256695,17.739274,"""1291/7273""",25.958877,22.853567,,"""1/1"""
"""近9月""",2024-02-22,2024-11-22,"""兴证全明星精选-蚂蚁""","""ANNUAL_VOLATILITY""",19.594175,15.006916,"""1086/7231""",25.357836,22.588243,,"""1/1"""
…,…,…,…,…,…,…,…,…,…,…,…
"""TGDS_2""",2024-09-30,2024-11-22,"""知己私享-稳确幸""","""CALMAR_RATIO_ANNUAL""",-3.028385,58.992992,"""4547/7707""",-2.433275,-1.951095,57.142857,"""21/36"""
"""近1月""",2024-10-22,2024-11-22,"""知己私享-稳确幸""","""CALMAR_RATIO_ANNUAL""",-3.114297,37.699138,"""2888/7659""",-4.363493,-4.049633,11.111111,"""5/37"""
"""MTD""",2024-10-31,2024-11-22,"""知己私享-稳确幸""","""CALMAR_RATIO_ANNUAL""",-2.138423,32.597911,"""2498/7661""",-3.690076,-3.227203,5.555556,"""3/37"""
"""近1周""",2024-11-15,2024-11-22,"""知己私享-稳确幸""","""CALMAR_RATIO_ANNUAL""",-26.92931,70.806808,"""5451/7698""",-23.615221,-29.935812,33.333333,"""13/37"""


In [13]:
from quant_utils.db_conn import JJTG_URI
import polars as pl
import quant_utils.data_moudle as dm
def get_fund_performance_inner(
    end_date: str
):
    """
    Read the ``fund_performance_inner`` rows whose END_DATE equals
    ``end_date`` and return them as a LazyFrame.
    """
    query_sql = f"""
    SELECT
    TICKER_SYMBOL,
    START_DATE,
    END_DATE,
    CUM_RETURN,
    ANNUAL_RETURN,
    VOLATILITY,
    ANNUAL_VOLATILITY,
    SHARP_RATIO,
    SHARP_RATIO_ANNUAL,
    MAXDD,
    CALMAR_RATIO_ANNUAL,
    MAXDD_DATE,
    MAXDD_RECOVER
    FROM
    fund_performance_inner
    WHERE
    END_DATE = '{end_date}'
    """
    fund_rows = pl.read_database_uri(query_sql, JJTG_URI)
    return fund_rows.lazy()

# Write one parquet file per entry returned by dm.get_trade_cal for the
# 20241122-20241126 range; the entry is used both as the END_DATE filter and
# as the file name.
# NOTE(review): the output directory is a hard-coded absolute path on drive F:.
trade_dts = dm.get_trade_cal("20241122", "20241126")
for dt in trade_dts:
    get_fund_performance_inner(dt).sink_parquet(f"F:/data_parquet/fund_performance_inner/{dt}.parquet")


In [4]:
import polars as pl
# Scratch frames for experimenting with polars.
# df3 is built from empty lists, so its columns have no values to infer a
# dtype from (the display below shows "null" for both columns).
df1 = pl.DataFrame({"a": [1], "b": [3]})
df2 = pl.DataFrame({"a": [2], "b": [4]})
df3 = pl.DataFrame({"a": [], "b": []})

In [6]:
# Display the empty frame to inspect its inferred schema.
df3

a,b
null,null


In [1]:
import polars as pl
from quant_utils.constant import DB_CONFIG

# Year-end (and 2024-11-26) YTD returns of funds classified as '主动权益'.
# CTE `a` maps every December report date to a trading day (the date itself
# when IF_TRADING_DAY = 1, otherwise PREV_TRADE_DATE) and appends a literal
# 2024-11-26 row. The first SELECT joins those dates to fund_type_own and
# fund_nav_gr; the second SELECT adds the 20241126 values using the
# 20240930 classification.
query_sql = """
WITH a AS (
  SELECT
    REPORT_DATE,
  CASE
      a.IF_TRADING_DAY 
      WHEN 1 THEN
      TRADE_DT ELSE PREV_TRADE_DATE 
    END AS TRADE_DT 
  FROM
    md_tradingdaynew a
    JOIN md_report_date_calender b ON a.TRADE_DT = b.REPORT_DATE 
  WHERE
    1 = 1 
    AND MONTH(REPORT_DATE) = 12 
    AND SECU_MARKET = 83 UNION
  SELECT
    '2024-11-26' AS REPORT_DATE,
'2024-11-26' AS TRADE_DT) SELECT
c.ticker_symbol,
YEAR(c.end_date) AS YEAR,
c.RETURN_RATE_YTD 
FROM
  a
  JOIN fund_type_own b ON a.REPORT_DATE = b.REPORT_DATE
  JOIN fund_nav_gr c ON c.end_date = a.trade_dt 
  AND c.ticker_symbol = b.ticker_symbol 
WHERE
  1 = 1 
  AND b.level_1 = '主动权益' UNION
SELECT
  c.ticker_symbol,
  YEAR(c.end_date) AS YEAR,
  c.RETURN_RATE_YTD 
FROM
  fund_type_own b
  JOIN fund_nav_gr c ON c.end_date = '20241126' 
  AND c.ticker_symbol = b.ticker_symbol 
WHERE
  1 = 1 
  AND b.level_1 = '主动权益' 
  AND b.report_date = '20240930'
"""
def crate_database_uri(database_type: str, config: dict) -> str:
    """
    Build a ``scheme://user:password@host:port/database`` connection URI.

    Args:
        database_type: URI scheme, e.g. ``"mysql"``.
        config: mapping with the keys ``user``, ``pwd``, ``host``, ``port``
            and ``database``.

    Returns:
        The connection URI. ``user`` and ``pwd`` are percent-encoded so that
        reserved characters (``@``, ``/``, ``:``, ``#`` ...) inside the
        credentials cannot break the URI structure. Credentials made only of
        unreserved characters are emitted unchanged.
    """
    from urllib.parse import quote

    # safe="" so that "/" is encoded as well; it is not allowed in userinfo.
    user = quote(str(config["user"]), safe="")
    pwd = quote(str(config["pwd"]), safe="")
    return f"{database_type}://{user}:{pwd}@{config['host']}:{config['port']}/{config['database']}"


# MySQL connection URI built from the "jjtg" entry of DB_CONFIG.
JJTF_URI = crate_database_uri("mysql", DB_CONFIG["jjtg"])

In [2]:
# Period-over-period percentage change of index 000300's close price.
# CTE `a` yields one trading day per December report date plus a literal
# 2024-11-26 row; the change is computed with lag() over those days ordered
# by date, so the earliest row has no previous close and its value is NULL.
hs300_query = """
WITH a AS (
  SELECT
    REPORT_DATE,
  CASE
      a.IF_TRADING_DAY 
      WHEN 1 THEN
      TRADE_DT ELSE PREV_TRADE_DATE 
    END AS TRADE_DT 
  FROM
    md_tradingdaynew a
    JOIN md_report_date_calender b ON a.TRADE_DT = b.REPORT_DATE 
  WHERE
    1 = 1 
    AND MONTH(REPORT_DATE) = 12 
    AND SECU_MARKET = 83 UNION
  SELECT
    '2024-11-26' AS REPORT_DATE,
'2024-11-26' AS TRADE_DT) SELECT
YEAR(a.TRADE_DT) AS YEAR,
100*(b.ClosePrice / lag(b.ClosePrice) over (ORDER BY a.TRADE_DT) - 1) AS 'hs300' 
FROM
  a
  JOIN jy_indexquote b ON b.TradingDay = a.TRADE_DT 
  AND a.TRADE_DT <= '20241126' 
  AND b.SecuCode = '000300'
"""

In [3]:
# Run both queries: `df` holds the per-fund YTD returns, `hs300` the
# per-year index change.
df = pl.read_database_uri(query_sql, JJTF_URI)
hs300 = pl.read_database_uri(hs300_query, JJTF_URI)

In [4]:
# Flag each fund-year as a win (1) when its YTD return beats the index
# change for that year, otherwise 0.
# The comparison is cast directly instead of going through
# when/then/otherwise: a null comparison (no index value for that year, or
# no fund return) then stays null rather than being counted as a loss.
temp = (
    df.join(hs300, on="YEAR", how="left")
    .with_columns(
        if_win=(pl.col("RETURN_RATE_YTD") > pl.col("hs300")).cast(pl.Int32)
    )
)

In [5]:
# Share of funds flagged as winners per year, written to an Excel file.
# NOTE(review): the output path is a hard-coded absolute path on drive F:.
temp.group_by("YEAR").agg(
    win_rate=pl.col("if_win").sum() / pl.col("if_win").count()
).sort("YEAR").to_pandas().to_excel("f:/test.xlsx")

In [6]:
# Per-year distribution of fund YTD returns: mean, min, quartiles and max.
df.group_by("YEAR").agg(
    mean=pl.col("RETURN_RATE_YTD").mean(),
    min=pl.col("RETURN_RATE_YTD").min(),
    percent25=pl.col("RETURN_RATE_YTD").quantile(0.25),
    median=pl.col("RETURN_RATE_YTD").median(),
    percent75=pl.col("RETURN_RATE_YTD").quantile(0.75),
    max=pl.col("RETURN_RATE_YTD").max(),
).sort("YEAR")

YEAR,mean,min,percent25,median,percent75,max
i64,f64,f64,f64,f64,f64,f64
2007,105.466149,1.2,91.9608,112.8785,129.2366,226.2829
2008,-44.419087,-66.416,-53.4285,-48.4837,-43.7008,6.4938
2009,56.501016,-16.6,43.1599,61.9221,74.3219,116.1939
2010,4.180169,-24.0399,-2.8,3.7393,10.3231,37.7664
2011,-21.878733,-50.773,-26.3,-22.5281,-17.9998,5.3283
…,…,…,…,…,…,…
2020,46.597458,-11.657,24.7234,46.5848,66.1811,166.565
2021,7.385901,-32.4985,-2.8997,3.0096,15.7362,119.4234
2022,-17.613985,-50.0748,-25.0596,-18.8806,-11.2489,48.5612
2023,-11.833022,-46.5001,-18.3075,-12.2753,-5.3635,61.0878


In [9]:
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib
# Boxen plot of fund YTD returns per year, shown in a Tk window.
matplotlib.use('TkAgg')
# df = df.filter(pl.col("YEAR")>=2014)
plt.rcParams['font.sans-serif']=['SimHei']  # font that can render the Chinese axis labels
plt.rcParams['axes.unicode_minus'] = False  # use the ASCII hyphen so minus signs render with this font
sns.boxenplot(x="YEAR", y="RETURN_RATE_YTD", data=df, orient="v")
plt.xlabel("年份")
plt.ylabel("收益率")
plt.show()

In [8]:
# For every ticker, the earliest and latest END_DATE found across all
# exported parquet files.
(
    pl
    .scan_parquet("F:/data_parquet/fund_performance_inner/*.parquet")
    .group_by("TICKER_SYMBOL")
    .agg(
        FUND_PERF_START_DATE=pl.col("END_DATE").min(),
        FUND_PERF_END_DATE=pl.col("END_DATE").max()
    )
).collect()

TICKER_SYMBOL,FUND_PERF_START_DATE,FUND_PERF_END_DATE
str,date,date
"""014377""",2023-06-30,2024-11-26
"""018495""",2023-06-30,2024-11-26
"""160814""",2023-06-30,2024-05-22
"""011733""",2023-06-30,2024-11-26
"""003752""",2023-06-30,2024-11-26
…,…,…
"""009159""",2023-06-30,2024-11-25
"""217203""",2023-06-30,2024-11-26
"""010593""",2023-06-30,2024-11-26
"""017517""",2023-06-30,2024-11-26


In [1]:
from sqlalchemy import create_engine
from quant_utils.db_conn import PG_DATA_URI, JJTG_URI
from sqlalchemy import inspect
import polars as pl
import os 

def get_db_table_unque_index(table_name: str, uri: str)-> list[list[str]]:
    """
    Return the column lists of all unique indexes defined on ``table_name``.

    Args:
        table_name: table to inspect.
        uri: SQLAlchemy connection URI of the database holding the table.

    Returns:
        One list of column names per index whose ``unique`` flag is True.
    """
    # Create the database engine.
    engine = create_engine(uri)
    try:
        # Reflect the index list of the requested table.
        indexes = inspect(engine).get_indexes(table_name)
    finally:
        # The engine is created for this call only; release its connection
        # pool instead of leaving the connections open.
        engine.dispose()
    return [index["column_names"] for index in indexes if index["unique"] is True]

In [2]:
# Look up the unique index columns of chinamutualfundnav.
get_db_table_unque_index("chinamutualfundnav", JJTG_URI)

[['F_INFO_WINDCODE', 'PRICE_DATE']]

In [4]:
# Append every exported parquet file to the fund_performance_inner table of
# the database behind PG_DATA_URI.
# NOTE(review): rows are appended unconditionally, so re-running this cell
# (e.g. after the KeyboardInterrupt shown below) looks like it would try to
# insert already-loaded files again — confirm before re-running.
file_root_path = "F:/data_parquet/fund_performance_inner/"
for file_name in os.listdir(file_root_path):
    if file_name.endswith(".parquet"):
        file_path = os.path.join(file_root_path, file_name)
        df = pl.read_parquet(file_path)
        df.write_database(
            connection=PG_DATA_URI, 
            table_name="fund_performance_inner", 
            if_table_exists="append"
        )
        print(f"Loaded {file_name} into database")

KeyboardInterrupt: 