In [1]:
%load_ext dotenv
%dotenv -o

import os
not not os.environ["POLYGON_API_KEY"]

True

In [2]:
from zipline_polygon_bundle.config import PolygonConfig
from zipline_polygon_bundle import date_to_path, trades_schema, cast_trades, custom_aggs_schema, custom_aggs_partitioning

from typing import Iterator, Tuple

import pandas as pd

import pyarrow as pa
from pyarrow import dataset as pa_ds
from pyarrow import compute as pa_compute
from pyarrow import parquet as pa_parquet

from fsspec.implementations.arrow import ArrowFSWrapper

import os

import datetime
import pandas_market_calendars

from itables import show

In [3]:
# Provide a default asset directory only when the environment has not already
# chosen one; an existing value is left untouched.
# NOTE(review): presumably PolygonConfig derives the custom aggregate
# directories from this variable -- confirm against the package.
os.environ.setdefault("CUSTOM_ASSET_FILES_DIR", "/home/jovyan/data")

# One trading week of NYSE data, aggregated into one-minute bars.
config = PolygonConfig(
    environ=os.environ,
    calendar_name="NYSE",
    start_date="2021-01-01",
    end_date="2021-01-08",
    agg_time="1min",
)

# Echo the directories the config resolved to, so the reader can see where
# input flat files are read from and where aggregates are written.
print(f"{config.minute_aggs_dir=}")
print(f"{config.trades_dir=}")
print(f"{config.custom_aggs_dir=}")
print(f"{config.aggs_dir=}")
print(f"{config.by_ticker_dir=}")

config.minute_aggs_dir='/media/mirror/files.polygon.io/flatfiles/us_stocks_sip/minute_aggs_v1'
config.trades_dir='/media/mirror/files.polygon.io/flatfiles/us_stocks_sip/trades_v1'
config.custom_aggs_dir='/home/jovyan/data/60sec_aggs'
config.aggs_dir='/home/jovyan/data/60sec_aggs'
config.by_ticker_dir='/home/jovyan/data/60sec_aggs_by_ticker'


In [4]:
# Open the custom aggregates directory as a pyarrow dataset, using the schema
# and partitioning supplied by zipline_polygon_bundle, then display the
# dataset's schema.
aggs_ds = pa_ds.dataset(
    config.custom_aggs_dir,
    schema=custom_aggs_schema(),
    partitioning=custom_aggs_partitioning(),
)
aggs_ds.schema

ticker: string not null
volume: int64 not null
open: double not null
close: double not null
high: double not null
low: double not null
window_start: timestamp[ns, tz=UTC] not null
transactions: int64 not null
date: date32[day] not null
year: uint16 not null
month: uint8 not null

In [5]:
# Partition keys implied by the dataset-level partition expression (as opposed
# to the expression carried by each individual fragment).
pa_ds.get_partition_keys(aggs_ds.partition_expression)

{}

In [9]:
# For every fragment in the dataset, print its partition expression followed
# by the key/value dict that pyarrow decodes from that expression.
for fragment in aggs_ds.get_fragments():
    partition_expr = fragment.partition_expression
    print(f"fragment.partition_expression={partition_expr!r}")
    print(
        "pa_ds.get_partition_keys(fragment.partition_expression)="
        f"{pa_ds.get_partition_keys(partition_expr)!r}"
    )


fragment.partition_expression=<pyarrow.compute.Expression (((year == 2017) and (month == 1)) and (date == 2017-01-03))>
pa_ds.get_partition_keys(fragment.partition_expression)={'date': datetime.date(2017, 1, 3), 'month': 1, 'year': 2017}
fragment.partition_expression=<pyarrow.compute.Expression (((year == 2017) and (month == 1)) and (date == 2017-01-04))>
pa_ds.get_partition_keys(fragment.partition_expression)={'date': datetime.date(2017, 1, 4), 'month': 1, 'year': 2017}
fragment.partition_expression=<pyarrow.compute.Expression (((year == 2017) and (month == 1)) and (date == 2017-01-05))>
pa_ds.get_partition_keys(fragment.partition_expression)={'date': datetime.date(2017, 1, 5), 'month': 1, 'year': 2017}
fragment.partition_expression=<pyarrow.compute.Expression (((year == 2017) and (month == 1)) and (date == 2017-01-06))>
pa_ds.get_partition_keys(fragment.partition_expression)={'date': datetime.date(2017, 1, 6), 'month': 1, 'year': 2017}


In [10]:
# For every fragment: print its partition expression and decoded partition
# keys, then load the fragment and print its first row and its schema.
for fragment in aggs_ds.get_fragments():
    partition_expr = fragment.partition_expression
    print(f"fragment.partition_expression={partition_expr!r}")
    print(
        "pa_ds.get_partition_keys(fragment.partition_expression)="
        f"{pa_ds.get_partition_keys(partition_expr)!r}"
    )

    # Reads the whole fragment into memory, even though only the first row
    # and the schema are printed.
    table = fragment.to_table()
    print(f"table.slice(length=1)={table.slice(length=1)!r}")
    print(f"table.schema={table.schema!r}")

fragment.partition_expression=<pyarrow.compute.Expression (((year == 2017) and (month == 1)) and (date == 2017-01-03))>
pa_ds.get_partition_keys(fragment.partition_expression)={'date': datetime.date(2017, 1, 3), 'month': 1, 'year': 2017}
table.slice(length=1)=pyarrow.Table
ticker: string not null
volume: int64 not null
open: double not null
close: double not null
high: double not null
low: double not null
window_start: timestamp[ns, tz=UTC] not null
transactions: int64 not null
----
ticker: [["AAL"]]
volume: [[82]]
open: [[46.6]]
close: [[46.6]]
high: [[46.6]]
low: [[46.6]]
window_start: [[2017-01-03 09:00:00.000000000Z]]
transactions: [[2]]
table.schema=ticker: string not null
volume: int64 not null
open: double not null
close: double not null
high: double not null
low: double not null
window_start: timestamp[ns, tz=UTC] not null
transactions: int64 not null
fragment.partition_expression=<pyarrow.compute.Expression (((year == 2017) and (month == 1)) and (date == 2017-01-04))>
pa_ds.ge