# Building base data
This notebook runs a set of experiments to understand how ib_insync fetches data using asyncio.

## SET THE MARKET

In [1]:
MARKET = "SNP"

## IMPORTS, CONNECTIONS, LOG, TIMER

In [2]:
import asyncio
import pickle
import sys
import time
import pandas as pd
import random

from collections import defaultdict
from datetime import datetime
from pprint import pprint

from ib_insync import *

from ib01_getsyms import get_syms
from support import timestr

from typing import Callable, Coroutine

random.seed(8888)

if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

In [3]:
import nest_asyncio
util.startLoop()
nest_asyncio.apply()

pd.options.display.max_columns = None

In [4]:
HOST = '127.0.0.1'
PORT = 4004 if MARKET.upper() == 'NSE' else 4002 # Paper trades!
CID = 0
MASTERCID = 10

In [5]:
# Direct logs to file with level at WARNING (30)
util.logToFile(path='./data/data.log', level=30)
with open('./data/data.log', 'w'): # to clear the log
    pass

## UNDERLYING SYMBOLS AND LOTS

In [6]:
%%time
# Load the symbol table for the selected market
df_syms = get_syms(MARKET)

# Distinct ticker symbols found in that table
symbols = set(df_syms.symbol)

# Each row yields two candidate contracts, a Stock followed by an Index,
# built from the same (symbol, exchange, currency) triple.
raw_cts = [
    make_contract(sym, exch, ccy)
    for sym, exch, ccy in zip(df_syms.symbol, df_syms.exchange, df_syms.currency)
    for make_contract in (Stock, Index)
]

# raw_cts = raw_cts[18:25]  # !!! DATA LIMITER !!!

Wall time: 1.98 s


In [7]:
%%time

## Qualify the underlyings
ib = IB()

with ib.connect(HOST, PORT, CID) as ib:
    qunds = ib.qualifyContracts(*raw_cts)

# Remove duplicates while keeping the order in which the contracts came back.
# A set would leave the order arbitrary, and later cells pick contracts by
# position (qunds[0], [:50]).
qunds = list(dict.fromkeys(qunds))

Wall time: 4.6 s


# COROUTINES

## OHLC coroutine

In [8]:
async def ohlcCoro(c, DURATION=365):
    """Fetch daily OHLC bars for one contract and return them as a DataFrame.

    Args:
        c: contract whose history is requested; its ``symbol`` labels the rows.
        DURATION: look-back window in days, sent as the ``'<DURATION> D'``
            duration string.

    Returns:
        DataFrame of bars with ``symbol`` inserted as the first column, or
        ``None`` when the request produced nothing to convert.
    """
    ohlc = await ib.reqHistoricalDataAsync(
        contract=c,
        endDateTime="",
        durationStr=str(DURATION) + ' D',
        barSizeSetting="1 day",
        whatToShow="Trades",
        useRTH=True)

    # Fixed pause after every request -- presumably request pacing; TODO confirm
    await asyncio.sleep(5)

    df = util.df(ohlc)
    if df is None:
        # Explicit "no data" check instead of catching AttributeError from
        # df.insert, which would also hide unrelated attribute errors.
        return None

    df.insert(0, 'symbol', c.symbol)
    return df

## Market data coroutine

In [9]:
async def mktdataCoro(c, FILL_DELAY=5):
    """Collect a market-data snapshot for one contract as a one-row DataFrame.

    Subscribes to market data, waits ``FILL_DELAY`` seconds for the ticker to
    fill, cancels the subscription and flattens the ticker into a frame.

    Args:
        c: contract to subscribe to.
        FILL_DELAY: seconds to wait between subscribing and cancelling.

    Returns:
        DataFrame holding only the columns whose value is non-null and truthy,
        with ``symbol`` inserted as the first column.
    """
    tick = ib.reqMktData(c, '456, 104, 106, 100, 101, 165')
    try:
        await asyncio.sleep(FILL_DELAY)
    finally:
        # Release the subscription even if this task is cancelled mid-wait
        ib.cancelMktData(c)

    m_df = pd.DataFrame(util.df([tick]))

    # Spread the dividends field over its own columns and join them back
    div_df = pd.DataFrame(m_df.dividends.tolist())
    # Keyword form: the positional axis argument was removed in pandas 2.0
    df1 = m_df.drop(columns='dividends').join(div_df)
    df1.insert(0, 'symbol', [contract.symbol for contract in df1.contract])

    df2 = df1.dropna(axis=1)

    # Extract columns with legit (truthy) values in them
    df3 = df2[[col for col in df2.columns if df2.loc[0, col]]]

    return df3

## Chains coroutine

In [10]:
async def chainsCoro(c):
    """Build the expiry x strike grid of the option chain for one underlying.

    Args:
        c: underlying contract; its symbol, secType and conId are sent with
            the request.

    Returns:
        DataFrame with columns ``symbol`` (the chain's tradingClass),
        ``expiry``, ``strike``, ``exchange`` and ``mult``, one row per
        expiry/strike combination; ``None`` when no chain came back.
    """
    chains = await ib.reqSecDefOptParamsAsync(underlyingSymbol=c.symbol,
                                               futFopExchange="",
                                               underlyingSecType=c.secType,
                                               underlyingConId=c.conId)

    # An empty reply used to raise IndexError on chains[0]. Return None
    # instead; pd.concat skips None entries when frames are combined.
    if not chains:
        return None

    # Pick up one chain if it is a list (only the first one is used)
    chain = chains[0] if isinstance(chains, list) else chains

    df1 = pd.DataFrame([chain])

    # Cartesian merge: a constant key pairs every expiry with every strike,
    # then attaches the chain-level fields to each pair.
    expiries = pd.DataFrame(df1.expirations[0], columns=['expiry']).assign(key=1)
    strikes = pd.DataFrame(df1.strikes[0], columns=['strike']).assign(key=1)

    df2 = (pd.merge(expiries, strikes, on='key')
             .merge(df1.assign(key=1))
             .rename(columns={'tradingClass': 'symbol', 'multiplier': 'mult'})
           [['symbol', 'expiry', 'strike', 'exchange', 'mult']])

    return df2

## Base Coroutine

In [11]:
async def baseCoro(qunds:list) -> None:
    """Schedule the three base-data tasks for every contract.

    Each task is named ``<symbol>_ohlc``, ``<symbol>_und`` or
    ``<symbol>_chains`` and added to the module-level ``todo`` set.
    Nothing is awaited here.
    """
    for contract in qunds:
        jobs = (
            (ohlcCoro(contract, DURATION=365), '_ohlc'),
            (mktdataCoro(contract, FILL_DELAY=11), '_und'),
            (chainsCoro(contract), '_chains'),
        )
        for job, suffix in jobs:
            todo.add(asyncio.create_task(job, name=contract.symbol+suffix))

## Progress Coroutine

In [29]:
async def progressAsync(cts,
                        algo: Callable[..., Coroutine],
                        save_algo: Callable[..., None],
                        pkl_timeout: float=2.0,
                        total_timeout: float=0.0,
                        retries: int=1,
                        part_pkl: str="",
                        FSPATH: str='./data/',
                        ) -> None:
    """Run ``algo(cts)`` and save partial results while tasks finish.

    Tasks are tracked in the module-level ``todo`` set, which ``algo`` may
    extend, and finished tasks are moved into the module-level ``result`` set.

    Args:
        cts: contracts handed to ``algo``.
        algo: coroutine function started as the first task.
        save_algo: plain function called as ``save_algo(FSPATH, result)``
            after every wait and once more at the end.
        pkl_timeout: seconds to wait for completions between two saves.
        total_timeout: overall budget in seconds; ``0`` disables it. When it
            is exceeded, the tasks still pending are cancelled.
        retries: currently unused.
        part_pkl: name for a partial pickle; defaults to
            ``<algo name>_partial.pkl``. Currently not used any further.
        FSPATH: folder prefix passed through to ``save_algo``.
    """
    # Keep a caller-supplied name; fall back to the default only when empty
    part_pkl = part_pkl or algo.__name__+'_partial.pkl'

    # create a task for the algo and add it to the work set
    task = asyncio.create_task(algo(cts), name=algo.__name__)
    todo.add(task)

    start = time.time()

    while len(todo):

        done, pending = await asyncio.wait(todo, timeout=pkl_timeout)

        # remove done tasks from todo after the timeout, update result and save it
        todo.difference_update(done)
        result.update(done)
        save_algo(FSPATH, result)

        # report pendings
        pending_names = (t.get_name() for t in todo)
        print(f"{len(todo)}: "+ " ".join(sorted(pending_names))[-75:])

        # check for total_timeout. Only relevant while something is pending:
        # asyncio.wait raises ValueError when given an empty collection.
        if total_timeout > 0.0 and todo and time.time() - start > total_timeout:
            print(f'\nProgram exceeded total_timeout of {total_timeout} seconds')
            print(f'Cancelling pending todos')
            for straggler in todo:
                straggler.cancel()
            done, pending = await asyncio.wait(todo, timeout=1.0)
            # Cancelled tasks are dropped, not stored in result
            todo.difference_update(done)
            todo.difference_update(pending)

    # final save once nothing is left to wait for
    save_algo(FSPATH, result)
    end = time.time()
    print(f"Took {int(end-start)} seconds")

## Saving progress output function

In [30]:
def saveBase(FSPATH, result):
    """Pickle the finished base-data tasks, one file per data type.

    The second ``_``-separated part of a task's name selects its type:
    ``ohlc`` -> ``df_ohlcs.pkl``, ``und`` -> ``df_unds.pkl``,
    ``chains`` -> ``df_chains.pkl``. Tasks with any other name are ignored.

    Args:
        FSPATH: folder prefix the file names are appended to (include the
            trailing separator).
        result: iterable of finished tasks exposing ``get_name()``,
            ``cancelled()``, ``exception()`` and ``result()``.

    A file is written only when at least one usable frame of its type exists.
    """
    buckets = {'ohlc': [], 'und': [], 'chains': []}

    for task in list(result):
        parts = task.get_name().split('_')
        # Decide per task: a name without a type suffix (e.g. the driver task)
        # must not inherit the type of the previous task.
        if len(parts) < 2 or parts[1] not in buckets:
            continue

        # A failed or cancelled task has no frame to contribute. Skip it so
        # one bad task does not abort the whole save; the error stays
        # available on the task object itself.
        if task.cancelled() or task.exception() is not None:
            continue

        frame = task.result()
        if frame is not None:  # coroutines may return None for "no data"
            buckets[parts[1]].append(frame)

    # build the dataframes and pickle
    targets = (('und', 'df_unds.pkl'),
               ('ohlc', 'df_ohlcs.pkl'),
               ('chains', 'df_chains.pkl'))
    for basetype, filename in targets:
        frames = buckets[basetype]
        if frames:
            pd.concat(frames, ignore_index=True).to_pickle(FSPATH+filename)

In [None]:
%%time
cts = qunds
todo = set()
result = set()
with ib.connect(HOST, PORT, CID) as ib:
    ib.run(progressAsync(cts=cts, algo=baseCoro, save_algo=saveBase, total_timeout=0))

In [24]:
FSPATH = './data/'
saveBase(FSPATH, result)

baseCoro is not a valid base df


# SINGLE CONTRACT TEST

In [None]:
ct1 = qunds[0]
ct1

In [None]:
%%time
### THIS MAY NOT WORK FOR INDEX OUTSIDE MARKET HOURS ###
with ib.connect(HOST, PORT, CID) as ib:
    one_ohlc = ib.run(ohlcCoro(ct1))

one_ohlc

## Other coros

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    one_mkt = ib.run(mktdataCoro(ct1))

one_mkt

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    one_chain = ib.run(chainsCoro(ct1))

one_chain

## Creating one base

In [None]:
def asyncBase(c):
    """Bundle the OHLC, market-data and chain coroutines of one contract.

    Returns the ``asyncio.gather`` awaitable. Its result lists the three
    outputs in that order.
    """
    return asyncio.gather(
        ohlcCoro(c, DURATION=365),
        mktdataCoro(c, FILL_DELAY=5),
        chainsCoro(c),
    )

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    one_base = ib.run(asyncBase(ct1))

one_base

# MULTIPLE CONTRACT TEST
## Test 50 stock contracts

In [None]:
stk50 = [q for q in qunds if isinstance(q, Stock)]
stk50 = stk50[:50]  # !!! DATA LIMITER for 50 max simultaneous API hist records !!!
len(stk50)

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    base_stk50 = ib.run(asyncio.wait({asyncBase(c) for c in stk50}))

base_stk50

## Test ALL contracts
It takes 2 mins and 17 seconds for 99 SNP contracts.

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    base_qunds = ib.run(asyncio.wait({asyncBase(c) for c in qunds}))


# Build individual base data frames
ohlcs = []
mdatas = []
chains = []

# base_qunds[0] is the set of finished tasks. Walk it once and unpack each
# (ohlc, market data, chain) result, instead of rebuilding a list from the
# set three times per index.
for finished in base_qunds[0]:
    ohlc, mdata, chain = finished.result()
    ohlcs.append(ohlc)
    mdatas.append(mdata)
    chains.append(chain)

df_ohlcs = pd.concat(ohlcs, ignore_index=True)
df_unds = pd.concat(mdatas, ignore_index=True)

# Every expiry/strike row is emitted twice: once as a call, once as a put
all_chains = pd.concat(chains)
df_chains = pd.concat([all_chains.assign(right='C'),
                       all_chains.assign(right='P')], ignore_index=True)

### Making it more robust
27 market data records are missing.

In [None]:
from typing import Callable, Coroutine
import sys
import time

if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def baseCoro(qunds:list) -> None:
    """Queue OHLC, market-data and chain tasks for each contract in ``todo``.

    Redefines the earlier ``baseCoro``. Here the market-data tasks carry the
    ``_mdata`` suffix, while OHLC and chain tasks keep ``_ohlc`` and
    ``_chains``.
    """
    for c in qunds:
        todo.update((
            asyncio.create_task(ohlcCoro(c, DURATION=365), name=c.symbol+'_ohlc'),
            asyncio.create_task(mktdataCoro(c, FILL_DELAY=11), name=c.symbol+'_mdata'),
            asyncio.create_task(chainsCoro(c), name=c.symbol+'_chains'),
        ))
    
async def progress(contracts, algo: Callable[..., Coroutine], result: set=None) -> set:
    """Run ``algo(contracts)`` and report the pending tasks every two seconds.

    Args:
        contracts: contracts handed to ``algo``.
        algo: coroutine function started as the ``base_data`` task; it may
            add further tasks to the module-level ``todo`` set.
        result: optional set that collects the finished tasks. A fresh set is
            created per call when omitted.

    Returns:
        The set of finished tasks.
    """
    # A default of set() in the signature would be shared by all calls and
    # keep the tasks of earlier runs.
    if result is None:
        result = set()

    task = asyncio.create_task(algo(contracts), name='base_data')

    todo.add(task)

    start = time.time()

    while len(todo):
        done, pending = await asyncio.wait(todo, timeout=2)

        todo.difference_update(done)
        result.update(done)

        bases = (t.get_name() for t in todo)
        print(f"{len(todo)}: "+ " ".join(sorted(bases))[-75:])

    end = time.time()
    print(f"Took {int(end-start)} seconds")

    return result

In [None]:
%%time

todo = set()
result = set()
with ib.connect(HOST, PORT, CID) as ib:
    result = ib.run(progress(qunds, baseCoro))

In [None]:
# Build individual base data frames

res = [r for r in list(result) if r.get_name() !="base_data"]
ohlcs = []
mdatas = []
chains = []

In [None]:
for v in res:
    if v.get_name().split('_')[1] == 'ohlc':
        ohlcs.append(v.result())
    if v.get_name().split('_')[1] == 'mdata':
        mdatas.append(v.result())
    if v.get_name().split('_')[1] == 'chains':
        chains.append(v.result())

df_ohlcs = pd.concat(ohlcs, ignore_index=True)
df_unds = pd.concat(mdatas, ignore_index=True)
df_chains = pd.concat([pd.concat(chains).assign(right='C'), 
                       pd.concat(chains).assign(right='P')], ignore_index=True)

In [None]:
len(df_unds[df_unds.time.isnull()])

In [None]:
len(df_ohlcs.symbol.unique())

In [None]:
s.split('_')[1]

In [None]:
%%time

with ib.connect(HOST, PORT, CID) as ib:
    task1 = ib.run(asyncio.gather(*[ohlcCoro(c) for c in und_cts1]))

pd.concat(task1, ignore_index=True).groupby('symbol').head(1)

#### Test a mixed-bag of 6 good and bad contracts using wait_for

In [None]:
und_cts2 = [q for q in qunds if isinstance(q, Index)][:3] + \
            random.sample([q for q in qunds if isinstance(q, Stock)], 3)
und_cts2 = random.sample(und_cts2, 6)
und_cts2

In [None]:
%%time

# NOTE(review): relies on und_cts2 and ohlcCoro already being defined in the kernel.
with ib.connect(HOST, PORT, CID) as ib:
    
    async def get_ohlc():
        # One OHLC coroutine per contract in und_cts2
        tasks = [ohlcCoro(c) for c in und_cts2]
        # The gather future is returned without being awaited here, so
        # `await get_ohlc()` yields that future rather than the gathered data.
        return asyncio.gather(*tasks)
    
    # tasks2 is therefore a single gather future, not a collection of tasks
    tasks2 = await get_ohlc()

In [None]:
    while tasks2:
        done, pending = asyncio.wait_for(tasks2, 10)
        
        for task in done:
            result = task.result()
            pprint(result)
        tasks2 = pending

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    # Re-qualifies only the first 50 raw contracts and rebinds the global qunds
    qunds = ib.qualifyContracts(*raw_cts[:50])
    async def coro(c):
        # NOTE(review): the argument c is never used -- each comprehension
        # rebinds c while looping over the whole of qunds, so every call
        # requests all three data types for every contract.
        ohlc_task = [ohlcCoro(c, 365) for c in qunds]
        mkt_task = [mktdataCoro(c, 5) for c in qunds]
        chain_task = [chainsCoro(c) for c in qunds]
        tasks = ohlc_task + mkt_task + chain_task
        return await asyncio.gather(*tasks)
    
    # One coro per contract; combined with the note above this is
    # len(qunds) full passes over qunds.
    tasks = [coro(c) for c in qunds]
    
    r = []
    
    while tasks:
        # NOTE(review): asyncio.as_completed yields awaitables one at a time;
        # it does not return a (done, pending) pair like asyncio.wait --
        # confirm whether this unpacking ever worked.
        done, pending = ib.run(asyncio.as_completed(tasks))
        
        for task in done:
            # Rebinds the global name `result` used by earlier cells
            result = task.result()
            
            if result:
                r.append(result)
        tasks = pending

# print(r)

In [None]:
r[20]

In [None]:
with open('./data/first.pkl', 'wb') as f:
    pickle.dump(r, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
        
    
    
    print("Get first result:")
    done, pending = ib.run(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
    
    for task in done:
        print(task.result())
    print("pending:", len(pending))
    
    print("\nGet more results in 15 seconds:\n")
    done2, pending2 = ib.run(asyncio.wait(pending, timeout=15))
    
    for task in done2:
        print(task.result())
    print("pending:", len(pending2))
    

In [None]:
    blk = 5
    cb = [qunds[i: i+5] for i in range(0, len(qunds), 5)]
    result = [ib.run(coro(c)) for c in cb]

In [None]:
r = [i for j in result for i in j]

In [None]:
list(list(r[5939].values())[0].values())[0].result()

In [None]:
ib.run?

In [None]:
async def get_data(raw_cts):
    '''Sequentially getting data from each qualified contract.

    Qualifies ``raw_cts``, then for one contract at a time starts the market
    data, OHLC and option-parameter requests and records each result as it
    completes.

    Args:
        raw_cts: unqualified contracts.

    Returns:
        List of ``[symbol, type(data), data]`` entries in completion order.
    '''

    d = []

    # Await the qualification directly: handing a bare coroutine to
    # asyncio.wait is rejected from Python 3.11 onwards.
    qualified = await ib.qualifyContractsAsync(*raw_cts)

    for c in qualified:

        # ensure_future is kept for the option-parameter request because it
        # accepts a future as well as a coroutine -- TODO confirm which of
        # the two the call returns
        tasks = {asyncio.create_task(mktdataCoro(c)),
                 asyncio.create_task(ohlcCoro(c)),
                 asyncio.ensure_future(ib.reqSecDefOptParamsAsync(underlyingSymbol=c.symbol,
                                       futFopExchange="",
                                       underlyingSecType=c.secType,
                                       underlyingConId=c.conId))}

        # Drain this contract's requests before moving on to the next one
        while tasks:
            finished, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            for y in finished:
                data = y.result()

                print(f'{c.symbol} data collected!')
                d.append([c.symbol, type(data), data])

    return d

In [None]:
ib = IB()

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    data = ib.run(get_data(raw_cts))

In [None]:
data[1]

In [None]:
async def make_tasks(c):
    """Run the OHLC, market-data and chain tasks of one contract concurrently.

    Returns:
        List with the three results, in the order ohlc, mdata, chains.
    """
    tasks = [asyncio.create_task(ohlcCoro(c, DURATION=2), name=c.symbol+'_'+'ohlc'),
             asyncio.create_task(mktdataCoro(c, FILL_DELAY=5), name=c.symbol+'_'+'mdata'),
             asyncio.create_task(chainsCoro(c), name=c.symbol+'_'+'chains')]

    # gather takes each awaitable as its own positional argument; the list
    # must be unpacked. Awaiting here hands the results, not a future, back
    # to the caller.
    return await asyncio.gather(*tasks)

In [None]:
async def get_pll_data(raw_cts):
    '''Getting data parallely.

    Qualifies ``raw_cts`` and starts one gather of OHLC, market data and
    chains per contract, so all contracts are processed concurrently.

    Args:
        raw_cts: unqualified contracts.

    Returns:
        Dict mapping symbol -> finished gather future; call ``.result()`` on
        a value to get ``[ohlc, mktdata, chains]`` or re-raise its error.
        When two contracts share a symbol, the later one replaces the earlier.
    '''

    d = dict()

    qualified = await ib.qualifyContractsAsync(*raw_cts)

    for c in qualified:
        d[c.symbol] = asyncio.gather(*[ohlcCoro(c, 2), mktdataCoro(c, 5), chainsCoro(c)])

    # Let every gather finish before returning, so .result() works on the
    # returned futures. asyncio.wait needs at least one awaitable.
    if d:
        await asyncio.wait(list(d.values()))

    return d

In [None]:
%%time
with ib.connect(HOST, PORT, CID) as ib:
    data = ib.run(get_pll_data(raw_cts))

In [None]:
data

In [None]:
list(data.values())[0].result()

In [None]:
import asyncio
from pprint import pprint

import random

async def coro(tag):
    """Toy coroutine: print a start marker, sleep 1-3 s, print an end marker.

    Returns the ``tag`` it was given.
    """
    print(">", tag)
    pause = random.uniform(1, 3)
    await asyncio.sleep(pause)
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

# Three batches of toy coroutines; each gather keeps the order of its inputs
group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

# Nest the batches so the result is one list per group
all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

# The loop is deliberately left open: it is the notebook's shared event loop,
# and closing it would break every later cell that runs async code.

pprint(results)