In [4]:
import atoti as tt
import numpy as np
import pandas as pd
from atoti.config import create_config
from pandas_profiling import ProfileReport
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.stattools import acf, grangercausalitytests, pacf
from util import utils

ModuleNotFoundError: No module named 'atoti'

Collecting atoti
  Using cached atoti-0.5.2-0_6b58d72-py3-none-any.whl (141.4 MB)
Collecting typeguard
  Downloading typeguard-2.11.1-py3-none-any.whl (16 kB)
Collecting jdk4py==11.0.9.0
  Downloading jdk4py-11.0.9.0-py3-none-win_amd64.whl (34.1 MB)
Collecting pyarrow>=2.0.0
  Downloading pyarrow-3.0.0-cp38-cp38-win_amd64.whl (12.7 MB)
Installing collected packages: typeguard, jdk4py, pyarrow, atoti
Successfully installed atoti-0.5.2 jdk4py-11.0.9.0 pyarrow-3.0.0 typeguard-2.11.1


In [None]:
# Cryptocurrency symbols analysed throughout the notebook. The same order is
# used further down as the preferred display order of the coin_symbol level.
coins = "BTC ETH USDT XRP BCH ADA BSV LTC LINK BNB EOS TRON".split()

In [None]:
# Create the atoti session, backed by a local metadata database file.
config = create_config(metadata_db="./metadata.db")
session = tt.create_session(config=config)

In [None]:
# Coin prices, loaded into an atoti store keyed by (coin_symbol, date).
price_store = session.read_csv(
    "s3://data.atoti.io/notebooks/twitter/crypto_prices.csv",
    # local alternative to the S3 file: "prices.csv"
    keys=["coin_symbol", "date"],
    store_name="currency price",
)
price_store.head()

In [None]:
# Tweet metrics per coin_symbol and date. They are kept in pandas first so a
# Train/Test flag can be added before loading them into atoti.
tweets = pd.read_csv("https://data.atoti.io/notebooks/twitter/tweets_metrics.csv")
tweets.head()

In [None]:
# Hold out the 7 most recent dates of every coin as the "Test" subset (the same
# 7-step horizon the forecasts use later); everything else is "Train".
# Rows are ranked by the parsed date first, so the split no longer depends on
# the row order of the CSV. The stable sort keeps ties in file order.
test_rows = (
    tweets.assign(_parsed_date=pd.to_datetime(tweets["date"]))
    .sort_values("_parsed_date", kind="mergesort")
    .groupby("coin_symbol")
    .tail(7)
    .index
)
tweets["Subset"] = "Train"
tweets.loc[test_rows, "Subset"] = "Test"
tweets.tail(10)

In [None]:

# Every tweet metric is loaded as a float; only the Train/Test flag is a string.
float_metrics = [
    "Tweet volume",
    "Retweet total",
    "Retweet average",
    "Followers total",
    "Followers average",
    "Favorite total",
    "Favorite average",
    "Polarity total",
    "Polarity average",
    "Negative",
    "Neutral",
    "Positive",
    "Bullish ratio",
]
tweets_store = session.read_pandas(
    tweets,
    keys=["coin_symbol", "date"],
    store_name="tweets",
    types={**{column: tt.type.FLOAT for column in float_metrics}, "Subset": tt.type.STRING},
)
tweets_store.head()

In [None]:
# Size of the tweets store after loading.
tweets_store.shape

In [None]:
# Coin reference data keyed by coin_symbol; its currency_name column feeds the
# "Coin Symbol" hierarchy defined below.
currency_store = session.read_csv(
    "s3://data.atoti.io/notebooks/twitter/currency_dict.csv",
    keys=["coin_symbol"],
    store_name="currency",
)
currency_store.head()

In [None]:
# Cube based on the tweets store. In "manual" mode the hierarchies and measures
# used below are all declared explicitly in later cells.
cube = session.create_cube(tweets_store, name="Cryptocurrency cube", mode="manual")

In [None]:
# Link tweets -> prices -> currency reference data (no explicit column mapping given).
tweets_store.join(price_store)
price_store.join(currency_store)

In [None]:
cube

In [None]:
cube.schema

In [None]:
# Short aliases used throughout the rest of the notebook.
h = cube.hierarchies
l = cube.levels
m = cube.measures

In [None]:
# Two-level coin hierarchy: symbol, then the currency name from the currency store.
h["Coin Symbol"] = [tweets_store["coin_symbol"], currency_store["currency_name"]]
h["Coin Symbol"].dimension = "Cryptocurrency"
# slicing hierarchy: members (coins) are not aggregated together
h["Coin Symbol"].slicing = True

h["Date"] = [tweets_store["date"]]
h["Date"].dimension = "Time-series"

# used for splitting training and test data
h["Subset"] = [tweets_store["Subset"]]

In [None]:
session.visualize("Multi-level hierarchies")

In [None]:
h

In [None]:
# Additive tweet metrics: aggregated with a sum across dates and coins.
for total_metric in (
    "Tweet volume",
    "Retweet total",
    "Followers total",
    "Favorite total",
    "Polarity total",
    "Negative",
    "Neutral",
    "Positive",
):
    m[total_metric] = tt.agg.sum(tweets_store[total_metric])

In [None]:
# Averages, prices and returns are aggregated with the mean instead of the sum.
for average_metric in (
    "Retweet average",
    "Followers average",
    "Favorite average",
    "Polarity average",
):
    m[average_metric] = tt.agg.mean(tweets_store[average_metric])

for price_metric in ("Price", "Returns"):
    m[price_metric] = tt.agg.mean(price_store[price_metric])

In [None]:
# Ratio of the aggregated Positive and Negative measures, recomputed at every
# aggregation level (rather than aggregating the per-row "Bullish ratio" column).
# NOTE(review): undefined when "Negative" is 0 — confirm how atoti renders that case.
m["Bullish ratio"] = m["Positive"] / m["Negative"]

In [None]:
session.visualize("Bullish ratio for BTC and ETH")

In [None]:
session.visualize("Tweet volumes for BTC and ETH")

In [None]:

metrics_folder = "metrics"
# Group every raw tweet/price measure under one folder in the UI.
for measure_name in (
    "Tweet volume",
    "Retweet total",
    "Followers total",
    "Favorite total",
    "Polarity total",
    "Negative",
    "Neutral",
    "Positive",
    "Retweet average",
    "Followers average",
    "Favorite average",
    "Polarity average",
    "Price",
    "Returns",
    "Bullish ratio",
):
    m[measure_name].folder = metrics_folder

In [None]:
# Display the coins listed in `coins` first, in that order.
l["coin_symbol"].comparator = tt.comparator.first_members(coins)

In [None]:
session.visualize("Default slicing member ordering updated")

In [None]:
# Train-subset metrics per coin and date, pulled back into pandas for the
# statistical analysis below.
train_measure_names = [
    "Price",
    "Returns",
    "Tweet volume",
    "Retweet total",
    "Retweet average",
    "Followers total",
    "Followers average",
    "Favorite total",
    "Favorite average",
    "Polarity total",
    "Polarity average",
    "Bullish ratio",
]
metrics_df = cube.query(
    *(m[measure] for measure in train_measure_names),
    levels=[l["coin_symbol"], l["date"]],
    condition=(l["Subset"] == "Train"),
)

metrics_df.shape

In [None]:
metrics_df.head()

In [None]:
# pandas-profiling report of the Train-subset metrics.
ProfileReport(metrics_df)

In [None]:

# Empty template for the per-coin, per-metric statistics table. The loop below
# fills only coin_symbol, metric_name, lag_acf, lag_pacf and std; the other
# columns are placeholders (presumably filled later via utils.var_forecast — confirm).
data_stats = pd.DataFrame(
    columns=[
        "coin_symbol",
        "metric_name",
        "norm_stat",
        "norm_p",
        "kurtosis",
        "skewness",
        "lag_acf",
        "lag_pacf",
        "std",
        "durbin_watson",
    ]
)

In [None]:
session.url + "/#/dashboard/810"

In [None]:
# Autocorrelation summary per (coin, metric) of the Train data.
# Records are collected in a list and turned into one DataFrame. DataFrame.append
# was quadratic here and was removed in pandas 2.0. Rebuilding from the
# template's columns also keeps re-runs of this cell from duplicating rows.
stats_records = []
for coin in coins:
    coin_df = metrics_df.loc[[coin]]
    for metric_name in coin_df.columns:
        metric = coin_df[metric_name]

        # ACF/PACF up to lag 50, serialised as ";"-joined strings so atoti can
        # parse them back into FLOAT_ARRAY columns (array_sep=";").
        lag_acf = ";".join(map(str, acf(metric, nlags=50, fft=True).tolist()))
        lag_pacf = ";".join(map(str, pacf(metric, nlags=50, method="ols").tolist()))

        stats_records.append(
            {
                "coin_symbol": coin,
                "metric_name": metric_name,
                "lag_acf": lag_acf,
                "lag_pacf": lag_pacf,
                "std": metric.std(),
            }
        )

# Same columns (and order) as the template; unfilled columns become NaN.
data_stats = pd.DataFrame(stats_records, columns=data_stats.columns)

data_stats.head()

In [None]:
# Load the statistics into atoti; the ";"-joined ACF/PACF strings are parsed
# back into float arrays via array_sep.
stats_store = session.read_pandas(
    data_stats,
    keys=["coin_symbol", "metric_name"],
    store_name="statistics",
    types={"lag_acf": tt.type.FLOAT_ARRAY, "lag_pacf": tt.type.FLOAT_ARRAY},
    array_sep=";",
)
stats_store.head()

In [None]:
# Attach the statistics to the cube's base store (no explicit column mapping given).
tweets_store.join(stats_store)

In [None]:
cube.schema

In [None]:
# Put the metric_name hierarchy (from the statistics store) in a "Metrics" dimension.
h["metric_name"].dimension = "Metrics"
h

In [None]:

# Expose statistics-store columns as measures.
# NOTE(review): tt.agg._single_value is a private atoti API; presumably it
# yields the value only when a single underlying value exists — confirm.
m["lag_acf"] = tt.agg._single_value(stats_store["lag_acf"])
m["lag_pacf"] = tt.agg._single_value(stats_store["lag_pacf"])
m["std"] = tt.agg._single_value(stats_store["std"])

# placeholder for later test data
m["norm_stat"] = tt.agg._single_value(stats_store["norm_stat"])
m["norm_p"] = tt.agg._single_value(stats_store["norm_p"])
m["kurtosis"] = tt.agg._single_value(stats_store["kurtosis"])
m["skewness"] = tt.agg._single_value(stats_store["skewness"])

In [None]:
# NOTE(review): tt.agg._stop is private; presumably it stops aggregation of
# this measure above the metric_name level — confirm.
m["durbin watson"] = tt.agg._stop(
    tt.agg._single_value(stats_store["durbin_watson"]), l["metric_name"]
)

In [None]:
stats_folder = "Statistics"
# Group the autocorrelation statistics under one UI folder.
for statistic in ("std", "lag_acf", "lag_pacf"):
    m[statistic].folder = stats_folder

In [None]:
session.visualize("Bit coin metrics statistics")

In [None]:
# Parameter hierarchy of lag indices 0..49, exposed via the "Lag Index" measure
# and used to index the ACF/PACF and Granger-test array measures.
# NOTE(review): acf/pacf were computed with nlags=50, i.e. 51 values (lags 0..50),
# so lag 50 is never shown — confirm this is intended.
cube.create_static_parameter_hierarchy(
    "Lags", list(range(0, 50)), index_measure="Lag Index",
)

In [None]:
h["Lags"].slicing = False

In [None]:

# Value of each autocorrelation array at the lag selected by "Lag Index",
# displayed with four decimals.
for lag_measure, array_measure in (("acf", "lag_acf"), ("pacf", "lag_pacf")):
    m[lag_measure] = m[array_measure][m["Lag Index"]]
    m[lag_measure].formatter = "DOUBLE[#,###.0000]"

In [None]:
# Number of distinct dates, totalled over the Date hierarchy (presumably so it
# does not vary with the date member — confirm).
# NOTE(review): this counts every date in the query context, including the Test
# subset unless filtered out, while the ACF/PACF were computed on Train only.
m["Sample size"] = tt.total(tt.agg.count_distinct(tweets_store["date"]), h["Date"])

In [None]:
# Approximate 95% band for the autocorrelation of white noise: +/- 1.96 / sqrt(N).
m["critical value"] = 1.96
m["Upper 95% confidence interval"] = m["critical value"] / tt.math.sqrt(m["Sample size"])
m["Lower 95% confidence interval"] = -m["critical value"] / tt.math.sqrt(m["Sample size"])

In [None]:
session.visualize("Auto-correlation for BTC price")

In [None]:
session.visualize("Auto-correlation for BTC Returns")

In [None]:
# Empty frame with the metrics' columns, indexed by (coin_symbol, date); the
# next cell fills it with the transformed per-coin series.
metric_cols = metrics_df.reset_index().columns.to_list()
transformed_df = pd.DataFrame(columns=metric_cols).set_index(["coin_symbol", "date"])

# NOTE(review): this template is never used as-is — the next cell reassigns
# full_diff_df = pd.DataFrame() for every coin.
metric_cols.append("order")
full_diff_df = pd.DataFrame(columns=metric_cols).set_index(
    ["coin_symbol", "date", "order"]
)

In [None]:
# For every coin, run utils.augmented_dickey_fuller_statistics on each metric.
# It returns the (possibly differenced) series and accumulates differenced rows
# in full_diff_df. Each differencing order becomes an atoti scenario
# "d<order>" on the price and tweets stores.
transformed_parts = []
for coin in coins:
    coin_df = metrics_df.loc[[coin]].dropna()

    # differenced rows for this coin only; rebuilt from scratch per coin
    full_diff_df = pd.DataFrame()

    for col in coin_df.columns:
        full_diff_df, coin_df[col] = utils.augmented_dickey_fuller_statistics(
            coin, col, coin_df[col], 0, full_diff_df
        )

    if len(full_diff_df) > 0:
        full_diff_df = full_diff_df.reset_index()
        for diff_order in full_diff_df["order"].unique():
            # rows of this order, without columns that are entirely NaN for it
            scenario = (
                full_diff_df.loc[full_diff_df["order"] == diff_order]
                .dropna(axis="columns", how="all")
                .copy()
            )
            scenario["Subset"] = "Train"

            # Price-store columns. The store's column is "Returns"; the
            # previous "Return" spelling dropped it from the price scenario
            # and left it in the tweets scenario.
            price_columns = ["coin_symbol", "date", "Price", "Returns"]
            price_store.scenarios[f"d{diff_order}"].load_pandas(
                scenario.loc[:, scenario.columns.isin(price_columns)]
            )

            tweets_store.scenarios[f"d{diff_order}"].load_pandas(
                scenario.drop(columns=["order", "Price", "Returns"], errors="ignore")
            )

    transformed_parts.append(coin_df.dropna())

# One concat instead of DataFrame.append in a loop (removed in pandas 2.0).
# Starting from the empty template keeps re-runs from duplicating rows.
transformed_df = pd.concat([transformed_df.iloc[:0], *transformed_parts])

In [None]:
# Visual checks of the stationarity transformation.
session.visualize("Non stationary Price")

In [None]:
session.visualize("Stationary Returns")

In [None]:
session.url + "/#/dashboard/010"

In [None]:
# Accumulator for Granger-causality results. grangers_causality_matrix reassigns
# it (via `global`) with the output of utils.transform_gc_date.
grangercausalitytests_df = pd.DataFrame(columns=["coin_symbol", "x", "y",])


def grangers_causality_matrix(coin, train_data, maxlag, verbose=False):
    """Granger-test every feature of ``train_data`` against ``Returns`` for one coin.

    All columns are standardised with ``StandardScaler`` first. For each column
    other than ``Price`` and ``Returns``, ``grangercausalitytests`` is run on the
    two-column frame ``[Returns, feature]`` for lags up to ``maxlag``. statsmodels
    tests whether the second column Granger-causes the first. The raw result
    goes to ``utils.transform_gc_date``, whose return value replaces the
    module-level ``grangercausalitytests_df``.

    Parameters
    ----------
    coin : str
        Coin symbol, forwarded to ``utils.transform_gc_date``.
    train_data : pandas.DataFrame
        Numeric training data containing a ``Returns`` column (presumably
        NaN-free, since statsmodels does not accept missing values — confirm).
    maxlag : int
        Largest lag tested.
    verbose : bool
        Print progress and the raw statsmodels result for each feature.

    Returns
    -------
    None
        Results are accumulated in the global ``grangercausalitytests_df``.
    """
    global grangercausalitytests_df

    target = "Returns"
    scaler = StandardScaler()
    scaled = pd.DataFrame(
        scaler.fit_transform(train_data.values),
        columns=train_data.columns,
        index=train_data.index,
    )

    candidate_features = [
        name for name in train_data.columns if name not in ["Price", "Returns"]
    ]
    for feature in candidate_features:
        if verbose:
            print(f"{coin} ============= Returns against {feature}")

        result = grangercausalitytests(
            scaled[[target, feature]], maxlag=maxlag, verbose=False
        )

        if verbose:
            print("********************************************")
            print(result)

        # transform test_result into stats for x causes y
        grangercausalitytests_df = utils.transform_gc_date(
            coin, result, feature, target, maxlag, grangercausalitytests_df, verbose
        )

In [None]:
# Granger-causality tests for every coin on its transformed training rows, up
# to lag 50.
# NOTE(review): grangercausalitytests_df is not reset here, so re-running this
# cell presumably accumulates duplicate rows — confirm transform_gc_date's behavior.
for coin in coins:
    coin_train_data = transformed_df.loc[
        (transformed_df.index.get_level_values("coin_symbol") == coin)
    ]

    grangers_causality_matrix(coin, coin_train_data.copy(), 50, verbose=False)

In [None]:
# Sample of the BTC results for the params_ftest test.
grangercausalitytests_df.loc[
    (grangercausalitytests_df["coin_symbol"] == "BTC")
    & (grangercausalitytests_df["Test name"] == "params_ftest")
].head()

In [None]:

# Granger-causality results into atoti. Each statistic is a ";"-separated
# string parsed into a float array (presumably one value per lag, given how
# they are indexed by "Lag Index" below).
granger_causality_store = session.read_pandas(
    grangercausalitytests_df,
    keys=["coin_symbol", "Test name", "x", "y"],
    store_name="Granger Causality",
    types={
        "F": tt.type.FLOAT_ARRAY,
        "chi2": tt.type.FLOAT_ARRAY,
        "p-value": tt.type.FLOAT_ARRAY,
        "df": tt.type.FLOAT_ARRAY,
        "df_denom": tt.type.FLOAT_ARRAY,
        "df_num": tt.type.FLOAT_ARRAY,
    },
    array_sep=";",
)

In [None]:
granger_causality_store.head()

In [None]:
tweets_store.join(granger_causality_store)

In [None]:
cube.schema

In [None]:
# Test name (e.g. params_ftest) as a slicing hierarchy, so results of different
# tests are not aggregated together.
h["Test name"] = [granger_causality_store["Test name"]]
h["Test name"].slicing = True

In [None]:
# One array-valued measure per Granger-test statistic column.
gc_array_columns = (
    ("v_F", "F"),
    ("v_chi2", "chi2"),
    ("v_p_value", "p-value"),
    ("v_df", "df"),
    ("v_df_denom", "df_denom"),
    ("v_df_num", "df_num"),
)
for array_measure, store_column in gc_array_columns:
    m[array_measure] = tt.agg._single_value(granger_causality_store[store_column])

In [None]:
# level of significance
m["alpha"] = 0.05

# Value of each Granger-test array at the lag selected by "Lag Index".
gct_lag_views = (
    ("GCT F", "v_F"),
    ("GCT chi2", "v_chi2"),
    ("GCT p-value", "v_p_value"),
    ("GCT df", "v_df"),
    ("GCT df_denom", "v_df_denom"),
    ("GCT df_num", "v_df_num"),
)
for lag_measure, array_measure in gct_lag_views:
    m[lag_measure] = m[array_measure][m["Lag Index"]]

# Test statistics and p-values are shown with six decimals.
for lag_measure in ("GCT F", "GCT chi2", "GCT p-value"):
    m[lag_measure].formatter = "DOUBLE[#,###.000000]"

In [None]:
session.visualize("Granger causality test Params_ftest p-value for Bitcoin")

In [None]:
# Number of lags whose p-value is below alpha (null hypothesis rejected),
# summed over the Lags parameter hierarchy.
m["Granger causality"] = tt.agg.sum(
    tt.where(m["GCT p-value"] < m["alpha"], 1, None), scope=tt.scope.origin(l["Lags"])
)

In [None]:
session.visualize("Number of lags with null hypothesis rejected for params_ftest")

In [None]:
session.url + "/#/dashboard/70f"

In [None]:
# Empty store for the Returns forecast residuals; feature_forecast loads it per scenario.
residual_store = session.create_store(
    store_name="residual",
    keys=["coin_symbol", "date"],
    types={
        "coin_symbol": tt.type.STRING,
        "date": tt.type.LOCAL_DATE,
        "Returns residual": tt.type.FLOAT,
    },
)
residual_store.head()

In [None]:

# Empty store for per-coin forecast-accuracy metrics (the column names suggest
# MAPE, ME, MAE, MPE, RMSE, correlation and min-max); feature_forecast loads it
# per scenario.
forecast_store = session.create_store(
    store_name="Forecast accuracy",
    keys=["coin_symbol"],
    types={
        "coin_symbol": tt.type.STRING,
        "lag_order": tt.type.INT,
        "Observations": tt.type.INT,
        "mape": tt.type.FLOAT,
        "me": tt.type.FLOAT,
        "mae": tt.type.FLOAT,
        "mpe": tt.type.FLOAT,
        "rmse": tt.type.FLOAT,
        "corr": tt.type.FLOAT,
        "minmax": tt.type.FLOAT,
    },
)

In [None]:
price_store.join(residual_store)
granger_causality_store.join(forecast_store)
cube.schema

In [None]:

# Residual of the Returns forecast, shown with four decimals.
m["Returns residual"] = tt.agg._single_value(residual_store["Returns residual"])
m["Returns residual"].formatter = "DOUBLE[#,###.0000]"

In [None]:

# Forecast-accuracy columns to expose as measures: every column except the key.
# A new list is built instead of calling .remove() on forecast_store.columns.
# That list may be the store's own, so mutating it could alter the store's
# column list and make a re-run of this cell raise ValueError.
stats_name = [column for column in forecast_store.columns if column != "coin_symbol"]
stats_name

In [None]:
# One single-value measure per forecast-accuracy column, in the "Statistics"
# folder. Counts use an integer-looking format; the rest use four decimals.
for statistic in stats_name:
    m[statistic] = tt.agg._single_value(forecast_store[statistic])

    m[statistic].folder = "Statistics"
    if statistic in ["lag_order", "Observations"]:
        m[statistic].formatter = "DOUBLE[#,###]"
    else:
        m[statistic].formatter = "DOUBLE[#,###.0000]"

In [None]:

# Per coin and feature (x): number of lags at which params_ftest rejects the
# null hypothesis at the current alpha.
features_metrics = cube.query(
    m["Granger causality"],
    levels=[l["coin_symbol"], l["x"]],
    condition=l["Test name"] == "params_ftest",
).reset_index()
features_metrics.head()

In [None]:

def feature_forecast(scenario_name, features, data, verbose=False, nobs=7):
    """Forecast each coin's returns from the features that Granger-cause them.

    For every coin in the global ``coins``:

    * if ``features`` lists no feature for the coin, its Test-subset returns
      (queried from the cube) are loaded into the ``scenario_name`` scenario of
      ``price_store`` with ``Returns`` set to NaN;
    * otherwise ``utils.var_forecast`` is called on the coin's rows of ``data``
      (restricted to ``Returns`` plus the selected features). When it returns a
      forecast, its outputs are loaded into the ``scenario_name`` scenario of
      the price, residual, statistics and forecast-accuracy stores.

    Parameters
    ----------
    scenario_name : str
        Name of the atoti scenario the results are loaded into.
    features : pandas.DataFrame
        Must have ``coin_symbol`` and ``x`` columns; each ``x`` is a feature
        considered to Granger-cause that coin's returns.
    data : pandas.DataFrame
        Training data indexed by (``coin_symbol``, ``date``), with a
        ``Returns`` column and every feature named in ``features``.
    verbose : bool
        Forwarded to ``utils.var_forecast``.
    nobs : int
        Forecast horizon forwarded to ``utils.var_forecast``. The default 7
        matches the 7 rows per coin held out as the Test subset.
    """
    for coin in coins:
        # features that may Granger-cause this coin's returns
        coin_features = features.loc[features["coin_symbol"] == coin, "x"].to_list()
        metrics_col = ["Returns"] + coin_features

        # actual Test-subset returns for this coin
        actual_df = cube.query(
            m["Returns"],
            levels=[l["coin_symbol"], l["date"]],
            condition=(l["Subset"] == "Test") & (l["coin_symbol"] == coin),
        )

        if not coin_features:
            print(coin, "=========== No features Granger cause returns")
            actual_df["Returns"] = np.nan
            price_store.scenarios[scenario_name].load_pandas(actual_df)
            continue

        print(f"Forecasting {coin} returns based on features", coin_features)

        # Mask with data's own index. A mask built from another frame
        # (previously the global transformed_df) only works when both frames
        # have exactly the same rows.
        coin_mask = data.index.get_level_values("coin_symbol") == coin
        train_data = data.loc[coin_mask, metrics_col].reset_index(
            ["coin_symbol"], drop=True
        )

        df_residual, ds, accuracy_prod, df_forecast = utils.var_forecast(
            coin,
            data_stats.copy(),
            train_data.copy(),
            actual_df,
            nobs,
            verbose=verbose,
        )

        # accuracy_prod is presumably indexed by coin; expose it as coin_symbol
        accuracy_prod = accuracy_prod.reset_index().rename(
            columns={"index": "coin_symbol"}
        )

        if df_forecast is not None:
            price_store.scenarios[scenario_name].load_pandas(df_forecast)
            residual_store.scenarios[scenario_name].load_pandas(df_residual)
            stats_store.scenarios[scenario_name].load_pandas(ds)
            forecast_store.scenarios[scenario_name].load_pandas(accuracy_prod)

In [None]:
# Baseline forecast using the features selected at alpha = 0.05 (m["alpha"]).
feature_forecast("forecast (0.05)", features_metrics, transformed_df, verbose=False)

In [None]:
session.visualize("Cryptocurrency forecast for BTC")

In [None]:
session.url + "/#/dashboard/010"

In [None]:
# Previous day's returns: exact one-day shift along the Date hierarchy.
m["Prev day returns"] = tt.date_shift(
    m["Returns"], on=h["Date"], offset="-1D", method="exact"
)

# Day-over-day change in returns, shared by both trend measures below.
returns_change = m["Returns"] - m["Prev day returns"]

# Numeric trend: -1 when returns fell, 1 when they rose, 0 otherwise.
m["Trend"] = tt.where(
    returns_change < 0,
    -1,
    tt.where(returns_change > 0, 1, 0),
)

# The same classification as an icon. The call is now closed properly; the
# missing ")" made this cell a SyntaxError.
m["Trend Sign"] = tt.where(
    returns_change < 0,
    "📉",
    tt.where(returns_change > 0, "📈", "➡"),
)

In [None]:
session.visualize("Trend analysis across forcast")

In [None]:

# Normality check at the 5% level, based on the norm_p statistic.
m["Gaussian confidence"] = 0.05
# `!= None` (not `is not None`) is deliberate: `is not` cannot be overloaded, so
# it would not build an atoti condition. With no false branch, the measure is
# presumably empty when norm_p is missing — confirm.
m["Distribution type"] = tt.where(
    m["norm_p"] != None,
    tt.where((m["norm_p"] < m["Gaussian confidence"]), "Non-Gaussian", "Gaussian"),
)

In [None]:
session.visualize("Distribution check for Returns at 95% confidence level")

In [None]:
session.visualize("Durbin-watson check")

In [None]:
session.visualize("Forecast accuracy")

In [None]:
session.visualize("Tweet volume by cryptocurrency")

In [None]:
session.url + "/#/dashboard/332"

In [None]:
# What-if simulation on alpha. The base scenario is named "0.05" (alpha's
# defined value), and the scenarios added below override alpha with 0.01 and 0.1.
significant_simulation = cube.setup_simulation(
    "Significance simulation", replace=[m["alpha"]], base_scenario="0.05"
).scenarios

In [None]:
significant_simulation["0.01"] = 0.01
significant_simulation["0.1"] = 0.1

In [None]:
# Granger-causality counts per coin and feature under the two alternative alphas.
features_metrics = cube.query(
    m["Granger causality"],
    levels=[l["Significance simulation"], l["coin_symbol"], l["x"]],
    condition=(l["Test name"] == "params_ftest")
    & (l["Significance simulation"].isin("0.01", "0.1")),
).reset_index()
features_metrics.head()

In [None]:

# Re-run the forecast once per alternative significance level, each into its
# own "forecast (<alpha>)" scenario.
for alpha in features_metrics["Significance simulation"].unique():
    print("************************************************************")
    print(f"Forecasting at Granger Causality Test at significant level {alpha}")
    f = features_metrics.loc[features_metrics["Significance simulation"] == alpha]
    feature_forecast(f"forecast ({alpha})", f, transformed_df)

In [None]:
session.visualize("Total accuracy score over the number of forecasts")

In [None]:
session.visualize(
    "Features used in forecasting cryptocurrencies with more than 50% accuracy"
)

In [None]:
session.url + "/#/dashboard/332"