In [1]:
import dask.dataframe as dd
import pandas as pd
import numpy as np

# Merge with issuers to get context info

In [2]:
# Issuer metadata, joined onto the messages by `issuerId` below (presumably one row per issuer — asserted at the merge).
issuers_ddf = dd.read_csv("data/issuer.csv")

In [3]:
# Lazy dask frame: the repr shows only the column dtypes and partition count, not the data.
issuers_ddf

Unnamed: 0_level_0,issuerId,osl_id,symbol,issuerSign,name,isActive
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,int64,object,object,object,object,bool
,...,...,...,...,...,...


In [5]:
# `dd.read_parquet` has no `npartitions` argument, so repartition explicitly to get 4 partitions.
msg_ddf = dd.read_parquet("data/tokenized_msg.parquet").repartition(npartitions=4)

In [None]:
# Same (rows, columns) tuple as `.compute().shape`, but len() only counts rows per partition
# instead of materialising the whole message table in memory.
(len(msg_ddf), len(msg_ddf.columns))

In [6]:
# dask's merge has no `validate=` option, so check the many-to-one assumption explicitly.
# issuer.csv is a single small partition, so computing its key column is cheap.
assert issuers_ddf["issuerId"].compute().is_unique, "issuer.csv has duplicate issuerId rows"
# Inner join (the default): messages whose issuerId is absent from issuer.csv are dropped.
msg_issuer_df = msg_ddf.merge(issuers_ddf, on="issuerId").compute()

In [7]:
# Parsed timestamp alongside the raw `publishedTime` column; used to pick the trading day to compare against.
msg_issuer_df = msg_issuer_df.assign(publishedTime_dt=pd.to_datetime(msg_issuer_df["publishedTime"]))

In [8]:
# Persist merged messages. NOTE(review): written to the working directory, unlike the inputs under data/ — confirm intended.
msg_issuer_df.to_parquet("msgs_w_issuer.parquet")

### Use the publish time to decide which trading day (and history) to compare against
- If published on day 0 AFTER 16:30 (market close) -> compare to the next TRADING day
- If published on day 0 BEFORE 16:30 -> compare to the same day
- Exclude holidays for now
- Weekends -> the next trading day is taken automatically from the price history

#### Data analysis
- Date 2020-05-10 (May) is NaN for multiple companies
- Some companies have multiple missing days
- -> Remove those, and remove messages that fall within that timespan

##### Scalable approach?
1. Add issuerId info to each stock price row
2. For each stock row, add all features, backwards and forwards
3. Merge all company histories
4. For each message, based on its published time and issuerId, look up the relevant trading day

In [116]:
from pathlib import Path

In [117]:
# Directory holding one CSV of daily prices per symbol (file stem = symbol).
stock_history_path = Path("data") / "stock_history"

In [None]:
# One-off exploration of a single history file; `load_clean` generalises this for all files.
AKRBP = pd.read_csv("data/stock_history/AKRBP.csv", header=1, parse_dates=["Date"])
AKRBP = AKRBP.set_index("Date")
# Every column except the last has a 3-character prefix (presumably "kr ") sliced off.
# NOTE(review): assumes "Volume" is the last column — confirm against the CSV layout.
AKRBP.iloc[:, :-1] = AKRBP.iloc[:, :-1].apply(lambda series: series.str.slice(start=3))
AKRBP["Volume"] = AKRBP["Volume"].str.replace(",", "")  # drop thousands separators
AKRBP = AKRBP.astype(float)

In [188]:
# Price columns as named in the raw CSVs, and their lower-cased names after `load_clean`.
DAILY_COLS = ["Close", "Open", "High", "Low"]
daily_cols = list(map(str.lower, DAILY_COLS))

In [220]:
def load_clean(path):
    df = pd.read_csv(path, header=1, parse_dates=["Date"], thousands=",", index_col="Date",
                     converters={k:lambda col: float(col[3:].replace(",", "")) if "kr" in col else np.nan for k in DAILY_COLS},
                     dtype={"Volume": float})#.set_index("Date")
    df.columns = df.columns.str.lower()
    df["symbol"] = path.stem
    return df

In [216]:
def extract_intraday_feats(df, price_cols=None):
    """Add intra-day and overnight-gap features to a single-symbol price frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Lower-cased ``open``/``close``/``high``/``low``/``volume`` columns; rows are
        treated as consecutive trading days (``shift`` uses row order).
    price_cols : list of str, optional
        Columns averaged into the daily price for dollar volume. Defaults to the
        notebook-level ``daily_cols``.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with ``dol_volume``, ``intra_day_high_low_pct``,
        ``intra_day_open_close_pct`` and ``gap_pct`` (all percentages except dol_volume).
    """
    if price_cols is None:
        price_cols = daily_cols
    avg_price = df[price_cols].mean(axis="columns")
    prev_close = df["close"].shift(1)
    other_vals = {
        # Average of the daily prices times traded volume.
        "dol_volume": avg_price * df["volume"],
        "intra_day_high_low_pct": (df["high"] - df["low"]).divide(df["low"]) * 100,
        "intra_day_open_close_pct": (df["close"] - df["open"]).divide(df["open"]) * 100,
        # Overnight gap: today's open relative to the previous row's close. (The old formula,
        # close vs. previous open, measured a two-day move rather than a gap.)
        "gap_pct": (df["open"] - prev_close).divide(prev_close) * 100,
    }
    return df.assign(**other_vals)


In [193]:
def extract_rolling_feats(df, rolling_vals=(3, 7, 15, 30), price_cols=None):
    """Add exponential moving features of the average daily price and of dollar volume.

    The average daily price is the row-wise mean of ``price_cols`` (a compromise
    between open/close/high/low). Dollar volume is that average price times
    ``volume``, the same definition as the intraday ``dol_volume`` feature.

    Parameters
    ----------
    df : pandas.DataFrame
        Single-symbol frame with lower-cased price columns and ``volume``; ``ewm``
        treats row order as time order.
    rolling_vals : iterable of int
        EWM spans; one set of four columns is produced per span.
    price_cols : list of str, optional
        Columns averaged into the daily price. Defaults to the notebook-level ``daily_cols``.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with ``ewm_{k}``, ``ewm_std_{k}``, ``dol_vol_ewm_{k}`` and
        ``dol_vol_ewm_std_{k}`` columns added, in that group order.
    """
    rolling_vals = list(rolling_vals)  # iterated four times below
    if price_cols is None:
        price_cols = daily_cols
    action_mean = df[price_cols].mean(axis="columns")
    # The dol_vol_* columns previously smoothed raw share volume despite their name.
    dol_volume = action_mean * df["volume"]

    # Exponential moving averages of prices
    ewms = {f"ewm_{k}": action_mean.ewm(span=k).mean() for k in rolling_vals}
    ewms_std = {f"ewm_std_{k}": action_mean.ewm(span=k).std() for k in rolling_vals}

    # Exponential moving averages of dollar volume
    dol_vol_ewm = {f"dol_vol_ewm_{k}": dol_volume.ewm(span=k).mean() for k in rolling_vals}
    dol_vol_ewm_std = {f"dol_vol_ewm_std_{k}": dol_volume.ewm(span=k).std() for k in rolling_vals}
    return df.assign(**{**ewms, **ewms_std, **dol_vol_ewm, **dol_vol_ewm_std})
    

In [194]:
def extract_future_features(df, periods=(3, 7, 15, 30)):
    """Add k-row percentage changes of the close price.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``close`` column.
    periods : iterable of int
        Row offsets ``k``; defaults to the same spans as ``extract_rolling_feats``.
        Previously this read an undefined global ``rolling_vals`` (the name only
        exists locally inside ``extract_rolling_feats``), which raised NameError
        on a fresh kernel.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with ``d{k}_avg_pct`` = ``close.pct_change(k) * 100``.

    NOTE(review): the column names say "avg" but ``close`` is used, and with rows in
    ascending date order ``pct_change(k)`` looks backward, not into the future —
    confirm which price and direction are intended.
    """
    pct_changes = {f"d{k}_avg_pct": df["close"].pct_change(periods=k) * 100 for k in periods}
    return df.assign(**pct_changes)


In [178]:
%%time
# Time each pipeline stage on a single file (AKRBP) before running it over every symbol.
test_df = load_clean(stock_history_path / "AKRBP.csv")

CPU times: user 209 ms, sys: 8.99 ms, total: 218 ms
Wall time: 254 ms


In [195]:
%%time
# Stage 1: intraday features on the single test file.
intra_df = extract_intraday_feats(test_df)

CPU times: user 9.92 ms, sys: 2.15 ms, total: 12.1 ms
Wall time: 18.4 ms


In [196]:
%%time
# Stage 2: exponential moving features on top of the intraday ones.
rolling_df = extract_rolling_feats(intra_df)

CPU times: user 13.2 ms, sys: 7.23 ms, total: 20.5 ms
Wall time: 23 ms


In [197]:
%%time
# Stage 3: percentage-change features of the close price.
future_df = extract_future_features(rolling_df)

CPU times: user 8.23 ms, sys: 947 µs, total: 9.17 ms
Wall time: 8.82 ms


In [221]:
def process_df(df):
    """Run the per-symbol feature pipeline and index the result by symbol.

    Applies the intraday, rolling and "future" feature steps in that order, then
    moves the ``symbol`` column into the first index level, ahead of the
    frame's existing index.
    """
    featured = (
        df.pipe(extract_intraday_feats)
        .pipe(extract_rolling_feats)
        .pipe(extract_future_features)
    )
    return featured.set_index(["symbol", featured.index])

In [233]:
# Symbols skipped when loading histories; the loading loop appends files that fail to parse with header=1.
BLACKLIST = ["KRAB", "CSS", "ALT", "ECIT"]

In [237]:
%%time
dfs = []
# Sorted so the concatenation order (and the saved parquet) is reproducible across machines/runs.
for hist_path in sorted(stock_history_path.glob("*.csv")):
    if hist_path.stem in BLACKLIST:
        continue
    try:
        df = load_clean(hist_path)
    except pd.errors.ParserError as e:
        # Files that don't match the header=1 layout are blacklisted for later runs.
        if "header=1" in str(e):
            print("New blacklist:", hist_path.stem)
            BLACKLIST.append(hist_path.stem)
        else:
            # Name the failing file; the bare error message alone doesn't identify it.
            print(f"{hist_path.stem}: {e}")
        continue
    dfs.append(process_df(df))
    

New blacklist: SMCRT
New blacklist: MVW
New blacklist: HMONY
New blacklist: NATTO
New blacklist: LYTIX
New blacklist: SMOP
New blacklist: NORDH
New blacklist: KOMPL
New blacklist: MAS
New blacklist: NTI
New blacklist: ELO
New blacklist: AFISH
CPU times: user 51.6 s, sys: 293 ms, total: 51.9 s
Wall time: 52.2 s


In [240]:
# Stack all per-symbol feature frames; the index is (symbol, original date index) as built by process_df.
complete_history = pd.concat(dfs)

In [251]:
# Per-symbol number of non-missing close prices, summarised across symbols.
# `DataFrame.count(level=...)` was deprecated in pandas 1.3 and removed in 2.0; groupby is the equivalent.
complete_history.groupby(level=0)["close"].count().describe()

count     315.000000
mean     1933.326984
std      1791.382119
min         0.000000
25%       157.500000
50%      1566.000000
75%      3593.000000
max      5018.000000
Name: close, dtype: float64

In [241]:
# (rows across all symbols, feature columns)
complete_history.shape

(615065, 29)

In [243]:
%%time
# Persist the combined feature table, including its (symbol, date) MultiIndex.
complete_history.to_parquet("data/processed_history.parquet")

CPU times: user 933 ms, sys: 500 ms, total: 1.43 s
Wall time: 1.36 s
