In [1]:
import pandas as pd
import pymysql
import datetime
from chinese_calendar import is_workday
import random
from pyecharts.charts import Bar,Line,Candlestick,Kline
from pyecharts import options as opts


# 数据库连接

In [2]:
# 返回一个 Connection 对象
db_conn = pymysql.connect(
host='localhost',
port=3306,
user='user1',
password='user1',
database='stock_analysis',
charset='utf8'
)


# 主函数

In [87]:

#设置分析开始结束日期
start_date = '2020-01-01'
end_date = '2022-12-31'
stock_code = 'sz300717'
#获取回测范围的历史数据
sql = "select * from stock_daily where 交易日期 >= '%s' and 交易日期 <= '%s' and 股票代码 = '%s' order by 交易日期 asc" % (start_date,end_date,stock_code)
df_history_data = pd.read_sql(sql,con=db_conn)



In [88]:
#构造x轴y轴
x_date = []
y_kindle = []
for index, row in df_history_data.iterrows():
    x_date.append(str(row['交易日期']))
    y_kindle.append(list(row[['开盘价', '收盘价', '最低价', '最高价']].values))

In [89]:
(
    Kline()
    .add_xaxis(x_date)
    .add_yaxis("sz301235", y_kindle)
    .set_global_opts(
        yaxis_opts=opts.AxisOpts(is_scale=True),
        xaxis_opts=opts.AxisOpts(is_scale=True),
        title_opts=opts.TitleOpts(title="Kline-基本示例"),
    )
    .render_notebook()
)

# 去除上下影线

In [90]:
y_kindle_no_shadow = []
for i in range(len(y_kindle)):
    k_line_tmp = y_kindle[i]
    day_open,day_close,day_low,day_high = k_line_tmp[0], k_line_tmp[1], k_line_tmp[2], k_line_tmp[3]
    if day_open <= day_close:
        ktmp = [day_low,day_high,day_low,day_high] #去除上影线下影线
    else:
        ktmp = [day_high,day_low,day_low,day_high] #去除上影线下影线
    y_kindle_no_shadow.append(ktmp)

In [91]:
(
    Kline()
    .add_xaxis(["day{}".format(i) for i in range(len(y_kindle_no_shadow))])
    .add_yaxis(
        "kline",
        y_kindle_no_shadow,
        itemstyle_opts=opts.ItemStyleOpts(color="#ec0000",color0="#00da3c",border_color="#8A0000",border_color0="#008F28",),
                )
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(is_scale=True),
        yaxis_opts=opts.AxisOpts(
                                is_scale=True,
                                splitarea_opts=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)),
                                ),
        datazoom_opts=[opts.DataZoomOpts(type_="inside")],
        title_opts=opts.TitleOpts(title="Kline-ItemStyle"),
    )
    .render_notebook()
)

# 处理包含关系

In [92]:
#构造新的包含关系列表
y_kindle_contain = []
#基于包含关系列表构造每段K线的走势 0下 1上
y_kindle_trend = []

for i in range(len(y_kindle_no_shadow)):
    if i == 0:
        y_kindle_contain.append(y_kindle_no_shadow[i])
        y_kindle_trend.append(1)

    elif i >= 1:
        k_line_tmp_day = y_kindle_no_shadow[i]
        day_open,day_close,day_low,day_high = k_line_tmp_day[0], k_line_tmp_day[1], k_line_tmp_day[2], k_line_tmp_day[3]
        k_line_tmp_lastday = y_kindle_contain[-1]
        lastday_open,lastday_close,lastday_low,lastday_high  = k_line_tmp_lastday[0], k_line_tmp_lastday[1], k_line_tmp_lastday[2], k_line_tmp_lastday[3]
        
        #判断昨天今天状态：
        flag = 0
        if (day_high > lastday_high) and (day_low > lastday_low):
            flag = 1
        elif ((day_high > lastday_high) and (day_low <= lastday_low)) or ((day_high == lastday_high) and (day_low <= lastday_low)):
            flag = 2
        elif ((day_high == lastday_high) and (day_low > lastday_low)) or ((day_high < lastday_high) and (day_low >= lastday_low)):
            flag = 3
        elif ((day_high < lastday_high) and (day_low < lastday_low)) or ((day_high < lastday_low)):
            flag = 4
        
        #基于不同情况构造今天数据
        if flag == 1:#上升趋势
            y_kindle_contain.append(y_kindle_no_shadow[i])
            y_kindle_trend.append(1)
        
        elif flag == 4:#下降趋势
            y_kindle_contain.append(y_kindle_no_shadow[i])
            y_kindle_trend.append(0)
        
        elif flag == 2:#今天包含昨天
            #构造新的高价低价
            if y_kindle_trend[-1] == 1:#昨天上升
                new_high = day_high
                new_low = lastday_low
            elif y_kindle_trend[-1] == 0: #昨天下降
                new_high = lastday_high
                new_low = day_low    
            
            #基于今天判断阴阳线
            if day_close >= day_open:
                new_open = new_low
                new_close = new_high
            else:
                new_open = new_high
                new_close = new_low
            #生成新数据
            y_kindle_contain.pop()
            trend_tmp = y_kindle_trend.pop()
            y_kindle_contain.append([new_open,new_close,new_low,new_high])
            y_kindle_trend.append(trend_tmp)
            
        
        elif flag == 3:#昨天包含今天
            #构造新的高价低价
            if y_kindle_trend[-1] == 1:#昨天上升
                new_high = lastday_high
                new_low = day_low
            elif y_kindle_trend[-1] == 0: #昨天下降
                new_high = day_high
                new_low = lastday_low    
            
            #基于昨天判断阴阳线
            if lastday_close >= lastday_open:
                new_open = new_low
                new_close = new_high
            else:
                new_open = new_high
                new_close = new_low
            #生成新数据
            y_kindle_contain.pop()
            trend_tmp = y_kindle_trend.pop()
            y_kindle_contain.append([new_open,new_close,new_low,new_high])
            y_kindle_trend.append(trend_tmp)



In [93]:
(
    Kline()
    .add_xaxis(["day{}".format(i) for i in range(len(y_kindle_contain))])
    .add_yaxis(
        "kline",
        y_kindle_contain,
        itemstyle_opts=opts.ItemStyleOpts(color="#ec0000",color0="#00da3c",border_color="#8A0000",border_color0="#008F28",),
                )
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(is_scale=True),
        yaxis_opts=opts.AxisOpts(
                                is_scale=True,
                                splitarea_opts=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)),
                                ),
        datazoom_opts=[opts.DataZoomOpts(type_="inside")],
        title_opts=opts.TitleOpts(title="Kline-ItemStyle"),
    )
    .render_notebook()
)

# 测试标记点

In [94]:
#判断连续3根K线的4种形态
#1三连升 2顶 3底 4三连降 0开头结尾
def kline_3_trend_type(pre_day,cur_day,last_day):
    if (cur_day[3] > pre_day[3]) and (cur_day[2] > pre_day[2]):
        if (last_day[3] > cur_day[3]) and (last_day[2] > cur_day[2]):
            return 1
        elif (last_day[3] <= cur_day[3]) and (last_day[2] <= cur_day[2]):
            return 2
    elif (cur_day[3] <= pre_day[3]) and (cur_day[2] <= pre_day[2]):
        if (last_day[3] > cur_day[3]) and (last_day[2] > cur_day[2]):
            return 3
        elif (last_day[3] <= cur_day[3]) and (last_day[2] <= cur_day[2]):
            return 4
    else:
        return 0

In [95]:
#存储连续3根K线的4种形态
kline_3_type = [0]
for i in range(1,len(y_kindle_contain)-1):
    pre_day = y_kindle_contain[i-1]
    cur_day = y_kindle_contain[i]
    last_day = y_kindle_contain[i+1]
    kline_3_type.append(kline_3_trend_type(pre_day,cur_day,last_day))
kline_3_type.append(0)

In [96]:
#构造顶底标记列表，在K线图显示
top_button_mark = []
for i in range(len(kline_3_type)):
    if kline_3_type[i] == 0:
        pass
    elif kline_3_type[i] == 2:
        tmp = []
        tmp.append({'xAxis': i, 'yAxis': y_kindle_contain[i][3], 'value': 'top'})
        tmp.append({'xAxis': i, 'yAxis': y_kindle_contain[i][3]})
        top_button_mark.append(tmp)
    elif kline_3_type[i] == 3:
        tmp = []
        tmp.append({'xAxis': i, 'yAxis': y_kindle_contain[i][2], 'value': 'low'})
        tmp.append({'xAxis': i, 'yAxis': y_kindle_contain[i][2]})
        top_button_mark.append(tmp)
    else:
        pass

In [97]:
(
    Kline()
    .add_xaxis(["day{}".format(i) for i in range(len(y_kindle_contain))])
    .add_yaxis(
        "kline",
        y_kindle_contain,
        itemstyle_opts=opts.ItemStyleOpts(color="#ec0000",color0="#00da3c",border_color="#8A0000",border_color0="#008F28",),
        markline_opts=opts.MarkLineOpts(
            label_opts=opts.LabelOpts(position="middle", color="blue", font_size=10),
            data=top_button_mark,
            symbol=["circle", "none"],
            symbol_size = [5,5]
            ),
                )
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(is_scale=True),
        yaxis_opts=opts.AxisOpts(
                                is_scale=True,
                                splitarea_opts=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)),
                                ),
        datazoom_opts=[opts.DataZoomOpts(type_="inside")],
        title_opts=opts.TitleOpts(title="Kline-ItemStyle"),
    )
    .render_notebook()
)

# 根据顶和底画笔

In [98]:
current_flag = ''
current_value = 0
current_index = 0
draw_retult_list = []

for i in range(len(kline_3_type)):
    #print(i)
    #print(' current_flag:',current_flag,' current_value:', current_value,' current_index:',current_index)
    #print(' low_value:',y_kindle_contain[i][2],' high_value:',y_kindle_contain[i][3])
    #print('current type',kline_3_type[i])
    if i == 1:
        if kline_3_type[i] == 1:
            current_flag = 'bottom'
            current_value = y_kindle_contain[0][2]
            draw_retult_list.append(['bottom',0,y_kindle_contain[0][2]])
        elif kline_3_type[i] == 2:
            current_flag = 'top'
            current_value = y_kindle_contain[i][3]
            draw_retult_list.append(['top',i,y_kindle_contain[0][3]])
        elif kline_3_type[i] == 3:
            current_flag = 'bottom'
            current_value = y_kindle_contain[i][2]
            draw_retult_list.append(['bottom',i,y_kindle_contain[0][2]])    
        elif kline_3_type[i] == 4:
            current_flag = 'top'
            current_value = y_kindle_contain[0][3]
            draw_retult_list.append(['top',0,y_kindle_contain[0][3]])
        
    
    
    if current_flag == 'bottom':
        if (kline_3_type[i] == 2) and (i-current_index >= 4):
            draw_retult_list.append(['top',i,y_kindle_contain[i][3]])
            current_flag = 'top'
            current_value = y_kindle_contain[i][3]
            current_index = i
        elif kline_3_type[i] == 3:
            if y_kindle_contain[i][2] <= current_value:
                draw_retult_list.pop()
                draw_retult_list.append(['bottom',i,y_kindle_contain[i][2]])
                current_value = y_kindle_contain[i][2]
                current_index = i
        else:
            pass
    
    elif current_flag == 'top':
        if (kline_3_type[i] == 3) and (i-current_index >= 4):
            draw_retult_list.append(['bottom',i,y_kindle_contain[i][2]])
            current_flag = 'bottom'
            current_value = y_kindle_contain[i][2]
            current_index = i
        elif kline_3_type[i] == 2:
            if y_kindle_contain[i][3] >= current_value:
                draw_retult_list.pop()
                draw_retult_list.append(['top',i,y_kindle_contain[i][3]])
                current_value = y_kindle_contain[i][3]
                current_index = i
        else:
            pass
    #print('---------------------------------------------------------------------')

In [99]:
draw_mark = []
for i in range(len(draw_retult_list)-1):
    tmp = []
    tmp.append({'xAxis': draw_retult_list[i][1], 'yAxis': draw_retult_list[i][2]})
    tmp.append({'xAxis': draw_retult_list[i+1][1], 'yAxis': draw_retult_list[i+1][2]})
    draw_mark.append(tmp)


In [100]:
(
    Kline()
    .add_xaxis(["day{}".format(i) for i in range(len(y_kindle_contain))])
    .add_yaxis(
        "kline",
        y_kindle_contain,
        itemstyle_opts=opts.ItemStyleOpts(color="#ec0000",color0="#00da3c",border_color="#8A0000",border_color0="#008F28",),
        markline_opts=opts.MarkLineOpts(
            label_opts=opts.LabelOpts(position="middle", color="blue", font_size=10),
            data=draw_mark,
            symbol=["circle", "none"],
            symbol_size = [5,5],
            linestyle_opts = opts.LineStyleOpts(type_= "solid",color='#0000FF')
            ),
                )
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(is_scale=True),
        yaxis_opts=opts.AxisOpts(
                                is_scale=True,
                                splitarea_opts=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)),
                                ),
        datazoom_opts=[opts.DataZoomOpts(type_="inside")],
        title_opts=opts.TitleOpts(title="Kline-ItemStyle"),
    )
    .render_notebook()
)

# 笔的包含关系处理

In [101]:
draw_to_kline = []
for each in draw_mark:
    tmp = [each[0]['xAxis'],each[0]['yAxis'],each[1]['xAxis'],each[1]['yAxis']]
    draw_to_kline.append(tmp)

In [102]:
def markline_to_xyseries(kline):
    x = []
    y = []
    x.append(kline[0][0])
    y.append(kline[0][1])
    for each in kline:
        x.append(each[2])
        y.append(each[3])
    return x,y

In [103]:
trend = ''
baohan_draw = []

for i in range(1,len(draw_to_kline)):
    if i==1:
        if draw_to_kline[0][3] >= draw_to_kline[0][1]:
            if draw_to_kline[i][3] >= draw_to_kline[0][1]:
                trend = 'up'
                baohan_draw.append(draw_to_kline[0])
                baohan_draw.append(draw_to_kline[1])
            elif draw_to_kline[i][3] < draw_to_kline[0][1]:
                trend = 'down'
                baohan_draw.append(draw_to_kline[0])
                baohan_draw.append(draw_to_kline[1])
        elif draw_to_kline[0][3] < draw_to_kline[0][1]:
            if draw_to_kline[i][3] >= draw_to_kline[0][1]:
                trend = 'up'
                baohan_draw.append(draw_to_kline[0])
                baohan_draw.append(draw_to_kline[1])
            elif draw_to_kline[i][3] < draw_to_kline[0][1]:
                trend = 'down'
                baohan_draw.append(draw_to_kline[0])
                baohan_draw.append(draw_to_kline[1])
    elif i > 1:
        if trend == 'up':#趋势为上升
            if (draw_to_kline[i][3] >= draw_to_kline[i][1]) and (i <= len(draw_to_kline)-2):#当前笔为向上且不为最后一个元素
                if (baohan_draw[-1][1] > draw_to_kline[i+1][1]) and (baohan_draw[-1][3] > draw_to_kline[i+1][3]):#后笔低于前笔
                    trend = 'down'
                    baohan_draw.append(draw_to_kline[i])
                else:
                    baohan_draw.append(draw_to_kline[i])
            elif (draw_to_kline[i][3] < draw_to_kline[i][1]) and (i>=2):#当前笔为向下且前面有两笔
                if (draw_to_kline[i][1] <= baohan_draw[-2][1]) and (draw_to_kline[i][3] >= baohan_draw[-2][3]):#前下笔包含当前下笔
                    baohan_draw.pop()
                    baohan_draw.pop()
                    baohan_draw.append([baohan_draw[-1][0],baohan_draw[-1][1],draw_to_kline[i][2],draw_to_kline[i][3]])
                else:
                    baohan_draw.append(draw_to_kline[i])
                    
        if trend == 'down':#趋势为下降
            if (draw_to_kline[i][1] >= draw_to_kline[i][3]) and (i <= len(draw_to_kline)-2):#当前笔为向下且不为最后一个元素
                if (baohan_draw[-1][1] < draw_to_kline[i+1][1]) and (baohan_draw[-1][3] < draw_to_kline[i+1][3]):#后笔高于前笔
                    trend = 'up'
                    baohan_draw.append(draw_to_kline[i])
                else:
                    baohan_draw.append(draw_to_kline[i])
            elif (draw_to_kline[i][1] < draw_to_kline[i][3]) and (i>=2):#当前笔为向上且前面有两笔
                if (draw_to_kline[i][1] >= baohan_draw[-2][1]) and (draw_to_kline[i][3] <= baohan_draw[-2][3]):#前上笔包含当前上笔
                    baohan_draw.pop()
                    baohan_draw.pop()
                    baohan_draw.append([baohan_draw[-1][0],baohan_draw[-1][1],draw_to_kline[i][2],draw_to_kline[i][3]])
                else:
                    baohan_draw.append(draw_to_kline[i]) 

In [104]:
draw_line_x,draw_line_y = markline_to_xyseries(draw_to_kline)
draw_line = Line()
# 添加折线图数据
draw_line.add_xaxis(xaxis_data=draw_line_x)
draw_line.add_yaxis(series_name="笔", y_axis=draw_line_y)
draw_line.set_series_opts(
    linestyle_opts=opts.LineStyleOpts(width=2, color="red")  # 设置折线宽度为 2，颜色为红色
)


bhdraw_line_x,bhdraw_line_y = markline_to_xyseries(baohan_draw)
bhdraw_line = Line()
# 添加折线图数据
bhdraw_line.add_xaxis(xaxis_data=bhdraw_line_x)
bhdraw_line.add_yaxis(series_name="笔包含", y_axis=bhdraw_line_y)
bhdraw_line.set_series_opts(
    linestyle_opts=opts.LineStyleOpts(width=2, color="blue")  # 设置折线宽度为 2，颜色为红色
)

<pyecharts.charts.basic_charts.line.Line at 0x2092028a358>

In [105]:
c = (
    Kline()
    .add_xaxis(["day{}".format(i) for i in range(len(y_kindle_contain))])
    .add_yaxis(
        "kline",
        y_kindle_contain,
        itemstyle_opts=opts.ItemStyleOpts(color="#ec0000",color0="#00da3c",border_color="#8A0000",border_color0="#008F28",),
                )
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(is_scale=True),
        yaxis_opts=opts.AxisOpts(
                                is_scale=True,
                                splitarea_opts=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)),
                                ),
        datazoom_opts=[opts.DataZoomOpts(type_="inside")],
        title_opts=opts.TitleOpts(title="Kline-ItemStyle"),
    )
)

In [106]:
bhdraw_line.overlap(draw_line)
c.overlap(bhdraw_line)

<pyecharts.charts.basic_charts.kline.Kline at 0x209202770b8>

In [107]:
c.render_notebook()

# 画段

In [108]:
draw_point = []
for i in range(len(bhdraw_line_x)):
    draw_point.append([bhdraw_line_x[i],bhdraw_line_y[i]])

In [109]:
trend = ''
start = []
end = []
seg_result = [draw_point[0]]

for i in range(2,len(draw_point)):
    if i == 2:
        if draw_point[1][1] >= draw_point[0][1]:#第一条笔向上
            if draw_point[i][1] >= draw_point[0][1]:
                trend = 'up'
            elif draw_point[i][1] < draw_point[0][1]:
                trend = 'down'
        elif draw_point[1][1] < draw_point[0][1]:#第一条笔向下
            if draw_point[i][1] >= draw_point[0][1]:
                trend = 'up'
            elif draw_point[i][1] < draw_point[0][1]:
                trend = 'down'
    elif i > 2:
        if trend == 'up':
            if (draw_point[i][1] >= draw_point[i-1][1]) and (draw_point[i][1] < draw_point[i-2][1]):
                seg_result.append(draw_point[i-2])
                trend = 'down'
        elif trend == 'down':
            if (draw_point[i][1] < draw_point[i-1][1]) and (draw_point[i][1] > draw_point[i-2][1]):
                seg_result.append(draw_point[i-2])
                trend = 'up'
if (trend == 'up') and (draw_point[-1][1] >= draw_point[-2][1]):
    seg_result.append(draw_point[-1])
elif (trend == 'up') and (draw_point[-1][1] < draw_point[-2][1]):
    seg_result.append(draw_point[-2])
if (trend == 'down') and (draw_point[-1][1] >= draw_point[-2][1]):
    seg_result.append(draw_point[-2])
elif (trend == 'down') and (draw_point[-1][1] < draw_point[-2][1]):
    seg_result.append(draw_point[-1])

In [110]:
seg_x = []
seg_y = []
for each in seg_result:
    seg_x.append(each[0])
    seg_y.append(each[1])

In [111]:
#笔
bhdraw_line = Line()
bhdraw_line.add_xaxis(xaxis_data=bhdraw_line_x)
bhdraw_line.add_yaxis(series_name="笔包含", y_axis=bhdraw_line_y)
bhdraw_line.set_series_opts(
    linestyle_opts=opts.LineStyleOpts(width=2, color="blue")  # 设置折线宽度为 2，颜色为红色
)
#段
segline = Line()
segline.add_xaxis(xaxis_data=seg_x)
segline.add_yaxis(series_name="段", y_axis=seg_y)
segline.set_series_opts(
    linestyle_opts=opts.LineStyleOpts(width=2, color="green")  # 设置折线宽度为 2，颜色为红色
)
#K线
c = (
    Kline()
    .add_xaxis(["day{}".format(i) for i in range(len(y_kindle_contain))])
    .add_yaxis(
        "kline",
        y_kindle_contain,
        itemstyle_opts=opts.ItemStyleOpts(color="#ec0000",color0="#00da3c",border_color="#8A0000",border_color0="#008F28",),
                )
    .set_global_opts(
        xaxis_opts=opts.AxisOpts(is_scale=True),
        yaxis_opts=opts.AxisOpts(
                                is_scale=True,
                                splitarea_opts=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)),
                                ),
        datazoom_opts=[opts.DataZoomOpts(type_="inside")],
        title_opts=opts.TitleOpts(title="Kline-ItemStyle"),
    )
)

In [112]:
segline.overlap(bhdraw_line)
c.overlap(segline)
c.render_notebook()