In [6]:
from py2neo import Graph,Node,Relationship,NodeMatcher
import pandas as pd
import numpy as np
from multiprocessing.pool import ThreadPool
import akshare as ak
import logging
from datetime import datetime, timedelta
from tqdm import tqdm
import os
from finllmqa.api.core import NEO4J_API_URL, STOCK_KG_USER, STOCK_KG_PW

# Thread pool with 10 workers.
# NOTE(review): `pool` is not referenced by any other cell shown here — TODO remove it or put it to use.
pool = ThreadPool(processes=10)

# Connect to the Neo4j database (HTTP endpoint, database 'neo4j').
# NOTE(review): NEO4J_API_URL is imported above but unused; confirm whether it should replace this hardcoded address.
NEO4J_HTTP_URL = 'http://192.168.30.158:7474'
graph = Graph(NEO4J_HTTP_URL, auth=(STOCK_KG_USER, STOCK_KG_PW), name='neo4j')
# Property-based node lookups used by the loading cells below.
matcher = NodeMatcher(graph)

In [2]:
def get_logger(name, log_file='log_file.log'):
    """Return a logger that writes INFO+ records to stdout and to ``log_file``.

    Safe to call repeatedly (e.g. when this notebook cell is re-run): handlers
    attached by a previous call are removed and closed first, so every record is
    emitted once and no file handles are leaked.

    Args:
        name: Name passed to ``logging.getLogger``; ``None`` selects the root logger.
        log_file: Path of the UTF-8 log file (opened in append mode).

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Drop handlers installed by an earlier call of this function.
    for old_handler in [h for h in logger.handlers if getattr(h, '_from_get_logger', False)]:
        logger.removeHandler(old_handler)
        old_handler.close()

    # Keep a pre-existing root handler (e.g. one installed by the notebook
    # environment) from printing INFO records a second time in Jupyter.
    if logger.root.handlers:
        logger.root.handlers[0].setLevel(logging.WARNING)

    handler_stdout = logging.StreamHandler()
    handler_stdout.setLevel(logging.INFO)
    handler_stdout.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # The logger itself is at INFO, so DEBUG records never reach this handler.
    handler_file = logging.FileHandler(log_file, encoding='utf-8')
    handler_file.setLevel(logging.DEBUG)
    handler_file.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    for handler in (handler_stdout, handler_file):
        # Marker used above to recognise our own handlers on the next call.
        handler._from_get_logger = True
        logger.addHandler(handler)

    return logger


log = get_logger(None)

# Construct a financial knowledge graph of stocks

In [3]:
# get stock data
df_stock_code = ak.stock_info_a_code_name()
stock_code_list = df_stock_code['code'].to_list()

  0%|          | 0/13 [00:00<?, ?it/s]

In [None]:
# Uniqueness constraint: one 股票 (stock) node per stock code ('代码').
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:股票) REQUIRE n.代码 IS UNIQUE'
graph.run(cypher=cypher)

# Create one stock node per (code, name) row of the akshare code table.
for code, name in df_stock_code.values:
    stock_node = Node('股票', 代码=code, 名称=name, name=name)
    try:
        graph.create(stock_node)
    except Exception as exc:
        # Expected on re-runs: the constraint rejects an existing code. Catching
        # Exception (not a bare except) keeps the loop interruptible and leaves a trace.
        log.debug(f'Stock node {code} not created: {exc}')

## 1. Financial Abstract

In [None]:
# create financial abstract constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:财务指标) REQUIRE (n.股票代码, n.指标类型) IS UNIQUE'
graph.run(cypher=cypher)
df_fin_ab_demo = ak.stock_financial_abstract(symbol='000001')
for abstract_type in df_fin_ab_demo['选项']:
    # create financial abstract indicators constraint
    cypher = f'CREATE CONSTRAINT IF NOT EXISTS FOR (n:{abstract_type}) REQUIRE (n.股票代码, n.报告期) IS UNIQUE'
    graph.run(cypher=cypher)

for code in tqdm(stock_code_list):
    # stock = matcher.match("股票", 代码=code).first()
    # fin_ab = matcher.match("财务指标", 股票代码=code, 指标类型='常用指标').first()
    # if stock is not None and fin_ab is not None:
    #     continue
    # get financial abstract info
    try:
        df_fin_ab = ak.stock_financial_abstract(symbol=code).copy()
    except:
        log.warning(f'Can not get {code} financial abstract info')
        
    for abstract_type, df_group in df_fin_ab.groupby('选项'):
        # create financial abstract node
        fin_ab_node = Node('财务指标', 股票代码=code, 指标类型=abstract_type, name=abstract_type)
        try:
            graph.create(fin_ab_node)
        except:
            pass

        # create stock - financial abstract relationship
        stock = matcher.match("股票", 代码=code).first()
        fin_ab = matcher.match("财务指标", 股票代码=code, 指标类型=abstract_type).first()
        realation = Relationship(stock, '基本面', fin_ab)
        try:
            graph.create(realation)
        except:
            pass

        # create financial abstract indicators node
        df = df_group.drop('选项', axis=1).set_index('指标').copy()
        df = df.T.reset_index(drop=False, names='报告期').copy()
        df['股票代码'] = code
        df['报告期'] = pd.to_datetime(df['报告期'])
        df = df[df['报告期'] > datetime(2020,12,31)]
        df['报告期'] = df['报告期'].dt.strftime('%Y-%m-%d')
        df = df.fillna('null').copy()
        for record in df.to_dict('records'):
            record.update({'name': record['报告期']})
            fin_ab_indicator_node = Node(abstract_type, **record)
            try:
                graph.create(fin_ab_indicator_node)
            except:
                pass

        # create financial abstract-financial abstract indicators relationship
        fin_ab = matcher.match("财务指标",股票代码=code, 指标类型=abstract_type).first()
        fin_ab_indicator_ls = matcher.match(abstract_type, 股票代码=code)
        for fin_ab_indicator in fin_ab_indicator_ls: 
            realation = Relationship(fin_ab, '按报告期', fin_ab_indicator)
            try:
                graph.create(realation)
            except:
                pass

## 2. Research Report

In [None]:
def _process_reasearch_report(df: pd.DataFrame):
    df['日期'] = pd.to_datetime(df['日期'])
    df = df[df['日期'] >= datetime.now() - timedelta(365)].copy()
    df_rsh_repo = df[['股票代码']].drop_duplicates().copy()
    df_rsh_repo['近一年研报数量'] = len(df)
    df_rsh_repo['买入/增持评级数量'] = len(df[df['东财评级'].isin(['买入', '增持'])])
    if df['2023-盈利预测-收益'].isna().sum() == len(df):
        df_rsh_repo['2023盈利预测-收益'] = 'null'
    else:
        df_rsh_repo['2023盈利预测-收益'] = df[~df['2023-盈利预测-收益'].isna()]['2023-盈利预测-收益'].mean()
    if df['2023-盈利预测-市盈率'].isna().sum() == len(df):
        df_rsh_repo['2023盈利预测-市盈率'] = 'null'
    else:
        df_rsh_repo['2023盈利预测-市盈率'] = df[~df['2023-盈利预测-市盈率'].isna()]['2023-盈利预测-市盈率'].mean()
    if df['2024-盈利预测-收益'].isna().sum() == len(df):
        df_rsh_repo['2024盈利预测-收益'] = 'null'
    else:
        df_rsh_repo['2024盈利预测-收益'] = df[~df['2024-盈利预测-收益'].isna()]['2024-盈利预测-收益'].mean()
    if df['2024-盈利预测-市盈率'].isna().sum() == len(df):
        df_rsh_repo['2024盈利预测-市盈率'] = 'null'
    else:
        df_rsh_repo['2024盈利预测-市盈率'] = df[~df['2024-盈利预测-市盈率'].isna()]['2024-盈利预测-市盈率'].mean()
    return df_rsh_repo

In [None]:
# Sample fetch for one stock ('000001') to inspect the raw research-report columns.
df = ak.stock_research_report_em(symbol='000001')

In [None]:
# Preview the per-stock summary built from the sample fetch in the previous cell.
_process_reasearch_report(df)

In [None]:
# create research report constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:个股研报) REQUIRE (n.股票代码) IS UNIQUE'
graph.run(cypher=cypher)


# get research report info
df_rsh_repo_ls = []
for code in tqdm(stock_code_list):
    try:
        df_rsh_repo = ak.stock_research_report_em(symbol=code).copy()
        df_rsh_repo = _process_reasearch_report(df_rsh_repo)
        df_rsh_repo_ls.append(df_rsh_repo)
    except:
        continue
        # log.warning(f'{code} has no research report info')
if len(df_rsh_repo_ls) == 0:
    raise ConnectionError('Can not get stock research report from akshare')
else:
    df_rsh_repo = pd.concat(df_rsh_repo_ls, ignore_index=True)
    
    
for record in tqdm(df_rsh_repo.to_dict('records')):
    # create research report nodeb
    rsh_report_node = Node('个股研报', name='个股研报', **record)
    try:
        graph.create(rsh_report_node)
    except:
        pass
    
    # create stock - main business composition relationship
    stock = matcher.match("股票", 代码=record['股票代码']).first()
    if stock is None:
        continue
    rsh_report = matcher.match("个股研报", 股票代码=record['股票代码']).first()
    realation = Relationship(stock, '基本面', rsh_report)
    try:
        graph.create(realation)
    except:
        pass

## 3. Main Business Composition

In [None]:
def assign_exchange_prefix(code: str):
    """Prepend the upper-case exchange tag to a stock code.

    Codes starting with '0' or '3' get 'SZ'; every other code gets 'SH'.

    NOTE(review): codes of any other venue present in the code list also receive
    'SH' — confirm that is acceptable for the akshare endpoint used here.
    NOTE(review): a later cell redefines this name with lower-case prefixes, so
    this version is only in effect until that cell runs.
    """
    exchange = 'SZ' if code.startswith(('0', '3')) else 'SH'
    return exchange + code

In [None]:
# create main business composition constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:主营构成) REQUIRE (n.股票代码, n.报告期) IS UNIQUE'
graph.run(cypher=cypher)

# create main business constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:主营业务) REQUIRE (n.股票代码, n.报告期, n.业务名称) IS UNIQUE'
graph.run(cypher=cypher)


for code in tqdm(stock_code_list):
    # get main business info
    try:
        symbol = assign_exchange_prefix(code)
        df_main_business = ak.stock_zygc_em(symbol=symbol).copy()
        df_main_business = df_main_business.fillna('nan').copy()
    except:
        log.warning(f'Can not get {code} main business info')
        continue
        
    df_main_business = df_main_business[df_main_business['分类类型'] == '按产品分类'].copy()
    df_main_business['报告日期'] = pd.to_datetime(df_main_business['报告日期'])
    df_main_business = df_main_business[df_main_business['报告日期'] > datetime(2020,12,31)].copy()
    df_main_business['报告日期'] = df_main_business['报告日期'].dt.strftime('%Y-%m-%d')
    for report_time, df_group in df_main_business.groupby('报告日期'):
        # create main business composition node
        main_bus_compo_node = Node('主营构成', 股票代码=code, 报告期=report_time, name=report_time)
        try:
            graph.create(main_bus_compo_node)
        except:
            pass

        # create stock - main business composition relationship
        stock = matcher.match("股票", 代码=code).first()
        main_bus_compo = matcher.match("主营构成", 股票代码=code, 报告期=report_time).first()
        realation = Relationship(stock, '基本面', main_bus_compo)
        try:
            graph.create(realation)
        except:
            pass

        # create main business node
        df_group = df_group[['股票代码', '报告日期', '主营构成', '主营收入', '收入比例', '主营成本',
                             '成本比例', '主营利润', '利润比例', '毛利率']].copy()
        df_group = df_group.rename(columns={'主营构成': '业务名称', '报告日期': '报告期'})
        for record in df_group.to_dict('records'):
            record.update({'name': record['业务名称']})
            main_business_node = Node('主营业务', **record)
            try:
                graph.create(main_business_node)
            except:
                pass

        # create main business composition-main business relationship
        main_bus_compo = matcher.match("主营构成", 股票代码=code, 报告期=report_time).first()
        main_bus_ls = matcher.match("主营业务", 股票代码=code, 报告期=report_time)
        for main_bus in main_bus_ls: 
            realation = Relationship(main_bus_compo, '按产品分类', main_bus)
            try:
                graph.create(realation)
            except:
                pass

## 4. Main Shareholders

In [4]:
def assign_exchange_prefix(code: str):
    """Prepend the lower-case exchange tag to a stock code.

    Codes starting with '0' or '3' get 'sz'; every other code gets 'sh'.

    NOTE(review): this redefines the upper-case assign_exchange_prefix from the
    main-business section; whichever cell ran last wins, so cell order matters.
    """
    exchange_by_first_digit = {'0': 'sz', '3': 'sz'}
    return exchange_by_first_digit.get(code[:1], 'sh') + code

In [5]:
# create main shareholders constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:主要股东) REQUIRE (n.股票代码, n.报告期) IS UNIQUE'
graph.run(cypher=cypher)

# create shares holding constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:持股信息) REQUIRE (n.股票代码, n.报告期, n.股东名称) IS UNIQUE'
graph.run(cypher=cypher)

report_time_ls = ['20210331', '20210630', '20210930', '20211231', 
                  '20220331', '20220630', '20220930', '20221231', 
                  '20230331', '20230630', '20230930', '20231231']

for code in tqdm(stock_code_list):
    # get main shareholders info
    for report_time in report_time_ls:
        try:
            symbol = assign_exchange_prefix(code)
            df_main_shareholders = ak.stock_gdfx_top_10_em(symbol=symbol, date=report_time).copy()
        except:
            log.warning(f'Can not get {code} {report_time} main shareholders info')
            continue

        df_main_shareholders = df_main_shareholders[:3].fillna('nan').copy()
        # create main shareholders node
        main_shareholders_node = Node('主要股东', 股票代码=code, 报告期=report_time, name=report_time)
        try:
            graph.create(main_shareholders_node)
        except:
            break

        # create stock - main shareholders relationship
        stock = matcher.match("股票", 代码=code).first()
        main_shareholders = matcher.match("主要股东", 股票代码=code, 报告期=report_time).first()
        realation = Relationship(stock, '基本面', main_shareholders)
        try:
            graph.create(realation)
        except:
            pass

        # create shares holding node
        for record in df_main_shareholders.to_dict('records'):
            record.update({'name': record['股东名称'], '股票代码': code, '报告期': report_time})
            shares_holding = Node('持股信息', **record)
            try:
                graph.create(shares_holding)
            except:
                break

        # create main business composition-main business relationship
        main_shareholders = matcher.match("主要股东", 股票代码=code, 报告期=report_time).first()
        shares_holding_ls = matcher.match("持股信息", 股票代码=code, 报告期=report_time)
        for shares_holding in shares_holding_ls: 
            realation = Relationship(main_shareholders, '按持股比例', shares_holding)
            try:
                graph.create(realation)
            except:
                break

  0%|          | 0/5363 [00:00<?, ?it/s]

100%|██████████| 5363/5363 [3:25:32<00:00,  2.30s/it]


## 5. Price Indicators

In [9]:
import akshare as ak

# Spot-check the daily-price endpoint on a single ticker before the full load below.
stock_zh_a_hist_df = ak.stock_zh_a_hist(
    symbol="001359",
    period="daily",
    start_date="20230412",
    end_date='20240412',
    adjust="hfq",
)
stock_zh_a_hist_df['日期'].astype(str)

0    2024-03-28
1    2024-03-29
2    2024-04-01
3    2024-04-02
4    2024-04-03
5    2024-04-08
6    2024-04-09
7    2024-04-10
8    2024-04-11
9    2024-04-12
Name: 日期, dtype: object

In [4]:
def process_date_to_report_time(date: datetime):
    """Map a date to the last day of its calendar quarter, formatted 'YYYY-MM-DD'.

    Quarter-end days map to themselves (2023-03-31 -> '2023-03-31'), and every
    day of December, including 12-31, maps to 'YYYY-12-31'. The time of day is
    ignored.

    Args:
        date: Anything ``pd.to_datetime`` accepts (datetime, date, string).

    Returns:
        The quarter-end date as a 'YYYY-MM-DD' string.
    """
    date = pd.to_datetime(date)
    # Months 1-3 -> 3, 4-6 -> 6, 7-9 -> 9, 10-12 -> 12.
    quarter_end_month = ((date.month - 1) // 3 + 1) * 3
    quarter_end_day = 31 if quarter_end_month in (3, 12) else 30
    return datetime(date.year, quarter_end_month, quarter_end_day).strftime('%Y-%m-%d')
    
def MA(data, n):
    """Simple moving average of the close column ('收盘') over a window of n rows.

    Values are rounded to 2 decimals. The Series is named 'MA_<n>', and rows
    without a full window hold the string 'nan'.
    """
    window_mean = data['收盘'].rolling(window=n).mean()
    return window_mean.round(2).rename(f'MA_{n}').fillna('nan')

def EMA(data, n):
    """Exponential moving average (span n) of the close column ('收盘').

    No value is produced until n observations are available; those rows hold the
    string 'nan'. Values are rounded to 2 decimals and the Series is named 'EMA_<n>'.
    """
    smoothed = data['收盘'].ewm(span=n, min_periods=n).mean()
    return smoothed.round(2).rename(f'EMA_{n}').fillna('nan')

def RSI(data, n):
    """Relative strength index of the close column ('收盘').

    Day-over-day gains and losses are averaged with an exponential smoother
    (alpha = 1/n, adjust=False); RSI = 100 - 100 / (1 + avg_gain / avg_loss).
    Values are rounded to 2 decimals. The Series is named 'RSI_<n>', and undefined
    values (e.g. 0/0 on the first row) are the string 'nan'.
    """
    change = data['收盘'].diff()
    # The first change is NaN; where() turns it into 0 for both legs.
    gains = change.where(change > 0, 0)
    losses = -change.where(change < 0, 0)

    smoothing = {'alpha': 1 / n, 'adjust': False}
    avg_gain = gains.ewm(**smoothing).mean()
    avg_loss = losses.ewm(**smoothing).mean()

    strength = avg_gain / avg_loss
    rsi = (100 - 100 / (1 + strength)).round(2)
    return rsi.rename(f'RSI_{n}').fillna('nan')


def MACD_Level(data, n_fast, n_slow, n_signal=9):
    """Compute the MACD line, its signal line and the histogram from '收盘'.

    ``data`` is not modified. Callers should select the returned columns by name.

    Args:
        data: DataFrame with a close column '收盘'.
        n_fast: Span of the fast EMA.
        n_slow: Span of the slow EMA; also the warm-up length of both EMAs.
        n_signal: Span and warm-up length of the signal EMA (default 9).

    Returns:
        DataFrame indexed like ``data`` with columns 'MACD', 'MACDsignal' and
        'MACDhist' (MACD - signal); missing values are the string 'nan'.
    """
    close = data['收盘']
    # Both EMAs wait for n_slow observations so the MACD line starts on one row.
    ema_fast = close.ewm(span=n_fast, min_periods=n_slow).mean().round(2)
    ema_slow = close.ewm(span=n_slow, min_periods=n_slow).mean().round(2)
    macd = ema_fast - ema_slow
    signal = macd.ewm(span=n_signal, min_periods=n_signal).mean().round(2)
    result = pd.DataFrame({'MACD': macd, 'MACDsignal': signal, 'MACDhist': macd - signal})
    return result.fillna('nan')


In [7]:
# create price data type constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:行情类型) REQUIRE (n.股票代码, n.行情类型) IS UNIQUE'
graph.run(cypher=cypher)

# create price data constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:行情数据) REQUIRE (n.股票代码, n.行情类型, n.报告期) IS UNIQUE'
graph.run(cypher=cypher)

# create price indicators type constraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:技术指标类型) REQUIRE (n.股票代码, n.技术指标类型) IS UNIQUE'
graph.run(cypher=cypher)

# create price indicatorsconstraint
cypher = 'CREATE CONSTRAINT IF NOT EXISTS FOR (n:技术指标数据) REQUIRE (n.股票代码, n.技术指标类型, n.报告期) IS UNIQUE'
graph.run(cypher=cypher)


start_date = (datetime.now() - timedelta(365)).strftime('%Y%m%d')
end_date = datetime.now().strftime('%Y%m%d')

neccessry_price_data_type = ['收盘', '成交量', '涨跌幅', '换手率']

for code in tqdm(stock_code_list):
    stock = matcher.match("股票", 代码=code).first()
    if stock is None:
        continue
    # get price info
    try:
        df_price = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=start_date, end_date=end_date, adjust="hfq")
        df_price = df_price[['日期'] + neccessry_price_data_type].copy()
        df_price['报告期'] = df_price['日期'].apply(process_date_to_report_time)
        df_price['日期'] = df_price['日期'].astype(str)
        df_price['股票代码'] = code
    except:
        log.warning(f'Can not get {code} price data')
        continue
    
    for price_type in neccessry_price_data_type:
        # create price data price_type node
        price_data_type_node = Node('行情类型', 股票代码=code, 行情类型=price_type, name=price_type)
        try:
            graph.create(price_data_type_node)
        except:
            continue

        # create stock - main shareholders relationship
        stock = matcher.match("股票", 代码=code).first()
        price_data_type = matcher.match("行情类型", 股票代码=code, 行情类型=price_type).first()
        realation = Relationship(stock, '行情', price_data_type)
        try:
            graph.create(realation)
        except:
            pass

    for report_time, df in df_price.groupby('报告期'):
        for price_type in neccessry_price_data_type:
            assert price_type in df.columns, f'{price_type} not in {code} {report_time} price data'
            attribute = {}
            for date, value in df[['日期', price_type]].values:
                attribute[date] = value

            price_data_node = Node('行情数据', 股票代码=code, 行情类型=price_type, 报告期=report_time, name=report_time, **attribute)
            try:
                graph.create(price_data_node)
            except:
                continue

            # create stock - main shareholders relationship
            price_data_type = matcher.match("行情类型", 股票代码=code, 行情类型=price_type).first()
            price_data = matcher.match("行情数据", 股票代码=code, 行情类型=price_type, 报告期=report_time).first()
            realation = Relationship(price_data_type, '行情具体数据', price_data)
            try:
                graph.create(realation)
            except:
                pass
    
    df_indicators = df_price[['日期', '报告期', '收盘']].copy()
    df_indicators['MA5'] = MA(data=df_indicators, n=5)
    df_indicators['MA20'] = MA(data=df_indicators, n=20)
    df_indicators['EMA5'] = EMA(data=df_indicators, n=5)
    df_indicators['EMA20'] = EMA(data=df_indicators, n=20)
    df_indicators['RSI'] = RSI(data=df_indicators, n=14)
    df_indicators[['MACD', 'MACD_hist', 'MACD_signal']] = MACD_Level(data=df_indicators, n_fast=12, n_slow=26)
    df_indicators = df_indicators.drop('收盘', axis=1)

    indicator_name_map = {
        'MA5': '五日移动均线',
        'MA20': '二十日移动均线',
        'EMA5': '五日指数移动均线',
        'EMA20': '二十日指数移动均线',
        'RSI': '相对强弱指标',
        'MACD': 'MACD线',
        'MACD_hist': 'MACD离差图',
        'MACD_signal': 'MACD信号线'
    }
    

    for idicator_type in indicator_name_map.keys():
        # create price idicator type node
        price_idicator_type_node = Node('技术指标类型', 股票代码=code, 技术指标类型=indicator_name_map[idicator_type], name=idicator_type)
        try:
            graph.create(price_idicator_type_node)
        except:
            break

        # create stock - main shareholders relationship
        stock = matcher.match("股票", 代码=code).first()
        price_data_type = matcher.match("技术指标类型", 股票代码=code, 技术指标类型=indicator_name_map[idicator_type]).first()
        realation = Relationship(stock, '行情', price_data_type)
        try:
            graph.create(realation)
        except:
            pass

    for report_time, df in df_indicators.groupby('报告期'):
        for idicator_type in indicator_name_map.keys():
            assert idicator_type in df.columns, f'{idicator_type} not in {code} {report_time} price data'
            attribute = {}
            for date, value in df[['日期', idicator_type]].values:
                attribute[date] = value

            price_indicator_node = Node('技术指标数据', 股票代码=code, 技术指标类型=indicator_name_map[idicator_type], 
                                        报告期=report_time, name=report_time, **attribute)
            try:
                graph.create(price_indicator_node)
            except:
                break

            # create stock - main shareholders relationship
            price_indicator_type = matcher.match("技术指标类型", 股票代码=code, 技术指标类型=indicator_name_map[idicator_type]).first()
            price_indicator = matcher.match("技术指标数据", 股票代码=code, 技术指标类型=indicator_name_map[idicator_type], 报告期=report_time).first()
            realation = Relationship(price_indicator_type, '技术指标具体数据', price_indicator)
            try:
                graph.create(realation)
            except:
                pass

100%|██████████| 5363/5363 [2:09:08<00:00,  1.44s/it]  
