In [2]:
import pandas as pd
import os
import json
import time
import matplotlib.pyplot as plt
import functools

In [3]:
# Output root for converted kline files; save() writes to <data_root>/<key>/<name>.
data_root = "./data_root"

In [4]:
def get_dir(path='/home/data/jupyter_root/data_root/3001'):
    """Return the full path of every file found (recursively) under ``path``.

    ``path`` defaults to the raw input tree (meta id 3001) used throughout this
    notebook; pass another directory to reuse the walker elsewhere. A missing
    directory yields an empty list, since ``os.walk`` produces nothing for it.
    """
    return [
        os.path.join(dirpath, name)
        for dirpath, _dirnames, filenames in os.walk(path)
        for name in filenames
    ]

def trans_root(row_path, period=60, meta_id=3002):  # transfer file path
    """Map a raw (3001) data-file path to its output kline key and file name.

    Two raw layouts are recognised; only the trailing components matter:

    * LME:    ``.../3001/LME/0/<code>/<date>``  (no expiry directory),
      e.g. ``'3001/LME/0/LMCADS03/20180725'``
    * others: ``.../3001/<market>/0/<code>/<expiry>/<date>``,
      e.g. ``'3001/SHFE/0/AL/201809/20180801'``

    Parameters
    ----------
    row_path : str
        Path of a raw data file; surrounding whitespace is stripped.
    period : int
        Output bar period in seconds. Only ``60`` and ``86400`` are handled.
    meta_id : int
        Leading component of the output key (3002 by default).

    Returns
    -------
    tuple of (str, str) or None
        ``(key, file_name)`` with key ``<meta_id>/<market>/<period>/<code>``
        plus ``/<expiry>`` when the path has an expiry directory.
        For ``period == 60`` the file name is the date component (one file
        per day). For ``period == 86400`` it is the expiry, or the code for
        LME, so every day of a contract maps to the same file.
        Any other period returns ``None``.
    """
    parts = row_path.strip().split('/')
    if parts[-4] == 'LME':  # '3001/LME/0/LMCADS03/20180725'
        market, code, expiry_date = parts[-4], parts[-2], None
    else:
        market, code, expiry_date = parts[-5], parts[-3], parts[-2]
    date = parts[-1]

    key = "{}/{}/{}/{}".format(meta_id, market, period, code)
    if expiry_date is not None:
        key = "{}/{}".format(key, expiry_date)

    if period == 60:
        return key, date
    if period == 86400:
        return key, (code if expiry_date is None else expiry_date)
    return None
    
def get_kline(x, last=[1]):
    """Aggregate one resample bin ``x`` into an OHLCV row.

    ``last`` is a one-element list carrying the previous bar's close between
    calls; bind a fresh list per resample with ``functools.partial`` to seed it.
    The default list is shared by every call that omits ``last``.

    A non-empty bin yields first/max/min/last of ``x.price`` and the sum of
    ``x.vol``, and updates ``last[0]`` to that close. An empty bin yields a flat
    bar at ``last[0]`` with zero volume and leaves ``last`` unchanged.
    """
    if not x.empty:
        prices = x.price
        bar = {"o": prices.iloc[0], "h": prices.max(), "l": prices.min(),
               "c": prices.iloc[-1], "v": x.vol.sum()}
        last[0] = bar["c"]
        return pd.Series(bar)
    carried = last[0]
    return pd.Series({"o": carried, "h": carried, "l": carried, "c": carried, "v": 0})

def save(key, name, data, root=None):
    """Write DataFrame ``data`` as JSON records to ``<root>/<key>/<name>``.

    Parameters
    ----------
    key : str
        Relative output directory, e.g. ``'3002/SHFE/60/AL/201809'``.
        A falsy key (``None``, ``''``) means nothing is written.
    name : str
        File name inside that directory.
    data : pandas.DataFrame
        Written with ``to_json(orient='records')``, so the index is not stored.
    root : str, optional
        Base directory; defaults to the notebook-level ``data_root``.
    """
    if not key:
        return
    base = data_root if root is None else root
    out_dir = os.path.join(base, key)
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, name), 'w') as fp:
        data.to_json(fp, orient='records')
            
def transfer_kline(file_path, key, file_name):  # for minute klines
    """Resample one raw data file into 1-minute klines and save them.

    The raw file is read with ``pd.read_json``. Its ``times`` column becomes
    the index, and bars are built with ``get_kline``; empty minutes repeat the
    previous close. The result has columns
    ``['times', 'c', 'h', 'l', 'o', 'v']`` and is written with
    ``save(key, file_name, ...)``.

    Resampling failures are printed together with the exception and the file is
    skipped (returns ``None``), so a batch over many files keeps going.
    Failures while reading or indexing the file are not caught.
    """
    with open(file_path, 'r') as fp:
        data = pd.read_json(fp)
    # The file is closed before the resample/save work below.
    data['times'] = pd.DatetimeIndex(data['times'])  # .tz_localize('Asia/Shanghai')
    data = data.set_index('times')
    try:
        if len(data) == 1:
            source = data
            seed = data.iloc[0].price
        else:
            # NOTE(review): the first record is dropped and empty bars are seeded
            # with the second record's price; presumably the first record lies
            # outside the session -- confirm against the raw data format.
            source = data.iloc[1:]
            seed = data.iloc[1].price
        kline_data = source.resample('min', closed='right').apply(
            functools.partial(get_kline, last=[seed]))
    except Exception as err:
        # Best effort: skip this file, but keep the reason instead of discarding it.
        print(file_path, 'error', err)
        return None
    kline_data['times'] = kline_data.index
    kline_data = kline_data.reindex(columns=['times', 'c', 'h', 'l', 'o', 'v'])
    save(key, file_name, kline_data)
    return None


In [16]:
def transfer_all_history():
    """Convert every raw file returned by ``get_dir`` into minute klines."""
    for raw_path in get_dir():
        out_key, out_name = trans_root(raw_path, period=60)
        transfer_kline(raw_path, out_key, out_name)
    print("all_done")
        
def transfer_daily(count=1):  # transfer the most recent `count` days of data
    """Re-run the minute-kline conversion for the most recent ``count`` dates.

    Dates are the raw file names returned by ``trans_root(..., period=60)``,
    e.g. ``'20180725'``. They are compared as strings, which orders such
    YYYYMMDD names chronologically.

    Every raw file whose date is among the last ``count`` distinct dates is
    passed to ``transfer_kline``. ``count <= 0``, or an empty raw tree,
    transfers nothing.
    """
    parsed = []
    for path in get_dir():
        key, file_name = trans_root(path, period=60)  # parse each path only once
        parsed.append((path, key, file_name))

    dates = sorted({file_name for _path, _key, file_name in parsed})
    # Guard count <= 0: dates[-0:] would otherwise select every date.
    recent = set(dates[-count:]) if count > 0 else set()

    for path, key, file_name in parsed:
        if file_name in recent:
            transfer_kline(path, key, file_name)
    print('all done')

In [6]:
def trans_day_kline(file_path):
    """Load one raw data file and aggregate it into daily klines.

    Returns a DataFrame indexed by day with columns
    ``['times', 'c', 'h', 'l', 'o', 'v']``; ``times`` duplicates the index.
    Bars come from ``get_kline``, seeded with the first record's price, so any
    empty day repeats the previous close.
    """
    with open(file_path, 'r') as handle:
        raw = pd.read_json(handle)
    raw['times'] = pd.DatetimeIndex(raw['times'])
    raw = raw.set_index('times')
    daily_bins = raw.resample("D")  # , closed="right"
    daily = daily_bins.apply(functools.partial(get_kline, last=[raw.iloc[0].price]))
    daily['times'] = daily.index
    return daily.reindex(columns=['times', 'c', 'h', 'l', 'o', 'v'])

In [7]:
def trans_daykline_allhistory(path='/home/data/jupyter_root/data_root/3001'):
    """Build one daily-kline file per raw contract directory under ``path``.

    For every directory that directly contains raw files:

    * each file (in sorted path order) becomes daily klines via ``trans_day_kline``;
    * the results are concatenated;
    * the concatenation is written with ``save`` to the period-86400 key/name
      from ``trans_root``.

    Files that fail to convert are reported with the exception and skipped.
    A directory in which no file converted is reported and skipped instead of
    crashing on ``pd.concat([])``.
    """
    for dirpath, _dirnames, filenames in os.walk(path):
        if not filenames:
            continue
        df_list = []
        key = file_name = None
        for real_file in sorted(os.path.join(dirpath, f) for f in filenames):
            # The key/name depend only on directory components, so every file
            # in this directory maps to the same day-kline file.
            key, file_name = trans_root(real_file, period=86400)
            try:
                df_list.append(trans_day_kline(real_file))
            except Exception as err:
                # Report the file that actually failed, with the reason.
                print(real_file, 'error', err)
        if not df_list:
            print(key, 'skipped: no readable files')
            continue
        save(data=pd.concat(df_list), key=key, name=file_name)
        print(key, 'finish')
    print('all done')

In [500]:
# Rebuild every daily-kline file from the full raw history (files are written under data_root).
trans_daykline_allhistory()

3002/SHFE/86400/AL/201810 finish
3002/SHFE/86400/AL/201811 finish
3002/SHFE/86400/AL/201809 finish
3002/SHFE/86400/AL/201808 finish
3002/SHFE/86400/AL/201812 finish
3002/SHFE/86400/AL/201901 finish
3002/SHFE/86400/AL/201902 finish
3002/SHFE/86400/AL/201904 finish
3002/SHFE/86400/AL/201903 finish
3002/SHFE/86400/AL/201905 finish
3002/SHFE/86400/AL/201907 finish
3002/SHFE/86400/AL/201906 finish
3002/SHFE/86400/AU/201812 finish
3002/SHFE/86400/AU/201906 finish
3002/SHFE/86400/AU/201904 finish
3002/SHFE/86400/AU/201810 finish
3002/SHFE/86400/AU/201809 finish
3002/SHFE/86400/AU/201902 finish
3002/SHFE/86400/AU/201808 finish
3002/SHFE/86400/BU/201906 finish
3002/SHFE/86400/BU/201812 finish
3002/SHFE/86400/BU/201809 finish
3002/SHFE/86400/BU/201901 finish
3002/SHFE/86400/BU/201810 finish
3002/SHFE/86400/BU/201811 finish
3002/SHFE/86400/BU/201903 finish
3002/SHFE/86400/BU/201808 finish
3002/SHFE/86400/CU/201809 finish
3002/SHFE/86400/CU/201812 finish
3002/SHFE/86400/CU/201811 finish
3002/SHFE/

3002/DCE/86400/L/201808 finish
3002/DCE/86400/L/201904 finish
3002/DCE/86400/L/201903 finish
3002/DCE/86400/L/201902 finish
3002/DCE/86400/L/201812 finish
3002/DCE/86400/L/201811 finish
3002/DCE/86400/L/201810 finish
3002/DCE/86400/L/201906 finish
3002/DCE/86400/L/201907 finish
3002/DCE/86400/V/201809 finish
3002/DCE/86400/V/201901 finish
3002/DCE/86400/V/201905 finish
3002/DCE/86400/V/201808 finish
3002/DCE/86400/V/201904 finish
3002/DCE/86400/V/201903 finish
3002/DCE/86400/V/201902 finish
3002/DCE/86400/V/201812 finish
3002/DCE/86400/V/201811 finish
3002/DCE/86400/V/201810 finish
3002/DCE/86400/V/201906 finish
3002/DCE/86400/V/201907 finish
3002/DCE/86400/PP/201809 finish
3002/DCE/86400/PP/201901 finish
3002/DCE/86400/PP/201905 finish
3002/DCE/86400/PP/201808 finish
3002/DCE/86400/PP/201904 finish
3002/DCE/86400/PP/201903 finish
3002/DCE/86400/PP/201902 finish
3002/DCE/86400/PP/201812 finish
3002/DCE/86400/PP/201811 finish
3002/DCE/86400/PP/201810 finish
3002/DCE/86400/PP/201906 fini

In [17]:
def transfer_daykline_daily():  # transfer the most recent day into the day-kline files
    """Append the most recent day's daily bar to each daily-kline file.

    The most recent date is the largest raw file name reported by
    ``trans_root(..., period=60)`` across ``get_dir()``. For each raw file of
    that date, its daily bar (``trans_day_kline``) is appended to
    ``<data_root>/<key>/<file_name>``, where key/name come from
    ``trans_root(..., period=86400)``.

    A file whose last stored bar already falls on that date is left untouched.
    If the daily-kline file does not exist yet, it is created from the new bar.
    """
    parsed = []
    for raw_path in get_dir():
        _key, file_date = trans_root(raw_path, period=60)  # parse the path to get its date
        parsed.append((raw_path, file_date))
    if not parsed:
        print('all done')
        return
    date = max(file_date for _raw, file_date in parsed)  # most recent day

    for raw_path, file_date in parsed:
        if file_date != date:
            continue
        key, file_name = trans_root(raw_path, period=86400)  # day-kline output path
        kline_file = os.path.join(data_root, key, file_name)
        if os.path.exists(kline_file):
            with open(kline_file, 'r') as f:
                data = pd.read_json(f)
            # Stored times are integers: presumably epoch milliseconds as written
            # by DataFrame.to_json in save(), so scale them to nanoseconds.
            data['times'] = pd.DatetimeIndex(data['times'] * 1000000)
            if pd.Timestamp(data['times'].iloc[-1]) == pd.Timestamp(date):
                continue  # already up to date
            data_update = pd.concat([data, trans_day_kline(raw_path)])
        else:
            # No day-kline file yet for this contract: start it with the new bar.
            data_update = trans_day_kline(raw_path)
        save(data=data_update, key=key, name=file_name)
        print(key)
    print('all done')

In [10]:
# Append the most recent day's bar to each daily-kline file.
transfer_daykline_daily()

3002/SHFE/86400/AL/201810
3002/SHFE/86400/AL/201811
3002/SHFE/86400/AL/201809
3002/SHFE/86400/AL/201808
3002/SHFE/86400/AL/201812
3002/SHFE/86400/AL/201901
3002/SHFE/86400/AL/201902
3002/SHFE/86400/AL/201904
3002/SHFE/86400/AL/201903
3002/SHFE/86400/AL/201905
3002/SHFE/86400/AL/201907
3002/SHFE/86400/AL/201906
3002/SHFE/86400/AU/201812
3002/SHFE/86400/AU/201906
3002/SHFE/86400/AU/201904
3002/SHFE/86400/AU/201810
3002/SHFE/86400/AU/201809
3002/SHFE/86400/AU/201902
3002/SHFE/86400/AU/201808
3002/SHFE/86400/BU/201906
3002/SHFE/86400/BU/201812
3002/SHFE/86400/BU/201809
3002/SHFE/86400/BU/201901
3002/SHFE/86400/BU/201810
3002/SHFE/86400/BU/201811
3002/SHFE/86400/BU/201903
3002/SHFE/86400/BU/201808
3002/SHFE/86400/CU/201809
3002/SHFE/86400/CU/201812
3002/SHFE/86400/CU/201811
3002/SHFE/86400/CU/201901
3002/SHFE/86400/CU/201904
3002/SHFE/86400/CU/201810
3002/SHFE/86400/CU/201808
3002/SHFE/86400/CU/201902
3002/SHFE/86400/CU/201906
3002/SHFE/86400/CU/201905
3002/SHFE/86400/CU/201903
3002/SHFE/86

3002/ZCE/86400/FG/201906
3002/ZCE/86400/FG/201903
3002/ZCE/86400/FG/201907
3002/ZCE/86400/SM/201809
3002/ZCE/86400/SM/201901
3002/ZCE/86400/SM/201905
3002/ZCE/86400/SM/201808
3002/ZCE/86400/SM/201903
3002/ZCE/86400/SM/201902
3002/ZCE/86400/SM/201812
3002/ZCE/86400/SM/201811
3002/ZCE/86400/SM/201810
3002/ZCE/86400/SM/201906
3002/ZCE/86400/SM/201904
3002/ZCE/86400/SM/201907
3002/ZCE/86400/SF/201901
3002/ZCE/86400/SF/201809
3002/ZCE/86400/SF/201905
3002/ZCE/86400/SF/201810
3002/ZCE/86400/SF/201808
3002/ZCE/86400/SF/201903
3002/ZCE/86400/SF/201902
3002/ZCE/86400/SF/201812
3002/ZCE/86400/SF/201811
3002/ZCE/86400/SF/201906
3002/ZCE/86400/SF/201904
3002/ZCE/86400/SF/201907
3002/ZCE/86400/RI/201811
3002/ZCE/86400/RI/201903
3002/ZCE/86400/RI/201809
3002/ZCE/86400/RI/201905
3002/ZCE/86400/RI/201901
3002/ZCE/86400/RI/201907
3002/ZCE/86400/LR/201903
3002/ZCE/86400/LR/201811
3002/ZCE/86400/LR/201901
3002/ZCE/86400/LR/201809
3002/ZCE/86400/LR/201905
3002/ZCE/86400/LR/201907
3002/ZCE/86400/TA/201811


In [501]:
# Scratch check: compare the most recent raw-file date with the last bar stored
# in one daily-kline file (the same comparison transfer_daykline_daily makes).
all_file = get_dir()
file_names = set()
for file in all_file:
    key,file_name = trans_root(file,period=60)
    file_names.add(file_name)
file_names=sorted(list(file_names))
date = pd.Timestamp(file_names[-1]) # get the most recent day
 
with open('/home/data/jupyter_root/data_root/3002/DCE/86400/C/201903/201903','r') as f:
    data = pd.read_json(f)
    # stored times are integers (presumably epoch ms from to_json); scale to ns
    data.times = pd.DatetimeIndex(data.times*1000000)
    recent_day = data.times.iloc[-1]
    print(pd.Timestamp(recent_day))
    
#print(pd.Timestamp(date) == pd.Timestamp(recent_day))

2018-08-06 00:00:00


In [9]:
# Re-run the minute-kline conversion for the most recent raw date (count defaults to 1).
transfer_daily()

all done


In [None]:
# NOTE(review): never executed (In [None]). It uses a `data` frame that this cell
# does not define -- hidden state from another cell. It presumably needs a
# DatetimeIndex-ed frame like the one transfer_kline builds; fails on a fresh kernel.
data1=data.iloc[1:].resample("T", closed="right").apply(functools.partial(get_kline, last=[data.iloc[1].price])) 
data1

In [318]:
# Scratch: daily bar for a single LME raw file (this layout has no expiry directory).
path = '/home/data/jupyter_root/data_root/3001/LME/0/LMCADS03/20180725'
with open(path,'r') as f:
    # key/filename are computed for inspection only; nothing is saved here.
    key,filename = trans_root(path,period=60)
    data = pd.read_json(f)
    data.times = pd.DatetimeIndex(data.times) #.tz_localize("Asia/Shanghai")# "America/New_York" "Asia/Shanghai"
    data = data.set_index('times')
    #data1=data.iloc[1:].resample("D", closed="right").apply(functools.partial(get_kline, last=[data.iloc[1].price])) 
    # Unlike transfer_kline, keep the first record and seed empty bins with its price.
    data1=data.resample("D", closed="right").apply(functools.partial(get_kline, last=[data.iloc[0].price])) 
    data1['times']=data1.index
    data1 = data1.reindex(columns=['times','c','h','l','o','v'])

data1

Unnamed: 0_level_0,times,c,h,l,o,v
times,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2018-07-25,2018-07-25,6305.5,6331.0,6241.0,6269.0,13951.0


In [208]:
# Scratch: daily bar for one SHFE AL raw file with times localised to Asia/Shanghai,
# written to a throwaway file under 3002/. (functools is imported in the top cell.)
with open('./data_root/3001/SHFE/0/AL/201809/20180801','r') as f:
    data = pd.read_json(f)
data.times = pd.DatetimeIndex(data.times).tz_localize("Asia/Shanghai")  # alternative: "America/New_York"
data = data.set_index('times')
# As in transfer_kline: drop the first record and seed empty bins with the second record's price.
data1 = data.iloc[1:].resample("D", closed="right").apply(functools.partial(get_kline, last=[data.iloc[1].price]))   # "T":minute
data1.to_json('/home/data/jupyter_root/data_root/3002/tt')

In [353]:
# Scratch prototype of trans_daykline_allhistory for one contract directory:
# one daily bar per raw file, concatenated and saved as '<first date>to<last date>'.
path_new = '/home/data/jupyter_root/data_root/3001/SHFE/0/AL/201810'
for dirpath, dirnames, filenames in os.walk(path_new):
    if not filenames:
        continue  # no raw files here: l_date[0] and pd.concat([]) would fail
    paths = []
    l_date = []
    df_list = []
    for file in filenames:
        real_path = os.path.join(dirpath, file)
        # period=60 yields the per-file date; period=86400 yields the expiry
        # (e.g. '201810'), which would collapse the name to '201810to201810'.
        date = trans_root(real_path, period=60)[1]
        key = trans_root(real_path, period=86400)[0]  # e.g. '3002/SHFE/86400/AL/201810'
        l_date.append(date)
        paths.append(real_path)
    for file in sorted(paths):
        with open(file, 'r') as fp:
            data = pd.read_json(fp)
        data.times = pd.DatetimeIndex(data.times)
        data = data.set_index('times')
        data1 = data.resample("D", closed="right").apply(functools.partial(get_kline, last=[data.iloc[0].price]))
        data1['times'] = data1.index
        data1 = data1.reindex(columns=['times','c','h','l','o','v'])
        df_list.append(data1)
    l_date = sorted(l_date)
    start_date = l_date[0]
    end_date = l_date[-1]
    file_name = start_date + 'to' + end_date
    print(file_name)
    data2 = pd.concat(df_list)
    print(data2)
    save(data=data2, key=key, name=file_name)

20180725to20180803
                times      c      h      l      o      v
times                                                   
2018-07-25 2018-07-25  14470  14490  14420  14475  38732
2018-07-26 2018-07-26  14390  14425  14360  14390  37922
2018-07-27 2018-07-27  14445  14505  14410  14440  55682
2018-07-30 2018-07-30  14490  14530  14460  14470  54120
2018-07-31 2018-07-31  14575  14650  14530  14550  62132
2018-08-01 2018-08-01  14580  14615  14500  14585  62562
2018-08-02 2018-08-02  14405  14475  14315  14445  95104
2018-08-03 2018-08-03  14465  14530  14430  14450  81886


In [416]:
# Scratch: daily bar for one raw file of market X (code XUC, expiry 201808) via trans_day_kline.
path_new = '/home/data/jupyter_root/data_root/3001/X/0/XUC/201808/20180726'
data1 = trans_day_kline(path_new)
print(data1)

                times       c       h       l       o       v
times                                                        
2018-07-26 2018-07-26  2587.5  2624.5  2560.0  2597.0  7558.0


In [438]:
# Scratch: inspect the stored times of one daily-kline file after scaling them
# (presumably epoch ms as written by to_json) to nanoseconds.
with open('/home/data/jupyter_root/data_root/3002/DCE/86400/C/201903/201903','r') as f:
    data = pd.read_json(f)
    data.times = pd.DatetimeIndex(data.times*1000000)
    print(data.times)
# NOTE(review): dead experiment below; the last line is also missing its closing ")".
# path_new = '/home/data/jupyter_root/data_root/3001/LME/0/LMCADS03/20180725'
# data1 = trans_day_kline(path_new)
# print(data1)
# data2 = pd.concat([data,data1])
# print(data2)
# key ='3002/DCE/86400/C/201903'
# name = 'test'
# save(data=data2,key=key,name=name

0   2018-07-26
1   2018-07-26
2   2018-07-26
Name: times, dtype: datetime64[ns]


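- `A`: year
- `M`: month
- `W`: week
- `D`: day
- `H`: hour
- `T`: minute
- `S`: second

**Above: common pandas `resample` frequency aliases.** pandas 2.2+ deprecates `A`, `M`, `H`, `T` and `S` in favour of `YE`, `ME`, `h`, `min` and `s`.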

In [135]:
# Inspect the resampler's defaults (closed/label) for minute bins.
# NOTE(review): needs `data` with a DatetimeIndex; in notebook order `data` is last
# set in In [438] with a RangeIndex, so this relies on hidden state.
data.resample("T")

DatetimeIndexResampler [freq=<Minute>, axis=0, closed=left, label=left, convention=start, base=0]