In [253]:
from datetime import datetime, timedelta
from ibapi import wrapper
from ibapi import utils
from ibapi.client import EClient
from ibapi.utils import iswrapper
from ibapi.contract import *  
from ibapi.common import *
from ibapi.ticktype import * 
import threading
from app.wsgi import app
from app.extensions import db
from app.models.Equity import (
    Equity,
    EquityHistoricalData
)

# NOTE: Hist vol seems to not be able to be fetched day of
dataTypes = ["TRADES", "OPTION_IMPLIED_VOLATILITY", "HISTORICAL_VOLATILITY"] # ["TRADES", "MIDPOINT", "BID", "ASK", "HISTORICAL_VOLATILITY", "OPTION_IMPLIED_VOLATILITY"]
optionDataTypes = ["TRADES", "MIDPOINT", "BID", "ASK"]

dataTypeDuplicateDateSetMap = { dt: set() for dt in dataTypes}

failed_options_req = 0
optionsReqIdScheduledDataMap = {}
optionsReqIdDataMap = {}

reqIdScheduledDataMap = {}
reqIdDataMap = {}
failedRequests = 0

connectivityErrorCodes = [1100, 1300]
clientErrorCodes = [502, 504]
warningErrorCodes = [2100, 2137]
rateLimitErrorCodes = [100, 101]
twsErrorCodes = [102, 449]
twsErrorCodes2 = [10000, 10148]


# Global vars that would be set via the command line
day = "20200515"


reqIds = {}
    
class IBHistoricalDataAggregator(wrapper.EWrapper):
    
    def historicalData(self, reqId:int, bar: BarData):
        super().historicalData(reqId, bar)
        requestProcessor.updateRequest(reqId, bar)
#         if reqId not in reqIds:
    
        reqIds[reqId] = 0
        
        
            
        
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        super().historicalDataEnd(reqId, start, end)
        requestProcessor.finishedRequest(reqId)
        
        if len(reqIds) + requestProcessor.failedRequests == len(requestProcessor.allReqsMap):
            ibapi.disconnect()
#         print("historicalDataEnd. reqId: ", reqId, "start: ", start, "end: ", end)
        
    
    def tickPrice(self, reqId: TickerId, tickType: TickType, price: float, attrib: TickAttrib):
        super().tickPrice(reqId, tickType, price, attrib)
        print("TickPrice. TickerId:", reqId, "tickType:", tickType,
            "Price:", price, "CanAutoExecute:", attrib.canAutoExecute,
            "PastLimit:", attrib.pastLimit, end=' ')
        if tickType == TickTypeEnum.BID or tickType == TickTypeEnum.ASK:
            print("PreOpen:", attrib.preOpen)
        else:
            print()
            
    def tickSize(self, reqId: TickerId, tickType: TickType, size: int):
        super().tickSize(reqId, tickType, size)
        print("TickSize. TickerId:", reqId, "TickType:", tickType, "Size:", size)
        
    
    def tickString(self, reqId: TickerId, tickType: TickType, value: str):
        super().tickString(reqId, tickType, value)
        print("TickString. TickerId:", reqId, "Type:", tickType, "Value:", value)
        
    def tickGeneric(self, reqId: TickerId, tickType: TickType, value: float):
        super().tickGeneric(reqId, tickType, value)
        print("TickGeneric. TickerId:", reqId, "TickType:", tickType, "Value:", value)
                
    def error(self, reqId: TickerId, errorCode: int, errorString: str):
        global failedRequests
        global failed_options_req
        super().error(reqId, errorCode, errorString)
        
        if reqId == -1:
            if errorCode >= rateLimitErrorCodes[0] and errorCode <= rateLimitErrorCodes[1]:
                print("RATE LIMIT ERROR")
                # TODO: Hope it doesnt happen
            print("WARNING: ", errorString)
            return
        elif errorCode >= twsErrorCodes[0] and errorCode <= twsErrorCodes[1] or errorCode >= twsErrorCodes2[0] and errorCode <= twsErrorCodes2[1]:
            print("TWS ERROR: ", errorString)
            # Decrement Expected Results Count
            if reqId > len(optionsReqIdScheduledDataMap.keys()):
                requestProcessor.failedRequest(reqId, errorString)
#                 failedRequests += 1
            else:  
                failed_options_req += 1
            
        elif errorCode >= rateLimitErrorCodes[0] and errorCode <= rateLimitErrorCodes[1]:
            print("RATE LIMIT ERROR")
            
    def securityDefinitionOptionParameter(self, reqId:int, exchange:str, underlyingConId:int, tradingClass:str, multiplier:str, expirations:SetOfString, strikes:SetOfFloat):
        super().securityDefinitionOptionParameter(reqId, exchange, underlyingConId, tradingClass, multiplier, expirations, strikes)
        global options_strikes
        global options_exp
        
        if exchange == "SMART":
            optionsReqDataMap[reqId] = {
                "expiration": expirations,
                "strike": strikes,                
            }
        else:
            failed_options_req += 1
            
        if len(optionsReqDataMap.keys()) == (len(optionsReqIdScheduledDataMap.keys()) - failed_options_req):
            # schedule options
            curReqId = len(reqIdScheduledDataMap.keys()) + len(optionsReqIdScheduledDataMap.keys())
            for key, val in optionsReqIdDataMap.items():
                for exp in val["expiration"]:
                    for strike in val["strike"]:     
                        pContract = Contract()
                        pContract.conId = optionsReqIdScheduledDataMap[key].conId
                        pContract.symbol = optionsReqIdScheduledDataMap[key].symbol
                        pContract.currency = optionsReqIdScheduledDataMap[key].currency
                        pContract.exchange = optionsReqIdScheduledDataMap[key].exchange
                        pContract.secType = "OPT"
                        pContract.lastTradeDateOrContractMonth = exp
                        pContract.strike = strike
                        pContract.right = "P"
                        
                        cContract = Contract()
                        cContract.conId = optionsReqIdScheduledDataMap[key].conId
                        cContract.symbol = optionsReqIdScheduledDataMap[key].symbol
                        cContract.currency = optionsReqIdScheduledDataMap[key].currency
                        cContract.exchange = optionsReqIdScheduledDataMap[key].exchange
                        cContract.secType = "OPT"
                        cContract.lastTradeDateOrContractMonth = exp
                        cContract.strike = strike
                        cContract.right = "C"
                        
                        for dataType in optionDataTypes:
                            scheduleIBHistoricalAPI(curReqId, cContract, endDateStr, "1 D", "1 min", dataType, 0)
                            curReqId += 1
                            scheduleIBHistoricalAPI(curReqId, pContract, endDateStr, "1 D", "1 min", dataType, 0)
                            curReqId += 1
                            
            # Start requesting
            processRequests()
        
    def nextValidId(self, reqId: int):
        super().nextValidId(reqId)
        print("Next Valid Id: ", reqId)
        
    def connectionClosed(self):
        super().connectionClosed()
    
batch_size = 50
granular_timeout = 600
regular_timeout = 1.2

granular_deffered_req = []
deffered_req = []
        
def scheduleIBHistoricalAPI(reqId, contract, endDateStr, durationStr, intervalStr, dataType, useRTH):
    def_req = {
        "reqId": reqId,
        "contract": contract,
        "endDateStr": endDateStr,
        "durationStr": durationStr,
        "intervalStr": intervalStr,
        "dataType": dataType,
        "useRTH": useRTH
    }
    
    reqIdScheduledDataMap[reqId] = def_req
    if intervalStr == "1 secs":
        requestProcessor.addGranularRequest(reqId, def_req)
#         granular_deffered_req.append(def_req)
    else:
        requestProcessor.addRegularRequest(reqId, def_req)
#         deffered_req.append(def_req)
    
class HistoricalRequestProcessor():
    # TODO: Add granular limit logic
    # Data Limit requirements
    # No more than 50 historical data req open at the same time
    # No more than 50 calls per sec
    # Granular:
    # Identical hist req within 15sec (we shouldnt see this?)
    # Making 6+ hist req for same contract, exch, and Historical Data Types within 2 sec (is tick type Historical Data Types)
    # Making more that 60 req within 10 min
    MAX_ACTIVE_REQ = 50
    MAX_REQ_PER_SEC = 50
    def __init__(self):
        self.non_granular_requests = []
        self.granular_requests = []
        self.activeRequests = 0
        self.failedRequests = 0
        self.startIdx = 0
        self.reqIdDataMap = {}
        self.allReqsMap = {}
        
    def addGranularRequest(self, reqId, req):
        self.granular_requests.append(req)
        self.allReqsMap[reqId] = req
    
    def addRegularRequest(self, reqId, req):
        self.non_granular_requests.append(req)
        self.allReqsMap[reqId] = req
        
    def run_non_granular_batch(self):
        for i in range(self.startIdx, min(self.startIdx + 50, len(self.non_granular_requests))):
            req = self.non_granular_requests[i]
            print("Running Request: ", req["reqId"])
            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
            self.activeRequests += 1
        self.startIdx = min(self.startIdx + 50, len(self.non_granular_requests)) - 1
        
        if self.startIdx == len(self.non_granular_requests):
            self.completion()
            
    def completion(self):
        print(datetime.now(), "HistoricalRequestProcessor: completion")
#         ibapi.disconnect()
        
    def process(self):
        self.run_non_granular_batch() 
        
    def updateRequest(self, reqId, bar):
        if reqId not in self.reqIdDataMap:
            self.reqIdDataMap[reqId] = []
        
        reqDT = self.allReqsMap[reqId]["dataType"]
#         if bar.date not in dataTypeDuplicateDateSetMap[reqDT]:
        self.reqIdDataMap[reqId].append({
            "date": bar.date,#datetime.strptime(bar.date, "%Y%m%d %H:%M:%S"),
            "open": bar.open,
            "high": bar.high,
            "low": bar.low,
            "close": bar.close,
            "volume": bar.volume,
            "barCount": bar.barCount,
            "average": bar.average,
        })
#             dataTypeDuplicateDateSetMap[reqDT].add(bar.date)
        
    def failedRequest(self, reqId, msg):
        print(datetime.now(), " HistoricalRequestProcessor: failedReqeust. ReqId: ", reqId, ": ", msg)
        self.failedRequests += 1
        if self.startIdx < len(self.non_granular_requests) - 1:
            self.startIdx += 1
            req = self.non_granular_requests[self.startIdx]
            print("Running Request: ", req["reqId"])
            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
        else:
            self.completion()
            
    def finishedRequest(self, reqId):
        self.activeRequests -= 1
        print(datetime.now(), " HistoricalRequestProcessor: finishedRequest: ", reqId)
        print("Progress: ", self.startIdx / len(self.non_granular_requests))
        if self.startIdx < len(self.non_granular_requests) - 1:
            self.startIdx += 1
            req = self.non_granular_requests[self.startIdx]
            print("Running Request: ", req["reqId"])
            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
        else:
            self.completion()
            

"""
    
    day: Precision of only a day: yyyyMMdd
"""
def getDayEquityInfo(contract, day, granular):
    curRequestID = len(optionsReqIdScheduledDataMap.keys()) + len(reqIdScheduledDataMap.keys())
    timeIntervals = []
    if type(granular) is bool and granular:
        endRTH = datetime.strptime('{} 13:00:00'.format(day), "%Y%m%d %H:%M:%S")
        beginRTH = datetime.strptime('{} 6:30:00'.format(day), "%Y%m%d %H:%M:%S")
   
        curEndTime = endRTH
        while curEndTime > beginRTH:
            timeIntervals.append({
                "endTimeStr": "{} PST".format(curEndTime.strftime("%Y%m%d %H:%M:%S")),
                "durationStr": "1800 S",
                "intervalStr": "1 secs"
            })
            
            curEndTime = curEndTime - timedelta(minutes=30)
        # Premarket
        timeIntervals.append({
            "endTimeStr": "{} 06:29:59 PST".format(day),
            "durationStr": "19740 S",
            "intervalStr": "1 min"
        })
        # After Market
        timeIntervals.append({
            "endTimeStr": "{} 17:00:00 PST".format(day),
            "durationStr": "14400 S",
            "intervalStr": "1 min"
        })
    else:
        timeIntervals.append({
            "endTimeStr": "{} 23:59:59 PST".format(day),
            "durationStr": "1 D",
            "intervalStr": granular if type(granular) is str else "1 min"
        })
                
#         timeIntervals.append({
#             "endTimeStr": "{} 23:59:59 PST".format(day),
#             "durationStr": "1 D",
#             "intervalStr": "1 min"
#         })
        
    for dt in dataTypes:
        if dt == "HISTORICAL_VOLATILITY":
            timeInterval = {
                    "endTimeStr": "{} 23:59:59 PST".format(day),
                    "durationStr": "1 D",
                    "intervalStr": granular if type(granular) is str and barSizeOrder[granular] > barSizeOrder["3 hours"] else "3 hours"
            }
            scheduleIBHistoricalAPI(curRequestID, contract, timeInterval["endTimeStr"], timeInterval["durationStr"], timeInterval["intervalStr"], dt, 0)
            curRequestID += 1
        else:
            for ti in timeIntervals:
                scheduleIBHistoricalAPI(curRequestID, contract, ti["endTimeStr"], ti["durationStr"], ti["intervalStr"], dt, 0)
                curRequestID += 1

barSizeOrder = { "1 secs": 0, "5 secs": 1, "10 secs": 2, "15 secs": 3, "30 secs": 4, "1 min": 5, "2 mins": 6, "3 mins": 7, "5 mins": 8, "10 mins": 9, "15 mins": 10, "20 min":11, "30 mins": 12, "1 hour": 13, "2 hours": 14, "3 hours": 15, "4 hours": 16, "8 hours": 17, "1 day": 18, "1 week": 19, "1 month": 20}
      
# def startScdsheduling(include_options, day):
#     options = []
#     with app.app_context():
#         # Yes doing a double loop is less effiecient than doin it all in one, but its simplier
#         def equityToContract(equity):
#             contract = Contract()
#             contract.symbol = equity.ticker
#             contract.secType = "STK"
#             contract.exchange = "SMART"
#             contract.currency = "USD"
#             contract.conId = equity.ib_con_id
#             return contract
        
#         if include_options:
#             optionsReqId = 0
            
                
#             options = [equity for equity in db.session.query(Equity).all() if equity.include_options]
#             options = map(equityToContract, options)
            
#             for op in options:
#                 optionsReqIdScheduledDataMap[optionsReqId] = op
#                 ibapi.reqSecDefOptParams(optionsReqId, op.symbol, "", "STK", op.conId)
#                 optionsReqId += 1
                
#         for equity in db.session.query(Equity).all():
#                 contract = equityToContract(equity)
#                 getDayEquityInfo(contract, day, equity.granular)
        
#         if not include_options:
#             processRequests()
            
            
def scheduleSPYData():
    contract = Contract()
    contract.symbol = "SPY"
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    startDate = datetime.strptime("20190430", "%Y%m%d")
    endDate = datetime.strptime('20190812', "%Y%m%d")

    curTime = endDate
    while curTime >= startDate:
        if curTime.weekday() < 5:
            getDayEquityInfo(contract, curTime.strftime("%Y%m%d"), "4 hours")
        curTime = curTime - timedelta(days=1)
        
def scheduleVIXData():
    contract = Contract()
    contract.symbol = "VIX"
    contract.secType = "IND"
    contract.exchange = "CBOE"
    contract.currency = "USD"
    startDate = datetime.strptime("20191007", "%Y%m%d")
    endDate = datetime.strptime('20191209', "%Y%m%d")

    curTime = endDate
    while curTime >= startDate:
        if curTime.weekday() < 5:
            getDayEquityInfo(contract, curTime.strftime("%Y%m%d"), "4 hours")
        curTime = curTime - timedelta(days=1)
        

class ThreadedIBApi(EClient, threading.Thread):
    def __init__(self, wrapper):
        threading.Thread.__init__(self)
        EClient.__init__(self, wrapper=wrapper)           
        
requestProcessor = HistoricalRequestProcessor()
histDataAgregator = IBHistoricalDataAggregator()   
ibapi = ThreadedIBApi(wrapper=histDataAgregator)
gatewayPort = 4001
twsPort = 7496
ibapi.connect("host.docker.internal", gatewayPort, clientId=0)
ibapi.start()


ERROR -1 2104 Market data farm connection is OK:usfarm.nj


Next Valid Id:  1


In [252]:
ibapi.disconnect()

unhandled exception in EReader thread
Traceback (most recent call last):
  File "/venv/lib/python3.6/site-packages/ibapi/reader.py", line 34, in run
    data = self.conn.recvMsg()
  File "/venv/lib/python3.6/site-packages/ibapi/connection.py", line 99, in recvMsg
    buf = self._recvAllMsg()
  File "/venv/lib/python3.6/site-packages/ibapi/connection.py", line 119, in _recvAllMsg
    buf = self.socket.recv(4096)
OSError: [Errno 9] Bad file descriptor


In [254]:
scheduleSPYData()

In [255]:
requestProcessor.process()

Running Request:  0
Running Request:  1
Running Request:  2
Running Request:  3
Running Request:  4
Running Request:  5
Running Request:  6
Running Request:  7
Running Request:  8
Running Request:  9
Running Request:  10
Running Request:  11
Running Request:  12
Running Request:  13
Running Request:  14
Running Request:  15
Running Request:  16
Running Request:  17
Running Request:  18
Running Request:  19
Running Request:  20
Running Request:  21
Running Request:  22
Running Request:  23
Running Request:  24
Running Request:  25
Running Request:  26
Running Request:  27
Running Request:  28
Running Request:  29
Running Request:  30
Running Request:  31
Running Request:  32
Running Request:  33
Running Request:  34
Running Request:  35
Running Request:  36
Running Request:  37
Running Request:  38
Running Request:  39
Running Request:  40
Running Request:  41
Running Request:  42
Running Request:  43
Running Request:  44
Running Request:  45
Running Request:  46
Running Request:  47
Ru

ERROR -1 2106 HMDS data farm connection is OK:ushmds


2020-07-20 04:31:28.620722  HistoricalRequestProcessor: finishedRequest:  30
Progress:  0.21777777777777776
Running Request:  50
2020-07-20 04:31:28.639921  HistoricalRequestProcessor: finishedRequest:  32
Progress:  0.2222222222222222
Running Request:  51
2020-07-20 04:31:28.644575  HistoricalRequestProcessor: finishedRequest:  31
Progress:  0.22666666666666666
Running Request:  52
2020-07-20 04:31:28.651657  HistoricalRequestProcessor: finishedRequest:  34
Progress:  0.2311111111111111
Running Request:  53
2020-07-20 04:31:28.655798  HistoricalRequestProcessor: finishedRequest:  35
Progress:  0.23555555555555555
Running Request:  54
2020-07-20 04:31:28.662849  HistoricalRequestProcessor: finishedRequest:  2
Progress:  0.24
Running Request:  55
2020-07-20 04:31:28.668766  HistoricalRequestProcessor: finishedRequest:  38
Progress:  0.24444444444444444
Running Request:  56
2020-07-20 04:31:28.684608  HistoricalRequestProcessor: finishedRequest:  37
Progress:  0.24888888888888888
Running

ERROR -1 2103 Market data farm connection is broken:usfarm.nj


2020-07-20 04:32:58.605881  HistoricalRequestProcessor: finishedRequest:  86
Progress:  0.4222222222222222
Running Request:  96


ERROR -1 2104 Market data farm connection is OK:usfarm.nj


2020-07-20 04:33:04.574043  HistoricalRequestProcessor: finishedRequest:  8
Progress:  0.4266666666666667
Running Request:  97
2020-07-20 04:33:10.670610  HistoricalRequestProcessor: finishedRequest:  78
Progress:  0.4311111111111111
Running Request:  98
2020-07-20 04:33:16.613531  HistoricalRequestProcessor: finishedRequest:  92
Progress:  0.43555555555555553
Running Request:  99
2020-07-20 04:33:28.585204  HistoricalRequestProcessor: finishedRequest:  51
Progress:  0.44
Running Request:  100
2020-07-20 04:33:28.590891  HistoricalRequestProcessor: finishedRequest:  18
Progress:  0.4444444444444444
Running Request:  101
2020-07-20 04:33:28.596094  HistoricalRequestProcessor: finishedRequest:  75
Progress:  0.4488888888888889
Running Request:  102
2020-07-20 04:33:28.606764  HistoricalRequestProcessor: finishedRequest:  95
Progress:  0.4533333333333333
Running Request:  103
2020-07-20 04:33:28.610303  HistoricalRequestProcessor: finishedRequest:  83
Progress:  0.4577777777777778
Running

2020-07-20 04:36:28.811582  HistoricalRequestProcessor: finishedRequest:  110
Progress:  0.7244444444444444
Running Request:  164
2020-07-20 04:36:28.887474  HistoricalRequestProcessor: finishedRequest:  134
Progress:  0.7288888888888889
Running Request:  165
2020-07-20 04:36:28.916264  HistoricalRequestProcessor: finishedRequest:  149
Progress:  0.7333333333333333
Running Request:  166
2020-07-20 04:36:28.924221  HistoricalRequestProcessor: finishedRequest:  143
Progress:  0.7377777777777778
Running Request:  167
2020-07-20 04:36:28.934437  HistoricalRequestProcessor: finishedRequest:  127
Progress:  0.7422222222222222
Running Request:  168
2020-07-20 04:36:28.939983  HistoricalRequestProcessor: finishedRequest:  146
Progress:  0.7466666666666667
Running Request:  169
2020-07-20 04:36:28.952047  HistoricalRequestProcessor: finishedRequest:  154
Progress:  0.7511111111111111
Running Request:  170
2020-07-20 04:36:28.986943  HistoricalRequestProcessor: finishedRequest:  136
Progress:  0

2020-07-20 04:38:34.571301  HistoricalRequestProcessor: finishedRequest:  216
Progress:  0.9955555555555555
2020-07-20 04:38:34.571557 HistoricalRequestProcessor: completion
2020-07-20 04:38:34.574458  HistoricalRequestProcessor: finishedRequest:  204
Progress:  0.9955555555555555
2020-07-20 04:38:34.574639 HistoricalRequestProcessor: completion
2020-07-20 04:38:34.586050  HistoricalRequestProcessor: finishedRequest:  224
Progress:  0.9955555555555555
2020-07-20 04:38:34.586366 HistoricalRequestProcessor: completion
2020-07-20 04:38:34.588112  HistoricalRequestProcessor: finishedRequest:  221
Progress:  0.9955555555555555
2020-07-20 04:38:34.588246 HistoricalRequestProcessor: completion
2020-07-20 04:38:40.580888  HistoricalRequestProcessor: finishedRequest:  219
Progress:  0.9955555555555555
2020-07-20 04:38:40.581165 HistoricalRequestProcessor: completion
2020-07-20 04:38:40.586615  HistoricalRequestProcessor: finishedRequest:  186
Progress:  0.9955555555555555
2020-07-20 04:38:40.58

unhandled exception in EReader thread
Traceback (most recent call last):
  File "/venv/lib/python3.6/site-packages/ibapi/reader.py", line 34, in run
    data = self.conn.recvMsg()
  File "/venv/lib/python3.6/site-packages/ibapi/connection.py", line 99, in recvMsg
    buf = self._recvAllMsg()
  File "/venv/lib/python3.6/site-packages/ibapi/connection.py", line 119, in _recvAllMsg
    buf = self.socket.recv(4096)
OSError: [Errno 9] Bad file descriptor


In [111]:
len(requestProcessor.reqIdDataMap) + requestProcessor.activeRequests + requestProcessor.failedRequests

138

In [192]:
len(requestProcessor.reqIdDataMap)

138

In [190]:
[i for i in range(len(requestProcessor.allReqsMap)) if i not in reqIds]

[]

In [191]:
requestProcessor.activeRequests

-88

In [185]:
requestProcessor.allReqsMap[45]

{'reqId': 45,
 'contract': 140339332077552: 0,SPY,STK,,0.0,,,SMART,,USD,,,False,,combo:,
 'endDateStr': '20191118 23:59:59 PST',
 'durationStr': '1 D',
 'intervalStr': '4 hours',
 'dataType': 'TRADES',
 'useRTH': 0}

In [168]:
len(requestProcessor.reqIdDataMap)

89

In [117]:
for reqId in requestProcessor.reqIdDataMap.keys():
    print(requestProcessor.allReqsMap[reqId])
    

{'reqId': 1, 'contract': 140339334337872: 0,SPY,STK,,0.0,,,NYSE,,USD,,,False,,combo:, 'endDateStr': '20191209 23:59:59 PST', 'durationStr': '1 D', 'intervalStr': '4 hours', 'dataType': 'OPTION_IMPLIED_VOLATILITY', 'useRTH': 0}
{'reqId': 2, 'contract': 140339334337872: 0,SPY,STK,,0.0,,,NYSE,,USD,,,False,,combo:, 'endDateStr': '20191209 23:59:59 PST', 'durationStr': '1 D', 'intervalStr': '4 hours', 'dataType': 'HISTORICAL_VOLATILITY', 'useRTH': 0}
{'reqId': 5, 'contract': 140339334337872: 0,SPY,STK,,0.0,,,NYSE,,USD,,,False,,combo:, 'endDateStr': '20191206 23:59:59 PST', 'durationStr': '1 D', 'intervalStr': '4 hours', 'dataType': 'HISTORICAL_VOLATILITY', 'useRTH': 0}
{'reqId': 4, 'contract': 140339334337872: 0,SPY,STK,,0.0,,,NYSE,,USD,,,False,,combo:, 'endDateStr': '20191206 23:59:59 PST', 'durationStr': '1 D', 'intervalStr': '4 hours', 'dataType': 'OPTION_IMPLIED_VOLATILITY', 'useRTH': 0}
{'reqId': 8, 'contract': 140339334337872: 0,SPY,STK,,0.0,,,NYSE,,USD,,,False,,combo:, 'endDateStr': 

In [None]:
contract = Contract()
contract.symbol = "SPY"
contract.secType = "STK"
contract.exchange = "SMART"
contract.currency = "USD"

getDayEquityInfo(contract, day, True)

In [None]:
aggregation_periods = ["1min", "5min", "15min", "30min", "1hr", "4hr", "1day", "2day", "3day", "4day", "5day", "10day", "15day", "20day", "25day"]

# Next 

# windows of and find the lowest points and start to find trends until the trend with 
# define min_window per aggregation period
# max_window of aggregation period

# Lows on the RSI
# Open and the close
# RSI Lows supports

# IV 


In [158]:
len(reqIdScheduledDataMap)

138

In [155]:
len(requestProcessor.non_granular_requests)

138

In [156]:
len(requestProcessor.allReqsMap)

138

In [151]:
l = [requestProcessor.allReqsMap[reqIdDataPair[0]] for reqIdDataPair in requestProcessor.reqIdDataMap.items() ] # if "20191025" in requestProcessor.allReqsMap[reqIdDataPair[1]]['date']] # if "20191025" in reqIdDataPair[1]["date"]]
l.sort(key=lambda x: x['endDateStr'])
l
# requestProcessor.allReqsMap[136]
#requestProcessor.non_granular_requests

[{'reqId': 133,
  'contract': 140339333576632: 0,SPY,STK,,0.0,,,SMART,,USD,,,False,,combo:,
  'endDateStr': '20191008 23:59:59 PST',
  'durationStr': '1 D',
  'intervalStr': '4 hours',
  'dataType': 'OPTION_IMPLIED_VOLATILITY',
  'useRTH': 0},
 {'reqId': 128,
  'contract': 140339333576632: 0,SPY,STK,,0.0,,,SMART,,USD,,,False,,combo:,
  'endDateStr': '20191010 23:59:59 PST',
  'durationStr': '1 D',
  'intervalStr': '4 hours',
  'dataType': 'HISTORICAL_VOLATILITY',
  'useRTH': 0},
 {'reqId': 120,
  'contract': 140339333576632: 0,SPY,STK,,0.0,,,SMART,,USD,,,False,,combo:,
  'endDateStr': '20191014 23:59:59 PST',
  'durationStr': '1 D',
  'intervalStr': '4 hours',
  'dataType': 'TRADES',
  'useRTH': 0},
 {'reqId': 122,
  'contract': 140339333576632: 0,SPY,STK,,0.0,,,SMART,,USD,,,False,,combo:,
  'endDateStr': '20191014 23:59:59 PST',
  'durationStr': '1 D',
  'intervalStr': '4 hours',
  'dataType': 'HISTORICAL_VOLATILITY',
  'useRTH': 0},
 {'reqId': 121,
  'contract': 140339333576632: 0,SP

In [256]:
tsv_dict = {}
for reqId, data in requestProcessor.reqIdDataMap.items():
    og_req = requestProcessor.allReqsMap[reqId]
    req_type = og_req["dataType"].lower()
    for row in data:
        dateObj = datetime.strptime(row["date"], "%Y%m%d %H:%M:%S")
        
        if dateObj not in tsv_dict:
            tsv_dict[dateObj] = {
                "time": row["date"]
            }
            
        tsv_dict[dateObj][req_type + "_open"] = row["open"]
        tsv_dict[dateObj][req_type + "_high"] = row["high"]
        tsv_dict[dateObj][req_type + "_low"] = row["low"]
        tsv_dict[dateObj][req_type + "_close"] = row["close"]
        tsv_dict[dateObj][req_type + "_volume"] = row["volume"]
        tsv_dict[dateObj][req_type + "_barCount"] = row["barCount"]
        tsv_dict[dateObj][req_type + "_average"] = row["average"]
        
spy_data_tsv = open("spy_043019_081219.tsv", "w")

columnTitles = ["time", 
                "trades_open", 
                "trades_high", 
                "trades_low", 
                "trades_close", 
                "trades_volume", 
                "trades_barCount", 
                "trades_average", 
                
                "option_implied_volatility_open", 
                "option_implied_volatility_high", 
                "option_implied_volatility_low", 
                "option_implied_volatility_close", 
                "option_implied_volatility_volume", 
                "option_implied_volatility_barCount", 
                "option_implied_volatility_average",
                
                "historical_volatility_open", 
                "historical_volatility_high", 
                "historical_volatility_low", 
                "historical_volatility_close", 
                "historical_volatility_volume", 
                "historical_volatility_barCount", 
                "historical_volatility_average",
               ]


    
spy_data_tsv.write("\t".join(columnTitles) + "\n")

for dateObjKey in sorted(tsv_dict.keys()):
    row = tsv_dict[dateObjKey]
    
    def transformedRow(title):
        val = row.get(title, "")
        if type(val) == int or type(val) == float:
            val = str(val)
        return val
    t_row = map(transformedRow, columnTitles)
    spy_data_tsv.write("\t".join(t_row) + "\n")
    
spy_data_tsv.close()


In [None]:
spy_data_tsv = open("spy_data.tsv", "w")

spy_data_tsv.write("\t".join(columnTitles) + "\n")

for dateObjKey in sorted(tsv_dict.keys()):
    row = tsv_dict[dateObjKey]
    
    def transformedRow(title):
        val = row.get(title, "")
        if type(val) == int or type(val) == float:
            val = str(val)
        return val
    t_row = map(transformedRow, columnTitles)
    spy_data_tsv.write("\t".join(t_row) + "\n")
    
spy_data_tsv.close()


In [None]:
import json

w_file = open("spydata.json", "w")
json.dump(requestProcessor.reqIdDataMap, w_file)
w_file.close()

In [None]:
og_gran_data = granular_deffered_req

In [None]:
# Goal: Disambuigate the following violation
# Making six or more historical data requests for the same Contract, Exchange and Tick Type within two seconds.
# og_gran_data

ten_same_datatype = list(og_gran_data)[:10]
five_diff_datatype = [d for i, d in enumerate(list(og_gran_data)) if i % 13 == 0]

ten_diff_datatype = list(five_diff_datatype)

for item in five_diff_datatype:
    modified_req = item.copy()
    modified_req["endDateStr"] = "20200514 " + modified_req["endDateStr"].split("20200515")[1]
    ten_diff_datatype.append(modified_req)

In [None]:
granular_deffered_req = ten_diff_datatype
deffered_req = []
processRequests()

In [None]:
def processGranularRequests():
    startIdx = 0
    lastIdx = len(granular_deffered_req)
    
    def run_first50_batch():
        nonlocal startIdx
        def run_last10_batch():
            nonlocal startIdx
            for i in range(startIdx, min(startIdx + 10, lastIdx)):
                req = granular_deffered_req[i]
                print("Running Granular Request: ", req["reqId"])

                ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
            
            if startIdx + 10 < lastIdx:     
                t = threading.Timer(granular_timeout, run_first50_batch)
                t.start()
            else:
                pass
                # Completion
        
        for i in range(startIdx, min(startIdx + 50, lastIdx)):
            req = granular_deffered_req[i]
            print("Running Granular Request: ", req["reqId"])

            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
        
        if startIdx + 50 < lastIdx:
            startIdx += 50
            t = threading.Timer(regular_timeout, run_last10_batch)
            t.start()
            
    run_first50_batch()

    
def processRequests():
    startIdx = 0
    lastIdx = len(deffered_req)
    
    def run_batch():
        nonlocal startIdx
        for i in range(startIdx, min(startIdx + 50, lastIdx)):
            req = deffered_req[i]
            print("Running Request: ", req["reqId"])

            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
        
        if startIdx + 50 < lastIdx:
            startIdx += 50
            t = threading.Timer(regular_timeout, run_batch)
            t.start()
        else:
            t = threading.Timer(regular_timeout, processGranularRequests)
            t.start()
            # Completion
        
    run_batch()

class HistoricalRequestProcessor():
    # TODO: Add granular limit logic
    # Data Limit requirements
    # No more than 50 historical data req open at the same time
    # No more than 50 calls per sec
    # Granular:
    # Identical hist req within 15sec (we shouldnt see this?)
    # Making 6+ hist req for same contract, exch, and Historical Data Types within 2 sec (is tick type Historical Data Types)
    # Making more that 60 req within 10 min
    MAX_ACTIVE_REQ = 50
    MAX_REQ_PER_SEC = 50
    def __init__(self):
        self.non_granular_requests = []
        self.granular_requests = []
        self.activeRequests = 0
        self.failedRequests = 0
        self.startIdx = 0
        self.currentRequestId
        self.reqIdDataMap = {}
        self.allReqsMap = {}
        
    def addGranularRequest(self, reqId, req):
        self.granular_requests.append(req)
        self.allReqsMap[reqId] = req
    
    def addRegularRequest(self, reqId, req):
        self.non_granular_requests.append(req)
        self.allReqsMap[reqId] = req
        
    def run_non_granular_batch(self):
        for i in range(self.startIdx, min(self.startIdx + 50, len(self.non_granular_requests))):
            req = self.non_granular_requests[i]
            print("Running Request: ", req["reqId"])
            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
            self.activeRequests += 1
        self.startIdx = min(self.startIdx + 50, len(self.non_granular_requests))
        
        if self.start == len(self.non_granular_requests):
            self.completion()
            
    def completion(self):
        print("HistoricalRequestProcessor: completion")
        ibapi.disconnect()
        
    def process(self):
        self.run_non_granular_batch() 
        
    def updateRequest(self, reqId, data):
        if reqId not in self.reqIdDataMap:
            self.reqIdDataMap[reqId] = []
        
        reqDT = self.allReqsMap[reqId]["dataType"]
#         if bar.date not in dataTypeDuplicateDateSetMap[reqDT]:
        self.reqIdDataMap[reqId].append({
            "date": bar.date,#datetime.strptime(bar.date, "%Y%m%d %H:%M:%S"),
            "open": bar.open,
            "high": bar.high,
            "low": bar.low,
            "close": bar.close,
            "volume": bar.volume,
            "barCount": bar.barCount,
            "average": bar.average,
        })
#             dataTypeDuplicateDateSetMap[reqDT].add(bar.date)
        
    def failedRequest(self, reqId, msg):
        print("HistoricalRequestProcessor: failedReqeust. ReqId: ", reqId, ": ", msg)
        self.failedRequests += 1
        if self.startIdx < len(self.non_granular_requests):
            self.startIdx += 1
            req = self.non_granular_requests[self.startIdx]
            print("Running Request: ", req["reqId"])
            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
        else:
            this.completion()
            
    def finishedRequest(reqId, barData):
        self.activeRequests -= 1
        print("HistoricalRequestProcessor: finishedRequest: ", reqId)
        if self.startIdx < len(self.non_granular_requests):
            self.startIdx += 1
            req = self.non_granular_requests[self.startIdx]
            print("Running Request: ", req["reqId"])
            ibapi.reqHistoricalData(req["reqId"], req["contract"], req["endDateStr"], req["durationStr"], req["intervalStr"], req["dataType"], req["useRTH"], 1, False, [])
        else:
            this.completion()
            
        
        
        

In [None]:
from datetime import datetime
from ibapi import wrapper
from ibapi import utils
from ibapi.client import EClient
from ibapi.utils import iswrapper
from ibapi.contract import *  
from ibapi.common import *
from ibapi.ticktype import * 

from app.wsgi import app
from app.extensions import db
from app.models.Equity import (
    Equity,
    EquityHistoricalData
)

# https://www.interactivebrokers.com/en/?f=%2Fen%2Fsoftware%2FsystemStatus.php

connectivityErrorCodes = [1100, 1300]
clientErrorCodes = [502, 504]
warningErrorCodes = [2100, 2137]
rateLimitErrorCodes = [100, 101]
twsErrorCodes = [102, 449]
twsErrorCodes2 = [10000, 10148]

contractReqIDDataMap = {}
isGranularmap = {}
tickersToFetch = []
failedRequests = 0
regular_timeout = 1.2


def determineEnd():
    if len(contractReqIDDataMap.keys()) == len(tickersToFetch) - failedRequests:
        updateEquitiesTable()
        ibapi.disconnect()

class IBContractDetailsCollector(wrapper.EWrapper):
    def contractDetails(self, reqId: int, contractDetails: ContractDetails):
        super().contractDetails(reqId, contractDetails)
        contractReqIDDataMap[reqId] = contractDetails
        

        determineEnd()
        
        
            
    def error(self, reqId: TickerId, errorCode: int, errorString: str):
        global failedRequests
        super().error(reqId, errorCode, errorString)
        
        if reqId == -1:
            if errorCode >= rateLimitErrorCodes[0] and errorCode <= rateLimitErrorCodes[1]:
                print("RATE LIMIT ERROR")
                # TODO: Hope it doesnt happen
            print("WARNING: ", errorString)
            return
        elif errorCode >= twsErrorCodes[0] and errorCode <= twsErrorCodes[1] or errorCode >= twsErrorCodes2[0] and errorCode <= twsErrorCodes2[1]:
            print("TWS ERROR: ", errorString)
            # Decrement Expected Results Count
            failedRequests += 1
            
        elif errorCode >= rateLimitErrorCodes[0] and errorCode <= rateLimitErrorCodes[1]:
            print("RATE LIMIT ERROR")
            
        determineEnd()
        
    def contractDetailsEnd(self, reqId: int):
        super().contractDetailsEnd(reqId)
        
    def nextValidId(self, reqId: int):
        super().nextValidId(reqId)
        print("Next Valid Id: ", reqId)
        loadEquitiesFromFile(open("scripts/equities.tsv"))

        
    def connectionClosed(self):
        super().connectionClosed()

def updateEquitiesTable():
    with app.app_context():
        for contract in contractReqIDDataMap.values():
            equity = Equity(
                name=contract.longName,
                ticker=contract.contract.symbol,
                ib_con_id=contract.contract.conId,
                should_update=isGranularmap[contract.contract.symbol]
            )
            db.session.add(equity)
        
        db.session.commit()

def processRequests():
    startIdx = 0
    lastIdx = len(scheduled_requests)
    
    def run_batch():
        nonlocal startIdx
        for i in range(startIdx, min(startIdx + 50, lastIdx)):
            req = scheduled_requests[i]
            print("Running Request: ", req["reqId"])

            ibapi.reqContractDetails(req["reqId"], req["contract"])
        
        if startIdx + 50 < lastIdx:
            startIdx += 50
            t = threading.Timer(regular_timeout, run_batch)
            t.start()
        
        
    run_batch()

scheduled_requests = []
def scheduleIBContractAPI(reqId, contract):
    def_req = {
        "reqId": reqId,
        "contract": contract,
    }
    scheduled_requests.append(def_req)

    
"""
file_obj: <ticker>\t<is_granular>
"""
def loadEquitiesFromFile(file_obj):
    global tickersToFetch

    lines = [x.strip().split("\t") for x in file_obj.readlines()]
    
    with app.app_context():
        didAnyEquityExist = False
        for line in lines:
            ticker = line[0]
            is_granular = int(line[1])
            
            equity = db.session.query(Equity).filter(Equity.ticker == ticker).first()
            
            if equity:
                didAnyEquityExist = True
                equity.granular = True if is_granular else False
                
            else:
                tickersToFetch.append(ticker)
                isGranularmap[ticker] = is_granular
                
        db.session.commit()
            
    curRequestId = 0
    for ticker in tickersToFetch:
        contract = Contract()
        contract.symbol = ticker
        contract.secType = "STK"
        contract.exchange = "SMART"
        contract.currency = "USD"
        
        scheduleIBContractAPI(curRequestId, contract)
        curRequestId += 1
        
    file_obj.close()
    processRequests()
    
        

contractDetailsCollector = IBContractDetailsCollector()   
ibapi = EClient(wrapper=contractDetailsCollector)
gatewayPort = 4001
twsPort = 7496
ibapi.connect("host.docker.internal", gatewayPort, clientId=0)
ibapi.run()

In [None]:
with app.app_context():
    print(db.session.query(Equity).filter(Equity.ticker == "GOOGL").first().ib_con_id)

In [None]:
from threading import Thread

options_strikes = None
options_exp = None

class IBHistoricalDataAggregator(wrapper.EWrapper):
    
    def historicalData(self, reqId:int, bar: BarData):
        super().historicalData(reqId, bar)
        requestProcessor.updateRequest(reqId, bar)
            
        
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        super().historicalDataEnd(reqId, start, end)
        requestProcessor.finishedRequest(reqId)
        
    def error(self, reqId: TickerId, errorCode: int, errorString: str):
        global failedRequests
        global failed_options_req
        super().error(reqId, errorCode, errorString)
        
        if reqId == -1:
            if errorCode >= rateLimitErrorCodes[0] and errorCode <= rateLimitErrorCodes[1]:
                print("RATE LIMIT ERROR")
                # TODO: Hope it doesnt happen
            print("WARNING: ", errorString)
            return
        elif errorCode >= twsErrorCodes[0] and errorCode <= twsErrorCodes[1] or errorCode >= twsErrorCodes2[0] and errorCode <= twsErrorCodes2[1]:
            print("TWS ERROR: ", errorString)
            # Decrement Expected Results Count
            if reqId > len(optionsReqIdScheduledDataMap.keys()):
                requestProcessor.failedRequest(reqId, errorString)
#                 failedRequests += 1
            else:  
                failed_options_req += 1
            
        elif errorCode >= rateLimitErrorCodes[0] and errorCode <= rateLimitErrorCodes[1]:
            print("RATE LIMIT ERROR")
            
    def securityDefinitionOptionParameter(self, reqId:int, exchange:str, underlyingConId:int, tradingClass:str, multiplier:str, expirations:SetOfString, strikes:SetOfFloat):
        super().securityDefinitionOptionParameter(reqId, exchange, underlyingConId, tradingClass, multiplier, expirations, strikes)
        global options_strikes
        global options_exp
        
        if exchange == "SMART":
            optionsReqDataMap[reqId] = {
                "expiration": expirations,
                "strike": strikes,                
            }
        else:
            failed_options_req += 1
            
        if len(optionsReqDataMap.keys()) == (len(optionsReqIdScheduledDataMap.keys()) - failed_options_req):
            # schedule options
            curReqId = len(reqIdScheduledDataMap.keys()) + len(optionsReqIdScheduledDataMap.keys())
            for key, val in optionsReqIdDataMap.items():
                for exp in val["expiration"]:
                    for strike in val["strike"]:     
                        pContract = Contract()
                        pContract.conId = optionsReqIdScheduledDataMap[key].conId
                        pContract.symbol = optionsReqIdScheduledDataMap[key].symbol
                        pContract.currency = optionsReqIdScheduledDataMap[key].currency
                        pContract.exchange = optionsReqIdScheduledDataMap[key].exchange
                        pContract.secType = "OPT"
                        pContract.lastTradeDateOrContractMonth = exp
                        pContract.strike = strike
                        pContract.right = "P"
                        
                        cContract = Contract()
                        cContract.conId = optionsReqIdScheduledDataMap[key].conId
                        cContract.symbol = optionsReqIdScheduledDataMap[key].symbol
                        cContract.currency = optionsReqIdScheduledDataMap[key].currency
                        cContract.exchange = optionsReqIdScheduledDataMap[key].exchange
                        cContract.secType = "OPT"
                        cContract.lastTradeDateOrContractMonth = exp
                        cContract.strike = strike
                        cContract.right = "C"
                        
                        for dataType in optionDataTypes:
                            scheduleIBHistoricalAPI(curReqId, cContract, endDateStr, "1 D", "1 min", dataType, 0)
                            curReqId += 1
                            scheduleIBHistoricalAPI(curReqId, pContract, endDateStr, "1 D", "1 min", dataType, 0)
                            curReqId += 1
                            
            # Start requesting
            processRequests()
        
    def nextValidId(self, reqId: int):
        super().nextValidId(reqId)
        print("Next Valid Id: ", reqId)
        
    def connectionClosed(self):
        super().connectionClosed()

class IBContractDetailsCollector(wrapper.EWrapper):
    def contractDetails(self, reqId: int, contractDetails: ContractDetails):
        super().contractDetails(reqId, contractDetails)
        contractReqIDDataMap[reqId] = contractDetails
        
        print("----Contract Details----")
        print("thread: ", threading.current_thread().name)
        print("reqID: ", reqId)
        print("company: ", contractDetails.longName)
        print("symbol: ", contractDetails.contract.symbol)
        print("tradingHours: ", contractDetails.tradingHours)
        print("liquidHours: ", contractDetails.liquidHours)

    def securityDefinitionOptionParameter(self, reqId:int, exchange:str, underlyingConId:int, tradingClass:str, multiplier:str, expirations:SetOfString, strikes:SetOfFloat):
        super().securityDefinitionOptionParameter(reqId, exchange, underlyingConId, tradingClass, multiplier, expirations, strikes)
        global options_strikes
        global options_exp
        print("----Option Chain Details----")
        print("exchange: ", exchange)
        print("underlyingConId: ", underlyingConId)
        print("tradingClass: ", tradingClass)
        print("multiplier: ", multiplier)
        print("expirations: ", expirations)
        print("strikes: ", strikes)
        
        if exchange == "SMART":
            options_exp = expirations
            options_strikes = strikes
        
    def contractDetailsEnd(self, reqId: int):
        super().contractDetailsEnd(reqId)
        
    def nextValidId(self, reqId: int):
        super().nextValidId(reqId)
        print("Next Valid Id: ", reqId)
        print("thread: ", threading.current_thread().name)
        
    def connectionClosed(self):
        super().connectionClosed()
        print("ConnectionClosed")

def 
        
        
class ThreadedIBApi(EClient, Thread):
    def __init__(self, wrapper):
        Thread.__init__(self)
        EClient.__init__(self, wrapper=wrapper)

contractDetailsCollector = IBContractDetailsCollector()   
ibapi = ThreadedIBApi(wrapper=contractDetailsCollector)
# ibapi2 = ThreadedIBApi(wrapper=contractDetailsCollector)
gatewayPort = 4001



ibapi.connect("host.docker.internal", gatewayPort, clientId=0)


ibapi.start()