In [1]:
import import_ipynb
import oss2
from setting import SETTINGS
import pickle
import pandas as pd
import os
import io
from dateutil import parser
import datetime
import pandas as pd
from fastparquet import write
from pathlib import  Path
from oss_handler import OssClient
import sys, os
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
from pylab import mpl
import seaborn as sns
from sklearn.linear_model import LinearRegression
import datetime
from datetime import datetime, timedelta
from oss2.exceptions import NoSuchKey
import warnings

# Silence all warnings for the rest of the notebook session.
warnings.filterwarnings('ignore')

# OSS connection settings read from the project SETTINGS mapping (setting.ipynb).
# NOTE(review): never print these values anywhere (including inside imported
# notebooks) — anything printed is persisted in the saved notebook output.
AccessKeyId = SETTINGS["oss.accesskey"]
AccessKeySecret = SETTINGS["oss.secret"]
BucketName = SETTINGS["oss.bucketname"]
Endpoint = SETTINGS["oss.endpoint"]


class newBytes(io.BytesIO):
    """In-memory bytes buffer whose ``close()`` does nothing.

    Used as the target handed to a writer through ``open_with`` so the written
    bytes are still available via ``getvalue()`` after the writer is done
    (presumably the writer closes the handle it was given).
    """

    def close(self):
        # Deliberately a no-op: the buffer must stay readable.
        return None


class OssClient(object):
    """Singleton wrapper around an ``oss2.Bucket`` for moving pickle/parquet data.

    Every ``OssClient()`` call returns the same instance; the ``oss2`` auth and
    bucket objects are built only on the first ``__init__``.
    """

    __instance = None
    __first_init = False

    # Singleton: always hand back the same instance.
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        cls = self.__class__
        # __init__ runs on every OssClient() call; only connect once.
        if not cls.__first_init:
            self.auth = oss2.Auth(AccessKeyId, AccessKeySecret)
            self.bucket = oss2.Bucket(self.auth, Endpoint, BucketName)
            cls.__first_init = True

    @staticmethod
    def _date_label(args, report_date):
        """Return the file-name stem used by the upload/save helpers.

        Precedence: the first extra positional argument wins; otherwise
        ``report_date`` is formatted as ``YYYY-MM-DD`` (a ``date``/``datetime``
        directly, a string after ``dateutil.parser.parse``).

        Raises:
            ValueError: if no positional label and no usable ``report_date``.
        """
        # Imported locally: the notebook's ``from datetime import datetime``
        # rebinds ``datetime`` to the class, so ``datetime.date`` at module
        # scope is a method, not a type, and isinstance() would raise TypeError.
        from datetime import date as _date

        if args:
            return args[0]
        if isinstance(report_date, _date):
            return report_date.strftime("%Y-%m-%d")
        if isinstance(report_date, str):
            return parser.parse(report_date).strftime("%Y-%m-%d")
        raise ValueError(
            "a date label is required: pass it positionally or via report_date"
        )

    def upload_file_from_fileobj(self, object_name, local_file_path):
        """Upload the local file at ``local_file_path`` to ``object_name``.

        ``put_object`` is a RESTful call underneath; an existing object with
        the same name is overwritten.

        Returns:
            True if OSS answered HTTP 200, else False.
        """
        with open(local_file_path, 'rb') as fileobj:
            result = self.bucket.put_object(object_name, fileobj)
        return result.status == 200

    def upload_pickle_data(self, df, target_path, *args, report_date=None):
        """Pickle ``df`` and upload it as ``<target_path>/<label>.pkl``.

        The label is the first extra positional argument, else ``report_date``
        formatted as ``YYYY-MM-DD``.

        Returns:
            True if OSS answered HTTP 200, else False.
        """
        d = self._date_label(args, report_date)
        pickle_buffer = io.BytesIO()
        pickle.dump(df, pickle_buffer)
        # os.path.join uses "\\" on Windows; OSS keys always use "/".
        target_file_key = os.path.join(target_path, '{}.pkl'.format(d)).replace("\\", "/")
        result = self.bucket.put_object(target_file_key, pickle_buffer.getvalue())
        return result.status == 200

    def list_files(self, prefix=None):
        """Print and return every object key in the bucket under ``prefix``."""
        res = []
        for object_info in oss2.ObjectIterator(self.bucket, prefix):
            print(object_info.key)
            res.append(object_info.key)
        return res

    def upload_parquet_data(self, df, target_path, *args, report_date=None):
        """Serialize ``df`` to parquet in memory and upload it as
        ``<target_path>/<label>.parquet``.

        Returns:
            True if OSS answered HTTP 200, else False.
        """
        d = self._date_label(args, report_date)
        target_file_key = os.path.join(target_path, '{}.parquet'.format(d)).replace("\\", "/")
        # fastparquet writes through ``open_with``, so 'noname' is only a
        # placeholder path; the bytes land in mem_buffer, whose close() is a
        # no-op so getvalue() still works afterwards.
        mem_buffer = newBytes()
        df.to_parquet('noname', engine='fastparquet', open_with=lambda x, y: mem_buffer)
        result = self.bucket.put_object(target_file_key, mem_buffer.getvalue())
        return result.status == 200

    def save_data_to_pickle(self, df, file_dir_path, *args, report_date=None):
        """Pickle ``df`` to the local file ``<file_dir_path>/<label>.pkl``."""
        d = self._date_label(args, report_date)
        target_file_key = os.path.join(file_dir_path, '{}.pkl'.format(d))
        with open(target_file_key, 'wb') as f:
            pickle.dump(df, f)

    def save_data_to_parquet(self, df, file_dir_path, *args, report_date=None):
        """Write ``df`` to the local file ``<file_dir_path>/<label>.parquet``."""
        d = self._date_label(args, report_date)
        target_file_key = os.path.join(file_dir_path, f'{d}.parquet')
        df.to_parquet(target_file_key)

    def read_oss_pickle_file(self, object_name):
        """Download ``object_name`` and unpickle it.

        ``get_object`` returns a stream whose CRC is only known after
        ``read()``, so the client/server CRCs are compared here.

        Returns:
            The unpickled object, or None (after printing a warning) when the
            CRC checksums differ.

        Security: ``pickle.loads`` can execute arbitrary code — only read
        objects from a trusted bucket.
        """
        object_stream = self.bucket.get_object(object_name)
        result = object_stream.read()
        if object_stream.client_crc != object_stream.server_crc:
            print("The CRC checksum between client and server is inconsistent!")
            # Previously fell through to pickle.loads(None) -> TypeError.
            return None
        return pickle.loads(result)

    def read_oss_parquet_file(self, object_name):
        """Download ``object_name`` and read it as a parquet DataFrame.

        Returns:
            The DataFrame, or None (after printing a warning) when the client
            and server CRC checksums differ.
        """
        object_stream = self.bucket.get_object(object_name)
        result = object_stream.read()
        if object_stream.client_crc != object_stream.server_crc:
            print("The CRC checksum between client and server is inconsistent!")
            # Previously the possibly corrupt bytes were parsed anyway.
            return None
        return pd.read_parquet(io.BytesIO(result))

    def download_file_to_loaclfilepath(self, object_name, local_file_path):
        """Download ``object_name`` straight to ``local_file_path``.

        Unlike ``get_object`` (which returns a stream for in-process use),
        ``get_object_to_file`` stores the object on disk and returns the HTTP
        result.

        Returns:
            True if OSS answered HTTP 200, else False.
        """
        result = self.bucket.get_object_to_file(object_name, local_file_path)
        return result.status == 200

    def generate_temporary_download_url(self, object_name, expires=60):
        """Return a signed, temporary GET URL for ``object_name``.

        Handing callers a short-lived signed link avoids exposing the
        AccessKeyId/AccessKeySecret. Typically the OSS path of a caller's data
        is stored against some caller identifier and looked up on request.

        Args:
            object_name: key of the object to share.
            expires: validity of the link in seconds (default 60).
        """
        return self.bucket.sign_url('GET', object_name, expires, slash_safe=True)

# Shared module-level client. OssClient is a singleton (see __new__), so any
# later OssClient() call returns this same instance.
oss_client = OssClient()

importing Jupyter notebook from setting.ipynb
importing Jupyter notebook from oss_handler.ipynb
<REDACTED_ACCESS_KEY_ID> <REDACTED_ACCESS_KEY_SECRET> 2nd-data oss-cn-shenzhen.aliyuncs.com


In [2]:
#导入数据 
def k1min_data_process(date_str, client=None):
    """Load one day of 1-minute bars from OSS and add a per-bar return.

    Reads ``ad_hoc_prod/1min_cs/<date_str>.parquet``, sorts by RIC then TIME,
    resets the index, adds ``RETURN = CLOSE / OPEN - 1`` and renames
    ``RIC -> ticker`` and ``DATE -> date``.

    Args:
        date_str: 'YYYY-MM-DD' string naming the parquet object.
        client: object exposing ``read_oss_parquet_file``; defaults to the
            module-level ``oss_client``.

    Returns:
        pandas.DataFrame with the renamed columns plus RETURN.
    """
    if client is None:
        client = oss_client
    k1min = client.read_oss_parquet_file(f'ad_hoc_prod/1min_cs/{date_str}.parquet')
    # Fixed: the reset_index() result used to be assigned to a misspelled
    # ``K1min`` and discarded, so the sorted frame kept its old index.
    k1min = k1min.sort_values(by=["RIC", "TIME"]).reset_index(drop=True)
    k1min['RETURN'] = k1min['CLOSE'] / k1min['OPEN'] - 1
    return k1min.rename(columns={'RIC': 'ticker', 'DATE': 'date'})

In [3]:
#尾盘半小时动量
def mmt_last30min(single_day_data):
    """Late-session momentum from each ticker's last 31 rows.

    Parameters
    ----------
    single_day_data : pandas.DataFrame
        Bars with ``ticker``, ``date`` and ``CLOSE`` columns. Rows are expected
        to be time-ordered within each ticker (k1min_data_process sorts by
        RIC, TIME). With one row per minute, 31 rows span the last 30 minutes.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``date`` with one column per ``ticker``; each cell is
        ``CLOSE[last] / CLOSE[first] - 1`` over that ticker's last 31 rows.
    """
    last_bars = single_day_data.groupby('ticker').tail(31)
    momentum = last_bars.groupby(['ticker', 'date'])['CLOSE'].apply(
        lambda closes: closes.iloc[-1] / closes.iloc[0] - 1
    )
    table = momentum.to_frame().reset_index()
    return table.pivot(index='date', columns='ticker', values='CLOSE')

In [4]:
# Load one sample day to inspect the factor. 2017-01-03 is the date shown in
# the output of cell [6]. The original referenced the undefined name `date`,
# which raises NameError in a fresh kernel.
sample_date_str = '2017-01-03'
single_day_data = k1min_data_process(sample_date_str)

In [5]:
# Tail-of-session momentum for the sample day: one row per date, one column per ticker.
cal_factor = mmt_last30min(single_day_data=single_day_data)

In [6]:
# Last expression of the cell: Jupyter renders the date x ticker factor frame.
cal_factor

ticker,000001,000002,000004,000005,000006,000007,000008,000009,000010,000011,...,603986,603987,603988,603989,603990,603993,603996,603997,603998,603999
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2017-01-03,0.001093,0.000483,-0.001348,0.0,0.001035,0.003846,0.001072,-0.005703,0.005161,0.004663,...,0.0,0.002712,0.012089,-0.002942,-0.00915,0.002674,0.000988,0.004073,-0.00167,0.008389


In [9]:
# Calendar-day range for the batch run; days without a parquet file are
# skipped by the loop below.
start_date, end_date = '2017-01-03', '2025-01-04'
date_range = pd.date_range(start_date, end_date, freq='D')

#尾盘半小时动量
def mmt_last30min(single_day_data):
    """Return over each ticker's last 31 rows, pivoted to date x ticker.

    For every ticker, keep the final 31 rows in input order (time-ordered
    input is expected) and compute ``CLOSE[last] / CLOSE[first] - 1`` per
    (ticker, date). The result is indexed by ``date`` with one column per
    ``ticker``.
    """
    def _window_return(closes):
        return closes.iloc[-1] / closes.iloc[0] - 1

    tail = single_day_data.groupby('ticker').tail(31)
    per_pair = (
        tail.groupby(['ticker', 'date'])['CLOSE']
        .apply(_window_return)
        .to_frame()
        .reset_index()
    )
    return per_pair.pivot(index='date', columns='ticker', values='CLOSE')
final_df = pd.DataFrame()

# Collect one factor frame per available day and concatenate once at the end;
# calling pd.concat inside the loop re-copies the growing frame every day.
daily_factors = []
for date in date_range:
    date_str = date.date().strftime('%Y-%m-%d')
    try:
        single_day_data = k1min_data_process(date_str)
    except NoSuchKey:
        # No parquet file for this calendar day (e.g. weekend/holiday): skip it.
        print(f"File for date {date_str} does not exist. Skipping...")
        continue

    ### factor formula
    cal_factor = mmt_last30min(single_day_data)
    ### factor formula

    daily_factors.append(cal_factor)

if daily_factors:
    final_df = pd.concat(daily_factors)

# The saved factor is the negated tail-30-minute momentum.
final_df = -1 * final_df
final_df.to_pickle('D:/redata/factor/mmt_last30min@factor.pkl')

File for date 2017-01-07 does not exist. Skipping...
File for date 2017-01-08 does not exist. Skipping...
File for date 2017-01-14 does not exist. Skipping...
File for date 2017-01-15 does not exist. Skipping...
File for date 2017-01-21 does not exist. Skipping...
File for date 2017-01-22 does not exist. Skipping...
File for date 2017-01-27 does not exist. Skipping...
File for date 2017-01-28 does not exist. Skipping...
File for date 2017-01-29 does not exist. Skipping...
File for date 2017-01-30 does not exist. Skipping...
File for date 2017-01-31 does not exist. Skipping...
File for date 2017-02-01 does not exist. Skipping...
File for date 2017-02-02 does not exist. Skipping...
File for date 2017-02-04 does not exist. Skipping...
File for date 2017-02-05 does not exist. Skipping...
File for date 2017-02-11 does not exist. Skipping...
File for date 2017-02-12 does not exist. Skipping...
File for date 2017-02-18 does not exist. Skipping...
File for date 2017-02-19 does not exist. Skipp

KeyboardInterrupt: 

In [32]:
# Momentum over the 20 highest-volume bars
def mmt_top20_volume(single_day_data):
    """Sum of RETURN over each ticker's 20 highest-volume bars.

    Within each (ticker, date) group, bars are ranked by VOL in descending
    order (ties broken by row order; NaN volumes get no rank and are never
    selected). The RETURN values of the top 20 are summed, and the result is
    pivoted to a frame indexed by ``date`` with one column per ``ticker``.

    Args:
        single_day_data: DataFrame with ``ticker``, ``date``, ``VOL`` and
            ``RETURN`` columns (as produced by k1min_data_process).

    Returns:
        pandas.DataFrame of the summed returns, date x ticker.
    """
    # GroupBy.rank yields the same per-group ranks as
    # transform(lambda s: s.rank(...)) without a Python call per group.
    vol_rank = single_day_data.groupby(['ticker', 'date'])['VOL'].rank(
        method='first', ascending=False
    )
    # Positional mask: safe even if the input index has duplicate labels.
    top = single_day_data[(vol_rank <= 20).to_numpy()]
    factor = (
        top.groupby(['ticker', 'date'])['RETURN']
        .sum()
        .rename('factor')
        .reset_index()
    )
    return factor.pivot(index='date', columns='ticker', values='factor')
    
##多天   
start_date = '2020-01-03'
end_date = '2025-01-04'
date_range = pd.date_range(start=start_date, end=end_date, freq='D')

final_df = pd.DataFrame()

# Collect one factor frame per available day and concatenate once; calling
# pd.concat inside the loop re-copies the growing frame on every iteration.
daily_factors = []
for date in date_range:
    date_str = date.date().strftime('%Y-%m-%d')
    try:
        single_day_data = k1min_data_process(date_str)
    except NoSuchKey:
        # No parquet file for this calendar day (e.g. weekend/holiday): skip it.
        print(f"File for date {date_str} does not exist. Skipping...")
        continue

    ### factor formula
    cal_factor = mmt_top20_volume(single_day_data)
    ### factor formula

    daily_factors.append(cal_factor)

if daily_factors:
    final_df = pd.concat(daily_factors)

# The saved factor is the negated top-20-volume return sum.
final_df = -1 * final_df
# Fixed: this used to write to mmt_last30min@factor.pkl, overwriting the
# tail-30-minute momentum factor with the top-20-volume factor.
final_df.to_pickle('D:/redata/factor/mmt_top20_volume@factor.pkl')

File for date 2020-01-04 does not exist. Skipping...
File for date 2020-01-05 does not exist. Skipping...
File for date 2020-01-11 does not exist. Skipping...
File for date 2020-01-12 does not exist. Skipping...
File for date 2020-01-18 does not exist. Skipping...
File for date 2020-01-19 does not exist. Skipping...
File for date 2020-01-24 does not exist. Skipping...
File for date 2020-01-25 does not exist. Skipping...
File for date 2020-01-26 does not exist. Skipping...
File for date 2020-01-27 does not exist. Skipping...
File for date 2020-01-28 does not exist. Skipping...
File for date 2020-01-29 does not exist. Skipping...
File for date 2020-01-30 does not exist. Skipping...
File for date 2020-01-31 does not exist. Skipping...
File for date 2020-02-01 does not exist. Skipping...
File for date 2020-02-02 does not exist. Skipping...
File for date 2020-02-08 does not exist. Skipping...
File for date 2020-02-09 does not exist. Skipping...
File for date 2020-02-15 does not exist. Skipp

In [33]:
# Persist the negated top-20-volume factor (date x ticker) to disk.
pd.to_pickle(final_df, 'D:/redata/factor/mmt_top20_volume@factor.pkl')

In [None]:
# Append the latest day's factor to the stored history, then recompute the
# 20-row rolling standard deviation.
# Fixed: `k1min` only exists inside k1min_data_process, so the original line
# raised NameError. NOTE(review): assumes single_day_data holds the latest
# day's output of k1min_data_process — confirm before running.
# Fixed: the stored history is saved as -1 * raw momentum by the batch cell,
# so today's value is negated the same way before appending.
mmt_last30min_path = 'D:/redata/factor/mmt_last30min@factor.pkl'
mmt_last30min_today = -1 * mmt_last30min(single_day_data)
mmt_last30min_full_data = pd.read_pickle(mmt_last30min_path)
mmt_last30min_full_data = pd.concat([mmt_last30min_full_data, mmt_last30min_today])
# Re-running the cell for the same date must not create duplicate rows.
mmt_last30min_full_data = mmt_last30min_full_data[
    ~mmt_last30min_full_data.index.duplicated(keep='last')
].sort_index()
mmt_last30min_full_data.to_pickle(mmt_last30min_path)

mmt_last30min_std = -1 * mmt_last30min_full_data.rolling(20).std()
mmt_last30min_std.to_pickle('D:/redata/factor/mmt_last30min_std@factor.pkl')