In [None]:
# main.py

import polars as pl
import os

from src.futuapi.futu_api import FutuApi
from src.calculator.mathmatics import add_mean, add_boll

# Configure the maximum number of rows and columns Polars displays.
# pl.Config.set_tbl_width(1000)  # maximum table width, in characters
pl.Config.set_tbl_cols(100)  # maximum number of columns to display
pl.Config.set_tbl_rows(10)  # maximum number of rows to display


def trans(df):
    """Reduce minute bars to one row per day and attach rolling indicators.

    The input frame must provide the columns ``time_key`` (a string in
    ``%Y-%m-%d %H:%M:%S`` format), ``code``, ``close``, ``change_rate`` and
    ``volume``.

    For every date the bar stamped ``15:00:00`` supplies ``price`` and the
    sum of all bars' ``volume`` on that date supplies ``volume``.  Dates
    without a ``15:00:00`` bar are dropped by the inner join.

    Indicator columns added (note that several windows do not match the
    number in the column name):

    * ``ma_5`` / ``ma_20`` / ``ma_60`` / ``ma_120`` - rolling means of
      ``price`` over that many rows.
    * ``max_20`` - rolling maximum over 30 rows (``min_periods=1``).
    * ``min_20`` - rolling minimum over 5 rows (``min_periods=1``).
    * ``del_10`` - ``price`` minus ``price`` five rows earlier.

    The resulting frame is printed and then returned, sorted by ``date``.
    """
    # Parse the timestamp first; the date/time split below needs a Datetime.
    parsed = df.with_columns(
        pl.col("time_key").str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S")
    )
    bars = parsed.with_columns(
        [
            pl.col("time_key").dt.date().cast(pl.Utf8).alias("date"),
            pl.col("time_key").dt.time().cast(pl.Utf8).alias("time"),
        ]
    ).select(["code", "close", "change_rate", "date", "time", "volume"])

    # Total traded volume per date, across every bar of that date.
    daily_volume = bars.group_by("date").agg([pl.col("volume").sum().alias("volume")])

    # Keep only the 15:00:00 bar, swap its own volume for the daily total.
    closing_bars = bars.filter(pl.col("time") == "15:00:00")
    daily = (
        closing_bars.with_columns(pl.col("close").alias("price"))
        .drop("volume")
        .join(daily_volume, on=["date"])
        .select(["code", "date", "price", "volume"])
        .sort("date")
    )

    # Every indicator depends only on ``price``, so they can be added together;
    # the list order fixes the resulting column order.
    price = pl.col("price")
    moving_averages = [
        price.rolling_mean(window).alias(f"ma_{window}") for window in (5, 20, 60, 120)
    ]
    daily = daily.with_columns(
        moving_averages
        + [
            price.rolling_max(window_size=30, min_periods=1).alias("max_20"),
            price.rolling_min(window_size=5, min_periods=1).alias("min_20"),
            (price - price.shift(5)).alias("del_10"),
        ]
    )

    print(daily)

    return daily


import time
from dataclasses import dataclass


@dataclass
class Record:
    """One daily row of the frame returned by ``trans``, as a plain object.

    The field names mirror the frame's column names.  The backtest loop
    checks ``ma_120`` against ``None``, so indicator fields may hold ``None``
    even though they are annotated as ``float``.
    """

    code: str  # value of the "code" column
    date: str  # value of the "date" column (a string)
    price: float  # value of the "price" column
    ma_5: float  # 5-row rolling mean of price
    ma_20: float  # 20-row rolling mean of price
    ma_60: float  # 60-row rolling mean of price
    ma_120: float  # 120-row rolling mean of price; may be None
    max_20: float  # rolling maximum of price as computed by trans
    min_20: float  # rolling minimum of price as computed by trans
    del_10: float  # price difference against an earlier row, as computed by trans


api = FutuApi()
info = []
try:
    # Instrument under study.  Alternatives that were tried here before
    # (with their start dates; the end date was always 2024-11-19):
    #   SH.510880, SH.510050, SH.512100, SH.512040, SH.517000  from 2017-01-01
    #   SH.601857, SH.510500                                   from 2016-01-01
    sec_id = "SH.510300"
    df = api.get_minute_kline(sec_id, "2016-01-01", "2024-11-19")
    df = trans(df)

    # Columns copied from each daily row into a Record; any other column in
    # the frame (such as "volume") is ignored.
    record_columns = (
        "code",
        "date",
        "price",
        "ma_5",
        "ma_20",
        "ma_60",
        "ma_120",
        "max_20",
        "min_20",
        "del_10",
    )
    for row in df.iter_rows(named=True):
        record = Record(**{column: row[column] for column in record_columns})
        info.append(record)
finally:
    # Always release the API connection, even if loading failed.
    api.close()


: 

In [None]:
import math
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator


def truncate_float(number, t):
    """Truncate ``number`` toward zero to ``t`` decimal places, without rounding.

    A negative ``t`` truncates to the left of the decimal point, e.g.
    ``t=-2`` truncates to a multiple of 100.

    Scaling by ``10.0**t`` in floating point can land a hair below the
    intended integer (``0.29 * 100`` is ``28.999999999999996``), which would
    push a value that is already on the target grid down by one step and
    make the function non-idempotent.  The result is therefore corrected by
    at most one step so that it is the grid value closest to ``number``
    whose magnitude does not exceed it.

    :param number: the value to truncate
    :param t: number of decimal places to keep (may be negative)
    :return: the truncated value as a float
    """
    factor = 10.0**t
    whole = math.trunc(number * factor)
    step = 1 if number >= 0 else -1
    if abs((whole + step) / factor) <= abs(number):
        # Scaling undershot: the next grid value still fits within number.
        whole += step
    elif abs(whole / factor) > abs(number):
        # Scaling overshot: the candidate is larger in magnitude than number.
        whole -= step
    return whole / factor


def show_rate(x: list, y: list, py: list):
    """Plot two series against the same x values and display the figure.

    ``y`` is drawn in blue with the legend label "y = raw" and ``py`` in red
    with the label "py = main".  The plot title is taken from the
    module-level ``sec_id``.

    :param x: x-axis values (the caller passes date strings)
    :param y: first series, one value per entry of ``x``
    :param py: second series, one value per entry of ``x``
    """
    ax = plt.gca()  # draw on the current axes
    # Cap the x axis at about six integer-positioned major ticks and drop
    # the ticks at both ends.
    tick_locator = MaxNLocator(integer=True, prune="both", nbins=6)
    ax.xaxis.set_major_locator(tick_locator)

    series = (
        (y, "b", "y = raw"),
        (py, "r", "py = main"),
    )
    for values, colour, legend_label in series:
        ax.plot(x, values, linestyle="-", linewidth=1, color=colour, label=legend_label)

    ax.set_title(f"{sec_id}")
    ax.set_xlabel("date")
    ax.set_ylabel("rate")
    plt.xticks(rotation=45, fontsize=10)
    plt.tight_layout()
    ax.grid(True, linestyle="--", alpha=0.5)
    ax.legend()  # show the legend

    plt.show()  # display the figure


class Account:
    """All-in / all-out paper trading account for a single instrument.

    The account is either fully in cash or holds one position bought with
    as much cash as whole lots of 100 units allow.  A proportional fee is
    applied to the price on both buys and sells.

    Attributes:
        initial_cash: starting capital, the baseline for ``rate``.
        cash: cash currently held.
        position: number of units currently held (0 when flat).
        fee: proportional fee applied to the trade price.
        buys: account value recorded right after each executed buy.
        sales: account value recorded right after each executed sale.
        has_position: number of ``rate`` calls made while holding a position.
        no_position: number of ``rate`` calls made while flat.
        max_val: highest account value seen by ``rate``.
        max_drawdone: largest drop from ``max_val`` seen by ``rate``, in percent.
    """

    def __init__(self, initial_cash=100000):
        """Create a flat account holding ``initial_cash``.

        :param initial_cash: starting capital; must be positive
        :raises ValueError: if ``initial_cash`` is not positive
        """
        if initial_cash <= 0:
            raise ValueError("initial_cash must be positive")
        self.initial_cash = initial_cash
        self.cash = initial_cash
        self.position = 0
        self.fee = 0.001
        self.buys = []
        self.sales = []
        self.has_position = 0
        self.no_position = 0
        self.max_val = 0.0
        self.max_drawdone = 0.0

    def val(self, price):
        """Return cash plus the position valued at ``price``."""
        return self.cash + self.position * price

    def buy(self, price):
        """Buy as many whole lots of 100 as the cash allows; no-op if already holding.

        The fee is added to ``price`` before sizing the order.  When not even
        one lot is affordable nothing is recorded, so every entry in ``buys``
        corresponds to a real position and stays paired with ``sales``.
        """
        if self.position != 0:
            return
        price = price * (1.0 + self.fee)
        cnt = truncate_float(self.cash / price, -2)
        if cnt <= 0:
            # Nothing bought: recording a value here would misalign buys/sales.
            return
        self.position += cnt
        self.cash -= cnt * price
        self.buys.append(self.val(price))

    def sale(self, price):
        """Sell the whole position at ``price`` less the fee; no-op if flat."""
        if self.position == 0:
            return
        price *= 1.0 - self.fee
        self.cash += self.position * price
        self.position = 0
        self.sales.append(self.val(price))

    def rate(self, price):
        """Return the percentage gain over the initial cash at ``price``.

        This call also updates the statistics: it increments ``has_position``
        or ``no_position`` and refreshes ``max_val`` and ``max_drawdone``, so
        it should be called exactly once per observation.
        """
        if self.position == 0:
            self.no_position += 1
        else:
            self.has_position += 1

        current = self.val(price)
        self.max_val = max(self.max_val, current)
        # A zero peak has no meaningful drawdown and would divide by zero.
        if self.max_val > 0:
            self.max_drawdone = max(
                self.max_drawdone, (self.max_val - current) / self.max_val * 100
            )
        return (current * 100 / self.initial_cash) - 100.0

    def print(self):
        """Print win/loss counts of closed trades and the holding statistics.

        A closed trade counts as a success when the account value after the
        sale exceeds the value after the matching buy.  A position that is
        still open has no matching sale and is not counted.
        """
        s_cnt = 0
        f_cnt = 0
        for bought, sold in zip(self.buys, self.sales):
            if bought < sold:
                s_cnt += 1
            else:
                f_cnt += 1
        print(
            f"s_cnt: {s_cnt}, f_cnt: {f_cnt}, has_position {self.has_position}, no_position: {self.no_position}, max_drawdone:{self.max_drawdone}"
        )


account = Account()

# Scratch state left over from earlier strategy variants; not read by the
# loop below but kept so the module-level names remain defined.
less_20 = False
less_5 = False
greater_120 = False
init_price = 0.0

x = []  # dates
y = []  # buy-and-hold return of the instrument, in percent
py = []  # return of the simulated account, in percent
vol = []

last_price = 0.0
trade_cnt = 0

for record in info:
    # Skip rows without a usable price or before ma_120 is available.
    if record.price == 0.0 or record.ma_120 is None:
        continue
    if init_price == 0.0:
        # The first usable price is the baseline for the buy-and-hold curve.
        init_price = record.price

    # Enter when price is at its rolling high while ma_60 is below ma_120;
    # exit once price falls 5% or more below the rolling high.
    if record.price >= record.max_20 and record.ma_60 < record.ma_120:
        account.buy(record.price)
    elif record.price <= record.max_20 * 0.95:
        account.sale(record.price)

    benchmark_rate = record.price * 100 / init_price - 100.0
    # account.rate() also updates the holding-day counters and drawdown, so
    # call it exactly once per record and reuse the value.
    strategy_rate = account.rate(record.price)

    x.append(record.date)
    y.append(benchmark_rate)
    py.append(strategy_rate)

    print(strategy_rate, benchmark_rate)


account.print()


show_rate(x, y, py)