In [None]:
from fractional_differentiation import find_stat_series
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller
import itertools
import multiprocess as mp

# Load Data


Loading fundamentals for divyield


In [None]:
funds = pd.read_csv("../../data/fundamentals_clean.csv", parse_dates=["public_date"])

In [None]:
funds.head()

In [None]:
def process_divyield(x):
    """Convert a raw dividend-yield string into a fraction.

    Parameters
    ----------
    x : str
        Either the literal ``"0"`` or a percentage such as ``"2.5%"``.

    Returns
    -------
    int or float
        ``0`` for ``"0"``; otherwise the number before the ``%`` sign
        divided by 100.

    Raises
    ------
    ValueError
        If ``x`` is neither ``"0"`` nor a string ending in ``"%"``. This
        includes the empty string and non-string values such as NaN, which
        would otherwise fail with an unrelated IndexError / TypeError.
    """
    if isinstance(x, str):
        if x == "0":
            return 0
        if x.endswith("%"):
            return float(x[:-1]) / 100

    raise ValueError(f"unexpected value {x}")


funds["divyield"] = funds["divyield"].apply(process_divyield)

In [None]:
divyield = funds.set_index(["permno", "public_date"])["divyield"]

divyield.head()

Loading Prices


In [None]:
prices = pd.read_csv("../../data/merged_fin.csv", parse_dates=["date"])

In [None]:
prices.shape[0]

In [None]:
# Discard rows without a date, then order every security's rows
# chronologically (permno first, date second).
prices = prices.dropna(subset="date").sort_values(by=["permno", "date"])

prices.tail()

In [None]:
# Replace the market capitalisation with its natural logarithm.
# NOTE: the column is overwritten with its own transform, so re-running this
# cell applies the logarithm a second time.
prices["mktcap"] = prices["mktcap"].apply(np.log)

In [None]:
# Align the dividend yield with every (permno, date) row of `prices`, then
# carry the last known value forward. The forward fill is done per permno:
# a plain `.ffill()` on the reindexed series would also fill the leading gaps
# of each security with the last value of the previous one.
# NOTE(review): the reindex only keeps values whose `public_date` matches a
# price `date` exactly — confirm the two date columns share a calendar.
divyield = (
    divyield.reindex(prices[["permno", "date"]].set_index(["permno", "date"]).index)
    .groupby(level=0)
    .ffill()
)

divyield.unique()

In [None]:
# Attach the aligned series positionally: `divyield` was reindexed to the
# (permno, date) pairs of `prices` in the previous cell, so `.values` is
# assigned row for row without any index alignment.
prices["divyield"] = divyield.values

prices["divyield"]

In [None]:
# NOTE(review): `redundant` is not referenced anywhere else in the visible
# notebook — presumably these columns were meant to be dropped or excluded
# from `features`; confirm.
redundant = ["pe_op_basic", "pe_exi", "prc", "retx"]
prices.columns

# Construct Stationary Dataset


In [None]:
exclude_features = ["naics_processed", "permno", "date", "vol"]
features = prices.columns.drop(exclude_features)

features

In [None]:
permnos = prices["permno"].unique()
# Work on a copy so that `prices` keeps the untransformed values.
prices_stat = prices.copy()

# Grid handed to find_stat_series: 39 evenly spaced values 0.05, 0.10, ..., 1.95
# (presumably the candidate differencing orders — see fractional_differentiation).
diffs = np.linspace(0.05, 1.95, 39)

len(permnos)

In [None]:
from ipywidgets import IntProgress
from IPython.display import display

# Progress bar with one tick per (permno, feature) pair.
progress = IntProgress(min=0, max=len(features) * len(permnos))
display(progress)

# For every permno and feature: if the series fails the ADF test, replace it
# in `prices_stat` with the output of find_stat_series. Series that are
# empty, (near-)constant, already stationary, or that raise are left as-is.
for permno in permnos:
    # The row selection depends only on the permno, so build it once per
    # permno instead of once per (permno, feature) pair.
    mask = prices["permno"] == permno
    permno_rows = prices.loc[mask]
    original_index = permno_rows.index

    for feature in features:
        progress.value += 1

        # single-column frame for this feature, missing observations removed
        data = permno_rows[[feature]].dropna()

        # handle features which are empty
        if data.empty:
            print(f"there is no data for {permno} - {feature}")
            continue

        # Skip (near-)constant series. The range is taken from the column
        # itself so the check compares plain scalars rather than relying on
        # how numpy reduces a DataFrame.
        column = data[feature]
        if column.max() - column.min() < 1e-6:
            print(f"there is no variation in {permno} - {feature}")
            continue

        print(permno, feature)
        print("-----")
        try:
            # ADF test with constant and trend; a p-value above 1% is
            # treated as non-stationary.
            if adfuller(data, regression="ct")[1] > 0.01:
                stat_series = find_stat_series(data, diffs=diffs)
                # restore the rows dropped by dropna() so the values line
                # up with the rows selected by `mask`
                stat_series = stat_series.reindex(original_index)
                prices_stat.loc[mask, feature] = stat_series.values

        except Exception as e:
            # best effort: report the failure and keep the original values
            print(f"error in {permno} - {feature}")
            print(e)
            continue

In [None]:
prices_stat.info()

In [None]:
prices_stat.to_csv("../../data/DATA_STATIONARY.zip", index=False)

In [None]:
prices_stat.tail()

In [None]:
prices_stat.isna().sum()

In [None]:
data = pd.read_csv('../../data/DATA_STATIONARY.zip')
data.head()

# Find Non-Stationary Series


In [None]:
# 15401 aftret_invcapx


mask = prices["permno"] == 15401

prices.loc[mask].plot(x="date", y="aftret_invcapx")


# prices.loc[mask, "ticker"].unique()

In [None]:
# Retry the search on the rows selected by `mask` (permno 15401, set in the
# previous cell) with a grid that reaches 2.95; any error is printed instead
# of raised.
# NOTE(review): 39 points between 0.05 and 2.95 give a step of ~0.076, not
# the 0.05 step of the main `diffs` grid — confirm the count is intended.
try:
    find_stat_series(
        prices.loc[mask, ["aftret_invcapx"]], diffs=np.linspace(0.05, 2.95, 39)
    ).plot()
except Exception as e:
    print(e)

# Parallelizing


In [None]:
pd.Series().empty

In [None]:
def process_data(permnos: np.ndarray) -> dict[int, dict[str, bool]]:
    """Run the ADF stationarity check for every feature of the given permnos.

    Reads the module-level ``prices`` frame and ``features`` collection.

    Parameters
    ----------
    permnos : np.ndarray
        Values to look up in the ``permno`` column of ``prices``.

    Returns
    -------
    dict[int, dict[str, bool]]
        ``{permno: {feature: is_stationary}}``. A feature is ``False`` when
        the ADF p-value (``regression="ct"``) exceeds 0.01 and ``True``
        otherwise; a feature with no observations is reported as ``True``.
    """
    print(f"processing {permnos}", flush=True)
    is_stationary: dict[int, dict[str, bool]] = {}

    for permno in permnos:
        # Create the per-permno dict once. Resetting it inside the feature
        # loop would discard every feature except the last one.
        per_feature = is_stationary.setdefault(permno, {})

        # select the rows of this permno once for all features
        permno_rows = prices[prices["permno"] == permno]

        for feature in features:
            data = permno_rows[feature].dropna()

            # handle features which are empty
            if data.empty:
                print(f"there is no data for {permno} - {feature}")
                per_feature[feature] = True
                continue

            # stationary unless the p-value is above the 1% threshold
            per_feature[feature] = not adfuller(data, regression="ct")[1] > 0.01

    return is_stationary


def process_permno(permno: int):
    """Run the ADF stationarity check on every feature of a single permno.

    Reads the module-level ``prices`` frame and ``features`` collection.

    Parameters
    ----------
    permno : int
        Value to look up in the ``permno`` column of ``prices``.

    Returns
    -------
    dict
        ``{feature: is_stationary}``. A feature is ``False`` when the ADF
        p-value (``regression="ct"``) exceeds 0.01 and ``True`` otherwise;
        a feature with no observations is reported as ``True``.
    """
    print(f"processing {permno}", flush=True)

    # the row selection does not depend on the feature: compute it once
    permno_rows = prices[prices["permno"] == permno]

    res = {}
    for feature in features:
        data = permno_rows[feature].dropna()

        # handle features which are empty
        if data.empty:
            print(f"there is no data for {permno} - {feature}")
            res[feature] = True
            # nothing to test: skip adfuller, which would otherwise be
            # called on an empty series
            continue

        # stationary unless the p-value is above the 1% threshold
        res[feature] = not adfuller(data, regression="ct")[1] > 0.01

    return res


def store_results(res: list[dict[int, dict[str, bool]]]):
    """Merge a list of partial result dicts into the global ``results``.

    Items are applied in order, so a later item overwrites an earlier one
    when both contain the same key.
    """
    for partial in res:
        results.update(partial)


if __name__ == "__main__":
    results: dict[int, dict[str, bool]] = {}

    # Leave one core free, but never request an empty pool on a
    # single-core machine.
    ncpus = max(1, mp.cpu_count() - 1)

    # NOTE(review): `non_stat` is not defined in the visible part of the
    # notebook — it must exist in the kernel before this cell runs.
    permno_chunks = np.array_split(np.array(non_stat.index), ncpus)

    with mp.Pool(ncpus) as p:
        # One task per chunk. No `chunksize` is passed: the iterable only
        # holds `ncpus` items, so batching them in groups of 100 would hand
        # every chunk to a single worker.
        res = p.map_async(process_data, permno_chunks)

        # Wait for all workers and merge their dicts into `results`;
        # without this step `results` stays empty.
        store_results(res.get())

    print(results)

In [None]:
from time import sleep
import os


def f(r):
    """Toy worker used to try out the process pool.

    Builds the integers from ``r[0]`` (inclusive) to ``r[1]`` (exclusive),
    prints the process id, sleeps for ten seconds, prints again, and returns
    the numbers together with their doubles under the keys ``"nums"`` and
    ``"dubs"``.
    """
    # imported locally, presumably so the function is self-contained when it
    # runs in a worker process
    import os
    from time import sleep

    import numpy as np

    lower, upper = r[0], r[1]
    nums = np.arange(lower, upper)

    print(f"I am {os.getpid()}")
    sleep(10)
    print(f"I am {os.getpid()} and I am finished")

    return dict(nums=nums, dubs=nums * 2)


# if __name__ == "__main__":
#     ctx = mp.get_context("spawn")
#     with ctx.Pool(4) as p:
#         subsets = [[0, 3], [3, 6], [6, 7]]
#         res = p.map(f, subsets)
#         print(res)

#     print("Done!")

if __name__ == "__main__":
    # [start, stop) pairs, one per task. Without this definition the cell
    # fails with a NameError, because `subsets` is otherwise only assigned
    # inside commented-out code.
    subsets = [[0, 3], [3, 6], [6, 7]]

    with mp.Pool(4) as p:
        res = p.map(f, subsets)
        print(res)

In [None]:
final: dict[int, dict[str, bool]] = {}


def process_data(permnos: np.ndarray):
    """Run the ADF stationarity check for every feature of the given permnos.

    Reads the module-level ``prices`` frame and ``features`` collection.

    Parameters
    ----------
    permnos : np.ndarray
        Values to look up in the ``permno`` column of ``prices``. May be
        empty, in which case an empty dict is returned.

    Returns
    -------
    dict[int, dict[str, bool]]
        ``{permno: {feature: is_stationary}}``. A feature is ``False`` when
        the ADF p-value (``regression="ct"``) exceeds 0.01 and ``True``
        otherwise; a feature with no observations is reported as ``True``.
    """
    import os

    # np.array_split produces empty chunks when there are fewer permnos than
    # workers; return early instead of failing on permnos[0] below.
    if len(permnos) == 0:
        return {}

    print(f"initiating process {os.getpid()} - first permno {permnos[0]}\n")

    # stores permno and stationarity for each feature
    is_stationary: dict[int, dict[str, bool]] = {permno: {} for permno in permnos}

    # iterate through each permno and feature
    for permno in permnos:
        permno_data = prices[prices["permno"] == permno]
        flags = is_stationary[permno]

        for feature in features:
            feature_data = permno_data[feature].dropna()

            if feature_data.empty:
                print(f"there is no data for {permno} - {feature}")
                flags[feature] = True
                continue

            # stationary unless the p-value is above the 1% threshold
            flags[feature] = not adfuller(feature_data, regression="ct")[1] > 0.01

    return is_stationary


if __name__ == "__main__":
    # Leave one core free, but never request an empty pool on a
    # single-core machine.
    ncpus = max(1, mp.cpu_count() - 1)
    print(f"running with {ncpus} cpus")

    with mp.Pool(ncpus) as p:
        # NOTE(review): `non_stat` is not defined in the visible part of the
        # notebook — it must exist in the kernel before this cell runs.
        permnos_split = np.array_split(np.array(non_stat.index), ncpus)

        res: list[dict[int, dict[str, bool]]] = p.map(process_data, permnos_split)

    if not isinstance(res, list):
        raise Exception("res is not a list")

    # Merge each worker's dict into `final`. Updating with `res` itself
    # would pass the whole list of dicts to dict.update instead of one
    # mapping at a time.
    for item in res:
        final.update(item)

# Fixing Stationarity
