## DolphinDB Quickstart

In [1]:
import datetime
import dolphindb
import shioaji as sj
import pandas as pd

In [2]:
import json

# Read the Shioaji login keyword arguments from a local JSON file.
with open("login.json", "r") as f:
    login_kwargs = json.load(f)

In [3]:
# Create the Shioaji client and log in with the keyword arguments in login_kwargs.
# contracts_cb=print makes the contracts callback print its argument
# (the SecurityType.* lines in the cell output).
api = sj.Shioaji()
accounts = api.login(**login_kwargs, contracts_cb=print)

Response Code: 0 | Event Code: 0 | Info: host '203.66.91.161:80', IP 203.66.91.161:80 (host 1 of 1) (host connection attempt 1 of 1) (total connection attempt 1 of 1) | Event: Session up
SecurityType.Index
SecurityType.Future
SecurityType.Option
SecurityType.Stock


In [4]:
# Open a DolphinDB session and connect to the server on localhost:8828.
# NOTE(review): host, port and the admin credentials are hard-coded; acceptable
# for a local quickstart, but load them from configuration for anything shared.
ddb = dolphindb.session()
ddb.connect("localhost", 8828, "admin", "123456")

True

### Use Shioaji (sj) to fetch today's tick data

In [5]:
# Select the stock contract whose code is "2330".
contract = api.Contracts.Stocks["2330"]

In [6]:
# Request tick data for the selected contract.
ticks = api.ticks(contract)

In [7]:
# Build a DataFrame from the tick fields and parse the `ts` column into
# pandas datetimes in one chained expression, then preview the first rows.
df = pd.DataFrame({**ticks}).assign(ts=lambda frame: pd.to_datetime(frame["ts"]))
df.head()

Unnamed: 0,bid_price,ts,close,volume,bid_volume,ask_price,ask_volume
0,318.5,2020-06-24 09:00:01.116046,319.0,3027,482,319.0,756
1,318.5,2020-06-24 09:00:01.949696,318.5,1,508,319.0,823
2,318.5,2020-06-24 09:00:02.068738,318.5,3,505,319.0,823
3,318.5,2020-06-24 09:00:02.071996,319.0,1,505,319.0,822
4,318.5,2020-06-24 09:00:02.087298,318.5,1,504,319.0,822


### upload dataframe to dolphindb memory

In [8]:
# Upload the DataFrame to the DolphinDB session under the name "ddb_table".
ddb.upload({"ddb_table": df})

### Query the DolphinDB table back into Python as a DataFrame

In [9]:
# Run a DolphinDB query against the uploaded table and display the first
# five rows returned to Python.
ddb.run("""
select top 5 * from ddb_table
""")

Unnamed: 0,bid_price,ts,close,volume,bid_volume,ask_price,ask_volume
0,318.5,2020-06-24 09:00:01.116046,319.0,3027,482,319.0,756
1,318.5,2020-06-24 09:00:01.949696,318.5,1,508,319.0,823
2,318.5,2020-06-24 09:00:02.068738,318.5,3,505,319.0,823
3,318.5,2020-06-24 09:00:02.071996,319.0,1,505,319.0,822
4,318.5,2020-06-24 09:00:02.087298,318.5,1,504,319.0,822


### create dolphindb table

In [32]:
# DolphinDB script that creates the database "dfs://ticks", VALUE-partitioned
# over 2020.06.23..2020.06.24, defines an empty in-memory table with the tick
# schema, and creates the partitioned table `ticks_table` partitioned on Date.
CREATE_TABLE = """
ticks_db = database("dfs://ticks", VALUE, 2020.06.23..2020.06.24)
ticks_table = table(
    array(DATE, 0) as Date,
    array(TIME, 0) as Time,
    array(SYMBOL, 0) as Code,
    array(DOUBLE, 0) as Price,
    array(LONG, 0) as Volume,
    array(DOUBLE, 0) as AskPrice,
    array(LONG, 0) as AskVolume,
    array(DOUBLE, 0) as BidPrice,
    array(LONG, 0) as BidVolume
)
ticks_db.createPartitionedTable(ticks_table, `ticks_table, `Date)
"""

In [33]:
# Execute the creation script; the displayed result is the new, still empty table.
ddb.run(CREATE_TABLE)

Unnamed: 0,Date,Time,Code,Price,Volume,AskPrice,AskVolume,BidPrice,BidVolume


### load table and insert data

In [10]:
# DolphinDB script that loads `ticks_table` from "dfs://ticks" into the session
# variable ticks_table and returns it as the script result.
LOAD_TABLE = """
ticks_table = loadTable("dfs://ticks",`ticks_table)
ticks_table
"""

In [11]:
# Execute the load script so later scripts can refer to ticks_table.
ddb.run(LOAD_TABLE)

Unnamed: 0,Date,Time,Code,Price,Volume,AskPrice,AskVolume,BidPrice,BidVolume


In [12]:
# Upload the tick DataFrame to the session as "df" so the scripts below can query it.
ddb.upload({"df": df})

In [13]:
# Preview the uploaded ticks in the column order of ticks_table: split `ts`
# into Date and Time, add the contract code as Code, and expose close as Price.
ddb.run(f"""
select date(ts) as Date, time(ts) as Time, 
       "{contract.code}" as Code, close as Price,
       volume, ask_price, ask_volume, 
       bid_price, bid_volume from df
""").head()

Unnamed: 0,Date,Time,Code,Price,volume,ask_price,ask_volume,bid_price,bid_volume
0,2020-06-24,1970-01-01 09:00:01.116,2330,319.0,3027,319.0,756,318.5,482
1,2020-06-24,1970-01-01 09:00:01.949,2330,318.5,1,319.0,823,318.5,508
2,2020-06-24,1970-01-01 09:00:02.068,2330,318.5,3,319.0,823,318.5,505
3,2020-06-24,1970-01-01 09:00:02.071,2330,319.0,1,319.0,822,318.5,505
4,2020-06-24,1970-01-01 09:00:02.087,2330,318.5,1,319.0,822,318.5,504


In [49]:
# Append the reshaped rows from the uploaded `df` to ticks_table.
# NOTE(review): the selected column names (volume, ask_price, ...) differ from
# the table's (Volume, AskPrice, ...); this presumably relies on append!
# matching columns by position — confirm against the DolphinDB docs.
ddb.run(f"""
ticks_table.append!(select date(ts) as Date, time(ts) as Time, 
       "{contract.code}" as Code, close as Price,
       volume, ask_price, ask_volume, 
       bid_price, bid_volume from df)
""")

### query table

In [14]:
# Read the stored ticks back, merging Date and Time into one DateTime column,
# and show the first rows.
ddb.run("""
select concatDateTime(Date, Time) as DateTime, 
       Code, Price, Volume, AskPrice, AskVolume, 
       BidPrice, BidVolume from ticks_table
""").head()

Unnamed: 0,DateTime,Code,Price,Volume,AskPrice,AskVolume,BidPrice,BidVolume
0,2020-06-23 09:00:00.963,2330,316.0,2586,316.5,793,316.0,14
1,2020-06-23 09:00:00.986,2330,316.0,1,316.5,793,316.0,13
2,2020-06-23 09:00:01.084,2330,316.0,10,316.5,814,316.0,3
3,2020-06-23 09:00:01.170,2330,316.0,2,316.5,814,316.0,1
4,2020-06-23 09:00:01.588,2330,316.0,2,316.5,811,316.0,125


### streaming table

In [45]:
# DolphinDB script that builds an empty stream table with the tick columns,
# shares and persists it under the name "StreamTicks", and then undefines the
# temporary schema variable.
STREAM_TABLE = """
ticks_streaming_schema = streamTable(
    array(DATE, 0) as Date,
    array(TIME, 0) as Time,
    array(SYMBOL, 0) as Code,
    array(DOUBLE, 0) as Price,
    array(LONG, 0) as Volume,
    array(DOUBLE, 0) as AskPrice,
    array(LONG, 0) as AskVolume,
    array(DOUBLE, 0) as BidPrice,
    array(LONG, 0) as BidVolume
)
enableTableShareAndPersistence(table=ticks_streaming_schema,
    asynWrite=true, compress=false, 
    cacheSize=50000, tableName="StreamTicks")
undef(`ticks_streaming_schema)
"""

In [46]:
# Execute the stream-table setup script.
ddb.run(STREAM_TABLE)

In [44]:
# ddb.run("dropStreamTable(`StreamTicks)")

In [47]:
# Query the shared stream table; the output shows it has no rows yet.
ddb.run("select * from StreamTicks")

Unnamed: 0,Date,Time,Code,Price,Volume,AskPrice,AskVolume,BidPrice,BidVolume


### subscribe stream table

In [18]:
# Server-side subscription: action "Ticks_to_dfs" on StreamTicks, with
# ticks_table passed as the handler argument.
# NOTE(review): -1 is presumably the offset (new messages only) and the final
# `true` presumably msgAsTable — confirm against the subscribeTable reference.
ddb.run("""
subscribeTable(,`StreamTicks, "Ticks_to_dfs", -1, ticks_table, true)
""")

'localhost:8848:local8848/StreamTicks/Ticks_to_dfs'

In [48]:
def handler(data: pd.DataFrame) -> None:
    """Print a batch of subscribed stream data.

    Args:
        data: The received rows, annotated as a DataFrame (the Python-side
            subscription in this notebook is created with msgAsTable=True).
    """
    print(data)

In [50]:
# Enable streaming on this session before subscribing.
# NOTE(review): 6543 is presumably the local port used to receive pushed data — confirm.
ddb.enableStreaming(6543)

In [53]:
# Subscribe this Python session to StreamTicks on 127.0.0.1:8828 under the
# name "PythonSub"; `handler` is passed as the callback, with msgAsTable=True.
ddb.subscribe("127.0.0.1", 8828, handler, "StreamTicks", "PythonSub", msgAsTable=True)

In [52]:
# ddb.unsubscribe("127.0.0.1", 8828, "StreamTicks", "PythonSub")

### push streaming data into stream table

In [54]:
# Replay the first ticks one row at a time to simulate a live feed.
# The script text does not depend on the loop index, so it is built once.
APPEND_TICK = f"""
    objByName('StreamTicks').append!(select date(ts) as Date, time(ts) as Time,
       "{contract.code}" as Code, close as Price,
       volume, ask_price, ask_volume,
       bid_price, bid_volume from tick_row)
    """

# Cap at len(df) so a short tick list never uploads an empty slice.
for i in range(min(10, len(df))):
    # Upload under a dedicated name: reusing "df" would overwrite the full
    # table uploaded earlier in the session with a single-row slice, so
    # re-running the earlier preview/append cells would see only one row.
    ddb.upload({"tick_row": df.iloc[i: i+1]})
    ddb.run(APPEND_TICK)

In [55]:
# Check the stream table contents after the rows were pushed.
ddb.run("""
select * from StreamTicks
""")

Unnamed: 0,Date,Time,Code,Price,Volume,AskPrice,AskVolume,BidPrice,BidVolume
0,2020-06-24,1970-01-01 09:00:01.116,2330,319.0,3027,319.0,756,318.5,482
1,2020-06-24,1970-01-01 09:00:01.949,2330,318.5,1,319.0,823,318.5,508
2,2020-06-24,1970-01-01 09:00:02.068,2330,318.5,3,319.0,823,318.5,505
3,2020-06-24,1970-01-01 09:00:02.071,2330,319.0,1,319.0,822,318.5,505
4,2020-06-24,1970-01-01 09:00:02.087,2330,318.5,1,319.0,822,318.5,504
5,2020-06-24,1970-01-01 09:00:02.120,2330,318.5,5,319.0,822,318.5,499
6,2020-06-24,1970-01-01 09:00:02.292,2330,318.5,1,319.0,822,318.5,498
7,2020-06-24,1970-01-01 09:00:02.889,2330,319.0,54,319.0,767,318.5,498
8,2020-06-24,1970-01-01 09:00:02.934,2330,319.0,3,319.0,764,318.5,499
9,2020-06-24,1970-01-01 09:00:03.031,2330,318.5,2,319.0,764,318.5,497


In [42]:
# Remove the server-side "Ticks_to_dfs" subscription on StreamTicks.
ddb.run('unsubscribeTable(, `StreamTicks, "Ticks_to_dfs")')

In [56]:
# Inspect the subscription worker statistics reported by the server.
ddb.run("""
getStreamingStat().subWorkers
""")

Unnamed: 0,workerId,topic,queueDepthLimit,queueDepth,processedMsgCount,lastMsgId,failedMsgCount,lastFailedMsgId,lastFailedTimestamp,lastErrMsg


In [34]:
# ddb.run("""dropTable(database("dfs://ticks"), `ticks_table)""")

### StreamTable as DataPipeline

Observer Pattern