Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(alpacabkfeeder): extended_hours config #613

Merged
merged 2 commits into from Aug 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions contrib/alpacabkfeeder/README.md
Expand Up @@ -68,14 +68,22 @@ bgworkers:
# Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes.
# Whitespaces are ignored.
off_hours_schedule: "0,15,30,45"
# Alpaca Broker API Feeder runs from openTime ~ closeTime (UTC)
openTime: "14:30:00" # 14:30(UTC) = 09:30 (EST)
closeTime: "21:00:00" # 21:00(UTC) = 16:00 (EST)
# (Deprecated) Alpaca Broker API Feeder runs from openTime ~ closeTime (UTC)
# openTime: "14:30:00" # 14:30(UTC) = 09:30 (EST)
# closeTime: "14:29:00"
# Alpaca Broker API Feeder runs between open_time_NY and close_time_NY
# (in "America/New_York" Location)
open_time_NY: "9:25:00"
close_time_NY: "16:10:00"
# When extended_hours is false, TICK data during the off-hours
# (= time < openTime and time > closeTime) are dropped and not stored in DB,
# even when off_hours_schedule is set.
extended_hours: false
# Alpaca Broker API Feeder doesn't run on the following days of the week
closedDaysOfTheWeek:
- "Saturday"
- "Sunday"
# Alpaca Broker API Feeder doesn't run on the closed dates (in JST)
# Alpaca Broker API Feeder doesn't run on the closed dates (in "America/New_York" location)
# (cf. https://www.jpx.co.jp/corporate/about-jpx/calendar/ )
closedDays:
- "2021-12-24"
Expand Down
42 changes: 29 additions & 13 deletions contrib/alpacabkfeeder/alpacav2.go
Expand Up @@ -34,12 +34,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
apiCli := apiClient(config)

// init Market Time Checker
var timeChecker feed.MarketTimeChecker
timeChecker = feed.NewDefaultMarketTimeChecker(
config.ClosedDaysOfTheWeek,
config.ClosedDays,
config.OpenTime,
config.CloseTime)
var timeChecker feed.MarketTimeChecker = defaultTimeChecker(config)
if config.OffHoursSchedule != "" {
scheduleMin, err := feed.ParseSchedule(config.OffHoursSchedule)
if err != nil {
Expand All @@ -53,6 +48,11 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
timeChecker,
scheduleMin,
)

if !config.ExtendedHours {
log.Warn("[Alpaca Broker Feeder] both off_hours_schedule and extend_hours=false is set! " +
"off-hour records won't be stored.")
}
}

ctx := context.Background()
Expand All @@ -72,12 +72,6 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
timer.RunEveryDayAt(ctx, config.SymbolsUpdateTime, sm.UpdateSymbols)
log.Info("updated symbols using a remote json file.")

// init SnapshotWriter
var ssw writer.SnapshotWriter = writer.SnapshotWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Timeframe: config.Timeframe,
Timezone: utils.InstanceConfig.Timezone,
}
// init BarWriter
var bw writer.BarWriter = writer.BarWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Expand All @@ -99,7 +93,7 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
MarketTimeChecker: timeChecker,
APIClient: apiCli,
SymbolManager: sm,
SnapshotWriter: ssw,
SnapshotWriter: snapshotWriter(config),
BarWriter: bw,
Interval: config.Interval,
}, nil
Expand All @@ -120,4 +114,26 @@ func apiClient(config *configs.DefaultConfig) *api.Client {
return api.NewClient(cred)
}

func defaultTimeChecker(config *configs.DefaultConfig) *feed.DefaultMarketTimeChecker {
return feed.NewDefaultMarketTimeChecker(
config.ClosedDaysOfTheWeek,
config.ClosedDays,
config.OpenHourNY, config.OpenMinuteNY,
config.CloseHourNY, config.CloseMinuteNY)
}

func snapshotWriter(config *configs.DefaultConfig) writer.SnapshotWriter {
var tc writer.MarketTimeChecker = &writer.NoopMarketTimeChecker{}
if !config.ExtendedHours {
tc = defaultTimeChecker(config)
}

return writer.NewSnapshotWriterImpl(
&writer.MarketStoreWriterImpl{},
config.Timeframe,
utils.InstanceConfig.Timezone,
tc,
)
}

func main() {}
45 changes: 28 additions & 17 deletions contrib/alpacabkfeeder/configs/config.go
Expand Up @@ -4,6 +4,8 @@ import (
"strings"
"time"

"github.com/alpacahq/marketstore/v4/utils/log"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
)
Expand All @@ -22,19 +24,20 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
// DefaultConfig is the configuration for Alpaca Broker API Feeder you can define in
// marketstore's config file through bgworker extension.
type DefaultConfig struct {
Exchanges []Exchange `json:"exchanges"`
SymbolsUpdateTime time.Time `json:"symbols_update_time"`
UpdateTime time.Time `json:"update_time"`
StocksJSONURL string `json:"stocks_json_url"`
StocksJSONBasicAuth string `json:"stocks_json_basic_auth"`
Timeframe string `json:"timeframe"`
APIKeyID string `json:"api_key_id"`
APISecretKey string `json:"api_secret_key"`
OpenTime time.Time
CloseTime time.Time
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
Interval int `json:"interval"`
Exchanges []Exchange `json:"exchanges"`
SymbolsUpdateTime time.Time `json:"symbols_update_time"`
UpdateTime time.Time `json:"update_time"`
StocksJSONURL string `json:"stocks_json_url"`
StocksJSONBasicAuth string `json:"stocks_json_basic_auth"`
Timeframe string `json:"timeframe"`
APIKeyID string `json:"api_key_id"`
APISecretKey string `json:"api_secret_key"`
OpenHourNY, OpenMinuteNY int
CloseHourNY, CloseMinuteNY int
ExtendedHours bool
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
Interval int `json:"interval"`
// The data-feeding is executed when 'minute' of the current time matches off_hours_schedule
// even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10
// Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes.
Expand Down Expand Up @@ -84,8 +87,10 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
aux := &struct {
SymbolsUpdateTime CustomTime `json:"symbols_update_time"`
UpdateTime CustomTime `json:"update_time"`
OpenTime CustomTime `json:"openTime"`
CloseTime CustomTime `json:"closeTime"`
OpenTime CustomTime `json:"openTime"` // deprecated
CloseTime CustomTime `json:"closeTime"` // deprecated
OpenTimeNY CustomTime `json:"open_time_NY"`
CloseTimeNY CustomTime `json:"close_time_NY"`
ClosedDaysOfTheWeek []weekday `json:"closedDaysOfTheWeek"`
ClosedDays []CustomDay `json:"closedDays"`
*Alias
Expand All @@ -96,8 +101,14 @@ func (c *DefaultConfig) UnmarshalJSON(input []byte) error {
}
c.SymbolsUpdateTime = time.Time(aux.SymbolsUpdateTime)
c.UpdateTime = time.Time(aux.UpdateTime)
c.OpenTime = time.Time(aux.OpenTime)
c.CloseTime = time.Time(aux.CloseTime)
if !time.Time(aux.OpenTime).IsZero() || !time.Time(aux.CloseTime).IsZero() {
log.Error("!!!!!!!!open_time and close_time config are DEPRECATED!!!!!!!! " +
"Please use open_time_NY and close_time_NY instead.")
return errors.New("!!!!!!!!open_time and close_time config are DEPRECATED!!!!!!!! " +
"Please use open_time_NY and close_time_NY instead.")
}
c.OpenHourNY, c.OpenMinuteNY, _ = time.Time(aux.OpenTimeNY).Clock()
c.CloseHourNY, c.CloseMinuteNY, _ = time.Time(aux.CloseTimeNY).Clock()
c.ClosedDaysOfTheWeek = convertTime(aux.ClosedDaysOfTheWeek)
c.ClosedDays = convertDate(aux.ClosedDays)

Expand Down
45 changes: 18 additions & 27 deletions contrib/alpacabkfeeder/feed/time_checker.go
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/alpacahq/marketstore/v4/utils/log"
)

var jst = time.FixedZone("Asia/Tokyo", 9*60*60)
var ny, _ = time.LoadLocation("America/New_York")

// MarketTimeChecker is an interface to check if the market is open at the specified time or not.
type MarketTimeChecker interface {
Expand All @@ -26,53 +26,44 @@ type MarketTimeChecker interface {
// all those settings should be defined in this object.
type DefaultMarketTimeChecker struct {
// i.e. []string{"Saturday", "Sunday"}
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
OpenTime time.Time
CloseTime time.Time
ClosedDaysOfTheWeek []time.Weekday
ClosedDays []time.Time
OpenHourNY, OpenMinuteNY int
CloseHourNY, CloseMinuteNY int
}

// NewDefaultMarketTimeChecker initializes the DefaultMarketTimeChecker object with the specifier parameters.s.
func NewDefaultMarketTimeChecker(
closedDaysOfTheWeek []time.Weekday,
closedDays []time.Time,
openTime time.Time,
closeTime time.Time,
openHourNY, openMinuteNY int,
closeHourNY, closeMinuteNY int,
) *DefaultMarketTimeChecker {
return &DefaultMarketTimeChecker{
ClosedDaysOfTheWeek: closedDaysOfTheWeek,
ClosedDays: closedDays,
OpenTime: openTime,
CloseTime: closeTime,
OpenHourNY: openHourNY, OpenMinuteNY: openMinuteNY,
CloseHourNY: closeHourNY, CloseMinuteNY: closeMinuteNY,
}
}

// IsOpen returns true on weekdays from 08:55 to 15:10.
// if closedDates are defined, return false on those days.
func (m *DefaultMarketTimeChecker) IsOpen(t time.Time) bool {
timeInJst := t.In(jst)
return m.isOpenDate(timeInJst) && m.isOpenWeekDay(timeInJst) && m.isOpenTime(t)
tNY := t.In(ny)
return m.isOpenDate(tNY) && m.isOpenWeekDay(tNY) && m.isOpenTime(tNY)
}

// isOpenTime returns true if the specified time is between the OpenTime and the CloseTime.
func (m *DefaultMarketTimeChecker) isOpenTime(t time.Time) bool {
minFrom12am := t.Hour()*60 + t.Minute()
func (m *DefaultMarketTimeChecker) isOpenTime(nyT time.Time) bool {
nyTYear, nyTMonth, nyTDay := nyT.Date()
openTimeNY := time.Date(nyTYear, nyTMonth, nyTDay, m.OpenHourNY, m.OpenMinuteNY, 0, 0, ny)
closeTimeNY := time.Date(nyTYear, nyTMonth, nyTDay, m.CloseHourNY, m.CloseMinuteNY, 0, 0, ny)

openMinFrom12am := m.OpenTime.Hour()*60 + m.OpenTime.Minute()
closeMinFrom12am := m.CloseTime.Hour()*60 + m.CloseTime.Minute()

// if the open hour is later than the close hour (i.e. open=23h, close=6h), +1day
if closeMinFrom12am < openMinFrom12am {
closeMinFrom12am += 24 * 60
}
if minFrom12am < openMinFrom12am {
minFrom12am += 24 * 60
}

if minFrom12am < openMinFrom12am || minFrom12am >= closeMinFrom12am {
if nyT.Before(openTimeNY) || nyT.After(closeTimeNY) {
log.Debug(fmt.Sprintf("[Alpaca Broker Feeder] market is not open. "+
"openTime=%02d:%02d, closeTime=%02d:%02d, now=%v",
m.OpenTime.Hour(), m.OpenTime.Minute(), m.CloseTime.Hour(), m.CloseTime.Minute(), t))
"openTime(NewYork)=%02d:%02d, closeTime(NewYork)=%02d:%02d, now=%v",
openTimeNY.Hour(), openTimeNY.Minute(), closeTimeNY.Hour(), closeTimeNY.Minute(), nyT))
return false
}
return true
Expand Down