Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Basic anomaly detector #393

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions contrib/polygon/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backfill

import (
"fmt"
"github.com/alpacahq/marketstore/v4/uda/anomaly"
"math"

"sync"
Expand Down Expand Up @@ -29,6 +30,7 @@ var WriteTime time.Duration
var ApiCallTime time.Duration
var WaitTime time.Duration
var NoIngest bool
var AnomalyDetection bool

// https://polygon.io/glossary/us/stocks/conditions-indicators
var ConditionToUpdateInfo = map[int]ConsolidatedUpdateInfo{
Expand Down Expand Up @@ -99,6 +101,7 @@ var (
)

func Bars(symbol string, from, to time.Time, batchSize int, writerWP *worker.WorkerPool) (err error) {
// TODO: Add anomaly detection
if from.IsZero() {
from = time.Date(2014, 1, 1, 0, 0, 0, 0, NY)
}
Expand Down Expand Up @@ -256,6 +259,23 @@ func tradesToBars(ticks []api.TradeTick, model *models.Bar, exchangeIDs []int) {
}
}

// showAnomaliesInTrades runs fixed-percentage anomaly detection over the
// prices of the given trades and logs every trade that is flagged as
// anomalous. It is a report-only helper: it does not modify or filter the
// trades slice.
func showAnomaliesInTrades(symbol string, trades []api.TradeTick) {
	// Extract the price series the detector operates on.
	prices := make([]float64, len(trades))
	for i, trade := range trades {
		prices[i] = trade.Price
	}

	// 3.0 is the fixed-pct threshold passed to the detector; its exact
	// semantics are defined in uda/anomaly. TODO(review): consider making
	// this configurable alongside the AnomalyDetection flag.
	anomalies := anomaly.DetectAnomalyByFixedPct(prices, 3.0)

	// anomalies is parallel to trades: index i flags trades[i].
	for i, isAnomaly := range anomalies {
		if isAnomaly {
			log.Info("Anomaly: %s: %#v\n", symbol, trades[i])
		}
	}
}

func Trades(symbol string, from time.Time, to time.Time, batchSize int, writerWP *worker.WorkerPool) error {
trades := make([]api.TradeTick, 0)
t := time.Now()
Expand All @@ -275,6 +295,10 @@ func Trades(symbol string, from time.Time, to time.Time, batchSize int, writerWP
}

if len(trades) > 0 {
if AnomalyDetection {
showAnomaliesInTrades(symbol, trades)
}

model := models.NewTrade(symbol, len(trades))
for _, tick := range trades {
// type conversions
Expand Down
2 changes: 2 additions & 0 deletions contrib/polygon/backfill/backfiller/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
readFromCache bool
noIngest bool
configFilePath string
anomalyDetection bool

format = "2006-01-02"
)
Expand All @@ -61,6 +62,7 @@ func init() {
flag.StringVar(&cacheDir, "cache-dir", "", "directory to dump polygon's json replies")
flag.BoolVar(&readFromCache, "read-from-cache", false, "read cached results if available")
flag.BoolVar(&noIngest, "no-ingest", false, "do not ingest downloaded data, just store it in cache")
flag.BoolVar(&anomalyDetection, "anomaly-detection", false, "enable anomaly detection during ingest")
flag.StringVar(&configFilePath, "config", "/etc/mkts.yml", "path to the mkts.yml config file")

flag.Parse()
Expand Down
3 changes: 3 additions & 0 deletions sqlparser/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/alpacahq/marketstore/v4/contrib/candler/candlecandler"
"github.com/alpacahq/marketstore/v4/contrib/candler/tickcandler"
"github.com/alpacahq/marketstore/v4/uda"
"github.com/alpacahq/marketstore/v4/uda/anomaly"
"github.com/alpacahq/marketstore/v4/uda/avg"
"github.com/alpacahq/marketstore/v4/uda/count"
"github.com/alpacahq/marketstore/v4/uda/gap"
Expand All @@ -26,4 +27,6 @@ var AggRegistry = map[string]uda.AggInterface{
"avg": &avg.Avg{},
"Gap": &gap.Gap{},
"gap": &gap.Gap{},
"anomaly": &anomaly.Anomaly{},
"Anomaly": &anomaly.Anomaly{},
}
4 changes: 2 additions & 2 deletions tests/integ/dockerfiles/pyclient/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ test-jsonrpc:
# for json-RPC API
docker exec -e MARKETSTORE_PORT=5993 -e USE_GRPC=false $(CONTAINER_NAME) \
bash -c \
"pytest -v -v -v $(TEST_FILENAME)"
"pytest -v -v -v -s $(TEST_FILENAME)"
tibkiss marked this conversation as resolved.
Show resolved Hide resolved

test-grpc:
# for gRPC API
docker exec -e MARKETSTORE_PORT=5995 -e USE_GRPC=true $(CONTAINER_NAME) \
bash -c \
"pytest -v -v -v $(TEST_FILENAME)"
"pytest -v -v -v -s $(TEST_FILENAME)"
75 changes: 75 additions & 0 deletions tests/integ/tests/test_anomaly_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Integration Test for anomaly detector
"""
import pytest
import os

import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

import pymarketstore as pymkts

# Constants
DATA_TYPE_TICK = [('Epoch', 'i8'), ('Bid', 'f4'), ('Ask', 'f4'), ('Nanoseconds', 'i4')]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i4 is the right type for Nanoseconds, but later in the tests, all nanosec data are given as floats, so numpy truncates all of them to 0. I don't think it's causing much trouble, but better to be on the safe side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, we need to address that

MARKETSTORE_HOST = "localhost"
MARKETSTORE_PORT = 5993

client = pymkts.Client(f"http://127.0.0.1:{os.getenv('MARKETSTORE_PORT',5993)}/rpc",
grpc=(os.getenv("USE_GRPC", "false") == "true"))


def timestamp(datestr):
    """Convert a date string to an integer Unix epoch in seconds."""
    nanos_per_second = 10 ** 9
    nanos = pd.Timestamp(datestr).value
    return int(nanos / nanos_per_second)

@pytest.mark.parametrize('symbol, columns, detection_type, threshold, data, expected_df', [
    # Single column, fixed-pct detection: only the first tick is flagged
    # (bitmap 1 = first requested column, 'Ask').
    ('AT_SINGLE_COL_FIXED', 'Ask', 'fixed_pct', '0.045',
     [(timestamp('2019-01-01 04:19:00'), 15, 11, 0.01),  # epoch, bid, ask, nanosecond
      (timestamp('2019-01-01 04:19:01'), 20, 11.5, 0.02),
      (timestamp('2019-01-02 05:59:59'), 30, 11.6, 0.03)],
     pd.DataFrame(index=[pd.Timestamp('2019-01-01 04:19:00', tz='UTC')],
                  data=np.array([1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
    # Multi column, fixed-pct: bitmap 3 = both Bid and Ask anomalous,
    # bitmap 1 = Bid only.
    ('AT_MULTI_COL_FIXED', 'Bid,Ask', 'fixed_pct', '0.045',
     [(timestamp('2019-01-01 04:19:00'), 15, 11, 0.01),
      (timestamp('2019-01-01 04:19:01'), 20, 11.5, 0.02),
      (timestamp('2019-01-02 05:59:59'), 30, 11.6, 0.03)],
     pd.DataFrame(index=[pd.Timestamp('2019-01-01 04:19:00', tz='UTC'),
                         pd.Timestamp('2019-01-01 04:19:01', tz='UTC')],
                  data=np.array([3, 1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
    # Single column, z-score detection: the final outlier Bid is flagged.
    ('AT_SINGLE_COL_ZSCORE', 'Bid', 'z_score', '1.0',
     [(timestamp('2019-01-01 04:19:00'), 10.1, 11, 0.01),
      (timestamp('2019-01-01 04:19:01'), 10.2, 11.5, 0.02),
      (timestamp('2019-01-01 04:19:03'), 10.1, 11.5, 0.03),
      (timestamp('2019-01-01 04:19:04'), 10.3, 11.5, 0.04),
      (timestamp('2019-01-01 04:19:05'), 10.2, 11.5, 0.05),
      (timestamp('2019-01-01 04:19:06'), 10.2, 11.5, 0.06),
      (timestamp('2019-01-02 05:59:59'), 100.1, 11.6, 0.07)],
     pd.DataFrame(index=[pd.Timestamp('2019-01-02 05:59:59', tz='UTC')],
                  data=np.array([1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
    # Multi column, z-score: bitmap 2 = Ask-only anomaly, bitmap 1 = Bid-only.
    ('AT_MULTI_COL_ZSCORE', 'Bid,Ask', 'z_score', '1.0',
     [(timestamp('2019-01-01 04:19:00'), 10.1, 11, 0.01),
      (timestamp('2019-01-01 04:19:01'), 10.2, 11.5, 0.02),
      (timestamp('2019-01-01 04:19:03'), 10.1, 0.0015, 0.03),
      (timestamp('2019-01-01 04:19:04'), 10.3, 11.5, 0.04),
      (timestamp('2019-01-01 04:19:05'), 10.2, 11.5, 0.05),
      (timestamp('2019-01-01 04:19:06'), 10.2, 11.5, 0.06),
      (timestamp('2019-01-02 05:59:59'), 100.1, 11.6, 0.07)],
     pd.DataFrame(index=[pd.Timestamp('2019-01-01 04:19:03', tz='UTC'),
                         pd.Timestamp('2019-01-02 05:59:59', tz='UTC')],
                  data=np.array([2, 1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
])
def test_anomaly_one_symbol(symbol, columns, detection_type, threshold, data, expected_df):
    """Write tick data for one symbol, query it back through the `anomaly`
    aggregate function, and check the returned anomaly bitmap frame.
    """
    # ---- given ----
    # Recreate the bucket from scratch so earlier runs cannot leak data in.
    tbk = "{}/1Sec/TICK".format(symbol)
    client.destroy(tbk)
    client.write(np.array(data, dtype=DATA_TYPE_TICK), tbk, isvariablelength=True)

    # ---- when ----
    params = pymkts.Params(symbol, '1Sec', 'TICK')
    params.functions = [f"anomaly('{columns}', '{detection_type}', '{threshold}')"]
    reply = client.query(params)

    # ---- then ----
    actual_df = reply.first().df()
    assert_frame_equal(actual_df, expected_df)
Loading