In [1]:
import os
from pathlib import Path

# Directory the notebook runs from: abspath('') resolves to the current working directory.
NOTEBOOK_PATH: Path = Path(os.path.abspath(''))

# `data` directory that sits next to the notebook directory (sibling of it).
DATA_PATH: Path = NOTEBOOK_PATH.parent.joinpath('data')

# `RESULT` directory inside the notebook directory.
PATH_FOR_RESULT: Path = NOTEBOOK_PATH.joinpath('RESULT')

# exist_ok=True keeps this cell idempotent on re-run and removes the
# check-then-create race of a separate exists() test.
PATH_FOR_RESULT.mkdir(exist_ok=True)

print(f'当前运行目录：{NOTEBOOK_PATH}')
print(f'当前数据目录：{DATA_PATH}')

当前运行目录：D:\Development\Python\InvestmentWorkshop\notebook
当前数据目录：D:\Development\Python\InvestmentWorkshop\data


# 缠论 A-01

A 系列仅使用 pandas 读取数据，不用于计算。

## 准备

### 引用模块

In [2]:
from typing import List

import pandas as pd

from utility import (
    get_available_datafile_name,
    load_csv_as_dataframe,
    get_saved_filename,
    save_dataframe_to_csv
)
from InvestmentWorkshop.indicator import (
    pbx,
    merge_candlestick_dataframe,
)

Load succeed.


### 设置 pandas

In [3]:
# Display settings: no limit on the number of columns shown, and a 200-character wide console.
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.width', 200),
):
    pd.set_option(option_name, option_value)

### 可用数据

In [4]:
# List every (symbol, period) pair for which a 'Minute' data file is available.
available_datafile = get_available_datafile_name(DATA_PATH, 'Minute')

# The loop variables deliberately avoid the names `symbol` / `period`: those are
# notebook-level selections, and re-running this cell must not overwrite them.
for available_symbol, available_period in available_datafile:
    print(f'symbol: {available_symbol:>12}, period: {available_period}')

symbol:    DCE.c2201, period: Minute
symbol:    DCE.i2201, period: Minute
symbol:  SHFE.ag2106, period: Minute
symbol:  SHFE.al2111, period: Minute
symbol:  SHFE.al2112, period: Minute
symbol:  SHFE.hc2110, period: Minute
symbol:  SHFE.hc2201, period: Minute
symbol:  SHFE.hc2202, period: Minute
symbol:  SHFE.rb2201, period: Minute


### 声明数据

使用分钟数据

In [5]:
# Contract symbol and bar period to analyse; both are interpolated into the
# CSV file name when the data is loaded.
symbol: str = 'DCE.i2201'
period: str = 'Minute'

### 加载数据

In [6]:
from pathlib import Path

def load_bar_data(data_file: Path, datetime_index: bool = True) -> pd.DataFrame:
    """
    Load bar data from a CSV file into a pandas.DataFrame.

    :param data_file: path of the CSV file. A ``Path`` is expected; ``str`` and
        ``os.PathLike`` values are accepted as well. The file must contain a
        ``datetime`` column, which is parsed as dates.
    :param datetime_index: when True (default) the ``datetime`` column becomes
        the index of the frame; when False it stays an ordinary column.
    :return: the loaded DataFrame.
    :raises FileNotFoundError: if ``data_file`` does not exist.
    """
    # Normalise first so plain string paths work with the existence check below.
    data_file = Path(data_file)
    if not data_file.exists():
        raise FileNotFoundError(f'{data_file} not found.')

    # index_col=None is the read_csv default, i.e. "do not use any column as index".
    return pd.read_csv(
        data_file,
        parse_dates=['datetime'],
        index_col=['datetime'] if datetime_index else None,
    )

In [7]:
# Read the CSV named '<symbol>_<period>.csv' from the data directory.
df_origin: pd.DataFrame = load_bar_data(DATA_PATH / f'{symbol}_{period}.csv')

# Print the frame summary, then a separator line, then preview the first rows.
df_origin.info()
print('-' * 100)
df_origin.head(5)

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 62535 entries, 2021-01-15 21:00:00 to 2021-10-22 22:59:00
Data columns (total 8 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   datetime_nano  62535 non-null  int64  
 1   open           62535 non-null  float64
 2   high           62535 non-null  float64
 3   low            62535 non-null  float64
 4   close          62535 non-null  float64
 5   volume         62535 non-null  int64  
 6   open_oi        62535 non-null  int64  
 7   close_oi       62535 non-null  int64  
dtypes: float64(4), int64(4)
memory usage: 4.3 MB
----------------------------------------------------------------------------------------------------


Unnamed: 0_level_0,datetime_nano,open,high,low,close,volume,open_oi,close_oi
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2021-01-15 21:00:00,1610715600000000000,930.0,930.0,921.0,924.0,8,0,8
2021-01-15 21:01:00,1610715660000000000,924.0,940.0,924.0,930.0,4,8,12
2021-01-15 21:02:00,1610715720000000000,930.0,935.0,925.0,929.0,7,12,19
2021-01-15 21:03:00,1610715780000000000,929.0,935.0,925.0,929.0,5,19,24
2021-01-15 21:04:00,1610715840000000000,929.0,930.0,929.0,929.5,5,24,29


## 尝试

### 数据截取

In [8]:
# Label-based slice of the loaded bars between the two timestamps, keeping
# only the open / high / low / close columns.
df_ohlc: pd.DataFrame = df_origin.loc[
    slice('2021-10-15 13:30:00', '2021-10-19 00:59:00'),
    ['open', 'high', 'low', 'close'],
]

# Print the frame summary, then a separator line, then preview the first rows.
df_ohlc.info()
print('-' * 100)
df_ohlc.head(5)

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 555 entries, 2021-10-15 13:30:00 to 2021-10-18 22:59:00
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   open    555 non-null    float64
 1   high    555 non-null    float64
 2   low     555 non-null    float64
 3   close   555 non-null    float64
dtypes: float64(4)
memory usage: 21.7 KB
----------------------------------------------------------------------------------------------------


Unnamed: 0_level_0,open,high,low,close
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2021-10-15 13:30:00,727.0,730.5,727.0,730.5
2021-10-15 13:31:00,730.5,731.5,729.5,730.0
2021-10-15 13:32:00,730.0,730.0,728.0,728.0
2021-10-15 13:33:00,728.0,730.0,728.0,729.0
2021-10-15 13:34:00,729.0,729.5,726.0,726.0


### 转为 Python 对象列表试试看。

In [9]:
class Bar:
    """
    A single candlestick: a timestamp plus its open, high, low and close prices.
    """
    idx: pd.Timestamp
    open: float
    high: float
    low: float
    close: float

    def __init__(self, idx: pd.Timestamp, open: float, high: float, low: float, close: float):
        """
        Store the timestamp and the four prices unchanged.

        The parameter name ``open`` shadows the builtin inside this method only;
        it is kept because callers pass the prices by keyword.
        """
        self.idx, self.open = idx, open
        self.high, self.low = high, low
        self.close = close

    def __str__(self) -> str:
        """Return a one-line, human-readable description of the candlestick."""
        return f'Candlestick at {self.idx}: O {self.open}, H {self.high}, L {self.low}, C {self.close}'


# Index labels (timestamps) of the sliced frame as a plain Python list.
df_index_list: List[pd.Timestamp] = df_ohlc.index.tolist()

# Build one Bar per row. itertuples() walks the frame once instead of doing
# four label lookups per row, and it still yields scalars when the index
# contains duplicate timestamps (a .loc lookup would return a Series there).
candlstick_list: List[Bar] = [
    Bar(
        idx=row.Index,
        open=row.open,
        high=row.high,
        low=row.low,
        close=row.close
    ) for row in df_ohlc.itertuples()
]

# Print every generated bar for a visual check.
for item in candlstick_list:
    print(item)

Candlestick at 2021-10-15 13:30:00: O 727.0, H 730.5, L 727.0, C 730.5
Candlestick at 2021-10-15 13:31:00: O 730.5, H 731.5, L 729.5, C 730.0
Candlestick at 2021-10-15 13:32:00: O 730.0, H 730.0, L 728.0, C 728.0
Candlestick at 2021-10-15 13:33:00: O 728.0, H 730.0, L 728.0, C 729.0
Candlestick at 2021-10-15 13:34:00: O 729.0, H 729.5, L 726.0, C 726.0
Candlestick at 2021-10-15 13:35:00: O 726.0, H 728.5, L 726.0, C 728.0
Candlestick at 2021-10-15 13:36:00: O 728.0, H 730.0, L 728.0, C 730.0
Candlestick at 2021-10-15 13:37:00: O 730.0, H 732.0, L 729.5, C 731.5
Candlestick at 2021-10-15 13:38:00: O 731.5, H 734.5, L 731.0, C 733.5
Candlestick at 2021-10-15 13:39:00: O 733.5, H 737.5, L 733.0, C 737.0
Candlestick at 2021-10-15 13:40:00: O 737.0, H 737.0, L 735.5, C 736.5
Candlestick at 2021-10-15 13:41:00: O 736.5, H 736.5, L 735.0, C 736.5
Candlestick at 2021-10-15 13:42:00: O 736.5, H 736.5, L 735.0, C 735.0
Candlestick at 2021-10-15 13:43:00: O 735.0, H 735.5, L 734.5, C 735.0
Candle

In [10]:
def is_inclusive(current_bar: Bar, previous_bar: Bar) -> bool:
    """
    Tell whether two bars are in an inclusion relationship.

    :param current_bar: the later bar; only ``high`` and ``low`` are read.
    :param previous_bar: the earlier bar; only ``high`` and ``low`` are read.
    :return: True when the high-low range of one bar lies entirely within the
        range of the other (equal bounds count as inclusion).
    """
    # Current bar lies within the previous bar's range.
    current_within_previous = (
        current_bar.high <= previous_bar.high and current_bar.low >= previous_bar.low
    )
    # Previous bar lies within the current bar's range.
    previous_within_current = (
        current_bar.high >= previous_bar.high and current_bar.low <= previous_bar.low
    )
    return current_within_previous or previous_within_current


from enum import Enum
class Trend(Enum):
    """
    Price direction.

    ``Bullish`` and ``Bearish`` repeat the values of ``Up`` and ``Down``, so the
    enum machinery treats them as aliases of those two members.
    """
    Up = 'Bullish'
    Down = 'Bearish'
    Bullish = 'Bullish'
    Bearish = 'Bearish'

    def __str__(self) -> str:
        """Return the Chinese label of the direction."""
        if self.value != 'Bullish':
            return '下跌'
        return '上涨'

    
def get_trend(current_bar: Bar, previous_bar: Bar) -> Trend:
    """
    Decide the direction from one bar to the next.

    :param current_bar: the later bar; only ``high`` and ``low`` are read.
    :param previous_bar: the earlier bar; only ``high`` and ``low`` are read.
    :return: ``Trend.Bullish`` when neither the high nor the low moved down,
        ``Trend.Bearish`` when neither moved up; otherwise the side (high or
        low) that moved further decides, and an exact tie is Bullish.
    """
    cur_high, cur_low = current_bar.high, current_bar.low
    prev_high, prev_low = previous_bar.high, previous_bar.low

    # Clear cases: high and low moved the same way (or did not move).
    if cur_high >= prev_high and cur_low >= prev_low:
        return Trend.Bullish
    if cur_high <= prev_high and cur_low <= prev_low:
        return Trend.Bearish

    # Remaining cases: high and low moved in opposite directions, i.e. one
    # bar contains the other. Compare how far each side moved.
    delta_h: float = cur_high - prev_high
    delta_l: float = cur_low - prev_low
    high_move, low_move = abs(delta_h), abs(delta_l)

    if high_move > low_move:
        # The high moved further: follow the direction of the high.
        if delta_h > 0:
            return Trend.Bullish
        return Trend.Bearish

    if high_move < low_move:
        # The low moved further, and it moved opposite to the high.
        if delta_h > 0:
            return Trend.Bearish
        return Trend.Bullish

    # High and low moved by the same amount in opposite directions:
    # forced to Bullish.
    return Trend.Bullish


class BarMerged:
    """
    A merged candlestick: start timestamp, high, low and the number of periods it covers.
    """
    idx: pd.Timestamp
    high: float
    low: float
    interval: int

    def __init__(self, idx: pd.Timestamp, high: float, low: float, interval: int):
        """Store the timestamp, the price bounds and the period count unchanged."""
        self.idx, self.interval = idx, interval
        self.high, self.low = high, low

    def __str__(self) -> str:
        """Return a one-line, human-readable description of the merged bar."""
        return f'BarMerged at {self.idx}, Keep {self.interval} period, H {self.high}, L {self.low}'


first_bar = candlstick_list[0]
merged_list: List[BarMerged] = []
data_length: int = len(candlstick_list)
trend: Trend

# Seed the merged list with the first two raw bars as they are (no inclusion
# check). Slicing instead of indexing [1] keeps a single-bar input from
# raising IndexError.
for seed_bar, seed_tag in zip(candlstick_list[:2], ('INIT 0', 'INIT 1')):
    print(seed_bar.idx, seed_tag)
    merged_list.append(
        BarMerged(
            idx=seed_bar.idx,
            high=seed_bar.high,
            low=seed_bar.low,
            interval=1
        )
    )

print(f'前K：count={len(merged_list)}, idx={merged_list[-1].idx}, H={merged_list[-1].high}, L={merged_list[-1].low}')

# Process from the third bar (index 2) onwards.
for i in range(2, data_length):
    current_bar = candlstick_list[i]
    previous_bar = merged_list[-1]
    print(current_bar.idx)
    print(f'\t当前： H={current_bar.high}, L={current_bar.low} vs 前K：H={previous_bar.high}, L={previous_bar.low}')

    if not is_inclusive(current_bar=current_bar, previous_bar=previous_bar):
        # No inclusion: the raw bar becomes a new merged bar of its own.
        print(f'\t包含： ---')
        merged_list.append(
            BarMerged(
                idx=current_bar.idx,
                high=current_bar.high,
                low=current_bar.low,
                interval=1
            )
        )
        continue

    print(f'\t包含： YES')
    # Direction is taken from the last two merged bars.
    trend = get_trend(current_bar=previous_bar, previous_bar=merged_list[-2])
    print('\t趋势：', trend)

    # Rising: keep the higher high and the higher low.
    # Falling: keep the lower high and the lower low.
    # get_trend only ever returns Bullish or Bearish, so two cases suffice.
    pick = max if trend == Trend.Bullish else min
    previous_bar.high = pick(current_bar.high, previous_bar.high)
    previous_bar.low = pick(current_bar.low, previous_bar.low)
    previous_bar.interval += 1
    print(f'\t结果： 开始于{merged_list[-1].idx}, 持续{merged_list[-1].interval}周期, H={merged_list[-1].high}, L={merged_list[-1].low}')

2021-10-15 13:30:00 INIT 0
2021-10-15 13:31:00 INIT 1
前K：count=2, idx=2021-10-15 13:31:00, H=731.5, L=729.5
2021-10-15 13:32:00
	当前： H=730.0, L=728.0 vs 前K：H=731.5, L=729.5
	包含： ---
2021-10-15 13:33:00
	当前： H=730.0, L=728.0 vs 前K：H=730.0, L=728.0
	包含： YES
	趋势： 下跌
	结果： 开始于2021-10-15 13:32:00, 持续2周期, H=730.0, L=728.0
2021-10-15 13:34:00
	当前： H=729.5, L=726.0 vs 前K：H=730.0, L=728.0
	包含： ---
2021-10-15 13:35:00
	当前： H=728.5, L=726.0 vs 前K：H=729.5, L=726.0
	包含： YES
	趋势： 下跌
	结果： 开始于2021-10-15 13:34:00, 持续2周期, H=728.5, L=726.0
2021-10-15 13:36:00
	当前： H=730.0, L=728.0 vs 前K：H=728.5, L=726.0
	包含： ---
2021-10-15 13:37:00
	当前： H=732.0, L=729.5 vs 前K：H=730.0, L=728.0
	包含： ---
2021-10-15 13:38:00
	当前： H=734.5, L=731.0 vs 前K：H=732.0, L=729.5
	包含： ---
2021-10-15 13:39:00
	当前： H=737.5, L=733.0 vs 前K：H=734.5, L=731.0
	包含： ---
2021-10-15 13:40:00
	当前： H=737.0, L=735.5 vs 前K：H=737.5, L=733.0
	包含： YES
	趋势： 上涨
	结果： 开始于2021-10-15 13:39:00, 持续2周期, H=737.5, L=735.5
2021-10-15 13:41:00
	当前： H=736.5, L=735.0 v

In [11]:
# Side-by-side listing of raw bars and merged bars.
# `mm` points at the next merged bar that has not been printed yet.
mm: int = 0
for i in range(data_length):
    timestamp = candlstick_list[i].idx
    print(f'{i:3d}, {candlstick_list[i].idx},\t原始K线: H={candlstick_list[i].high:7.1f}, L={candlstick_list[i].low:7.1f},\t', end='')

    if merged_list[mm].idx != timestamp:
        # This raw bar was absorbed into an earlier merged bar: just end the line.
        print()
        continue

    # A merged bar starts at this raw bar: print it on the same line.
    print(f'\t合并K线: H={merged_list[mm].high:7.1f}, L={merged_list[mm].low:7.1f}, 持续{merged_list[mm].interval:2d} 周期')
    # Advance the pointer, but never past the last merged bar.
    mm = min(mm + 1, len(merged_list) - 1)

  0, 2021-10-15 13:30:00,	原始K线: H=  730.5, L=  727.0,		合并K线: H=  730.5, L=  727.0, 持续 1 周期
  1, 2021-10-15 13:31:00,	原始K线: H=  731.5, L=  729.5,		合并K线: H=  731.5, L=  729.5, 持续 1 周期
  2, 2021-10-15 13:32:00,	原始K线: H=  730.0, L=  728.0,		合并K线: H=  730.0, L=  728.0, 持续 2 周期
  3, 2021-10-15 13:33:00,	原始K线: H=  730.0, L=  728.0,	
  4, 2021-10-15 13:34:00,	原始K线: H=  729.5, L=  726.0,		合并K线: H=  728.5, L=  726.0, 持续 2 周期
  5, 2021-10-15 13:35:00,	原始K线: H=  728.5, L=  726.0,	
  6, 2021-10-15 13:36:00,	原始K线: H=  730.0, L=  728.0,		合并K线: H=  730.0, L=  728.0, 持续 1 周期
  7, 2021-10-15 13:37:00,	原始K线: H=  732.0, L=  729.5,		合并K线: H=  732.0, L=  729.5, 持续 1 周期
  8, 2021-10-15 13:38:00,	原始K线: H=  734.5, L=  731.0,		合并K线: H=  734.5, L=  731.0, 持续 1 周期
  9, 2021-10-15 13:39:00,	原始K线: H=  737.5, L=  733.0,		合并K线: H=  737.5, L=  735.5, 持续 2 周期
 10, 2021-10-15 13:40:00,	原始K线: H=  737.0, L=  735.5,	
 11, 2021-10-15 13:41:00,	原始K线: H=  736.5, L=  735.0,		合并K线: H=  736.5, L=  735.0, 持续 2 周期
 12, 2021-10-15 

In [12]:
class FractalType(Enum):
    """
    Kind of fractal: upper, lower, not a fractal, or unknown.
    """
    Upper = 'Upper'
    Lower = 'Lower'
    Not = 'Not'
    # NOTE(review): the value is spelled 'Unknow'; kept as-is because enum
    # values are part of the interface.
    Unknown = 'Unknow'

    def __str__(self) -> str:
        """Return the Chinese label of the fractal type."""
        # The lookup table lives inside the method: a dict assigned in the
        # class body would itself become an enum member.
        labels = {
            'Upper': '顶分型',
            'Lower': '底分型',
            'Not': '非分型',
        }
        return labels.get(self.value, '未知状态')

    
class FractalPosition(Enum):
    """
    Role of a fractal: reversal or continuation.
    """
    Reversal = 'Reversal'
    Continuous = 'Continuous'

    def __str__(self) -> str:
        """Return the Chinese label of the position."""
        if self.value == 'Reversal':
            return '反转'
        return '中继'


def get_fractal():
    """
    Placeholder for fractal detection; no implementation is visible here yet.
    """
    pass