In [23]:
import pandas as pd
import sqlite3
import datetime as dt
import os
from pathlib import Path
import akshare as ak
import traceback

# Global parameters: directories relative to the notebook's working directory.
dbpath = Path('..') / 'db'    # SQLite database files
logpath = Path('..') / 'log'  # log directory (not referenced by the cells shown here)

In [24]:
def get_ak_data():
    """Fetch real-time A-share spot quotes through akshare.

    Returns a dict whose keys are database table names and whose values are
    the DataFrames returned by akshare:

    - ``ODS_STOCK_SPOT_PRICE_EM``   -- Eastmoney real-time data (``ak.stock_zh_a_spot_em``)
    - ``ODS_STOCK_SPOT_PRICE_SINA`` -- Sina real-time data (``ak.stock_zh_a_spot``)
    """
    # Dict literals evaluate left to right: Eastmoney is fetched first, then Sina.
    return {
        'ODS_STOCK_SPOT_PRICE_EM': ak.stock_zh_a_spot_em(),
        'ODS_STOCK_SPOT_PRICE_SINA': ak.stock_zh_a_spot(),
    }

In [None]:
def db_ingest(dict, db_path, if_tab_exists='append', event_dt=True, ingest_tm=True, index=False):
    """Write every DataFrame in ``dict`` to a SQLite database, one table per entry.

    Each table is named ``<key>-<yymmdd>`` (today's date), so each day's
    snapshot lands in its own table.

    Parameters
    ----------
    dict : mapping of str -> pandas.DataFrame
        Table-name prefix -> data. (The name shadows the builtin but is kept
        so keyword callers keep working.)
    db_path : str or path-like
        Database file passed to ``sqlite3.connect``.
    if_tab_exists : str
        Forwarded to ``DataFrame.to_sql(if_exists=...)``.
    event_dt : bool
        Add an ``event_dt`` column (``%Y-%m-%d``). Currently the ingest date;
        TODO: should become the previous trading day.
    ingest_tm : bool
        Add an ``ingest_tm`` column (``%Y-%m-%d %H:%M:%S``) with the write time.
    index : bool
        Forwarded to ``DataFrame.to_sql(index=...)``.

    Failures are reported (timestamped message + traceback) instead of raised,
    so a failed ingest does not abort the notebook. The caller's DataFrames are
    never modified, and the connection is closed on every path.
    """
    conn = None
    try:
        # One timestamp for the whole run, so the table suffix and the audit
        # columns agree across all tables even if the run crosses midnight.
        now = dt.datetime.now()
        tab_suffix = '-' + now.strftime("%y%m%d")
        audit_cols = {}
        if event_dt:
            audit_cols['event_dt'] = now.strftime("%Y-%m-%d")
        if ingest_tm:
            audit_cols['ingest_tm'] = now.strftime("%Y-%m-%d %H:%M:%S")

        conn = sqlite3.connect(db_path)
        for name, df in dict.items():
            # assign() returns a new frame, leaving the caller's frame untouched.
            out = df.assign(**audit_cols) if audit_cols else df
            out.to_sql(name=name + tab_suffix, con=conn, if_exists=if_tab_exists, index=index)
            print("表" + name + "写入成功")
        conn.commit()
        print("事务提交，共提交" + str(len(dict)) + "张表")
        conn.close()
        conn = None  # already closed; nothing left for the finally clause
        print("数据库关闭,入库成功")
    except Exception as e:
        print(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "数据入库异常:{0}".format(e))
        traceback.print_exc()
    finally:
        # Release the database file even when a write failed part-way.
        if conn is not None:
            conn.close()


In [25]:
# Pull today's spot snapshots from akshare and log when the fetch finished.
df_pool = get_ak_data()
print("数据已拉取", dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


                                                                                                                       

数据已拉取 2022-06-25 22:44:31




In [28]:
# One frame per source: Eastmoney first, then Sina.
df_em, df_sina = (df_pool[key] for key in ('ODS_STOCK_SPOT_PRICE_EM', 'ODS_STOCK_SPOT_PRICE_SINA'))

In [31]:
# Data validation belongs here (TODO):
#   - row count
#   - every stock in the reference list is present
#   - completeness (null ratio) and duplicate codes
df_em.head(n=5)

Unnamed: 0,序号,代码,名称,最新价,涨跌幅,涨跌额,成交量,成交额,振幅,最高,...,量比,换手率,市盈率-动态,市净率,总市值,流通市值,涨速,5分钟涨跌,60日涨跌幅,年初至今涨跌幅
0,1,688047,N龙芯,89.07,48.3,29.01,242084.0,2164045000.0,28.75,99.67,...,,77.56,245.16,9.27,35717070000.0,2780231000.0,0.21,-0.64,48.3,48.3
1,2,1316,N润贝,42.05,44.01,12.85,6909.0,28958170.0,24.01,42.05,...,,3.45,44.71,3.48,3364000000.0,841000000.0,0.0,0.0,44.01,44.01
2,3,300731,科创新源,27.76,20.02,4.63,206476.0,530436700.0,17.29,27.76,...,1.25,17.49,-77.76,6.01,3472451000.0,3277488000.0,0.0,0.0,30.33,-23.95
3,4,301149,隆华新材,15.47,20.02,2.58,114953.0,167697100.0,19.86,15.47,...,3.64,17.41,48.12,4.34,6652100000.0,1021450000.0,0.0,0.0,-4.86,-26.68
4,5,688302,海创药业-U,46.06,20.01,7.68,54087.0,240412300.0,19.41,46.06,...,3.62,23.9,-13.12,2.64,4560658000.0,1042506000.0,0.0,0.0,7.32,7.32


In [None]:
# End-to-end daily ingest: fetch spot data via akshare, then write it to ods.db.
# Statements are at top level; the previous stray indentation made this cell
# fail with "IndentationError: unexpected indent".

# Fetch from akshare
df_pool = get_ak_data()
print("数据已拉取" + " " + dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Data-quality checks (TODO): completeness, duplicate codes

# Write to the database; 'replace' overwrites an existing table with the same name.
db_ingest(df_pool, Path(dbpath, 'ods.db'), if_tab_exists='replace')

In [26]:
# Paths
dbpath = Path(r'./../db')
# Directory holding the *.sql DDL templates. Can be overridden through the
# FINANCE_SQL_DIR environment variable so the notebook is not tied to one
# machine's absolute path; the default is unchanged.
sqlpath = Path(os.environ.get("FINANCE_SQL_DIR", r"D:\jupyteLabData\finance\sql"))

# Run-date stamps, captured once when this cell executes (re-run to refresh).
now = dt.datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")  # full timestamp
tdy = now.strftime('%y%m%d')            # yymmdd; loadSql appends it to table names

# load sql
def loadSql(path, date_str=None):
    """Load every ``*.sql`` template in ``path`` and fill in the run date.

    Each file is read as UTF-8 and passed through ``str.format(date_str)``, so
    templates reference the date as ``{0}`` or ``{}``. Any literal braces in
    the SQL must be doubled (``{{`` / ``}}``).

    Parameters
    ----------
    path : str or path-like
        Directory to scan (non-recursive); sub-directories are skipped.
    date_str : str, optional
        Stamp substituted into the templates and appended to the table name.
        Defaults to the notebook-level ``tdy``.

    Returns
    -------
    (ddl, dml) : tuple of dict
        ``ddl`` maps ``<file stem>_<date_str>`` -> formatted SQL text, in
        file-name order. ``dml`` is currently always empty (reserved).
    """
    if date_str is None:
        date_str = tdy
    ddl = {}
    dml = {}
    # Sorted so templates are loaded (and later executed) in a deterministic
    # order; raw directory-listing order is arbitrary.
    for fpath in sorted(Path(path).iterdir()):
        if not fpath.is_file() or fpath.suffix != ".sql":
            continue
        sql = fpath.read_text(encoding='utf-8').format(date_str)
        print("已读取" + fpath.name)
        ddl[fpath.stem + '_' + date_str] = sql
    return ddl, dml

# # parameter 
# def para_gen():
#     tdy = dt.date.today().strftime('%y%m%d')


2022-06-02 17:30:14
220602


In [29]:
# Collect today's real-time snapshots (the same two akshare calls as get_ak_data).
rt_em = ak.stock_zh_a_spot_em()   # Eastmoney
rt_sina = ak.stock_zh_a_spot()    # Sina
# Stamp the log with the actual fetch time. The config cell's `ts` is frozen
# when that cell runs, so it reported a stale time here.
print("数据已拉取" + " " + dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
rt_em.head()

                                                                                                                       

数据已拉取 2022-06-24 23:18:15




Unnamed: 0,序号,代码,名称,最新价,涨跌幅,涨跌额,成交量,成交额,振幅,最高,...,量比,换手率,市盈率-动态,市净率,总市值,流通市值,涨速,5分钟涨跌,60日涨跌幅,年初至今涨跌幅
0,1,688047,N龙芯,89.07,48.3,29.01,242084.0,2164045000.0,28.75,99.67,...,,77.56,245.16,9.27,35717070000.0,2780231000.0,0.21,-0.64,48.3,48.3
1,2,1316,N润贝,42.05,44.01,12.85,6909.0,28958170.0,24.01,42.05,...,,3.45,44.71,3.48,3364000000.0,841000000.0,0.0,0.0,44.01,44.01
2,3,300731,科创新源,27.76,20.02,4.63,206476.0,530436700.0,17.29,27.76,...,1.25,17.49,-77.76,6.01,3472451000.0,3277488000.0,0.0,0.0,30.33,-23.95
3,4,301149,隆华新材,15.47,20.02,2.58,114953.0,167697100.0,19.86,15.47,...,3.64,17.41,48.12,4.34,6652100000.0,1021450000.0,0.0,0.0,-4.86,-26.68
4,5,688302,海创药业-U,46.06,20.01,7.68,54087.0,240412300.0,19.41,46.06,...,3.62,23.9,-13.12,2.64,4560658000.0,1042506000.0,0.0,0.0,7.32,7.32


In [30]:
# Non-null stock-code count per source: Eastmoney first, then Sina.
for spot_df in (rt_em, rt_sina):
    print(spot_df["代码"].count())

5014
4826


In [8]:

# Raw Eastmoney column names (DataFrame.keys() returns the column index).
rt_em.keys()

Index(['序号', '代码', '名称', '最新价', '涨跌幅', '涨跌额', '成交量', '成交额', '振幅', '最高', '最低',
       '今开', '昨收', '量比', '换手率', '市盈率-动态', '市净率', '总市值', '流通市值', '涨速', '5分钟涨跌',
       '60日涨跌幅', '年初至今涨跌幅'],
      dtype='object')

In [16]:
# Data cleaning: map Eastmoney's Chinese column names to English identifiers.

# Legacy mapping (not used by the current pipeline). 成交量 is traded volume and
# 成交额 is traded amount (turnover), consistent with em_map1's vol / amt.
em_map = {"代码": "code", "名称": "name", "最新价": "latest_price", "涨跌幅": "change_percent", "涨跌额": "change",
          "成交量": "volume", "成交额": "amount", "最高": "high", "最低": "low", "今开": "open",
          "昨收": "yesterday_close"}

# Current mapping. Columns not listed (序号, 振幅, 涨速, 5分钟涨跌) keep their
# original names and are dropped later, before loading.
em_map1 = {"代码": "code", "名称": "name", "最新价": "price_ltst", "涨跌幅": "chg_pct", "涨跌额": "chg", "成交量": "vol",
           "成交额": "amt", "最高": "price_high", "最低": "price_low", "今开": "price_open", "昨收": "price_close_pre",
           "量比": "vol_ratio", "换手率": "turnover_rt", "市盈率-动态": "pe", "市净率": "pb", "总市值": "total_mv",
           "流通市值": "circ_mv", "60日涨跌幅": "chg_pct_60d", "年初至今涨跌幅": "chg_pct_y"}

rt_em_pre = rt_em.rename(columns=em_map1)
print(rt_em_pre.shape)
rt_em_pre.columns





(5014, 23)


Index(['序号', 'code', 'name', 'price_ltst', 'chg_pct', 'chg', 'vol', 'amt',
       '振幅', 'price_high', 'price_low', 'price_open', 'price_close_pre',
       'vol_ratio', 'turnover_rt', 'pe', 'pb', 'total_mv', 'circ_mv', '涨速',
       '5分钟涨跌', 'chg_pct_60d', 'chg_pct_y'],
      dtype='object')

In [42]:
# Shape the renamed Eastmoney frame for loading: drop the columns without an
# English mapping and add load-date, primary-key and close-price columns.

# Load date. `datetdy` was not defined by any cell, so this cell raised NameError
# on a fresh kernel. NOTE(review): format assumed to be %Y-%m-%d (matching
# db_ingest's event_dt) -- confirm against the target table's DDL.
datetdy = dt.datetime.now().strftime("%Y-%m-%d")

rt_em_pre1 = rt_em_pre.drop(['序号', '振幅', '涨速', '5分钟涨跌'], axis=1)
rt_em_pre1['event_date'] = datetdy
# Primary key "<date>_<code>"; astype(str) guards against codes parsed as numbers.
rt_em_pre1['pk'] = datetdy + '_' + rt_em_pre1['code'].astype(str)
# The latest real-time price stands in for the close price.
# NOTE(review): presumably only meaningful when run after the market close.
rt_em_pre1['price_close'] = rt_em_pre1['price_ltst']
print(rt_em_pre1.shape)

# Sina frame, renamed with the Eastmoney mapping where the column names overlap.
rt_sina_pre = rt_sina.rename(columns=em_map1)

rt_em_pre1.head()

(5014, 22)


In [39]:
# Read the DDL templates, create today's tables and load the cleaned Eastmoney data.
ddl, dml = loadSql(sqlpath)

conn = sqlite3.connect(Path(dbpath, 'finance.db'))
print("数据库打开成功")
try:
    # Run each DDL script, then append the data into the table it defines.
    for k, v in ddl.items():
        conn.executescript(v)
        print("执行ddl " + k)
        # Load the cleaned frame (event_date / pk / price_close added, unmapped
        # Chinese columns dropped), not the raw rename `rt_em_pre`.
        # NOTE(review): every DDL table receives the same frame -- fine while
        # there is a single template; revisit when more are added.
        rt_em_pre1.to_sql(name=k, con=conn, if_exists='append', index=False)
        print("插入数据 " + k)
    print("ddl全部执行成功")

    # Commit today's data
    conn.commit()
    print("数据写入成功")

    # TODO: data validation
finally:
    # Always release the database file, even if a DDL script or insert fails.
    conn.close()
print("数据库关闭")

已读取ODS_SPOT_PRICE_EM.sql
数据库打开成功
执行ddl ODS_SPOT_PRICE_EM_220602
插入数据ODS_SPOT_PRICE_EM_220602
ddl全部执行成功
数据写入成功
数据库关闭


In [None]:
# Re-run the load using the already-loaded `ddl` (no template re-read).
conn = sqlite3.connect(Path(dbpath, 'finance.db'))
print("数据库打开成功")
try:
    # Run each DDL script, then append the data into the table it defines.
    for k, v in ddl.items():
        conn.executescript(v)
        print("执行ddl " + k)
        # Load the cleaned frame (event_date / pk / price_close added, unmapped
        # Chinese columns dropped), not the raw rename `rt_em_pre`.
        rt_em_pre1.to_sql(name=k, con=conn, if_exists='append', index=False)
        print("插入数据 " + k)
    print("ddl全部执行成功")

    # Commit today's data
    conn.commit()
    print("数据写入成功")

    # TODO: data validation
finally:
    # Always release the database file, even if a DDL script or insert fails.
    conn.close()
print("数据库关闭")



In [14]:
# Database directory, relative to the notebook's working directory.
Path('..') / 'db'

WindowsPath('../db')

In [15]:
# Files currently present in the database directory.
os.listdir(Path('..') / 'db')

['finance.db', 'id_cache.db', 'id_cache.db-shm', 'id_cache.db-wal']

In [43]:
# Append the renamed Sina snapshot to a per-day table in finance.db.
conn = sqlite3.connect(Path(dbpath, 'finance.db'))
print("数据库打开成功")
try:
    # 'sina' + MMDD of the run date, instead of a hard-coded literal ('sina0624'),
    # so re-runs on later days do not append into an old day's table.
    k = 'sina' + dt.datetime.now().strftime('%m%d')
    rt_sina_pre.to_sql(name=k, con=conn, if_exists='append', index=False)
    print("插入数据 " + k)

    # Commit today's data
    conn.commit()
    print("数据写入成功")

    # TODO: data validation
finally:
    # Always release the database file, even if the insert fails.
    conn.close()
print("数据库关闭")

数据库打开成功
插入数据 sina0624
数据写入成功
数据库关闭
