<a href="https://colab.research.google.com/github/eugene-h-lin/Android-SlideExpandableListView/blob/master/prototype%5Cstrategy_break_sr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from dataclasses import asdict, dataclass, field
from datetime import date
import datetime
import json
import math
from typing import Iterator, List, Optional

from numpy import nan
import pandas as pd


# ============================
# Data Structures
# ============================

@dataclass(frozen=True)
class Pivot:
    """Immutable record describing a single pivot point of a stock.

    The class is frozen, so instances are hashable and normal attribute
    assignment raises ``FrozenInstanceError``.  Use :meth:`from_series` to
    build an instance from a row and :meth:`set_score` for the one sanctioned
    in-place update.
    """
    stock_id: str
    date: pd.Timestamp
    type: str  # 'res' or 'sup'
    price: float
    level: int
    share_score: float  # trading volume over the 3 days before and after
    volatility_score: float  # sum of absolute daily price changes before and after (%)
    score: float  # score
    share_raw: int  # trading volume over the 3 days before and after
    volatility_raw: float  # sum of absolute daily price changes before and after (%)
    confirm_date: pd.Timestamp

    def set_score(self, score: float):
        """Overwrite ``score`` in place and return ``None``.

        Plain ``self.score = ...`` always raises on a frozen dataclass, so the
        write goes through ``object.__setattr__``.

        NOTE: ``score`` takes part in the generated ``__eq__``/``__hash__``;
        do not call this on a pivot that is currently stored in a set or used
        as a dict key.
        """
        object.__setattr__(self, 'score', score)
        return

    @classmethod
    def from_series(cls, row_pivot: pd.Series):
        """Build a new ``Pivot`` from a row keyed by the field names.

        This is a classmethod, so it works both as ``Pivot.from_series(row)``
        and on an existing instance (``pivot.from_series(row)``); in both
        cases a *new* object is returned and nothing is mutated.

        Args:
            row_pivot: Mapping-like row providing every ``Pivot`` field name
                as a key.

        Returns:
            The newly created ``Pivot``.

        Raises:
            KeyError: If a field name is missing from ``row_pivot``.
        """
        return cls(
            stock_id=row_pivot['stock_id'],
            date=row_pivot['date'],
            type=row_pivot['type'],
            price=row_pivot['price'],
            level=row_pivot['level'],
            share_score=row_pivot['share_score'],
            volatility_score=row_pivot['volatility_score'],
            score=row_pivot['score'],
            share_raw=row_pivot['share_raw'],
            volatility_raw=row_pivot['volatility_raw'],
            confirm_date=row_pivot['confirm_date'],
        )

@dataclass
class Pivots:
    """Ordered collection of ``Pivot`` objects.

    Supports iteration and ``len()``, filtering by confirmation date, a
    time-decayed importance score, and conversion to/from JSON strings and
    pandas DataFrames.
    """
    pivots: List[Pivot] = field(default_factory=list)

    def __iter__(self) -> Iterator[Pivot]:
        """Iterate over the contained pivots."""
        return iter(self.pivots)

    def __len__(self) -> int:
        """Return the number of contained pivots."""
        return len(self.pivots)

    def add_pivot(self, pivot: Pivot):
        """Append ``pivot`` to the collection."""
        self.pivots.append(pivot)

    def get_pivots_by_date(self, date: date):
        """Return a new ``Pivots`` with the pivots confirmed strictly before ``date``.

        Both sides are coerced to ``pd.Timestamp`` first, because ordering a
        ``Timestamp`` against a plain ``datetime.date`` raises ``TypeError``.
        """
        cutoff = pd.Timestamp(date)
        matched_pivots = [
            p for p in self.pivots if pd.Timestamp(p.confirm_date) < cutoff
        ]
        return Pivots(matched_pivots)

    def calc_importance(self, date: date):
        """Return the time-decayed sum of pivot scores as of ``date``.

        Each pivot contributes ``score * exp(-0.00577 * age)`` where ``age``
        is its age in whole days, capped at 365.  Pivots dated after ``date``
        and pivots whose score is ``None`` are skipped.

        Args:
            date: Evaluation day; a ``datetime.date`` or a ``pd.Timestamp``.
        """
        decay_per_day = 0.00577  # 120-day half-life (0.03466 would be a 20-day half-life)
        max_age_days = 365
        # Reduce both operands to calendar dates: subtracting a Timestamp from
        # a datetime.date (or the reverse) is not supported.
        as_of = pd.Timestamp(date).date()
        aged = (
            (p, (as_of - pd.Timestamp(p.date).date()).days) for p in self.pivots
        )
        return sum(
            p.score * math.exp(-decay_per_day * min(age, max_age_days))
            for p, age in aged
            if age >= 0 and p.score is not None
        )

    def from_jason(self, pivot_str: str):
        """Replace the contents with pivots parsed from a JSON array string.

        Each array element must be an object whose keys are the ``Pivot``
        field names.  String values of ``date`` and ``confirm_date`` are
        converted to ``pd.Timestamp`` so that output of :meth:`to_json` can be
        loaded back.

        Raises:
            ValueError: If the JSON is malformed or a date string cannot be parsed.
            TypeError: If an element has missing or unexpected keys.
        """
        list_of_dicts = json.loads(pivot_str)
        for item in list_of_dicts:
            for key in ('date', 'confirm_date'):
                if isinstance(item.get(key), str):
                    item[key] = pd.Timestamp(item[key])

        self.pivots = [Pivot(**item) for item in list_of_dicts]

    # Correctly spelled alias; ``from_jason`` is kept for existing callers.
    from_json = from_jason

    def from_df(self, df_pivots: pd.DataFrame):
        """Append one new ``Pivot`` per row of ``df_pivots``.

        The frame must provide a column for every ``Pivot`` field name.  A
        fresh object is built for each row.
        """
        field_names = tuple(Pivot.__dataclass_fields__)
        for _, row in df_pivots.iterrows():
            self.add_pivot(Pivot(**{name: row[name] for name in field_names}))
        return

    @staticmethod
    def _json_default(value):
        """Serialise the non-JSON-native values that pivots carry."""
        if isinstance(value, datetime.date):  # also datetime and pd.Timestamp
            return None if pd.isnull(value) else value.isoformat()
        if hasattr(value, 'item'):  # numpy scalars coming out of a DataFrame
            return value.item()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def to_json(self) -> str:
        """Return the pivots as an indented JSON array string.

        Dates and timestamps are written as ISO-8601 strings (missing ones as
        ``null``); without this ``json.dumps`` rejects ``pd.Timestamp`` values.
        """
        # asdict() turns every dataclass instance into a plain dict.
        list_of_dicts = [asdict(item) for item in self.pivots]
        return json.dumps(
            list_of_dicts,
            indent=4,
            ensure_ascii=False,
            default=self._json_default,
        )

    def to_df(self):
        """Return the pivots as a DataFrame with one row per pivot.

        An empty collection gives an empty frame that still has the pivot
        columns.  The ``date`` column is converted with ``pd.to_datetime``.
        """
        dict_list = [p.__dict__ for p in self.pivots]

        df = pd.DataFrame(dict_list, columns=list(Pivot.__dataclass_fields__))

        # Make sure the date column has a datetime dtype.
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'])

        return df

@dataclass
class Zone:
    """A price band between ``low`` and ``high`` together with its pivots."""
    stock_id: str
    type: str
    high: float
    low: float
    confirm_date: pd.Timestamp
    pivots: Pivots = field(default_factory=Pivots)
    expire_date: Optional[pd.Timestamp] = None
    importance: float = 0.0

    def __post_init__(self):
        """Replace an explicit ``pivots=None`` with an empty container.

        Every method below assumes ``self.pivots`` is a ``Pivots`` object.
        """
        if self.pivots is None:
            self.pivots = Pivots()

    def contains_price(self, price: float, tolerance=0.02):
        """Return True if ``price`` lies inside the zone widened by ``tolerance``.

        The band tested is ``[low * (1 - tolerance), high * (1 + tolerance)]``;
        the default tolerance is 2%.
        """
        return self.low * (1 - tolerance) <= price <= self.high * (1 + tolerance)

    def update_boundary(self):
        """Recalculate zone boundaries based on member pivots.

        With no pivots there is nothing to derive the bounds from, so the
        current ``low``/``high`` are left untouched.
        """
        prices = [p.price for p in self.pivots]
        if not prices:
            return
        self.low = min(prices)
        self.high = max(prices)

    def to_dict(self):
        """Return a shallow copy of the instance attributes as a dict.

        A copy is returned so callers cannot mutate the zone through it.
        """
        return dict(self.__dict__)

    def add_pivot(self, pivot: Pivot):
        """Append ``pivot`` to this zone's pivots."""
        self.pivots.add_pivot(pivot)

    def load_pivots(self):
        """Fetch this zone's pivots via ``get_pivot_of_zone`` and append them.

        The returned frame is expected to have one column per ``Pivot`` field
        name.  One new ``Pivot`` is built per row.
        """
        df_pivots = get_pivot_of_zone(
            self.stock_id,
            self.confirm_date,
            self.type,
            self.high,
            self.low
        )

        if self.pivots is None:
            self.pivots = Pivots()

        field_names = tuple(Pivot.__dataclass_fields__)
        for _, row in df_pivots.iterrows():
            self.add_pivot(Pivot(**{name: row[name] for name in field_names}))

        return

    def set_importance(self, date: date, atr_of_the_date: float):
        """Store ``importance`` as pivot importance divided by the zone width.

        The width is ``(high - low) / atr_of_the_date``; a width of exactly 0
        is replaced by 1 so the final division is defined.

        Args:
            date: Day passed to ``Pivots.calc_importance``.
            atr_of_the_date: Divisor used to scale the zone width.
        """
        # NOTE(review): a plain-float atr_of_the_date of 0 raises
        # ZeroDivisionError here — confirm callers never pass 0.
        zone_width = (self.high - self.low) / atr_of_the_date

        if zone_width == 0:
            zone_width = 1

        z_imp = self.pivots.calc_importance(date)
        self.importance = z_imp / zone_width

        return

@dataclass
class Zones:
    """Collection of ``Zone`` objects with filtering and export helpers."""
    zones: List[Zone] = field(default_factory=list)

    def __iter__(self) -> Iterator[Zone]:
        """Iterate over the contained zones."""
        return iter(self.zones)

    def __len__(self) -> int:
        """Return the number of contained zones."""
        return len(self.zones)

    def from_df(self, df: pd.DataFrame):
        """Append one ``Zone`` per row of ``df``.

        Rows are read with ``iterrows()`` (iterating the frame itself only
        yields column names), and each zone gets its own empty ``Pivots`` so
        zones never share pivot state.
        """
        for _, row in df.iterrows():
            self.zones.append(Zone(
                stock_id=row['stock_id'],
                type=row['type'],
                high=row['high'],
                low=row['low'],
                confirm_date=row['confirm_date'],
                expire_date=row['expire_date'],
                pivots=Pivots(),
            ))

        return

    @staticmethod
    def _zone_payload(zone: Zone) -> dict:
        """Build the flat dict shared by ``to_json`` and ``to_response``."""
        # NOTE(review): "confrim_date" looks like a typo of "confirm_date",
        # but consumers may rely on the key as spelled — confirm before
        # renaming.
        return {
            "stock_id": zone.stock_id,
            "confrim_date": zone.confirm_date,
            "type": zone.type,
            "high": zone.high,
            "low": zone.low,
            "expire_date": zone.expire_date,
            "importance": zone.importance,
        }

    def to_json(self):
        """Return a list with one plain dict per zone.

        Despite the name, the result is a list of dicts, not a JSON string.
        """
        json_list = []
        for zone in self.zones:
            print(f'zone to be trans: {zone}')
            json_list.append(self._zone_payload(zone))

        return json_list

    def length(self) -> int:
        """Return the number of zones (same as ``len(self)``)."""
        return len(self)

    def to_response(self):
        """Return one validated ``SupportResistResponse`` per zone."""
        pydantic_list = []
        for zone in self.zones:
            print(f'zone to be trans: {zone}')
            # Validate the manually mapped dict with model_validate.
            pydantic_obj = SupportResistResponse.model_validate(
                self._zone_payload(zone)
            )
            pydantic_list.append(pydantic_obj)

        return pydantic_list

    def to_df(self) -> pd.DataFrame:
        """Return a DataFrame with one row per zone, built from ``Zone.to_dict()``.

        An empty collection yields an empty DataFrame.
        """
        return pd.DataFrame([z.to_dict() for z in self.zones])

    def add_zone(self, zone: Zone):
        """Append ``zone`` to the collection."""
        self.zones.append(zone)

    def match_conditions(self, type: str, price: float, tolerance=0.0):
        """Return the unexpired zones of ``type`` that contain ``price``.

        A zone counts as unexpired when its ``expire_date`` is null
        (``None``/``NaT``/``NaN``), the same test the break filters use.
        """
        matched = [
            z for z in self.zones
            if z.contains_price(price, tolerance)
            and z.type == type
            and pd.isnull(z.expire_date)
        ]

        return Zones(zones=matched)

    @staticmethod
    def _is_active_on(zone: Zone, when: pd.Timestamp) -> bool:
        """Return True if ``zone`` is confirmed on/before ``when`` and not yet expired."""
        # Coerce to Timestamp: ordering a Timestamp against a plain
        # datetime.date raises TypeError.
        if pd.Timestamp(zone.confirm_date) > when:
            return False
        return bool(
            pd.isnull(zone.expire_date) or pd.Timestamp(zone.expire_date) > when
        )

    def up_breaked_zones(self, break_date: datetime.date, close: float, last_close: float):
        """Return the active zones whose ``high`` was crossed upwards.

        A zone matches when ``last_close <= high <= close``.  An unchanged
        close never counts as a break and yields an empty ``Zones``.
        """
        if close == last_close:
            return Zones()

        when = pd.Timestamp(break_date)
        matched = [
            z for z in self.zones
            if self._is_active_on(z, when) and last_close <= z.high <= close
        ]
        return Zones(zones=matched)

    def down_breaked_zones(self, break_date: datetime.date, close: float, last_close: float):
        """Return the active zones whose ``low`` was crossed downwards.

        A zone matches when ``close <= low <= last_close``.  An unchanged
        close never counts as a break and yields an empty ``Zones``.
        """
        if close == last_close:
            return Zones()

        when = pd.Timestamp(break_date)
        matched = [
            z for z in self.zones
            if self._is_active_on(z, when) and close <= z.low <= last_close
        ]
        return Zones(zones=matched)


def load_pivot_from_df(row: pd.Series) -> Pivot:
    """Create a ``Pivot`` from a single row.

    Every field is read from the column of the same name, except
    ``confirm_date``, which is read from the ``confirm_date_p`` column.
    """
    same_named_columns = (
        'stock_id',
        'date',
        'type',
        'price',
        'level',
        'share_score',
        'volatility_score',
        'score',
        'share_raw',
        'volatility_raw',
    )
    kwargs = {column: row[column] for column in same_named_columns}
    kwargs['confirm_date'] = row['confirm_date_p']
    return Pivot(**kwargs)

def load_zones_from_df(df:pd.DataFrame) -> Zones:
    """Convert every row of ``df`` into a ``Zone`` and return them as ``Zones``.

    Rows are converted with ``load_zone_from_series`` in frame order.
    """
    return Zones(zones=[load_zone_from_series(series) for _, series in df.iterrows()])

def load_zone_from_series(row: pd.Series, pivots: Optional[Pivots] = None) -> Zone:
    """Build a ``Zone`` from one row.

    Args:
        row: Row providing ``stock_id``, ``low``, ``high``, ``confirm_date``
            and ``type``.
        pivots: Pivots to attach.  When omitted (or ``None``) the zone gets
            its own empty ``Pivots`` rather than ``None``, so pivot
            operations on the zone work right away.

    Returns:
        The new ``Zone``; ``expire_date`` and ``importance`` keep their
        defaults.
    """
    zone = Zone(
        stock_id=row['stock_id'],
        low=row['low'],
        high=row['high'],
        confirm_date=row['confirm_date'],
        type=row['type'],
        pivots=pivots if pivots is not None else Pivots(),
    )
    return zone


In [5]:

from dataclasses import dataclass, field
from datetime import date as DateType
from typing import List

import pandas as pd

@dataclass
class SrBreakSignal:
    """The zones broken on a single day, with the break direction.

    All three fields are required, in this order, exactly as the previous
    hand-written ``__init__`` demanded.  That ``__init__`` shadowed the
    dataclass-generated one, so the old field defaults were never used (and
    ``default_factory=DateType`` could not have been called without
    arguments anyway).
    """
    break_day: DateType
    direction: str
    zones: Zones

    def to_response(self):
        """Return one validated ``signal_response`` per broken zone."""
        pydantic_list = []
        for zone in self.zones:
            print(f'signals to be trans: {zone}')
            # Manually build the dict that maps onto the response fields.
            data_dict = {
                "stock_id": zone.stock_id,
                "break_date": self.break_day,
                "type": zone.type,
                "high": zone.high,
                "low": zone.low,
                "expire_date": zone.expire_date,
                "importance": zone.importance
            }

            # Validate the new dict with model_validate.
            pydantic_obj = signal_response.model_validate(data_dict)
            pydantic_list.append(pydantic_obj)

        return pydantic_list

    def zones_to_schema_list(self) -> list:
        """Return whatever ``self.zones.to_response()`` produces for the zones."""
        return self.zones.to_response()

@dataclass
class SrBreakSignals:
    """Container holding every break signal."""
    signals: List[SrBreakSignal] = field(default_factory=list)

    def add(self, break_signal: SrBreakSignal):
        """Append ``break_signal`` to the end of ``signals``."""
        collected = self.signals
        collected.append(break_signal)
