-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTestWriteDataAndQueryBidAsk.py
57 lines (49 loc) · 1.62 KB
/
TestWriteDataAndQueryBidAsk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from datetime import datetime
import Artesian
from Artesian import Query
from dateutil import tz
from Artesian import MarketData
from Artesian.Granularity import Granularity
# End-to-end sample: make sure a BidAsk curve is registered, write two days of
# quotes for two products, then read them back through the query service.

# NOTE(review): "tenantName" and "APIKey" look like placeholders -- replace
# them with real values before running.
cfg = Artesian.ArtesianConfig("https://arkive.artesian.cloud/tenantName/", "APIKey")
mkdservice = MarketData.MarketDataService(cfg)

# Definition of the curve: provider, curve name, granularity, type, timezone.
entity = MarketData.MarketDataEntityInput(
    "PythonSDK",
    "TestBidAskWriteAndRead",
    Granularity.Hour,
    MarketData.MarketDataType.BidAsk,
    "CET",
    tags={"TestSDKPython": ["PythonValue2"]},
)

# Look the curve up by provider and name; register it only when the lookup
# returns None, so that `registered` is always bound before it is used below.
registered = mkdservice.readMarketDataRegistryByName(
    entity.providerName, entity.marketDataName
)
if registered is None:
    registered = mkdservice.registerMarketData(entity)

# The same two product quotes are written for each of the two report days.
identifier = MarketData.MarketDataIdentifier(
    entity.providerName, entity.marketDataName
)
report_days = (datetime(2020, 1, 1), datetime(2020, 1, 2))
quotes_by_day = {
    day: {
        "Feb-20": MarketData.BidAskValue(bestBidPrice=15.0, lastQuantity=14.0),
        "Mar-20": MarketData.BidAskValue(bestBidPrice=25.0, lastQuantity=24.0),
    }
    for day in report_days
}

# The payload is sent with the "UTC" timezone although the curve is defined
# with "CET".  NOTE(review): presumably required for hourly data -- confirm.
bidAsk = MarketData.UpsertData(
    identifier,
    "UTC",
    bidAsk=quotes_by_day,
    downloadedAt=datetime(2020, 1, 3, tzinfo=tz.UTC),
)
mkdservice.upsertData(bidAsk)

# Read the data back: build the BidAsk extraction first, then execute it.
query = Query.QueryService(cfg)
bid_ask_query = (
    query.createBidAsk()
    .forMarketData([registered.marketDataId])
    .inAbsoluteDateRange("2020-01-01", "2020-01-02")
    .forProducts(["Feb-20", "Mar-20"])
    .inTimeZone("CET")
)
res = bid_ask_query.execute()
print(res)