## Imports

In [1]:
%load_ext autoreload
%autoreload 2

import datetime
import logging
import os

import pandas as pd
from pyarrow import parquet
import s3fs

import helpers.dbg as dbg
import helpers.env as env
import helpers.printing as prnt

In [2]:
# Notebook-wide setup through the project's printing helper (what exactly it
# configures is not visible from this notebook).
prnt.config_notebook()

# Initialize logging at INFO level; swap in the commented line below for DEBUG
# verbosity.
# dbg.init_logger(verbosity=logging.DEBUG)
dbg.init_logger(verbosity=logging.INFO)
# dbg.test_logger()
# Module-level logger for this notebook.
_LOG = logging.getLogger(__name__)

[0m[36mINFO[0m: > cmd='/venv/lib/python3.8/site-packages/ipykernel_launcher.py -f /root/.local/share/jupyter/runtime/kernel-26b9c9dd-34a1-43ca-8ac4-d0545f037ca0.json'


# Real-time node

In [3]:
import time

## Test real-time node

In [4]:
import core.dataflow.nodes.sources as cdtfns

# Smoke test of the synthetic real-time source in isolation: build the node,
# move its clock a few minutes past the start and look at what `fit()` returns.
nid = "rtds"
start_date = pd.Timestamp("2010-01-04 09:30:00")
end_date = pd.Timestamp("2010-01-10 09:30:00")

columns = ["close", "volume"]
# Pass `nid` rather than repeating the literal, so the node id is defined once.
rtds = cdtfns.RealTimeSyntheticDataSource(nid, columns, start_date, end_date)

# Set the node's notion of "now" to 5 minutes after `start_date`.
now = pd.Timestamp("2010-01-04 09:35:00")
rtds.set_current_time(now)

# Left as the last expression so that the result is displayed. In the recorded
# output, `df_out` holds the rows from `start_date` up to and including `now`.
rtds.fit()

  from tqdm.autonotebook import tqdm


{'df_out':                         close    volume
 2010-01-04 09:30:00  0.166325  0.476288
 2010-01-04 09:31:00  0.483731  0.870343
 2010-01-04 09:32:00  0.547852  0.966650
 2010-01-04 09:33:00  0.395545  0.802012
 2010-01-04 09:34:00  0.230954  1.045426
 2010-01-04 09:35:00  0.656137  1.241316}

## Build pipeline

In [5]:
import dataflow_amp.real_time.utils as dartu
import dataflow_amp.returns.pipeline as darp
import core.dataflow as cdataf
import core.config as cconfig

# Start from the default config of the returns pipeline; only the source node
# ("load_prices") is overridden below.
dag_builder = darp.ReturnsPipeline()
config = dag_builder.get_config_template()

# Legacy source-node config, kept for reference only.
# NOTE(review): `cldns` is not imported in any of the cells above, so this block
# cannot be re-enabled as is.
# # Add the source node.
# source_config = cconfig.get_config_from_nested_dict(
#     {
#         "func": cldns.load_single_instrument_data,
#         "func_kwargs": {
#             "start_date": datetime.date(2010, 6, 29),
#             "end_date": datetime.date(2010, 7, 13),
#         },
#     }
# )
# config["load_prices"] = source_config
# config["resample_prices_to_1min", "func_kwargs", "volume_cols"] = ["volume"]
# config["compute_vwap", "func_kwargs", "rule"] = "15T"
# config["compute_vwap", "func_kwargs", "volume_col"] = "volume"

# Switch between reading Kibot data from S3 (True) and the synthetic real-time
# source (False). Named so that the intent of the branch is explicit.
use_kibot_s3_data = False

# `start_date` and `end_date` are also read by the replay loop further down, so
# each branch must define them; otherwise values left over from an earlier cell
# would be used silently.
if use_kibot_s3_data:
    from im.kibot.data.config import S3_PREFIX

    ticker = "AAPL"
    file_path = os.path.join(S3_PREFIX, "pq/sp_500_1min", ticker + ".pq")
    start_date = pd.to_datetime("2010-01-04 9:30:00")
    end_date = pd.to_datetime("2010-01-04 16:05:00")
    source_node_kwargs = {
        "func": cdataf.load_data_from_disk,
        "func_kwargs": {
            "file_path": file_path,
            "start_date": start_date,
            "end_date": end_date,
        },
    }
    config["load_prices"] = cconfig.get_config_from_nested_dict(
        source_node_kwargs
    )
else:
    start_date = pd.Timestamp("2010-01-04 09:30:00")
    end_date = pd.Timestamp("2010-01-04 11:30:00")
    # The column names match the template defaults shown in the printed config
    # (`price_cols: ['close']`, `volume_cols: ['vol']`).
    source_node_kwargs = {
        "columns": ["close", "vol"],
        "start_date": start_date,
        "end_date": end_date,
    }
    config["load_prices"] = cconfig.get_config_from_nested_dict(
        {
            "source_node_name": "real_time_synthetic",
            "source_node_kwargs": source_node_kwargs,
        }
    )

print(config)

load_prices:
  source_node_name: real_time_synthetic
  source_node_kwargs:
    columns: ['close', 'vol']
    start_date: 2010-01-04 09:30:00
    end_date: 2010-01-04 11:30:00
filter_weekends:
  col_mode: replace_all
filter_ath:
  col_mode: replace_all
  transformer_kwargs:
    start_time: 09:30:00
    end_time: 16:00:00
resample_prices_to_1min:
  func_kwargs:
    rule: 1T
    price_cols: ['close']
    volume_cols: ['vol']
compute_vwap:
  func_kwargs:
    rule: 5T
    price_col: close
    volume_col: vol
    add_bar_start_timestamps: True
    add_epoch: True
    add_last_price: True
compute_ret_0:
  cols: ['twap', 'vwap']
  col_mode: merge_all
  transformer_kwargs:
    mode: pct_change


In [6]:
# Build the DAG from the builder and the config prepared in the previous cell.
dag = dag_builder.get_dag(config)

In [7]:
# Disabled debugging snippet: run the DAG once, up to node `nid`, with the source
# node's clock pinned to a fixed time.
if False:
    # `nid` is the node to run up to; swap in the commented line to run through
    # "compute_ret_0" instead.
    #nid = "compute_ret_0"
    nid = "load_prices"
    # The clock is always set on the "load_prices" node, whatever `nid` is.
    node = dag.get_node("load_prices")
    node.reset_current_time()
    # NOTE(review): this timestamp is later than the `end_date`
    # (2010-01-04 11:30:00) used by the active branch of the config cell --
    # confirm it is still meaningful.
    node.set_current_time(pd.to_datetime("2010-01-06 9:30:00"))

    dict_ = dag.run_leq_node(nid, "fit")

    print(dict_)

In [None]:
# Replay a simulated clock and re-fit the whole DAG each time the scheduling
# helper says it is time to execute.

# The clock lives on the "load_prices" source node; reset it before replaying.
node = dag.get_node("load_prices")
node.reset_current_time()
# The DAG is not modified inside the loop, so the source node and the sink are
# looked up once here instead of on every execution.
sink = dag.get_unique_sink()

# In the recorded run `get_now_time()` yields one timestamp per minute, starting
# at `start_date`.
for now in dartu.get_now_time(start_date, end_date):
    print("now=", now)
    # In the recorded run this is true at 09:30 and 09:35 (presumably on 5-minute
    # boundaries -- confirm in `dartu`).
    execute = dartu.is_dag_to_execute(now)
    if not execute:
        continue
    print("Time to execute the DAG")
    # Advance the source node's clock to `now` before running the DAG.
    node.set_current_time(now)
    # Run every node up to and including the sink in "fit" mode.
    dict_ = dag.run_leq_node(sink, "fit")
    print(dict_["df_out"].tail(3))

now= 2010-01-04 09:30:00
Time to execute the DAG


run_leq_node:   0%|          | 0/6 [00:00<?, ?it/s]

                     vwap  twap  last bar_start_timestamp    minute  twap_ret_0  vwap_ret_0
2010-01-04 09:30:00   NaN   NaN   NaN 2010-01-04 09:25:00  21043290         NaN         NaN
now= 2010-01-04 09:31:00
now= 2010-01-04 09:32:00
now= 2010-01-04 09:33:00
now= 2010-01-04 09:34:00
now= 2010-01-04 09:35:00
Time to execute the DAG
