In [None]:
import datatable as dt
from datatable import f, update

import histdatacom
from histdatacom.options import Options

# Describe the download request: EUR/USD tick quotes in ASCII format for the
# 2000-04 .. 2000-05 range, handed back through the API as a datatable.
options = Options()
options.api_return_type = "datatable"
options.formats = {"ascii"}
options.timeframes = {"tick-data-quotes"}
options.pairs = {"eurusd"}
options.start_yearmonth = "2000-04"
options.end_yearmonth = "2000-05"
options.cpu_utilization = "high"

# Run the request; `data` is the source frame every later cell copies from.
data = histdatacom(options)  # (Jupyter)

In [None]:
# Work on a copy so the fetched `data` frame stays untouched.
DT = data.copy()

# Summarise the frame's metadata, then show its first five rows.
frame_summary = f"""
    type: {type(DT)}
    shape: {DT.shape}
    names: {DT.names}
    types: {DT.stypes}\n\n"""
first_rows = DT[:5, :]
print(frame_summary, first_rows)


In [None]:
# Start again from a fresh copy so re-running this cell is idempotent.
DT = data.copy()

# Scale the raw `datetime` column by 10**6 and cast it to time64.
# NOTE(review): presumably the raw values are epoch milliseconds, which the
# scaling turns into nanoseconds -- confirm against the histdatacom output.
scaled_timestamp = f.datetime * 10**6
dt_timestamp_to_datetime = scaled_timestamp.as_type(dt.Type.time64)
DT[:, update(datetime=dt_timestamp_to_datetime)]

# Summarise the converted frame and show its first five rows.
frame_summary = f"""
    type: {type(DT)}
    shape: {DT.shape}
    names: {DT.names}
    types: {DT.stypes}\n\n"""
first_rows = DT[:5, :]
print(frame_summary, first_rows)

In [None]:
from collections import namedtuple
from functools import partial

import rx
from rx import operators as ops

from histdatacom.records import Record

# Fresh copy of the fetched frame for the streaming cells below; the
# `datetime` column is left as originally returned here.
DT = data.copy()

In [None]:
 # Record describing the data set being parsed; "T" selects the tick branch
 # of parse_jay_row.
 new_record = Record(data_fxpair="EURUSD", data_format="ASCII", data_timeframe="T")

In [None]:
def parse_jay_row(row, record):
    measurement = f"{record.data_fxpair}"
    tags = f"source=histdata.com,format={record.data_format},timeframe={record.data_timeframe}".replace(" ", "")

    match record.data_timeframe:
        case "M1":
            Row = namedtuple('Row', ['datetime', 'open', 'high', 'low', 'close', 'vol'])
            named_row = Row(row[0], row[1], row[2], row[3], row[4], row[5])

            fields = f"openbid={named_row.open},highbid={named_row.high},lowbid={named_row.low},closebid={named_row.close}".replace(" ", "")
            time = str(named_row.datetime)
        case "T":
            Row = namedtuple('Row', ['datetime','bid','ask','vol'])
            named_row = Row(row[0], row[1], row[2], row[3])

            fields = f"bidquote={named_row.bid},askquote={named_row.ask}".replace(" ", "")
            time = str(named_row.datetime)
    
    line_protocol = f"{measurement},{tags} {fields} {time}"

    return line_protocol

In [None]:
def parse_jay_rows(iterable, record):
    """Format every row of ``iterable`` with ``parse_jay_row``.

    Args:
        iterable: Iterable of rows, each accepted by ``parse_jay_row``.
        record: Record passed unchanged to ``parse_jay_row`` for every row.

    Returns:
        A list with one formatted line per input row, in input order.
    """
    # The original called the undefined name `parse_row`, which raises
    # NameError on a fresh kernel; the per-row parser is `parse_jay_row`.
    return [parse_jay_row(row, record=record) for row in iterable]

In [None]:
# Stream the frame's rows as tuples, group them into batches of 25,000, and
# flatten each batch's formatted lines back into the stream.
# The original referenced the undefined name `parse_rows`; the batch parser
# defined in this notebook is `parse_jay_rows`.
rx_data = rx.from_iterable(DT.to_tuples()).pipe(
    ops.buffer_with_count(25_000),
    ops.flat_map(lambda rows: parse_jay_rows(rows, new_record)),
)


In [None]:
def _discard(x):
    """Ignore an emitted item; subscribing is only used to run the pipeline."""
    return None


def _report_error(er):
    """Print an error raised while the stream is being processed."""
    print(f"Unexpected error: {er}")


rx_data.subscribe(on_next=_discard, on_error=_report_error)