# A股验证后股票数据Exe1 V1 （202411）

# Validated Stock Data of A-share Stock In Chinese Market Execute Document V1 （202411）

In [1]:
import datetime
today = datetime.datetime.today()
current_date=today.strftime("%Y%m%d")  #用于标记此次处理的数据表

## 1. 概述
本文通过以下几步获得验证的股票数据，数据源主要是AKShare源，通过国泰君安App软件导出的数据进行比对后，保存入数据库供调用。分为以下几步：
- A股清单从实时行情中获取
- 通过AKShare实时行情、基本面信息和国泰君安App软件比对后获得A股可信清单
- 将可信A股清单保存入内存以result(DataFrame)形式
- 从国泰君安APP中获取历史数据（不复权）作为A股历史数据验证数据
- 将国泰君安A股历史验证数据（不复权）通过AKShare在线获取股票历史不复权数据与验证数据比对后，获得可信A股历史数据，存入verifystock数据库gtja表（DolphinDB)
- 根据在可信A股历史验证数据中的股票代码，从AKShare网上再下载所需的复权信息（后复权为主，但可自由选择），存入dailystock数据库ashare表（DolphinDB)

## 1. A股清单和CDA验证
我们采用sqlite数据库保存A股清单
### 1.1 AkShareA股清单下载
通过实时行情数据-京沪深A股栏目,提取所有AKShare股票A股清单
（ https://akshare.akfamily.xyz/data/stock/stock.html#id1 ）

In [2]:
import pandas as pd
import sqlite3
import akshare as ak
import numpy as np
import logging
import json

#disable future warnings of akshare package
import warnings
warnings.filterwarnings("ignore")

# derived from CQF E3 Assginment, delete the trans frunction.
# function of data acquisition: if there's table in database use 
#     native data, else retrieve from akShare Source.
# AKShareFunc -- AKShare function to retrieve data from web source,
# paraDict -- parameters of AKShare function in the form of Dictionary,
# tablename -- table name to store data(svm.db),
# filelink -- database file link,
def get_data(AKShareFunc,paraDict,tablename,filelink,inform=True):
    conn = sqlite3.connect(filelink)
    cursor = conn.cursor()
    cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table'\
    AND name='{tablename}'")
    result = cursor.fetchone()
    dataEng=pd.DataFrame()
    if not result:
        if inform:
            print('get data from source.')
        fundData = AKShareFunc(**paraDict)
        fundData.to_sql(tablename, conn,index=False)
    else:
        if inform:
            print('read from sql.')
    funddataEng=pd.read_sql(f"SELECT * FROM {tablename}", conn)
    cursor.close()
    conn.close()
    return funddataEng

#生成stock symbol数据库（sqlite3)
def gen_stocksymb(filelink,tablename):
    AKShareFunc=ak.stock_zh_a_spot_em
    paraDict={}
    AKrealtimeA = get_data(AKShareFunc,paraDict,tablename,filelink)
    print(f'从{tablename}获得AKShare股票代码清单。')
    return {'date':formatted_date,'df':AKrealtimeA}

tablename='stockSymb_AK'+current_date
filelink='Data/stock.db'

#分步代码，需要分步运行时解除以下注释
#AKrealtimeA=gen_stocksymb(filelink,tablename)['df']

### 1.2 国泰君安富易APP所有A股清单
#### 1.2.1 沪深京盘后数据下载维护
点击“行情/A股市场”，设置/盘后数据下载（1992-1-1至今）
![gtja_afterdata.png](image/gtja_afterdata.png)

#### 1.2.2 导出数据xls
- 选择APP显示A股行情：点击“行情/A股市场”
- 菜单选择：“设置/数据导出”（目标文件夹：共享本地目录/每日盘前/全部A股241203.xls)
- 将源数据复制到待处理数据池：(/Users/mac/Downloads/同步空间/notebook/薛猫选股/A股验证股票数据/Data/manual/全部Ａ股20241203.xls）。

#### 1.2.3 手工整理csv
形成文档"Data/gtjaA20241203.csv"，主要整理：  
- 去掉最后一行“数据来源:通达信"
- 同时检查单个或者多个0开头的股票代码是否是文本形式
- 将空值"--"转换为-1
- 将xls转换为csv
![guotaiA.png](image/guotaiA.png)

#### 1.2.4 读入数据

In [3]:
datalink=f'Data/gtjaA20241203.csv'   #手工保存国泰君安App数据
#分步代码，需要分步运行时解除以下注释
#gt_A=pd.read_csv(datalink,dtype={"代码":str})
#gt_A

### 1.3 A股清单数据比对
#### 1.3.1 比对函数

In [4]:
#find two difference of DataFrame column value items regardless of its sequences. 
# 比对两个不同的DataFrame相同列和相同索引处数据的差异，并给出报告
# NOT SUITABLE FOR INDEX COMPARISON ON COLUMNS FOR SEQUENCE REDUPLICATED VALUES
# colnameList: List of the names of columns to be compared
# return 
#  colname:
#  df12common: df1与公共部分差异
#  df22common: df2与公共部分差异
#  df1todf2:  df1有而df2没有的股票
#  df2todf1:  df2有而df1没有的股票
# edited 2024-10-7 can be 
def compareDiff(df1,df2,colnameList=None):
    diffDf=pd.DataFrame()
    if colnameList==None:
        colnameList=list(set(df1.columns.tolist()+df2.columns.tolist()))
    for colname in colnameList:
        df1Arr=set(df1[colname])
        df2Arr=set(df2[colname])

        common=df1Arr & df2Arr
        df12common=(len(df1Arr)-len(common))/len(df1Arr)
        df22common=(len(df2Arr)-len(common))/len(df2Arr)
        
        df1todf2=df1Arr-df2Arr
        df2todf1=df2Arr-df1Arr
        
        #change decimals to strings if values are numbers
        if len(df1todf2)>0 and type(list(df1todf2)[0])!='str':
            df1todf2=set(map(str,df1todf2))
        if len(df2todf1)>0 and type(list(df2todf1)[0])!='str':
            df2todf1=set(map(str,df2todf1))
            
        df1Diff=','.join(df1todf2)
        df2Diff=','.join(df2todf1)
        #print(df1Diff,df2Diff)
        
        # colname: name of compared columns
        # df12common,df22common: Difference ratio with intersection of DataFrames(df1,df2)
        # dif1Diff,dif2Diff: List of Differences
        newdf=pd.DataFrame({
            'colname':[colname],
            'df12common':[df12common],
            'df22common':[df22common],
            'df1Diff':[df1Diff],
            'df2Diff':[df2Diff]
        })
        diffDf=pd.concat([diffDf,newdf],ignore_index=True)

    return diffDf

In [5]:
# 需要运行测试时，取消注释
#df1=pd.DataFrame({'date':['2020-01-01','2020-01-02','2020-01-03'],'value':[3,4,5]})
#df1.set_index('date',inplace=True)
#df2=pd.DataFrame({'date':['2020-01-02','2020-01-03','2020-01-04'],'value':[4,4.9,5]})
#df2.set_index('date',inplace=True)
#compareDiff(df1,df2)

"\ndf1=pd.DataFrame({'date':['2020-01-01','2020-01-02','2020-01-03'],'value':[3,4,5]})\ndf1.set_index('date',inplace=True)\ndf2=pd.DataFrame({'date':['2020-01-02','2020-01-03','2020-01-04'],'value':[4,4.9,5]})\ndf2.set_index('date',inplace=True)\ncompareDiff(df1,df2)\n"

#### 1.3.2 AkShare和国泰君安A股清单差异分析
国泰君安与AkShare数据公共部分：common  
国泰君安数据与公共部分完全重合，差异为：0%  
AkShare数据与公共部分差异：5.09%  
结论：AkShare数据比国泰君安多出287条。

In [6]:
#分步代码，需要分步运行时解除以下注释
#AkGtjaDifDf=compareDiff(gt_A,AKrealtimeA,['代码'])
#AkGtjaDifDf

AkShare数据的差异部分列表(比国泰君安多出的股票)

In [7]:
#分步代码，需要分步运行时解除以下注释
#AkGtjaDifList=AkGtjaDifDf['df2Diff'].tolist()[0].split(',')
#df_filtered = AKrealtimeA[AKrealtimeA['代码'].isin(AkGtjaDifList)]
#df_filtered

这些股票都没有最新价信息（即AkShare比国泰君安多出部分的股票清单与没有实时报价信息的股票清单重合————即国泰君安股票清单与AkShare股票清单在具备实时报价信息上一致）。

In [8]:
#分步代码，需要分步运行时解除以下注释
#df_filtered2=df_filtered[df_filtered['最新价'].isna()]  #stocks without current quotes
#AkDifAkNulldf=compareDiff(df_filtered,df_filtered2,['代码'])
#AkDifAkNulldf

#### 1.3.3 补充说明：AkShare 两网和退市股票并不包括全部退市股票
AkShare溢出的股票清单并不在 AkShare 两网及退市股票信息中

In [9]:
import akshare as ak

#分步代码，需要分步运行时解除以下注释

##AkShare delisted stocks
#AKShareFunc=ak.stock_zh_a_stop_em
#paraDict={}
#tablename='delist'+current_date

#AKdelist = get_data(AKShareFunc,paraDict,tablename,filelink)[['代码','名称']]

#filtereddf3=compareDiff(df_filtered,AKdelist,['代码'])
#filtereddf3

### 1.4 A股清单做基本面信息验证
AkShare数据有基本面信息接口，我们查询此基本面信息接口，做以下两件事情： 
- 与我们保存的A股清单的“代码”和“名称”做验证，防止股票代码信息与股票信息错位（见 [涨停板敢死队](../涨停板敢死队/涨停板敢死队.ipynb) 2.2.4.2 获得基准验证数据：国泰君安富易App2020年数据集）
- 增加IPO日期（用于校验历史数据起始日期）

对A股清单的代码和名称做交叉验证。本步骤需要人工处理，因为简称可能有所不同，但标的还是一个。代码如果需要从下文再重新运行，需要手动删除faillist.pkl文件，避免跳过检查和IPO添加环节。  

In [10]:
# A is Brief of B which means:
# 1)all characters in A are in B
# 2)the order of characters in A is the same as those in B
def isBriefOf(A, B):
    lenA, lenB = len(A), len(B)
    i, j = 0, 0
    
    while i < lenA and j < lenB:
        if A[i] == B[j]:
            i += 1
        j += 1
    return i == lenA

In [11]:
import tempfile
import pickle
import numpy as np
import re
import os

# Save a temporary file named "faillist.pkl" in the current directory to support the resumption of the network query for fundamental information. 
# To start over, please manually delete the file or choose loadTemp=False.
# AShareDf: the target stock DataFrame to be checked
# addDate: add IPO date information to the DataFrame
# fix: fixed automatically if there's in accordance with the source
# loadtemp: load temp file which stores last source query, False to restart all queries
def checkFoundamental(AShareDf, addDate=True, fix=False, loadtemp=True):
    mismatch = []
    checkList = None
    faillist = []
    resdf = AShareDf
    i = 0
    failfile='faillist.pkl'
    foundfilepath='Data/found.csv'
    checkAkshare=True
    
    if loadtemp:
        if os.path.exists(failfile):
            with open(failfile, 'rb') as temp_file:
                checkList = pickle.load(temp_file)  # support resumable transfer 
            if os.path.exists(foundfilepath):
                resdf=pd.read_csv(foundfilepath)
                checkAkshare=False
        else:
            checkList = AShareDf['代码'].tolist()
    else:
        checkList = AShareDf['代码'].tolist()

    # for the stability of AKShare source, we retreat 3 times if fail
    while i < 3 and len(checkList) > 0 and checkAkshare:
        print(f'Try time {i+1}:')
        for s in checkList:
            msg = f'checking {s}...'
            print(f'{msg:<100}', end='\r')
            df_row=AShareDf.loc[AShareDf['代码'] == s, '名称']
            if not df_row.empty:
                df_value = df_row.values[0]

                querySuccess=False
                try:
                    s_df = ak.stock_individual_info_em(symbol=s)
                    querySuccess=True
                except Exception as e:
                    msg = f'{e}. Try {s} another {2-i} times.'
                    print(f'{msg:<100}', end='\r')
                    faillist.append(s)

                if querySuccess:
                    s_value = s_df.loc[s_df['item'] == '股票简称', 'value'].values[0]

                    # delete the spaces
                    df_value_no_space = re.sub(r'\s+', '', df_value)
                    s_value_no_space = re.sub(r'\s+', '', s_value)

                    # delete the special prefixes ('XD','N','ST','C') of stock names
                    # XD: Ex-dividend and ex-rights
                    # N: Newly IPO stocks the first day
                    # C: Newly IPO stocks the first week
                    # ST:Special Treatment
                    df_value_trim = re.sub(r'^XD|^N|^ST|\*ST$|C', '', df_value_no_space)
                    s_value_trim = re.sub(r'^XD|^N|^ST|\*ST$|C', '', s_value_no_space)

                    # get Chinese Characters
                    df_value_chinese = re.findall(r'[\u4e00-\u9fff]', df_value_trim)
                    s_value_chinese = re.findall(r'[\u4e00-\u9fff]', s_value_trim)

                    #whether the two strings are at least one direction inclusion relationship
                    isBrief=isBriefOf(s_value_chinese,df_value_chinese) or isBriefOf(df_value_chinese,s_value_chinese)

                    # 判断两个字符串相等
                    if df_value_no_space == s_value_no_space or isBrief:
                        msg = f"code: {s} checked: matched codes."
                        print(f'{msg:<100}', end='\r')
                        pass
                    else:
                        mismatch.append({'code': s, 'source': s_value, 'target': df_value})
                        msg = f"code: {s},df({df_value}) vs source({s_value}) mismatched."
                        if fix:
                            AShareDf.loc[AShareDf['代码'] == s,'名称'] = s_value
                            msg += "replaced."
                        print(f'{msg:<100}', end='\r')

                    # add IPO date to DataFrame
                    if addDate:
                        ipostr = str(s_df.loc[s_df['item'] == '上市时间', 'value'].values[0])
                        print(f'adding {msg:<100} IPO:{ipostr}', end='\r')
                        resdf.loc[resdf['代码'] == s, 'IPO'] = datetime.datetime.strptime(ipostr, "%Y%m%d") if ipostr!='-' else np.nan
                else:
                    faillist.append(s)
                    msg = f'{s} 在faillist.pkl文件中有该股票代码，但是在目标dataframe中没有找到，可能来源于上一次的记录，解决此问题，可以删除上一次记录faillist.pkl后重试.'
                    print(msg)
        print(f'faillist({len(faillist)}):{faillist}')
        checkList = faillist
        faillist = []
        i += 1
    if checkList == []:
        msg = 'All codes are checked'
        msg += ' and fixed.' if fix else '.'
    else:
        msg = f'Check the following manually because of the inquiry failure(total {len(checkList)}): \n{checkList}'
    
    if loadtemp:
        with open(failfile, 'wb') as temp_file:
            pickle.dump(checkList, temp_file)
    
    print(f'{msg:<100}', end='\r')
    
    return {'mismatch': mismatch, 'df': resdf, 'fail': checkList}

In [12]:
#分步代码，需要分步运行时解除以下注释
#checkFound=checkFoundamental(gt_A)

不匹配的股票清单：原因是曾用名。

In [13]:
#分步代码，需要分步运行时解除以下注释
#checkFound['mismatch']

In [14]:
#分步代码，需要分步运行时解除以下注释
#gt_A_val=checkFound['df']
#gt_A_val.to_csv(foundfilepath)
#gt_A_val[['代码','名称','IPO']]

### 1.5 一体化运行代码：获得验证后的A股股票清单：
获取可信A股清单：
- 国泰君安大智慧富易App，点击市场行情，A股后手工导出所有清单
- AkShare采用京沪深A股实时行情接口（ak.stock_zh_a_spot_em），获得验证数据
- 若国泰君安数据缺失，则检查是否这部分缺失数据的“最新价”为空，若是，则验证通过；否则，报警，验证不通过，并列出非空的股票清单。 
- 对国泰君安数据做基本面校验，手工检查代码和简称没有错位情况。
- 若验证通过，将该股票清单保存到共享stock数据库(/Users/mac/Downloads/同步空间/notebook/薛猫选股/Data/stock.db)，按照不同日期保存A股清单的快照。

集中处理函数代码如下：**以下代码运行时，注意以下几点**
- 按照1.2提示下载国泰君安股票清单到指定目录
- 若不是断点继续，请删除同程序目录下faillist.pkl文件

In [15]:
import os

#get validated AshareList from AkShare and check
# dblink: db link to store the validated AShare list
# validlink: (optional) read from validlink of gtja data (.csv) for validation
# fix: fix names from source automatically
# current_date: 当前日期戳用于标记数据库表序号批次
def storeAShareList(datalink,dblink,current_date,fix=False):
    
    gt_A=pd.read_csv(datalink,dtype={"代码":str})
    
    #validate source from AkShare
    AKrealtimeA = ak.stock_zh_a_spot_em()
    
    #validate data
    res={'valid':0,'diff':'','df':pd.DataFrame()}
    AkGtjaDifDf=compareDiff(gt_A,AKrealtimeA,['代码'])
    diffrate=AkGtjaDifDf.loc[0,'df22common']
    if diffrate>0:
        print(f'There is a {diffrate:.2%} gap comparing with AKShare verification data.')
        AkGtjaDifList=AkGtjaDifDf['df2Diff'].tolist()[0].split(',')
        df_filtered = AKrealtimeA[AKrealtimeA['代码'].isin(AkGtjaDifList)]
        df_filtered2= df_filtered[df_filtered['最新价'].isna()]
        AkDifAkNulldf=compareDiff(df_filtered,df_filtered2,['代码'])
        if AkDifAkNulldf.loc[0,'df12common']==0:
            res['valid']=1
            print('All are of none quotes, pass the verification.')
        else:
            res['diff']=AkDifAkNulldf.loc[0,'df1Diff']
            print('Check res[\'diff\'] to see details of shortage.')
    else:
        res['valid']=1
        print(f'There is no data shortage from AKShare verification.')
    
    #check manually if not automatically fixed
    file_path = 'faillist.pkl'
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"{file_path} deleted.")
    else:
        print(f"{file_path} no faillist file.")
    checkfound=checkFoundamental(gt_A,fix=fix)
    
    if res['valid']==0 and fix==False:
        print('The difference of stock codes and its names from the verification source:\n',checkfound['mismatch'])
        m=''
        while m.lower()!='y' and m.lower()!='n':
            m=input('Check manually if the names pass the verification in accordance with the source(y/n)?')
            if m.lower()=='n':
                res['valid']=0
            elif m.lower()!='y':
                print('Only support press \'y\' or \'n\', press again.')
            else:
                res['valid']=1
                
    #store to database    
    if res['valid']==1 or fix:
        table_name =  "Asharelist" + current_date
        conn = sqlite3.connect(dblink)
        res['df']=checkfound['df'][['代码','名称','IPO']]
        res['df'].to_sql(table_name, conn, if_exists='replace', index=False)
        conn.close()
    return res

In [16]:
dblink='/Users/mac/Downloads/同步空间/notebook/薛猫选股/Data/stock.db'
result=storeAShareList(datalink,dblink,current_date,True)   #直接覆盖检查差异结果
result['df']

There is a 5.13% gap comparing with AKShare verification data.
All are of none quotes, pass the verification.
faillist.pkl deleted.
Try time 1:
faillist(0):[]88708 checked: matched codes.                                                                 IPO:20241205
All codes are checked and fixed.                                                                    

Unnamed: 0,代码,名称,IPO
0,834765,美之高,2021-07-05
1,871478,巨能股份,2023-05-12
2,838275,驱动力,2021-01-25
3,831278,泰德股份,2022-06-20
4,837212,智新电子,2021-06-08
...,...,...,...
5373,301622,英思特,2024-12-04
5374,301617,博苑股份,NaT
5375,301585,蓝宇股份,NaT
5376,920098,科隆新材,2024-12-05


## 2. A股股票历史数据

我们采用时序数据库 DolphinDB 来保存所有A股历史日数据信息。相关参考模板（https://docs.dolphindb.cn/zh/tutorials/stockdata_csv_import_demo.html）

### 2.1 建立基准验证数据库：verifystock
根据查询需求（查询特定股票的日线数据、具体日期段数据等），我们决定将股票代码（Symbol)作为数据库分区维度，时间作为数据表分区维度。（具体分区大小论证，见“A股验证后股票数据study1.ipynb”文档）  
根据官方IT建议，可采用Datetime VALUE和SYMBOL HASH 30 方式复合分区。 

In [17]:
import dolphindb as ddb
import dolphindb.settings as keys

#将gtja数据保存入verify stock库
def createNativeDB(dbname,tablename):
    #建立verifystock库
    s = ddb.session(protocol=keys.PROTOCOL_DDB)
    s.connect("localhost", 8848,'admin','123456')

    SQL=f'''
    dbName="dfs://{dbname}"
    db1 = database(, RANGE, 1990.11.26 2010.11.26 2024.12.31)
    db2 = database(, HASH, [SYMBOL, 30])
    db = database(directory=dbName, partitionType=COMPO, partitionScheme=[db1, db2], engine="TSDB")
    '''

    #建立gtja表
    SQL2=f'''
    tb=table(2000:0,["Symbol","DateTime", "Open","High","Low","Close","Volume"],[SYMBOL,DATE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT])
    db.createPartitionedTable(table=tb, tableName=`{tablename}, partitionColumns=`DateTime`Symbol,sortColumns=`DateTime)
    '''
    
    s.run(SQL+SQL2)
    s.close()

#需要建库时，解除以下注释运行。
#createNativeDB('verifystock','gtja')

### 2.2 基准验证数据入库：国泰君安富易App2020年数据集

通过国泰君安富易APP手工批量导出日线功能，实现全量A股有限日期导出(批量导出功能受限于2020年开始数据)。由于有限日期，作为校验线上数据AkShare使用。注意由于复权方法各软件存在差异，为避免校验出现偏差，我们一律采用不复权的价格和成交量数据。导出文件在：
通过国泰君安富易App实现2020年至今数据全量A股导出csv文件（保存于Data/gtja_daily）,添加全部A股栏目，加入批量导出数据。  
![export_A.png](export_A.png)

App导出的文本数据文件：
- 第一行是股票代码和名称等信息；第二行是表头，分隔符是tab（不论手工导出分隔符是什么，都是tab）；
- 有两种格式文件，遇到股票名为生僻词，是GBK形式，而其它utf-8格式。因此，需要采用两种读取引擎读取文件。

[涨停板敢死队](../涨停板敢死队/涨停板敢死队.ipynb) 中（2.2.4.2 获得基准验证数据：国泰君安富易App2020年数据集）详细记录了另一种较繁琐的方法，可以防止导出文件格式更新后，与原设定不一致的情况（目前没有发生）。为简化操作，我们采用根据数据自定义表头，跳过标题和表头与固定行跳过尾部非数据文本的方式，代码简化很多，但需要定期关注GTJA软件更新后，手工导出的数据格式变化。  

#### 2.2.1 先读取国泰君安App数据

In [18]:
import os
import chardet
import pandas as pd

# 读取一个gtja文档并转化为DataFrame
# skiplast：当在开始时导出国泰君安数据时，最后一天由于未收市，其股票记录数字不同于未来统计的真实情况，所以可以选择跳过最后一行。
def readSingleGtja(file_path,skiplast=True):

    # judgement of encoder
    with open(file_path, 'rb') as f:
        raw_data = f.read()
        encoding = chardet.detect(raw_data)['encoding']

    #When there're uncommon characters, the txt file is GBK encoding else UTF-8, so read with either decoder.
    #print(f'{file_path} encoder:{encoding}',end='\r')
    if encoding=='GB2312':
        encoding='GBK'
    df = pd.read_csv(file_path, header=2, skipfooter=1,encoding=encoding)
    if not df.empty:
        names=['DateTime','Open','High','Low','Close','Volume','Amount']
        df.rename(columns=dict(zip(df.columns, names)), inplace=True)
        df['DateTime'] = pd.to_datetime(df['DateTime'])
        df.set_index('DateTime',inplace=True)
        if skiplast:
            df = df.drop(df.index[-1])
    return df

gtja_folder='Data/gtja_daily_bfq'
#若需要测试，解除以下注释
#readSingleGtja(gtja_folder+'/600016.txt')  #空文件：688605.txt

#### 2.2.2 将国泰君安A股历史验证数据存入verifystock库
考虑到很有可能证券市场还没有关闭时导出的历史数据，所以当天价格成交量等信息，由于未结束，所以去掉最后一行。

In [19]:
#保存验证数据到verifystock库gtja表
# mode:数据库添加模式
#      'append':会根据symbol和 DateTime 判断数据库已有记录，仅补全数据库中未有的记录。
#      'replace':替换原有symbol的的记录
def storeVerifystock(gtja_filename,dbname,tablename,mode='append',silent=False,skiplast=True):
    
    sym=os.path.basename(gtja_filename)[:6]
    df=readSingleGtja(gtja_filename,skiplast).reset_index()

    fail=None
    if 'DateTime' in df.columns.to_list():
        s = ddb.session(protocol=keys.PROTOCOL_DDB)
        s.connect("localhost", 8848,'admin','123456')
        df['Symbol']=sym

        if mode=='replace':
            SQL_DELE=f'''
            pt=loadTable(db,`{tablename})
            delete from pt WHERE Symbol==`{sym}
            '''
            s.run(SQL_DELE)
            iter_df=df
        else:
            SQL_QUERY=f'''
            db=database("dfs://{dbname}")
            pt=loadTable(db,`{tablename})
            select DateTime from pt where Symbol==`{sym};
            '''
            exist_df=s.run(SQL_QUERY)
            if exist_df.empty:
                iter_df=df
            else:
                iter_df = df[(df['DateTime'] > exist_df['DateTime'].max()) |
                            (df['DateTime'] < exist_df['DateTime'].min())]
            
        num=0
        if not iter_df.empty:
            symbolStr='symbol=`'+'`'.join(iter_df['Symbol'].values)
            dateStr='date='+' '.join(iter_df['DateTime'].dt.strftime('%Y.%m.%d').values)
            openStr='open='+' '.join(iter_df['Open'].astype(str).values)
            highStr='high='+' '.join(iter_df['High'].astype(str).values)
            lowStr='low='+' '.join(iter_df['Low'].astype(str).values)
            closeStr='close='+' '.join(iter_df['Close'].astype(str).values)
            volStr='volume='+' '.join(iter_df['Volume'].astype(str).values)
            SQL=f'''
            tb = loadTable("dfs://{dbname}","{tablename}")
            {symbolStr}\n{dateStr}\n{openStr}\n{highStr}\n{lowStr}\n{closeStr}\n{volStr}
            temp=table(symbol,date,open,high,low,close,volume)
            tableInsert(tb, temp);
            '''
            num=s.run(SQL)

        msg='数据库中已经有所有记录，无需追加,' if num==0 else ''
        if not silent:
            print(f'{sym}{msg}完成{num}条记录导入',end='\r')
        s.close()
    else:
        msg=f'{sym}表数据无标准数据，跳过。'
        fail=sym
        if not silent:
            print(msg,end='\r')
    return fail

#需要调试时，解除以下注释运行。
#storeVerifystock(gtja_folder+'/688602.txt','verifystock','gtja')

In [20]:
#将所有指定目录下的gtja股票数据文件经过校验后保存入
def storeAllVerifystock(folder_path):
    fails=[]
    successes=[]
    for filename in os.listdir(folder_path):
        sym=filename[:6]
        msg=f'正在导入{sym}...'
        print(f'{msg:<100}',end='\r')
        
        if filename[-4:]=='.txt':
            fail=storeVerifystock(folder_path+'/'+filename,'verifystock','gtja')
            if fail:
                fails.append(fail)
            else:
                successes.append(sym)
    
    if len(fails)>0:
        print(f'所有验证数据导入完毕.以下股票数据无标准数据：{fails}')
    else:
        print(f'完成所有验证数据导入.(总共{len(successes)})')
    s.close()
    return {'success':successes,'failure':fails}
            
            
#需要调试时，解除以下注释运行。
#storeAllVerifystock(gtja_folder)

### 2.3 AkShare数据校验并入库
#### 2.3.1 AkShare源获取数据和格式化
我们采用东财数据源（后复权），我们将列替换为英文，并采用OHLCV字段。

In [21]:
import akshare as ak

# download akshare stock data(hfq) in the certain period, and format the data
def getFormattedAkshare(symbol,start_date=None,end_date=None,fq=""):
    if end_date:
        stock = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start_date, end_date=end_date, adjust=fq)
    else:
        stock = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start_date, adjust=fq)
    if '日期' in stock.columns.to_list():
        stock_res=stock[['日期','股票代码','开盘','收盘','最高','最低','成交量','换手率']]
        stock_res=stock_res.rename(columns={
            '日期': 'DateTime',
            '股票代码': 'Symbol',
            '开盘': 'Open',
            '收盘': 'Close',
            '最高': 'High',
            '最低': 'Low',
            '成交量': 'Volume',
            '换手率': 'TurnOver'
        })
        stock_res['DateTime'] = pd.to_datetime(stock_res['DateTime'])
        stock_res['Market'] = 'cn_stock'
        stock_res.set_index('DateTime',inplace=True)
    else:
        stock_res=pd.DataFrame()
    return stock_res

#测试代码，需要调试时，可打开注释
#getFormattedAkshare('600016','20200101').reset_index()

Unnamed: 0,DateTime,Symbol,Open,Close,High,Low,Volume,TurnOver,Market
0,2020-01-02,600016,6.36,6.36,6.40,6.35,1043665,0.29,cn_stock
1,2020-01-03,600016,6.37,6.34,6.38,6.34,599124,0.17,cn_stock
2,2020-01-06,600016,6.32,6.30,6.37,6.28,802168,0.23,cn_stock
3,2020-01-07,600016,6.32,6.33,6.36,6.30,574019,0.16,cn_stock
4,2020-01-08,600016,6.32,6.29,6.33,6.26,693523,0.20,cn_stock
...,...,...,...,...,...,...,...,...,...
1190,2024-12-02,600016,3.96,3.98,3.99,3.93,2197577,0.62,cn_stock
1191,2024-12-03,600016,3.97,4.04,4.04,3.97,2542357,0.72,cn_stock
1192,2024-12-04,600016,4.04,4.05,4.07,4.01,2464741,0.70,cn_stock
1193,2024-12-05,600016,4.03,4.02,4.06,4.01,1810655,0.51,cn_stock


#### 2.3.2 国泰君安本地数据校验

国泰君安富易数据作校验(取值差异0.001作为阈值，即不超过0.1%)

In [22]:
# compare the difference of two DataFrame of the same indexes
# colList：columns list to compare
# thresh: difference threshold
def compareDiff2(tDf,vDf,thresh,colList):
    compareind=vDf.index
    targetdfcomp = tDf[tDf.index.isin(compareind)]
    
    #生成差异报告和超过阈值的差异报告（DataFrame）
    comparedf=(targetdfcomp[colList]-vDf[colList])/(targetdfcomp[colList]+vDf[colList])*2 #差异报告
    diff_df=comparedf[(comparedf.abs()>thresh).any(axis=1)] #超过阈值的差异报告
    
    #包含空值和非空值的差异统计率
    non_null_count = comparedf.notna().sum().sum()
    null_count = comparedf.isna().sum().sum()
    diff_col_count = (comparedf.abs() > thresh).sum()
    diff_count = diff_col_count.sum() 
    if non_null_count > 0:
        commonRate = diff_count / non_null_count   #不包含空值的差异率
    total_count = non_null_count + null_count
    if total_count > 0:
        totalRate = (diff_count + null_count) / total_count  #包含空值的差异率
    
    return {'diff_rate_not_null':commonRate,'total_diff_rate':totalRate,'compared_df':comparedf,'diff_df':diff_df}

In [23]:
#测试代码，需要调试时，可打开注释
#a=pd.DataFrame({'val':[1,2,3,4]})
#b=pd.DataFrame({'val':[1,2,None,4.01]})
#compareDiff2(a,b,0.13,['val'])

#### 2.3.3 建立dailystock数据库

In [24]:
#需要建库时，解除以下注释运行。
#createNativeDB('dailystock','ashare')

#### 2.3.4 读取验证数据(DolphinDB)
从数据库中读取gtja验证数据（为支持断点续传，逐条存入DolphinDB)

In [25]:
#查询verifystock库中指定校验表tablename中指定symbol的DataFrame
def getVerify(symbol,tablename):
    s = ddb.session(protocol=keys.PROTOCOL_DDB)
    s.connect("localhost", 8848,'admin','123456')
    
    SQL=f'''
    db=database("dfs://verifystock")
    pt=loadTable(db,`{tablename})
    select DateTime,Open,High,Low,Close,Volume from pt where Symbol=`{symbol};
    '''
    res=s.run(SQL)
    res['DateTime']=pd.to_datetime(res['DateTime'])
    res=res.set_index('DateTime')
    s.close()
    return res

#测试代码，需要调试时，可打开注释
#getVerify('6000160','gtja').empty

#### 2.4.2 校验下载AKShare并入库（DolphinDB）
下载和验证函数

In [26]:
#按照symbol查询获取A股清单数据（本地sqllite3）指定字段信息
# current_date:数据表后缀，指定版本的Asharelist表
# collist: 指定返回的列值列表
def getAsharelistBySymbol(dblink,current_date,symbol,collist):
    conn = sqlite3.connect(dblink)
    table_name =  "Asharelist" + current_date
    colStr=','.join(collist)
    Asharelist=pd.read_sql(f'select {colStr} from {table_name} where 代码={symbol}',conn,parse_dates=['IPO'])
    return Asharelist

#需要调试时，解除以下注释运行。
#getAsharelistBySymbol(dblink,current_date,'6886012',['名称','IPO'])
#getAsharelistBySymbol(dblink,current_date,'688602',['名称','IPO']).loc[0,'IPO'].strftime('%Y%m%d')

In [27]:
# get a single stock data from AKshare from foundamental DataFrame stocklist and verified with validateDfs Dict
# symbol:stock symbol
# ipodayStr: A股清单库版本日期字符串
# dfThresh: threshhold for judging DataFrame difference
# valThresh: threshhold for judging value difference  
# fq: 'qfq','hfq' and '' for forward,backward and none rights and devidend adjustments
# strictVerify: 如果True，则严格限定在验证数据库范围内的股票，即不在此库中的股票一律不通过；
#              如果False，则只排除验证数据库内股票校验不通过的股票，如果验证数据库内不存在目标股票，也通过。
# ATTENTION: For the variety of methods unpublished, the verification DOES NOT verify adjust data,
#   if the none adjust data pass the verification, then pass the adjust data 
# 注意单个检验函数并不检查:
#    - symbol是否在verifystock库中，需要额外添加该逻辑。
#    - foundDf['IPO']可能是NAT空值，需要额外添加逻辑处理。
def getVerifiedAKShareSingle(symbol,ipodayStr,dfThresh,valThresh,fq,strictVerify=False):
    res={}
    if not asharelist.empty:
        akshareDaily=getFormattedAkshare(symbol,ipodayStr)
        validateDf=getVerify(symbol,'gtja')
        
        #如果验证数据库非空，则不论是否strictVerify，根据验证结果判断是否pass，并返回差异或者准确数据集
        #如果验证数据库为空，若严格校验，则pass不通过，但返回准确数据集；若非严格校验，则pass通过，且返回准确数据集。
        if not validateDf.empty:
            validateDf['Volume']=(validateDf['Volume']/100).round()

            comparelist=['Open','High','Low','Close','Volume']
            comres=compareDiff2(validateDf,akshareDaily,valThresh,comparelist)
            if comres['total_diff_rate']>=dfThresh:
                res['pass']=False
                res['diff_rate_not_null']=comres['diff_rate_not_null']
                res['diff_df']=comres['diff_df']
                res['total_diff_rate']=comres['total_diff_rate']
            else:
                res['pass']=True
                res['df']=akshareDaily
                res['IPOstr']=ipodayStr
        elif strictVerify:
            res['pass']=False
            res['df']=akshareDaily
            res['IPOstr']=ipodayStr
        else:
            res['pass']=True
            res['df']=akshareDaily
            res['IPOstr']=ipodayStr
        
    return res

#测试代码，需要调试时，可打开注释

#conn = sqlite3.connect(dblink)
#symbol='600016'
#table_name =  "Asharelist" + current_date
#asharelist=pd.read_sql(f'select * from {table_name}',conn,parse_dates='IPO')
#ipodayStr=asharelist.loc[asharelist['代码']==symbol,'IPO'].dt.strftime('%Y%m%d').values[0]
#getVerifiedAKShareSingle('600016',ipodayStr,0.005,0.001,"")['df'].reset_index()[['Symbol','DateTime','Open','High','Low','Close','Volume']]
#asharelist['代码'].to_list()

Unnamed: 0,Symbol,DateTime,Open,High,Low,Close,Volume
0,600016,2000-12-19,20.00,21.00,18.50,18.56,1563524
1,600016,2000-12-20,18.47,18.47,17.91,18.10,290467
2,600016,2000-12-21,18.18,18.48,18.00,18.09,113629
3,600016,2000-12-22,18.10,18.17,17.60,17.66,130264
4,600016,2000-12-25,17.60,17.62,17.01,17.20,101131
...,...,...,...,...,...,...,...
5743,600016,2024-12-02,3.96,3.99,3.93,3.98,2197577
5744,600016,2024-12-03,3.97,4.04,3.97,4.04,2542357
5745,600016,2024-12-04,4.04,4.07,4.01,4.05,2464741
5746,600016,2024-12-05,4.03,4.06,4.01,4.02,1810655


根据A股清单、gtja股票验证库（DolphinDB)遍历下载、搜集并验证数据(后复权)。   
**注意事项**  
- 需要运行打开DolphinDB
- Data/storeVarified.txt文件是上一次运行过程文件，支持断点续传，但如果需要重新运行，需要手动删除。

In [34]:
# get all verified stock datas
# strictVerify: 如果True，则严格限定在验证数据库范围内的股票，即不在此库中的股票一律不通过；
#              如果False，则只排除验证数据库内股票校验不通过的股票，如果验证数据库内不存在目标股票，也通过。
# loadTemp: 是否断点续传
# dblink: A股清单Asharelist的数据库链接(sqlite3)
# Asharelist表的当前版本日期：current_date
def storeVerifiedAShareAll(dblink,current_date,dfThresh,valThresh,fq,strictVerify=False,loadTemp=True):
    logfile='Data/storeVarified.txt'
    successlist=[]
    faillist=[]
    if loadTemp:
        contentlist=[]
        if os.path.exists(logfile):
            with open(logfile, 'r') as temp_file:
                for line in temp_file:
                    contentlist.append(line.strip())
                    
    conn = sqlite3.connect(dblink)
    table_name =  "Asharelist" + current_date
    asharelist=pd.read_sql(f'select * from {table_name}',conn,parse_dates='IPO')
    
    
    if loadTemp:
        asharelist=asharelist[~asharelist['代码'].isin(contentlist)]
    
    pool = ddb.DBConnectionPool("localhost", 8848, 3, "admin", "123456")
    appender = ddb.PartitionedTableAppender(dbPath="dfs://dailystock", tableName="ashare", partitionColName="Symbol", dbConnectionPool=pool)
    
    total,no_fq,with_fq,fail_symbol,fail_with_no_IPO,fail_with_no_data,pass_with_no_IPO=0,0,0,0,0,0,0
    for index, row in asharelist.iterrows():
        symbol=row['代码']
        ipodayStr=asharelist.loc[asharelist['代码']==symbol,'IPO'].dt.strftime('%Y%m%d').values[0]
        msg=f'正在验证{symbol}...'
        print(f'{msg:<100}',end='\r')
        if not pd.isna(row['IPO']):
            singleres=getVerifiedAKShareSingle(symbol,ipodayStr,dfThresh,valThresh,fq,strictVerify=False)
            singleresStr=singleres["pass"]
            msg=f'{symbol}校验结果:{singleresStr}'
            print(f'{msg:<100}',end='\r')
            total+=1
            if singleres['pass']:
                if fq=='':
                    app_df=singleres['df']
                    no_fq+=1
                else:
                    msg=f'{symbol}获取从{ipodayStr}开始的数据...'
                    print(f'{msg:<100}',end='\r')
                    app_df=getFormattedAkshare(symbol,ipodayStr,fq=fq)
                    with_fq+=1
                re=appender.append(app_df.reset_index()[['Symbol','DateTime','Open','High','Low','Close','Volume']])
                successlist.append(symbol)
                msg=f'{symbol}完成{re}条记录存储。'
                print(f'{msg:<100}',end='\r')
            else:
                fail_symbol+=1
                msg=f'{symbol}数据有差异，总差异率{singleres["total_diff_rate"]}...'
                print(f'{msg:<100}',end='\r')
                faillist.append(symbol)
        else:
            if strictVerify:
                msg=f'{symbol} IPO day missing.'
                print(f'{msg:<100}',end='\r')
                faillist.append(symbol)
                fail_with_no_IPO+=1
            else:
                gtfa=getFormattedAkshare(symbol,fq=fq)
                if gtfa.empty:
                    msg='No data before IPO day.'
                    print(f'{msg:<100}',end='\r')
                    faillist.append(symbol)
                    fail_with_no_data+=1
                else:
                    app_df=gtfa
                    re=appender.append(app_df.reset_index()[['Symbol','DateTime','Open','High','Low','Close','Volume']])
                    msg=f'没有IPO日期信息，{symbol}获取从头开始的数据{re}条导入数据库。'
                    print(f'{msg:<100}',end='\r')
                    pass_with_no_IPO+=1
                    
        #记录已经处理的数据
        with open(logfile, 'a') as file:
            file.write(symbol+'\n')
                    
    msg='完成所有A股信息下载和认证。'
    print(f'{msg:<100}',end='\r')
    
    return {'total':total,'no_fq':no_fq,'with_fq':with_fq,'fail_symbol':fail_symbol,'fail_with_no_IPO':fail_with_no_IPO,'fail_with_no_data':fail_with_no_data,'pass_with_no_IPO':pass_with_no_IPO}

getRes=storeVerifiedAShareAll(dblink,current_date,0.005,0.001,'hfq')
getRes

### 2.5 将验证后的数据保存入dailystock数据库

In [None]:
#需要建库时，解除以下注释运行。
createNativeDB('dailystock','ashare')
storeStock(getRes['dfs'],'verifystock','gtja')

## 3. 附件：工具集和参考

#### 数据处理过程参考：
涨停板敢死队.ipynb

#### 单个表结构查询

In [None]:
#如果需要使用工具，去掉以下注释
#import sqlite3

#get the field list of a table
#def getStruct(filelink,table_name):
#    conn = sqlite3.connect(filelink)
#    cursor = conn.cursor()
#    cursor.execute(f"PRAGMA table_info({table_name});")
#    fields = cursor.fetchall()
#    conn.close()
#    return [field[1] for field in fields]

# getStruct(filelink,f'gtja_{symbol}')

#### AKShare查询个股信息和IPO等

In [None]:
#如果需要使用工具，去掉以下注释
# b_df=ak.stock_individual_info_em('301551')
# b_df

#### Dolphin DB 数据维护工具
删除指定表

In [None]:
#如果需要使用工具，去掉以下注释
#s = ddb.Session()
#s.connect("localhost", 8848, "admin", "123456")
#SQL='''
#dbName="dfs://verifystock"
#dropDatabase(dbName)
#'''
#s.run(SQL)
#s.close()

删除数据，保留结构

In [None]:
#如果需要使用工具，去掉以下注释
#s = ddb.Session()
#s.connect("localhost", 8848, "admin", "123456")
#SQL='''
#dbName="dfs://verifystock"
#truncate(dbName,`gtja)
#'''

#??为何报错在truncate中
#s.run(SQL)
#s.close()

DolphinDB查询是否存在数据

In [None]:
#如果需要使用工具，去掉以下注释
#s = ddb.Session()
#s.connect("localhost", 8848, "admin", "123456")
#SQL_QUERY=f'''
#                db=database("dfs://dailystock")
#                pt=loadTable(db,`ashare)
#                select * from pt;
#                '''
#exist_df=s.run(SQL_QUERY)
#s.close()
#exist_df

删除指定数据

In [None]:
#如果需要使用工具，去掉以下注释
#s = ddb.Session()
#s.connect("localhost", 8848, "admin", "123456")
#SQL_QUERY=f'''
#                db=database("dfs://verifystock")
#                pt=loadTable(db,`gtja)
#                delete from pt where DateTime==2024.12.03;
#                '''
##exist_df=s.run(SQL_QUERY)  #当删除部分记录，非删除所有时，谨慎打开此项，会导致内存被库中记录占满。
#s.run(SQL_QUERY)
#s.close()
#exist_df