In [110]:
# !pip install jupyter_contrib_nbextensions
# !jupyter contrib nbextension install --user
# !jupyter nbextension enable execute_time/ExecuteTime

In [111]:
import glob
import pandas as pd
import numpy as np

import os
import time

import json
from tqdm import tqdm
import os

from pprint import pprint
import socket

from IPython.display import display

In [112]:
# remove warnings

import warnings

warnings.filterwarnings("ignore")


In [113]:
# config

import os

# version tags: `preload_version` names the preprocessed-pickle folder,
# `version` names the feature folder (see `feature_dir` below)
preload_version = "djia_stock_v1"
version = "djia_stock_v1"
# True -> rebuild the per-ticker pickles from the raw CSVs before featurizing
preload_raw = True

# number of worker processes used by `parallal_task`
N_WORKER = 20

# Data locations. Defaults are the original author's paths; set
# DJIA_DATA_DIR / DJIA_DATA_OUTPUT in the environment to run elsewhere.
data_dir = os.environ.get("DJIA_DATA_DIR", "../DeepTrader/data/DJIA/djia_stocks")
data_output = os.environ.get("DJIA_DATA_OUTPUT", "/home/ted/data")

# template of one ticker's preprocessed pickle:
# data_path.format(data_output, preload_version, ticker)
data_path = "{}/raw/{}/{}.pkl"

# feature config
n = 120  # longest momentum window (in resampled periods)
# short windows plus every multiple of 30 up to n: [1, 3, 6, 9, 30, 60, 90, 120]
moms = [1, 3, 6, 9] + list(range(30, n + 30, 30))


config = dict(
    period_int=24,
    target_period_int=24,
    period="24h",               # resample rule shared by all features
    target_period_list=[24],
    mom_list=moms,              # price-rising-rate windows
    tv_nums=[1, 3, 7, 30, 60, 90],  # trade-volume rolling-sum windows
    feature_dir=f"{data_output}/feature/{version}",
    std_period="24h",           # resample rule for the volatility closes
    std_terms=[7, 30, 60, 90, 180, 360],  # volatility windows
    cut_off_date=None,          # optional inclusive start date (falsy = none)
    end_cut_off_date=None,      # optional inclusive end date (falsy = none)
)


def mkdir_if_not_exists(p):
    """Create directory ``p`` (and any missing parents) unless it already exists.

    ``exist_ok=True`` removes the gap between "check" and "create", so a
    concurrent creator of the same directory cannot make this call fail.
    Raises ``FileExistsError`` if ``p`` exists but is not a directory.
    """
    os.makedirs(p, exist_ok=True)

# Sanity check: every target horizon must be at least one feature period.
assert min(config["target_period_list"]) >= config["period_int"], (
    "every entry of target_period_list must be >= period_int "
    f"(got target_period_list={config['target_period_list']}, "
    f"period_int={config['period_int']})"
)


In [114]:
# show the resolved feature config (explicit pprint defaults)
pprint(config, indent=1, width=80)

{'cut_off_date': None,
 'end_cut_off_date': None,
 'feature_dir': '/home/ted/data/feature/djia_stock_v1',
 'mom_list': [1, 3, 6, 9, 30, 60, 90, 120],
 'period': '24h',
 'period_int': 24,
 'std_period': '24h',
 'std_terms': [7, 30, 60, 90, 180, 360],
 'target_period_int': 24,
 'target_period_list': [24],
 'tv_nums': [1, 3, 7, 30, 60, 90]}


## Multiple tasks (parallel execution helper)

In [115]:
from multiprocessing import Pool
from functools import partial
import inspect


def parallal_task(func, iterable, *params):
    """Run ``func`` over ``iterable`` in a process pool from a notebook.

    Functions defined in a notebook cell live in ``__main__``. To hand them to
    worker processes, the function's source is written to ``./tmp_func.py``,
    imported as a real module, and the module's copy is dispatched. ``func``
    must therefore be self-contained: only its own source text is copied, so
    it has to import or define everything it uses.

    Args:
        func: top-level function to run in the workers.
        iterable: argument tuples; each is unpacked into one call
            (``Pool.starmap`` semantics).
        *params: accepted for backward compatibility; currently unused.

    Returns:
        List of ``func`` results, in input order.

    Raises:
        RuntimeError: if not called from the ``__main__`` module.
    """
    import importlib

    if __name__ != "__main__":
        raise RuntimeError("Not in Jupyter Notebook")

    # NOTE(review): assumes the cwd is importable (true for a notebook kernel).
    with open("./tmp_func.py", "w") as file:
        file.write(inspect.getsource(func))

    # The file may be newer than the import system's directory cache.
    importlib.invalidate_caches()
    import tmp_func

    # Reload so a previously imported tmp_func picks up this call's source.
    tmp_func = importlib.reload(tmp_func)
    # Look the function up by its own name instead of text-renaming it, which
    # would also rewrite any other occurrence of that name in the source.
    task = getattr(tmp_func, func.__name__)

    # The context manager guarantees the pool is torn down even if a task raises.
    with Pool(processes=N_WORKER) as pool:
        return pool.starmap(task, iterable)

## Get file paths

In [116]:
# Collect the raw CSV file(s) of every ticker: ticker -> [csv paths].
# The ticker is the file name up to its first dot (".../AAPL.csv" -> "AAPL").
path_dict = {}

for filepath in glob.iglob(f"{data_dir}/*.csv"):
    name = filepath.rsplit("/", 1)[-1].split(".", 1)[0]
    path_dict.setdefault(name, []).append(filepath)

In [117]:
# Fail fast if the glob matched nothing (e.g. wrong `data_dir`): every later
# stage would otherwise silently produce no output.
assert path_dict, f"no CSV files found under {data_dir!r}"
len(path_dict)

30

## Preprocess

In [118]:
def preprocess(name, path_dict, data_output, data_path, preload_version, config):
    """Load one ticker's raw CSV file(s), clean them and save a pickle.

    Runs in a worker process via ``parallal_task``, which ships only this
    function's source, so every dependency is imported locally.

    Args:
        name: ticker key into ``path_dict``.
        path_dict: ticker -> list of CSV paths for that ticker.
        data_output: root output directory.
        data_path: path template filled with
            ``(data_output, preload_version, name)``.
        preload_version: version tag used in the output path.
        config: feature config; only ``cut_off_date`` and
            ``end_cut_off_date`` are read (falsy value -> no bound).

    Returns:
        None; the cleaned frame is written to the filled ``data_path``.
    """
    import os

    import dask.dataframe as dd
    import pandas as pd

    # All CSVs of the ticker are read as one frame; assume_missing=True makes
    # dask read integer-looking columns as float.
    df = dd.read_csv(path_dict[name], assume_missing=True).compute()
    # NOTE(review): the first CSV column is dropped unconditionally —
    # presumably a saved index column; confirm for new data sources.
    df = df.drop(columns=df.columns[0])
    df = df.assign(token_name=name)

    # Parse before sorting so the order is chronological, not lexicographic.
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values("date")

    # Optional inclusive date window; falsy config values mean "unbounded".
    cut_off_date = config["cut_off_date"]
    end_cut_off_date = config["end_cut_off_date"]
    if cut_off_date:
        df = df[df["date"] >= pd.to_datetime(cut_off_date)]
    if end_cut_off_date:
        df = df[df["date"] <= pd.to_datetime(end_cut_off_date)]

    # Save. exist_ok avoids a race: many workers create the same folder.
    out_path = data_path.format(data_output, preload_version, name)
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    df.to_pickle(out_path)

In [119]:
def mutiple_processes_on_preprocess():
    """Run ``preprocess`` for every ticker in ``path_dict`` in parallel.

    Each worker writes one ticker's pickle; the per-task results are not
    used. Prints the elapsed wall time in minutes.
    """
    start = time.time()

    star_args = [
        (ticker, path_dict, data_output, data_path, preload_version, config)
        for ticker in path_dict
    ]
    parallal_task(preprocess, star_args)

    elapsed_minutes = (time.time() - start) / 60
    print(elapsed_minutes)


In [120]:
path_pickle_dict = {}

if preload_raw:
    # (re)build the per-ticker pickles from the raw CSVs
    mutiple_processes_on_preprocess()

# Index every preprocessed pickle by ticker (file name up to its first dot).
for filepath in glob.iglob(f"{data_output}/raw/{preload_version}/*.pkl"):
    name = filepath.rsplit("/", 1)[-1].split(".", 1)[0]
    path_pickle_dict[name] = filepath

0.012257671356201172


In [121]:
# ticker -> path of its preprocessed pickle
path_pickle_dict

{'INTC': '/home/ted/data/raw/djia_stock_v1/INTC.pkl',
 'MSFT': '/home/ted/data/raw/djia_stock_v1/MSFT.pkl',
 'V': '/home/ted/data/raw/djia_stock_v1/V.pkl',
 'MCD': '/home/ted/data/raw/djia_stock_v1/MCD.pkl',
 'IBM': '/home/ted/data/raw/djia_stock_v1/IBM.pkl',
 'BA': '/home/ted/data/raw/djia_stock_v1/BA.pkl',
 'CAT': '/home/ted/data/raw/djia_stock_v1/CAT.pkl',
 'HD': '/home/ted/data/raw/djia_stock_v1/HD.pkl',
 'CSCO': '/home/ted/data/raw/djia_stock_v1/CSCO.pkl',
 'VZ': '/home/ted/data/raw/djia_stock_v1/VZ.pkl',
 'DOW': '/home/ted/data/raw/djia_stock_v1/DOW.pkl',
 'AXP': '/home/ted/data/raw/djia_stock_v1/AXP.pkl',
 'AAPL': '/home/ted/data/raw/djia_stock_v1/AAPL.pkl',
 'NKE': '/home/ted/data/raw/djia_stock_v1/NKE.pkl',
 'CVX': '/home/ted/data/raw/djia_stock_v1/CVX.pkl',
 'WMT': '/home/ted/data/raw/djia_stock_v1/WMT.pkl',
 'TRV': '/home/ted/data/raw/djia_stock_v1/TRV.pkl',
 'HON': '/home/ted/data/raw/djia_stock_v1/HON.pkl',
 'WBA': '/home/ted/data/raw/djia_stock_v1/WBA.pkl',
 'AMGN': '/hom

## Check the gaps between consecutive k-line (candlestick) bars

In [122]:
def convert_to_days(x):
    """Express a timedelta (scalar or array/Series) as a float number of days."""
    one_day = np.timedelta64(24, "h")
    return x / one_day

In [123]:
checker1 = []  # per ticker: number of distinct bar-to-bar gaps (in days)
checker2 = []  # per ticker: largest bar-to-bar gap (in days)

# iterate over a copy because empty tickers are removed from the original
path_pickle_dict_cp = path_pickle_dict.copy()

for k, v in path_pickle_dict_cp.items():

    df = pd.read_pickle(v)

    if df.empty:
        # nothing to featurize: drop the ticker from the later stages
        del path_pickle_dict[k]
        continue

    # compute the gaps once, vectorised (no per-element apply)
    gap_days = convert_to_days(df["date"].diff().dropna())

    checker1.append(len(gap_days.unique()))
    checker2.append(gap_days.max())

In [124]:
# spot check: the frame of the last pickle read by the gap-check loop
df

Unnamed: 0,date,open,high,low,close,volume,tic,day,change,token_name
0,2004-06-23,3.750000,4.325000,3.687500,4.300000,43574400.0,CRM,2.0,,CRM
1,2004-06-24,4.387500,4.422500,4.125000,4.190000,8887200.0,CRM,3.0,-0.025581,CRM
2,2004-06-25,4.127500,4.187500,3.947500,3.950000,6710000.0,CRM,4.0,-0.057279,CRM
3,2004-06-28,4.000000,4.052500,3.860000,4.000000,2270800.0,CRM,0.0,0.012658,CRM
4,2004-06-29,4.000000,4.175000,3.957500,4.100000,2112000.0,CRM,1.0,0.025000,CRM
...,...,...,...,...,...,...,...,...,...,...
4576,2022-08-25,168.580002,174.139999,165.559998,173.910004,24025600.0,CRM,3.0,-0.033887,CRM
4577,2022-08-26,173.960007,176.300003,164.630005,165.229996,11074700.0,CRM,4.0,-0.049911,CRM
4578,2022-08-29,164.279999,165.820007,160.050003,160.210007,9329900.0,CRM,0.0,-0.030382,CRM
4579,2022-08-30,162.139999,163.850006,158.380005,159.669998,8079400.0,CRM,1.0,-0.003371,CRM


In [125]:
# line 1: distinct-gap counts per ticker; line 2: largest gap (days) per ticker
print(checker1, checker2, sep="\n")

[6, 6, 5, 6, 6, 6, 6, 6, 6, 6, 4, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 5]
[7.0, 7.0, 5.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 4.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 5.0]


In [126]:
feature_dir = config["feature_dir"]
mkdir_if_not_exists(feature_dir)

# save meta: the exact config the features in this folder are built with
with open(f"{feature_dir}/meta.json", "w") as f:
    json.dump(config, f)


# map pool

def main(name, pkl_path, config, test=False):
    """Build the feature table for one ticker, then save or return it.

    ``parallal_task`` ships only this function's source text to the worker
    processes, so every import and helper it needs is defined inside it.

    Args:
        name: ticker name; used in the output file name and in errors.
        pkl_path: path of the ticker's preprocessed pickle; must contain
            ``date``, ``close`` and ``volume`` columns.
        config: feature config. Reads ``period``, ``mom_list``,
            ``std_period``, ``std_terms``, ``tv_nums`` and ``feature_dir``;
            other keys are passed along and ignored by the helpers.
        test: falsy for a normal run. A positive int N keeps only the first
            N rows and returns the features instead of writing them.

    Returns:
        The feature DataFrame when ``test`` is truthy; otherwise None, after
        writing it to ``{feature_dir}/{name}.pkl``.

    Raises:
        ValueError: carrying ``name`` when feature creation fails (the
            original traceback is printed and the exception chained).
    """
    import traceback

    import numpy as np
    import pandas as pd

    # bins are (t - period, t] and labelled with t
    PARAMS_4_RESAMPLE = dict(closed="right", label="right")

    def profit(rows):
        """Ratio of the last to the first value in a rolling window."""
        values = np.asarray(rows)
        if len(values) < 2:
            return np.nan
        # positional access on a plain array: no reliance on pandas'
        # deprecated positional fallback for label-indexed Series
        return values[-1] / values[0]

    def create_period_inc(df, period: str, params_4_resample=None):
        """Last close of each resampled period."""
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        return df["close"].resample(period, **params_4_resample).last()

    def create_mom(df_inc, mom_list: list):
        """Price ratios for every window in ``mom_list``, plus ``00_target``."""
        dfs = []
        for m in mom_list:
            # close at this period / close m periods earlier (df_inc has no NaN)
            s = (
                df_inc.rolling(m + 1)
                .apply(profit, raw=True)
                .rename(f"00_price_rising_rate{m:04d}")
            )
            dfs.append(s)

            if m == 1:
                # next period's simple return, stored as "00_target"
                dfs.append((s.shift(-1) - 1).rename("00_target"))
        return pd.concat(dfs, axis=1)

    # 00 - Price Rising Rate (PR)
    def create_mom_features(df, period, mom_list, *args, **kwargs):
        df_inc = create_period_inc(df, period)
        df_inc = df_inc.dropna()  # drop periods without a close
        return create_mom(df_inc, mom_list)

    # create price of the last
    def create_period_price(df, period: str, params_4_resample=None):
        """Last close of each resampled period."""
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE
        return df["close"].resample(period, **params_4_resample).last()

    # 01 - fine-grained volatility (VOL)
    def std_of_prices(df, period, std_period, std_terms, params_4_resample=None, *args, **kwargs):
        """Rolling std of closes.

        Closes are resampled at ``std_period`` (empty periods dropped); for
        each n in ``std_terms`` the std over n + 1 points is taken, and the
        result is resampled to ``period``.
        """
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        features = []
        df_price = create_period_price(df, std_period)
        df_price = df_price.dropna()

        for n in std_terms:
            s = df_price.rolling(n + 1).std()
            s = s.rename(f"01_vol_{n}")
            features.append(s)

        df_features = pd.concat(features, axis=1)
        return df_features.resample(period, **params_4_resample).last()

    # 02 - Trade Volume (TV)
    def trade_vol(df, period: str, tv_nums: list, params_4_resample=None, *args, **kwargs):
        """Rolling sums of volume over each window in ``tv_nums``."""
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        df = df.resample(period, **params_4_resample).last()
        df = df.dropna(subset=["close"], axis=0, how="any")

        features = []
        for num in tv_nums:
            s = df["volume"].rolling(num).sum()
            s = s.rename(f"02_trade_vol_{num}")
            features.append(s)

        return pd.concat(features, axis=1)

    # NOTE(review): features 03-06 are not called below; they need
    # fundamental columns ("Shares Outstanding", "P/E", ...) that the
    # visible price data does not contain.

    # 03 - Market_Cap
    def market_cap(df, period: str, params_4_resample=None, *args, **kwargs):
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        df = df.resample(period, **params_4_resample).last()
        # "close" matches the column name used by every other feature
        return (df["Shares Outstanding"] * df["close"]).rename("03_market_cap")

    # 04 - Price-earnings Ratio (PE)
    def price_earning_ratio(df, period: str, params_4_resample=None, *args, **kwargs):
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        df = df.resample(period, **params_4_resample).last()
        return df["P/E"].rename("04_PE")

    # 05 - Book-to-market Ratio (BM)
    def book2market_ratio(df, period: str, params_4_resample=None, *args, **kwargs):
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        df = df.resample(period, **params_4_resample).last()
        return df["Price to Book Value"].rename("05_book2market_ratio")

    # 06 - Dividend (Div)
    def div(df, period: str, params_4_resample=None, *args, **kwargs):
        if params_4_resample is None:
            params_4_resample = PARAMS_4_RESAMPLE

        df = df.resample(period, **params_4_resample).last()
        return df["Dividend Yield"].rename("06_div")

    def create_features(df, feature_config: dict):
        """Join all enabled features on the resampled date index."""
        df = df.set_index("date")
        df_mom = create_mom_features(df, **feature_config)
        df_price_std = std_of_prices(df, **feature_config)
        df_tv = trade_vol(df, **feature_config)

        df_all = pd.concat([df_mom, df_price_std, df_tv], axis=1)

        # keep only periods where the first momentum feature is defined
        df_all = df_all.dropna(subset=[df_all.columns[0]], axis=0, how="any")

        if df_all.empty:
            # too few rows for any momentum window; the NaT index max below
            # would otherwise fall through to the error branch
            return df_all

        # remove an extra trailing row caused by right-labelled resampling
        origin_latest = df.index.max()
        resample_latest = df_all.index.max()

        if resample_latest > origin_latest:
            return df_all.iloc[:-1]
        if resample_latest == origin_latest:
            return df_all
        # it should never happen, just in case
        raise ValueError(
            f"resampled index ends at {resample_latest}, "
            f"before the raw data end {origin_latest}"
        )

    df = pd.read_pickle(pkl_path)

    if test:
        df = df.head(test)

    try:
        df_features = create_features(df, config)
    except Exception as exc:
        # print the full traceback, then re-raise tagged with the ticker name
        print(traceback.format_exc())
        raise ValueError(name) from exc

    if test:
        return df_features

    feature_dir = config["feature_dir"]
    df_features.to_pickle(f"{feature_dir}/{name}.pkl")
    return None


In [127]:
def mutiple_processes_on_features():
    """Run ``main`` for every ticker in ``path_pickle_dict`` in parallel.

    Each worker writes one ticker's feature pickle; the per-task results are
    not used. Prints the elapsed wall time in minutes.
    """
    start = time.time()

    star_args = [(ticker, pkl, config) for ticker, pkl in path_pickle_dict.items()]
    parallal_task(main, star_args)

    elapsed_minutes = (time.time() - start) / 60
    print(elapsed_minutes)

In [128]:
# True -> build features for one ticker in-process (first 2000 rows, nothing
# written) for debugging; False -> run the full parallel job.
DEBUG_SINGLE_TICKER = False

if DEBUG_SINGLE_TICKER:
    df_test = main(
        "AAPL",
        # same path `preprocess` writes: {data_output}/raw/{version}/AAPL.pkl
        data_path.format(data_output, preload_version, "AAPL"),
        config,
        test=2000,
    )
    display(df_test)
else:
    mutiple_processes_on_features()

0.12941030263900757
