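# Use this file when the large data download in Part 1 was interrupted (for example by an unstable network
# connection or a temporary IP ban), the run could not be resumed, and the program had to be restarted.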

In [None]:
# Run this notebook if a network error was raised while crawling the data.
# If no temporary data has been saved yet, there is no need to run this notebook.

import pandas as pd
import akshare as ak
import re
import datetime
import time

In [None]:
# Initial settings
startDate, endDate = "19950101", "20211231"  # sampling window, both as YYYYMMDD strings

# Validate the window: the start must not come after the end, and the end must not lie after today
if int(endDate) < int(startDate) or int(datetime.date.today().strftime("%Y%m%d")) < int(endDate):
    print("Invalid Time Interval")
    quit()

In [None]:
# Convert a date label taken from a data index into datetime.date
def ParseDate(date) -> datetime.date:
    """Parse a date-like value into a ``datetime.date``.

    The value is converted with ``str()`` and its runs of digits are read as
    year, month and day, in that order; any further digit runs (for example a
    time-of-day part) are ignored. A single 8-digit run such as ``"19950101"``
    is read as compact ``YYYYMMDD``.

    :param date: any object whose string form contains the date.
    :return: the parsed calendar date.
    :raises ValueError: if fewer than three date fields can be found, or if
        the fields do not form a valid calendar date.
    """
    digit_runs = re.findall(pattern="[0-9]+", string=str(date))

    # Compact form without separators (the format startDate/endDate use in this file):
    # split the single 8-digit run into its year / month / day parts.
    if len(digit_runs) == 1 and len(digit_runs[0]) == 8:
        compact = digit_runs[0]
        digit_runs = [compact[:4], compact[4:6], compact[6:]]

    if len(digit_runs) < 3:
        raise ValueError(f"Cannot parse a date from {date!r}")

    year, month, day = map(int, digit_runs[:3])
    return datetime.date(year, month, day)

In [None]:
# Load the temporary data written by an earlier, interrupted run
close_df = pd.read_csv(
    "C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\close_temp.csv",
    index_col="trade_date", encoding="gbk")
return_df = pd.read_csv(
    "C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\return_temp.csv",
    index_col="trade_date", encoding="gbk")
BM_df = pd.read_csv(
    "C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\BM_temp.csv",
    index_col="trade_date", encoding="gbk")
MV_df = pd.read_csv(
    "C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\MV_temp.csv",
    index_col="trade_date", encoding="gbk")

# Normalise every index: each label becomes a datetime.date and the index keeps the name "trade_date"
close_df.index = pd.Series([ParseDate(label) for label in close_df.index], name="trade_date")
return_df.index = pd.Series([ParseDate(label) for label in return_df.index], name="trade_date")
BM_df.index = pd.Series([ParseDate(label) for label in BM_df.index], name="trade_date")
MV_df.index = pd.Series([ParseDate(label) for label in MV_df.index], name="trade_date")

In [None]:
# Load the trading calendar from local disk
calender = pd.read_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_data\\calender.csv")
# Each "trade_date" entry is sliced as YYYY?MM?DD (characters 0-3, 5-6 and 8 onward);
# the first column of the file becomes the index of the resulting Series.
calender = pd.Series(
    [datetime.date(int(text[0:4]), int(text[5:7]), int(text[8:]))
     for text in calender["trade_date"].values],
    index=list(calender.iloc[:, 0]),
    name="trade_date",
)

In [None]:
# Load exceptionLst (the (position, code) pairs whose data failed to merge) from local disk.
# The checkpoint code only writes this file when the list is non-empty, so a missing or
# empty file is a normal situation and simply means there are no recorded exceptions yet.
try:
    exceptionLst = pd.read_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\exceptionLst.csv")
except (FileNotFoundError, pd.errors.EmptyDataError):
    exceptionLst = []
else:
    # Each row of the first column is the text form of a tuple such as "(1201, '000001')":
    # the digits before the comma are the position, the quoted digits are the stock code.
    exceptionLst = [
        (int(re.findall(r"([0-9]+),", row)[0]), re.findall(r"'([0-9]+)'", row)[0])
        for row in exceptionLst.iloc[:, 0]
    ]

In [None]:
# Get the stock codes of either the HS300 constituents or the A-share listings
def GetCodeLst(fromWhat: str) -> list:
    """Return the list of stock codes for the requested universe.

    :param fromWhat: ``"HS300"`` reads the local ``hs300_stocks.csv`` file and
        keeps the first run of digits of every entry in its ``code`` column;
        ``"A"`` queries akshare and concatenates the codes of the two Shanghai
        lists ("主板A股", "科创板") and the Shenzhen "A股列表" list.
    :return: list of stock codes.
    :raises ValueError: if ``fromWhat`` is neither ``"HS300"`` nor ``"A"``
        (previously this silently returned ``None``).
    """
    if fromWhat == "HS300":
        hs300_Stocks = pd.read_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_data\\hs300_stocks.csv",
                                   encoding="gbk").set_index("code")
        # Keep only the numeric part of every code
        return [re.search(pattern="[0-9]+", string=raw_code).group()
                for raw_code in hs300_Stocks.index]

    if fromWhat == "A":
        return (list(ak.stock_info_sh_name_code(indicator="主板A股")["代码"])
                + list(ak.stock_info_sh_name_code(indicator="科创板")["代码"])
                + list(ak.stock_info_sz_name_code(indicator="A股列表")["A股代码"]))

    # Fail loudly instead of handing None to callers that expect a list
    raise ValueError(f"fromWhat must be 'HS300' or 'A', got {fromWhat!r}")

# Choose the stock universe below: "HS300" (HS300 index constituents) or "A" (A-shares).

In [None]:
# Build the list of stock codes to download; "A" selects the A-share universe
codeLst = GetCodeLst("A")

In [None]:
# Convert a date label into datetime.date
def ParseDate(date: str) -> datetime.date:
    """Parse a date-like value into a ``datetime.date``.

    The value is converted with ``str()`` (so non-string labels are accepted
    too) and its runs of digits are read as year, month and day, in that
    order; any further digit runs are ignored. A single 8-digit run such as
    ``"19950101"`` is read as compact ``YYYYMMDD``.

    :param date: the date label to parse.
    :return: the parsed calendar date.
    :raises ValueError: if fewer than three date fields can be found, or if
        the fields do not form a valid calendar date.
    """
    fields = re.findall(pattern="[0-9]+", string=str(date))

    # Compact form without separators (the format startDate/endDate use in this file)
    if len(fields) == 1 and len(fields[0]) == 8:
        packed = fields[0]
        fields = [packed[:4], packed[4:6], packed[6:]]

    if len(fields) < 3:
        raise ValueError(f"Cannot parse a date from {date!r}")

    year, month, day = (int(part) for part in fields[:3])
    return datetime.date(year, month, day)

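# Restart from the breakpoint (the last checkpoint that was saved).
# Note: what_now below must be entered manually, based on the log printed by the previous run.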

In [None]:
# what_now must be set to the position reported at the last checkpoint, plus 1
what_now, failure, maximum_failure_allowed, length = 1201, 0, 3, len(codeLst)
print(f"Data of {length} stocks in total need to be collected, waiting......")
while what_now < length:
    code = codeLst[what_now]

    try:
        # Daily close and percentage change of this stock between startDate and endDate
        this_stock_hist_daily = ak.stock_zh_a_hist(symbol=code, period="daily",
                                                   start_date=startDate, end_date=endDate,
                                                   adjust="hfq")[["日期", "收盘", "涨跌幅"]].set_index("日期")

        # Daily price-to-book ratio ("pb") and total market value ("total_mv") of this stock
        this_stock_BMMV_daily = \
            ak.stock_a_lg_indicator(symbol=code)[["trade_date", "pb", "total_mv"]].set_index("trade_date")

        # Convert both indexes to datetime.date
        this_stock_hist_daily.index = [ParseDate(label) for label in this_stock_hist_daily.index]
        this_stock_BMMV_daily.index = [ParseDate(label) for label in this_stock_BMMV_daily.index]

        # The download succeeded, so reset the consecutive-failure counter
        failure = 0

        # Try to merge the new columns into the accumulated frames
        try:
            close_df[code] = this_stock_hist_daily["收盘"]
            return_df[code] = this_stock_hist_daily["涨跌幅"]
            BM_df[code] = 1 / this_stock_BMMV_daily["pb"]  # book-to-market is the reciprocal of price-to-book
            MV_df[code] = this_stock_BMMV_daily["total_mv"]

            # Report that the data was collected and merged
            print(f"{what_now}/{length}. Data collected and merged for code: {code}")

        except Exception as merge_error:
            # Report the failed merge and remember it; Exception (not a bare except) is caught
            # so that Ctrl-C / SystemExit still stop the run.
            print(f"{what_now}/{length}. Met an unknown error when merging data of code: {code}")
            print(f"Reason: {merge_error!r}")
            exceptionLst.append((what_now, code))

        # Every 100 positions, write the accumulated data to the temporary csv files
        if what_now % 100 == 0:
            close_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\close_temp.csv",
                            index=True, header=True)
            return_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\return_temp.csv",
                             index=True, header=True)
            BM_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\BM_temp.csv", index=True,
                         header=True)
            MV_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\MV_temp.csv", index=True,
                         header=True)

            if exceptionLst:
                # Write to exceptionLst.csv, the file the loader reads on restart
                # (this used to be written to calender.csv, so it was never reloaded).
                pd.Series(exceptionLst).to_csv(
                    "C:\\Users\\asus\\Desktop\\Carhart\\HS300_temp_data\\exceptionLst.csv",
                    index=False, header=True)

            # Report the checkpoint that was just saved
            print(f"Temporary file is saved at: {code}. Position is: {what_now}")

        # Pause for 45 seconds every 30 positions
        if what_now % 30 == 0:
            print("Resuming in 45 seconds......")
            time.sleep(45)

        what_now += 1

    except Exception as fetch_error:
        # Report the failed download, the current position (the breakpoint) and the number of
        # consecutive failures. Exception (not a bare except) is caught so that
        # KeyboardInterrupt / SystemExit are not mistaken for a download failure and retried.
        failure += 1
        print(f"{what_now}/{length}. Problem encountered at code: {code}. Failure = {failure}")
        print(f"Reason: {fetch_error!r}")

        if failure > maximum_failure_allowed:
            break  # too many consecutive failures: give up
        else:
            print(f"Retrying in {60 * failure} seconds......")
            time.sleep(60 * failure)  # back off for 1-3 minutes
            continue


# If the while loop was never broken out of, write the final csv files
else:
    close_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_data\\close.csv", index=True, header=True)
    return_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_data\\return.csv", index=True, header=True)
    BM_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_data\\BM.csv", index=True, header=True)
    MV_df.to_csv("C:\\Users\\asus\\Desktop\\Carhart\\HS300_data\\MV.csv", index=True, header=True)

    # Report whether any merge failures were recorded
    if not exceptionLst:
        print("All data are collected and merged successfully")
    else:
        print("exceptionLst is not empty: failed to merge some data")