In [1]:
import os
from pathlib import Path

NOTEBOOK_PATH: Path = Path(os.path.abspath(''))

# print(f'当前运行在：{NOTEBOOK_PATH} 目录下。')

PATH_FOR_RESULT: Path = NOTEBOOK_PATH.joinpath('RESULT')

if not PATH_FOR_RESULT.exists():
    PATH_FOR_RESULT.mkdir()

# 获取上海期货交易所（上期所）的历史数据

## 观察数据来源规律

在[上期所官网](http://www.shfe.com.cn/)点击顶部菜单（如下图中红框所示）：

![上期所历史数据获取1](attachment/shfe_data_1.png)

然后在出现的页面中，更换下拉框中的年份并点击按钮（如下图中红框所示）。

![上期所历史数据获取2](attachment/shfe_data_2.png)

多试几次，可以发现历史数据的url是：

`
  http://www.shfe.com.cn/historyData/MarketData_Year_XXXX.zip
`

其中 XXXX 是四位数的年份数字。


## 下载上期所历史数据

于是我们可以用以下代码来获得上期所历史数据：

In [2]:
from pathlib import Path
import datetime as dt

import requests


def download_shfe_history_data(save_path: Path, year: int) -> None:
    """
    下载上海期货交易所（上期所，SHFE）的历史数据。
    :param save_path: 保存的位置。
    :param year: 需要下载数据的年份。
    :return: None.
    """

    # 上期所历史数据 url.
    url: str = 'http://www.shfe.com.cn/historyData/MarketData_Year_{year:4d}.zip'

    # 上期所历史数据从 2009 年开始提供。
    start_year: int = 2009
    
    # 如果参数 <year> 小于 2009 或者大于当前年份，引发异常。
    if year < start_year:
        raise ValueError(f'上期所历史数据自{start_year:4d}年起提供。')
    if year > dt.date.today().year:
        raise ValueError(f'{year}年是未来日期。')

    # 如果参数 <save_path> 不存在，引发异常。
    if not save_path.exists():
        raise FileNotFoundError(f'目录 {save_path} 不存在。')
    
    # 下载。
    response = requests.get(url.format(year=year))
    
    # 如果下载不顺利，引发异常。
    if response.status_code != 200:
        raise requests.exceptions.HTTPError(f'下载 <{url.format(year=year)}> 时发生错误。')
    
    # 保存文件。
    with open(save_path.joinpath(f'SHFE_{year:4d}.zip'), 'wb') as f:
        f.write(response.content)

# 测试。
download_shfe_history_data(save_path=PATH_FOR_RESULT, year=2020)

你应该可以在这个 Jupyter Notebook 所在的文件夹下的 RESULT/ 文件夹中看到名为 <SHFE_2020.zip> 文件。

我们可以自动化下载从2009年（上期所开始提供数据的年份）到现在的所有历史数据：

In [3]:
def download_all_shfe_history_data(save_path: Path) -> None:
    """
    下载上海期货交易所（上期所，SHFE）的全部历史数据。
    :param save_path: 保存的位置。
    :return: None.
    """
    start_year: int = 2009
    this_year: int = dt.date.today().year
    for year in range(start_year, this_year + 1):
        download_shfe_history_data(save_path=save_path, year=year)

# 测试。
download_all_shfe_history_data(save_path=PATH_FOR_RESULT)

## 解压缩文件

把中金所章节的解压缩程序复制过来：

In [4]:
from typing import List
import zipfile

def unzip_file(zipped_file: Path, unzip_path: Path, keep: bool = False) -> List[Path]:
    """
    解压单个 zip 文件。
    Unzip a zip file to ten temporary directory defined in <CONFIGS>, and return the unzipped file path.
    :param zipped_file:   Path，待解压文件的路径。
    :param unzip_path: Path，保存被解压出来的文件的路径。
    :param keep:       bool，保留待解压文件的话，取值 True，否则解压完成后删除待解压文件。
    :return: list，被解压出来的文件的路径（Path）的列表。
    """
    
    # 如果参数 <zip_file> 不存在，引发异常。
    if not zipped_file.exists():
        raise FileNotFoundError(f'<{zipped_file}> 不存在。')

    # 如果参数 <unzip_path> 不存在，引发异常。
    if not unzip_path.exists():
        raise FileNotFoundError(f'<{unzip_path}> 不存在。')

    # 用 zipfile 模块的 ZipFile 打开待解压文件。
    zip_file = zipfile.ZipFile(zipped_file, 'r')
    
    # 生成解压文件列表
    result: List[Path] = [unzip_path.joinpath(filename) for filename in zip_file.namelist()]
    
    # 解压文件。
    zip_file.extractall(unzip_path)
    
    # 关闭文件。
    zip_file.close()
    
    # 删除待解压文件
    if not keep:
        zipped_file.unlink()
    
    # 返回解压出来的文件列表
    return result

# 测试

# 测试用待解压文件
# 如果你的 RESULT\ 文件夹没有 SHFE_2020.zip 文件，去上期所下载一个。
# 其实只要是个 zip 文件就行，可能需要修改你的 zip 文件的名字，或者下面代码中的文件名。
test_zipped_file: Path = PATH_FOR_RESULT.joinpath('SHFE_2020.zip')

# 测试用解压路径
test_unzip_path: Path = PATH_FOR_RESULT.joinpath('unzipped')

# 如果解压路径不存在，创建它。
if not test_unzip_path.exists():
    test_unzip_path.mkdir()

# 测试
unzip_file(
    zipped_file=test_zipped_file,
    unzip_path=test_unzip_path,
    keep=True
)

[WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.1.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.2.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.3.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.4.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.5.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.6.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.7.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/unzipped/╦∙─┌║╧╘╝╨╨╟Θ▒¿▒φ2020.8.xls'),
 Windows

震惊！这一摊是啥！！！

好吧，但凡遇到乱码问题，基本都是上古时期遗留下来的。根据维基百科，zip 格式发布于 1989 年 1 月。现行的 Unicode （通用字符集）在 1991 年 10 月发布第一版，支持中日韩字符的第二版于 1992 年 6 月发布。

因此 zip 文件的编码应该是 MS-DOS 编码，或者称为 IBM437。在 Python 中命名为 cp437（Code Page 437，代码页 437）。其实是有好几个英文代码页的，逐一尝试最终确认cp437可用。

我们需要做一个转换。

In [5]:
def unzip_file(zipped_file: Path, unzip_path: Path, keep: bool = False) -> List[Path]:
    """
    解压单个 zip 文件，可以正确转换字符编码。
    :param zipped_file:   Path，待解压文件的路径。
    :param unzip_path: Path，保存被解压出来的文件的路径。
    :param keep:       bool，保留待解压文件的话，取值 True，否则解压完成后删除待解压文件。
    :return: list，被解压出来的文件的路径（Path）的列表。
    """
    
    # 如果参数 <zip_file> 不存在，引发异常。
    if not zipped_file.exists():
        raise FileNotFoundError(f'<{zipped_file}> 不存在。')

    # 如果参数 <unzip_path> 不存在，引发异常。
    if not unzip_path.exists():
        raise FileNotFoundError(f'<{unzip_path}> 不存在。')

    # 用 zipfile 模块的 ZipFile 打开待解压文件。
    zip_file = zipfile.ZipFile(zipped_file, 'r')
    
    # 生成解压文件列表，这里的 filename 编码是不正确的。
    origin: List[Path] = [unzip_path.joinpath(filename) for filename in zip_file.namelist()]
    
    # 编码正确的文件列表
    result: List[Path] = [unzip_path.joinpath(filename.encode('cp437').decode('gbk')) for filename in zip_file.namelist()]
    
    # 解压文件。
    zip_file.extractall(unzip_path)
    
    # 重命名解压出来的文件。
    for i in range(len(origin)):
        origin[i].rename(result[i])
    
    # 删除待解压文件
    if not keep:
        zipped_file.unlink()
    
    # 返回解压出来的文件列表
    return result

# 测试

# 换一个测试用的待解压文件。
test_zipped_file: Path = PATH_FOR_RESULT.joinpath('SHFE_2019.zip')

# 测试用解压路径
test_unzip_path: Path = PATH_FOR_RESULT.joinpath('temp')
# 如果解压路径不存在，创建它。
if not test_unzip_path.exists():
    test_unzip_path.mkdir()

# 测试
unzip_file(
    zipped_file=test_zipped_file,
    unzip_path=test_unzip_path,
    keep=True
)

[WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.3.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.4.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.5.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.1.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.2.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.6.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.7.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.8.xls'),
 WindowsPath('D:/Development/Jupyter/InvestmentNotebook/notebook/collect_data/RESULT/temp/所内合约行情报表2019.9

## 写入数据库