### Market data refresh 

### Input Description

Raw OHLC (open, high, low, close) market data.

### Output  

Clean OHLC data in an HDF5 store.

### Operations

This code takes financial market data and runs it through a processing pipeline. The following operations are carried out:

- Localise the time data to market time
- Merge with existing RAW data based on datetime
- Save the resulting RAW data to HDF5

In [1]:
#!pip install --upgrade "../../quantutils"
!pip install "../../marketinsights-price-aggregator"
import json, os, pandas
import quantutils.dataset.pipeline as ppl
from quantutils.api.datasource import MarketDataStore
import MIPriceAggregator.connectors as connectors
import pandas as pd
import numpy as np
import time
from datetime import datetime, date, timedelta

import warnings
from tables import NaturalNameWarning
warnings.filterwarnings('ignore', category=NaturalNameWarning)
from tqdm import tqdm

Processing /home/cwilkin/Development/repos/marketinsights-price-aggregator
  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: MIPriceAggregator
  Building wheel for MIPriceAggregator (setup.py) ... [?25ldone
[?25h  Created wheel for MIPriceAggregator: filename=MIPriceAggregator-1.0.0-py3-none-any.whl size=7936 sha256=518de4a4d43af79b0f1324b20e7806d4bdff69bdf41f22a122902c98c39bd74b
  Stored in directory: /home/cwilkin/.cache/pip/wheels/a3/66/dc/3a75635dd1cbb1bf931d0df38e458b1890be1666bbb5b88c7f
Successfully built MIPriceAggregator
Installing collected packages: MIPriceAggregator
  Attempting uninstall: MIPriceAggregator
    Found existing installation: MIPriceAggregator 1.0.0
    Uninstalling MIPriceAggregator-1.0.0:
      Successfully uninstalled MIPriceAggregator-1.0.0
Successfully installed MIPriceAggregator-1.0.0


In [2]:
def getConnector(connClass, connName, tz, options):
    """Build a price-data connector from its class name.

    Looks ``connClass`` up as an attribute of the ``connectors`` module and
    instantiates it with the remaining arguments.

    Args:
        connClass: Name of the connector class inside ``connectors``.
        connName: Identifier passed to the connector constructor.
        tz: Timezone value passed to the connector constructor.
        options: Connector-specific options passed to the constructor.

    Returns:
        A new instance of the requested connector class.

    Raises:
        AttributeError: If ``connectors`` has no attribute named ``connClass``.
    """
    factory = getattr(connectors, connClass)
    return factory(connName, tz, options)

def fetchHistoricalData(mds, ds_file, start="1979-01-01", end="2050-01-01", records=200, delta=False, newOnly=False, debug=False):
    """Fetch price history for every source listed in a datasource config file.

    For each datasource entry in ``ds_file`` a connector is built with
    ``getConnector`` and ``getData`` is called for every source of every market.
    Non-``None`` results are appended to ``mds`` (when given) under the market
    ID and merged into the returned frame.

    Args:
        mds: Market data store to append to, or ``None`` to fetch only.
        ds_file: Path to the JSON datasource configuration.
        start: Start date passed to the connector; also the fallback start in
            ``delta`` mode when a source has no stored data.
        end: End date passed to the connector.
        records: Record count passed to the connector; also the fallback count
            in ``delta`` mode.
        delta: If True, resume each source from the last ``Date_Time`` already
            stored in ``mds`` (see ``_deltaWindow``).
        newOnly: Not implemented yet; accepted for API compatibility.
        debug: If True, print each fetched frame and each computed delta window.

    Returns:
        DataFrame indexed by (``Date_Time``, ``ID``) with every fetched row.
    """
    # TODO: Implement newOnly
    with open(ds_file) as f:
        datasources = json.load(f)

    data = pd.DataFrame(index=pd.MultiIndex(levels=[[], []], codes=[[], []], names=[u'Date_Time', u'ID']))

    for datasource in datasources:

        dataConnector = getConnector(datasource["class"], datasource["ID"], datasource["timezone"], datasource["opts"])

        for market in datasource["markets"]:

            for source in tqdm(market["sources"]):

                # Start every source from the caller's window so one source's
                # delta window never leaks into the next source.
                srcStart, srcRecords = start, records
                if delta:
                    srcStart, srcRecords = _deltaWindow(mds, market, source, start, records, debug)

                newData = dataConnector.getData(market, source, srcStart, end, srcRecords)

                if newData is None:
                    continue

                print("Adding " + source["ID"] + " to " + market["ID"] + " table")

                if debug:
                    print(newData)

                if mds is not None:
                    mds.append(market["ID"], newData, update=True)

                data = ppl.merge(data, newData)

    return data


def _deltaWindow(mds, market, source, start, records, debug=False):
    """Return the ``(start, records)`` window that resumes ``source`` after its last stored row.

    Falls back to the given ``start``/``records`` when the store lookup fails
    (e.g. no stored data for the source, or ``mds`` is ``None``).
    """
    try:
        lastDate = pd.Timestamp(mds.aggregate(market["ID"], [source["ID"]]).index.get_level_values("Date_Time")[-1])
        # Measure the gap in naive UTC: tz_convert(None) converts an aware stamp
        # to UTC before dropping the zone (replace(tzinfo=None) would not convert).
        lastUtc = lastDate.tz_convert(None) if lastDate.tzinfo is not None else lastDate
        deltaRecords = (datetime.utcnow() - lastUtc.to_pydatetime()).days + 1
        deltaStart = lastDate.strftime('%Y-%m-%d')
    except Exception as e:
        # Best effort: a source without stored data should not abort the refresh.
        print(e)
        print("Could not find " + market["ID"])
        return start, records

    if debug:
        print(deltaStart)
        print(deltaRecords)
    return deltaStart, deltaRecords


In [22]:
def fetchOptionData(datasources, start="1979-01-01", end="2050-01-01"):
    """Fetch option-chain quotes and record each chain's option contracts.

    For every option chain that does not yet have an ``"options"`` entry,
    ``getOptions`` is called on the datasource's connector. When data comes
    back, the chain's ``"options"`` list is filled in with the ID,
    instrumentName, strike and type of each contract, and the quotes are merged
    into the result.

    Args:
        datasources: Parsed datasource config (list of dicts). Mutated in place.
        start: Start date passed to ``getOptions``.
        end: End date passed to ``getOptions``.

    Returns:
        Tuple ``(options, datasources)``. ``options`` holds the merged quotes
        sorted by newest ``Date_Time`` first, then ``strike`` and ``type``
        ascending, with rows containing NaN dropped; it is an empty frame when
        nothing was fetched. ``datasources`` is the same (mutated) object that
        was passed in.
    """
    options = pd.DataFrame(index=pd.MultiIndex(levels=[[], []], codes=[[], []], names=[u'Date_Time', u'ID']))

    for datasource in datasources:

        dataConnector = getConnector(datasource["class"], datasource["ID"], datasource["timezone"], datasource["opts"])

        for market in datasource["markets"]:

            for optionChain in tqdm(market.get("optionChains", [])):

                if "options" in optionChain:
                    # Contracts for this chain are already in the config.
                    continue

                optionData = dataConnector.getOptions(optionChain, appendUnderlying=False, start=start, end=end)

                if optionData is None:
                    continue

                print("Adding " + optionChain["ID"] + " to " + market["ID"] + " table")

                contracts = optionData.reset_index()[["ID", "instrumentName", "strike", "type"]]
                # to_json + json.loads yields plain dicts of JSON-native values.
                optionChain["options"] = json.loads(contracts.to_json(orient="records"))
                options = ppl.merge(options, optionData)

    if options.empty:
        # Nothing fetched: the strike/type sort columns do not exist.
        return options, datasources

    options = options.sort_values(ascending=[False, True, True], by=["Date_Time", "strike", "type"]).dropna()
    return options, datasources
    
def fetchHistoricalOptionData(mds, ds_file, start="1979-01-01", end="2050-01-01", records=200, newOnly=False, debug=False):
    """Fetch price history for every option contract listed in a datasource config file.

    Iterates the ``"options"`` list of every option chain of every market and
    calls the connector's ``getData`` for each contract. Non-``None`` results
    are appended to ``mds`` (when given) under the market ID and merged into
    the returned frame. Markets without ``"optionChains"`` and chains without
    an ``"options"`` list are skipped.

    Args:
        mds: Market data store to append to, or ``None`` to fetch only.
        ds_file: Path to the JSON datasource configuration.
        start: Start date passed to the connector.
        end: End date passed to the connector.
        records: Record count passed to the connector.
        newOnly: Not implemented yet; accepted for API compatibility.
        debug: If True, print each fetched frame.

    Returns:
        DataFrame indexed by (``Date_Time``, ``ID``) with every fetched row.
    """
    # TODO: Implement newOnly
    with open(ds_file) as f:
        datasources = json.load(f)

    data = pd.DataFrame(index=pd.MultiIndex(levels=[[], []], codes=[[], []], names=[u'Date_Time', u'ID']))

    for datasource in datasources:

        dataConnector = getConnector(datasource["class"], datasource["ID"], datasource["timezone"], datasource["opts"])

        for market in datasource["markets"]:

            for optionChain in market.get("optionChains", []):

                # A chain only gets an "options" list once its contracts were fetched.
                for option in optionChain.get("options", []):

                    newData = dataConnector.getData(market, option, start, end, records)

                    if newData is None:
                        continue

                    print("Adding " + option["ID"] + " to " + market["ID"] + " table")

                    if debug:
                        print(newData)

                    if mds is not None:
                        mds.append(market["ID"], newData, update=True)

                    data = ppl.merge(data, newData)

    return data
                    
                
    
    
def getOptionChains(datasource, root):
    """Fetch the option chains for ``root`` and resolve each chain's expiry date.

    Args:
        datasource: One datasource config entry (``class``, ``ID``,
            ``timezone``, ``opts``) used to build the connector.
        root: Option root symbol passed to the connector's ``getOptionChains``.

    Returns:
        The connector's chains DataFrame with its ``expiry`` column replaced.
        Missing expiries are looked up via ``getOptionInfo`` and take its
        ``contractExpirationDate``. Present expiries are treated as a day count
        from today, plus one day, and rendered as ``YYYY-MM-DD`` strings.
    """
    dataConnector = getConnector(datasource["class"], datasource["ID"], datasource["timezone"], datasource["opts"])

    chains = dataConnector.getOptionChains(root)

    # Evaluate "today" once so every chain is dated against the same day.
    today = date.today()
    dates = []
    for _, chain in chains.iterrows():
        expiry = chain["expiry"]
        # pd.isna (unlike np.isnan) also accepts None and non-float values.
        if pd.isna(expiry):
            info = dataConnector.getOptionInfo(chain["ID"])
            dates.append(info["contractExpirationDate"])
        else:
            # NOTE(review): presumably "expiry" is days-to-expiry; the +1 offset is kept as-is - confirm.
            dates.append(str(today + timedelta(int(expiry) + 1)))
    chains["expiry"] = dates

    return chains


def appendOptionChainPrices(mds, ds_file):
    """Fetch current option-chain prices and append each chain to its market table.

    Args:
        mds: Market data store, or ``None`` to fetch only. A chain's rows are
            appended only when its market ID is one of ``mds.getKeys()``.
        ds_file: Path to the JSON datasource configuration.

    Returns:
        DataFrame indexed by (``Date_Time``, ``ID``) with Open/High/Low/Close/
        Volume/OpenInterest for every fetched option, merged across all chains.
    """
    with open(ds_file) as f:
        datasources = json.load(f)
    mdsKeys = mds.getKeys() if mds is not None else []
    options = pd.DataFrame(index=pd.MultiIndex(levels=[[], []], codes=[[], []], names=[u'Date_Time', u'ID']))

    for datasource in datasources:

        dataConnector = getConnector(datasource["class"], datasource["ID"], datasource["timezone"], datasource["opts"])

        for market in datasource["markets"]:
            for optionChain in market.get("optionChains", []):

                # Get today's prices from optionChain
                print("Requesting " + optionChain["ID"])
                optionData = dataConnector.getOptions(optionChain, appendUnderlying=False, debug=False)

                if optionData is None:
                    continue

                optionData = optionData.sort_values(["expiry", "strike"])[["Open", "High", "Low", "Close", "Volume", "OpenInterest"]]
                options = ppl.merge(options, optionData)

                if mds is not None and market["ID"] in mdsKeys:
                    print("Adding " + optionChain["ID"] + " to " + market["ID"] + " table")
                    # Append only this chain's rows: `options` accumulates every
                    # chain seen so far, including chains of other markets.
                    mds.append(market["ID"], optionData, update=True, debug=True)

    return options


In [11]:
# Update the options config with every option chain available for the market's root.
ds_file = "../datasources/datasources_BarChartOption.json"
with open(ds_file) as f:
    datasources = json.load(f)
market = datasources[0]["markets"][0]

# Get chains; skip chain IDs already in the config so re-running does not duplicate them
chains = getOptionChains(datasources[0], market["optionRoot"])
knownChains = {chain["ID"] for chain in market.setdefault("optionChains", [])}
market["optionChains"].extend(
    chain for chain in json.loads(chains.to_json(orient="records")) if chain["ID"] not in knownChains
)

# Get Options and Sources from Chains (fills each new chain's "options" list in place)
options, datasources = fetchOptionData(datasources)

# Add each chain's underlying as a source (sample_unit "D"), once per underlying ID
knownSources = {source["ID"] for source in market["sources"]}
for optionChain in market["optionChains"]:
    if optionChain["underlying"] not in knownSources:
        market["sources"].append({"ID": optionChain["underlying"], "sample_unit": "D"})
        knownSources.add(optionChain["underlying"])

#with open(ds_file, 'w', encoding='utf-8') as f:
#    json.dump(datasources, f, ensure_ascii=False, indent=4)
    

In [38]:
# Current UTC time, ISO formatted with a space between date and time (same text as str()).
datetime.utcnow().isoformat(sep=" ")

'2022-10-11 23:14:11.932380'

In [32]:
# Fetch today's bar for every configured source.
# NOTE(review): `mds` is only created in later cells ("Local Options" / "Remote ..."); on a
# fresh kernel run one of those first. Other cells read configs from ../datasources/ - confirm this path.
# Evaluate the date once so start/end cannot straddle midnight.
today = date.today()
data = fetchHistoricalData(mds, "../jobs/datasources/datasources.json", start=str(today), end=str(today + timedelta(days=1)), records=1, debug=True)

  0%|          | 0/1 [00:00<?, ?it/s]

[*********************100%***********************]  1 of 1 completed
Adding CL=F to WTICrudeOil table
                                     Open       High        Low      Close  \
Date_Time                 ID                                                 
2022-10-10 00:00:00+00:00 CL=F  93.480003  93.639999  90.540001  91.129997   
2022-10-11 00:00:00+00:00 CL=F  88.669998  88.680000  88.440002  88.570000   

                                  Volume  
Date_Time                 ID              
2022-10-10 00:00:00+00:00 CL=F  429162.0  
2022-10-11 00:00:00+00:00 CL=F     714.0  





KeyboardInterrupt: 

In [12]:
# Rebuild the first option chain's contract list from the option IDs listed as market sources.
ds_file = "../datasources/datasources_BarChartOption.json"
with open(ds_file) as f:
    datasources = json.load(f)

mkt = datasources[0]["markets"][0]
srcs = []
options = []
for source in mkt["sources"]:
    srcs.append({"ID": source["ID"], "sample_unit": source["sample_unit"]})
    # Option IDs look like "<root>|<strike><C|P>" (e.g. "CLV2|210P"); underlying
    # futures IDs (e.g. "CLV22") have no "|" and are kept only as sources.
    if "|" not in source["ID"]:
        continue
    contract = source["ID"].split("|", 1)[1]
    strike = float(contract[:-1])
    # Named optType so the notebook's global `type` builtin is not shadowed.
    optType = contract[-1].lower()
    typeString = "Call" if optType == 'c' else "Put"
    options.append({
        # NOTE: the instrument-name prefix is hard-coded for the Oct '22 WTI chain.
        "ID": source["ID"],
        "instrumentName": "Crude Oil WTI Oct '22 {} {}".format(strike, typeString),
        "strike": strike,
        "type": optType
    })

mkt["sources"] = srcs
mkt["optionChains"][0]["options"] = options

#with open(ds_file, 'w', encoding='utf-8') as f:
#    json.dump(datasources, f, ensure_ascii=False, indent=4)

In [15]:
#fetchHistoricalData(None, "../datasources/datasources_MDS.json", end="2022-09-15")
ds_file = "../datasources/datasources_MDS.json"

# Read the config and close the file before the (slow) fetch starts.
with open(ds_file) as f:
    datasources = json.load(f)
options, datasources = fetchOptionData(datasources, end="2022-09-15")

  0%|          | 0/1 [00:00<?, ?it/s]

Loading data from CLV22 in ../datastore/data.hdf
Resampling to D periods
Resampling to D periods
Adding CLV2|210P to Option CLV22
Adding CLV2|220P to Option CLV22
Adding CLV2|230P to Option CLV22
Adding CLV2|240P to Option CLV22
Adding CLV2|250P to Option CLV22
Adding CLV2|260P to Option CLV22
Adding CLV2|270P to Option CLV22
Adding CLV2|275P to Option CLV22
Adding CLV2|280P to Option CLV22
Adding CLV2|290P to Option CLV22
Adding CLV2|300P to Option CLV22
Adding CLV2|310P to Option CLV22
Adding CLV2|315P to Option CLV22
Adding CLV2|320P to Option CLV22
Adding CLV2|325P to Option CLV22
Adding CLV2|330P to Option CLV22
Adding CLV2|340P to Option CLV22
Adding CLV2|345P to Option CLV22
Adding CLV2|350P to Option CLV22
Adding CLV2|355P to Option CLV22
Adding CLV2|360P to Option CLV22
Adding CLV2|365P to Option CLV22
Adding CLV2|370P to Option CLV22
Adding CLV2|375P to Option CLV22
Adding CLV2|380P to Option CLV22
Adding CLV2|390P to Option CLV22
Adding CLV2|395P to Option CLV22
Adding CLV2|

Adding CLV2|1050P to Option CLV22
Adding CLV2|1055P to Option CLV22
Adding CLV2|1055C to Option CLV22
Adding CLV2|1060C to Option CLV22
Adding CLV2|1060P to Option CLV22
Adding CLV2|1065C to Option CLV22
Adding CLV2|1065P to Option CLV22
Adding CLV2|1070C to Option CLV22
Adding CLV2|1070P to Option CLV22
Adding CLV2|1075C to Option CLV22
Adding CLV2|1075P to Option CLV22
Adding CLV2|1080C to Option CLV22
Adding CLV2|1080P to Option CLV22
Adding CLV2|1085P to Option CLV22
Adding CLV2|1085C to Option CLV22
Adding CLV2|1090P to Option CLV22
Adding CLV2|1090C to Option CLV22
Adding CLV2|1095C to Option CLV22
Adding CLV2|1095P to Option CLV22
Adding CLV2|1100C to Option CLV22
Adding CLV2|1100P to Option CLV22
Adding CLV2|1105C to Option CLV22
Adding CLV2|1105P to Option CLV22
Adding CLV2|1110C to Option CLV22
Adding CLV2|1110P to Option CLV22
Adding CLV2|1115C to Option CLV22
Adding CLV2|1115P to Option CLV22
Adding CLV2|1120C to Option CLV22
Adding CLV2|1120P to Option CLV22
Adding CLV2|11

100%|██████████| 1/1 [02:26<00:00, 146.49s/it]


In [74]:
# Snapshot the fetched frame to disk.
pd.to_pickle(data, "data2.pkl")

In [36]:
from plotly import express as px

# NOTE(review): sample dataset; neither `px` nor `df` is used by any visible cell.
df = px.data.gapminder()

In [None]:
# Local Options: fetch every source of the BarChart option config into the local store.
mds = MarketDataStore(location="../datastore")
options = fetchHistoricalData(
    mds,
    "../datasources/datasources_BarChartOption.json",
    records=300,
    newOnly=True,  # NOTE: newOnly is currently ignored by fetchHistoricalData (TODO there)
)

In [17]:
# Remote (cluster): read the WTI crude table from the cluster price store.
mds = MarketDataStore(location="http://pricestore.192.168.1.203.nip.io", remote=True)
mds.get("WTICrudeOil")

Unnamed: 0_level_0,Unnamed: 1_level_0,Close,High,Low,Open,OpenInterest,Volume
Date_Time,ID,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2019-12-30 00:00:00+00:00,CLZ2|540C,6.23,6.230000,6.230000,6.230000,100.0,0.0
2019-12-31 00:00:00+00:00,CLZ2|540C,6.13,6.130000,6.130000,6.130000,100.0,0.0
2020-01-01 00:00:00+00:00,CLZ2|540C,,,,,0.0,0.0
2020-01-02 00:00:00+00:00,CL=F,61.18,61.599998,60.639999,61.599998,,486873.0
2020-01-02 00:00:00+00:00,CLZ2|540C,6.35,6.350000,6.350000,6.350000,100.0,0.0
...,...,...,...,...,...,...,...
2022-10-11 00:00:00+00:00,CLV23,77.25,78.010000,77.170000,77.920000,0.0,1062.0
2022-10-11 00:00:00+00:00,CLX22,89.35,91.350000,87.910000,91.140000,0.0,295289.0
2022-10-11 00:00:00+00:00,CLX23,76.70,77.730000,76.700000,77.730000,0.0,646.0
2022-10-11 00:00:00+00:00,CLZ22,87.97,90.150000,86.590000,89.750000,0.0,161056.0


In [None]:
# Remote (localhost)
# NOTE(review): `refreshMarketData` is not defined in any visible cell of this notebook,
# so this raises NameError on a fresh kernel unless it is defined/imported elsewhere.
# The (directory, filename) arguments suggest an older API; something like
# fetchHistoricalData(mds, "../datasources/datasources.json") may be the intended
# replacement - TODO confirm.
mds = MarketDataStore(remote=True, location="http://localhost:8080")
refreshMarketData(mds, "../datasources", "datasources.json")

In [8]:
# NOTE(review): positional signature differs from the aggregate(marketId, [sourceIds])
# call in fetchHistoricalData; this output may come from an older MarketDataStore API.
df1 = mds.aggregate(
    ["D&J-IND", "SANDP-500"],
    "H",  # output below reports "Resampling to H periods"
    "1979-01-01",
    "2050-01-01",
    debug=True,
)
df1

Loading data from D&J-IND in ../datastore/data.hdf
Resampling to H periods
Resampling to H periods
Loading data from SANDP-500 in ../datastore/data.hdf
Resampling to H periods
Resampling to H periods


Unnamed: 0_level_0,Open,High,Low,Close
Date_Time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2013-01-02 15:00:00+00:00,13366.480000,13374.820000,13338.680000,13345.450000
2013-01-02 16:00:00+00:00,13345.600000,13353.750000,13321.640000,13321.770000
2013-01-02 17:00:00+00:00,13321.760000,13339.250000,13320.280000,13326.660000
2013-01-02 18:00:00+00:00,13326.640000,13336.060000,13322.510000,13329.190000
2013-01-02 19:00:00+00:00,13329.090000,13339.820000,13318.190000,13331.800000
...,...,...,...,...
2018-03-19 16:00:00+00:00,24632.111675,24650.292289,24554.939070,24583.990050
2018-03-19 17:00:00+00:00,24579.349894,24585.490101,24473.096306,24568.679534
2018-03-19 18:00:00+00:00,24570.309589,24590.980286,24455.305705,24533.498346
2018-03-19 19:00:00+00:00,24535.438411,24638.041875,24492.786971,24623.971400
