In [25]:
import sqlite3
import pandas as pd
import numpy as np
import datetime
pd.options.display.max_columns = 1000

In [26]:
class SqliteCore(object):
    def __init__(self, db_path):
        self.connection = sqlite3.connect(db_path)
        self.primary_idx_dict = {}
        
    def creat_table(self, table_name, para_list):
        sql_cmd = 'CREATE TABLE IF NOT EXISTS {}('.format(table_name)
        for idx in range(len(para_list) - 1):
            para_list[idx] += ','
        for para in para_list:
            sql_cmd += para
        sql_cmd += ')'
        self.connection.execute(sql_cmd)
    
    def _add_data(self, table_name, data_df, exists_type='append'):
        data_df.to_sql(table_name, con=self.connection, if_exists=exists_type, index=False)
        
    def _get_primary_idx(self, table_name, primary_key=None):
        if primary_key is None:
            primary_key = table_name + '_id'
        res = pd.read_sql('select * from {} order by {} desc'.format(table_name, primary_key), self.connection)
        cur_primary_id = 0 if res.empty else res.loc[0][primary_key]
        self.primary_idx_dict[table_name] = cur_primary_id
        return cur_primary_id
    
    # 适用于input_col是唯一的 scan
    # like abcxxxx比xxxxabc快
    # perfer inner join
    # where avoid function
    def query_0(self, table_name, input_col, input_value, target_col):
        res = pd.read_sql('select {} from {} where {}={}'.format(target_col, table_name, input_col, input_value), self.connection)
        return res[target_col]
    
    def query_current_position_by_instrument_id(self, instrument_id):
        res = pd.read_sql('select position_id from position where instrument_id={} and enddate is null'.format(instrument_id), self.connection)
        return res

In [27]:
sql_core = SqliteCore('test1225_v5.db')
sql_core.creat_table('instrument', [
    'instrument_id int primary key not null',
    'description text not null',
    'ticker text not null',
    'maturity datetime',
    'strike real',
    'option_type text'
])

sql_core.creat_table('market', [
    'market_id int primary key not null',
    'instrument_id',
    'timestamp datetime not null',
    'bid real',
    'ask real',
    'mid real',
    'delta real',
    'IV real',
    'gamma real',
    'thela real',
    'vega real',
    'foreign key (instrument_id) references instrument (instrument_id)'
])

sql_core.creat_table('position', [
    'position_id int primary key not null',
    'instrument_id',
    'replace_position_id',
    'qty real not null',
    'startdate datetime not null',
    'enddate datetime',
    'foreign key (instrument_id) references instrument (instrument_id)',
    'foreign key (replace_position_id) references position (replace_position_id)'
])

# order是关键字
sql_core.creat_table('my_order', [
    'order_id int primary key not null',
    'instrument_id',
    'timestamp datetime not null',
    'qty real not null',
    'order_type real not null',
    'order_price real not null',
    'foreign key (instrument_id) references instrument (instrument_id)'
])

sql_core.creat_table('trade', [
    'trade_id int primary key not null',
    'order_id',
    'timestamp datetime not null',
    'qty real not null',
    'price real not null',
    'reason text',
    'foreign key (order_id) references my_order (order_id)'
])

In [28]:
def market_data_entry_api(sql_core, csv_path):
    new_market_df = pd.DataFrame(columns=[
        'market_id', 'instrument_id', 'timestamp', 'bid', 'ask', 'mid', 'delta', 'IV', 'gamma', 'thela', 'vega'
    ])
    new_instrument_df = pd.DataFrame(columns=[
        'instrument_id', 'description', 'ticker', 'maturity', 'strike', 'option_type'
    ])
    exist_market_df = pd.read_sql('select * from market', sql_core.connection)
#     new_market_df = exist_market_df.drop(labels=[0,len(exist_market_df) - 1],axis=0)
    
    exist_instrument_df = pd.read_sql('select * from instrument', sql_core.connection)
#     new_instrument_df = exist_instrument_df.drop(labels=[0,len(exist_instrument_df) - 1],axis=0)

    cur_market_id = sql_core._get_primary_idx('market')
    cur_instrument_id = sql_core._get_primary_idx('instrument')
    query_instrument_df = exist_instrument_df[['instrument_id', 'description']]
    
    df = pd.read_csv(csv_path)
    for idx, row in df.iterrows():
        if row['symbol'] not in query_instrument_df['description'].values:
            new_instrument_df.loc[len(new_instrument_df)] = {
                'instrument_id': cur_instrument_id + 1,
                'description': row['symbol'],
                'ticker': row['symbol'].split('_')[0],
                'maturity': row['T'],
                'strike': row['strikePrice'],
                'option_type': row['putCall']
            }
            cur_instrument_id += 1
            query_instrument_df = pd.concat(
                [exist_instrument_df[['instrument_id', 'description']],new_instrument_df[['instrument_id', 'description']]],axis=0
            )
        new_market_df.loc[len(new_market_df)] = {
            'market_id': cur_market_id + 1,
            'instrument_id': int(query_instrument_df[query_instrument_df['description'] == row['symbol']]['instrument_id']),
            'timestamp': row['quoteTime'],
            'bid': row['bid'],
            'ask': row['ask'],
            'mid': (row['bid'] + row['ask']) / 2,
            'delta': row['delta'],
            'IV': row['volatility'],
            'gamma': row['gamma'],
            'thela': row['theta'],
            'vega': row['vega']
        }
        cur_market_id += 1
    sql_core._add_data('instrument', new_instrument_df)
    sql_core._add_data('market', new_market_df)
market_data_entry_api(sql_core, 'c3b5b352d23a34cfd555965199c8791c.csv')

In [29]:
pd.read_sql('select * from instrument', sql_core.connection)

Unnamed: 0,instrument_id,description,ticker,maturity,strike,option_type
0,1,AAPL_112522C70,AAPL,2022-11-25,70.0,CALL
1,2,AAPL_120222C70,AAPL,2022-12-02,70.0,CALL
2,3,AAPL_120922C70,AAPL,2022-12-09,70.0,CALL
3,4,AAPL_121622C70,AAPL,2022-12-16,70.0,CALL
4,5,AAPL_122322C70,AAPL,2022-12-23,70.0,CALL
...,...,...,...,...,...,...
2061,2062,AAPL_011924P310,AAPL,2024-01-19,310.0,PUT
2062,2063,AAPL_062124P310,AAPL,2024-06-21,310.0,PUT
2063,2064,AAPL_011725P310,AAPL,2025-01-17,310.0,PUT
2064,2065,AAPL_011924P320,AAPL,2024-01-19,320.0,PUT


In [30]:
pd.read_sql('select * from market', sql_core.connection)

Unnamed: 0,market_id,instrument_id,timestamp,bid,ask,mid,delta,IV,gamma,thela,vega
0,1,1,2022-11-25 11:26:21,77.80,78.00,77.900,0.999,548.517,0.0,-0.046,0.000
1,2,2,2022-11-25 11:26:05,77.80,78.10,77.950,1.000,163.080,0.0,-0.020,0.000
2,3,3,2022-11-25 11:26:27,77.90,78.20,78.050,1.001,118.022,0.0,-0.016,0.001
3,4,4,2022-11-25 11:26:07,77.90,78.30,78.100,1.002,97.219,0.0,-0.017,0.001
4,5,5,2022-11-25 11:25:39,77.95,78.30,78.125,1.003,84.620,0.0,-0.017,0.001
...,...,...,...,...,...,...,...,...,...,...,...
2061,2062,2062,2022-11-25 11:26:26,161.25,162.95,162.100,-1.000,29.145,0.0,0.000,0.000
2062,2063,2063,2022-11-25 11:25:49,160.45,163.20,161.825,-1.000,28.598,0.0,0.000,0.000
2063,2064,2064,2022-11-25 11:26:21,159.95,164.30,162.125,-1.000,28.060,0.0,0.000,0.000
2064,2065,2065,2022-11-25 11:26:18,171.25,172.95,172.100,-1.000,29.400,0.0,0.000,0.000


In [31]:
def get_foreign_key(input_value, foreign_key_table=None, input_col=None, foreign_key_col=None, cache=None):
    if cache is not None:
        return int(cache[cache[input_col] == input_value][foreign_key_col])
    else:
        # todo:没有cache的情况
        pass

In [32]:
def place_order_api(sql_core, order_info_df):
    new_order_df = pd.read_sql('select * from my_order limit 0', sql_core.connection)
    cur_order_id = sql_core._get_primary_idx('my_order', primary_key='order_id')  # order表主键
    query_instrument_df = pd.read_sql('select * from instrument', sql_core.connection)

    for idx, row in order_info_df.iterrows():
        instrument_id = get_foreign_key(row['description'], cache=query_instrument_df, input_col='description', foreign_key_col='instrument_id')
        new_order_df.loc[len(new_order_df)] = {
            'order_id': cur_order_id + 1,
            'instrument_id': instrument_id,
            'timestamp': row['timestamp'],
            'qty': row['qty'],
            'order_type': row['order_type'],
            'order_price': row['order_price'],
        }
        cur_order_id += 1
        sql_core._add_data('my_order', new_order_df)
place_order_api(sql_core, pd.DataFrame({
    'description': 'AAPL_120222C70',
    'timestamp': datetime.datetime(2022, 1, 22, 16 ,0),
    'qty': 100,
    'order_type': 'limit',
    'order_price': 10.54,
}, index=[0]))
pd.read_sql('select * from my_order', sql_core.connection)

Unnamed: 0,order_id,instrument_id,timestamp,qty,order_type,order_price
0,1,2,2022-01-22 16:00:00,100.0,limit,10.54


In [38]:
# order_fill的情况
def complete_order_api(sql_core, complete_order_info_df):
    new_trade_df = pd.read_sql('select * from trade limit 0', sql_core.connection)
    cur_trade_id = sql_core._get_primary_idx('trade')  # trade表主键

    for idx, row in complete_order_info_df.iterrows():
        new_trade_df.loc[len(new_trade_df)] = {
            'trade_id': cur_trade_id + 1,
            'order_id': row['order_id'],
            'timestamp': row['timestamp'],
            'qty': row['qty'],
            'price': row['price'],
            'reason': row['reason'],
        }
        sql_core._add_data('trade', new_trade_df)
        cur_trade_id += 1
    
        instrument_id = int(sql_core.query_0(table_name='my_order', input_col='order_id', input_value=row['order_id'], target_col='instrument_id'))
        postion_query_res = sql_core.query_current_position_by_instrument_id(instrument_id)
        postion_id = None if postion_query_res.empty else int(postion_query_res['position_id'])
        
        new_position_df = pd.read_sql('select * from position limit 0', sql_core.connection)
        cur_position_id = sql_core._get_primary_idx('position')  # position表主键
        
        if postion_id is None:
            # 对应的instrument在postion中不存在的情况
            new_position_df.loc[len(new_position_df)] = {
                'position_id': cur_position_id + 1,
                # 'order_id': row['order_id'],
                'startdate': row['timestamp'],
                'qty': row['qty'],
                'instrument_id': instrument_id,
            }
            sql_core._add_data('position', new_position_df)
        # todo:对应的instrument在postion中存在的情况
        else:
            postion_query_res['enddate'] = row['timestamp']
            sql_core._add_data('position', postion_query_res, exists_type='replace')
            # 对应的instrument在postion中不存在的情况
            new_position_df.loc[len(new_position_df)] = {
                'position_id': cur_position_id + 1,
                'startdate': row['timestamp'],
                'qty': row['qty'],
                'instrument_id': instrument_id,
                'replace_position_id': postion_id,
            }
            sql_core._add_data('position', new_position_df)

    
complete_order_api(sql_core, pd.DataFrame({
    'order_id': '1',
    'timestamp': datetime.datetime(2022, 1, 22, 16 ,0),
    'qty': 100,
    'price': 18.98,
    'reason': 'order fill',
}, index=[0]))

OperationalError: table position has no column named instrument_id

In [36]:
pd.read_sql('select * from trade', sql_core.connection)

Unnamed: 0,trade_id,order_id,timestamp,qty,price,reason
0,1,1,2022-01-22 16:00:00,100.0,18.98,order fill


In [39]:
pd.read_sql('select * from position', sql_core.connection)

Unnamed: 0,position_id,enddate
0,1,2022-01-22 16:00:00


In [None]:
"""
todoList:
add数据时的去重检测
fill时检测order
"""