## Imports

In [None]:
%load_ext autoreload
%autoreload 2

import logging
import os

import pandas as pd

import helpers.dbg as dbg
import helpers.printing as prnt

In [None]:
# Configure the notebook display and the logger.
prnt.config_notebook()

# Pass `verbosity=logging.DEBUG` instead to get more detailed output.
dbg.init_logger(verbosity=logging.INFO)
_LOG = logging.getLogger(__name__)

# Real-time node

## Build pipeline


In [None]:
import core.config as cconfig
import dataflow as cdataf
import dataflow.real_time as cdrt
import dataflow_amp.returns.pipeline as darp

dag_builder = darp.ReturnsPipeline()
config = dag_builder.get_config_template()

# Toggle for the source of the `load_prices` node:
# - `True`: load the `pq/sp_500_1min` Parquet file of a single ticker under the
#   Kibot `S3_PREFIX` through `cdataf.load_data_from_disk`
# - `False`: use the `real_time_synthetic` source node
use_kibot_data = False

# `start_date` and `end_date` are bound in both branches since later cells
# (e.g., the real-time loop) iterate over `[start_date, end_date]`.
if use_kibot_data:
    from im.kibot.data.config import S3_PREFIX

    start_date = pd.to_datetime("2010-01-04 9:30:00")
    end_date = pd.to_datetime("2010-01-04 16:05:00")
    ticker = "AAPL"
    file_path = os.path.join(S3_PREFIX, "pq/sp_500_1min", ticker + ".pq")
    source_node_kwargs = {
        "func": cdataf.load_data_from_disk,
        "func_kwargs": {
            "file_path": file_path,
            "start_date": start_date,
            "end_date": end_date,
        },
    }
    config["load_prices"] = cconfig.get_config_from_nested_dict(
        source_node_kwargs
    )
else:
    start_date = pd.Timestamp("2010-01-04 09:30:00")
    end_date = pd.Timestamp("2010-01-04 11:30:00")
    source_node_kwargs = {
        "columns": ["close", "vol"],
        "start_date": start_date,
        "end_date": end_date,
    }
    config["load_prices"] = cconfig.get_config_from_nested_dict(
        {
            "source_node_name": "real_time_synthetic",
            "source_node_kwargs": source_node_kwargs,
        }
    )

print(config)

In [None]:
# Build the DAG of the returns pipeline (`dag_builder`) from the config
# assembled in the previous cell.
dag = dag_builder.get_dag(config)

In [None]:
# Disabled debugging snippet: pin the clock of the source node to a fixed
# timestamp and run the DAG up to node `nid` in "fit" mode.
if False:
    # Target node of the run; any node works, e.g., "compute_ret_0".
    nid = "load_prices"
    # The clock always lives on the source node, independently of `nid`.
    node = dag.get_node("load_prices")
    node.reset_current_time()
    node.set_current_time(pd.to_datetime("2010-01-06 9:30:00"))
    # Run and show the output of the DAG.
    dict_ = dag.run_leq_node(nid, "fit")
    print(dict_)

In [None]:
# Step through `[start_date, end_date]` and, whenever `cdrt.is_dag_to_execute()`
# says so, move the clock of the source node to `now` and run the DAG up to its
# sink, printing the last rows of the output.
# The source node and the sink do not change across iterations, so they are
# looked up once outside of the loop.
node = dag.get_node("load_prices")
node.reset_current_time()
sink = dag.get_unique_sink()
for now in cdrt.get_now_time(start_date, end_date):
    print("now=", now)
    if not cdrt.is_dag_to_execute(now):
        continue
    print("Time to execute the DAG")
    node.set_current_time(now)
    dict_ = dag.run_leq_node(sink, "fit")
    print(dict_["df_out"].tail(3))

## Use real_time_return_pipeline

In [None]:
import dataflow_amp.real_time.real_time_return_pipeline as dtfart

# Build the real-time returns pipeline from its default config template.
dag_builder = dtfart.RealTimeReturnPipeline()
config = dag_builder.get_config_template()
print("\n# config=\n%s" % config)
# Check the config with the builder before instantiating the DAG.
dag_builder.validate_config(config)
dag = dag_builder.get_dag(config)

In [None]:
# Draw the DAG; use `print(dag)` for a textual representation instead.
cdataf.draw(dag)

In [None]:
# # Align on an even second.
# cdrt.align_on_even_second()
# #
# sleep_interval_in_secs = 1.0
# num_iterations = 3
# get_current_time = rrt.get_replayed_current_time
# need_to_execute = cdrt.execute_every_2_seconds
# #
# execution_trace, results = cdrt.execute_dag_with_real_time_loop(
#     sleep_interval_in_secs,
#     num_iterations,
#     get_current_time,
#     need_to_execute,
#     dag,
# )

In [None]:
# `results` is only bound by the `execute_dag_with_real_time_loop()` call in the
# previous cell, which is commented out. Show the output of the first
# execution only when `results` exists, instead of raising `NameError`.
results[0][1]["df_out"] if "results" in globals() else None

In [None]:
##

import dataflow.real_time as cdrt

In [None]:
import helpers.datetime_ as hdatetime

# Interval of the data, localized to the timezone returned by
# `hdatetime.get_ET_tz()`.
start_datetime, end_datetime = (
    pd.Timestamp(timestamp, tz=hdatetime.get_ET_tz())
    for timestamp in ("2010-01-04 09:30:00", "2010-01-05 09:30:00")
)

# Replay real time starting from the first timestamp of the data.
rrt = cdrt.ReplayRealTime(start_datetime)
get_current_time = rrt.get_replayed_current_time

In [None]:
import dataflow as cdtf

# Parameters of the real-time loop: 3 iterations with 1 second of sleep in
# between, the replayed clock from the previous cell, and
# `cdrt.execute_every_2_seconds` deciding when the DAG is executed.
execute_rt_loop_kwargs = dict(
    sleep_interval_in_secs=1.0,
    num_iterations=3,
    get_current_time=rrt.get_replayed_current_time,
    need_to_execute=cdrt.execute_every_2_seconds,
)
# Arguments of the runner: no fit state and no output directory.
kwargs = dict(
    config=config,
    dag_builder=dag_builder,
    fit_state=None,
    execute_rt_loop_kwargs=execute_rt_loop_kwargs,
    dst_dir=None,
)
dag_runner = cdtf.RealTimeDagRunner(**kwargs)

# Align on an even second before starting the loop.
cdrt.align_on_even_second()
result = dag_runner.predict()

In [None]:
# First element of the value returned by `dag_runner.predict()`.
# NOTE(review): presumably the execution trace, by analogy with the
# `execution_trace, results` unpacking in the commented-out loop cell — confirm.
result[0]

In [None]:
# Number of entries in the second element returned by `dag_runner.predict()`.
# NOTE(review): presumably one entry per DAG execution — confirm.
len(result[1])

In [None]:
import pandas as pd
import helpers.unit_test as hut

import dataflow.nodes.transformers as cdtfnt

# Generate a reproducible random DataFrame with `num_cols` columns indexed on
# business days.
num_cols = 2
seed = 42
date_range_kwargs = {
    "start": pd.Timestamp("2010-01-01"),
    "end": pd.Timestamp("2010-02-01"),
    "freq": "1B",
}
data = hut.get_random_df(num_cols, seed=seed, date_range_kwargs=date_range_kwargs)
print(data)

# Resample `data` with rule "1B" aggregating with "last".
# Use a dedicated name instead of `config` to avoid clobbering the DAG config
# built in the previous cells.
resample_node_kwargs = {
    "rule": "1B",
    "agg_func": "last",
    "resample_kwargs": None,
    "agg_func_kwargs": None,
}
node = cdtfnt.Resample("resample", **resample_node_kwargs)
df_out = node.fit(data)["df_out"]

In [None]:
import dataflow.nodes.transformers as cdtfnt

nid = "nop"


def func(df_in):
    """
    Return the input unchanged, so that the wrapping node is a no-op.
    """
    return df_in


# Wrap the identity `func` into a node; no extra kwargs are forwarded to it.
func_kwargs = {}
node = cdtfnt.FunctionWrapper(
    nid,
    func,
    func_kwargs,
)
node.fit(data)

In [None]:
import dataflow.test.test_builders as test_builders

# Build the DAG of the naive test pipeline from its config template.
# NOTE: `_NaivePipeline` is a private helper of `dataflow.test.test_builders`.
dag_builder = test_builders._NaivePipeline()

config = dag_builder.get_config_template()

dag_builder.get_dag(config)