In [1]:
import datetime
import math
import os
import struct
import csv
import pandas as pd
import requests

from numba import jit
GlobalSession = requests.session()

from dateutil.parser import parse

import quant_utils.data_moudle as dm
from quant_utils.constant import DB_CONN_LOCAL_MYSQL

def create_table(code):
    query_sql = f"""
        CREATE TABLE IF not exists `{code}` (
        `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
        `code` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
        `trade_time` datetime DEFAULT NULL,
        `date` date DEFAULT NULL,
        `tick` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
        `open` double(20,6) DEFAULT NULL,
        `high` double(20,6) DEFAULT NULL,
        `low` double(20,6) DEFAULT NULL,
        `close` double(20,6) DEFAULT NULL,
        `turnover` double(20,4) DEFAULT NULL,
        `total_amount` double(20,4) DEFAULT NULL,
        `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        PRIMARY KEY (`ID`),
        UNIQUE KEY `datetime` (`trade_time`) USING BTREE
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
    """
    DB_CONN_LOCAL_MYSQL.exec_non_query(query_sql)

In [29]:
file_path = 'E:/行情/指数1分钟/'
file_list = os.listdir(file_path)
for file in file_list:
    code = file.split(".")
    table_name = code[1] + code[0]
    code_name = f"{code[0]}.{code[1]}"
    # if code[0] == '1':
    #     table_name = f"SZ{code}"
    #     code_name = f"{code}.SZ"

    # elif code[0] == '5':
    #     table_name = f"SH{code}"
    #     code_name = f"{code}.SH"
    print(table_name)
    create_table(table_name)
    df = fast_read(file_path + file)
    print("读取完成")
    df.rename(
        columns={
            "vol": "turnover",
            "amount": "total_amount",
        },
        inplace=True
    )
    df["code"] = code_name
    df["date"] = df.trade_time.apply(lambda s: s[:10])
    df["tick"] = df.trade_time.apply(lambda s: s[11:].replace(":", ""))
    print("开始写入")
    DB_CONN_LOCAL_MYSQL.upsert(df, table=table_name)
    print("写入完成")
    print("=="*40)

SH000001
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000001-受影响行数为1115040行!
写入完成
==*40
SH000002
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000002-受影响行数为1115040行!
写入完成
==*40
SH000003
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000003-受影响行数为1115040行!
写入完成
==*40
SH000004
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000004-受影响行数为1115040行!
写入完成
==*40
SH000005
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000005-受影响行数为1115040行!
写入完成
==*40
SH000006
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000006-受影响行数为1115040行!
写入完成
==*40
SH000007
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000007-受影响行数为1115040行!
写入完成
==*40
SH000008
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000008-受影响行数为1115040行!
写入完成
==*40
SH000009
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.57.188:1min--SH000009-受影响行数为771360行!
写入完成
==*40
SH000010
192.20.57.188:1min受影响行数为0行!
读取完成
开始写入
写入192.20.

In [17]:
# 优化读取
# @jit(nopython=True)  # Set "nopython" mode for best performance, equivalent to @njit
def fast_read(path):
    na_vals = ["\\N", " ", "", "NULL"]
    df_tmp = []
    count = 0
    reader = pd.read_csv(
        path,  # 文件路径
        sep=",",  # 分割符号
        chunksize=10000000,  # 每次一个快读取的数据量
        encoding="utf-8",  # 编码方式
        low_memory=False,  # 避免内存不足
        quoting=csv.QUOTE_NONE,  # 引用约定。可选值包括csv.QUOTE _ ALL (引用用所有字段）,这里是不引用
        keep_default_na=True,
        na_values=na_vals,
        iterator=True,
    )
    # nrows=10000000, # 只读一千万行)
    # low_memory : boolean, default True
    # 分块加载到内存，再低内存消耗中解析，但是可能出现类型混淆。
    # 确保类型不被混淆需要设置为False，或者使用dtype 参数指定类型。
    # 注意使用chunksize 或者iterator 参数分块读入会将整个文件读入到一个Dataframe，而忽略类型（只能在C解析器中有效）
    for chunk in reader:
        df_tmp.append(chunk)
        del chunk
        # print("the chunk " + str(count) + " has been stored...")
        # print("the mem-cost is now: ", str(sys.getsizeof(df_tmp) / (1)), "MB \n")
        count += 1
    return pd.concat(df_tmp)

In [19]:
a = fast_read("E:/行情/指数1分钟/000001.SH.csv")